"""MCP Contacts Server — reads and writes contacts via CardDAV.""" import os, sys, re, contextlib, uuid from typing import Annotated import httpx, vobject from pydantic import Field from mcp.server.fastmcp import FastMCP from starlette.applications import Starlette from starlette.routing import Mount sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware from common import load_config as _lc _cfg = _lc() RADICALE = _cfg['radicale_url'] CREDS = {u: (d['username'], d['password']) for u, d in _cfg['radicale_users'].items()} AB_PATHS = _cfg['addressbook_paths'] mcp = FastMCP("Contacts", stateless_http=True, transport_security={"enable_dns_rebinding_protection": False}) def _auth(u): c = CREDS.get(u); return httpx.BasicAuth(c[0], c[1]) if c else None def _discover_ab(user): books = [] for base in AB_PATHS.get(user, []): body = '' r = httpx.request("PROPFIND", RADICALE + base, content=body, auth=_auth(user), headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=30) for m in re.finditer(r'([^<]+)', r.text): href = m.group(1) if href.rstrip("/") == base.rstrip("/"): continue block = r.text[m.start():r.text.find("", m.end())] if "addressbook" not in block.lower(): continue nm = re.search(r'([^<]*)', block) books.append({"name": nm.group(1) if nm else href.split("/")[-2], "href": href}) return books def _get_contacts(href, auth): body = '' r = httpx.request("REPORT", RADICALE + href, content=body, auth=auth, headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=60) contacts = [] for m in re.finditer(r']*>(.*?)', r.text, re.DOTALL): raw = m.group(1).replace("<","<").replace(">",">").replace("&","&") try: contacts.append(vobject.readOne(raw)) except: pass return contacts def _brief(c): fn = str(c.fn.value) if hasattr(c, 'fn') else '?' org = f" ({c.org.value[0]})" if hasattr(c, 'org') and c.org.value else '' uid = str(c.uid.value) if hasattr(c, 'uid') else '' parts = [f"{fn}{org}"] for em in (c.email_list if hasattr(c, 'email_list') else ([c.email] if hasattr(c, 'email') else [])): parts.append(f" Email: {em.value}") for tel in (c.tel_list if hasattr(c, 'tel_list') else ([c.tel] if hasattr(c, 'tel') else [])): parts.append(f" Tel: {tel.value}") parts.append(f" UID: {uid}") return "\n".join(parts) def _full(c): parts = [str(c.fn.value) if hasattr(c, 'fn') else '?'] if hasattr(c, 'n'): n = c.n.value if n.prefix: parts.append(f" Anrede: {n.prefix}") parts.append(f" Vorname: {n.given}") parts.append(f" Nachname: {n.family}") if n.suffix: parts.append(f" Suffix: {n.suffix}") if hasattr(c, 'nickname'): parts.append(f" Spitzname: {c.nickname.value}") for em in (c.email_list if hasattr(c, 'email_list') else ([c.email] if hasattr(c, 'email') else [])): typ = em.params.get("TYPE", [""])[0] if hasattr(em, 'params') and em.params else "" parts.append(f" Email ({typ or 'sonstig'}): {em.value}") for tel in (c.tel_list if hasattr(c, 'tel_list') else ([c.tel] if hasattr(c, 'tel') else [])): typ = tel.params.get("TYPE", [""])[0] if hasattr(tel, 'params') and tel.params else "" parts.append(f" Telefon ({typ or 'sonstig'}): {tel.value}") for adr in (c.adr_list if hasattr(c, 'adr_list') else ([c.adr] if hasattr(c, 'adr') else [])): a = adr.value addr = ", ".join(filter(None, [a.street, a.city, a.region, a.code, a.country])) if addr: parts.append(f" Adresse: {addr}") if hasattr(c, 'org') and c.org.value: parts.append(f" Firma: {c.org.value[0]}") if hasattr(c, 'title'): parts.append(f" Titel: {c.title.value}") if hasattr(c, 'role'): parts.append(f" Rolle: {c.role.value}") if hasattr(c, 'bday'): parts.append(f" Geburtstag: {c.bday.value}") if hasattr(c, 'note') and c.note.value: parts.append(f" Notiz: {str(c.note.value)[:300]}") if hasattr(c, 'url'): parts.append(f" URL: {c.url.value}") if hasattr(c, 'uid'): parts.append(f" UID: {c.uid.value}") return "\n".join(parts) @mcp.tool() def search_contacts( query: Annotated[str, Field(description="Search term — matches name, email, phone, company, address, notes. Example: 'Lohmaier', 'BMW', '+49171'")], limit: Annotated[int, Field(description="Maximum number of results")] = 30, ) -> str: """Search contacts by any field. Returns name, email, phone, and UID. Use get_contact with the UID for full details.""" user = get_current_user() if not user: return "Error: not authenticated" q = query.lower() results = [] for book in _discover_ab(user): for c in _get_contacts(book["href"], _auth(user)): txt = " ".join(str(getattr(c, a, None) or '') for a in ['fn','email','tel','org','note','nickname']).lower() if q in txt: results.append(_brief(c)) if len(results) >= limit: break return "\n\n".join(results) if results else "Keine Kontakte gefunden" @mcp.tool() def get_contact( uid: Annotated[str, Field(description="Contact UID from search_contacts results")], ) -> str: """Get full details of a contact by UID: name, all emails, all phones, addresses, company, birthday, notes.""" user = get_current_user() if not user: return "Error: not authenticated" for book in _discover_ab(user): for c in _get_contacts(book["href"], _auth(user)): if hasattr(c, 'uid') and str(c.uid.value) == uid: return _full(c) return f"Kontakt nicht gefunden: {uid}" @mcp.tool() def create_contact( name: Annotated[str, Field(description="Full name, e.g. 'Max Mustermann'")], email: Annotated[str, Field(description="Email address")] = "", phone: Annotated[str, Field(description="Phone number, e.g. '+49 171 1234567'")] = "", organization: Annotated[str, Field(description="Company/organization name")] = "", note: Annotated[str, Field(description="Notes about this contact")] = "", ) -> str: """Create a new contact in the default addressbook.""" user = get_current_user() if not user: return "Error: not authenticated" books = _discover_ab(user) if not books: return "Kein Adressbuch gefunden" uid_val = str(uuid.uuid4()) card = vobject.vCard() card.add("uid").value = uid_val card.add("fn").value = name p = name.split(" ", 1) card.add("n").value = vobject.vcard.Name(family=p[-1] if len(p)>1 else p[0], given=p[0] if len(p)>1 else "") if email: card.add("email").value = email if phone: card.add("tel").value = phone if organization: card.add("org").value = [organization] if note: card.add("note").value = note r = httpx.put(RADICALE + books[0]["href"] + uid_val + ".vcf", content=card.serialize(), auth=_auth(user), headers={"Content-Type": "text/vcard"}, timeout=15) return f"Kontakt erstellt: {name}" if r.status_code in (201, 204) else f"Fehler: HTTP {r.status_code}" def create_app(): from contextlib import asynccontextmanager mcp_app = mcp.streamable_http_app() @asynccontextmanager async def lifespan(app): async with contextlib.AsyncExitStack() as stack: await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app)) yield routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)] app = Starlette(routes=routes, lifespan=lifespan) app.add_middleware(BearerAuthMiddleware) return app if __name__ == "__main__": import uvicorn uvicorn.run(create_app(), host="127.0.0.1", port=5102)