Initial commit: 5 MCP servers for Mail, Calendar, Contacts, Files, Notes

Self-hosted MCP servers with OAuth client_credentials auth.
Each server connects to a different backend:
- Mail: reads Maildir IMAP backups
- Calendar/Tasks: CalDAV against Radicale
- Contacts: CardDAV against Radicale
- Files: WebDAV against oCIS
- Notes: Joplin REST API

Credentials externalized to config.json (not in repo).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Stefan Lohmaier
2026-06-12 06:22:42 +02:00
commit fb642e47c8
11 changed files with 1199 additions and 0 deletions
+5
View File
@@ -0,0 +1,5 @@
venv/
__pycache__/
*.pyc
tokens.json
config.json
+69
View File
@@ -0,0 +1,69 @@
# mcp-home
Self-hosted MCP (Model Context Protocol) servers for Claude. Provides access to personal data services via remote HTTP endpoints with OAuth client_credentials auth.
## Architecture
5 independent MCP servers, each on its own port, behind nginx reverse proxy at `mcp.home.slohmaier.de`:
- **Mail** (Port 5100) — searches IMAP backup Maildirs
- **Calendar** (Port 5101) — CalDAV against Radicale (events + tasks/reminders)
- **Contacts** (Port 5102) — CardDAV against Radicale
- **Files** (Port 5103) — WebDAV against oCIS
- **Notes** (Port 5104) — Joplin REST API
All servers share `common.py` for OAuth token handling and user resolution.
## Auth Flow
OAuth `client_credentials` grant — no browser redirect needed:
1. Client POSTs to `/<service>/token` with `client_id` + `client_secret`
2. Server returns `access_token` (valid 24h)
3. Client sends `Authorization: Bearer <token>` with MCP requests
4. User is identified by `client_id` (matches `tokens.json`)
OAuth metadata at `/<service>/.well-known/oauth-authorization-server`.
## User Separation
`tokens.json` (not in git, chmod 600) maps client_id → user. Each user only sees their own data. Shared CalDAV calendars are visible to both users.
## Files
- `common.py` — OAuth endpoints, Bearer middleware, user resolution via contextvars
- `tokens.json` — client_id/secret per user (generate with `python3 -c "import secrets; print(secrets.token_urlsafe(48))"`)
- `mail/server.py` — Maildir reader, search across subject/from/to/body
- `calendar/server.py` — CalDAV REPORT queries, event/task CRUD, vobject serialization
- `contacts/server.py` — CardDAV addressbook-query, vCard parsing, contact CRUD
- `files/server.py` — WebDAV PROPFIND/GET, recursive search, text file reading
- `notes/server.py` — Joplin REST API wrapper (needs per-user API token in code)
## Dependencies
```
pip install mcp[cli] httpx vobject python-dateutil
```
Python 3.12+, venv at `./venv/`.
## Deployment
Each server runs as systemd unit (`mcp-<name>.service`), managed by uvicorn. nginx at `mcp.home.slohmaier.de` proxies `/<service>/` to the right port. TLS via Certbot.
## Adding a New Service
1. Create `<name>/server.py` with `FastMCP` instance + `create_app()` using the same pattern
2. Import `get_current_user`, `OAUTH_ROUTES`, `BearerAuthMiddleware` from `common.py`
3. Add nginx location block for `/<name>/`
4. Create systemd unit `mcp-<name>.service`
5. Update `tokens.json` if the new service needs different user mapping
## Tool Design Guidelines
- Every tool docstring is the first thing Claude sees — make it actionable ("Call this first", "Use the UID from search results")
- Use `Annotated[str, Field(description="...")]` with example values for every parameter
- Indicate which parameters are optional ("Leave empty for all")
- Reference other tools in descriptions ("Use get_contact with the UID for full details")
- Return structured text, not JSON — Claude parses text faster
- Include UIDs/keys in output so Claude can chain to detail tools without asking the user
+32
View File
@@ -0,0 +1,32 @@
# mcp-home
Self-hosted [MCP](https://modelcontextprotocol.io) servers for Claude. Gives Claude access to your email, calendar, contacts, files, and notes — all running on your own hardware.
## Services
- **Mail** — search and read emails from IMAP backups (Maildir format)
- **Calendar + Tasks** — read/write events and reminders via CalDAV (Radicale)
- **Contacts** — search/read/write contacts via CardDAV (Radicale)
- **Files** — browse and read files via WebDAV (oCIS)
- **Notes** — search/read/write notes via Joplin API
## Setup
```bash
python3 -m venv venv
venv/bin/pip install mcp[cli] httpx vobject python-dateutil
```
Copy `tokens.json.example` to `tokens.json` and set client secrets.
## Usage with claude.ai
Add as Custom MCP Server in claude.ai Settings → Integrations:
- **URL**: `https://your-domain/mail/mcp` (or calendar, contacts, files, notes)
- **OAuth Client ID**: your username
- **OAuth Client Secret**: your secret from tokens.json
## License
MIT
+263
View File
@@ -0,0 +1,263 @@
"""MCP Calendar + Tasks Server — reads and writes calendars and tasks via CalDAV."""
import os, sys, re, contextlib, uuid
from datetime import datetime, timedelta
from typing import Annotated
import httpx, vobject
from pydantic import Field
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.routing import Mount
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware
from common import load_config as _lc
_cfg = _lc()
RADICALE = _cfg['radicale_url']
CREDS = {u: (d['username'], d['password']) for u, d in _cfg['radicale_users'].items()}
CAL_PATHS = _cfg['calendar_paths']
mcp = FastMCP("Calendar", stateless_http=True,
transport_security={"enable_dns_rebinding_protection": False})
def _auth(user):
c = CREDS.get(user)
return httpx.BasicAuth(c[0], c[1]) if c else None
def _propfind(url, auth, depth=1):
body = '<?xml version="1.0"?><d:propfind xmlns:d="DAV:" xmlns:cs="urn:ietf:params:xml:ns:caldav"><d:prop><d:resourcetype/><d:displayname/><cs:supported-calendar-component-set/></d:prop></d:propfind>'
return httpx.request("PROPFIND", url, content=body, auth=auth, headers={"Depth": str(depth), "Content-Type": "application/xml"}, timeout=30).text
def _discover(user, comp):
auth = _auth(user)
cols = []
for base in CAL_PATHS.get(user, []):
xml = _propfind(RADICALE + base, auth)
for m in re.finditer(r'<d:href>([^<]+)</d:href>', xml):
href = m.group(1)
if href.rstrip("/") == base.rstrip("/"): continue
block = xml[m.start():xml.find("</d:response>", m.end())]
if comp not in block: continue
nm = re.search(r'<d:displayname>([^<]*)</d:displayname>', block)
cols.append({"name": nm.group(1) if nm else href.split("/")[-2], "href": href})
return cols
def _report(href, auth, start, end):
body = f'<?xml version="1.0"?><c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav"><d:prop><d:getetag/><c:calendar-data/></d:prop><c:filter><c:comp-filter name="VCALENDAR"><c:comp-filter name="VEVENT"><c:time-range start="{start}" end="{end}"/></c:comp-filter></c:comp-filter></c:filter></c:calendar-query>'
return httpx.request("REPORT", RADICALE + href, content=body, auth=auth, headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=30).text
def _report_tasks(href, auth, inc=False):
filt = '<c:comp-filter name="VTODO"/>' if inc else '<c:comp-filter name="VTODO"><c:prop-filter name="STATUS"><c:text-match negate-condition="yes">COMPLETED</c:text-match></c:prop-filter></c:comp-filter>'
body = f'<?xml version="1.0"?><c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav"><d:prop><d:getetag/><c:calendar-data/></d:prop><c:filter><c:comp-filter name="VCALENDAR">{filt}</c:comp-filter></c:filter></c:calendar-query>'
return httpx.request("REPORT", RADICALE + href, content=body, auth=auth, headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=30).text
def _parse(xml, comp="VEVENT"):
objs = []
for m in re.finditer(r'<c:calendar-data[^>]*>(.*?)</c:calendar-data>', xml, re.DOTALL):
raw = m.group(1).replace("&lt;","<").replace("&gt;",">").replace("&amp;","&")
try:
for c in vobject.readOne(raw).components():
if c.name == comp: objs.append(c)
except: pass
return objs
def _fmt_ev(ev, cal=""):
s = getattr(ev, 'dtstart', None)
e = getattr(ev, 'dtend', None)
summary = str(ev.summary.value) if hasattr(ev, 'summary') else '(kein Titel)'
start = s.value.isoformat() if s and hasattr(s.value, 'isoformat') else str(s.value) if s else '?'
end_s = e.value.isoformat() if e and hasattr(e.value, 'isoformat') else ''
loc = str(ev.location.value) if hasattr(ev, 'location') and ev.location.value else ''
desc = str(ev.description.value)[:300] if hasattr(ev, 'description') and ev.description.value else ''
uid = str(ev.uid.value) if hasattr(ev, 'uid') else ''
status = str(ev.status.value) if hasattr(ev, 'status') else ''
attendees = []
if hasattr(ev, 'attendee_list'):
for att in ev.attendee_list:
cn = att.params.get("CN", [""])[0] if hasattr(att, 'params') and att.params else ""
ps = att.params.get("PARTSTAT", [""])[0] if hasattr(att, 'params') and att.params else ""
attendees.append(f"{cn} ({ps})" if cn else str(att.value))
rrule = str(ev.rrule.value) if hasattr(ev, 'rrule') else ''
parts = [f"[{cal}] {summary}" if cal else summary]
parts.append(f" Wann: {start}" + (f" bis {end_s}" if end_s else ""))
if loc: parts.append(f" Ort: {loc}")
if desc: parts.append(f" Beschreibung: {desc}")
if status: parts.append(f" Status: {status}")
if attendees: parts.append(f" Teilnehmer: {', '.join(attendees[:8])}")
if rrule: parts.append(f" Wiederholung: {rrule}")
parts.append(f" UID: {uid}")
return "\n".join(parts)
def _fmt_task(t, lst=""):
summary = str(t.summary.value) if hasattr(t, 'summary') else '(kein Titel)'
status = str(t.status.value) if hasattr(t, 'status') else 'NEEDS-ACTION'
due = t.due.value.isoformat() if hasattr(t, 'due') and hasattr(t.due.value, 'isoformat') else str(t.due.value) if hasattr(t, 'due') else ''
pri = int(t.priority.value) if hasattr(t, 'priority') else 0
uid = str(t.uid.value) if hasattr(t, 'uid') else ''
desc = str(t.description.value)[:200] if hasattr(t, 'description') and t.description.value else ''
chk = '[x]' if status == 'COMPLETED' else '[ ]'
parts = [f"[{lst}] {chk} {summary}" if lst else f"{chk} {summary}"]
if due: parts.append(f" Faellig: {due}")
if pri: parts.append(f" Prioritaet: {pri} (1=hoechste, 9=niedrigste)")
if desc: parts.append(f" Notiz: {desc}")
parts.append(f" UID: {uid}")
return "\n".join(parts)
@mcp.tool()
def list_calendars() -> str:
"""List all calendars available to the user (personal + shared). Call this first to see calendar names."""
user = get_current_user()
if not user: return "Error: not authenticated"
cals = _discover(user, "VEVENT")
return "\n".join(f"{c['name']}" for c in cals) if cals else "Keine Kalender gefunden"
@mcp.tool()
def get_events(
calendar: Annotated[str, Field(description="Calendar name from list_calendars, e.g. 'Stefan', 'Arbeit', 'Familie'. Use partial match.")],
date_from: Annotated[str, Field(description="Start date as YYYY-MM-DD, e.g. '2026-06-12'")],
date_to: Annotated[str, Field(description="End date as YYYY-MM-DD, e.g. '2026-06-19'")],
) -> str:
"""Get all events from a calendar within a date range. Returns title, time, location, attendees, and UID."""
user = get_current_user()
if not user: return "Error: not authenticated"
cals = [c for c in _discover(user, "VEVENT") if calendar.lower() in c["name"].lower()]
if not cals: return f"Kalender '{calendar}' nicht gefunden. Nutze list_calendars."
s = datetime.strptime(date_from, "%Y-%m-%d").strftime("%Y%m%dT000000Z")
e = datetime.strptime(date_to, "%Y-%m-%d").strftime("%Y%m%dT235959Z")
results = []
for cal in cals:
for ev in _parse(_report(cal["href"], _auth(user), s, e)):
results.append(_fmt_ev(ev, cal["name"]))
return "\n\n".join(results) if results else "Keine Termine in diesem Zeitraum"
@mcp.tool()
def search_events(
query: Annotated[str, Field(description="Search term — matches title, description, location. Example: 'Zahnarzt', 'Meeting', 'Berlin'")],
date_from: Annotated[str, Field(description="Start date YYYY-MM-DD. Default: 90 days ago")] = "",
date_to: Annotated[str, Field(description="End date YYYY-MM-DD. Default: 1 year ahead")] = "",
) -> str:
"""Search events by keyword across all calendars. Returns matching events with details."""
user = get_current_user()
if not user: return "Error: not authenticated"
if not date_from: date_from = (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")
if not date_to: date_to = (datetime.now() + timedelta(days=365)).strftime("%Y-%m-%d")
s = datetime.strptime(date_from, "%Y-%m-%d").strftime("%Y%m%dT000000Z")
e = datetime.strptime(date_to, "%Y-%m-%d").strftime("%Y%m%dT235959Z")
q = query.lower()
results = []
for cal in _discover(user, "VEVENT"):
for ev in _parse(_report(cal["href"], _auth(user), s, e)):
txt = f"{ev.summary.value if hasattr(ev,'summary') else ''} {ev.description.value if hasattr(ev,'description') else ''} {ev.location.value if hasattr(ev,'location') else ''}".lower()
if q in txt:
results.append(_fmt_ev(ev, cal["name"]))
return "\n\n".join(results[:30]) if results else "Keine Treffer"
@mcp.tool()
def list_task_lists() -> str:
"""List all task lists (Erinnerungen/Reminders). Call this first before get_tasks."""
user = get_current_user()
if not user: return "Error: not authenticated"
lists = _discover(user, "VTODO")
return "\n".join(f"{l['name']}" for l in lists) if lists else "Keine Aufgabenlisten"
@mcp.tool()
def get_tasks(
task_list: Annotated[str, Field(description="Task list name from list_task_lists, e.g. 'Erinnerungen'. Leave empty for all lists.")] = "",
include_completed: Annotated[bool, Field(description="Also show completed tasks")] = False,
) -> str:
"""Get tasks/reminders from a task list. Shows title, due date, priority, and status."""
user = get_current_user()
if not user: return "Error: not authenticated"
lists = _discover(user, "VTODO")
if task_list: lists = [l for l in lists if task_list.lower() in l["name"].lower()]
results = []
for lst in lists:
for t in _parse(_report_tasks(lst["href"], _auth(user), include_completed), "VTODO"):
results.append(_fmt_task(t, lst["name"]))
return "\n\n".join(results) if results else "Keine Aufgaben"
@mcp.tool()
def create_event(
calendar: Annotated[str, Field(description="Calendar name, e.g. 'Stefan', 'Arbeit', 'Familie'")],
title: Annotated[str, Field(description="Event title")],
start: Annotated[str, Field(description="Start: YYYY-MM-DD for all-day, or YYYY-MM-DDTHH:MM for timed events, e.g. '2026-06-15T14:00'")],
end: Annotated[str, Field(description="End: YYYY-MM-DD for all-day, or YYYY-MM-DDTHH:MM, e.g. '2026-06-15T15:30'")],
location: Annotated[str, Field(description="Location/address")] = "",
description: Annotated[str, Field(description="Event description or notes")] = "",
allday: Annotated[bool, Field(description="True for all-day events (then use YYYY-MM-DD for start/end)")] = False,
) -> str:
"""Create a new calendar event. For all-day events, end date is exclusive (event on June 15 → start=2026-06-15, end=2026-06-16)."""
user = get_current_user()
if not user: return "Error: not authenticated"
cals = [c for c in _discover(user, "VEVENT") if calendar.lower() in c["name"].lower()]
if not cals: return f"Kalender '{calendar}' nicht gefunden"
uid = str(uuid.uuid4())
cal = vobject.iCalendar()
ev = cal.add("vevent")
ev.add("uid").value = uid
ev.add("summary").value = title
if allday:
ev.add("dtstart").value = datetime.strptime(start, "%Y-%m-%d").date()
ev.add("dtend").value = datetime.strptime(end, "%Y-%m-%d").date()
else:
ev.add("dtstart").value = datetime.fromisoformat(start)
ev.add("dtend").value = datetime.fromisoformat(end)
if location: ev.add("location").value = location
if description: ev.add("description").value = description
ev.add("dtstamp").value = datetime.utcnow()
r = httpx.put(RADICALE + cals[0]["href"] + uid + ".ics", content=cal.serialize(), auth=_auth(user), headers={"Content-Type": "text/calendar"}, timeout=15)
return f"Termin erstellt: {title} am {start}" if r.status_code in (201, 204) else f"Fehler: HTTP {r.status_code}"
@mcp.tool()
def create_task(
task_list: Annotated[str, Field(description="Task list name, e.g. 'Erinnerungen', 'Langzeiterinnerungen'")],
title: Annotated[str, Field(description="Task title")],
due: Annotated[str, Field(description="Due date: YYYY-MM-DD or YYYY-MM-DDTHH:MM. Leave empty for no due date.")] = "",
priority: Annotated[int, Field(description="Priority 1-9 (1=highest, 9=lowest, 0=none)")] = 0,
description: Annotated[str, Field(description="Task notes")] = "",
) -> str:
"""Create a new task/reminder in the specified list."""
user = get_current_user()
if not user: return "Error: not authenticated"
lists = [l for l in _discover(user, "VTODO") if task_list.lower() in l["name"].lower()]
if not lists: return f"Liste '{task_list}' nicht gefunden"
uid = str(uuid.uuid4())
cal = vobject.iCalendar()
todo = cal.add("vtodo")
todo.add("uid").value = uid
todo.add("summary").value = title
todo.add("status").value = "NEEDS-ACTION"
if due: todo.add("due").value = datetime.fromisoformat(due) if "T" in due else datetime.strptime(due, "%Y-%m-%d").date()
if priority: todo.add("priority").value = str(priority)
if description: todo.add("description").value = description
todo.add("dtstamp").value = datetime.utcnow()
r = httpx.put(RADICALE + lists[0]["href"] + uid + ".ics", content=cal.serialize(), auth=_auth(user), headers={"Content-Type": "text/calendar"}, timeout=15)
return f"Aufgabe erstellt: {title}" if r.status_code in (201, 204) else f"Fehler: HTTP {r.status_code}"
def create_app():
from contextlib import asynccontextmanager
mcp_app = mcp.streamable_http_app()
@asynccontextmanager
async def lifespan(app):
async with contextlib.AsyncExitStack() as stack:
await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app))
yield
routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)]
app = Starlette(routes=routes, lifespan=lifespan)
app.add_middleware(BearerAuthMiddleware)
return app
if __name__ == "__main__":
import uvicorn
uvicorn.run(create_app(), host="127.0.0.1", port=5101)
+146
View File
@@ -0,0 +1,146 @@
"""Shared OAuth + user resolution for all MCP servers.
OAuth client_credentials flow:
1. claude.ai discovers /.well-known/oauth-authorization-server
2. claude.ai POSTs to /token with client_id + client_secret
3. Server returns access_token
4. claude.ai sends Bearer access_token with MCP requests
5. Server resolves user from token
"""
import json
import os
import secrets
import contextvars
import time
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette.middleware.base import BaseHTTPMiddleware
BASE_DIR = os.path.dirname(__file__)
TOKENS_FILE = os.path.join(BASE_DIR, "tokens.json")
CONFIG_FILE = os.path.join(BASE_DIR, "config.json")
VALID_USERS = ["stefan", "kati"]
_config_cache = None
def load_config():
global _config_cache
if _config_cache is None:
with open(CONFIG_FILE) as f:
_config_cache = json.load(f)
return _config_cache
_tokens_cache = None
_current_user: contextvars.ContextVar[str | None] = contextvars.ContextVar("current_user", default=None)
_access_tokens: dict[str, dict] = {}
def _load_tokens():
global _tokens_cache
if _tokens_cache is None:
with open(TOKENS_FILE) as f:
_tokens_cache = json.load(f)
return _tokens_cache
def get_current_user() -> str | None:
return _current_user.get()
def get_user_key(username: str) -> str:
return _load_tokens().get(username, {}).get("token", "")
def _resolve_client(client_id, client_secret):
tokens = _load_tokens()
for username, data in tokens.items():
if username == client_id and data["token"] == client_secret:
return username
return None
def _resolve_access_token(token):
info = _access_tokens.get(token)
if not info:
return None
if info.get("expires_at", 0) < time.time():
del _access_tokens[token]
return None
return info["user"]
async def oauth_metadata(request: Request):
base = str(request.base_url).rstrip("/")
return JSONResponse({
"issuer": base,
"token_endpoint": base + "/token",
"response_types_supported": ["token"],
"grant_types_supported": ["client_credentials"],
"token_endpoint_auth_methods_supported": ["client_secret_post"],
})
async def oauth_token(request: Request):
try:
form = await request.form()
grant_type = form.get("grant_type", "")
client_id = form.get("client_id", "")
client_secret = form.get("client_secret", "")
except Exception:
body = await request.body()
try:
data = json.loads(body)
grant_type = data.get("grant_type", "")
client_id = data.get("client_id", "")
client_secret = data.get("client_secret", "")
except Exception:
return JSONResponse({"error": "invalid_request"}, status_code=400)
if grant_type != "client_credentials":
return JSONResponse({"error": "unsupported_grant_type"}, status_code=400)
user = _resolve_client(client_id, client_secret)
if not user:
return JSONResponse({"error": "invalid_client"}, status_code=401)
access_token = secrets.token_urlsafe(48)
expires_in = 86400
_access_tokens[access_token] = {"user": user, "expires_at": time.time() + expires_in}
return JSONResponse({
"access_token": access_token,
"token_type": "bearer",
"expires_in": expires_in,
})
OAUTH_ROUTES = [
Route("/.well-known/oauth-authorization-server", oauth_metadata, methods=["GET"]),
Route("/token", oauth_token, methods=["POST"]),
]
class BearerAuthMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
path = request.url.path
if path.endswith("/token") or "/.well-known/" in path:
return await call_next(request)
auth = request.headers.get("authorization", "")
if auth.startswith("Bearer "):
token = auth[7:]
user = _resolve_access_token(token)
if user:
tok = _current_user.set(user)
try:
return await call_next(request)
finally:
_current_user.reset(tok)
return JSONResponse(
{"error": "unauthorized"},
status_code=401,
headers={"WWW-Authenticate": 'Bearer resource_metadata="/.well-known/oauth-authorization-server"'},
)
+51
View File
@@ -0,0 +1,51 @@
{
"radicale_url": "http://127.0.0.1:5232",
"radicale_users": {
"stefan": {
"username": "stefan",
"password": "CHANGE_ME"
},
"kati": {
"username": "kati",
"password": "CHANGE_ME"
}
},
"ocis_url": "http://127.0.0.1:9200",
"ocis_users": {
"stefan": {
"username": "stefan",
"password": "CHANGE_ME"
},
"kati": {
"username": "kati",
"password": "CHANGE_ME"
}
},
"joplin_url": "http://127.0.0.1:22300",
"joplin_tokens": {
"stefan": "",
"kati": ""
},
"mail_roots": {
"stefan": "/mnt/ssd/Backup/stefan/imap",
"kati": "/mnt/ssd/Backup/kati/imap"
},
"calendar_paths": {
"stefan": [
"/stefan/",
"/shared/"
],
"kati": [
"/kati/",
"/shared/"
]
},
"addressbook_paths": {
"stefan": [
"/stefan/"
],
"kati": [
"/kati/"
]
}
}
+166
View File
@@ -0,0 +1,166 @@
"""MCP Contacts Server — reads and writes contacts via CardDAV."""
import os, sys, re, contextlib, uuid
from typing import Annotated
import httpx, vobject
from pydantic import Field
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.routing import Mount
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware
from common import load_config as _lc
_cfg = _lc()
RADICALE = _cfg['radicale_url']
CREDS = {u: (d['username'], d['password']) for u, d in _cfg['radicale_users'].items()}
AB_PATHS = _cfg['addressbook_paths']
mcp = FastMCP("Contacts", stateless_http=True,
transport_security={"enable_dns_rebinding_protection": False})
def _auth(u): c = CREDS.get(u); return httpx.BasicAuth(c[0], c[1]) if c else None
def _discover_ab(user):
books = []
for base in AB_PATHS.get(user, []):
body = '<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/><d:displayname/></d:prop></d:propfind>'
r = httpx.request("PROPFIND", RADICALE + base, content=body, auth=_auth(user), headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=30)
for m in re.finditer(r'<d:href>([^<]+)</d:href>', r.text):
href = m.group(1)
if href.rstrip("/") == base.rstrip("/"): continue
block = r.text[m.start():r.text.find("</d:response>", m.end())]
if "addressbook" not in block.lower(): continue
nm = re.search(r'<d:displayname>([^<]*)</d:displayname>', block)
books.append({"name": nm.group(1) if nm else href.split("/")[-2], "href": href})
return books
def _get_contacts(href, auth):
body = '<?xml version="1.0"?><c:addressbook-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:carddav"><d:prop><d:getetag/><c:address-data/></d:prop></c:addressbook-query>'
r = httpx.request("REPORT", RADICALE + href, content=body, auth=auth, headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=60)
contacts = []
for m in re.finditer(r'<c:address-data[^>]*>(.*?)</c:address-data>', r.text, re.DOTALL):
raw = m.group(1).replace("&lt;","<").replace("&gt;",">").replace("&amp;","&")
try: contacts.append(vobject.readOne(raw))
except: pass
return contacts
def _brief(c):
fn = str(c.fn.value) if hasattr(c, 'fn') else '?'
org = f" ({c.org.value[0]})" if hasattr(c, 'org') and c.org.value else ''
uid = str(c.uid.value) if hasattr(c, 'uid') else ''
parts = [f"{fn}{org}"]
for em in (c.email_list if hasattr(c, 'email_list') else ([c.email] if hasattr(c, 'email') else [])):
parts.append(f" Email: {em.value}")
for tel in (c.tel_list if hasattr(c, 'tel_list') else ([c.tel] if hasattr(c, 'tel') else [])):
parts.append(f" Tel: {tel.value}")
parts.append(f" UID: {uid}")
return "\n".join(parts)
def _full(c):
parts = [str(c.fn.value) if hasattr(c, 'fn') else '?']
if hasattr(c, 'n'):
n = c.n.value
if n.prefix: parts.append(f" Anrede: {n.prefix}")
parts.append(f" Vorname: {n.given}")
parts.append(f" Nachname: {n.family}")
if n.suffix: parts.append(f" Suffix: {n.suffix}")
if hasattr(c, 'nickname'): parts.append(f" Spitzname: {c.nickname.value}")
for em in (c.email_list if hasattr(c, 'email_list') else ([c.email] if hasattr(c, 'email') else [])):
typ = em.params.get("TYPE", [""])[0] if hasattr(em, 'params') and em.params else ""
parts.append(f" Email ({typ or 'sonstig'}): {em.value}")
for tel in (c.tel_list if hasattr(c, 'tel_list') else ([c.tel] if hasattr(c, 'tel') else [])):
typ = tel.params.get("TYPE", [""])[0] if hasattr(tel, 'params') and tel.params else ""
parts.append(f" Telefon ({typ or 'sonstig'}): {tel.value}")
for adr in (c.adr_list if hasattr(c, 'adr_list') else ([c.adr] if hasattr(c, 'adr') else [])):
a = adr.value
addr = ", ".join(filter(None, [a.street, a.city, a.region, a.code, a.country]))
if addr: parts.append(f" Adresse: {addr}")
if hasattr(c, 'org') and c.org.value: parts.append(f" Firma: {c.org.value[0]}")
if hasattr(c, 'title'): parts.append(f" Titel: {c.title.value}")
if hasattr(c, 'role'): parts.append(f" Rolle: {c.role.value}")
if hasattr(c, 'bday'): parts.append(f" Geburtstag: {c.bday.value}")
if hasattr(c, 'note') and c.note.value: parts.append(f" Notiz: {str(c.note.value)[:300]}")
if hasattr(c, 'url'): parts.append(f" URL: {c.url.value}")
if hasattr(c, 'uid'): parts.append(f" UID: {c.uid.value}")
return "\n".join(parts)
@mcp.tool()
def search_contacts(
query: Annotated[str, Field(description="Search term — matches name, email, phone, company, address, notes. Example: 'Lohmaier', 'BMW', '+49171'")],
limit: Annotated[int, Field(description="Maximum number of results")] = 30,
) -> str:
"""Search contacts by any field. Returns name, email, phone, and UID. Use get_contact with the UID for full details."""
user = get_current_user()
if not user: return "Error: not authenticated"
q = query.lower()
results = []
for book in _discover_ab(user):
for c in _get_contacts(book["href"], _auth(user)):
txt = " ".join(str(getattr(c, a, None) or '') for a in ['fn','email','tel','org','note','nickname']).lower()
if q in txt:
results.append(_brief(c))
if len(results) >= limit: break
return "\n\n".join(results) if results else "Keine Kontakte gefunden"
@mcp.tool()
def get_contact(
uid: Annotated[str, Field(description="Contact UID from search_contacts results")],
) -> str:
"""Get full details of a contact by UID: name, all emails, all phones, addresses, company, birthday, notes."""
user = get_current_user()
if not user: return "Error: not authenticated"
for book in _discover_ab(user):
for c in _get_contacts(book["href"], _auth(user)):
if hasattr(c, 'uid') and str(c.uid.value) == uid:
return _full(c)
return f"Kontakt nicht gefunden: {uid}"
@mcp.tool()
def create_contact(
name: Annotated[str, Field(description="Full name, e.g. 'Max Mustermann'")],
email: Annotated[str, Field(description="Email address")] = "",
phone: Annotated[str, Field(description="Phone number, e.g. '+49 171 1234567'")] = "",
organization: Annotated[str, Field(description="Company/organization name")] = "",
note: Annotated[str, Field(description="Notes about this contact")] = "",
) -> str:
"""Create a new contact in the default addressbook."""
user = get_current_user()
if not user: return "Error: not authenticated"
books = _discover_ab(user)
if not books: return "Kein Adressbuch gefunden"
uid_val = str(uuid.uuid4())
card = vobject.vCard()
card.add("uid").value = uid_val
card.add("fn").value = name
p = name.split(" ", 1)
card.add("n").value = vobject.vcard.Name(family=p[-1] if len(p)>1 else p[0], given=p[0] if len(p)>1 else "")
if email: card.add("email").value = email
if phone: card.add("tel").value = phone
if organization: card.add("org").value = [organization]
if note: card.add("note").value = note
r = httpx.put(RADICALE + books[0]["href"] + uid_val + ".vcf", content=card.serialize(), auth=_auth(user), headers={"Content-Type": "text/vcard"}, timeout=15)
return f"Kontakt erstellt: {name}" if r.status_code in (201, 204) else f"Fehler: HTTP {r.status_code}"
def create_app():
from contextlib import asynccontextmanager
mcp_app = mcp.streamable_http_app()
@asynccontextmanager
async def lifespan(app):
async with contextlib.AsyncExitStack() as stack:
await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app))
yield
routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)]
app = Starlette(routes=routes, lifespan=lifespan)
app.add_middleware(BearerAuthMiddleware)
return app
if __name__ == "__main__":
import uvicorn
uvicorn.run(create_app(), host="127.0.0.1", port=5102)
+144
View File
@@ -0,0 +1,144 @@
"""MCP Files Server — browse and read files via WebDAV/oCIS."""
import os, sys, contextlib
from xml.etree import ElementTree as ET
from typing import Annotated
import httpx
from pydantic import Field
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.routing import Mount
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware
from common import load_config as _lc
_cfg = _lc()
OCIS = _cfg['ocis_url']
OCIS_CREDS = {u: (d['username'], d['password']) for u, d in _cfg['ocis_users'].items()}
mcp = FastMCP("Files", stateless_http=True,
transport_security={"enable_dns_rebinding_protection": False})
def _auth(u): c = OCIS_CREDS.get(u); return httpx.BasicAuth(c[0], c[1]) if c else None
def _dav(u, p=""): return f"{OCIS}/remote.php/dav/files/{u}/{p.lstrip('/')}"
def _propfind(user, path="", depth=1):
body = '<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/><d:displayname/><d:getcontentlength/><d:getlastmodified/><d:getcontenttype/></d:prop></d:propfind>'
r = httpx.request("PROPFIND", _dav(user, path), content=body, auth=_auth(user), headers={"Depth": str(depth), "Content-Type": "application/xml"}, timeout=30)
return r.text, r.status_code
def _parse_pf(xml, user):
ns = {"d": "DAV:"}
entries = []
try: root = ET.fromstring(xml)
except: return entries
bp = f"/remote.php/dav/files/{user}/"
for resp in root.findall("d:response", ns):
href = resp.findtext("d:href", "", ns) or ""
rel = href.split(bp, 1)[-1].rstrip("/") if bp in href else href.rstrip("/")
props = resp.find(".//d:prop", ns)
if props is None: continue
entries.append({
"name": props.findtext("d:displayname", "", ns) or rel.split("/")[-1],
"path": "/" + rel if rel else "/",
"is_dir": props.find("d:resourcetype/d:collection", ns) is not None,
"size": int(props.findtext("d:getcontentlength", "0", ns) or 0),
"modified": props.findtext("d:getlastmodified", "", ns),
"type": props.findtext("d:getcontenttype", "", ns),
})
return entries
@mcp.tool()
def list_files(
path: Annotated[str, Field(description="Directory path, e.g. '/' for root, '/Documents', '/Photos/2026'")] = "/",
) -> str:
"""List files and subdirectories at the given path. Shows name, size, and modification date."""
user = get_current_user()
if not user: return "Error: not authenticated"
xml, st = _propfind(user, path)
if st >= 400: return f"Fehler: HTTP {st}"
entries = _parse_pf(xml, user)
lines = []
for e in entries[1:]:
if e["is_dir"]: lines.append(f"[DIR] {e['name']}/")
else: lines.append(f" {e['name']} ({e['size']:,} bytes, {e['modified']})")
return "\n".join(lines) if lines else "Leeres Verzeichnis"
@mcp.tool()
def read_file(
path: Annotated[str, Field(description="Full file path, e.g. '/Documents/notes.txt', '/config.yaml'")],
) -> str:
"""Read a text file's content (txt, md, json, csv, yaml, xml, html). Binary files show only metadata."""
user = get_current_user()
if not user: return "Error: not authenticated"
r = httpx.get(_dav(user, path), auth=_auth(user), timeout=30)
if r.status_code >= 400: return f"Fehler: HTTP {r.status_code}"
ct = r.headers.get("content-type", "")
if any(t in ct for t in ["text/", "json", "xml", "csv", "yaml", "javascript"]):
return r.text[:100000]
try: return r.content.decode("utf-8")[:100000]
except: return f"Binaerdatei ({len(r.content)} bytes, Typ: {ct}). Kann nicht angezeigt werden."
@mcp.tool()
def file_info(
path: Annotated[str, Field(description="File or directory path")],
) -> str:
"""Get metadata about a file (size, type, modification date) or directory."""
user = get_current_user()
if not user: return "Error: not authenticated"
xml, st = _propfind(user, path, 0)
if st >= 400: return f"Fehler: HTTP {st}"
entries = _parse_pf(xml, user)
if not entries: return "Nicht gefunden"
e = entries[0]
parts = [f"Name: {e['name']}", f"Pfad: {e['path']}", f"Typ: {'Verzeichnis' if e['is_dir'] else e['type']}"]
if not e["is_dir"]: parts.append(f"Groesse: {e['size']:,} bytes")
parts.append(f"Geaendert: {e['modified']}")
return "\n".join(parts)
@mcp.tool()
def search_files(
query: Annotated[str, Field(description="Search term — matches file names. Example: 'Rechnung', '.pdf', 'backup'")],
path: Annotated[str, Field(description="Start directory for search")] = "/",
) -> str:
"""Search for files by name recursively (up to 5 levels deep, max 50 results)."""
user = get_current_user()
if not user: return "Error: not authenticated"
q = query.lower()
results = []
def _s(p, d=0):
if d > 5 or len(results) >= 50: return
xml, _ = _propfind(user, p)
for e in _parse_pf(xml, user)[1:]:
if q in e["name"].lower(): results.append(e)
if e["is_dir"]: _s(e["path"], d+1)
_s(path)
lines = []
for e in results:
prefix = "[DIR]" if e["is_dir"] else f"({e['size']:,}b)"
lines.append(f"{prefix} {e['path']}")
return "\n".join(lines) if lines else "Keine Dateien gefunden"
def create_app():
from contextlib import asynccontextmanager
mcp_app = mcp.streamable_http_app()
@asynccontextmanager
async def lifespan(app):
async with contextlib.AsyncExitStack() as stack:
await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app))
yield
routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)]
app = Starlette(routes=routes, lifespan=lifespan)
app.add_middleware(BearerAuthMiddleware)
return app
if __name__ == "__main__":
import uvicorn
uvicorn.run(create_app(), host="127.0.0.1", port=5103)
+192
View File
@@ -0,0 +1,192 @@
"""MCP Mail Server — searches and reads IMAP backup emails."""
import os
import sys
import contextlib
import mailbox
from email.header import decode_header
from pathlib import Path
from typing import Annotated
from pydantic import Field
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.routing import Mount
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware
from common import load_config as _lc
MAIL_ROOTS = _lc()["mail_roots"]
mcp = FastMCP("Mail", stateless_http=True,
transport_security={"enable_dns_rebinding_protection": False})
def _decode_hdr(raw):
if not raw:
return ""
parts = decode_header(str(raw))
decoded = []
for data, charset in parts:
if isinstance(data, bytes):
decoded.append(data.decode(charset or "utf-8", errors="replace"))
else:
decoded.append(str(data))
return " ".join(decoded)
def _get_body(msg):
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
payload = part.get_payload(decode=True)
if payload:
return payload.decode(part.get_content_charset() or "utf-8", errors="replace")
for part in msg.walk():
if part.get_content_type() == "text/html":
payload = part.get_payload(decode=True)
if payload:
return "[HTML] " + payload.decode(part.get_content_charset() or "utf-8", errors="replace")
else:
payload = msg.get_payload(decode=True)
if payload:
return payload.decode(msg.get_content_charset() or "utf-8", errors="replace")
return ""
def _discover_accounts(user):
root = MAIL_ROOTS.get(user)
if not root or not os.path.isdir(root):
return {}
return {a: os.path.join(root, a) for a in sorted(os.listdir(root)) if os.path.isdir(os.path.join(root, a))}
def _discover_folders(acct_path):
folders = []
for entry in sorted(Path(acct_path).rglob("cur")):
rel = str(entry.parent.relative_to(acct_path))
if rel == ".":
folders.insert(0, "INBOX")
elif rel not in folders:
folders.append(rel)
return folders
def _open_folder(acct_path, folder_name):
path = acct_path if folder_name == "INBOX" else os.path.join(acct_path, folder_name)
return mailbox.Maildir(path, create=False) if os.path.isdir(path) else None
@mcp.tool()
def list_accounts() -> str:
"""List all email accounts with their folder count. Call this first to see available accounts."""
user = get_current_user()
if not user:
return "Error: not authenticated"
accounts = _discover_accounts(user)
if not accounts:
return "No mail accounts found"
return "\n".join(f"{name}: {len(_discover_folders(path))} folders" for name, path in accounts.items())
@mcp.tool()
def list_folders(
account: Annotated[str, Field(description="Account name from list_accounts, e.g. 'd370128_0-slohmaier' or 'gmail'")]
) -> str:
"""List all IMAP folders in a mail account (INBOX, Sent, Archive, etc.)."""
user = get_current_user()
if not user:
return "Error: not authenticated"
acct_path = _discover_accounts(user).get(account)
if not acct_path:
return f"Account not found: {account}. Use list_accounts to see available accounts."
folders = _discover_folders(acct_path)
return "\n".join(folders) if folders else "No folders"
@mcp.tool()
def search_mail(
query: Annotated[str, Field(description="Search term — matches subject, from, to, and body. Example: 'Rechnung', 'Amazon', 'meeting'")],
account: Annotated[str, Field(description="Filter by account name. Leave empty to search all accounts.")] = "",
folder: Annotated[str, Field(description="Filter by folder name, e.g. 'INBOX', 'Sent'. Leave empty for all folders.")] = "",
limit: Annotated[int, Field(description="Maximum number of results to return")] = 20,
) -> str:
"""Search emails by keyword. Returns date, sender, recipient, subject, and a location key for reading the full email with read_mail."""
user = get_current_user()
if not user:
return "Error: not authenticated"
query_lower = query.lower()
results = []
for acct_name, acct_path in _discover_accounts(user).items():
if account and account not in acct_name:
continue
for fld in _discover_folders(acct_path):
if folder and folder.lower() not in fld.lower():
continue
md = _open_folder(acct_path, fld)
if not md:
continue
for key, msg in md.items():
subj = _decode_hdr(msg.get("Subject", ""))
frm = _decode_hdr(msg.get("From", ""))
to = _decode_hdr(msg.get("To", ""))
date_str = msg.get("Date", "")
if query_lower not in f"{subj} {frm} {to}".lower():
body = _get_body(msg)
if query_lower not in body.lower():
continue
results.append(f"[{date_str}] {frm} -> {to}\n Subject: {subj}\n Account: {acct_name}, Folder: {fld}, Key: {key}")
if len(results) >= limit:
return "\n\n".join(results)
return "\n\n".join(results) if results else "No results found"
@mcp.tool()
def read_mail(
account: Annotated[str, Field(description="Account name from search results, e.g. 'd370128_0-slohmaier'")],
folder: Annotated[str, Field(description="Folder name from search results, e.g. 'INBOX'")],
key: Annotated[str, Field(description="Message key from search results")],
) -> str:
"""Read the full content of a single email. Use the account, folder, and key values from search_mail results."""
user = get_current_user()
if not user:
return "Error: not authenticated"
acct_path = _discover_accounts(user).get(account)
if not acct_path:
return f"Account not found: {account}"
md = _open_folder(acct_path, folder)
if not md:
return f"Folder not found: {folder}"
msg = md.get(key)
if not msg:
return f"Message not found: {key}"
return (
f"From: {_decode_hdr(msg.get('From', ''))}\n"
f"To: {_decode_hdr(msg.get('To', ''))}\n"
f"Cc: {_decode_hdr(msg.get('Cc', ''))}\n"
f"Date: {msg.get('Date', '')}\n"
f"Subject: {_decode_hdr(msg.get('Subject', ''))}\n\n"
f"{_get_body(msg)[:50000]}"
)
def create_app():
from contextlib import asynccontextmanager
mcp_app = mcp.streamable_http_app()
@asynccontextmanager
async def lifespan(app):
async with contextlib.AsyncExitStack() as stack:
await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app))
yield
routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)]
app = Starlette(routes=routes, lifespan=lifespan)
app.add_middleware(BearerAuthMiddleware)
return app
if __name__ == "__main__":
import uvicorn
uvicorn.run(create_app(), host="127.0.0.1", port=5100)
+121
View File
@@ -0,0 +1,121 @@
"""MCP Notes Server — reads and writes notes via Joplin API."""
import os, sys, contextlib
from typing import Annotated
import httpx
from pydantic import Field
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.routing import Mount
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware
from common import load_config as _lc
_cfg = _lc()
JOPLIN = _cfg['joplin_url']
JOPLIN_TOKENS = _cfg['joplin_tokens']
mcp = FastMCP("Notes", stateless_http=True,
transport_security={"enable_dns_rebinding_protection": False})
def _api(user, endpoint, method="GET", json_data=None):
token = JOPLIN_TOKENS.get(user)
if not token: return None, "Kein Joplin API-Token konfiguriert. Bitte in Joplin-App generieren."
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Origin": "https://notes.home.slohmaier.de"}
try:
r = getattr(httpx, method.lower())(f"{JOPLIN}{endpoint}", headers=headers, json=json_data, timeout=15)
if r.status_code >= 400: return None, f"Joplin-Fehler: HTTP {r.status_code}"
return (r.json() if r.text else {}), None
except Exception as e: return None, str(e)
@mcp.tool()
def list_notebooks() -> str:
"""List all Joplin notebooks (folders). Call this first to see available notebooks."""
user = get_current_user()
if not user: return "Error: not authenticated"
data, err = _api(user, "/api/folders")
if err: return f"Fehler: {err}"
return "\n".join(f"{nb['title']} (ID: {nb['id']})" for nb in data.get("items", [])) or "Keine Notizbuecher"
@mcp.tool()
def list_notes(
notebook: Annotated[str, Field(description="Notebook name or ID from list_notebooks. Leave empty for all notes.")] = "",
limit: Annotated[int, Field(description="Maximum number of notes to return")] = 50,
) -> str:
"""List notes, optionally filtered by notebook. Shows title and ID. Use read_note with the ID to get content."""
user = get_current_user()
if not user: return "Error: not authenticated"
if notebook:
data, err = _api(user, "/api/folders")
if err: return f"Fehler: {err}"
nb_id = next((nb["id"] for nb in data.get("items", []) if notebook.lower() in nb.get("title","").lower() or notebook == nb.get("id")), None)
if not nb_id: return f"Notizbuch '{notebook}' nicht gefunden"
data, err = _api(user, f"/api/folders/{nb_id}/notes?limit={limit}")
else:
data, err = _api(user, f"/api/notes?limit={limit}&order_by=updated_time&order_dir=DESC")
if err: return f"Fehler: {err}"
return "\n".join(f"{n['title']} (ID: {n['id']})" for n in data.get("items", [])) or "Keine Notizen"
@mcp.tool()
def read_note(
note_id: Annotated[str, Field(description="Note ID from list_notes or search_notes results")],
) -> str:
"""Read the full content of a note (Markdown format)."""
user = get_current_user()
if not user: return "Error: not authenticated"
data, err = _api(user, f"/api/notes/{note_id}?fields=id,title,body")
if err: return f"Fehler: {err}"
return f"# {data.get('title','')}\n\n{data.get('body','')}"
@mcp.tool()
def search_notes(
query: Annotated[str, Field(description="Search term — matches note titles and content. Example: 'Einkaufsliste', 'Passwort', 'Rezept'")],
) -> str:
"""Search notes by keyword. Returns matching note titles and IDs."""
user = get_current_user()
if not user: return "Error: not authenticated"
data, err = _api(user, f"/api/search?query={query}&limit=20")
if err: return f"Fehler: {err}"
return "\n".join(f"{n['title']} (ID: {n['id']})" for n in data.get("items", [])) or "Keine Treffer"
@mcp.tool()
def create_note(
notebook: Annotated[str, Field(description="Notebook name or ID where the note should be created")],
title: Annotated[str, Field(description="Note title")],
body: Annotated[str, Field(description="Note content in Markdown format")],
) -> str:
"""Create a new note in the specified notebook."""
user = get_current_user()
if not user: return "Error: not authenticated"
folders, err = _api(user, "/api/folders")
if err: return f"Fehler: {err}"
nb_id = next((nb["id"] for nb in folders.get("items", []) if notebook.lower() in nb.get("title","").lower() or notebook == nb.get("id")), None)
if not nb_id: return f"Notizbuch '{notebook}' nicht gefunden"
data, err = _api(user, "/api/notes", "POST", {"title": title, "body": body, "parent_id": nb_id})
if err: return f"Fehler: {err}"
return f"Notiz erstellt: {title}"
def create_app():
from contextlib import asynccontextmanager
mcp_app = mcp.streamable_http_app()
@asynccontextmanager
async def lifespan(app):
async with contextlib.AsyncExitStack() as stack:
await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app))
yield
routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)]
app = Starlette(routes=routes, lifespan=lifespan)
app.add_middleware(BearerAuthMiddleware)
return app
if __name__ == "__main__":
import uvicorn
uvicorn.run(create_app(), host="127.0.0.1", port=5104)
+10
View File
@@ -0,0 +1,10 @@
{
"stefan": {
"token": "REPLACE_WITH_RANDOM_SECRET",
"label": "Stefan"
},
"kati": {
"token": "REPLACE_WITH_RANDOM_SECRET",
"label": "Kati"
}
}