5190e8c849
Full CRUD for oCIS files: - write_file: text or base64 binary content - create_folder: MKCOL - delete_file: DELETE (files and folders) - move_file: MOVE (rename or relocate) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
256 lines
11 KiB
Python
256 lines
11 KiB
Python
"""MCP Files Server — browse, read, write, and manage files via WebDAV/oCIS."""
|
|
|
|
import os, sys, contextlib, base64
|
|
from xml.etree import ElementTree as ET
|
|
from typing import Annotated
|
|
|
|
import httpx
|
|
from pydantic import Field
|
|
from mcp.server.fastmcp import FastMCP
|
|
from mcp.types import TextContent, ImageContent, EmbeddedResource, BlobResourceContents
|
|
from starlette.applications import Starlette
|
|
from starlette.routing import Mount
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
|
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware
|
|
|
|
from common import load_config as _lc
|
|
# Configuration is loaded once, at import time.
_cfg = _lc()

# URL prefix that _dav() puts in front of every WebDAV path.
OCIS = _cfg['ocis_url']

# Maps each key of the 'ocis_users' config section to its
# (username, password) pair.
OCIS_CREDS = {
    user_key: (account['username'], account['password'])
    for user_key, account in _cfg['ocis_users'].items()
}

# FastMCP server instance the tool functions below register on.
mcp = FastMCP(
    "Files",
    stateless_http=True,
    transport_security={"enable_dns_rebinding_protection": False},
)
|
|
|
|
def _auth(u):
    """Return an ``httpx.BasicAuth`` for user *u*, or None if *u* has no entry in OCIS_CREDS."""
    creds = OCIS_CREDS.get(u)
    if not creds:
        return None
    return httpx.BasicAuth(creds[0], creds[1])
|
|
def _dav(u, p=""):
    """Build the WebDAV URL for path *p* under user *u*'s files root.

    Leading slashes are stripped from *p*; the path is inserted as given,
    without any percent-encoding.
    """
    relative = p.lstrip('/')
    return "{}/remote.php/dav/files/{}/{}".format(OCIS, u, relative)
|
|
|
|
def _propfind(user, path="", depth=1):
    """Send a WebDAV PROPFIND for *path* and return ``(body_text, status_code)``.

    The request asks for resourcetype, displayname, getcontentlength,
    getlastmodified and getcontenttype. *depth* is sent as the ``Depth``
    header. The status code is returned as-is; no error is raised for
    4xx/5xx responses.
    """
    request_body = (
        '<?xml version="1.0"?>'
        '<d:propfind xmlns:d="DAV:"><d:prop>'
        '<d:resourcetype/><d:displayname/><d:getcontentlength/>'
        '<d:getlastmodified/><d:getcontenttype/>'
        '</d:prop></d:propfind>'
    )
    request_headers = {"Depth": str(depth), "Content-Type": "application/xml"}
    response = httpx.request(
        "PROPFIND",
        _dav(user, path),
        content=request_body,
        auth=_auth(user),
        headers=request_headers,
        timeout=30,
    )
    return response.text, response.status_code
|
|
|
|
def _parse_pf(xml, user):
|
|
ns = {"d": "DAV:"}
|
|
entries = []
|
|
try: root = ET.fromstring(xml)
|
|
except: return entries
|
|
bp = f"/remote.php/dav/files/{user}/"
|
|
for resp in root.findall("d:response", ns):
|
|
href = resp.findtext("d:href", "", ns) or ""
|
|
rel = href.split(bp, 1)[-1].rstrip("/") if bp in href else href.rstrip("/")
|
|
props = resp.find(".//d:prop", ns)
|
|
if props is None: continue
|
|
entries.append({
|
|
"name": props.findtext("d:displayname", "", ns) or rel.split("/")[-1],
|
|
"path": "/" + rel if rel else "/",
|
|
"is_dir": props.find("d:resourcetype/d:collection", ns) is not None,
|
|
"size": int(props.findtext("d:getcontentlength", "0", ns) or 0),
|
|
"modified": props.findtext("d:getlastmodified", "", ns),
|
|
"type": props.findtext("d:getcontenttype", "", ns),
|
|
})
|
|
return entries
|
|
|
|
|
|
@mcp.tool()
def list_files(
    path: Annotated[str, Field(description="Directory path, e.g. '/' for root, '/Documents', '/Photos/2026'")] = "/",
) -> str:
    """List files and subdirectories at the given path. Shows name, size, and modification date."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    body, status = _propfind(user, path)
    if status >= 400:
        return f"Fehler: HTTP {status}"

    def _render(item):
        # Directories get a [DIR] tag; files show size and modification date.
        if item["is_dir"]:
            return f"[DIR] {item['name']}/"
        return f" {item['name']} ({item['size']:,} bytes, {item['modified']})"

    # The first parsed entry is skipped; presumably it is the requested
    # directory itself (Depth: 1 PROPFIND) — confirm against the server.
    listing = [_render(item) for item in _parse_pf(body, user)[1:]]
    if not listing:
        return "Leeres Verzeichnis"
    return "\n".join(listing)
|
|
|
|
|
|
# MIME types that read_file returns inline as ImageContent.
IMAGE_TYPES = {
    "image/jpeg",
    "image/png",
    "image/gif",
    "image/webp",
    "image/svg+xml",
}

# Substrings that mark a MIME type as text; read_file matches them with `in`.
TEXT_HINTS = [
    "text/",
    "json",
    "xml",
    "csv",
    "yaml",
    "javascript",
    "markdown",
]

# Office/document MIME types. Not referenced by any code visible in this file.
DOC_TYPES = {
    "application/pdf",
    "application/msword",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "application/vnd.ms-excel",
    "application/vnd.ms-powerpoint",
    "application/rtf",
    "application/epub+zip",
}

# Largest payload, in bytes, that read_file will return.
MAX_BIN_SIZE = 25_000_000
|
|
|
|
def _guess_mime(path, ct):
|
|
if ct and ct != "application/octet-stream":
|
|
return ct
|
|
ext = path.rsplit(".", 1)[-1].lower() if "." in path else ""
|
|
return {
|
|
"pdf": "application/pdf", "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
|
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
"pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
|
|
"doc": "application/msword", "xls": "application/vnd.ms-excel", "ppt": "application/vnd.ms-powerpoint",
|
|
"rtf": "application/rtf", "jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png",
|
|
"gif": "image/gif", "webp": "image/webp", "svg": "image/svg+xml",
|
|
}.get(ext, ct or "application/octet-stream")
|
|
|
|
@mcp.tool()
def read_file(
    path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/report.pdf', '/photo.jpg'")],
) -> list[TextContent | ImageContent | EmbeddedResource]:
    """Read a file. Text files return content directly. Images are displayed inline. Documents (PDF, docx, xlsx, pptx) are passed as binary for the client to process. Max 25 MB."""
    # Returns a one-element list: TextContent for errors and text (first
    # 100,000 characters), ImageContent for IMAGE_TYPES, and an
    # EmbeddedResource with a base64 blob for everything else.
    user = get_current_user()
    if not user:
        return [TextContent(type="text", text="Error: not authenticated")]

    def _too_large(size, ct):
        return [TextContent(type="text", text=f"Datei zu gross: {size:,} bytes (max {MAX_BIN_SIZE:,}). Typ: {ct}")]

    # Stream the response so an oversized file can be rejected from its
    # Content-Length header before the body is pulled into memory.
    with httpx.stream("GET", _dav(user, path), auth=_auth(user), timeout=60) as r:
        if r.status_code >= 400:
            return [TextContent(type="text", text=f"Fehler: HTTP {r.status_code}")]
        ct = _guess_mime(path, r.headers.get("content-type", "").split(";")[0].strip())
        try:
            declared = int(r.headers.get("content-length", ""))
        except ValueError:
            declared = None  # header missing or malformed: check after download
        if declared is not None and declared > MAX_BIN_SIZE:
            return _too_large(declared, ct)
        r.read()

    # Second check covers responses without a usable Content-Length.
    size = len(r.content)
    if size > MAX_BIN_SIZE:
        return _too_large(size, ct)

    if ct in IMAGE_TYPES:
        return [ImageContent(type="image", data=base64.b64encode(r.content).decode(), mimeType=ct)]

    if any(hint in ct for hint in TEXT_HINTS):
        return [TextContent(type="text", text=r.text[:100000])]

    # Unknown type: return it as text if it is valid UTF-8, else as a blob.
    try:
        text = r.content.decode("utf-8")
    except UnicodeDecodeError:
        pass
    else:
        return [TextContent(type="text", text=text[:100000])]

    b64 = base64.b64encode(r.content).decode()
    return [EmbeddedResource(type="resource", resource=BlobResourceContents(uri=f"file://{path}", blob=b64, mimeType=ct))]
|
|
|
|
|
|
@mcp.tool()
def file_info(
    path: Annotated[str, Field(description="File or directory path")],
) -> str:
    """Get metadata about a file (size, type, modification date) or directory."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    # Depth 0: properties of the resource itself only.
    body, status = _propfind(user, path, 0)
    if status >= 400:
        return f"Fehler: HTTP {status}"

    found = _parse_pf(body, user)
    if not found:
        return "Nicht gefunden"

    entry = found[0]
    is_directory = entry["is_dir"]
    kind = "Verzeichnis" if is_directory else entry["type"]

    report = [
        f"Name: {entry['name']}",
        f"Pfad: {entry['path']}",
        f"Typ: {kind}",
    ]
    # Size is only reported for files.
    if not is_directory:
        report.append(f"Groesse: {entry['size']:,} bytes")
    report.append(f"Geaendert: {entry['modified']}")
    return "\n".join(report)
|
|
|
|
|
|
@mcp.tool()
def search_files(
    query: Annotated[str, Field(description="Search term — matches file names. Example: 'Rechnung', '.pdf', 'backup'")],
    path: Annotated[str, Field(description="Start directory for search")] = "/",
) -> str:
    """Search for files by name recursively (up to 5 levels deep, max 50 results)."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    max_depth = 5
    max_results = 50
    q = query.lower()
    results = []

    def _walk(directory, depth=0):
        """Collect entries under *directory* whose name contains the query."""
        if depth > max_depth or len(results) >= max_results:
            return
        xml, status = _propfind(user, directory)
        if status >= 400:
            return  # unreadable directory: nothing to collect here
        # First entry is skipped; presumably the directory itself.
        for e in _parse_pf(xml, user)[1:]:
            # Check the cap per entry so a single large directory cannot
            # push the result list past the documented maximum.
            if len(results) >= max_results:
                return
            if q in e["name"].lower():
                results.append(e)
            if e["is_dir"]:
                _walk(e["path"], depth + 1)

    _walk(path)

    lines = []
    for e in results:
        prefix = "[DIR]" if e["is_dir"] else f"({e['size']:,}b)"
        lines.append(f"{prefix} {e['path']}")
    return "\n".join(lines) if lines else "Keine Dateien gefunden"
|
|
|
|
|
|
@mcp.tool()
def write_file(
    path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/todo.md'. Parent directory must already exist (create it with create_folder).")],
    content: Annotated[str, Field(description="File content as text (for text files) or base64-encoded data (for binary, set is_base64=true)")],
    is_base64: Annotated[bool, Field(description="True if content is base64-encoded binary data")] = False,
) -> str:
    """Write or overwrite a file. Use text content for txt/md/json etc. Use base64 for binary files (images, documents)."""
    # The path description used to promise automatic creation of parent
    # directories; this function never does that (a 409 is reported as an
    # error below), so the description now states the real requirement.
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    if is_base64:
        try:
            data = base64.b64decode(content)
        except ValueError:
            # binascii.Error is a ValueError subclass; report bad input
            # instead of letting the tool call fail with a traceback.
            return "Fehler: content ist kein gueltiges base64"
    else:
        data = content.encode("utf-8")

    # No server-provided type here, so the type comes from the extension only.
    ct = _guess_mime(path, None)
    r = httpx.put(
        _dav(user, path),
        content=data,
        auth=_auth(user),
        headers={"Content-Type": ct},
        timeout=60,
    )
    if r.status_code in (200, 201, 204):
        return f"Datei geschrieben: {path} ({len(data):,} bytes)"
    if r.status_code == 409:
        return "Fehler: Uebergeordnetes Verzeichnis existiert nicht. Erstelle es zuerst mit create_folder."
    return f"Fehler: HTTP {r.status_code}"
|
|
|
|
|
|
@mcp.tool()
def create_folder(
    path: Annotated[str, Field(description="Folder path to create, e.g. '/Documents/Projekte', '/Fotos/2026'")],
) -> str:
    """Create a new folder/directory."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    status = httpx.request("MKCOL", _dav(user, path), auth=_auth(user), timeout=15).status_code
    if status in (200, 201):
        return f"Ordner erstellt: {path}"

    # Statuses with a dedicated message; anything else is reported generically.
    known_outcomes = {
        405: f"Ordner existiert bereits: {path}",
        409: "Fehler: Uebergeordnetes Verzeichnis existiert nicht.",
    }
    return known_outcomes.get(status, f"Fehler: HTTP {status}")
|
|
|
|
|
|
@mcp.tool()
def delete_file(
    path: Annotated[str, Field(description="Path to file or folder to delete, e.g. '/Documents/old.txt', '/temp/'")],
) -> str:
    """Delete a file or folder (including all contents). This cannot be undone."""
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    response = httpx.request("DELETE", _dav(user, path), auth=_auth(user), timeout=30)
    status = response.status_code

    if status == 404:
        return f"Nicht gefunden: {path}"
    if status not in (200, 204):
        return f"Fehler: HTTP {status}"
    return f"Geloescht: {path}"
|
|
|
|
|
|
@mcp.tool()
def move_file(
    source: Annotated[str, Field(description="Current path, e.g. '/Documents/old-name.txt'")],
    destination: Annotated[str, Field(description="New path, e.g. '/Documents/new-name.txt' or '/Archive/file.txt'")],
) -> str:
    """Move or rename a file or folder."""
    from urllib.parse import quote

    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    # The destination travels in a header, not in the request URL, so it is
    # not escaped by httpx: str header values are encoded as ASCII, which
    # makes non-ASCII names fail, and spaces, '#' and '?' would go out raw.
    # '%' stays safe so paths that are already percent-encoded (as returned
    # by the listing tools) are not encoded a second time.
    dest_url = _dav(user, quote(destination, safe="/%"))

    # NOTE(review): no Overwrite header is sent. RFC 4918 has servers treat
    # that as "Overwrite: T", so an existing destination is replaced.
    r = httpx.request(
        "MOVE",
        _dav(user, source),
        auth=_auth(user),
        headers={"Destination": dest_url},
        timeout=30,
    )
    if r.status_code in (200, 201, 204):
        return f"Verschoben: {source} -> {destination}"
    if r.status_code == 404:
        return f"Quelle nicht gefunden: {source}"
    if r.status_code == 409:
        return "Fehler: Zielverzeichnis existiert nicht."
    return f"Fehler: HTTP {r.status_code}"
|
|
|
|
|
|
def create_app():
    """Build the Starlette application that serves this MCP server.

    The OAuth routes come first, the MCP streamable-HTTP app is mounted at
    "/", and BearerAuthMiddleware is added to the resulting application.
    """
    mcp_app = mcp.streamable_http_app()

    @contextlib.asynccontextmanager
    async def lifespan(app):
        # Keep the mounted MCP app's own lifespan context open for as long
        # as the outer application is running.
        async with contextlib.AsyncExitStack() as exit_stack:
            inner_lifespan = mcp_app.router.lifespan_context(mcp_app)
            await exit_stack.enter_async_context(inner_lifespan)
            yield

    all_routes = [*OAUTH_ROUTES, Mount("/", app=mcp_app)]
    application = Starlette(routes=all_routes, lifespan=lifespan)
    application.add_middleware(BearerAuthMiddleware)
    return application
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app on 127.0.0.1, port 5103.
    import uvicorn

    application = create_app()
    uvicorn.run(application, host="127.0.0.1", port=5103)
|