#!/usr/bin/env python3 """ Traceability-Werkzeug fuer demo-epb. Liest Markdown-Items aus safety/sg, reqs/sys, reqs/swe, arch/sys, arch/swe und verifiziert die Traceability-Kette: SG <-- SYS <-- SA <-- SWE <-- SWA <-- Code (@arch) <-- Tests (@reqs) Subcommands: check Validiert Konsistenz, exit 1 bei Fehlern publish DIR Schreibt HTML + JSON nach DIR/ Run: python3 tools/traceability.py check python3 tools/traceability.py publish docs/traceability/ """ from __future__ import annotations import html import json import re import sys from dataclasses import dataclass, field from pathlib import Path REPO = Path(__file__).resolve().parent.parent SOURCES = [ ("SG", "safety/sg", "Safety Goals"), ("SYS", "reqs/sys", "System Requirements"), ("SWE", "reqs/swe", "Software Requirements"), ("SA", "arch/sys", "System Architecture"), ("SWA", "arch/swe", "Software Architecture"), ] # Forward: items of prefix SHOULD link to prefix EXPECTED_LINKS = { "SA": ["SYS"], "SWE": ["SYS"], "SWA": ["SWE"], # SYS optionally links to SG — checked separately, only for safety-relevant SYS } # Reverse coverage: each item of must be referenced by all items in list COVERAGE = { "SG": ["SYS"], # each SG must be detailed by at least one SYS "SYS": ["SA", "SWE"], # each SYS must be covered by SA + SWE "SWE": ["SWA"], # each SWE must be implemented by SWA } # Components that are implemented in src/ (have a .c file) IMPLEMENTED_SWA = { "SWA-001": "src/safety_manager.c", "SWA-002": "src/apply_controller.c", "SWA-003": "src/actuator_driver.c", "SWA-006": "src/switch_debouncer.c", } # Tests we ship — map test file → SWA it covers IMPLEMENTED_TESTS = { "test_safety_manager.c": "SWA-001", "test_apply_controller.c": "SWA-002", "test_actuator_driver.c": "SWA-003", "test_switch_debouncer.c": "SWA-006", } @dataclass class Item: id: str prefix: str path: Path title: str asil: str links: list[str] = field(default_factory=list) FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL) def parse_item(path: Path, prefix: str) -> Item | None: text = path.read_text() m = FRONTMATTER_RE.match(text) if not m: return None fm = m.group(1) def field_value(name: str) -> str | None: rx = re.search(rf"^{name}:\s*(.*?)$", fm, re.MULTILINE) return rx.group(1).strip().strip("'\"") if rx else None asil = field_value("asil") or "?" title = field_value("header") or path.stem links: list[str] = [] in_links = False for line in fm.splitlines(): if line.startswith("links:"): in_links = True continue if in_links: if line.startswith(" - "): m2 = re.match(r" - ([A-Z]+-\d+)", line) if m2: links.append(m2.group(1)) elif not line.startswith(" ") and line.strip(): in_links = False return Item(id=path.stem, prefix=prefix, path=path, title=title, asil=asil, links=links) def collect_all() -> dict[str, Item]: items: dict[str, Item] = {} for prefix, rel, _label in SOURCES: d = REPO / rel if not d.exists(): continue for f in sorted(d.glob("*.md")): it = parse_item(f, prefix) if it is None: continue items[it.id] = it return items # --------------------------------------------------------------------------- # Code / Test traceability # --------------------------------------------------------------------------- ARCH_TAG_RE = re.compile(r"@arch\s+([A-Z]+-\d+(?:\s+[A-Z]+-\d+)*)") REQS_TAG_RE = re.compile(r"@reqs\s+([A-Z]+-\d+(?:\s+[A-Z]+-\d+)*)") def extract_tags(path: Path) -> tuple[list[str], list[str]]: """Extract @arch and @reqs tag IDs from the file header (first ~400 chars).""" if not path.exists(): return [], [] head = path.read_text()[:600] arch_ids: list[str] = [] reqs_ids: list[str] = [] for m in ARCH_TAG_RE.finditer(head): arch_ids.extend(re.findall(r"[A-Z]+-\d+", m.group(1))) for m in REQS_TAG_RE.finditer(head): reqs_ids.extend(re.findall(r"[A-Z]+-\d+", m.group(1))) return arch_ids, reqs_ids def check_code_test_mapping(items: dict[str, Item]) -> list[str]: """Check that: - Each implemented .c file's @arch matches its SWA-id (per IMPLEMENTED_SWA) - Each test's @reqs covers all SWE that the corresponding SWA implements """ errors: list[str] = [] for swa_id, src_rel in IMPLEMENTED_SWA.items(): src = REPO / src_rel arch_tags, _ = extract_tags(src) if swa_id not in arch_tags: errors.append(f"{src_rel}: header @arch enthaelt {swa_id} nicht " f"(gefunden: {arch_tags or '—'})") # For each test, verify @reqs covers the SWE that the corresponding SWA links to for test_file, swa_id in IMPLEMENTED_TESTS.items(): test_path = REPO / "tests" / "unit" / test_file _, reqs_in_test = extract_tags(test_path) if not reqs_in_test: errors.append(f"tests/unit/{test_file}: kein @reqs Tag im Header") continue swa = items.get(swa_id) if swa is None: errors.append(f"tests/unit/{test_file}: referenziertes " f"{swa_id} nicht gefunden") continue swa_swe = set(swa.links) test_swe = set(reqs_in_test) missing = swa_swe - test_swe if missing: errors.append(f"tests/unit/{test_file}: deckt nicht alle SWE " f"der {swa_id} ab — fehlend: {sorted(missing)}") return errors # --------------------------------------------------------------------------- # Commands # --------------------------------------------------------------------------- def cmd_check(items: dict[str, Item]) -> int: errors: list[str] = [] for it in items.values(): for link in it.links: if link not in items: errors.append(f"{it.id} links to non-existent {link}") for it in items.values(): expected_prefixes = EXPECTED_LINKS.get(it.prefix, []) if not expected_prefixes: continue actual_prefixes = {items[l].prefix for l in it.links if l in items} for ep in expected_prefixes: if ep not in actual_prefixes: errors.append( f"{it.id} ({it.prefix}) has no link to a {ep}-* item" ) incoming: dict[str, set[str]] = {iid: set() for iid in items} for it in items.values(): for link in it.links: if link in incoming: incoming[link].add(it.prefix) for it in items.values(): required = COVERAGE.get(it.prefix, []) for rp in required: if rp not in incoming[it.id]: # Exclude QM-level SYS reqs from SG coverage check if rp == "SYS" and it.prefix == "SG": continue errors.append( f"{it.id} ({it.prefix}) is not referenced by any {rp}-* item" ) # Code + Test mapping errors.extend(check_code_test_mapping(items)) counts = {p: sum(1 for i in items.values() if i.prefix == p) for p, _, _ in SOURCES} print(f"\nItems found: {sum(counts.values())}") for p, _, _ in SOURCES: print(f" {p:4} {counts[p]:3}") print(f"\nCode mappings: {len(IMPLEMENTED_SWA)} implemented SWA") print(f"Test mappings: {len(IMPLEMENTED_TESTS)} test files") print() if errors: print(f"FAIL: {len(errors)} traceability error(s):") for e in errors: print(f" - {e}") return 1 print("OK — Traceability vollstaendig (SG → SYS → SA, SWE → SWA → Code+Test).") return 0 def asil_color(asil: str) -> str: return { "D": "#d62728", "C": "#ff7f0e", "B": "#2ca02c", "A": "#1f77b4", "QM": "#888", }.get(asil, "#aaa") def cmd_publish(items: dict[str, Item], out_dir: Path) -> int: out_dir.mkdir(parents=True, exist_ok=True) # Build reverse map: who links to me? children: dict[str, list[str]] = {iid: [] for iid in items} for it in items.values(): for link in it.links: if link in children: children[link].append(it.id) rows = [] sgs = [i for i in items.values() if i.prefix == "SG"] qm_sys = [i for i in items.values() if i.prefix == "SYS" and not i.links] # For each SG, build a row for sg in sorted(sgs, key=lambda i: i.id): sys_items = [c for c in children[sg.id] if items[c].prefix == "SYS"] for s in sorted(sys_items): sys_it = items[s] sas = [c for c in children[sys_it.id] if items[c].prefix == "SA"] swes = [c for c in children[sys_it.id] if items[c].prefix == "SWE"] swas = sorted({c for sw in swes for c in children[sw] if items[c].prefix == "SWA"}) code = sorted({swa for swa in swas if swa in IMPLEMENTED_SWA}) tests = sorted({f for f, swa in IMPLEMENTED_TESTS.items() if swa in swas}) rows.append({"sg": sg, "sys": sys_it, "sa": sas, "swe": swes, "swa": swas, "code": code, "tests": tests}) # Also include QM-level SYS (not linked to SG) as separate section for sys_it in sorted(qm_sys, key=lambda i: i.id): sas = [c for c in children[sys_it.id] if items[c].prefix == "SA"] swes = [c for c in children[sys_it.id] if items[c].prefix == "SWE"] swas = sorted({c for sw in swes for c in children[sw] if items[c].prefix == "SWA"}) code = sorted({swa for swa in swas if swa in IMPLEMENTED_SWA}) tests = sorted({f for f, swa in IMPLEMENTED_TESTS.items() if swa in swas}) rows.append({"sg": None, "sys": sys_it, "sa": sas, "swe": swes, "swa": swas, "code": code, "tests": tests}) counts = {p: sum(1 for i in items.values() if i.prefix == p) for p, _, _ in SOURCES} # HTML parts = [ "", "", "demo-epb — Traceability Matrix", "", "

demo-epb — Traceability Matrix

", "

Vollstaendige Kette: SG → SYS → SA, SWE → SWA → Code (@arch) + Test (@reqs)

", "

", ] for p, _, label in SOURCES: parts.append(f"{p}: {counts[p]}   ") parts.append(f"Code-Files: {len(IMPLEMENTED_SWA)}   ") parts.append(f"Test-Files: {len(IMPLEMENTED_TESTS)}") parts.append("

") parts.append("") parts.append( "" "" "" ) def cell_items(ids: list[str]) -> str: if not ids: return "" bits = [] for i in ids: it = items[i] c = asil_color(it.asil) bits.append( f"
{html.escape(i)} " f"" f"{html.escape(it.asil)}
" f"
{html.escape(it.title)}
" ) return "" def cell_item(it: Item | None) -> str: if it is None: return "" c = asil_color(it.asil) return (f"") def cell_files(files: list[str], prefix: str = "") -> str: if not files: return "" return "" for r in rows: parts.append("") parts.append(cell_item(r["sg"])) parts.append(cell_item(r["sys"])) parts.append(cell_items(r["sa"])) parts.append(cell_items(r["swe"])) parts.append(cell_items(r["swa"])) parts.append(cell_files([IMPLEMENTED_SWA.get(s, "") for s in r["code"] if IMPLEMENTED_SWA.get(s)])) parts.append(cell_files(r["tests"], "tests/unit/")) parts.append("") parts.append("
Safety GoalSystem-RequirementSystem-ArchSoftware-ReqSoftware-ArchCodeTest
" + "".join(bits) + "
{html.escape(it.id)} " f"" f"{html.escape(it.asil)}
" f"
{html.escape(it.title)}
" + "".join( f"
{html.escape(prefix + f)}
" for f in files ) + "
") # Code/Test details parts.append("

Code → Architektur

") parts.append("") for swa_id, src_rel in IMPLEMENTED_SWA.items(): arch, reqs = extract_tags(REPO / src_rel) parts.append( f"" f"" f"" ) parts.append("
Datei@arch@reqs
{html.escape(src_rel)}{' '.join(arch)}{' '.join(reqs)}
") parts.append("

Test → Anforderungen

") parts.append("") for test_file, swa_id in IMPLEMENTED_TESTS.items(): _, reqs = extract_tags(REPO / "tests" / "unit" / test_file) parts.append( f"" f"" f"" ) parts.append("
Test-DateiDecklt SWA@reqs
tests/unit/{html.escape(test_file)}{swa_id}{' '.join(reqs)}
") parts.append("") (out_dir / "index.html").write_text("\n".join(parts)) # JSON matrix = [] for r in rows: matrix.append({ "sg": {"id": r["sg"].id, "asil": r["sg"].asil} if r["sg"] else None, "sys": {"id": r["sys"].id, "asil": r["sys"].asil, "title": r["sys"].title}, "sa": [{"id": i, "asil": items[i].asil} for i in r["sa"]], "swe": [{"id": i, "asil": items[i].asil} for i in r["swe"]], "swa": [{"id": i, "asil": items[i].asil} for i in r["swa"]], "code": [IMPLEMENTED_SWA[s] for s in r["code"] if s in IMPLEMENTED_SWA], "tests": [f"tests/unit/{f}" for f in r["tests"]], }) (out_dir / "matrix.json").write_text(json.dumps(matrix, indent=2)) print(f"Wrote: {out_dir / 'index.html'}") print(f"Wrote: {out_dir / 'matrix.json'}") return 0 def main(): if len(sys.argv) < 2: print(__doc__) return 2 cmd = sys.argv[1] items = collect_all() if cmd == "check": return cmd_check(items) if cmd == "publish": out = Path(sys.argv[2] if len(sys.argv) > 2 else "docs/traceability") rc = cmd_check(items) if rc != 0: print("WARN: publishing despite check errors") cmd_publish(items, out) return 0 print(f"unknown command: {cmd}") return 2 if __name__ == "__main__": sys.exit(main())