From a9359beead2b99287facf0c4e830538700177d47 Mon Sep 17 00:00:00 2001 From: Stefan Lohmaier Date: Fri, 12 Jun 2026 07:53:55 +0200 Subject: [PATCH] Add image support to Files and contact photos to Contacts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Files: - read_file returns images inline (jpg/png/gif/webp up to 10MB) - Text files returned as text, binary files as metadata only Contacts: - get_contact includes contact photo as inline image if available - New tool: set_contact_photo (base64 jpeg/png → VCard PHOTO) Co-Authored-By: Claude Opus 4.6 --- contacts/server.py | 60 +++++++++++++++++++++++++++++++++++++++++----- files/server.py | 32 +++++++++++++++---------- 2 files changed, 74 insertions(+), 18 deletions(-) diff --git a/contacts/server.py b/contacts/server.py index d343187..c092725 100644 --- a/contacts/server.py +++ b/contacts/server.py @@ -1,11 +1,12 @@ """MCP Contacts Server — reads and writes contacts via CardDAV.""" -import os, sys, re, contextlib, uuid +import os, sys, re, contextlib, uuid, base64 from typing import Annotated import httpx, vobject from pydantic import Field from mcp.server.fastmcp import FastMCP +from mcp.types import TextContent, ImageContent from starlette.applications import Starlette from starlette.routing import Mount @@ -107,18 +108,38 @@ def search_contacts( return "\n\n".join(results) if results else "Keine Kontakte gefunden" +def _get_photo_data(card): + if not hasattr(card, 'photo'): + return None, None + photo = card.photo + data = photo.value + if isinstance(data, str) and data.startswith("http"): + return None, None + if isinstance(data, bytes): + b64 = base64.b64encode(data).decode() + else: + b64 = data + ptype = (photo.params.get("TYPE", [""])[0] or "").lower() if hasattr(photo, 'params') and photo.params else "" + mime = {"jpeg": "image/jpeg", "png": "image/png", "gif": "image/gif"}.get(ptype, "image/jpeg") + return b64, mime + + @mcp.tool() def get_contact( uid: Annotated[str, Field(description="Contact UID from search_contacts results")], -) -> str: - """Get full details of a contact by UID: name, all emails, all phones, addresses, company, birthday, notes.""" +) -> list[TextContent | ImageContent]: + """Get full details of a contact by UID: name, emails, phones, addresses, company, birthday, notes. Includes contact photo if available.""" user = get_current_user() - if not user: return "Error: not authenticated" + if not user: return [TextContent(type="text", text="Error: not authenticated")] for book in _discover_ab(user): for c in _get_contacts(book["href"], _auth(user)): if hasattr(c, 'uid') and str(c.uid.value) == uid: - return _full(c) - return f"Kontakt nicht gefunden: {uid}" + result = [TextContent(type="text", text=_full(c))] + b64, mime = _get_photo_data(c) + if b64 and mime: + result.append(ImageContent(type="image", data=b64, mimeType=mime)) + return result + return [TextContent(type="text", text=f"Kontakt nicht gefunden: {uid}")] @mcp.tool() @@ -148,6 +169,33 @@ def create_contact( return f"Kontakt erstellt: {name}" if r.status_code in (201, 204) else f"Fehler: HTTP {r.status_code}" +@mcp.tool() +def set_contact_photo( + uid: Annotated[str, Field(description="Contact UID from search_contacts or get_contact results")], + image_base64: Annotated[str, Field(description="Photo as base64-encoded JPEG or PNG data")], + image_type: Annotated[str, Field(description="Image format: 'jpeg' or 'png'")] = "jpeg", +) -> str: + """Set or replace a contact's photo. Provide the image as base64-encoded data.""" + user = get_current_user() + if not user: return "Error: not authenticated" + auth = _auth(user) + for book in _discover_ab(user): + for c in _get_contacts(book["href"], auth): + if hasattr(c, 'uid') and str(c.uid.value) == uid: + if hasattr(c, 'photo'): + c.remove(c.photo) + photo = c.add("photo") + photo.value = base64.b64decode(image_base64) + photo.params["ENCODING"] = ["b"] + photo.params["TYPE"] = [image_type.upper()] + href = book["href"] + uid + ".vcf" + r = httpx.put(RADICALE + href, content=c.serialize(), auth=auth, headers={"Content-Type": "text/vcard"}, timeout=15) + if r.status_code in (200, 201, 204): + return f"Foto gesetzt fuer {c.fn.value if hasattr(c, 'fn') else uid}" + return f"Fehler: HTTP {r.status_code}" + return f"Kontakt nicht gefunden: {uid}" + + def create_app(): from contextlib import asynccontextmanager mcp_app = mcp.streamable_http_app() diff --git a/files/server.py b/files/server.py index f052d81..6382f2f 100644 --- a/files/server.py +++ b/files/server.py @@ -1,12 +1,13 @@ """MCP Files Server — browse and read files via WebDAV/oCIS.""" -import os, sys, contextlib +import os, sys, contextlib, base64 from xml.etree import ElementTree as ET from typing import Annotated import httpx from pydantic import Field from mcp.server.fastmcp import FastMCP +from mcp.types import TextContent, ImageContent from starlette.applications import Starlette from starlette.routing import Mount @@ -68,20 +69,27 @@ def list_files( return "\n".join(lines) if lines else "Leeres Verzeichnis" +IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp", "image/svg+xml"} +TEXT_HINTS = ["text/", "json", "xml", "csv", "yaml", "javascript", "markdown"] + @mcp.tool() def read_file( - path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/config.yaml'")], -) -> str: - """Read a text file's content (txt, md, json, csv, yaml, xml, html). Binary files show only metadata.""" + path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/Photos/pic.jpg'")], +) -> list[TextContent | ImageContent]: + """Read a file. Text files return content directly. Images (jpg/png/gif/webp) are displayed inline. Other binary files return only metadata.""" user = get_current_user() - if not user: return "Error: not authenticated" - r = httpx.get(_dav(user, path), auth=_auth(user), timeout=30) - if r.status_code >= 400: return f"Fehler: HTTP {r.status_code}" - ct = r.headers.get("content-type", "") - if any(t in ct for t in ["text/", "json", "xml", "csv", "yaml", "javascript"]): - return r.text[:100000] - try: return r.content.decode("utf-8")[:100000] - except: return f"Binaerdatei ({len(r.content)} bytes, Typ: {ct}). Kann nicht angezeigt werden." + if not user: return [TextContent(type="text", text="Error: not authenticated")] + r = httpx.get(_dav(user, path), auth=_auth(user), timeout=60) + if r.status_code >= 400: return [TextContent(type="text", text=f"Fehler: HTTP {r.status_code}")] + ct = r.headers.get("content-type", "").split(";")[0].strip() + if ct in IMAGE_TYPES and len(r.content) < 10_000_000: + return [ImageContent(type="image", data=base64.b64encode(r.content).decode(), mimeType=ct)] + if any(t in ct for t in TEXT_HINTS): + return [TextContent(type="text", text=r.text[:100000])] + try: + return [TextContent(type="text", text=r.content.decode("utf-8")[:100000])] + except Exception: + return [TextContent(type="text", text=f"Binaerdatei ({len(r.content):,} bytes, Typ: {ct}). Nutze read_file nur fuer Text und Bilder.")] @mcp.tool()