"""MCP Files Server — browse and read files via WebDAV/oCIS.""" import os, sys, contextlib, base64, io from xml.etree import ElementTree as ET from typing import Annotated import httpx from pydantic import Field from mcp.server.fastmcp import FastMCP from mcp.types import TextContent, ImageContent, EmbeddedResource, BlobResourceContents from starlette.applications import Starlette from starlette.routing import Mount sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware from common import load_config as _lc _cfg = _lc() OCIS = _cfg['ocis_url'] OCIS_CREDS = {u: (d['username'], d['password']) for u, d in _cfg['ocis_users'].items()} mcp = FastMCP("Files", stateless_http=True, transport_security={"enable_dns_rebinding_protection": False}) def _ocis_user(u): c = OCIS_CREDS.get(u) return c[0] if c else u def _auth(u): c = OCIS_CREDS.get(u); return httpx.BasicAuth(c[0], c[1]) if c else None def _dav(u, p=""): return f"{OCIS}/remote.php/dav/files/{_ocis_user(u)}/{p.lstrip('/')}" def _propfind(user, path="", depth=1): body = '' r = httpx.request("PROPFIND", _dav(user, path), content=body, auth=_auth(user), headers={"Depth": str(depth), "Content-Type": "application/xml"}, timeout=30) return r.text, r.status_code def _parse_pf(xml, user): ns = {"d": "DAV:"} entries = [] try: root = ET.fromstring(xml) except: return entries bp = f"/remote.php/dav/files/{_ocis_user(user)}/" for resp in root.findall("d:response", ns): href = resp.findtext("d:href", "", ns) or "" rel = href.split(bp, 1)[-1].rstrip("/") if bp in href else href.rstrip("/") props = resp.find(".//d:prop", ns) if props is None: continue entries.append({ "name": props.findtext("d:displayname", "", ns) or rel.split("/")[-1], "path": "/" + rel if rel else "/", "is_dir": props.find("d:resourcetype/d:collection", ns) is not None, "size": int(props.findtext("d:getcontentlength", "0", ns) or 0), "modified": props.findtext("d:getlastmodified", "", ns), "type": props.findtext("d:getcontenttype", "", ns), }) return entries @mcp.tool() def list_files( path: Annotated[str, Field(description="Directory path, e.g. '/' for root, '/Documents', '/Photos/2026'")] = "/", ) -> str: """List files and subdirectories at the given path. Shows name, size, and modification date.""" user = get_current_user() if not user: return "Error: not authenticated" xml, st = _propfind(user, path) if st >= 400: return f"Fehler: HTTP {st}" entries = _parse_pf(xml, user) lines = [] for e in entries[1:]: if e["is_dir"]: lines.append(f"[DIR] {e['name']}/") else: lines.append(f" {e['name']} ({e['size']:,} bytes, {e['modified']})") return "\n".join(lines) if lines else "Leeres Verzeichnis" IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp", "image/svg+xml"} TEXT_HINTS = ["text/", "json", "xml", "csv", "yaml", "javascript", "markdown"] DOC_TYPES = { "application/pdf", "application/msword", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.openxmlformats-officedocument.presentationml.presentation", "application/vnd.ms-excel", "application/vnd.ms-powerpoint", "application/rtf", "application/epub+zip", } MAX_BIN_SIZE = 25_000_000 def _guess_mime(path, ct): if ct and ct != "application/octet-stream": return ct ext = path.rsplit(".", 1)[-1].lower() if "." in path else "" return { "pdf": "application/pdf", "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation", "doc": "application/msword", "xls": "application/vnd.ms-excel", "ppt": "application/vnd.ms-powerpoint", "rtf": "application/rtf", "jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png", "gif": "image/gif", "webp": "image/webp", "svg": "image/svg+xml", }.get(ext, ct or "application/octet-stream") @mcp.tool() def read_file( path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/report.pdf', '/photo.jpg'")], ) -> list[TextContent | ImageContent | EmbeddedResource]: """Read a file. Text files return content directly. Images inline. PDFs as extracted text. Other documents (docx, xlsx, pptx) as binary. Max 25 MB.""" user = get_current_user() if not user: return [TextContent(type="text", text="Error: not authenticated")] r = httpx.get(_dav(user, path), auth=_auth(user), timeout=60) if r.status_code >= 400: return [TextContent(type="text", text=f"Fehler: HTTP {r.status_code}")] ct = _guess_mime(path, r.headers.get("content-type", "").split(";")[0].strip()) size = len(r.content) if size > MAX_BIN_SIZE: return [TextContent(type="text", text=f"Datei zu gross: {size:,} bytes (max {MAX_BIN_SIZE:,}). Typ: {ct}")] if ct in IMAGE_TYPES: return [ImageContent(type="image", data=base64.b64encode(r.content).decode(), mimeType=ct)] if any(t in ct for t in TEXT_HINTS): return [TextContent(type="text", text=r.text[:100000])] if ct == "application/pdf" or path.lower().endswith(".pdf"): try: import pdfplumber with pdfplumber.open(io.BytesIO(r.content)) as pdf: pages = [] for i, page in enumerate(pdf.pages, 1): t = page.extract_text() or "" if t.strip(): pages.append(f"--- Seite {i} ---\n{t}") text = "\n\n".join(pages) if text.strip(): return [TextContent(type="text", text=f"[PDF: {path}]\n\n{text[:200000]}")] return [ TextContent(type="text", text=f"[PDF '{path}' enthaelt keinen extrahierbaren Text (vermutlich Scan). Rohdaten folgen.]"), EmbeddedResource(type="resource", resource=BlobResourceContents(uri=f"file://{path}", blob=base64.b64encode(r.content).decode(), mimeType=ct)), ] except Exception as e: return [TextContent(type="text", text=f"PDF '{path}' konnte nicht gelesen werden: {e}")] try: text = r.content.decode("utf-8") return [TextContent(type="text", text=text[:100000])] except Exception: pass b64 = base64.b64encode(r.content).decode() return [EmbeddedResource(type="resource", resource=BlobResourceContents(uri=f"file://{path}", blob=b64, mimeType=ct))] @mcp.tool() def file_info( path: Annotated[str, Field(description="File or directory path")], ) -> str: """Get metadata about a file (size, type, modification date) or directory.""" user = get_current_user() if not user: return "Error: not authenticated" xml, st = _propfind(user, path, 0) if st >= 400: return f"Fehler: HTTP {st}" entries = _parse_pf(xml, user) if not entries: return "Nicht gefunden" e = entries[0] parts = [f"Name: {e['name']}", f"Pfad: {e['path']}", f"Typ: {'Verzeichnis' if e['is_dir'] else e['type']}"] if not e["is_dir"]: parts.append(f"Groesse: {e['size']:,} bytes") parts.append(f"Geaendert: {e['modified']}") return "\n".join(parts) @mcp.tool() def search_files( query: Annotated[str, Field(description="Search term — matches file names. Example: 'Rechnung', '.pdf', 'backup'")], path: Annotated[str, Field(description="Start directory for search")] = "/", ) -> str: """Search for files by name recursively (up to 5 levels deep, max 50 results).""" user = get_current_user() if not user: return "Error: not authenticated" q = query.lower() results = [] def _s(p, d=0): if d > 5 or len(results) >= 50: return xml, _ = _propfind(user, p) for e in _parse_pf(xml, user)[1:]: if q in e["name"].lower(): results.append(e) if e["is_dir"]: _s(e["path"], d+1) _s(path) lines = [] for e in results: prefix = "[DIR]" if e["is_dir"] else f"({e['size']:,}b)" lines.append(f"{prefix} {e['path']}") return "\n".join(lines) if lines else "Keine Dateien gefunden" @mcp.tool() def write_file( path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/todo.md'. Creates parent dirs automatically.")], content: Annotated[str, Field(description="File content as text (for text files) or base64-encoded data (for binary, set is_base64=true)")], is_base64: Annotated[bool, Field(description="True if content is base64-encoded binary data")] = False, ) -> str: """Write or overwrite a file. Use text content for txt/md/json etc. Use base64 for binary files (images, documents).""" user = get_current_user() if not user: return "Error: not authenticated" if is_base64: data = base64.b64decode(content) else: data = content.encode("utf-8") ct = _guess_mime(path, None) r = httpx.put(_dav(user, path), content=data, auth=_auth(user), headers={"Content-Type": ct}, timeout=60) if r.status_code in (200, 201, 204): return f"Datei geschrieben: {path} ({len(data):,} bytes)" if r.status_code == 409: return f"Fehler: Uebergeordnetes Verzeichnis existiert nicht. Erstelle es zuerst mit create_folder." return f"Fehler: HTTP {r.status_code}" @mcp.tool() def create_folder( path: Annotated[str, Field(description="Folder path to create, e.g. '/Documents/Projekte', '/Fotos/2026'")], ) -> str: """Create a new folder/directory.""" user = get_current_user() if not user: return "Error: not authenticated" r = httpx.request("MKCOL", _dav(user, path), auth=_auth(user), timeout=15) if r.status_code in (200, 201): return f"Ordner erstellt: {path}" if r.status_code == 405: return f"Ordner existiert bereits: {path}" if r.status_code == 409: return f"Fehler: Uebergeordnetes Verzeichnis existiert nicht." return f"Fehler: HTTP {r.status_code}" @mcp.tool() def delete_file( path: Annotated[str, Field(description="Path to file or folder to delete, e.g. '/Documents/old.txt', '/temp/'")], ) -> str: """Delete a file or folder (including all contents). This cannot be undone.""" user = get_current_user() if not user: return "Error: not authenticated" r = httpx.request("DELETE", _dav(user, path), auth=_auth(user), timeout=30) if r.status_code in (200, 204): return f"Geloescht: {path}" if r.status_code == 404: return f"Nicht gefunden: {path}" return f"Fehler: HTTP {r.status_code}" @mcp.tool() def move_file( source: Annotated[str, Field(description="Current path, e.g. '/Documents/old-name.txt'")], destination: Annotated[str, Field(description="New path, e.g. '/Documents/new-name.txt' or '/Archive/file.txt'")], ) -> str: """Move or rename a file or folder.""" user = get_current_user() if not user: return "Error: not authenticated" dest_url = _dav(user, destination) r = httpx.request("MOVE", _dav(user, source), auth=_auth(user), headers={"Destination": dest_url}, timeout=30) if r.status_code in (200, 201, 204): return f"Verschoben: {source} -> {destination}" if r.status_code == 404: return f"Quelle nicht gefunden: {source}" if r.status_code == 409: return f"Fehler: Zielverzeichnis existiert nicht." return f"Fehler: HTTP {r.status_code}" def create_app(): from contextlib import asynccontextmanager mcp_app = mcp.streamable_http_app() @asynccontextmanager async def lifespan(app): async with contextlib.AsyncExitStack() as stack: await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app)) yield routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)] app = Starlette(routes=routes, lifespan=lifespan) app.add_middleware(BearerAuthMiddleware) return app if __name__ == "__main__": import uvicorn uvicorn.run(create_app(), host="127.0.0.1", port=5103)