bollwerk/knowledge-conduit/kc_generaltest.py

394 lines
13 KiB
Python

#!/usr/bin/env python3
"""Knowledge-Conduit Generaltest Harness.
Ziele:
- kleine/grosse Aenderungen
- mit/ohne Noise
- 1 oder 4 Capabilities gleichzeitig
- zusaetzliche Mutationstypen fuer realistische Drifts
- deterministischer Seed je Szenario
- Gold-Expected-Files und Metriken
"""
from __future__ import annotations
import argparse
import hashlib
import itertools
import json
import random
import shutil
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Dict, List, Tuple
# Fixtures and generated artifacts are resolved relative to this script.
SCRIPT_DIR = Path(__file__).resolve().parent
FIXTURES_DIR = SCRIPT_DIR / "fixtures"
DEFAULT_OUT_DIR = SCRIPT_DIR / "artifacts"

# Names of the fixture repositories expected below the fixtures directory.
REPOS = ["repo-alpha", "repo-beta"]

# Skill files (relative to a fixture repo root) that the scenarios mutate.
SKILLS = [
    Path(".github/skills") / skill_name / "SKILL.md"
    for skill_name in ("kc-dataset-cleaner", "kc-api-smoke", "kc-release-notes")
]

# Bullets written into a "## Capabilities" section when a skill has none.
BASE_CAPABILITIES = [
    "input-validation",
    "response-schema-check",
    "retry-backoff",
    "latency-budget",
    "auth-guard",
    "idempotency-check",
]

# Replacement bullets; entry i overwrites the i-th capability bullet found.
IMPROVED_CAPABILITIES = [
    "strict-input-validation",
    "response-contract-check",
    "retry-jitter-backoff",
    "p95-latency-budget",
    "token-scope-guard",
    "idempotency-key-replay-check",
]

# Original trigger phrase -> synonym substituted by the trigger-synonyms mutation.
TRIGGER_SYNONYMS = {
    "Bereinige den Datensatz": "Bereinige den Input-Bestand",
    "Normalisiere diese CSV": "Standardisiere diese CSV",
    "Finde Dubletten und fehlende Felder": "Erkenne Duplikate und Null-Felder",
    "Starte API-Smoke-Test": "Starte API-Basischeck",
    "Pruefe Health- und Auth-Endpunkte": "Validiere Health- und Auth-Routen",
    "Validiere Basis-Responses": "Pruefe Grundantworten",
    "Schreibe Release Notes": "Erstelle Release Notes",
}
@dataclass
class Scenario:
    """One planned mutation of a single skill file in a fixture repository."""

    # Sequential identifier of the form "KC-001".
    scenario_id: str
    # Name of the fixture repository the scenario is copied from.
    repo: str
    # Skill file path relative to the repository root, with "/" separators.
    skill: str
    # "small" or "large"; "large" appends an extra section to the skill.
    size: str
    # Whether unrelated noise files are added to the copied repository.
    noise: bool
    # Requested number of capability bullets to replace.
    capabilities_to_change: int
    # Which additional mutation is applied (e.g. "rename-skill").
    change_kind: str
    # Deterministic seed derived from the scenario's parameters.
    seed: int
def scenario_seed(material: str) -> int:
    """Derive a stable 32-bit seed from a scenario's identifying string.

    The seed is the first four bytes of the SHA-256 digest of the UTF-8
    encoded *material*, read as a big-endian unsigned integer, so equal
    material always yields the same seed.
    """
    leading_bytes = hashlib.sha256(material.encode("utf-8")).digest()[:4]
    return int.from_bytes(leading_bytes, "big")
def ensure_capability_section(text: str) -> str:
if "## Capabilities" in text:
return text
marker = "## Checkliste"
section = ["## Capabilities", ""]
for cap in BASE_CAPABILITIES:
section.append(f"- {cap}")
section.append("")
insertion = "\n".join(section)
if marker in text:
return text.replace(marker, f"{insertion}\n{marker}", 1)
return text.rstrip() + "\n\n" + insertion + "\n"
def mutate_capabilities(text: str, count: int) -> Tuple[str, int]:
    """Replace the first *count* capability bullets with improved variants.

    The ``## Capabilities`` section is created first if it is missing. Bullet
    lines (those starting with ``- `` after stripping) are collected from the
    heading up to the next ``## `` heading; the i-th bullet is overwritten
    with ``IMPROVED_CAPABILITIES[i]``, written without leading indentation.

    Args:
        text: Markdown content of a skill file.
        count: Requested number of bullets to replace. Values below zero are
            treated as zero so the reported count is never negative.

    Returns:
        ``(new_text, replaced)`` where ``replaced`` is the number of bullets
        actually overwritten, bounded by the bullets present and by the number
        of improved capabilities. ``new_text`` is re-joined with ``\\n`` and
        ends with a single trailing newline.
    """
    text = ensure_capability_section(text)
    lines = text.splitlines()

    heading_idx = next(
        (i for i, line in enumerate(lines) if line.strip() == "## Capabilities"),
        None,
    )
    if heading_idx is None:
        # No line is exactly the heading, so there is nothing to mutate.
        return text, 0

    bullets: List[int] = []
    for i in range(heading_idx + 1, len(lines)):
        stripped = lines[i].strip()
        if stripped.startswith("## "):
            # The next second-level heading ends the capabilities section.
            break
        if stripped.startswith("- "):
            bullets.append(i)

    replace_count = max(0, min(count, len(bullets), len(IMPROVED_CAPABILITIES)))
    for line_no, improved in zip(bullets[:replace_count], IMPROVED_CAPABILITIES):
        lines[line_no] = f"- {improved}"
    return "\n".join(lines) + "\n", replace_count
def add_content_or_structure_hint(text: str, change_kind: str) -> str:
    """Insert a one-line hint below the first ``## Zweck`` heading.

    ``"structure-tune"`` selects the structure hint; every other value of
    *change_kind* selects the content hint. Text without a ``## Zweck``
    heading is returned unchanged.
    """
    heading = "## Zweck"
    if change_kind == "structure-tune":
        hint = "Hinweis: Struktur-Tune aktiv."
    else:
        hint = "Hinweis: Content-Tune aktiv."
    return text.replace(heading, f"{heading}\n\n{hint}", 1)
def reorder_sections(text: str) -> str:
    """Move the ``## Checkliste`` section in front of ``## Trigger-Phrasen``.

    Everything from the trigger heading up to the checklist heading is
    treated as one unit and placed after the checklist section. The text is
    returned unchanged when either heading is missing or the checklist
    already comes first.
    """
    trigger_at = text.find("## Trigger-Phrasen")
    check_at = text.find("## Checkliste")
    if trigger_at == -1 or check_at == -1 or check_at < trigger_at:
        return text

    before_trigger = text[:trigger_at]
    trigger_block = text[trigger_at:check_at]

    # The checklist section ends at the next "## " heading, if there is one;
    # partition() yields empty separator/remainder when there is none.
    checklist_block, separator, remainder = text[check_at:].partition("\n## ")
    after_checklist = separator + remainder

    return (
        before_trigger
        + checklist_block.rstrip()
        + "\n\n"
        + trigger_block.strip()
        + "\n"
        + after_checklist
    )
def apply_trigger_synonyms(text: str) -> str:
    """Replace every known trigger phrase in *text* with its synonym.

    Replacements are applied one after another in the insertion order of
    ``TRIGGER_SYNONYMS``; all occurrences of each phrase are replaced.
    """
    rewritten = text
    for phrase in TRIGGER_SYNONYMS:
        rewritten = rewritten.replace(phrase, TRIGGER_SYNONYMS[phrase])
    return rewritten
def inject_noise(repo_dir: Path, scenario_id: str, seed: int) -> None:
    """Write unrelated noise files into ``<repo_dir>/noise/<scenario_id>``.

    Two files are created (the directory is created if needed):
    ``notes.txt`` with fixed text, and ``payload.json`` describing the
    scenario together with a pseudo-random ``checksum_hint`` that depends
    only on *seed*.
    """
    target = repo_dir / "noise" / scenario_id
    target.mkdir(parents=True, exist_ok=True)

    notes_file = target / "notes.txt"
    notes_file.write_text("noise payload\nignore me\n", encoding="utf-8")

    # A dedicated generator keeps the hint reproducible for a given seed.
    generator = random.Random(seed)
    payload = {
        "scenario": scenario_id,
        "kind": "noise",
        "seed": seed,
        "checksum_hint": generator.randint(1000, 9999),
    }
    payload_file = target / "payload.json"
    payload_file.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
def build_scenarios() -> List[Scenario]:
    """Build the full scenario matrix in a deterministic order.

    One scenario is produced per element of the Cartesian product of
    ``REPOS``, ``SKILLS``, size, noise flag, capability count and change
    kind. Scenarios are numbered ``KC-001``, ``KC-002``, ... in product
    order.

    The seed is derived from the scenario's parameters. The skill path is
    rendered with forward slashes before hashing, so the seed (and therefore
    the plan and expected files) is identical on Windows and POSIX; using
    ``str(path)`` would embed backslashes on Windows and change the seed.

    Returns:
        The list of scenarios, one per parameter combination.
    """
    sizes = ["small", "large"]
    noises = [False, True]
    capability_counts = [1, 4]
    change_kinds = [
        "content-tune",
        "structure-tune",
        "rename-skill",
        "reorder-sections",
        "trigger-synonyms",
    ]

    combinations = itertools.product(
        REPOS, SKILLS, sizes, noises, capability_counts, change_kinds
    )
    scenarios: List[Scenario] = []
    for index, (repo, skill, size, noise, cap_count, kind) in enumerate(combinations, start=1):
        skill_posix = Path(skill).as_posix()
        material = "|".join([repo, skill_posix, size, str(noise), str(cap_count), kind])
        scenarios.append(
            Scenario(
                scenario_id=f"KC-{index:03d}",
                repo=repo,
                skill=skill_posix,
                size=size,
                noise=noise,
                capabilities_to_change=cap_count,
                change_kind=kind,
                seed=scenario_seed(material),
            )
        )
    return scenarios
def error_signature(exc: Exception) -> str:
    """Return ``"<ExceptionClass>: <message>"`` cut to at most 160 characters."""
    max_length = 160
    class_name = type(exc).__name__
    return f"{class_name}: {exc}"[:max_length]
def _scenario_class_key(scenario: Scenario) -> str:
    """Return the ``size|noise|capN|kind`` bucket used to group runtimes."""
    noise_label = "noise" if scenario.noise else "clean"
    return "|".join(
        [
            scenario.size,
            noise_label,
            f"cap{scenario.capabilities_to_change}",
            scenario.change_kind,
        ]
    )


def _materialize_scenario(
    scenario: Scenario, runs_dir: Path, fixtures_dir: Path
) -> Tuple[Path, Path, int]:
    """Copy the fixture repo for *scenario* and apply its mutations on disk.

    Returns:
        ``(target_repo, final_skill_path, replaced_count)``.

    Raises:
        FileNotFoundError: If the fixture repo or the skill file is missing.
    """
    source_repo = fixtures_dir / scenario.repo
    if not source_repo.exists():
        raise FileNotFoundError(f"fixture repo missing: {source_repo}")

    target_repo = runs_dir / scenario.scenario_id / scenario.repo
    shutil.copytree(source_repo, target_repo)

    skill_path = target_repo / Path(scenario.skill)
    if not skill_path.exists():
        raise FileNotFoundError(f"skill missing: {skill_path}")

    original = skill_path.read_text(encoding="utf-8")
    changed, replaced_count = mutate_capabilities(original, scenario.capabilities_to_change)
    if scenario.change_kind in ("content-tune", "structure-tune"):
        changed = add_content_or_structure_hint(changed, scenario.change_kind)
    elif scenario.change_kind == "reorder-sections":
        changed = reorder_sections(changed)
    elif scenario.change_kind == "trigger-synonyms":
        changed = apply_trigger_synonyms(changed)

    final_skill_path = skill_path
    if scenario.change_kind == "rename-skill":
        # A rename is modelled as: delete the old file, write under a new name.
        final_skill_path = skill_path.with_name("SKILL_RENAMED.md")
        skill_path.unlink()

    if scenario.size == "large":
        changed += (
            "\n## Erweiterte Testnotiz\n"
            "- Fuehre den gleichen Vorgang mit 3 Umgebungen aus.\n"
            "- Vergleiche Diff, Laufzeit und Fehlersignaturen.\n"
        )

    final_skill_path.write_text(changed, encoding="utf-8")
    if scenario.noise:
        inject_noise(target_repo, scenario.scenario_id, scenario.seed)
    return target_repo, final_skill_path, replaced_count


def _summarize_runtimes(
    runtime_by_class: Dict[str, List[float]]
) -> Dict[str, Dict[str, float]]:
    """Compute count, average and p95 runtime (ms) for every scenario class."""
    class_metrics: Dict[str, Dict[str, float]] = {}
    for key, values in runtime_by_class.items():
        ordered = sorted(values)
        # Index of the p95 sample: floor of 0.95 * (n - 1), clamped to the list.
        p95_idx = max(0, min(len(ordered) - 1, int((len(ordered) - 1) * 0.95)))
        class_metrics[key] = {
            "count": len(ordered),
            "avg_runtime_ms": round(sum(ordered) / len(ordered), 3),
            "p95_runtime_ms": ordered[p95_idx],
        }
    return class_metrics


def apply_scenarios(
    scenarios: List[Scenario], out_dir: Path, fixtures_dir: Path
) -> Dict[str, object]:
    """Materialize all scenarios below *out_dir* and write result files.

    Existing ``runs`` and ``expected`` directories in *out_dir* are deleted
    and recreated. For each scenario the fixture repo is copied to
    ``runs/<scenario_id>/<repo>``, the skill file is mutated, and an
    ``expected/<scenario_id>.expected.json`` file is written. A scenario that
    raises is recorded with status ``"error"`` and a truncated error
    signature; it does not stop the remaining scenarios.

    Args:
        scenarios: Scenarios to materialize, processed in order.
        out_dir: Output directory for runs, expected files and metrics.
        fixtures_dir: Directory containing the fixture repositories.

    Returns:
        The metrics dictionary that is also written to ``metrics.json``
        (totals, per-class runtime statistics, error signature counts).
        Per-scenario results are written to ``scenario-results.json``.
    """
    runs_dir = out_dir / "runs"
    expected_dir = out_dir / "expected"
    for stale_dir in (runs_dir, expected_dir):
        if stale_dir.exists():
            shutil.rmtree(stale_dir)
    for fresh_dir in (runs_dir, expected_dir):
        fresh_dir.mkdir(parents=True, exist_ok=True)

    results: List[Dict[str, object]] = []
    errors: List[Dict[str, str]] = []
    runtime_by_class: Dict[str, List[float]] = {}

    for scenario in scenarios:
        start = time.perf_counter()
        class_key = _scenario_class_key(scenario)
        try:
            target_repo, final_skill_path, replaced_count = _materialize_scenario(
                scenario, runs_dir, fixtures_dir
            )
            runtime_ms = round((time.perf_counter() - start) * 1000, 3)
            runtime_by_class.setdefault(class_key, []).append(runtime_ms)

            expected_payload = {
                "scenario_id": scenario.scenario_id,
                "seed": scenario.seed,
                "repo": scenario.repo,
                "skill_input": scenario.skill,
                "skill_output": str(final_skill_path.relative_to(target_repo)).replace("\\", "/"),
                "change_kind": scenario.change_kind,
                "size": scenario.size,
                "noise": scenario.noise,
                "expected_capability_replacements": replaced_count,
                "capabilities_requested": scenario.capabilities_to_change,
                "status": "ok",
            }
            (expected_dir / f"{scenario.scenario_id}.expected.json").write_text(
                json.dumps(expected_payload, indent=2) + "\n", encoding="utf-8"
            )
            results.append(
                {
                    "scenario_id": scenario.scenario_id,
                    "runtime_ms": runtime_ms,
                    "class_key": class_key,
                    "status": "ok",
                }
            )
        except Exception as exc:
            # Deliberately broad: one failing scenario must not abort the
            # batch; the failure is recorded in the results and metrics.
            runtime_ms = round((time.perf_counter() - start) * 1000, 3)
            runtime_by_class.setdefault(class_key, []).append(runtime_ms)
            signature = error_signature(exc)
            errors.append({"scenario_id": scenario.scenario_id, "signature": signature})
            results.append(
                {
                    "scenario_id": scenario.scenario_id,
                    "runtime_ms": runtime_ms,
                    "class_key": class_key,
                    "status": "error",
                    "error_signature": signature,
                }
            )

    signature_counts: Dict[str, int] = {}
    for item in errors:
        signature_counts[item["signature"]] = signature_counts.get(item["signature"], 0) + 1

    metrics = {
        "total_scenarios": len(scenarios),
        "ok": sum(1 for r in results if r["status"] == "ok"),
        "error": sum(1 for r in results if r["status"] == "error"),
        "class_metrics": _summarize_runtimes(runtime_by_class),
        "error_signatures": signature_counts,
    }
    (out_dir / "metrics.json").write_text(json.dumps(metrics, indent=2) + "\n", encoding="utf-8")
    (out_dir / "scenario-results.json").write_text(json.dumps(results, indent=2) + "\n", encoding="utf-8")
    return metrics
def main() -> int:
    """Command-line entry point: write the scenario plan and optionally apply it.

    The plan is always written to ``<out>/scenario-plan.json``. With
    ``--mode apply`` the scenarios are additionally materialized and their
    metrics are included in the JSON summary printed to stdout.

    Returns:
        Always ``0``.
    """
    cli = argparse.ArgumentParser(description="Knowledge-Conduit Generaltest")
    cli.add_argument(
        "--mode",
        choices=["plan", "apply"],
        default="plan",
        help="plan: nur Szenarien schreiben; apply: Szenarien materialisieren",
    )
    cli.add_argument(
        "--fixtures",
        default=str(FIXTURES_DIR),
        help="Verzeichnis mit repo-alpha und repo-beta Fixtures",
    )
    cli.add_argument(
        "--out",
        default=str(DEFAULT_OUT_DIR),
        help="Ausgabeverzeichnis fuer Plan und Artefakte",
    )
    options = cli.parse_args()

    fixtures_dir = Path(options.fixtures)
    out_dir = Path(options.out)
    out_dir.mkdir(parents=True, exist_ok=True)

    scenarios = build_scenarios()
    plan_path = out_dir / "scenario-plan.json"
    plan_rows = [asdict(item) for item in scenarios]
    plan_path.write_text(json.dumps(plan_rows, indent=2) + "\n", encoding="utf-8")

    # Metrics exist only in apply mode; plan mode reports them as null.
    if options.mode == "apply":
        metrics = apply_scenarios(scenarios, out_dir, fixtures_dir)
    else:
        metrics = None

    summary = {
        "scenario_count": len(scenarios),
        "mode": options.mode,
        "fixtures": str(fixtures_dir),
        "output": str(out_dir),
        "plan": str(plan_path),
        "metrics": metrics,
    }
    print(json.dumps(summary, indent=2))
    return 0
# When executed as a script, run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())