#!/usr/bin/env python3 """ Knowledge Conduit – Phase 1: Extraction Extrahiert Improvements aus der Git-History für KI-Tooling-Dateien. Scannt git log für Änderungen im KI-Tooling-Scope (.github/skills, agents, prompts, instructions). Gruppiert Diffs nach Capability und gibt strukturiertes Markdown aus. Usage: python .github/skills/knowledge-conduit/kc-extract.py --since "7 days ago" python .github/skills/knowledge-conduit/kc-extract.py --since "4 days ago" --repo /path/to/repo """ import argparse import os import re import subprocess import sys from collections import defaultdict from datetime import datetime from pathlib import Path # --- Konfiguration --- CONDUIT_SCOPES = [ ".github/skills/", ".github/agents/", ".github/prompts/", ".github/copilot-instructions.md", ] # Dynamisches Pattern für weitere Instructions-Dateien INSTRUCTIONS_PATTERN = re.compile(r"^\.github/[^/]+\.instructions\.md$") MAX_DIFF_LINES = 80 def run_git(*args: str, cwd: str = ".") -> str: """Git-Kommando ausführen, UTF-8-Output zurückgeben.""" result = subprocess.run( ["git"] + list(args), cwd=cwd, capture_output=True, text=True, encoding="utf-8", errors="replace", ) return result.stdout def is_in_conduit_scope(filepath: str) -> bool: """Prüft ob ein Dateipfad im KI-Tooling-Scope liegt.""" for scope in CONDUIT_SCOPES: if scope.endswith("/"): if filepath.startswith(scope): return True else: if filepath == scope: return True # Dynamisch: .github/*.instructions.md if INSTRUCTIONS_PATTERN.match(filepath): return True return False def get_trait_key(filepath: str, repo_path: str) -> str | None: """Leitet den Capability-Key aus einem Dateipfad ab.""" # Skills: skill/ m = re.match(r"^\.github/skills/([^/]+)/", filepath) if m: return f"skill/{m.group(1)}" # Agents: agent/ m = re.match(r"^\.github/agents/(.+)\.agent\.md$", filepath) if m: return f"agent/{m.group(1)}" # Prompts: Standalone oder Verbund m = re.match(r"^\.github/prompts/(.+)\.prompt\.md$", filepath) if m: name = m.group(1) # Verbund-Erkennung: -.prompt.md → Trait des Routers parts = name.split("-") if len(parts) > 1: # Versuche progressiv kürzere Präfixe als Router-Name for i in range(len(parts) - 1, 0, -1): candidate = "-".join(parts[:i]) router_path = Path(repo_path) / f".github/prompts/{candidate}.prompt.md" if router_path.exists(): return f"prompt/{candidate}" # Standalone-Prompt return f"prompt/{name}" # Instructions (*.instructions.md) m = re.match(r"^\.github/(.+)\.instructions\.md$", filepath) if m: return f"instructions/{m.group(1)}" # copilot-instructions.md if filepath == ".github/copilot-instructions.md": return "instructions/copilot-instructions" return None def get_mutation_type(status: str) -> str: """Bestimmt den Improvement-Typ aus dem Git-Status-Buchstaben.""" if status.startswith("A"): return "member-added" elif status.startswith("D"): return "member-removed" else: return "content-change" def extract_mutations(repo_path: str, since: str) -> dict[str, list[dict]]: """Extrahiert alle Improvements aus der Git-History.""" mutations: dict[str, list[dict]] = defaultdict(list) # Git-Log abrufen log_output = run_git( "log", "--format=%H|%aI|%an|%s", f"--since={since}", "--", *CONDUIT_SCOPES, cwd=repo_path, ) if not log_output.strip(): return mutations for line in log_output.strip().split("\n"): if "|" not in line: continue parts = line.split("|", 3) if len(parts) < 4: continue commit_hash, date, author, message = parts # Geänderte Dateien für diesen Commit diff_tree_output = run_git( "diff-tree", "--no-commit-id", "-r", "--name-status", commit_hash, cwd=repo_path, ) for diff_line in diff_tree_output.strip().split("\n"): if not diff_line or not diff_line[0].isalpha(): continue diff_parts = diff_line.split("\t", 2) status = diff_parts[0] filepath = diff_parts[1] if len(diff_parts) > 1 else "" # Bei Renames: Zielpfad verwenden if status.startswith("R") and len(diff_parts) >= 3: filepath = diff_parts[2] # Normalisieren filepath = filepath.replace("\\", "/") if not is_in_conduit_scope(filepath): continue trait_key = get_trait_key(filepath, repo_path) if not trait_key: continue mutation_type = get_mutation_type(status) # Diff holen diff_output = run_git( "show", "--format=", "--no-color", commit_hash, "--", filepath, cwd=repo_path, ) # Diff kürzen diff_lines = diff_output.strip().split("\n") if diff_output.strip() else [] if len(diff_lines) > MAX_DIFF_LINES: truncated = len(diff_lines) - MAX_DIFF_LINES diff_lines = diff_lines[:MAX_DIFF_LINES] + [f"... ({truncated} weitere Zeilen)"] mutations[trait_key].append({ "hash": commit_hash[:8], "date": date, "author": author, "message": message, "file": filepath, "type": mutation_type, "diff": "\n".join(diff_lines), }) return mutations def generate_markdown(mutations: dict[str, list[dict]], repo_path: str, since: str) -> str: """Generiert die Markdown-Ausgabe.""" lines = [] lines.append("# Raw Improvements") lines.append("") lines.append(f"**Extrahiert:** {datetime.now().strftime('%Y-%m-%d %H:%M')}") lines.append(f"**Zeitraum:** seit {since}") lines.append(f"**Repository:** {Path(repo_path).resolve().name}") lines.append(f"**Capabilities mit Improvements:** {len(mutations)}") lines.append("") lines.append("---") lines.append("") if not mutations: lines.append("*Keine Improvements im angegebenen Zeitraum gefunden.*") else: for trait_key in sorted(mutations.keys()): trait_mutations = mutations[trait_key] lines.append(f"## Capability: `{trait_key}`") lines.append("") lines.append("| Improvements | Dateien |") lines.append("|-------------|---------|") unique_files = sorted(set(m["file"] for m in trait_mutations)) lines.append(f"| {len(trait_mutations)} | {', '.join(unique_files)} |") lines.append("") # Gruppiert nach Commit commits_seen: dict[str, list[dict]] = {} for m in trait_mutations: commits_seen.setdefault(m["hash"], []).append(m) for commit_hash, commit_mutations in commits_seen.items(): first = commit_mutations[0] lines.append(f"### [{first['hash']}] {first['message']}") lines.append("") lines.append(f"- **Datum:** {first['date']}") lines.append(f"- **Autor:** {first['author']}") lines.append("") for mutation in commit_mutations: lines.append(f"#### `{mutation['type']}` – {mutation['file']}") lines.append("") if mutation["diff"]: lines.append("```diff") lines.append(mutation["diff"]) lines.append("```") lines.append("") lines.append("---") lines.append("") return "\n".join(lines) def main(): parser = argparse.ArgumentParser(description="Knowledge Conduit – Extraction") parser.add_argument("--since", default="7 days ago", help='Zeitspanne (z.B. "7 days ago")') parser.add_argument("--repo", default=".", help="Pfad zum Repository") parser.add_argument("--output", default="", help="Output-Pfad (default: .github/knowledge-conduit/output/raw-improvements.md)") args = parser.parse_args() repo_path = os.path.abspath(args.repo) output_path = args.output or os.path.join(repo_path, ".github/knowledge-conduit/output/raw-improvements.md") print(f"KC Extract: Scanning commits since '{args.since}'...") mutations = extract_mutations(repo_path, args.since) markdown = generate_markdown(mutations, repo_path, args.since) # Output schreiben os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, "w", encoding="utf-8") as f: f.write(markdown) total_mutations = sum(len(v) for v in mutations.values()) print() print("Extraction abgeschlossen:") print(f" Capabilities: {len(mutations)}") print(f" Improvements: {total_mutations}") print(f" Output: {output_path}") if __name__ == "__main__": main()