Files
mcp-home/files/server.py
T
Stefan Lohmaier a9359beead Add image support to Files and contact photos to Contacts
Files:
- read_file returns images inline (jpg/png/gif/webp up to 10MB)
- Text files returned as text, binary files as metadata only

Contacts:
- get_contact includes contact photo as inline image if available
- New tool: set_contact_photo (base64 jpeg/png → VCard PHOTO)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-12 07:53:55 +02:00

153 lines
6.5 KiB
Python

"""MCP Files Server — browse and read files via WebDAV/oCIS."""
import os, sys, contextlib, base64
from xml.etree import ElementTree as ET
from typing import Annotated
import httpx
from pydantic import Field
from mcp.server.fastmcp import FastMCP
from mcp.types import TextContent, ImageContent
from starlette.applications import Starlette
from starlette.routing import Mount
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware
from common import load_config as _lc
_cfg = _lc()
OCIS = _cfg['ocis_url']
OCIS_CREDS = {u: (d['username'], d['password']) for u, d in _cfg['ocis_users'].items()}
mcp = FastMCP("Files", stateless_http=True,
transport_security={"enable_dns_rebinding_protection": False})
def _auth(u): c = OCIS_CREDS.get(u); return httpx.BasicAuth(c[0], c[1]) if c else None
def _dav(u, p=""): return f"{OCIS}/remote.php/dav/files/{u}/{p.lstrip('/')}"
def _propfind(user, path="", depth=1):
body = '<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/><d:displayname/><d:getcontentlength/><d:getlastmodified/><d:getcontenttype/></d:prop></d:propfind>'
r = httpx.request("PROPFIND", _dav(user, path), content=body, auth=_auth(user), headers={"Depth": str(depth), "Content-Type": "application/xml"}, timeout=30)
return r.text, r.status_code
def _parse_pf(xml, user):
ns = {"d": "DAV:"}
entries = []
try: root = ET.fromstring(xml)
except: return entries
bp = f"/remote.php/dav/files/{user}/"
for resp in root.findall("d:response", ns):
href = resp.findtext("d:href", "", ns) or ""
rel = href.split(bp, 1)[-1].rstrip("/") if bp in href else href.rstrip("/")
props = resp.find(".//d:prop", ns)
if props is None: continue
entries.append({
"name": props.findtext("d:displayname", "", ns) or rel.split("/")[-1],
"path": "/" + rel if rel else "/",
"is_dir": props.find("d:resourcetype/d:collection", ns) is not None,
"size": int(props.findtext("d:getcontentlength", "0", ns) or 0),
"modified": props.findtext("d:getlastmodified", "", ns),
"type": props.findtext("d:getcontenttype", "", ns),
})
return entries
@mcp.tool()
def list_files(
path: Annotated[str, Field(description="Directory path, e.g. '/' for root, '/Documents', '/Photos/2026'")] = "/",
) -> str:
"""List files and subdirectories at the given path. Shows name, size, and modification date."""
user = get_current_user()
if not user: return "Error: not authenticated"
xml, st = _propfind(user, path)
if st >= 400: return f"Fehler: HTTP {st}"
entries = _parse_pf(xml, user)
lines = []
for e in entries[1:]:
if e["is_dir"]: lines.append(f"[DIR] {e['name']}/")
else: lines.append(f" {e['name']} ({e['size']:,} bytes, {e['modified']})")
return "\n".join(lines) if lines else "Leeres Verzeichnis"
IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp", "image/svg+xml"}
TEXT_HINTS = ["text/", "json", "xml", "csv", "yaml", "javascript", "markdown"]
@mcp.tool()
def read_file(
path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/Photos/pic.jpg'")],
) -> list[TextContent | ImageContent]:
"""Read a file. Text files return content directly. Images (jpg/png/gif/webp) are displayed inline. Other binary files return only metadata."""
user = get_current_user()
if not user: return [TextContent(type="text", text="Error: not authenticated")]
r = httpx.get(_dav(user, path), auth=_auth(user), timeout=60)
if r.status_code >= 400: return [TextContent(type="text", text=f"Fehler: HTTP {r.status_code}")]
ct = r.headers.get("content-type", "").split(";")[0].strip()
if ct in IMAGE_TYPES and len(r.content) < 10_000_000:
return [ImageContent(type="image", data=base64.b64encode(r.content).decode(), mimeType=ct)]
if any(t in ct for t in TEXT_HINTS):
return [TextContent(type="text", text=r.text[:100000])]
try:
return [TextContent(type="text", text=r.content.decode("utf-8")[:100000])]
except Exception:
return [TextContent(type="text", text=f"Binaerdatei ({len(r.content):,} bytes, Typ: {ct}). Nutze read_file nur fuer Text und Bilder.")]
@mcp.tool()
def file_info(
path: Annotated[str, Field(description="File or directory path")],
) -> str:
"""Get metadata about a file (size, type, modification date) or directory."""
user = get_current_user()
if not user: return "Error: not authenticated"
xml, st = _propfind(user, path, 0)
if st >= 400: return f"Fehler: HTTP {st}"
entries = _parse_pf(xml, user)
if not entries: return "Nicht gefunden"
e = entries[0]
parts = [f"Name: {e['name']}", f"Pfad: {e['path']}", f"Typ: {'Verzeichnis' if e['is_dir'] else e['type']}"]
if not e["is_dir"]: parts.append(f"Groesse: {e['size']:,} bytes")
parts.append(f"Geaendert: {e['modified']}")
return "\n".join(parts)
@mcp.tool()
def search_files(
query: Annotated[str, Field(description="Search term — matches file names. Example: 'Rechnung', '.pdf', 'backup'")],
path: Annotated[str, Field(description="Start directory for search")] = "/",
) -> str:
"""Search for files by name recursively (up to 5 levels deep, max 50 results)."""
user = get_current_user()
if not user: return "Error: not authenticated"
q = query.lower()
results = []
def _s(p, d=0):
if d > 5 or len(results) >= 50: return
xml, _ = _propfind(user, p)
for e in _parse_pf(xml, user)[1:]:
if q in e["name"].lower(): results.append(e)
if e["is_dir"]: _s(e["path"], d+1)
_s(path)
lines = []
for e in results:
prefix = "[DIR]" if e["is_dir"] else f"({e['size']:,}b)"
lines.append(f"{prefix} {e['path']}")
return "\n".join(lines) if lines else "Keine Dateien gefunden"
def create_app():
from contextlib import asynccontextmanager
mcp_app = mcp.streamable_http_app()
@asynccontextmanager
async def lifespan(app):
async with contextlib.AsyncExitStack() as stack:
await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app))
yield
routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)]
app = Starlette(routes=routes, lifespan=lifespan)
app.add_middleware(BearerAuthMiddleware)
return app
if __name__ == "__main__":
import uvicorn
uvicorn.run(create_app(), host="127.0.0.1", port=5103)