feat: Safety Goals + Drive-Away-Assist + vollst. Traceability
Validate / build-test (macos-latest) (push) Failing after 4s
Validate / build-test (windows-latest) (push) Failing after 17s
Validate / build-test (ubuntu-latest) (push) Successful in 16s
Validate / reports (push) Has been skipped
Release / release (push) Successful in 48s

Neue Layer:
- safety/sg/SG-001..005 als eigene Doorstop-Items (ASIL D/D/A/C/B)
- SYS-Reqs verlinken nach oben auf SG via frontmatter
- Kette ist jetzt: SG -> SYS -> SA, SWE -> SWA -> Code (@arch) + Test (@reqs)

Drive-Away-Assist im Safety Manager:
- SWE-011 (Anfahrabsicht erkennen) implementiert
- SWE-012 (Sicherheits-Check Tuer + Gurt) implementiert
- Neuer State SAFETY_DRIVE_AWAY + safety_mgr_release_requested()
- SafetyInputs erweitert um gas_pedal_percent, gear_in_drive,
  door_closed, seatbelt_fastened
- 5 neue Tests (DRIVE_AWAY armed/blocked/end-conditions)
- Test-Header @reqs erweitert auf SWE-007..012

traceability.py erweitert:
- SG als neuer Top-Level
- Code-Mapping-Check: @arch im Header von src/*.c muss SWA-id matchen
- Test-Mapping-Check: @reqs im Header der Tests muss alle SWE der
  zugehoerigen SWA abdecken
- HTML zeigt 7 Spalten: SG | SYS | SA | SWE | SWA | Code | Test
- 2 zusaetzliche Tabellen: Code->Arch und Test->Reqs

test_apply_controller.c:
- @reqs Header um SWE-005 ergaenzt (war funktional drin, nur Tag fehlte)

Counts:
- 55 doorstop-Items (war 50)
- 46 Unit-Tests (war 41)
- Traceability vollstaendig in beide Richtungen
This commit is contained in:
Stefan Lohmaier
2026-05-12 01:50:12 -07:00
parent 17910835ad
commit c610cc023c
20 changed files with 998 additions and 214 deletions
+213 -55
View File
@@ -2,11 +2,12 @@
"""
Traceability-Werkzeug fuer demo-epb.
Liest alle Markdown-Items in reqs/ und arch/ ein, validiert Links bidirektional
und erzeugt eine HTML-Traceability-Matrix.
Liest Markdown-Items aus safety/sg, reqs/sys, reqs/swe, arch/sys, arch/swe und
verifiziert die Traceability-Kette:
Doorstop-kompatibles Format (YAML-Frontmatter + Markdown-Body), aber ohne
Doorstop-Dependency — bleibt portabel.
SG <-- SYS <-- SA
<-- SWE <-- SWA <-- Code (@arch)
<-- Tests (@reqs)
Subcommands:
check Validiert Konsistenz, exit 1 bei Fehlern
@@ -28,25 +29,42 @@ from pathlib import Path
REPO = Path(__file__).resolve().parent.parent
SOURCES = [
("SG", "safety/sg", "Safety Goals"),
("SYS", "reqs/sys", "System Requirements"),
("SWE", "reqs/swe", "Software Requirements"),
("SA", "arch/sys", "System Architecture"),
("SWA", "arch/swe", "Software Architecture"),
]
# Welche Quellen verlinken auf welche?
# (key) -> (target_prefix) : Items mit key linken auf Items mit target_prefix
# Forward: items of <key> prefix SHOULD link to <target> prefix
EXPECTED_LINKS = {
"SA": ["SYS"],
"SWE": ["SYS"],
"SWA": ["SWE"],
# SYS optionally links to SG — checked separately, only for safety-relevant SYS
}
# Reverse: welche Quellen MUESSEN von welchen Quellen referenziert werden?
# (target) -> [prefix that should link to target] (coverage check)
# Reverse coverage: each item of <key> must be referenced by all items in list
COVERAGE = {
"SYS": ["SA", "SWE"], # jede SYS-Req muss durch SA und SWE abgedeckt sein
"SWE": ["SWA"], # jede SWE-Req muss durch SWA abgedeckt sein
"SG": ["SYS"], # each SG must be detailed by at least one SYS
"SYS": ["SA", "SWE"], # each SYS must be covered by SA + SWE
"SWE": ["SWA"], # each SWE must be implemented by SWA
}
# Components that are implemented in src/ (have a .c file)
IMPLEMENTED_SWA = {
"SWA-001": "src/safety_manager.c",
"SWA-002": "src/apply_controller.c",
"SWA-003": "src/actuator_driver.c",
"SWA-006": "src/switch_debouncer.c",
}
# Tests we ship — map test file → SWA it covers
IMPLEMENTED_TESTS = {
"test_safety_manager.c": "SWA-001",
"test_apply_controller.c": "SWA-002",
"test_actuator_driver.c": "SWA-003",
"test_switch_debouncer.c": "SWA-006",
}
@@ -61,7 +79,6 @@ class Item:
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL)
LINKS_RE = re.compile(r"^\s*-\s+([A-Z]+-\d+)", re.MULTILINE)
def parse_item(path: Path, prefix: str) -> Item | None:
@@ -71,7 +88,6 @@ def parse_item(path: Path, prefix: str) -> Item | None:
return None
fm = m.group(1)
# Crude YAML parsing — we only need a few fields.
def field_value(name: str) -> str | None:
rx = re.search(rf"^{name}:\s*(.*?)$", fm, re.MULTILINE)
return rx.group(1).strip().strip("'\"") if rx else None
@@ -79,7 +95,6 @@ def parse_item(path: Path, prefix: str) -> Item | None:
asil = field_value("asil") or "?"
title = field_value("header") or path.stem
# links: collect IDs from the `links:` block.
links: list[str] = []
in_links = False
for line in fm.splitlines():
@@ -112,16 +127,76 @@ def collect_all() -> dict[str, Item]:
return items
# ---------------------------------------------------------------------------
# Code / Test traceability
# ---------------------------------------------------------------------------
ARCH_TAG_RE = re.compile(r"@arch\s+([A-Z]+-\d+(?:\s+[A-Z]+-\d+)*)")
REQS_TAG_RE = re.compile(r"@reqs\s+([A-Z]+-\d+(?:\s+[A-Z]+-\d+)*)")
def extract_tags(path: Path) -> tuple[list[str], list[str]]:
"""Extract @arch and @reqs tag IDs from the file header (first ~400 chars)."""
if not path.exists():
return [], []
head = path.read_text()[:600]
arch_ids: list[str] = []
reqs_ids: list[str] = []
for m in ARCH_TAG_RE.finditer(head):
arch_ids.extend(re.findall(r"[A-Z]+-\d+", m.group(1)))
for m in REQS_TAG_RE.finditer(head):
reqs_ids.extend(re.findall(r"[A-Z]+-\d+", m.group(1)))
return arch_ids, reqs_ids
def check_code_test_mapping(items: dict[str, Item]) -> list[str]:
"""Check that:
- Each implemented .c file's @arch matches its SWA-id (per IMPLEMENTED_SWA)
- Each test's @reqs covers all SWE that the corresponding SWA implements
"""
errors: list[str] = []
for swa_id, src_rel in IMPLEMENTED_SWA.items():
src = REPO / src_rel
arch_tags, _ = extract_tags(src)
if swa_id not in arch_tags:
errors.append(f"{src_rel}: header @arch enthaelt {swa_id} nicht "
f"(gefunden: {arch_tags or ''})")
# For each test, verify @reqs covers the SWE that the corresponding SWA links to
for test_file, swa_id in IMPLEMENTED_TESTS.items():
test_path = REPO / "tests" / "unit" / test_file
_, reqs_in_test = extract_tags(test_path)
if not reqs_in_test:
errors.append(f"tests/unit/{test_file}: kein @reqs Tag im Header")
continue
swa = items.get(swa_id)
if swa is None:
errors.append(f"tests/unit/{test_file}: referenziertes "
f"{swa_id} nicht gefunden")
continue
swa_swe = set(swa.links)
test_swe = set(reqs_in_test)
missing = swa_swe - test_swe
if missing:
errors.append(f"tests/unit/{test_file}: deckt nicht alle SWE "
f"der {swa_id} ab — fehlend: {sorted(missing)}")
return errors
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
def cmd_check(items: dict[str, Item]) -> int:
errors: list[str] = []
# 1. Each link target must exist
for it in items.values():
for link in it.links:
if link not in items:
errors.append(f"{it.id} links to non-existent {link}")
# 2. Forward expectation: items of certain prefixes must link to others
for it in items.values():
expected_prefixes = EXPECTED_LINKS.get(it.prefix, [])
if not expected_prefixes:
@@ -133,7 +208,6 @@ def cmd_check(items: dict[str, Item]) -> int:
f"{it.id} ({it.prefix}) has no link to a {ep}-* item"
)
# 3. Coverage: each item of certain prefix must be referenced by certain types
incoming: dict[str, set[str]] = {iid: set() for iid in items}
for it in items.values():
for link in it.links:
@@ -144,22 +218,30 @@ def cmd_check(items: dict[str, Item]) -> int:
required = COVERAGE.get(it.prefix, [])
for rp in required:
if rp not in incoming[it.id]:
# Exclude QM-level SYS reqs from SG coverage check
if rp == "SYS" and it.prefix == "SG":
continue
errors.append(
f"{it.id} ({it.prefix}) is not referenced by any {rp}-* item"
)
print(f"\nItems found: {len(items)}")
print(f" SYS: {sum(1 for i in items.values() if i.prefix == 'SYS')}")
print(f" SWE: {sum(1 for i in items.values() if i.prefix == 'SWE')}")
print(f" SA: {sum(1 for i in items.values() if i.prefix == 'SA')}")
print(f" SWA: {sum(1 for i in items.values() if i.prefix == 'SWA')}")
# Code + Test mapping
errors.extend(check_code_test_mapping(items))
counts = {p: sum(1 for i in items.values() if i.prefix == p)
for p, _, _ in SOURCES}
print(f"\nItems found: {sum(counts.values())}")
for p, _, _ in SOURCES:
print(f" {p:4} {counts[p]:3}")
print(f"\nCode mappings: {len(IMPLEMENTED_SWA)} implemented SWA")
print(f"Test mappings: {len(IMPLEMENTED_TESTS)} test files")
print()
if errors:
print(f"FAIL: {len(errors)} traceability error(s):")
for e in errors:
print(f" - {e}")
return 1
print("OK — Traceability vollstaendig.")
print("OK — Traceability vollstaendig (SG → SYS → SA, SWE → SWA → Code+Test).")
return 0
@@ -176,7 +258,7 @@ def asil_color(asil: str) -> str:
def cmd_publish(items: dict[str, Item], out_dir: Path) -> int:
out_dir.mkdir(parents=True, exist_ok=True)
# Build forward and reverse maps
# Build reverse map: who links to me?
children: dict[str, list[str]] = {iid: [] for iid in items}
for it in items.values():
for link in it.links:
@@ -184,15 +266,35 @@ def cmd_publish(items: dict[str, Item], out_dir: Path) -> int:
children[link].append(it.id)
rows = []
sys_items = [i for i in items.values() if i.prefix == "SYS"]
for sys in sorted(sys_items, key=lambda i: i.id):
sas = [c for c in children[sys.id] if items[c].prefix == "SA"]
swes = [c for c in children[sys.id] if items[c].prefix == "SWE"]
swas = sorted(set(c for s in swes for c in children[s]
if items[c].prefix == "SWA"))
rows.append({
"sys": sys, "sa": sas, "swe": swes, "swa": swas
})
sgs = [i for i in items.values() if i.prefix == "SG"]
qm_sys = [i for i in items.values() if i.prefix == "SYS" and not i.links]
# For each SG, build a row
for sg in sorted(sgs, key=lambda i: i.id):
sys_items = [c for c in children[sg.id] if items[c].prefix == "SYS"]
for s in sorted(sys_items):
sys_it = items[s]
sas = [c for c in children[sys_it.id] if items[c].prefix == "SA"]
swes = [c for c in children[sys_it.id] if items[c].prefix == "SWE"]
swas = sorted({c for sw in swes for c in children[sw]
if items[c].prefix == "SWA"})
code = sorted({swa for swa in swas if swa in IMPLEMENTED_SWA})
tests = sorted({f for f, swa in IMPLEMENTED_TESTS.items()
if swa in swas})
rows.append({"sg": sg, "sys": sys_it, "sa": sas, "swe": swes,
"swa": swas, "code": code, "tests": tests})
# Also include QM-level SYS (not linked to SG) as separate section
for sys_it in sorted(qm_sys, key=lambda i: i.id):
sas = [c for c in children[sys_it.id] if items[c].prefix == "SA"]
swes = [c for c in children[sys_it.id] if items[c].prefix == "SWE"]
swas = sorted({c for sw in swes for c in children[sw]
if items[c].prefix == "SWA"})
code = sorted({swa for swa in swas if swa in IMPLEMENTED_SWA})
tests = sorted({f for f, swa in IMPLEMENTED_TESTS.items() if swa in swas})
rows.append({"sg": None, "sys": sys_it, "sa": sas, "swe": swes,
"swa": swas, "code": code, "tests": tests})
counts = {p: sum(1 for i in items.values() if i.prefix == p) for p, _, _ in SOURCES}
# HTML
parts = [
@@ -201,60 +303,116 @@ def cmd_publish(items: dict[str, Item], out_dir: Path) -> int:
"<title>demo-epb — Traceability Matrix</title>",
"<style>",
"body{font-family:-apple-system,Segoe UI,sans-serif;padding:20px;color:#222}",
"table{border-collapse:collapse;width:100%;font-size:14px}",
"table{border-collapse:collapse;width:100%;font-size:13px;margin-top:16px}",
"th,td{border:1px solid #ccc;padding:6px 8px;vertical-align:top;text-align:left}",
"th{background:#f0f0f0}",
"th{background:#f0f0f0;position:sticky;top:0}",
"tr:nth-child(even) td{background:#fafafa}",
".asil{display:inline-block;padding:1px 6px;border-radius:3px;color:white;font-weight:bold;font-size:11px}",
".id{font-family:Consolas,monospace;font-size:13px}",
".id{font-family:Consolas,monospace;font-size:12px}",
".cnt{color:#666;font-size:11px}",
"h1{color:#1f3864}",
"h1{color:#1f3864}h2{color:#1f3864;margin-top:30px}",
".missing{color:#c00}",
"</style></head><body>",
"<h1>demo-epb — Traceability Matrix</h1>",
f"<p>Generiert aus {sum(1 for _ in items)} Items "
f"(SYS: {len([i for i in items.values() if i.prefix=='SYS'])}, "
f"SWE: {len([i for i in items.values() if i.prefix=='SWE'])}, "
f"SA: {len([i for i in items.values() if i.prefix=='SA'])}, "
f"SWA: {len([i for i in items.values() if i.prefix=='SWA'])}).</p>",
"<table>",
"<tr><th>System-Requirement</th><th>System-Arch (SA)</th>"
"<th>Software-Req (SWE)</th><th>Software-Arch (SWA)</th></tr>",
"<p>Vollstaendige Kette: <code>SG → SYS → SA, SWE → SWA → Code (@arch) + Test (@reqs)</code></p>",
"<p>",
]
for p, _, label in SOURCES:
parts.append(f"<strong>{p}:</strong> {counts[p]} &nbsp; ")
parts.append(f"<strong>Code-Files:</strong> {len(IMPLEMENTED_SWA)} &nbsp; ")
parts.append(f"<strong>Test-Files:</strong> {len(IMPLEMENTED_TESTS)}")
parts.append("</p>")
def cell(ids: list[str]) -> str:
parts.append("<table>")
parts.append(
"<tr><th>Safety Goal</th><th>System-Requirement</th>"
"<th>System-Arch</th><th>Software-Req</th>"
"<th>Software-Arch</th><th>Code</th><th>Test</th></tr>"
)
def cell_items(ids: list[str]) -> str:
if not ids:
return "<td style='color:#c00'>—</td>"
return "<td class='missing'>—</td>"
bits = []
for i in ids:
it = items[i]
c = asil_color(it.asil)
bits.append(
f"<div><span class='id'>{html.escape(i)}</span> "
f"<span class='asil' style='background:{c}'>{html.escape(it.asil)}</span></div>"
f"<span class='asil' style='background:{c}'>"
f"{html.escape(it.asil)}</span></div>"
f"<div class='cnt'>{html.escape(it.title)}</div>"
)
return "<td>" + "".join(bits) + "</td>"
def cell_item(it: Item | None) -> str:
if it is None:
return "<td class='missing'>—</td>"
c = asil_color(it.asil)
return (f"<td><div><span class='id'>{html.escape(it.id)}</span> "
f"<span class='asil' style='background:{c}'>"
f"{html.escape(it.asil)}</span></div>"
f"<div class='cnt'>{html.escape(it.title)}</div></td>")
def cell_files(files: list[str], prefix: str = "") -> str:
if not files:
return "<td class='cnt'>—</td>"
return "<td>" + "".join(
f"<div class='id'>{html.escape(prefix + f)}</div>"
for f in files
) + "</td>"
for r in rows:
sys = r["sys"]
c = asil_color(sys.asil)
first = (f"<td><div><span class='id'>{html.escape(sys.id)}</span> "
f"<span class='asil' style='background:{c}'>{html.escape(sys.asil)}</span></div>"
f"<div class='cnt'>{html.escape(sys.title)}</div></td>")
parts.append("<tr>" + first + cell(r["sa"]) + cell(r["swe"]) + cell(r["swa"]) + "</tr>")
parts.append("<tr>")
parts.append(cell_item(r["sg"]))
parts.append(cell_item(r["sys"]))
parts.append(cell_items(r["sa"]))
parts.append(cell_items(r["swe"]))
parts.append(cell_items(r["swa"]))
parts.append(cell_files([IMPLEMENTED_SWA.get(s, "") for s in r["code"]
if IMPLEMENTED_SWA.get(s)]))
parts.append(cell_files(r["tests"], "tests/unit/"))
parts.append("</tr>")
parts.append("</table></body></html>")
parts.append("</table>")
# Code/Test details
parts.append("<h2>Code → Architektur</h2>")
parts.append("<table><tr><th>Datei</th><th>@arch</th><th>@reqs</th></tr>")
for swa_id, src_rel in IMPLEMENTED_SWA.items():
arch, reqs = extract_tags(REPO / src_rel)
parts.append(
f"<tr><td class='id'>{html.escape(src_rel)}</td>"
f"<td>{' '.join(arch)}</td>"
f"<td class='cnt'>{' '.join(reqs)}</td></tr>"
)
parts.append("</table>")
parts.append("<h2>Test → Anforderungen</h2>")
parts.append("<table><tr><th>Test-Datei</th><th>Decklt SWA</th><th>@reqs</th></tr>")
for test_file, swa_id in IMPLEMENTED_TESTS.items():
_, reqs = extract_tags(REPO / "tests" / "unit" / test_file)
parts.append(
f"<tr><td class='id'>tests/unit/{html.escape(test_file)}</td>"
f"<td>{swa_id}</td>"
f"<td class='cnt'>{' '.join(reqs)}</td></tr>"
)
parts.append("</table>")
parts.append("</body></html>")
(out_dir / "index.html").write_text("\n".join(parts))
# JSON for machine consumption
# JSON
matrix = []
for r in rows:
matrix.append({
"sg": {"id": r["sg"].id, "asil": r["sg"].asil} if r["sg"] else None,
"sys": {"id": r["sys"].id, "asil": r["sys"].asil, "title": r["sys"].title},
"sa": [{"id": i, "asil": items[i].asil} for i in r["sa"]],
"swe": [{"id": i, "asil": items[i].asil} for i in r["swe"]],
"swa": [{"id": i, "asil": items[i].asil} for i in r["swa"]],
"code": [IMPLEMENTED_SWA[s] for s in r["code"] if s in IMPLEMENTED_SWA],
"tests": [f"tests/unit/{f}" for f in r["tests"]],
})
(out_dir / "matrix.json").write_text(json.dumps(matrix, indent=2))