Files
mcp-home/files/server.py
T
Stefan Lohmaier ef37d1e467 Fix Mail Maildir paths and oCIS auth
Mail: INBOX is a subfolder (INBOX/cur), not the account root.
Removed special-case mapping that pointed INBOX to root dir.

Files: oCIS user may differ from MCP user (e.g. stefan -> admin).
Added _ocis_user() mapping for WebDAV paths and auth.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-12 10:22:27 +02:00

260 lines
11 KiB
Python

"""MCP Files Server — browse and read files via WebDAV/oCIS."""
import os, sys, contextlib, base64
from xml.etree import ElementTree as ET
from typing import Annotated
import httpx
from pydantic import Field
from mcp.server.fastmcp import FastMCP
from mcp.types import TextContent, ImageContent, EmbeddedResource, BlobResourceContents
from starlette.applications import Starlette
from starlette.routing import Mount
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware
from common import load_config as _lc
# Configuration is loaded once, at import time, through the shared loader.
_cfg = _lc()

# Base URL of the oCIS instance; used as the prefix of every WebDAV URL.
OCIS = _cfg["ocis_url"]

# MCP user name -> (oCIS username, oCIS password), taken from the
# "ocis_users" section of the configuration.
OCIS_CREDS = {
    mcp_user: (account["username"], account["password"])
    for mcp_user, account in _cfg["ocis_users"].items()
}

# The MCP server instance on which all tools below are registered.
# NOTE(review): DNS-rebinding protection is switched off here — presumably
# because the server runs behind a local reverse proxy; confirm.
mcp = FastMCP(
    "Files",
    stateless_http=True,
    transport_security={"enable_dns_rebinding_protection": False},
)
def _ocis_user(u):
    """Return the oCIS account name that MCP user *u* maps to.

    Looks *u* up in ``OCIS_CREDS``. When an entry exists, its first element
    (the configured oCIS username) is returned; otherwise *u* is returned
    unchanged.
    """
    creds = OCIS_CREDS.get(u)
    if creds:
        return creds[0]
    return u
def _auth(u):
    """Build HTTP Basic credentials for MCP user *u*.

    Returns an ``httpx.BasicAuth`` made from the user's ``OCIS_CREDS`` entry
    (username, password), or ``None`` when there is no entry for *u*.
    """
    creds = OCIS_CREDS.get(u)
    if not creds:
        return None
    return httpx.BasicAuth(creds[0], creds[1])
def _dav(u, p=""):
    """Return the WebDAV URL of path *p* inside the files area of MCP user *u*.

    The URL has the form ``{OCIS}/remote.php/dav/files/{oCIS user}/{path}``;
    leading slashes of *p* are dropped so that ``""`` and ``"/"`` both address
    the user's root.

    The path is percent-encoded (``/`` is kept as the segment separator) so
    that characters such as ``?``, ``#``, spaces or non-ASCII letters in a
    file name stay part of the path instead of being read as a query or
    fragment, and so that the result is usable as a ``Destination`` header
    value. Existing ``%XX`` escapes are left untouched because hrefs taken
    from PROPFIND responses are fed back into this function already encoded;
    a ``%`` that does not start a valid escape is encoded as ``%25``.
    The user segment is inserted as-is.
    """
    import re
    from urllib.parse import quote

    relative = p.lstrip("/")
    # Keep "/" and "%" literal in the first pass, then repair stray "%".
    encoded = quote(relative, safe="/%")
    encoded = re.sub(r"%(?![0-9A-Fa-f]{2})", "%25", encoded)
    return f"{OCIS}/remote.php/dav/files/{_ocis_user(u)}/{encoded}"
def _propfind(user, path="", depth=1):
    """Send a WebDAV PROPFIND for *path* on behalf of *user*.

    Requests the resource type, display name, content length, last-modified
    date and content type. *depth* is sent as the ``Depth`` header.

    Returns a ``(response_text, status_code)`` tuple; the status is not
    checked here.
    """
    request_body = (
        '<?xml version="1.0"?>'
        '<d:propfind xmlns:d="DAV:">'
        '<d:prop>'
        '<d:resourcetype/><d:displayname/><d:getcontentlength/>'
        '<d:getlastmodified/><d:getcontenttype/>'
        '</d:prop>'
        '</d:propfind>'
    )
    response = httpx.request(
        "PROPFIND",
        _dav(user, path),
        content=request_body,
        auth=_auth(user),
        headers={"Depth": str(depth), "Content-Type": "application/xml"},
        timeout=30,
    )
    return response.text, response.status_code
def _parse_pf(xml, user):
    """Parse a PROPFIND multistatus body into a list of entry dicts.

    Each ``d:response`` element that carries a ``d:prop`` becomes a dict with
    the keys ``name``, ``path``, ``is_dir``, ``size``, ``modified`` and
    ``type``. Entries are returned in document order.

    * ``path`` is the href relative to the user's files root, prefixed with
      ``/``. It is kept exactly as the server sent it (still percent-encoded)
      because callers pass it back into further requests.
    * ``name`` is the display name, or, when that is missing, the
      percent-decoded last segment of the href.
    * ``size`` is 0 when the content length is missing or not a number.

    Returns an empty list when *xml* is not well-formed XML (or not a string).
    """
    from urllib.parse import unquote

    ns = {"d": "DAV:"}
    try:
        root = ET.fromstring(xml)
    except (ET.ParseError, TypeError):
        # Only parse failures are swallowed; a bare ``except`` would also hide
        # KeyboardInterrupt/SystemExit.
        return []

    base = f"/remote.php/dav/files/{_ocis_user(user)}/"
    entries = []
    for resp in root.findall("d:response", ns):
        href = resp.findtext("d:href", "", ns) or ""
        if base in href:
            rel = href.split(base, 1)[-1].rstrip("/")
        else:
            rel = href.rstrip("/")
        props = resp.find(".//d:prop", ns)
        if props is None:
            continue
        try:
            size = int(props.findtext("d:getcontentlength", "0", ns) or 0)
        except ValueError:
            # One malformed length must not abort the whole listing.
            size = 0
        entries.append({
            "name": props.findtext("d:displayname", "", ns) or unquote(rel.split("/")[-1]),
            "path": "/" + rel if rel else "/",
            "is_dir": props.find("d:resourcetype/d:collection", ns) is not None,
            "size": size,
            "modified": props.findtext("d:getlastmodified", "", ns),
            "type": props.findtext("d:getcontenttype", "", ns),
        })
    return entries
@mcp.tool()
def list_files(
    path: Annotated[str, Field(description="Directory path, e.g. '/' for root, '/Documents', '/Photos/2026'")] = "/",
) -> str:
    """List files and subdirectories at the given path. Shows name, size, and modification date."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    body, status = _propfind(user, path)
    if status >= 400:
        return f"Fehler: HTTP {status}"

    # The first parsed entry is skipped: it is treated as the requested
    # directory itself, the rest as its children.
    children = _parse_pf(body, user)[1:]
    rendered = [
        f"[DIR] {item['name']}/"
        if item["is_dir"]
        else f" {item['name']} ({item['size']:,} bytes, {item['modified']})"
        for item in children
    ]
    if not rendered:
        return "Leeres Verzeichnis"
    return "\n".join(rendered)
# MIME types that read_file returns as inline image content.
IMAGE_TYPES = {
    "image/jpeg",
    "image/png",
    "image/gif",
    "image/webp",
    "image/svg+xml",
}

# Substrings that mark a MIME type as text when they occur anywhere in it.
TEXT_HINTS = ["text/", "json", "xml", "csv", "yaml", "javascript", "markdown"]

# MIME types of office/document formats (PDF, Word, Excel, PowerPoint,
# RTF, EPUB).
DOC_TYPES = {
    "application/pdf",
    "application/msword",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "application/vnd.ms-excel",
    "application/vnd.ms-powerpoint",
    "application/rtf",
    "application/epub+zip",
}

# Largest downloaded body, in bytes, that read_file will hand back.
MAX_BIN_SIZE = 25 * 1000 * 1000
def _guess_mime(path, ct):
if ct and ct != "application/octet-stream":
return ct
ext = path.rsplit(".", 1)[-1].lower() if "." in path else ""
return {
"pdf": "application/pdf", "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
"doc": "application/msword", "xls": "application/vnd.ms-excel", "ppt": "application/vnd.ms-powerpoint",
"rtf": "application/rtf", "jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png",
"gif": "image/gif", "webp": "image/webp", "svg": "image/svg+xml",
}.get(ext, ct or "application/octet-stream")
@mcp.tool()
def read_file(
    path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/report.pdf', '/photo.jpg'")],
) -> list[TextContent | ImageContent | EmbeddedResource]:
    """Read a file. Text files return content directly. Images are displayed inline. Documents (PDF, docx, xlsx, pptx) are passed as binary for the client to process. Max 25 MB."""
    # Result shapes, in order of precedence:
    #   * error / too large        -> one TextContent with a message
    #   * MIME type in IMAGE_TYPES -> one ImageContent (base64)
    #   * text-like MIME type      -> one TextContent (first 100000 chars)
    #   * body decodes as UTF-8    -> one TextContent (first 100000 chars)
    #   * anything else            -> one EmbeddedResource with a base64 blob
    user = get_current_user()
    if not user:
        return [TextContent(type="text", text="Error: not authenticated")]

    r = httpx.get(_dav(user, path), auth=_auth(user), timeout=60)
    if r.status_code >= 400:
        return [TextContent(type="text", text=f"Fehler: HTTP {r.status_code}")]

    # Drop any "; charset=..." parameter before classifying the type.
    ct = _guess_mime(path, r.headers.get("content-type", "").split(";")[0].strip())
    size = len(r.content)
    if size > MAX_BIN_SIZE:
        return [TextContent(type="text", text=f"Datei zu gross: {size:,} bytes (max {MAX_BIN_SIZE:,}). Typ: {ct}")]

    if ct in IMAGE_TYPES:
        return [ImageContent(type="image", data=base64.b64encode(r.content).decode(), mimeType=ct)]

    # Known document types are excluded from the substring test: the Office
    # Open XML types contain "xml" ("...openxmlformats...") and would
    # otherwise be returned as garbled text instead of a binary resource.
    if ct not in DOC_TYPES and any(hint in ct for hint in TEXT_HINTS):
        return [TextContent(type="text", text=r.text[:100000])]

    # Last attempt at text: a body that is valid UTF-8 is returned as text
    # whatever its declared type.
    try:
        text = r.content.decode("utf-8")
    except UnicodeDecodeError:
        pass
    else:
        return [TextContent(type="text", text=text[:100000])]

    b64 = base64.b64encode(r.content).decode()
    return [EmbeddedResource(type="resource", resource=BlobResourceContents(uri=f"file://{path}", blob=b64, mimeType=ct))]
@mcp.tool()
def file_info(
    path: Annotated[str, Field(description="File or directory path")],
) -> str:
    """Get metadata about a file (size, type, modification date) or directory."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    # Depth 0: only the addressed resource itself is requested.
    body, status = _propfind(user, path, 0)
    if status >= 400:
        return f"Fehler: HTTP {status}"

    found = _parse_pf(body, user)
    if not found:
        return "Nicht gefunden"

    entry = found[0]
    kind = "Verzeichnis" if entry["is_dir"] else entry["type"]
    report = [
        f"Name: {entry['name']}",
        f"Pfad: {entry['path']}",
        f"Typ: {kind}",
    ]
    # A size line is only emitted for files.
    if not entry["is_dir"]:
        report.append(f"Groesse: {entry['size']:,} bytes")
    report.append(f"Geaendert: {entry['modified']}")
    return "\n".join(report)
@mcp.tool()
def search_files(
    query: Annotated[str, Field(description="Search term — matches file names. Example: 'Rechnung', '.pdf', 'backup'")],
    path: Annotated[str, Field(description="Start directory for search")] = "/",
) -> str:
    """Search for files by name recursively (up to 5 levels deep, max 50 results)."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    needle = query.lower()
    max_results = 50
    max_depth = 5
    results = []

    def _walk(directory, depth=0):
        """Collect matching entries below *directory*, depth-first."""
        if depth > max_depth or len(results) >= max_results:
            return
        # The HTTP status is ignored: an unreadable directory simply parses
        # to no entries.
        xml, _ = _propfind(user, directory)
        # The first entry is skipped; it is treated as the directory itself.
        for entry in _parse_pf(xml, user)[1:]:
            # Enforce the cap per entry, not only per directory, so one large
            # directory cannot push the result list past the limit.
            if len(results) >= max_results:
                return
            if needle in entry["name"].lower():
                results.append(entry)
            if entry["is_dir"]:
                _walk(entry["path"], depth + 1)

    _walk(path)

    lines = []
    for entry in results:
        prefix = "[DIR]" if entry["is_dir"] else f"({entry['size']:,}b)"
        lines.append(f"{prefix} {entry['path']}")
    return "\n".join(lines) if lines else "Keine Dateien gefunden"
@mcp.tool()
def write_file(
    path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/todo.md'. Parent folders must already exist (create them with create_folder).")],
    content: Annotated[str, Field(description="File content as text (for text files) or base64-encoded data (for binary, set is_base64=true)")],
    is_base64: Annotated[bool, Field(description="True if content is base64-encoded binary data")] = False,
) -> str:
    """Write or overwrite a file. Use text content for txt/md/json etc. Use base64 for binary files (images, documents)."""
    # The path description above no longer promises automatic creation of
    # parent folders: this function issues a single PUT and reports a 409 as
    # "parent directory missing".
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    if is_base64:
        try:
            data = base64.b64decode(content)
        except ValueError:
            # binascii.Error (a ValueError subclass) signals malformed input;
            # report it instead of letting the tool call fail.
            return "Fehler: Ungueltige Base64-Daten."
    else:
        data = content.encode("utf-8")

    # No server-reported type exists on upload, so the type is derived from
    # the file extension alone.
    ct = _guess_mime(path, None)
    r = httpx.put(_dav(user, path), content=data, auth=_auth(user),
                  headers={"Content-Type": ct}, timeout=60)
    if r.status_code in (200, 201, 204):
        return f"Datei geschrieben: {path} ({len(data):,} bytes)"
    if r.status_code == 409:
        return "Fehler: Uebergeordnetes Verzeichnis existiert nicht. Erstelle es zuerst mit create_folder."
    return f"Fehler: HTTP {r.status_code}"
@mcp.tool()
def create_folder(
    path: Annotated[str, Field(description="Folder path to create, e.g. '/Documents/Projekte', '/Fotos/2026'")],
) -> str:
    """Create a new folder/directory."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    response = httpx.request("MKCOL", _dav(user, path), auth=_auth(user), timeout=15)
    status = response.status_code

    # Status code -> user-facing message; anything else is reported as a
    # generic HTTP error.
    created = f"Ordner erstellt: {path}"
    messages = {
        200: created,
        201: created,
        405: f"Ordner existiert bereits: {path}",
        409: "Fehler: Uebergeordnetes Verzeichnis existiert nicht.",
    }
    return messages.get(status, f"Fehler: HTTP {status}")
@mcp.tool()
def delete_file(
    path: Annotated[str, Field(description="Path to file or folder to delete, e.g. '/Documents/old.txt', '/temp/'")],
) -> str:
    """Delete a file or folder (including all contents). This cannot be undone."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    response = httpx.request("DELETE", _dav(user, path), auth=_auth(user), timeout=30)
    status = response.status_code

    # 200 and 204 count as success, 404 gets its own message, everything
    # else is reported as a generic HTTP error.
    deleted = f"Geloescht: {path}"
    messages = {
        200: deleted,
        204: deleted,
        404: f"Nicht gefunden: {path}",
    }
    return messages.get(status, f"Fehler: HTTP {status}")
@mcp.tool()
def move_file(
    source: Annotated[str, Field(description="Current path, e.g. '/Documents/old-name.txt'")],
    destination: Annotated[str, Field(description="New path, e.g. '/Documents/new-name.txt' or '/Archive/file.txt'")],
) -> str:
    """Move or rename a file or folder."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    # WebDAV MOVE addresses the source by request URL and the target by the
    # absolute URL in the Destination header.
    target_url = _dav(user, destination)
    response = httpx.request(
        "MOVE",
        _dav(user, source),
        auth=_auth(user),
        headers={"Destination": target_url},
        timeout=30,
    )
    status = response.status_code

    moved = f"Verschoben: {source} -> {destination}"
    messages = {
        200: moved,
        201: moved,
        204: moved,
        404: f"Quelle nicht gefunden: {source}",
        409: "Fehler: Zielverzeichnis existiert nicht.",
    }
    return messages.get(status, f"Fehler: HTTP {status}")
def create_app():
    """Assemble and return the Starlette ASGI application for this server.

    The app serves the routes in ``OAUTH_ROUTES`` followed by the MCP
    streamable-HTTP app mounted at ``/``, and is wrapped in
    ``BearerAuthMiddleware``. Its lifespan enters the mounted MCP app's own
    lifespan context, which a mounted sub-application would otherwise not
    get.
    """
    mcp_app = mcp.streamable_http_app()

    @contextlib.asynccontextmanager
    async def lifespan(app):
        # Keep the MCP app's lifespan context open for as long as the outer
        # application runs.
        async with contextlib.AsyncExitStack() as stack:
            await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app))
            yield

    application = Starlette(
        routes=[*OAUTH_ROUTES, Mount("/", app=mcp_app)],
        lifespan=lifespan,
    )
    application.add_middleware(BearerAuthMiddleware)
    return application
# Script entry point: serve the app with uvicorn on the loopback interface.
if __name__ == "__main__":
    from uvicorn import run as _serve

    _serve(create_app(), host="127.0.0.1", port=5103)