feat: Safety Manager + Traceability + PlantUML in CI
Validate / build-and-test (push) Successful in 30s

- Implement Safety Manager (SWA-001, ASIL-D): Hill-Hold + Auto-Apply
  state machine, 13 unit tests
- Update SWA-002 + SWA-001 link coverage so all SWE reqs are covered
- New tool: tools/traceability.py — Markdown-frontmatter-basierter
  Traceability-Checker + HTML/JSON-Matrix-Generator (Doorstop-Format ohne
  Doorstop-Dependency)
- New tool: tools/render_plantuml.py — extrahiert PlantUML-Bloecke aus
  arch/**.md und rendert via plantuml.com zu SVG
- validate.yml: neue Steps Traceability-Check, Matrix-Publish, PlantUML-
  Render; uploaded als Gitea-Artefakte
This commit is contained in:
Stefan Lohmaier
2026-05-11 23:51:55 -07:00
parent 7c1848cb26
commit 4351dfa4e1
19 changed files with 1292 additions and 37 deletions
+2 -2
View File
@@ -439,7 +439,7 @@ SA_ELEMENTS = [
SWA_ELEMENTS = [
{
"id": "SWA-001", "asil": "D",
"links": ["SWE-007", "SWE-008", "SWE-009", "SWE-010"],
"links": ["SWE-007", "SWE-008", "SWE-009", "SWE-010", "SWE-011", "SWE-012"],
"title": "Safety Manager",
"text": textwrap.dedent("""
## Verantwortung
@@ -500,7 +500,7 @@ SWA_ELEMENTS = [
},
{
"id": "SWA-002", "asil": "D",
"links": ["SWE-001", "SWE-002", "SWE-003", "SWE-004"],
"links": ["SWE-001", "SWE-002", "SWE-003", "SWE-004", "SWE-005"],
"title": "Apply Controller",
"text": textwrap.dedent("""
## Verantwortung
+106
View File
@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
Rendert alle @startuml ... @enduml Bloecke aus arch/**/*.md als SVG.
Verwendet einen erreichbaren PlantUML-HTTP-Server (Default: www.plantuml.com).
In CI kann die Server-URL ueber PLANTUML_SERVER ueberschrieben werden.
Output: docs/diagrams/<file>-<index>.svg
Run:
python3 tools/render_plantuml.py
PLANTUML_SERVER=http://plantuml-server:8080 python3 tools/render_plantuml.py
"""
from __future__ import annotations
import os
import re
import sys
import urllib.request
import zlib
from pathlib import Path
REPO = Path(__file__).resolve().parent.parent
SERVER = os.environ.get("PLANTUML_SERVER", "https://www.plantuml.com/plantuml")
# Map standard base64 to PlantUML's base64 alphabet
_B64 = (
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"-_"
)
def _encode_3bytes(b1: int, b2: int, b3: int) -> str:
c1 = b1 >> 2
c2 = ((b1 & 0x3) << 4) | (b2 >> 4)
c3 = ((b2 & 0xF) << 2) | (b3 >> 6)
c4 = b3 & 0x3F
return _B64[c1] + _B64[c2] + _B64[c3] + _B64[c4]
def plantuml_encode(text: str) -> str:
"""Encode PlantUML source to its URL-safe representation."""
compressed = zlib.compress(text.encode("utf-8"), 9)
# Strip zlib header (2 bytes) and Adler32 trailer (4 bytes)
raw = compressed[2:-4]
out = ""
i = 0
while i < len(raw):
b1 = raw[i]
b2 = raw[i + 1] if i + 1 < len(raw) else 0
b3 = raw[i + 2] if i + 2 < len(raw) else 0
out += _encode_3bytes(b1, b2, b3)
i += 3
return out
BLOCK_RE = re.compile(r"```plantuml\s*\n(@startuml.*?@enduml)\s*\n```",
re.DOTALL)
def extract_blocks(md: Path) -> list[str]:
return BLOCK_RE.findall(md.read_text())
def render_one(src: str, out_path: Path) -> None:
encoded = plantuml_encode(src)
url = f"{SERVER.rstrip('/')}/svg/{encoded}"
req = urllib.request.Request(url, headers={"User-Agent": "demo-epb/1.0"})
with urllib.request.urlopen(req, timeout=30) as resp:
data = resp.read()
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(data)
def main() -> int:
out_dir = REPO / "docs" / "diagrams"
out_dir.mkdir(parents=True, exist_ok=True)
md_files = sorted((REPO / "arch").rglob("*.md"))
if not md_files:
print("Keine arch/**/*.md gefunden.")
return 0
total = 0
for md in md_files:
rel = md.relative_to(REPO)
blocks = extract_blocks(md)
if not blocks:
continue
for i, block in enumerate(blocks, start=1):
stem = md.stem
target = out_dir / f"{stem}-{i}.svg"
try:
render_one(block, target)
print(f" {rel} block {i} -> {target.relative_to(REPO)}")
total += 1
except Exception as e:
print(f" WARN: {rel} block {i} render failed: {e}")
print(f"\nDone: {total} Diagrams rendered to {out_dir.relative_to(REPO)}/")
return 0
if __name__ == "__main__":
sys.exit(main())
+286
View File
@@ -0,0 +1,286 @@
#!/usr/bin/env python3
"""
Traceability-Werkzeug fuer demo-epb.
Liest alle Markdown-Items in reqs/ und arch/ ein, validiert Links bidirektional
und erzeugt eine HTML-Traceability-Matrix.
Doorstop-kompatibles Format (YAML-Frontmatter + Markdown-Body), aber ohne
Doorstop-Dependency — bleibt portabel.
Subcommands:
check Validiert Konsistenz, exit 1 bei Fehlern
publish DIR Schreibt HTML + JSON nach DIR/
Run:
python3 tools/traceability.py check
python3 tools/traceability.py publish docs/traceability/
"""
from __future__ import annotations
import html
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
REPO = Path(__file__).resolve().parent.parent
SOURCES = [
("SYS", "reqs/sys", "System Requirements"),
("SWE", "reqs/swe", "Software Requirements"),
("SA", "arch/sys", "System Architecture"),
("SWA", "arch/swe", "Software Architecture"),
]
# Welche Quellen verlinken auf welche?
# (key) -> (target_prefix) : Items mit key linken auf Items mit target_prefix
EXPECTED_LINKS = {
"SA": ["SYS"],
"SWE": ["SYS"],
"SWA": ["SWE"],
}
# Reverse: welche Quellen MUESSEN von welchen Quellen referenziert werden?
# (target) -> [prefix that should link to target] (coverage check)
COVERAGE = {
"SYS": ["SA", "SWE"], # jede SYS-Req muss durch SA und SWE abgedeckt sein
"SWE": ["SWA"], # jede SWE-Req muss durch SWA abgedeckt sein
}
@dataclass
class Item:
id: str
prefix: str
path: Path
title: str
asil: str
links: list[str] = field(default_factory=list)
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL)
LINKS_RE = re.compile(r"^\s*-\s+([A-Z]+-\d+)", re.MULTILINE)
def parse_item(path: Path, prefix: str) -> Item | None:
text = path.read_text()
m = FRONTMATTER_RE.match(text)
if not m:
return None
fm = m.group(1)
# Crude YAML parsing — we only need a few fields.
def field_value(name: str) -> str | None:
rx = re.search(rf"^{name}:\s*(.*?)$", fm, re.MULTILINE)
return rx.group(1).strip().strip("'\"") if rx else None
asil = field_value("asil") or "?"
title = field_value("header") or path.stem
# links: collect IDs from the `links:` block.
links: list[str] = []
in_links = False
for line in fm.splitlines():
if line.startswith("links:"):
in_links = True
continue
if in_links:
if line.startswith(" - "):
m2 = re.match(r" - ([A-Z]+-\d+)", line)
if m2:
links.append(m2.group(1))
elif not line.startswith(" ") and line.strip():
in_links = False
return Item(id=path.stem, prefix=prefix, path=path, title=title,
asil=asil, links=links)
def collect_all() -> dict[str, Item]:
items: dict[str, Item] = {}
for prefix, rel, _label in SOURCES:
d = REPO / rel
if not d.exists():
continue
for f in sorted(d.glob("*.md")):
it = parse_item(f, prefix)
if it is None:
continue
items[it.id] = it
return items
def cmd_check(items: dict[str, Item]) -> int:
errors: list[str] = []
# 1. Each link target must exist
for it in items.values():
for link in it.links:
if link not in items:
errors.append(f"{it.id} links to non-existent {link}")
# 2. Forward expectation: items of certain prefixes must link to others
for it in items.values():
expected_prefixes = EXPECTED_LINKS.get(it.prefix, [])
if not expected_prefixes:
continue
actual_prefixes = {items[l].prefix for l in it.links if l in items}
for ep in expected_prefixes:
if ep not in actual_prefixes:
errors.append(
f"{it.id} ({it.prefix}) has no link to a {ep}-* item"
)
# 3. Coverage: each item of certain prefix must be referenced by certain types
incoming: dict[str, set[str]] = {iid: set() for iid in items}
for it in items.values():
for link in it.links:
if link in incoming:
incoming[link].add(it.prefix)
for it in items.values():
required = COVERAGE.get(it.prefix, [])
for rp in required:
if rp not in incoming[it.id]:
errors.append(
f"{it.id} ({it.prefix}) is not referenced by any {rp}-* item"
)
print(f"\nItems found: {len(items)}")
print(f" SYS: {sum(1 for i in items.values() if i.prefix == 'SYS')}")
print(f" SWE: {sum(1 for i in items.values() if i.prefix == 'SWE')}")
print(f" SA: {sum(1 for i in items.values() if i.prefix == 'SA')}")
print(f" SWA: {sum(1 for i in items.values() if i.prefix == 'SWA')}")
print()
if errors:
print(f"FAIL: {len(errors)} traceability error(s):")
for e in errors:
print(f" - {e}")
return 1
print("OK — Traceability vollstaendig.")
return 0
def asil_color(asil: str) -> str:
return {
"D": "#d62728",
"C": "#ff7f0e",
"B": "#2ca02c",
"A": "#1f77b4",
"QM": "#888",
}.get(asil, "#aaa")
def cmd_publish(items: dict[str, Item], out_dir: Path) -> int:
out_dir.mkdir(parents=True, exist_ok=True)
# Build forward and reverse maps
children: dict[str, list[str]] = {iid: [] for iid in items}
for it in items.values():
for link in it.links:
if link in children:
children[link].append(it.id)
rows = []
sys_items = [i for i in items.values() if i.prefix == "SYS"]
for sys in sorted(sys_items, key=lambda i: i.id):
sas = [c for c in children[sys.id] if items[c].prefix == "SA"]
swes = [c for c in children[sys.id] if items[c].prefix == "SWE"]
swas = sorted(set(c for s in swes for c in children[s]
if items[c].prefix == "SWA"))
rows.append({
"sys": sys, "sa": sas, "swe": swes, "swa": swas
})
# HTML
parts = [
"<!doctype html><html lang='de'><head>",
"<meta charset='utf-8'>",
"<title>demo-epb — Traceability Matrix</title>",
"<style>",
"body{font-family:-apple-system,Segoe UI,sans-serif;padding:20px;color:#222}",
"table{border-collapse:collapse;width:100%;font-size:14px}",
"th,td{border:1px solid #ccc;padding:6px 8px;vertical-align:top;text-align:left}",
"th{background:#f0f0f0}",
"tr:nth-child(even) td{background:#fafafa}",
".asil{display:inline-block;padding:1px 6px;border-radius:3px;color:white;font-weight:bold;font-size:11px}",
".id{font-family:Consolas,monospace;font-size:13px}",
".cnt{color:#666;font-size:11px}",
"h1{color:#1f3864}",
"</style></head><body>",
"<h1>demo-epb — Traceability Matrix</h1>",
f"<p>Generiert aus {sum(1 for _ in items)} Items "
f"(SYS: {len([i for i in items.values() if i.prefix=='SYS'])}, "
f"SWE: {len([i for i in items.values() if i.prefix=='SWE'])}, "
f"SA: {len([i for i in items.values() if i.prefix=='SA'])}, "
f"SWA: {len([i for i in items.values() if i.prefix=='SWA'])}).</p>",
"<table>",
"<tr><th>System-Requirement</th><th>System-Arch (SA)</th>"
"<th>Software-Req (SWE)</th><th>Software-Arch (SWA)</th></tr>",
]
def cell(ids: list[str]) -> str:
if not ids:
return "<td style='color:#c00'>—</td>"
bits = []
for i in ids:
it = items[i]
c = asil_color(it.asil)
bits.append(
f"<div><span class='id'>{html.escape(i)}</span> "
f"<span class='asil' style='background:{c}'>{html.escape(it.asil)}</span></div>"
f"<div class='cnt'>{html.escape(it.title)}</div>"
)
return "<td>" + "".join(bits) + "</td>"
for r in rows:
sys = r["sys"]
c = asil_color(sys.asil)
first = (f"<td><div><span class='id'>{html.escape(sys.id)}</span> "
f"<span class='asil' style='background:{c}'>{html.escape(sys.asil)}</span></div>"
f"<div class='cnt'>{html.escape(sys.title)}</div></td>")
parts.append("<tr>" + first + cell(r["sa"]) + cell(r["swe"]) + cell(r["swa"]) + "</tr>")
parts.append("</table></body></html>")
(out_dir / "index.html").write_text("\n".join(parts))
# JSON for machine consumption
matrix = []
for r in rows:
matrix.append({
"sys": {"id": r["sys"].id, "asil": r["sys"].asil, "title": r["sys"].title},
"sa": [{"id": i, "asil": items[i].asil} for i in r["sa"]],
"swe": [{"id": i, "asil": items[i].asil} for i in r["swe"]],
"swa": [{"id": i, "asil": items[i].asil} for i in r["swa"]],
})
(out_dir / "matrix.json").write_text(json.dumps(matrix, indent=2))
print(f"Wrote: {out_dir / 'index.html'}")
print(f"Wrote: {out_dir / 'matrix.json'}")
return 0
def main():
if len(sys.argv) < 2:
print(__doc__)
return 2
cmd = sys.argv[1]
items = collect_all()
if cmd == "check":
return cmd_check(items)
if cmd == "publish":
out = Path(sys.argv[2] if len(sys.argv) > 2 else "docs/traceability")
rc = cmd_check(items)
if rc != 0:
print("WARN: publishing despite check errors")
cmd_publish(items, out)
return 0
print(f"unknown command: {cmd}")
return 2
if __name__ == "__main__":
sys.exit(main())