From fb642e47c801481f15fd0923d3ad5e9175157d1c Mon Sep 17 00:00:00 2001 From: Stefan Lohmaier Date: Fri, 12 Jun 2026 06:22:42 +0200 Subject: [PATCH] Initial commit: 5 MCP servers for Mail, Calendar, Contacts, Files, Notes Self-hosted MCP servers with OAuth client_credentials auth. Each server connects to a different backend: - Mail: reads Maildir IMAP backups - Calendar/Tasks: CalDAV against Radicale - Contacts: CardDAV against Radicale - Files: WebDAV against oCIS - Notes: Joplin REST API Credentials externalized to config.json (not in repo). Co-Authored-By: Claude Opus 4.6 --- .gitignore | 5 + CLAUDE.md | 69 ++++++++++++ README.md | 32 ++++++ calendar/server.py | 263 ++++++++++++++++++++++++++++++++++++++++++++ common.py | 146 ++++++++++++++++++++++++ config.json.example | 51 +++++++++ contacts/server.py | 166 ++++++++++++++++++++++++++++ files/server.py | 144 ++++++++++++++++++++++++ mail/server.py | 192 ++++++++++++++++++++++++++++++++ notes/server.py | 121 ++++++++++++++++++++ tokens.json.example | 10 ++ 11 files changed, 1199 insertions(+) create mode 100644 .gitignore create mode 100644 CLAUDE.md create mode 100644 README.md create mode 100644 calendar/server.py create mode 100644 common.py create mode 100644 config.json.example create mode 100644 contacts/server.py create mode 100644 files/server.py create mode 100644 mail/server.py create mode 100644 notes/server.py create mode 100644 tokens.json.example diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a1b7fba --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +venv/ +__pycache__/ +*.pyc +tokens.json +config.json diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..adb3196 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,69 @@ +# mcp-home + +Self-hosted MCP (Model Context Protocol) servers for Claude. Provides access to personal data services via remote HTTP endpoints with OAuth client_credentials auth. + +## Architecture + +5 independent MCP servers, each on its own port, behind nginx reverse proxy at `mcp.home.slohmaier.de`: + +- **Mail** (Port 5100) — searches IMAP backup Maildirs +- **Calendar** (Port 5101) — CalDAV against Radicale (events + tasks/reminders) +- **Contacts** (Port 5102) — CardDAV against Radicale +- **Files** (Port 5103) — WebDAV against oCIS +- **Notes** (Port 5104) — Joplin REST API + +All servers share `common.py` for OAuth token handling and user resolution. + +## Auth Flow + +OAuth `client_credentials` grant — no browser redirect needed: + +1. Client POSTs to `//token` with `client_id` + `client_secret` +2. Server returns `access_token` (valid 24h) +3. Client sends `Authorization: Bearer ` with MCP requests +4. User is identified by `client_id` (matches `tokens.json`) + +OAuth metadata at `//.well-known/oauth-authorization-server`. + +## User Separation + +`tokens.json` (not in git, chmod 600) maps client_id → user. Each user only sees their own data. Shared CalDAV calendars are visible to both users. + +## Files + +- `common.py` — OAuth endpoints, Bearer middleware, user resolution via contextvars +- `tokens.json` — client_id/secret per user (generate with `python3 -c "import secrets; print(secrets.token_urlsafe(48))"`) +- `mail/server.py` — Maildir reader, search across subject/from/to/body +- `calendar/server.py` — CalDAV REPORT queries, event/task CRUD, vobject serialization +- `contacts/server.py` — CardDAV addressbook-query, vCard parsing, contact CRUD +- `files/server.py` — WebDAV PROPFIND/GET, recursive search, text file reading +- `notes/server.py` — Joplin REST API wrapper (needs per-user API token in code) + +## Dependencies + +``` +pip install mcp[cli] httpx vobject python-dateutil +``` + +Python 3.12+, venv at `./venv/`. + +## Deployment + +Each server runs as systemd unit (`mcp-.service`), managed by uvicorn. nginx at `mcp.home.slohmaier.de` proxies `//` to the right port. TLS via Certbot. + +## Adding a New Service + +1. Create `/server.py` with `FastMCP` instance + `create_app()` using the same pattern +2. Import `get_current_user`, `OAUTH_ROUTES`, `BearerAuthMiddleware` from `common.py` +3. Add nginx location block for `//` +4. Create systemd unit `mcp-.service` +5. Update `tokens.json` if the new service needs different user mapping + +## Tool Design Guidelines + +- Every tool docstring is the first thing Claude sees — make it actionable ("Call this first", "Use the UID from search results") +- Use `Annotated[str, Field(description="...")]` with example values for every parameter +- Indicate which parameters are optional ("Leave empty for all") +- Reference other tools in descriptions ("Use get_contact with the UID for full details") +- Return structured text, not JSON — Claude parses text faster +- Include UIDs/keys in output so Claude can chain to detail tools without asking the user diff --git a/README.md b/README.md new file mode 100644 index 0000000..6095186 --- /dev/null +++ b/README.md @@ -0,0 +1,32 @@ +# mcp-home + +Self-hosted [MCP](https://modelcontextprotocol.io) servers for Claude. Gives Claude access to your email, calendar, contacts, files, and notes — all running on your own hardware. + +## Services + +- **Mail** — search and read emails from IMAP backups (Maildir format) +- **Calendar + Tasks** — read/write events and reminders via CalDAV (Radicale) +- **Contacts** — search/read/write contacts via CardDAV (Radicale) +- **Files** — browse and read files via WebDAV (oCIS) +- **Notes** — search/read/write notes via Joplin API + +## Setup + +```bash +python3 -m venv venv +venv/bin/pip install mcp[cli] httpx vobject python-dateutil +``` + +Copy `tokens.json.example` to `tokens.json` and set client secrets. + +## Usage with claude.ai + +Add as Custom MCP Server in claude.ai Settings → Integrations: + +- **URL**: `https://your-domain/mail/mcp` (or calendar, contacts, files, notes) +- **OAuth Client ID**: your username +- **OAuth Client Secret**: your secret from tokens.json + +## License + +MIT diff --git a/calendar/server.py b/calendar/server.py new file mode 100644 index 0000000..e428b8f --- /dev/null +++ b/calendar/server.py @@ -0,0 +1,263 @@ +"""MCP Calendar + Tasks Server — reads and writes calendars and tasks via CalDAV.""" + +import os, sys, re, contextlib, uuid +from datetime import datetime, timedelta +from typing import Annotated + +import httpx, vobject +from pydantic import Field +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from starlette.routing import Mount + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) +from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware + +from common import load_config as _lc +_cfg = _lc() +RADICALE = _cfg['radicale_url'] +CREDS = {u: (d['username'], d['password']) for u, d in _cfg['radicale_users'].items()} +CAL_PATHS = _cfg['calendar_paths'] + +mcp = FastMCP("Calendar", stateless_http=True, + transport_security={"enable_dns_rebinding_protection": False}) + + +def _auth(user): + c = CREDS.get(user) + return httpx.BasicAuth(c[0], c[1]) if c else None + +def _propfind(url, auth, depth=1): + body = '' + return httpx.request("PROPFIND", url, content=body, auth=auth, headers={"Depth": str(depth), "Content-Type": "application/xml"}, timeout=30).text + +def _discover(user, comp): + auth = _auth(user) + cols = [] + for base in CAL_PATHS.get(user, []): + xml = _propfind(RADICALE + base, auth) + for m in re.finditer(r'([^<]+)', xml): + href = m.group(1) + if href.rstrip("/") == base.rstrip("/"): continue + block = xml[m.start():xml.find("", m.end())] + if comp not in block: continue + nm = re.search(r'([^<]*)', block) + cols.append({"name": nm.group(1) if nm else href.split("/")[-2], "href": href}) + return cols + +def _report(href, auth, start, end): + body = f'' + return httpx.request("REPORT", RADICALE + href, content=body, auth=auth, headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=30).text + +def _report_tasks(href, auth, inc=False): + filt = '' if inc else 'COMPLETED' + body = f'{filt}' + return httpx.request("REPORT", RADICALE + href, content=body, auth=auth, headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=30).text + +def _parse(xml, comp="VEVENT"): + objs = [] + for m in re.finditer(r']*>(.*?)', xml, re.DOTALL): + raw = m.group(1).replace("<","<").replace(">",">").replace("&","&") + try: + for c in vobject.readOne(raw).components(): + if c.name == comp: objs.append(c) + except: pass + return objs + +def _fmt_ev(ev, cal=""): + s = getattr(ev, 'dtstart', None) + e = getattr(ev, 'dtend', None) + summary = str(ev.summary.value) if hasattr(ev, 'summary') else '(kein Titel)' + start = s.value.isoformat() if s and hasattr(s.value, 'isoformat') else str(s.value) if s else '?' + end_s = e.value.isoformat() if e and hasattr(e.value, 'isoformat') else '' + loc = str(ev.location.value) if hasattr(ev, 'location') and ev.location.value else '' + desc = str(ev.description.value)[:300] if hasattr(ev, 'description') and ev.description.value else '' + uid = str(ev.uid.value) if hasattr(ev, 'uid') else '' + status = str(ev.status.value) if hasattr(ev, 'status') else '' + attendees = [] + if hasattr(ev, 'attendee_list'): + for att in ev.attendee_list: + cn = att.params.get("CN", [""])[0] if hasattr(att, 'params') and att.params else "" + ps = att.params.get("PARTSTAT", [""])[0] if hasattr(att, 'params') and att.params else "" + attendees.append(f"{cn} ({ps})" if cn else str(att.value)) + rrule = str(ev.rrule.value) if hasattr(ev, 'rrule') else '' + parts = [f"[{cal}] {summary}" if cal else summary] + parts.append(f" Wann: {start}" + (f" bis {end_s}" if end_s else "")) + if loc: parts.append(f" Ort: {loc}") + if desc: parts.append(f" Beschreibung: {desc}") + if status: parts.append(f" Status: {status}") + if attendees: parts.append(f" Teilnehmer: {', '.join(attendees[:8])}") + if rrule: parts.append(f" Wiederholung: {rrule}") + parts.append(f" UID: {uid}") + return "\n".join(parts) + +def _fmt_task(t, lst=""): + summary = str(t.summary.value) if hasattr(t, 'summary') else '(kein Titel)' + status = str(t.status.value) if hasattr(t, 'status') else 'NEEDS-ACTION' + due = t.due.value.isoformat() if hasattr(t, 'due') and hasattr(t.due.value, 'isoformat') else str(t.due.value) if hasattr(t, 'due') else '' + pri = int(t.priority.value) if hasattr(t, 'priority') else 0 + uid = str(t.uid.value) if hasattr(t, 'uid') else '' + desc = str(t.description.value)[:200] if hasattr(t, 'description') and t.description.value else '' + chk = '[x]' if status == 'COMPLETED' else '[ ]' + parts = [f"[{lst}] {chk} {summary}" if lst else f"{chk} {summary}"] + if due: parts.append(f" Faellig: {due}") + if pri: parts.append(f" Prioritaet: {pri} (1=hoechste, 9=niedrigste)") + if desc: parts.append(f" Notiz: {desc}") + parts.append(f" UID: {uid}") + return "\n".join(parts) + + +@mcp.tool() +def list_calendars() -> str: + """List all calendars available to the user (personal + shared). Call this first to see calendar names.""" + user = get_current_user() + if not user: return "Error: not authenticated" + cals = _discover(user, "VEVENT") + return "\n".join(f"{c['name']}" for c in cals) if cals else "Keine Kalender gefunden" + + +@mcp.tool() +def get_events( + calendar: Annotated[str, Field(description="Calendar name from list_calendars, e.g. 'Stefan', 'Arbeit', 'Familie'. Use partial match.")], + date_from: Annotated[str, Field(description="Start date as YYYY-MM-DD, e.g. '2026-06-12'")], + date_to: Annotated[str, Field(description="End date as YYYY-MM-DD, e.g. '2026-06-19'")], +) -> str: + """Get all events from a calendar within a date range. Returns title, time, location, attendees, and UID.""" + user = get_current_user() + if not user: return "Error: not authenticated" + cals = [c for c in _discover(user, "VEVENT") if calendar.lower() in c["name"].lower()] + if not cals: return f"Kalender '{calendar}' nicht gefunden. Nutze list_calendars." + s = datetime.strptime(date_from, "%Y-%m-%d").strftime("%Y%m%dT000000Z") + e = datetime.strptime(date_to, "%Y-%m-%d").strftime("%Y%m%dT235959Z") + results = [] + for cal in cals: + for ev in _parse(_report(cal["href"], _auth(user), s, e)): + results.append(_fmt_ev(ev, cal["name"])) + return "\n\n".join(results) if results else "Keine Termine in diesem Zeitraum" + + +@mcp.tool() +def search_events( + query: Annotated[str, Field(description="Search term — matches title, description, location. Example: 'Zahnarzt', 'Meeting', 'Berlin'")], + date_from: Annotated[str, Field(description="Start date YYYY-MM-DD. Default: 90 days ago")] = "", + date_to: Annotated[str, Field(description="End date YYYY-MM-DD. Default: 1 year ahead")] = "", +) -> str: + """Search events by keyword across all calendars. Returns matching events with details.""" + user = get_current_user() + if not user: return "Error: not authenticated" + if not date_from: date_from = (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d") + if not date_to: date_to = (datetime.now() + timedelta(days=365)).strftime("%Y-%m-%d") + s = datetime.strptime(date_from, "%Y-%m-%d").strftime("%Y%m%dT000000Z") + e = datetime.strptime(date_to, "%Y-%m-%d").strftime("%Y%m%dT235959Z") + q = query.lower() + results = [] + for cal in _discover(user, "VEVENT"): + for ev in _parse(_report(cal["href"], _auth(user), s, e)): + txt = f"{ev.summary.value if hasattr(ev,'summary') else ''} {ev.description.value if hasattr(ev,'description') else ''} {ev.location.value if hasattr(ev,'location') else ''}".lower() + if q in txt: + results.append(_fmt_ev(ev, cal["name"])) + return "\n\n".join(results[:30]) if results else "Keine Treffer" + + +@mcp.tool() +def list_task_lists() -> str: + """List all task lists (Erinnerungen/Reminders). Call this first before get_tasks.""" + user = get_current_user() + if not user: return "Error: not authenticated" + lists = _discover(user, "VTODO") + return "\n".join(f"{l['name']}" for l in lists) if lists else "Keine Aufgabenlisten" + + +@mcp.tool() +def get_tasks( + task_list: Annotated[str, Field(description="Task list name from list_task_lists, e.g. 'Erinnerungen'. Leave empty for all lists.")] = "", + include_completed: Annotated[bool, Field(description="Also show completed tasks")] = False, +) -> str: + """Get tasks/reminders from a task list. Shows title, due date, priority, and status.""" + user = get_current_user() + if not user: return "Error: not authenticated" + lists = _discover(user, "VTODO") + if task_list: lists = [l for l in lists if task_list.lower() in l["name"].lower()] + results = [] + for lst in lists: + for t in _parse(_report_tasks(lst["href"], _auth(user), include_completed), "VTODO"): + results.append(_fmt_task(t, lst["name"])) + return "\n\n".join(results) if results else "Keine Aufgaben" + + +@mcp.tool() +def create_event( + calendar: Annotated[str, Field(description="Calendar name, e.g. 'Stefan', 'Arbeit', 'Familie'")], + title: Annotated[str, Field(description="Event title")], + start: Annotated[str, Field(description="Start: YYYY-MM-DD for all-day, or YYYY-MM-DDTHH:MM for timed events, e.g. '2026-06-15T14:00'")], + end: Annotated[str, Field(description="End: YYYY-MM-DD for all-day, or YYYY-MM-DDTHH:MM, e.g. '2026-06-15T15:30'")], + location: Annotated[str, Field(description="Location/address")] = "", + description: Annotated[str, Field(description="Event description or notes")] = "", + allday: Annotated[bool, Field(description="True for all-day events (then use YYYY-MM-DD for start/end)")] = False, +) -> str: + """Create a new calendar event. For all-day events, end date is exclusive (event on June 15 → start=2026-06-15, end=2026-06-16).""" + user = get_current_user() + if not user: return "Error: not authenticated" + cals = [c for c in _discover(user, "VEVENT") if calendar.lower() in c["name"].lower()] + if not cals: return f"Kalender '{calendar}' nicht gefunden" + uid = str(uuid.uuid4()) + cal = vobject.iCalendar() + ev = cal.add("vevent") + ev.add("uid").value = uid + ev.add("summary").value = title + if allday: + ev.add("dtstart").value = datetime.strptime(start, "%Y-%m-%d").date() + ev.add("dtend").value = datetime.strptime(end, "%Y-%m-%d").date() + else: + ev.add("dtstart").value = datetime.fromisoformat(start) + ev.add("dtend").value = datetime.fromisoformat(end) + if location: ev.add("location").value = location + if description: ev.add("description").value = description + ev.add("dtstamp").value = datetime.utcnow() + r = httpx.put(RADICALE + cals[0]["href"] + uid + ".ics", content=cal.serialize(), auth=_auth(user), headers={"Content-Type": "text/calendar"}, timeout=15) + return f"Termin erstellt: {title} am {start}" if r.status_code in (201, 204) else f"Fehler: HTTP {r.status_code}" + + +@mcp.tool() +def create_task( + task_list: Annotated[str, Field(description="Task list name, e.g. 'Erinnerungen', 'Langzeiterinnerungen'")], + title: Annotated[str, Field(description="Task title")], + due: Annotated[str, Field(description="Due date: YYYY-MM-DD or YYYY-MM-DDTHH:MM. Leave empty for no due date.")] = "", + priority: Annotated[int, Field(description="Priority 1-9 (1=highest, 9=lowest, 0=none)")] = 0, + description: Annotated[str, Field(description="Task notes")] = "", +) -> str: + """Create a new task/reminder in the specified list.""" + user = get_current_user() + if not user: return "Error: not authenticated" + lists = [l for l in _discover(user, "VTODO") if task_list.lower() in l["name"].lower()] + if not lists: return f"Liste '{task_list}' nicht gefunden" + uid = str(uuid.uuid4()) + cal = vobject.iCalendar() + todo = cal.add("vtodo") + todo.add("uid").value = uid + todo.add("summary").value = title + todo.add("status").value = "NEEDS-ACTION" + if due: todo.add("due").value = datetime.fromisoformat(due) if "T" in due else datetime.strptime(due, "%Y-%m-%d").date() + if priority: todo.add("priority").value = str(priority) + if description: todo.add("description").value = description + todo.add("dtstamp").value = datetime.utcnow() + r = httpx.put(RADICALE + lists[0]["href"] + uid + ".ics", content=cal.serialize(), auth=_auth(user), headers={"Content-Type": "text/calendar"}, timeout=15) + return f"Aufgabe erstellt: {title}" if r.status_code in (201, 204) else f"Fehler: HTTP {r.status_code}" + + +def create_app(): + from contextlib import asynccontextmanager + mcp_app = mcp.streamable_http_app() + @asynccontextmanager + async def lifespan(app): + async with contextlib.AsyncExitStack() as stack: + await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app)) + yield + routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)] + app = Starlette(routes=routes, lifespan=lifespan) + app.add_middleware(BearerAuthMiddleware) + return app + +if __name__ == "__main__": + import uvicorn + uvicorn.run(create_app(), host="127.0.0.1", port=5101) diff --git a/common.py b/common.py new file mode 100644 index 0000000..2129575 --- /dev/null +++ b/common.py @@ -0,0 +1,146 @@ +"""Shared OAuth + user resolution for all MCP servers. + +OAuth client_credentials flow: +1. claude.ai discovers /.well-known/oauth-authorization-server +2. claude.ai POSTs to /token with client_id + client_secret +3. Server returns access_token +4. claude.ai sends Bearer access_token with MCP requests +5. Server resolves user from token +""" + +import json +import os +import secrets +import contextvars +import time +from starlette.requests import Request +from starlette.responses import JSONResponse +from starlette.routing import Route +from starlette.middleware.base import BaseHTTPMiddleware + +BASE_DIR = os.path.dirname(__file__) +TOKENS_FILE = os.path.join(BASE_DIR, "tokens.json") +CONFIG_FILE = os.path.join(BASE_DIR, "config.json") +VALID_USERS = ["stefan", "kati"] + +_config_cache = None + +def load_config(): + global _config_cache + if _config_cache is None: + with open(CONFIG_FILE) as f: + _config_cache = json.load(f) + return _config_cache + +_tokens_cache = None +_current_user: contextvars.ContextVar[str | None] = contextvars.ContextVar("current_user", default=None) +_access_tokens: dict[str, dict] = {} + + +def _load_tokens(): + global _tokens_cache + if _tokens_cache is None: + with open(TOKENS_FILE) as f: + _tokens_cache = json.load(f) + return _tokens_cache + + +def get_current_user() -> str | None: + return _current_user.get() + + +def get_user_key(username: str) -> str: + return _load_tokens().get(username, {}).get("token", "") + + +def _resolve_client(client_id, client_secret): + tokens = _load_tokens() + for username, data in tokens.items(): + if username == client_id and data["token"] == client_secret: + return username + return None + + +def _resolve_access_token(token): + info = _access_tokens.get(token) + if not info: + return None + if info.get("expires_at", 0) < time.time(): + del _access_tokens[token] + return None + return info["user"] + + +async def oauth_metadata(request: Request): + base = str(request.base_url).rstrip("/") + return JSONResponse({ + "issuer": base, + "token_endpoint": base + "/token", + "response_types_supported": ["token"], + "grant_types_supported": ["client_credentials"], + "token_endpoint_auth_methods_supported": ["client_secret_post"], + }) + + +async def oauth_token(request: Request): + try: + form = await request.form() + grant_type = form.get("grant_type", "") + client_id = form.get("client_id", "") + client_secret = form.get("client_secret", "") + except Exception: + body = await request.body() + try: + data = json.loads(body) + grant_type = data.get("grant_type", "") + client_id = data.get("client_id", "") + client_secret = data.get("client_secret", "") + except Exception: + return JSONResponse({"error": "invalid_request"}, status_code=400) + + if grant_type != "client_credentials": + return JSONResponse({"error": "unsupported_grant_type"}, status_code=400) + + user = _resolve_client(client_id, client_secret) + if not user: + return JSONResponse({"error": "invalid_client"}, status_code=401) + + access_token = secrets.token_urlsafe(48) + expires_in = 86400 + _access_tokens[access_token] = {"user": user, "expires_at": time.time() + expires_in} + + return JSONResponse({ + "access_token": access_token, + "token_type": "bearer", + "expires_in": expires_in, + }) + + +OAUTH_ROUTES = [ + Route("/.well-known/oauth-authorization-server", oauth_metadata, methods=["GET"]), + Route("/token", oauth_token, methods=["POST"]), +] + + +class BearerAuthMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request, call_next): + path = request.url.path + if path.endswith("/token") or "/.well-known/" in path: + return await call_next(request) + + auth = request.headers.get("authorization", "") + if auth.startswith("Bearer "): + token = auth[7:] + user = _resolve_access_token(token) + if user: + tok = _current_user.set(user) + try: + return await call_next(request) + finally: + _current_user.reset(tok) + + return JSONResponse( + {"error": "unauthorized"}, + status_code=401, + headers={"WWW-Authenticate": 'Bearer resource_metadata="/.well-known/oauth-authorization-server"'}, + ) diff --git a/config.json.example b/config.json.example new file mode 100644 index 0000000..7aac37d --- /dev/null +++ b/config.json.example @@ -0,0 +1,51 @@ +{ + "radicale_url": "http://127.0.0.1:5232", + "radicale_users": { + "stefan": { + "username": "stefan", + "password": "CHANGE_ME" + }, + "kati": { + "username": "kati", + "password": "CHANGE_ME" + } + }, + "ocis_url": "http://127.0.0.1:9200", + "ocis_users": { + "stefan": { + "username": "stefan", + "password": "CHANGE_ME" + }, + "kati": { + "username": "kati", + "password": "CHANGE_ME" + } + }, + "joplin_url": "http://127.0.0.1:22300", + "joplin_tokens": { + "stefan": "", + "kati": "" + }, + "mail_roots": { + "stefan": "/mnt/ssd/Backup/stefan/imap", + "kati": "/mnt/ssd/Backup/kati/imap" + }, + "calendar_paths": { + "stefan": [ + "/stefan/", + "/shared/" + ], + "kati": [ + "/kati/", + "/shared/" + ] + }, + "addressbook_paths": { + "stefan": [ + "/stefan/" + ], + "kati": [ + "/kati/" + ] + } +} \ No newline at end of file diff --git a/contacts/server.py b/contacts/server.py new file mode 100644 index 0000000..d343187 --- /dev/null +++ b/contacts/server.py @@ -0,0 +1,166 @@ +"""MCP Contacts Server — reads and writes contacts via CardDAV.""" + +import os, sys, re, contextlib, uuid +from typing import Annotated + +import httpx, vobject +from pydantic import Field +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from starlette.routing import Mount + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) +from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware + +from common import load_config as _lc +_cfg = _lc() +RADICALE = _cfg['radicale_url'] +CREDS = {u: (d['username'], d['password']) for u, d in _cfg['radicale_users'].items()} +AB_PATHS = _cfg['addressbook_paths'] + +mcp = FastMCP("Contacts", stateless_http=True, + transport_security={"enable_dns_rebinding_protection": False}) + +def _auth(u): c = CREDS.get(u); return httpx.BasicAuth(c[0], c[1]) if c else None + +def _discover_ab(user): + books = [] + for base in AB_PATHS.get(user, []): + body = '' + r = httpx.request("PROPFIND", RADICALE + base, content=body, auth=_auth(user), headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=30) + for m in re.finditer(r'([^<]+)', r.text): + href = m.group(1) + if href.rstrip("/") == base.rstrip("/"): continue + block = r.text[m.start():r.text.find("", m.end())] + if "addressbook" not in block.lower(): continue + nm = re.search(r'([^<]*)', block) + books.append({"name": nm.group(1) if nm else href.split("/")[-2], "href": href}) + return books + +def _get_contacts(href, auth): + body = '' + r = httpx.request("REPORT", RADICALE + href, content=body, auth=auth, headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=60) + contacts = [] + for m in re.finditer(r']*>(.*?)', r.text, re.DOTALL): + raw = m.group(1).replace("<","<").replace(">",">").replace("&","&") + try: contacts.append(vobject.readOne(raw)) + except: pass + return contacts + +def _brief(c): + fn = str(c.fn.value) if hasattr(c, 'fn') else '?' + org = f" ({c.org.value[0]})" if hasattr(c, 'org') and c.org.value else '' + uid = str(c.uid.value) if hasattr(c, 'uid') else '' + parts = [f"{fn}{org}"] + for em in (c.email_list if hasattr(c, 'email_list') else ([c.email] if hasattr(c, 'email') else [])): + parts.append(f" Email: {em.value}") + for tel in (c.tel_list if hasattr(c, 'tel_list') else ([c.tel] if hasattr(c, 'tel') else [])): + parts.append(f" Tel: {tel.value}") + parts.append(f" UID: {uid}") + return "\n".join(parts) + +def _full(c): + parts = [str(c.fn.value) if hasattr(c, 'fn') else '?'] + if hasattr(c, 'n'): + n = c.n.value + if n.prefix: parts.append(f" Anrede: {n.prefix}") + parts.append(f" Vorname: {n.given}") + parts.append(f" Nachname: {n.family}") + if n.suffix: parts.append(f" Suffix: {n.suffix}") + if hasattr(c, 'nickname'): parts.append(f" Spitzname: {c.nickname.value}") + for em in (c.email_list if hasattr(c, 'email_list') else ([c.email] if hasattr(c, 'email') else [])): + typ = em.params.get("TYPE", [""])[0] if hasattr(em, 'params') and em.params else "" + parts.append(f" Email ({typ or 'sonstig'}): {em.value}") + for tel in (c.tel_list if hasattr(c, 'tel_list') else ([c.tel] if hasattr(c, 'tel') else [])): + typ = tel.params.get("TYPE", [""])[0] if hasattr(tel, 'params') and tel.params else "" + parts.append(f" Telefon ({typ or 'sonstig'}): {tel.value}") + for adr in (c.adr_list if hasattr(c, 'adr_list') else ([c.adr] if hasattr(c, 'adr') else [])): + a = adr.value + addr = ", ".join(filter(None, [a.street, a.city, a.region, a.code, a.country])) + if addr: parts.append(f" Adresse: {addr}") + if hasattr(c, 'org') and c.org.value: parts.append(f" Firma: {c.org.value[0]}") + if hasattr(c, 'title'): parts.append(f" Titel: {c.title.value}") + if hasattr(c, 'role'): parts.append(f" Rolle: {c.role.value}") + if hasattr(c, 'bday'): parts.append(f" Geburtstag: {c.bday.value}") + if hasattr(c, 'note') and c.note.value: parts.append(f" Notiz: {str(c.note.value)[:300]}") + if hasattr(c, 'url'): parts.append(f" URL: {c.url.value}") + if hasattr(c, 'uid'): parts.append(f" UID: {c.uid.value}") + return "\n".join(parts) + + +@mcp.tool() +def search_contacts( + query: Annotated[str, Field(description="Search term — matches name, email, phone, company, address, notes. Example: 'Lohmaier', 'BMW', '+49171'")], + limit: Annotated[int, Field(description="Maximum number of results")] = 30, +) -> str: + """Search contacts by any field. Returns name, email, phone, and UID. Use get_contact with the UID for full details.""" + user = get_current_user() + if not user: return "Error: not authenticated" + q = query.lower() + results = [] + for book in _discover_ab(user): + for c in _get_contacts(book["href"], _auth(user)): + txt = " ".join(str(getattr(c, a, None) or '') for a in ['fn','email','tel','org','note','nickname']).lower() + if q in txt: + results.append(_brief(c)) + if len(results) >= limit: break + return "\n\n".join(results) if results else "Keine Kontakte gefunden" + + +@mcp.tool() +def get_contact( + uid: Annotated[str, Field(description="Contact UID from search_contacts results")], +) -> str: + """Get full details of a contact by UID: name, all emails, all phones, addresses, company, birthday, notes.""" + user = get_current_user() + if not user: return "Error: not authenticated" + for book in _discover_ab(user): + for c in _get_contacts(book["href"], _auth(user)): + if hasattr(c, 'uid') and str(c.uid.value) == uid: + return _full(c) + return f"Kontakt nicht gefunden: {uid}" + + +@mcp.tool() +def create_contact( + name: Annotated[str, Field(description="Full name, e.g. 'Max Mustermann'")], + email: Annotated[str, Field(description="Email address")] = "", + phone: Annotated[str, Field(description="Phone number, e.g. '+49 171 1234567'")] = "", + organization: Annotated[str, Field(description="Company/organization name")] = "", + note: Annotated[str, Field(description="Notes about this contact")] = "", +) -> str: + """Create a new contact in the default addressbook.""" + user = get_current_user() + if not user: return "Error: not authenticated" + books = _discover_ab(user) + if not books: return "Kein Adressbuch gefunden" + uid_val = str(uuid.uuid4()) + card = vobject.vCard() + card.add("uid").value = uid_val + card.add("fn").value = name + p = name.split(" ", 1) + card.add("n").value = vobject.vcard.Name(family=p[-1] if len(p)>1 else p[0], given=p[0] if len(p)>1 else "") + if email: card.add("email").value = email + if phone: card.add("tel").value = phone + if organization: card.add("org").value = [organization] + if note: card.add("note").value = note + r = httpx.put(RADICALE + books[0]["href"] + uid_val + ".vcf", content=card.serialize(), auth=_auth(user), headers={"Content-Type": "text/vcard"}, timeout=15) + return f"Kontakt erstellt: {name}" if r.status_code in (201, 204) else f"Fehler: HTTP {r.status_code}" + + +def create_app(): + from contextlib import asynccontextmanager + mcp_app = mcp.streamable_http_app() + @asynccontextmanager + async def lifespan(app): + async with contextlib.AsyncExitStack() as stack: + await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app)) + yield + routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)] + app = Starlette(routes=routes, lifespan=lifespan) + app.add_middleware(BearerAuthMiddleware) + return app + +if __name__ == "__main__": + import uvicorn + uvicorn.run(create_app(), host="127.0.0.1", port=5102) diff --git a/files/server.py b/files/server.py new file mode 100644 index 0000000..f052d81 --- /dev/null +++ b/files/server.py @@ -0,0 +1,144 @@ +"""MCP Files Server — browse and read files via WebDAV/oCIS.""" + +import os, sys, contextlib +from xml.etree import ElementTree as ET +from typing import Annotated + +import httpx +from pydantic import Field +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from starlette.routing import Mount + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) +from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware + +from common import load_config as _lc +_cfg = _lc() +OCIS = _cfg['ocis_url'] +OCIS_CREDS = {u: (d['username'], d['password']) for u, d in _cfg['ocis_users'].items()} + +mcp = FastMCP("Files", stateless_http=True, + transport_security={"enable_dns_rebinding_protection": False}) + +def _auth(u): c = OCIS_CREDS.get(u); return httpx.BasicAuth(c[0], c[1]) if c else None +def _dav(u, p=""): return f"{OCIS}/remote.php/dav/files/{u}/{p.lstrip('/')}" + +def _propfind(user, path="", depth=1): + body = '' + r = httpx.request("PROPFIND", _dav(user, path), content=body, auth=_auth(user), headers={"Depth": str(depth), "Content-Type": "application/xml"}, timeout=30) + return r.text, r.status_code + +def _parse_pf(xml, user): + ns = {"d": "DAV:"} + entries = [] + try: root = ET.fromstring(xml) + except: return entries + bp = f"/remote.php/dav/files/{user}/" + for resp in root.findall("d:response", ns): + href = resp.findtext("d:href", "", ns) or "" + rel = href.split(bp, 1)[-1].rstrip("/") if bp in href else href.rstrip("/") + props = resp.find(".//d:prop", ns) + if props is None: continue + entries.append({ + "name": props.findtext("d:displayname", "", ns) or rel.split("/")[-1], + "path": "/" + rel if rel else "/", + "is_dir": props.find("d:resourcetype/d:collection", ns) is not None, + "size": int(props.findtext("d:getcontentlength", "0", ns) or 0), + "modified": props.findtext("d:getlastmodified", "", ns), + "type": props.findtext("d:getcontenttype", "", ns), + }) + return entries + + +@mcp.tool() +def list_files( + path: Annotated[str, Field(description="Directory path, e.g. '/' for root, '/Documents', '/Photos/2026'")] = "/", +) -> str: + """List files and subdirectories at the given path. Shows name, size, and modification date.""" + user = get_current_user() + if not user: return "Error: not authenticated" + xml, st = _propfind(user, path) + if st >= 400: return f"Fehler: HTTP {st}" + entries = _parse_pf(xml, user) + lines = [] + for e in entries[1:]: + if e["is_dir"]: lines.append(f"[DIR] {e['name']}/") + else: lines.append(f" {e['name']} ({e['size']:,} bytes, {e['modified']})") + return "\n".join(lines) if lines else "Leeres Verzeichnis" + + +@mcp.tool() +def read_file( + path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/config.yaml'")], +) -> str: + """Read a text file's content (txt, md, json, csv, yaml, xml, html). Binary files show only metadata.""" + user = get_current_user() + if not user: return "Error: not authenticated" + r = httpx.get(_dav(user, path), auth=_auth(user), timeout=30) + if r.status_code >= 400: return f"Fehler: HTTP {r.status_code}" + ct = r.headers.get("content-type", "") + if any(t in ct for t in ["text/", "json", "xml", "csv", "yaml", "javascript"]): + return r.text[:100000] + try: return r.content.decode("utf-8")[:100000] + except: return f"Binaerdatei ({len(r.content)} bytes, Typ: {ct}). Kann nicht angezeigt werden." + + +@mcp.tool() +def file_info( + path: Annotated[str, Field(description="File or directory path")], +) -> str: + """Get metadata about a file (size, type, modification date) or directory.""" + user = get_current_user() + if not user: return "Error: not authenticated" + xml, st = _propfind(user, path, 0) + if st >= 400: return f"Fehler: HTTP {st}" + entries = _parse_pf(xml, user) + if not entries: return "Nicht gefunden" + e = entries[0] + parts = [f"Name: {e['name']}", f"Pfad: {e['path']}", f"Typ: {'Verzeichnis' if e['is_dir'] else e['type']}"] + if not e["is_dir"]: parts.append(f"Groesse: {e['size']:,} bytes") + parts.append(f"Geaendert: {e['modified']}") + return "\n".join(parts) + + +@mcp.tool() +def search_files( + query: Annotated[str, Field(description="Search term — matches file names. Example: 'Rechnung', '.pdf', 'backup'")], + path: Annotated[str, Field(description="Start directory for search")] = "/", +) -> str: + """Search for files by name recursively (up to 5 levels deep, max 50 results).""" + user = get_current_user() + if not user: return "Error: not authenticated" + q = query.lower() + results = [] + def _s(p, d=0): + if d > 5 or len(results) >= 50: return + xml, _ = _propfind(user, p) + for e in _parse_pf(xml, user)[1:]: + if q in e["name"].lower(): results.append(e) + if e["is_dir"]: _s(e["path"], d+1) + _s(path) + lines = [] + for e in results: + prefix = "[DIR]" if e["is_dir"] else f"({e['size']:,}b)" + lines.append(f"{prefix} {e['path']}") + return "\n".join(lines) if lines else "Keine Dateien gefunden" + + +def create_app(): + from contextlib import asynccontextmanager + mcp_app = mcp.streamable_http_app() + @asynccontextmanager + async def lifespan(app): + async with contextlib.AsyncExitStack() as stack: + await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app)) + yield + routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)] + app = Starlette(routes=routes, lifespan=lifespan) + app.add_middleware(BearerAuthMiddleware) + return app + +if __name__ == "__main__": + import uvicorn + uvicorn.run(create_app(), host="127.0.0.1", port=5103) diff --git a/mail/server.py b/mail/server.py new file mode 100644 index 0000000..b737eb8 --- /dev/null +++ b/mail/server.py @@ -0,0 +1,192 @@ +"""MCP Mail Server — searches and reads IMAP backup emails.""" + +import os +import sys +import contextlib +import mailbox +from email.header import decode_header +from pathlib import Path +from typing import Annotated + +from pydantic import Field +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from starlette.routing import Mount + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) +from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware + +from common import load_config as _lc +MAIL_ROOTS = _lc()["mail_roots"] + +mcp = FastMCP("Mail", stateless_http=True, + transport_security={"enable_dns_rebinding_protection": False}) + + +def _decode_hdr(raw): + if not raw: + return "" + parts = decode_header(str(raw)) + decoded = [] + for data, charset in parts: + if isinstance(data, bytes): + decoded.append(data.decode(charset or "utf-8", errors="replace")) + else: + decoded.append(str(data)) + return " ".join(decoded) + + +def _get_body(msg): + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + payload = part.get_payload(decode=True) + if payload: + return payload.decode(part.get_content_charset() or "utf-8", errors="replace") + for part in msg.walk(): + if part.get_content_type() == "text/html": + payload = part.get_payload(decode=True) + if payload: + return "[HTML] " + payload.decode(part.get_content_charset() or "utf-8", errors="replace") + else: + payload = msg.get_payload(decode=True) + if payload: + return payload.decode(msg.get_content_charset() or "utf-8", errors="replace") + return "" + + +def _discover_accounts(user): + root = MAIL_ROOTS.get(user) + if not root or not os.path.isdir(root): + return {} + return {a: os.path.join(root, a) for a in sorted(os.listdir(root)) if os.path.isdir(os.path.join(root, a))} + + +def _discover_folders(acct_path): + folders = [] + for entry in sorted(Path(acct_path).rglob("cur")): + rel = str(entry.parent.relative_to(acct_path)) + if rel == ".": + folders.insert(0, "INBOX") + elif rel not in folders: + folders.append(rel) + return folders + + +def _open_folder(acct_path, folder_name): + path = acct_path if folder_name == "INBOX" else os.path.join(acct_path, folder_name) + return mailbox.Maildir(path, create=False) if os.path.isdir(path) else None + + +@mcp.tool() +def list_accounts() -> str: + """List all email accounts with their folder count. Call this first to see available accounts.""" + user = get_current_user() + if not user: + return "Error: not authenticated" + accounts = _discover_accounts(user) + if not accounts: + return "No mail accounts found" + return "\n".join(f"{name}: {len(_discover_folders(path))} folders" for name, path in accounts.items()) + + +@mcp.tool() +def list_folders( + account: Annotated[str, Field(description="Account name from list_accounts, e.g. 'd370128_0-slohmaier' or 'gmail'")] +) -> str: + """List all IMAP folders in a mail account (INBOX, Sent, Archive, etc.).""" + user = get_current_user() + if not user: + return "Error: not authenticated" + acct_path = _discover_accounts(user).get(account) + if not acct_path: + return f"Account not found: {account}. Use list_accounts to see available accounts." + folders = _discover_folders(acct_path) + return "\n".join(folders) if folders else "No folders" + + +@mcp.tool() +def search_mail( + query: Annotated[str, Field(description="Search term — matches subject, from, to, and body. Example: 'Rechnung', 'Amazon', 'meeting'")], + account: Annotated[str, Field(description="Filter by account name. Leave empty to search all accounts.")] = "", + folder: Annotated[str, Field(description="Filter by folder name, e.g. 'INBOX', 'Sent'. Leave empty for all folders.")] = "", + limit: Annotated[int, Field(description="Maximum number of results to return")] = 20, +) -> str: + """Search emails by keyword. Returns date, sender, recipient, subject, and a location key for reading the full email with read_mail.""" + user = get_current_user() + if not user: + return "Error: not authenticated" + query_lower = query.lower() + results = [] + for acct_name, acct_path in _discover_accounts(user).items(): + if account and account not in acct_name: + continue + for fld in _discover_folders(acct_path): + if folder and folder.lower() not in fld.lower(): + continue + md = _open_folder(acct_path, fld) + if not md: + continue + for key, msg in md.items(): + subj = _decode_hdr(msg.get("Subject", "")) + frm = _decode_hdr(msg.get("From", "")) + to = _decode_hdr(msg.get("To", "")) + date_str = msg.get("Date", "") + if query_lower not in f"{subj} {frm} {to}".lower(): + body = _get_body(msg) + if query_lower not in body.lower(): + continue + results.append(f"[{date_str}] {frm} -> {to}\n Subject: {subj}\n Account: {acct_name}, Folder: {fld}, Key: {key}") + if len(results) >= limit: + return "\n\n".join(results) + return "\n\n".join(results) if results else "No results found" + + +@mcp.tool() +def read_mail( + account: Annotated[str, Field(description="Account name from search results, e.g. 'd370128_0-slohmaier'")], + folder: Annotated[str, Field(description="Folder name from search results, e.g. 'INBOX'")], + key: Annotated[str, Field(description="Message key from search results")], +) -> str: + """Read the full content of a single email. Use the account, folder, and key values from search_mail results.""" + user = get_current_user() + if not user: + return "Error: not authenticated" + acct_path = _discover_accounts(user).get(account) + if not acct_path: + return f"Account not found: {account}" + md = _open_folder(acct_path, folder) + if not md: + return f"Folder not found: {folder}" + msg = md.get(key) + if not msg: + return f"Message not found: {key}" + return ( + f"From: {_decode_hdr(msg.get('From', ''))}\n" + f"To: {_decode_hdr(msg.get('To', ''))}\n" + f"Cc: {_decode_hdr(msg.get('Cc', ''))}\n" + f"Date: {msg.get('Date', '')}\n" + f"Subject: {_decode_hdr(msg.get('Subject', ''))}\n\n" + f"{_get_body(msg)[:50000]}" + ) + + +def create_app(): + from contextlib import asynccontextmanager + mcp_app = mcp.streamable_http_app() + + @asynccontextmanager + async def lifespan(app): + async with contextlib.AsyncExitStack() as stack: + await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app)) + yield + + routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)] + app = Starlette(routes=routes, lifespan=lifespan) + app.add_middleware(BearerAuthMiddleware) + return app + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(create_app(), host="127.0.0.1", port=5100) diff --git a/notes/server.py b/notes/server.py new file mode 100644 index 0000000..c9f62d5 --- /dev/null +++ b/notes/server.py @@ -0,0 +1,121 @@ +"""MCP Notes Server — reads and writes notes via Joplin API.""" + +import os, sys, contextlib +from typing import Annotated + +import httpx +from pydantic import Field +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from starlette.routing import Mount + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) +from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware + +from common import load_config as _lc +_cfg = _lc() +JOPLIN = _cfg['joplin_url'] +JOPLIN_TOKENS = _cfg['joplin_tokens'] + +mcp = FastMCP("Notes", stateless_http=True, + transport_security={"enable_dns_rebinding_protection": False}) + +def _api(user, endpoint, method="GET", json_data=None): + token = JOPLIN_TOKENS.get(user) + if not token: return None, "Kein Joplin API-Token konfiguriert. Bitte in Joplin-App generieren." + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Origin": "https://notes.home.slohmaier.de"} + try: + r = getattr(httpx, method.lower())(f"{JOPLIN}{endpoint}", headers=headers, json=json_data, timeout=15) + if r.status_code >= 400: return None, f"Joplin-Fehler: HTTP {r.status_code}" + return (r.json() if r.text else {}), None + except Exception as e: return None, str(e) + + +@mcp.tool() +def list_notebooks() -> str: + """List all Joplin notebooks (folders). Call this first to see available notebooks.""" + user = get_current_user() + if not user: return "Error: not authenticated" + data, err = _api(user, "/api/folders") + if err: return f"Fehler: {err}" + return "\n".join(f"{nb['title']} (ID: {nb['id']})" for nb in data.get("items", [])) or "Keine Notizbuecher" + + +@mcp.tool() +def list_notes( + notebook: Annotated[str, Field(description="Notebook name or ID from list_notebooks. Leave empty for all notes.")] = "", + limit: Annotated[int, Field(description="Maximum number of notes to return")] = 50, +) -> str: + """List notes, optionally filtered by notebook. Shows title and ID. Use read_note with the ID to get content.""" + user = get_current_user() + if not user: return "Error: not authenticated" + if notebook: + data, err = _api(user, "/api/folders") + if err: return f"Fehler: {err}" + nb_id = next((nb["id"] for nb in data.get("items", []) if notebook.lower() in nb.get("title","").lower() or notebook == nb.get("id")), None) + if not nb_id: return f"Notizbuch '{notebook}' nicht gefunden" + data, err = _api(user, f"/api/folders/{nb_id}/notes?limit={limit}") + else: + data, err = _api(user, f"/api/notes?limit={limit}&order_by=updated_time&order_dir=DESC") + if err: return f"Fehler: {err}" + return "\n".join(f"{n['title']} (ID: {n['id']})" for n in data.get("items", [])) or "Keine Notizen" + + +@mcp.tool() +def read_note( + note_id: Annotated[str, Field(description="Note ID from list_notes or search_notes results")], +) -> str: + """Read the full content of a note (Markdown format).""" + user = get_current_user() + if not user: return "Error: not authenticated" + data, err = _api(user, f"/api/notes/{note_id}?fields=id,title,body") + if err: return f"Fehler: {err}" + return f"# {data.get('title','')}\n\n{data.get('body','')}" + + +@mcp.tool() +def search_notes( + query: Annotated[str, Field(description="Search term — matches note titles and content. Example: 'Einkaufsliste', 'Passwort', 'Rezept'")], +) -> str: + """Search notes by keyword. Returns matching note titles and IDs.""" + user = get_current_user() + if not user: return "Error: not authenticated" + data, err = _api(user, f"/api/search?query={query}&limit=20") + if err: return f"Fehler: {err}" + return "\n".join(f"{n['title']} (ID: {n['id']})" for n in data.get("items", [])) or "Keine Treffer" + + +@mcp.tool() +def create_note( + notebook: Annotated[str, Field(description="Notebook name or ID where the note should be created")], + title: Annotated[str, Field(description="Note title")], + body: Annotated[str, Field(description="Note content in Markdown format")], +) -> str: + """Create a new note in the specified notebook.""" + user = get_current_user() + if not user: return "Error: not authenticated" + folders, err = _api(user, "/api/folders") + if err: return f"Fehler: {err}" + nb_id = next((nb["id"] for nb in folders.get("items", []) if notebook.lower() in nb.get("title","").lower() or notebook == nb.get("id")), None) + if not nb_id: return f"Notizbuch '{notebook}' nicht gefunden" + data, err = _api(user, "/api/notes", "POST", {"title": title, "body": body, "parent_id": nb_id}) + if err: return f"Fehler: {err}" + return f"Notiz erstellt: {title}" + + +def create_app(): + from contextlib import asynccontextmanager + mcp_app = mcp.streamable_http_app() + @asynccontextmanager + async def lifespan(app): + async with contextlib.AsyncExitStack() as stack: + await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app)) + yield + routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)] + app = Starlette(routes=routes, lifespan=lifespan) + app.add_middleware(BearerAuthMiddleware) + return app + +if __name__ == "__main__": + import uvicorn + uvicorn.run(create_app(), host="127.0.0.1", port=5104) diff --git a/tokens.json.example b/tokens.json.example new file mode 100644 index 0000000..89ecb1c --- /dev/null +++ b/tokens.json.example @@ -0,0 +1,10 @@ +{ + "stefan": { + "token": "REPLACE_WITH_RANDOM_SECRET", + "label": "Stefan" + }, + "kati": { + "token": "REPLACE_WITH_RANDOM_SECRET", + "label": "Kati" + } +}