#!/usr/bin/env python3
|
|
"""Knowledge-Conduit general test harness.

Goals:
- small/large changes
- with/without noise
- 1 or 4 capabilities changed at once
- additional mutation types for realistic drift
- deterministic seed per scenario
- gold expected files and metrics
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import itertools
|
|
import json
|
|
import random
|
|
import shutil
|
|
import time
|
|
from dataclasses import asdict, dataclass
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple
|
|
|
|
|
|
# Filesystem anchors: fixtures and default artifacts live next to this script.
SCRIPT_DIR = Path(__file__).resolve().parent
FIXTURES_DIR = SCRIPT_DIR / "fixtures"
DEFAULT_OUT_DIR = SCRIPT_DIR / "artifacts"

# Fixture repositories for which scenarios are generated.
REPOS = ["repo-alpha", "repo-beta"]

# Skill files (relative to a fixture repo root) that scenarios mutate.
SKILLS = [
    Path(".github/skills") / skill_name / "SKILL.md"
    for skill_name in ("kc-dataset-cleaner", "kc-api-smoke", "kc-release-notes")
]

# Bullets written by ensure_capability_section when a skill file has no
# "## Capabilities" section yet.
BASE_CAPABILITIES = [
    "input-validation",
    "response-schema-check",
    "retry-backoff",
    "latency-budget",
    "auth-guard",
    "idempotency-check",
]

# Replacement bullets used by mutate_capabilities: the i-th bullet of the
# section is overwritten with the i-th entry of this list.
IMPROVED_CAPABILITIES = [
    "strict-input-validation",
    "response-contract-check",
    "retry-jitter-backoff",
    "p95-latency-budget",
    "token-scope-guard",
    "idempotency-key-replay-check",
]

# Phrase -> replacement pairs applied in insertion order by
# apply_trigger_synonyms.
TRIGGER_SYNONYMS = {
    "Bereinige den Datensatz": "Bereinige den Input-Bestand",
    "Normalisiere diese CSV": "Standardisiere diese CSV",
    "Finde Dubletten und fehlende Felder": "Erkenne Duplikate und Null-Felder",
    "Starte API-Smoke-Test": "Starte API-Basischeck",
    "Pruefe Health- und Auth-Endpunkte": "Validiere Health- und Auth-Routen",
    "Validiere Basis-Responses": "Pruefe Grundantworten",
    "Schreibe Release Notes": "Erstelle Release Notes",
}
|
|
|
|
|
|
@dataclass
class Scenario:
    """Plain record describing one generated test scenario.

    Each instance names one skill file in one fixture repo together with
    the mutation that should be applied to it.
    """

    # Sequential label of the form "KC-001".
    scenario_id: str
    # Name of the fixture repository.
    repo: str
    # Skill file path relative to the repo root, "/"-separated.
    skill: str
    # Change size, "small" or "large".
    size: str
    # Whether unrelated noise files are injected into the repo copy.
    noise: bool
    # Number of capability bullets requested to be replaced.
    capabilities_to_change: int
    # Mutation type, e.g. "content-tune" or "rename-skill".
    change_kind: str
    # Deterministic seed derived from the scenario parameters.
    seed: int
|
|
|
|
|
|
def scenario_seed(material: str) -> int:
    """Derive a stable seed from *material*.

    The seed is the first four bytes of the SHA-256 digest of the UTF-8
    encoded text, read as a big-endian unsigned integer, so equal input
    always yields the same value in the range ``0 .. 2**32 - 1``.
    """
    leading_bytes = hashlib.sha256(material.encode("utf-8")).digest()[:4]
    return int.from_bytes(leading_bytes, "big")
|
|
|
|
|
|
def ensure_capability_section(text: str) -> str:
    """Return *text* with a ``## Capabilities`` section present.

    Text that already contains the string ``## Capabilities`` is returned
    unchanged. Otherwise a section listing ``BASE_CAPABILITIES`` is placed
    directly before the first ``## Checkliste`` occurrence, or appended to
    the end when no such marker exists.
    """
    if "## Capabilities" in text:
        return text

    bullet_lines = [f"- {name}" for name in BASE_CAPABILITIES]
    # The trailing empty item makes the joined block end with a newline.
    block = "\n".join(["## Capabilities", "", *bullet_lines, ""])

    anchor = "## Checkliste"
    if anchor not in text:
        return text.rstrip() + "\n\n" + block + "\n"
    return text.replace(anchor, f"{block}\n{anchor}", 1)
|
|
|
|
|
|
def mutate_capabilities(text: str, count: int) -> Tuple[str, int]:
    """Replace the first *count* capability bullets with improved ones.

    The text is first passed through ``ensure_capability_section``. Bullets
    are the lines starting with ``- `` between the ``## Capabilities``
    heading line and the next ``## `` heading. The i-th bullet is replaced
    by ``- `` plus the i-th entry of ``IMPROVED_CAPABILITIES``.

    Args:
        text: Skill file content.
        count: Number of bullets requested to be replaced. Values below
            zero are treated as zero.

    Returns:
        ``(new_text, replaced)`` where ``replaced`` is the number of bullets
        actually rewritten (never negative). ``new_text`` is re-joined with
        ``"\n"`` and ends with a single newline. When no line equals
        ``## Capabilities`` the text is returned as-is with ``0``.
    """
    text = ensure_capability_section(text)
    lines = text.splitlines()

    heading_row = next(
        (row for row, line in enumerate(lines) if line.strip() == "## Capabilities"),
        None,
    )
    if heading_row is None:
        return text, 0

    bullet_rows: List[int] = []
    for row in range(heading_row + 1, len(lines)):
        stripped = lines[row].strip()
        if stripped.startswith("## "):
            # The next heading ends the capabilities section.
            break
        if stripped.startswith("- "):
            bullet_rows.append(row)

    # Clamp at zero: a negative request used to leak out as a negative
    # "replacement count" even though nothing was replaced.
    replace_count = max(0, min(count, len(bullet_rows), len(IMPROVED_CAPABILITIES)))
    for row, improved in zip(bullet_rows[:replace_count], IMPROVED_CAPABILITIES):
        lines[row] = f"- {improved}"

    return "\n".join(lines) + "\n", replace_count
|
|
|
|
|
|
def add_content_or_structure_hint(text: str, change_kind: str) -> str:
    """Insert a one-line hint below the first ``## Zweck`` heading.

    ``"structure-tune"`` selects the structure hint; any other value
    selects the content hint. Text without the heading is returned
    unchanged.
    """
    if change_kind == "structure-tune":
        heading_with_hint = "## Zweck\n\nHinweis: Struktur-Tune aktiv."
    else:
        heading_with_hint = "## Zweck\n\nHinweis: Content-Tune aktiv."
    return text.replace("## Zweck", heading_with_hint, 1)
|
|
|
|
|
|
def reorder_sections(text: str) -> str:
    """Move the ``## Checkliste`` section in front of ``## Trigger-Phrasen``.

    The text is returned unchanged when either heading is missing or the
    checklist already comes first. Everything between the trigger heading
    and the checklist heading moves as one unit; the checklist section ends
    at the next ``\\n## `` heading or at the end of the text.
    """
    trigger_at = text.find("## Trigger-Phrasen")
    checklist_at = text.find("## Checkliste")
    if trigger_at == -1 or checklist_at == -1 or checklist_at < trigger_at:
        return text

    preamble = text[:trigger_at]
    trigger_block = text[trigger_at:checklist_at]
    # partition yields an empty separator and remainder when no further
    # heading follows the checklist.
    checklist_block, separator, following = text[checklist_at:].partition("\n## ")
    remainder = separator + following

    return (
        preamble
        + checklist_block.rstrip()
        + "\n\n"
        + trigger_block.strip()
        + "\n"
        + remainder
    )
|
|
|
|
|
|
def apply_trigger_synonyms(text: str) -> str:
    """Replace every known trigger phrase in *text* with its synonym.

    Replacements are applied one after another in the insertion order of
    ``TRIGGER_SYNONYMS``; every occurrence of a phrase is replaced.
    """
    rewritten = text
    for phrase in TRIGGER_SYNONYMS:
        rewritten = rewritten.replace(phrase, TRIGGER_SYNONYMS[phrase])
    return rewritten
|
|
|
|
|
|
def inject_noise(repo_dir: Path, scenario_id: str, seed: int) -> None:
    """Write two unrelated files into ``<repo_dir>/noise/<scenario_id>``.

    ``notes.txt`` has fixed content. ``payload.json`` records the scenario
    id, the seed and a ``checksum_hint`` drawn from a ``random.Random``
    seeded with *seed*, so the same seed always produces the same file.
    """
    target = repo_dir / "noise" / scenario_id
    target.mkdir(parents=True, exist_ok=True)

    notes_file = target / "notes.txt"
    notes_file.write_text("noise payload\nignore me\n", encoding="utf-8")

    # A private generator keeps the draw independent of global random state.
    checksum_hint = random.Random(seed).randint(1000, 9999)
    document = {
        "scenario": scenario_id,
        "kind": "noise",
        "seed": seed,
        "checksum_hint": checksum_hint,
    }
    payload_file = target / "payload.json"
    payload_file.write_text(json.dumps(document, indent=2) + "\n", encoding="utf-8")
|
|
|
|
|
|
def build_scenarios() -> List[Scenario]:
    """Build the full cross product of scenario parameters.

    Every combination of repo, skill, size, noise flag, capability count
    and change kind yields one ``Scenario``, labelled ``KC-001``, ``KC-002``
    and so on in product order.

    The seed is hashed from the ``|``-joined parameters. The skill path is
    rendered with ``as_posix()`` both for the stored ``skill`` field and
    for the seed material, so the same scenario gets the same seed on every
    operating system. Hashing ``str(skill)`` would use backslashes on
    Windows and produce different seeds there.
    """
    sizes = ["small", "large"]
    noises = [False, True]
    capability_counts = [1, 4]
    change_kinds = [
        "content-tune",
        "structure-tune",
        "rename-skill",
        "reorder-sections",
        "trigger-synonyms",
    ]

    combinations = itertools.product(
        REPOS, SKILLS, sizes, noises, capability_counts, change_kinds
    )
    scenarios: List[Scenario] = []
    for index, (repo, skill, size, noise, cap_count, kind) in enumerate(combinations, start=1):
        skill_text = skill.as_posix()
        material = "|".join([repo, skill_text, size, str(noise), str(cap_count), kind])
        scenarios.append(
            Scenario(
                scenario_id=f"KC-{index:03d}",
                repo=repo,
                skill=skill_text,
                size=size,
                noise=noise,
                capabilities_to_change=cap_count,
                change_kind=kind,
                seed=scenario_seed(material),
            )
        )
    return scenarios
|
|
|
|
|
|
def error_signature(exc: Exception) -> str:
    """Return ``"<ExceptionClass>: <message>"`` cut to at most 160 characters."""
    kind = type(exc).__name__
    return f"{kind}: {exc}"[:160]
|
|
|
|
|
|
def _materialize_scenario(
    scenario: Scenario, runs_dir: Path, fixtures_dir: Path
) -> Tuple[Path, Path, int]:
    """Copy the fixture repo for *scenario* and write the mutated skill file.

    Returns ``(target_repo, final_skill_path, replaced_count)``.

    Raises:
        FileNotFoundError: the fixture repo or the skill file is missing.
    """
    source_repo = fixtures_dir / scenario.repo
    if not source_repo.exists():
        raise FileNotFoundError(f"fixture repo missing: {source_repo}")

    target_repo = runs_dir / scenario.scenario_id / scenario.repo
    shutil.copytree(source_repo, target_repo)

    skill_path = target_repo / Path(scenario.skill)
    if not skill_path.exists():
        raise FileNotFoundError(f"skill missing: {skill_path}")

    original = skill_path.read_text(encoding="utf-8")
    changed, replaced_count = mutate_capabilities(original, scenario.capabilities_to_change)

    if scenario.change_kind in ("content-tune", "structure-tune"):
        changed = add_content_or_structure_hint(changed, scenario.change_kind)
    elif scenario.change_kind == "reorder-sections":
        changed = reorder_sections(changed)
    elif scenario.change_kind == "trigger-synonyms":
        changed = apply_trigger_synonyms(changed)

    final_skill_path = skill_path
    if scenario.change_kind == "rename-skill":
        # The renamed file replaces the original rather than sitting next to it.
        final_skill_path = skill_path.with_name("SKILL_RENAMED.md")
        skill_path.unlink()

    if scenario.size == "large":
        changed += (
            "\n## Erweiterte Testnotiz\n"
            "- Fuehre den gleichen Vorgang mit 3 Umgebungen aus.\n"
            "- Vergleiche Diff, Laufzeit und Fehlersignaturen.\n"
        )

    final_skill_path.write_text(changed, encoding="utf-8")

    if scenario.noise:
        inject_noise(target_repo, scenario.scenario_id, scenario.seed)

    return target_repo, final_skill_path, replaced_count


def _summarize_runtimes(runtime_by_class: Dict[str, List[float]]) -> Dict[str, Dict[str, object]]:
    """Compute count, average and p95 runtime for every scenario class."""
    class_metrics: Dict[str, Dict[str, object]] = {}
    for key, values in runtime_by_class.items():
        ordered = sorted(values)
        # Index into the sorted samples, clamped to the valid range.
        p95_idx = max(0, min(len(ordered) - 1, int((len(ordered) - 1) * 0.95)))
        class_metrics[key] = {
            "count": len(ordered),
            "avg_runtime_ms": round(sum(ordered) / len(ordered), 3),
            "p95_runtime_ms": ordered[p95_idx],
        }
    return class_metrics


def apply_scenarios(
    scenarios: List[Scenario], out_dir: Path, fixtures_dir: Path
) -> Dict[str, object]:
    """Materialise every scenario on disk and collect run metrics.

    ``<out_dir>/runs`` and ``<out_dir>/expected`` are deleted and recreated.
    Each scenario gets a copy of its fixture repo under
    ``runs/<scenario_id>/<repo>`` with the mutated skill file, plus an
    ``expected/<scenario_id>.expected.json`` description. A failing
    scenario is recorded with status ``"error"`` and does not stop the run.
    ``metrics.json`` and ``scenario-results.json`` are written to *out_dir*.

    Args:
        scenarios: Scenarios to materialise.
        out_dir: Root directory for all generated artifacts.
        fixtures_dir: Directory containing the fixture repositories.

    Returns:
        The metrics dictionary that is also written to ``metrics.json``.
    """
    runs_dir = out_dir / "runs"
    expected_dir = out_dir / "expected"
    for stale_dir in (runs_dir, expected_dir):
        if stale_dir.exists():
            shutil.rmtree(stale_dir)
        stale_dir.mkdir(parents=True, exist_ok=True)

    results: List[Dict[str, object]] = []
    errors: List[Dict[str, str]] = []
    runtime_by_class: Dict[str, List[float]] = {}

    for scenario in scenarios:
        start = time.perf_counter()
        class_key = "|".join(
            [
                scenario.size,
                "noise" if scenario.noise else "clean",
                f"cap{scenario.capabilities_to_change}",
                scenario.change_kind,
            ]
        )

        # Deliberately broad: one broken scenario must not abort the batch.
        try:
            target_repo, final_skill_path, replaced_count = _materialize_scenario(
                scenario, runs_dir, fixtures_dir
            )
            # Taken before the expected file is written, so that write is
            # not part of the measured runtime.
            runtime_ms = round((time.perf_counter() - start) * 1000, 3)

            expected_payload = {
                "scenario_id": scenario.scenario_id,
                "seed": scenario.seed,
                "repo": scenario.repo,
                "skill_input": scenario.skill,
                "skill_output": str(final_skill_path.relative_to(target_repo)).replace("\\", "/"),
                "change_kind": scenario.change_kind,
                "size": scenario.size,
                "noise": scenario.noise,
                "expected_capability_replacements": replaced_count,
                "capabilities_requested": scenario.capabilities_to_change,
                "status": "ok",
            }
            (expected_dir / f"{scenario.scenario_id}.expected.json").write_text(
                json.dumps(expected_payload, indent=2) + "\n", encoding="utf-8"
            )
        except Exception as exc:
            runtime_ms = round((time.perf_counter() - start) * 1000, 3)
            signature = error_signature(exc)
            errors.append({"scenario_id": scenario.scenario_id, "signature": signature})
            record: Dict[str, object] = {
                "scenario_id": scenario.scenario_id,
                "runtime_ms": runtime_ms,
                "class_key": class_key,
                "status": "error",
                "error_signature": signature,
            }
        else:
            record = {
                "scenario_id": scenario.scenario_id,
                "runtime_ms": runtime_ms,
                "class_key": class_key,
                "status": "ok",
            }

        # Record the runtime once per scenario, after the outcome is known.
        # Appending inside the try block as well would add a second sample
        # whenever writing the expected file fails.
        runtime_by_class.setdefault(class_key, []).append(runtime_ms)
        results.append(record)

    signature_counts: Dict[str, int] = {}
    for item in errors:
        signature_counts[item["signature"]] = signature_counts.get(item["signature"], 0) + 1

    metrics = {
        "total_scenarios": len(scenarios),
        "ok": sum(1 for r in results if r["status"] == "ok"),
        "error": sum(1 for r in results if r["status"] == "error"),
        "class_metrics": _summarize_runtimes(runtime_by_class),
        "error_signatures": signature_counts,
    }
    (out_dir / "metrics.json").write_text(json.dumps(metrics, indent=2) + "\n", encoding="utf-8")
    (out_dir / "scenario-results.json").write_text(json.dumps(results, indent=2) + "\n", encoding="utf-8")

    return metrics
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point.

    Always writes ``scenario-plan.json`` into the output directory. In
    ``apply`` mode the scenarios are also materialised. A JSON summary is
    printed to stdout and ``0`` is returned.
    """
    parser = argparse.ArgumentParser(description="Knowledge-Conduit Generaltest")
    option_specs = (
        (
            "--mode",
            {
                "choices": ["plan", "apply"],
                "default": "plan",
                "help": "plan: nur Szenarien schreiben; apply: Szenarien materialisieren",
            },
        ),
        (
            "--fixtures",
            {
                "default": str(FIXTURES_DIR),
                "help": "Verzeichnis mit repo-alpha und repo-beta Fixtures",
            },
        ),
        (
            "--out",
            {
                "default": str(DEFAULT_OUT_DIR),
                "help": "Ausgabeverzeichnis fuer Plan und Artefakte",
            },
        ),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    options = parser.parse_args()

    fixtures_root = Path(options.fixtures)
    output_root = Path(options.out)
    output_root.mkdir(parents=True, exist_ok=True)

    planned = build_scenarios()
    plan_file = output_root / "scenario-plan.json"
    serialised_plan = json.dumps([asdict(item) for item in planned], indent=2)
    plan_file.write_text(serialised_plan + "\n", encoding="utf-8")

    # Metrics exist only when the scenarios were actually materialised.
    if options.mode == "apply":
        run_metrics = apply_scenarios(planned, output_root, fixtures_root)
    else:
        run_metrics = None

    report = {
        "scenario_count": len(planned),
        "mode": options.mode,
        "fixtures": str(fixtures_root),
        "output": str(output_root),
        "plan": str(plan_file),
        "metrics": run_metrics,
    }
    print(json.dumps(report, indent=2))
    return 0
|
|
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)
|