"""MCP Files Server — browse and read files via WebDAV/oCIS.""" import os, sys, contextlib, base64 from xml.etree import ElementTree as ET from typing import Annotated import httpx from pydantic import Field from mcp.server.fastmcp import FastMCP from mcp.types import TextContent, ImageContent, EmbeddedResource, BlobResourceContents from starlette.applications import Starlette from starlette.routing import Mount sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware from common import load_config as _lc _cfg = _lc() OCIS = _cfg['ocis_url'] OCIS_CREDS = {u: (d['username'], d['password']) for u, d in _cfg['ocis_users'].items()} mcp = FastMCP("Files", stateless_http=True, transport_security={"enable_dns_rebinding_protection": False}) def _auth(u): c = OCIS_CREDS.get(u); return httpx.BasicAuth(c[0], c[1]) if c else None def _dav(u, p=""): return f"{OCIS}/remote.php/dav/files/{u}/{p.lstrip('/')}" def _propfind(user, path="", depth=1): body = '' r = httpx.request("PROPFIND", _dav(user, path), content=body, auth=_auth(user), headers={"Depth": str(depth), "Content-Type": "application/xml"}, timeout=30) return r.text, r.status_code def _parse_pf(xml, user): ns = {"d": "DAV:"} entries = [] try: root = ET.fromstring(xml) except: return entries bp = f"/remote.php/dav/files/{user}/" for resp in root.findall("d:response", ns): href = resp.findtext("d:href", "", ns) or "" rel = href.split(bp, 1)[-1].rstrip("/") if bp in href else href.rstrip("/") props = resp.find(".//d:prop", ns) if props is None: continue entries.append({ "name": props.findtext("d:displayname", "", ns) or rel.split("/")[-1], "path": "/" + rel if rel else "/", "is_dir": props.find("d:resourcetype/d:collection", ns) is not None, "size": int(props.findtext("d:getcontentlength", "0", ns) or 0), "modified": props.findtext("d:getlastmodified", "", ns), "type": props.findtext("d:getcontenttype", "", ns), }) return entries @mcp.tool() def list_files( path: Annotated[str, Field(description="Directory path, e.g. '/' for root, '/Documents', '/Photos/2026'")] = "/", ) -> str: """List files and subdirectories at the given path. Shows name, size, and modification date.""" user = get_current_user() if not user: return "Error: not authenticated" xml, st = _propfind(user, path) if st >= 400: return f"Fehler: HTTP {st}" entries = _parse_pf(xml, user) lines = [] for e in entries[1:]: if e["is_dir"]: lines.append(f"[DIR] {e['name']}/") else: lines.append(f" {e['name']} ({e['size']:,} bytes, {e['modified']})") return "\n".join(lines) if lines else "Leeres Verzeichnis" IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp", "image/svg+xml"} TEXT_HINTS = ["text/", "json", "xml", "csv", "yaml", "javascript", "markdown"] DOC_TYPES = { "application/pdf", "application/msword", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.openxmlformats-officedocument.presentationml.presentation", "application/vnd.ms-excel", "application/vnd.ms-powerpoint", "application/rtf", "application/epub+zip", } MAX_BIN_SIZE = 25_000_000 def _guess_mime(path, ct): if ct and ct != "application/octet-stream": return ct ext = path.rsplit(".", 1)[-1].lower() if "." in path else "" return { "pdf": "application/pdf", "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation", "doc": "application/msword", "xls": "application/vnd.ms-excel", "ppt": "application/vnd.ms-powerpoint", "rtf": "application/rtf", "jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png", "gif": "image/gif", "webp": "image/webp", "svg": "image/svg+xml", }.get(ext, ct or "application/octet-stream") @mcp.tool() def read_file( path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/report.pdf', '/photo.jpg'")], ) -> list[TextContent | ImageContent | EmbeddedResource]: """Read a file. Text files return content directly. Images are displayed inline. Documents (PDF, docx, xlsx, pptx) are passed as binary for the client to process. Max 25 MB.""" user = get_current_user() if not user: return [TextContent(type="text", text="Error: not authenticated")] r = httpx.get(_dav(user, path), auth=_auth(user), timeout=60) if r.status_code >= 400: return [TextContent(type="text", text=f"Fehler: HTTP {r.status_code}")] ct = _guess_mime(path, r.headers.get("content-type", "").split(";")[0].strip()) size = len(r.content) if size > MAX_BIN_SIZE: return [TextContent(type="text", text=f"Datei zu gross: {size:,} bytes (max {MAX_BIN_SIZE:,}). Typ: {ct}")] if ct in IMAGE_TYPES: return [ImageContent(type="image", data=base64.b64encode(r.content).decode(), mimeType=ct)] if any(t in ct for t in TEXT_HINTS): return [TextContent(type="text", text=r.text[:100000])] try: text = r.content.decode("utf-8") return [TextContent(type="text", text=text[:100000])] except Exception: pass b64 = base64.b64encode(r.content).decode() return [EmbeddedResource(type="resource", resource=BlobResourceContents(uri=f"file://{path}", blob=b64, mimeType=ct))] @mcp.tool() def file_info( path: Annotated[str, Field(description="File or directory path")], ) -> str: """Get metadata about a file (size, type, modification date) or directory.""" user = get_current_user() if not user: return "Error: not authenticated" xml, st = _propfind(user, path, 0) if st >= 400: return f"Fehler: HTTP {st}" entries = _parse_pf(xml, user) if not entries: return "Nicht gefunden" e = entries[0] parts = [f"Name: {e['name']}", f"Pfad: {e['path']}", f"Typ: {'Verzeichnis' if e['is_dir'] else e['type']}"] if not e["is_dir"]: parts.append(f"Groesse: {e['size']:,} bytes") parts.append(f"Geaendert: {e['modified']}") return "\n".join(parts) @mcp.tool() def search_files( query: Annotated[str, Field(description="Search term — matches file names. Example: 'Rechnung', '.pdf', 'backup'")], path: Annotated[str, Field(description="Start directory for search")] = "/", ) -> str: """Search for files by name recursively (up to 5 levels deep, max 50 results).""" user = get_current_user() if not user: return "Error: not authenticated" q = query.lower() results = [] def _s(p, d=0): if d > 5 or len(results) >= 50: return xml, _ = _propfind(user, p) for e in _parse_pf(xml, user)[1:]: if q in e["name"].lower(): results.append(e) if e["is_dir"]: _s(e["path"], d+1) _s(path) lines = [] for e in results: prefix = "[DIR]" if e["is_dir"] else f"({e['size']:,}b)" lines.append(f"{prefix} {e['path']}") return "\n".join(lines) if lines else "Keine Dateien gefunden" def create_app(): from contextlib import asynccontextmanager mcp_app = mcp.streamable_http_app() @asynccontextmanager async def lifespan(app): async with contextlib.AsyncExitStack() as stack: await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app)) yield routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)] app = Starlette(routes=routes, lifespan=lifespan) app.add_middleware(BearerAuthMiddleware) return app if __name__ == "__main__": import uvicorn uvicorn.run(create_app(), host="127.0.0.1", port=5103)