#!/usr/bin/env python3
"""Knowledge-Conduit general test harness.

Goals:
- small/large changes
- with/without noise
- 1 or 4 capabilities at once
- additional mutation types for realistic drift
- deterministic seed per scenario
- gold expected files and metrics
"""

from __future__ import annotations

import argparse
import hashlib
import itertools
import json
import random
import shutil
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Dict, List, Tuple

SCRIPT_DIR = Path(__file__).resolve().parent
FIXTURES_DIR = SCRIPT_DIR / "fixtures"
DEFAULT_OUT_DIR = SCRIPT_DIR / "artifacts"

REPOS = ["repo-alpha", "repo-beta"]

SKILLS = [
    Path(".github/skills/kc-dataset-cleaner/SKILL.md"),
    Path(".github/skills/kc-api-smoke/SKILL.md"),
    Path(".github/skills/kc-release-notes/SKILL.md"),
]

BASE_CAPABILITIES = [
    "input-validation",
    "response-schema-check",
    "retry-backoff",
    "latency-budget",
    "auth-guard",
    "idempotency-check",
]

IMPROVED_CAPABILITIES = [
    "strict-input-validation",
    "response-contract-check",
    "retry-jitter-backoff",
    "p95-latency-budget",
    "token-scope-guard",
    "idempotency-key-replay-check",
]

TRIGGER_SYNONYMS = {
    "Bereinige den Datensatz": "Bereinige den Input-Bestand",
    "Normalisiere diese CSV": "Standardisiere diese CSV",
    "Finde Dubletten und fehlende Felder": "Erkenne Duplikate und Null-Felder",
    "Starte API-Smoke-Test": "Starte API-Basischeck",
    "Pruefe Health- und Auth-Endpunkte": "Validiere Health- und Auth-Routen",
    "Validiere Basis-Responses": "Pruefe Grundantworten",
    "Schreibe Release Notes": "Erstelle Release Notes",
}


@dataclass
class Scenario:
    """One cell of the scenario matrix produced by ``build_scenarios``."""

    scenario_id: str  # "KC-001", "KC-002", ...
    repo: str  # fixture repository name (entry of REPOS)
    skill: str  # skill file path relative to the repo, "/"-separated
    size: str  # "small" or "large"
    noise: bool  # whether unrelated noise files are injected
    capabilities_to_change: int  # number of capability bullets to replace
    change_kind: str  # mutation type, e.g. "content-tune"
    seed: int  # deterministic seed derived from the other fields


def scenario_seed(material: str) -> int:
    """Return a deterministic 32-bit seed derived from *material*.

    The seed is the integer value of the first eight hex digits of the
    SHA-256 digest of the UTF-8 encoded string.
    """
    digest = hashlib.sha256(material.encode("utf-8")).hexdigest()
    return int(digest[:8], 16)


def ensure_capability_section(text: str) -> str:
    """Return *text* with a ``## Capabilities`` section guaranteed present.

    If the heading already occurs anywhere in *text*, the text is returned
    unchanged. Otherwise a section listing ``BASE_CAPABILITIES`` is inserted
    directly before the first ``## Checkliste`` heading, or appended to the
    end of the text when that heading is missing.
    """
    if "## Capabilities" in text:
        return text

    marker = "## Checkliste"
    bullet_lines = [f"- {cap}" for cap in BASE_CAPABILITIES]
    # The trailing empty item makes the joined section end with a newline.
    insertion = "\n".join(["## Capabilities", "", *bullet_lines, ""])

    if marker in text:
        return text.replace(marker, f"{insertion}\n{marker}", 1)
    return text.rstrip() + "\n\n" + insertion + "\n"


def mutate_capabilities(text: str, count: int) -> Tuple[str, int]:
    """Replace the first *count* capability bullets with improved ones.

    A capabilities section is added first when it is missing. Bullets are
    the lines starting with ``- `` between the ``## Capabilities`` heading
    and the next ``## `` heading; the i-th bullet is replaced by the i-th
    entry of ``IMPROVED_CAPABILITIES``.

    Returns:
        ``(new_text, replaced)`` where *replaced* is the number of bullets
        actually rewritten. It is never negative and never exceeds the
        number of bullets or of improved capabilities available.
    """
    text = ensure_capability_section(text)
    lines = text.splitlines()

    cap_start = next(
        (i + 1 for i, line in enumerate(lines) if line.strip() == "## Capabilities"),
        None,
    )
    if cap_start is None:
        # The heading text exists but not as a line of its own.
        return text, 0

    bullets: List[int] = []
    for i in range(cap_start, len(lines)):
        stripped = lines[i].strip()
        if stripped.startswith("## "):
            break
        if stripped.startswith("- "):
            bullets.append(i)

    # Clamp at zero so a negative request cannot yield a negative count.
    replace_count = max(0, min(count, len(bullets), len(IMPROVED_CAPABILITIES)))
    for line_no, improved in zip(bullets[:replace_count], IMPROVED_CAPABILITIES):
        lines[line_no] = f"- {improved}"
    return "\n".join(lines) + "\n", replace_count


def add_content_or_structure_hint(text: str, change_kind: str) -> str:
    """Insert a hint paragraph below the first ``## Zweck`` heading.

    ``"structure-tune"`` selects the structure hint; every other value of
    *change_kind* selects the content hint. Text without the heading is
    returned unchanged.
    """
    if change_kind == "structure-tune":
        hint = "## Zweck\n\nHinweis: Struktur-Tune aktiv."
    else:
        hint = "## Zweck\n\nHinweis: Content-Tune aktiv."
    return text.replace("## Zweck", hint, 1)


def reorder_sections(text: str) -> str:
    """Move the ``## Checkliste`` section in front of ``## Trigger-Phrasen``.

    The text is returned unchanged when either heading is missing or when
    the checklist already precedes the trigger section.
    """
    if "## Trigger-Phrasen" not in text or "## Checkliste" not in text:
        return text

    start_trigger = text.index("## Trigger-Phrasen")
    start_check = text.index("## Checkliste")
    if start_check < start_trigger:
        return text

    head = text[:start_trigger]
    trigger_part = text[start_trigger:start_check]
    rest = text[start_check:]

    # The checklist ends where the next level-2 heading begins, if any.
    check_part, separator, remainder = rest.partition("\n## ")
    tail = separator + remainder if separator else ""

    return head + check_part.rstrip() + "\n\n" + trigger_part.strip() + "\n" + tail


def apply_trigger_synonyms(text: str) -> str:
    """Replace every known trigger phrase in *text* with its synonym."""
    for old, new in TRIGGER_SYNONYMS.items():
        text = text.replace(old, new)
    return text


def inject_noise(repo_dir: Path, scenario_id: str, seed: int) -> None:
    """Write unrelated noise files below ``<repo_dir>/noise/<scenario_id>``.

    Creates ``notes.txt`` with a fixed payload and ``payload.json`` whose
    ``checksum_hint`` is drawn from a ``random.Random`` seeded with *seed*,
    so the output is reproducible for a given seed.
    """
    rnd = random.Random(seed)
    noise_dir = repo_dir / "noise" / scenario_id
    noise_dir.mkdir(parents=True, exist_ok=True)

    (noise_dir / "notes.txt").write_text("noise payload\nignore me\n", encoding="utf-8")

    payload = {
        "scenario": scenario_id,
        "kind": "noise",
        "seed": seed,
        "checksum_hint": rnd.randint(1000, 9999),
    }
    (noise_dir / "payload.json").write_text(
        json.dumps(payload, indent=2) + "\n", encoding="utf-8"
    )


def build_scenarios() -> List[Scenario]:
    """Build the full scenario matrix.

    The matrix is the cartesian product of repos, skills, sizes, noise
    flags, capability counts and change kinds, numbered ``KC-001`` upwards
    in product order.
    """
    sizes = ["small", "large"]
    noises = [False, True]
    capability_counts = [1, 4]
    change_kinds = [
        "content-tune",
        "structure-tune",
        "rename-skill",
        "reorder-sections",
        "trigger-synonyms",
    ]

    scenarios: List[Scenario] = []
    matrix = itertools.product(
        REPOS, SKILLS, sizes, noises, capability_counts, change_kinds
    )
    for index, (repo, skill, size, noise, cap_count, kind) in enumerate(matrix, start=1):
        # Use the "/"-separated form for the seed material as well as for
        # the stored path, so seeds do not depend on the OS path separator.
        skill_posix = skill.as_posix()
        material = "|".join([repo, skill_posix, size, str(noise), str(cap_count), kind])
        scenarios.append(
            Scenario(
                scenario_id=f"KC-{index:03d}",
                repo=repo,
                skill=skill_posix,
                size=size,
                noise=noise,
                capabilities_to_change=cap_count,
                change_kind=kind,
                seed=scenario_seed(material),
            )
        )
    return scenarios


def error_signature(exc: Exception) -> str:
    """Return ``"<ExceptionType>: <message>"`` truncated to 160 characters."""
    text = f"{type(exc).__name__}: {exc}"
    return text[:160]


def _class_key(scenario: Scenario) -> str:
    """Return the metrics bucket key (size|noise|capN|change_kind)."""
    return "|".join(
        [
            scenario.size,
            "noise" if scenario.noise else "clean",
            f"cap{scenario.capabilities_to_change}",
            scenario.change_kind,
        ]
    )


def _materialize_scenario(
    scenario: Scenario, runs_dir: Path, fixtures_dir: Path
) -> Tuple[str, int]:
    """Copy the fixture repo for *scenario* and apply its mutations.

    Returns:
        ``(skill_output, replaced_count)``: the "/"-separated path of the
        written skill file relative to the copied repo, and the number of
        capability bullets replaced.

    Raises:
        FileNotFoundError: if the fixture repo or the skill file is missing.
    """
    source_repo = fixtures_dir / scenario.repo
    if not source_repo.exists():
        raise FileNotFoundError(f"fixture repo missing: {source_repo}")

    target_repo = runs_dir / scenario.scenario_id / scenario.repo
    shutil.copytree(source_repo, target_repo)

    skill_path = target_repo / Path(scenario.skill)
    if not skill_path.exists():
        raise FileNotFoundError(f"skill missing: {skill_path}")

    original = skill_path.read_text(encoding="utf-8")
    changed, replaced_count = mutate_capabilities(
        original, scenario.capabilities_to_change
    )

    if scenario.change_kind in ("content-tune", "structure-tune"):
        changed = add_content_or_structure_hint(changed, scenario.change_kind)
    elif scenario.change_kind == "reorder-sections":
        changed = reorder_sections(changed)
    elif scenario.change_kind == "trigger-synonyms":
        changed = apply_trigger_synonyms(changed)

    final_skill_path = skill_path
    if scenario.change_kind == "rename-skill":
        # A rename is modelled as: remove the old file, write the new one.
        final_skill_path = skill_path.with_name("SKILL_RENAMED.md")
        skill_path.unlink()

    if scenario.size == "large":
        changed += (
            "\n## Erweiterte Testnotiz\n"
            "- Fuehre den gleichen Vorgang mit 3 Umgebungen aus.\n"
            "- Vergleiche Diff, Laufzeit und Fehlersignaturen.\n"
        )

    final_skill_path.write_text(changed, encoding="utf-8")

    if scenario.noise:
        inject_noise(target_repo, scenario.scenario_id, scenario.seed)

    return final_skill_path.relative_to(target_repo).as_posix(), replaced_count


def _summarize_runtimes(
    runtime_by_class: Dict[str, List[float]]
) -> Dict[str, Dict[str, float]]:
    """Return count, average and p95 runtime for every class bucket."""
    class_metrics: Dict[str, Dict[str, float]] = {}
    for key, values in runtime_by_class.items():
        ordered = sorted(values)
        # p95 is taken at index floor((n - 1) * 0.95) of the sorted values.
        p95_idx = max(0, min(len(ordered) - 1, int((len(ordered) - 1) * 0.95)))
        class_metrics[key] = {
            "count": len(ordered),
            "avg_runtime_ms": round(sum(ordered) / len(ordered), 3),
            "p95_runtime_ms": ordered[p95_idx],
        }
    return class_metrics


def apply_scenarios(
    scenarios: List[Scenario], out_dir: Path, fixtures_dir: Path
) -> Dict[str, object]:
    """Materialize *scenarios* below *out_dir* and return run metrics.

    Existing ``runs`` and ``expected`` directories under *out_dir* are
    deleted and recreated. For each scenario the fixture repo is copied to
    ``runs/<scenario_id>/<repo>``, mutated, and a gold file
    ``expected/<scenario_id>.expected.json`` is written. A failing scenario
    does not abort the run; it is recorded with status ``"error"`` and an
    error signature. ``metrics.json`` and ``scenario-results.json`` are
    written to *out_dir*.

    Returns:
        The metrics dictionary that is also written to ``metrics.json``.
    """
    runs_dir = out_dir / "runs"
    expected_dir = out_dir / "expected"

    for directory in (runs_dir, expected_dir):
        if directory.exists():
            shutil.rmtree(directory)
    runs_dir.mkdir(parents=True, exist_ok=True)
    expected_dir.mkdir(parents=True, exist_ok=True)

    results: List[Dict[str, object]] = []
    errors: List[Dict[str, str]] = []
    runtime_by_class: Dict[str, List[float]] = {}

    for scenario in scenarios:
        start = time.perf_counter()
        class_key = _class_key(scenario)

        try:
            skill_output, replaced_count = _materialize_scenario(
                scenario, runs_dir, fixtures_dir
            )
            # Measured before the gold file is written, as that write is
            # bookkeeping rather than part of the scenario itself.
            runtime_ms = round((time.perf_counter() - start) * 1000, 3)

            expected_payload = {
                "scenario_id": scenario.scenario_id,
                "seed": scenario.seed,
                "repo": scenario.repo,
                "skill_input": scenario.skill,
                "skill_output": skill_output,
                "change_kind": scenario.change_kind,
                "size": scenario.size,
                "noise": scenario.noise,
                "expected_capability_replacements": replaced_count,
                "capabilities_requested": scenario.capabilities_to_change,
                "status": "ok",
            }
            (expected_dir / f"{scenario.scenario_id}.expected.json").write_text(
                json.dumps(expected_payload, indent=2) + "\n", encoding="utf-8"
            )

            entry: Dict[str, object] = {
                "scenario_id": scenario.scenario_id,
                "runtime_ms": runtime_ms,
                "class_key": class_key,
                "status": "ok",
            }
        except Exception as exc:  # harness boundary: record and continue
            runtime_ms = round((time.perf_counter() - start) * 1000, 3)
            signature = error_signature(exc)
            errors.append({"scenario_id": scenario.scenario_id, "signature": signature})
            entry = {
                "scenario_id": scenario.scenario_id,
                "runtime_ms": runtime_ms,
                "class_key": class_key,
                "status": "error",
                "error_signature": signature,
            }

        # Record each scenario exactly once, whichever branch ran, so the
        # per-class counts always add up to the number of scenarios.
        runtime_by_class.setdefault(class_key, []).append(runtime_ms)
        results.append(entry)

    signature_counts: Dict[str, int] = {}
    for item in errors:
        signature_counts[item["signature"]] = signature_counts.get(item["signature"], 0) + 1

    metrics = {
        "total_scenarios": len(scenarios),
        "ok": sum(1 for r in results if r["status"] == "ok"),
        "error": sum(1 for r in results if r["status"] == "error"),
        "class_metrics": _summarize_runtimes(runtime_by_class),
        "error_signatures": signature_counts,
    }

    (out_dir / "metrics.json").write_text(
        json.dumps(metrics, indent=2) + "\n", encoding="utf-8"
    )
    (out_dir / "scenario-results.json").write_text(
        json.dumps(results, indent=2) + "\n", encoding="utf-8"
    )
    return metrics


def main() -> int:
    """Parse CLI arguments, write the scenario plan, optionally apply it.

    The plan is always written to ``<out>/scenario-plan.json``. In
    ``apply`` mode the scenarios are also materialized. A JSON summary is
    printed to stdout.

    Returns:
        Process exit code (always 0).
    """
    parser = argparse.ArgumentParser(description="Knowledge-Conduit Generaltest")
    parser.add_argument(
        "--mode",
        choices=["plan", "apply"],
        default="plan",
        help="plan: nur Szenarien schreiben; apply: Szenarien materialisieren",
    )
    parser.add_argument(
        "--fixtures",
        default=str(FIXTURES_DIR),
        help="Verzeichnis mit repo-alpha und repo-beta Fixtures",
    )
    parser.add_argument(
        "--out",
        default=str(DEFAULT_OUT_DIR),
        help="Ausgabeverzeichnis fuer Plan und Artefakte",
    )
    args = parser.parse_args()

    fixtures_dir = Path(args.fixtures)
    out_dir = Path(args.out)
    out_dir.mkdir(parents=True, exist_ok=True)

    scenarios = build_scenarios()
    plan_path = out_dir / "scenario-plan.json"
    plan_path.write_text(
        json.dumps([asdict(s) for s in scenarios], indent=2) + "\n", encoding="utf-8"
    )

    metrics = None
    if args.mode == "apply":
        metrics = apply_scenarios(scenarios, out_dir, fixtures_dir)

    summary = {
        "scenario_count": len(scenarios),
        "mode": args.mode,
        "fixtures": str(fixtures_dir),
        "output": str(out_dir),
        "plan": str(plan_path),
        "metrics": metrics,
    }
    print(json.dumps(summary, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())