"""MCP Contacts Server — reads and writes contacts via CardDAV."""
import base64
import binascii
import contextlib
import logging
import os
import re  # noqa: F401  (kept: previously imported at module level)
import sys
import uuid
import xml.etree.ElementTree as ET
from typing import Annotated

import httpx
import vobject
from pydantic import Field
from mcp.server.fastmcp import FastMCP
from mcp.types import TextContent, ImageContent
from starlette.applications import Starlette
from starlette.routing import Mount

# The shared `common` package lives one directory above this file.
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware  # noqa: E402
from common import load_config as _lc  # noqa: E402

logger = logging.getLogger(__name__)

_cfg = _lc()
RADICALE = _cfg['radicale_url']
CREDS = {u: (d['username'], d['password']) for u, d in _cfg['radicale_users'].items()}
AB_PATHS = _cfg['addressbook_paths']

# XML namespaces in ElementTree "Clark" notation.
_DAV = "{DAV:}"
_CARD = "{urn:ietf:params:xml:ns:carddav}"

# HTTP status codes treated as a successful PUT of a vCard.
_PUT_OK = (200, 201, 204)

# vCard properties whose values are searched by search_contacts.
_SEARCH_FIELDS = ('fn', 'email', 'tel', 'org', 'adr', 'note', 'nickname')

_XML_HEADERS = {"Depth": "1", "Content-Type": "application/xml"}

mcp = FastMCP("Contacts", stateless_http=True,
              transport_security={"enable_dns_rebinding_protection": False})


def _auth(u):
    """Return HTTP basic auth for user *u*, or None if no credentials are configured."""
    creds = CREDS.get(u)
    if not creds:
        return None
    return httpx.BasicAuth(creds[0], creds[1])


def _dav_request_body(root_tag, prop_tags):
    """Serialize a minimal WebDAV request: <root_tag><prop>...</prop></root_tag>.

    Tags are given in Clark notation; the result is UTF-8 encoded bytes.
    """
    root = ET.Element(root_tag)
    prop = ET.SubElement(root, _DAV + "prop")
    for tag in prop_tags:
        ET.SubElement(prop, tag)
    return ET.tostring(root, encoding="utf-8")


def _parse_multistatus(response):
    """Parse an HTTP response body as XML; return the root element or None."""
    try:
        return ET.fromstring(response.content)
    except ET.ParseError:
        logger.warning("Unparsable WebDAV response (HTTP %s)", response.status_code)
        return None


def _discover_ab(user):
    """List the addressbook collections below the user's configured base paths.

    Returns a list of ``{"name": ..., "href": ...}`` dicts. A collection counts
    as an addressbook when its resourcetype contains the CardDAV ``addressbook``
    element; the base collection itself is skipped.
    """
    body = _dav_request_body(
        _DAV + "propfind", [_DAV + "resourcetype", _DAV + "displayname"])
    is_addressbook = f".//{_DAV}resourcetype/{_CARD}addressbook"
    books = []
    for base in AB_PATHS.get(user, []):
        r = httpx.request("PROPFIND", RADICALE + base, content=body,
                          auth=_auth(user), headers=_XML_HEADERS, timeout=30)
        root = _parse_multistatus(r)
        if root is None:
            continue
        for response in root.iter(_DAV + "response"):
            href = (response.findtext(_DAV + "href") or "").strip()
            if not href or href.rstrip("/") == base.rstrip("/"):
                continue
            if response.find(is_addressbook) is None:
                continue
            name = (response.findtext(f".//{_DAV}displayname") or "").strip()
            # Fall back to the last path segment when no display name is set.
            books.append({"name": name or href.rstrip("/").rsplit("/", 1)[-1],
                          "href": href})
    return books


def _get_contacts(href, auth):
    """Fetch and parse every vCard in the addressbook collection at *href*.

    Cards that cannot be parsed are skipped (and logged) so that one broken
    entry does not hide the rest of the addressbook.
    """
    # NOTE(review): no <filter> element is sent; Radicale is expected to return
    # all cards in that case — confirm if another CardDAV server is used.
    body = _dav_request_body(
        _CARD + "addressbook-query", [_DAV + "getetag", _CARD + "address-data"])
    r = httpx.request("REPORT", RADICALE + href, content=body, auth=auth,
                      headers=_XML_HEADERS, timeout=60)
    root = _parse_multistatus(r)
    if root is None:
        return []
    contacts = []
    for node in root.iter(_CARD + "address-data"):
        raw = node.text  # ElementTree already decodes XML entities
        if not raw or not raw.strip():
            continue
        try:
            contacts.append(vobject.readOne(raw))
        except Exception:  # best effort: skip broken cards, but leave a trace
            logger.warning("Skipping unparsable vCard in %s", href, exc_info=True)
    return contacts


def _lines(card, name):
    """Return all content lines of property *name* on *card* (possibly empty)."""
    return getattr(card, f"{name}_list", [])


def _type_of(line):
    """Return the first TYPE parameter of a content line, or ''."""
    values = (getattr(line, "params", None) or {}).get("TYPE") or [""]
    return values[0] or ""


def _flat(value):
    """Render a structured-value component (string or list of strings) as text."""
    if isinstance(value, (list, tuple)):
        return " ".join(str(v) for v in value if v)
    return str(value) if value else ""


def _brief(c):
    """Format a short multi-line summary: name (company), emails, phones, UID."""
    fn = str(c.fn.value) if hasattr(c, 'fn') else '?'
    org = f" ({c.org.value[0]})" if hasattr(c, 'org') and c.org.value else ''
    uid = str(c.uid.value) if hasattr(c, 'uid') else ''
    parts = [f"{fn}{org}"]
    parts.extend(f" Email: {em.value}" for em in _lines(c, 'email'))
    parts.extend(f" Tel: {tel.value}" for tel in _lines(c, 'tel'))
    parts.append(f" UID: {uid}")
    return "\n".join(parts)


def _full(c):
    """Format all supported vCard fields of *c* as a multi-line text block."""
    parts = [str(c.fn.value) if hasattr(c, 'fn') else '?']
    if hasattr(c, 'n'):
        n = c.n.value
        if n.prefix:
            parts.append(f" Anrede: {n.prefix}")
        parts.append(f" Vorname: {n.given}")
        parts.append(f" Nachname: {n.family}")
        if n.suffix:
            parts.append(f" Suffix: {n.suffix}")
    if hasattr(c, 'nickname'):
        parts.append(f" Spitzname: {c.nickname.value}")
    for em in _lines(c, 'email'):
        parts.append(f" Email ({_type_of(em) or 'sonstig'}): {em.value}")
    for tel in _lines(c, 'tel'):
        parts.append(f" Telefon ({_type_of(tel) or 'sonstig'}): {tel.value}")
    for adr in _lines(c, 'adr'):
        a = adr.value
        # Address components may be lists; flatten them before joining.
        pieces = [_flat(v) for v in (a.street, a.city, a.region, a.code, a.country)]
        addr = ", ".join(p for p in pieces if p)
        if addr:
            parts.append(f" Adresse: {addr}")
    if hasattr(c, 'org') and c.org.value:
        parts.append(f" Firma: {c.org.value[0]}")
    if hasattr(c, 'title'):
        parts.append(f" Titel: {c.title.value}")
    if hasattr(c, 'role'):
        parts.append(f" Rolle: {c.role.value}")
    if hasattr(c, 'bday'):
        parts.append(f" Geburtstag: {c.bday.value}")
    if hasattr(c, 'note') and c.note.value:
        # Notes are truncated to keep the tool output compact.
        parts.append(f" Notiz: {str(c.note.value)[:300]}")
    if hasattr(c, 'url'):
        parts.append(f" URL: {c.url.value}")
    if hasattr(c, 'uid'):
        parts.append(f" UID: {c.uid.value}")
    return "\n".join(parts)


def _search_text(card):
    """Build the lower-cased haystack of all searchable property values of *card*."""
    chunks = []
    for field in _SEARCH_FIELDS:
        for line in _lines(card, field):
            value = line.value
            if isinstance(value, (list, tuple)):
                chunks.extend(str(v) for v in value)
            else:
                chunks.append(str(value))
    return " ".join(chunks).lower()


# NOTE: the docstrings of the @mcp.tool() functions are published as the tool
# descriptions, so they are kept verbatim; extra notes live in comments.
@mcp.tool()
def search_contacts(
    query: Annotated[str, Field(description="Search term — matches name, email, phone, company, address, notes. Example: 'Lohmaier', 'BMW', '+49171'")],
    limit: Annotated[int, Field(description="Maximum number of results")] = 30,
) -> str:
    """Search contacts by any field. Returns name, email, phone, and UID. Use get_contact with the UID for full details."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"
    q = query.lower()
    auth = _auth(user)
    results = []
    for book in _discover_ab(user):
        for c in _get_contacts(book["href"], auth):
            if q not in _search_text(c):
                continue
            results.append(_brief(c))
            # Stop across all addressbooks, not just the current one.
            if len(results) >= limit:
                return "\n\n".join(results)
    return "\n\n".join(results) if results else "Keine Kontakte gefunden"


def _get_photo_data(card):
    """Return ``(base64_data, mime_type)`` for the card's photo, or ``(None, None)``.

    Photos referenced by an http(s) URL are not fetched. String values that are
    not URLs are passed through unchanged, i.e. assumed to be base64 already.
    """
    if not hasattr(card, 'photo'):
        return None, None
    photo = card.photo
    data = photo.value
    if isinstance(data, str) and data.startswith("http"):
        return None, None
    b64 = base64.b64encode(data).decode() if isinstance(data, bytes) else data
    mime_by_type = {"jpeg": "image/jpeg", "png": "image/png", "gif": "image/gif"}
    return b64, mime_by_type.get(_type_of(photo).lower(), "image/jpeg")


@mcp.tool()
def get_contact(
    uid: Annotated[str, Field(description="Contact UID from search_contacts results")],
) -> list[TextContent | ImageContent]:
    """Get full details of a contact by UID: name, emails, phones, addresses, company, birthday, notes.
    Includes contact photo if available."""
    user = get_current_user()
    if not user:
        return [TextContent(type="text", text="Error: not authenticated")]
    auth = _auth(user)
    for book in _discover_ab(user):
        for c in _get_contacts(book["href"], auth):
            if not hasattr(c, 'uid') or str(c.uid.value) != uid:
                continue
            result = [TextContent(type="text", text=_full(c))]
            b64, mime = _get_photo_data(c)
            if b64 and mime:
                result.append(ImageContent(type="image", data=b64, mimeType=mime))
            return result
    return [TextContent(type="text", text=f"Kontakt nicht gefunden: {uid}")]


def _card_url(book_href, uid):
    """Return the absolute URL of the vCard resource ``<uid>.vcf`` in a collection."""
    return RADICALE + book_href.rstrip("/") + "/" + uid + ".vcf"


@mcp.tool()
def create_contact(
    name: Annotated[str, Field(description="Full name, e.g. 'Max Mustermann'")],
    email: Annotated[str, Field(description="Email address")] = "",
    phone: Annotated[str, Field(description="Phone number, e.g. '+49 171 1234567'")] = "",
    organization: Annotated[str, Field(description="Company/organization name")] = "",
    note: Annotated[str, Field(description="Notes about this contact")] = "",
) -> str:
    """Create a new contact in the default addressbook."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"
    books = _discover_ab(user)
    if not books:
        return "Kein Adressbuch gefunden"
    uid_val = str(uuid.uuid4())
    card = vobject.vCard()
    card.add("uid").value = uid_val
    card.add("fn").value = name
    # First word is the given name, the remainder the family name; a single
    # word is stored as the family name.
    given, _, family = name.partition(" ")
    if not family:
        given, family = "", given
    card.add("n").value = vobject.vcard.Name(family=family, given=given)
    if email:
        card.add("email").value = email
    if phone:
        card.add("tel").value = phone
    if organization:
        card.add("org").value = [organization]
    if note:
        card.add("note").value = note
    # The "default" addressbook is the first one discovered.
    r = httpx.put(_card_url(books[0]["href"], uid_val), content=card.serialize(),
                  auth=_auth(user), headers={"Content-Type": "text/vcard"}, timeout=15)
    if r.status_code in _PUT_OK:
        return f"Kontakt erstellt: {name}"
    return f"Fehler: HTTP {r.status_code}"


@mcp.tool()
def set_contact_photo(
    uid: Annotated[str, Field(description="Contact UID from search_contacts or get_contact results")],
    image_base64: Annotated[str, Field(description="Photo as base64-encoded JPEG or PNG data")],
    image_type: Annotated[str, Field(description="Image format: 'jpeg' or 'png'")] = "jpeg",
) -> str:
    """Set or replace a contact's photo. Provide the image as base64-encoded data."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"
    try:
        image_bytes = base64.b64decode(image_base64)
    except (binascii.Error, ValueError):
        return "Fehler: ungueltige Base64-Daten"
    kind = image_type.strip().upper() or "JPEG"
    if kind == "JPG":
        kind = "JPEG"
    auth = _auth(user)
    for book in _discover_ab(user):
        for c in _get_contacts(book["href"], auth):
            if not hasattr(c, 'uid') or str(c.uid.value) != uid:
                continue
            # Drop every existing photo, not just the first one.
            for old in list(_lines(c, 'photo')):
                c.remove(old)
            photo = c.add("photo")
            photo.value = image_bytes
            photo.params["ENCODING"] = ["b"]
            photo.params["TYPE"] = [kind]
            # NOTE(review): assumes the card is stored as "<uid>.vcf" (true for
            # cards made by create_contact); cards created by other clients may
            # use a different resource name — confirm.
            r = httpx.put(_card_url(book["href"], uid), content=c.serialize(),
                          auth=auth, headers={"Content-Type": "text/vcard"}, timeout=15)
            if r.status_code in _PUT_OK:
                return f"Foto gesetzt fuer {c.fn.value if hasattr(c, 'fn') else uid}"
            return f"Fehler: HTTP {r.status_code}"
    return f"Kontakt nicht gefunden: {uid}"


def create_app():
    """Build the Starlette app: OAuth routes plus the MCP app mounted at "/"."""
    mcp_app = mcp.streamable_http_app()

    @contextlib.asynccontextmanager
    async def lifespan(app):
        # A mounted sub-app's lifespan is not run automatically, so drive the
        # MCP app's lifespan from the outer application.
        async with mcp_app.router.lifespan_context(mcp_app):
            yield

    routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)]
    app = Starlette(routes=routes, lifespan=lifespan)
    app.add_middleware(BearerAuthMiddleware)
    return app


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(create_app(), host="127.0.0.1", port=5102)