From c06e6d6b4cb4b449994b85e50c6938d2a8fb618d Mon Sep 17 00:00:00 2001 From: Stefan Lohmaier Date: Fri, 12 Jun 2026 12:26:26 +0200 Subject: [PATCH] Joplin via Data API + Mail attachments Notes server rewritten to use Joplin CLI Data API (joplin-cli sync clients on host, ports 41184/41185). Clean fast search, proper resource handling. New tools: list_note_resources, read_resource (attachments as inline image/document/text). Mail server: read_mail now lists attachments; new read_attachment tool returns images inline, PDFs/docs as EmbeddedResource, text directly. Tests: 54 total. Notes now real (notebooks, list, create+read). Mail attachment listing + fetch tested. Co-Authored-By: Claude Opus 4.6 --- config.json.example | 10 +++ mail/server.py | 74 ++++++++++++++++- notes/server.py | 196 +++++++++++++++++++++++++++++++++----------- tests/test_all.py | 62 +++++++++++++- 4 files changed, 290 insertions(+), 52 deletions(-) diff --git a/config.json.example b/config.json.example index 7aac37d..1b34879 100644 --- a/config.json.example +++ b/config.json.example @@ -47,5 +47,15 @@ "kati": [ "/kati/" ] + }, + "joplin_data_api": { + "stefan": { + "url": "http://127.0.0.1:41184", + "token": "CHANGE_ME" + }, + "kati": { + "url": "http://127.0.0.1:41185", + "token": "CHANGE_ME" + } } } \ No newline at end of file diff --git a/mail/server.py b/mail/server.py index 92804a6..4a7eee2 100644 --- a/mail/server.py +++ b/mail/server.py @@ -2,6 +2,7 @@ import os import sys +import base64 import contextlib import imaplib import mailbox @@ -13,6 +14,7 @@ from typing import Annotated from pydantic import Field from mcp.server.fastmcp import FastMCP +from mcp.types import TextContent, ImageContent, EmbeddedResource, BlobResourceContents from starlette.applications import Starlette from starlette.routing import Mount @@ -76,6 +78,35 @@ def _get_body(msg): return "" +def _get_attachments(msg): + """Return list of (index, filename, mimetype, size, part) for attachments.""" + attachments = [] + if not msg.is_multipart(): + return attachments + idx = 0 + for part in msg.walk(): + if part.get_content_maintype() == "multipart": + continue + disp = str(part.get("Content-Disposition", "")) + filename = part.get_filename() + ctype = part.get_content_type() + # Attachment = has filename, or explicit attachment disposition, or non-text inline + is_attachment = ( + "attachment" in disp.lower() + or (filename is not None) + or (ctype not in ("text/plain", "text/html") and part.get_content_maintype() != "multipart") + ) + if not is_attachment: + continue + payload = part.get_payload(decode=True) + if payload is None: + continue + idx += 1 + name = _decode_hdr(filename) if filename else f"attachment_{idx}.{ctype.split('/')[-1]}" + attachments.append({"index": idx, "filename": name, "mime": ctype, "size": len(payload), "payload": payload}) + return attachments + + def _discover_accounts(user): root = MAIL_ROOTS.get(user) if not root or not os.path.isdir(root): @@ -183,7 +214,7 @@ def read_mail( msg = md.get(key) if not msg: return f"Message not found: {key}" - return ( + out = ( f"From: {_decode_hdr(msg.get('From', ''))}\n" f"To: {_decode_hdr(msg.get('To', ''))}\n" f"Cc: {_decode_hdr(msg.get('Cc', ''))}\n" @@ -191,6 +222,47 @@ def read_mail( f"Subject: {_decode_hdr(msg.get('Subject', ''))}\n\n" f"{_get_body(msg)[:50000]}" ) + attachments = _get_attachments(msg) + if attachments: + out += "\n\nAnhaenge (nutze read_attachment mit dem Index):\n" + for a in attachments: + out += f" {a['index']}. {a['filename']} ({a['mime']}, {a['size']:,} bytes)\n" + return out + + +@mcp.tool() +def read_attachment( + account: Annotated[str, Field(description="Account name from search results")], + folder: Annotated[str, Field(description="Folder name from search results")], + key: Annotated[str, Field(description="Message key from search results")], + attachment_index: Annotated[int, Field(description="Attachment number from the read_mail attachment list (1-based)")], +) -> list[TextContent | ImageContent | EmbeddedResource]: + """Read an email attachment. Images shown inline, documents (PDF/docx) as binary, text directly. Get the index from read_mail.""" + user = get_current_user() + if not user: + return [TextContent(type="text", text="Error: not authenticated")] + acct_path = _discover_accounts(user).get(account) + if not acct_path: + return [TextContent(type="text", text=f"Account not found: {account}")] + md = _open_folder(acct_path, folder) + if not md: + return [TextContent(type="text", text=f"Folder not found: {folder}")] + msg = md.get(key) + if not msg: + return [TextContent(type="text", text=f"Message not found: {key}")] + attachments = _get_attachments(msg) + att = next((a for a in attachments if a["index"] == attachment_index), None) + if not att: + return [TextContent(type="text", text=f"Anhang {attachment_index} nicht gefunden. {len(attachments)} Anhaenge vorhanden.")] + mime = att["mime"] + payload = att["payload"] + if mime in ("image/jpeg", "image/png", "image/gif", "image/webp") and len(payload) < 10_000_000: + return [ImageContent(type="image", data=base64.b64encode(payload).decode(), mimeType=mime)] + if mime.startswith("text/"): + return [TextContent(type="text", text=payload.decode("utf-8", errors="replace")[:100000])] + return [EmbeddedResource(type="resource", resource=BlobResourceContents( + uri=f"mail://attachment/{att['filename']}", + blob=base64.b64encode(payload).decode(), mimeType=mime))] @mcp.tool() diff --git a/notes/server.py b/notes/server.py index c9f62d5..e3bf2d3 100644 --- a/notes/server.py +++ b/notes/server.py @@ -1,106 +1,204 @@ -"""MCP Notes Server — reads and writes notes via Joplin API.""" +"""MCP Notes Server — reads/writes Joplin notes via Joplin CLI Data API.""" -import os, sys, contextlib +import os, sys, contextlib, base64 from typing import Annotated import httpx from pydantic import Field from mcp.server.fastmcp import FastMCP +from mcp.types import TextContent, ImageContent, EmbeddedResource, BlobResourceContents from starlette.applications import Starlette from starlette.routing import Mount sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) -from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware +from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware, load_config -from common import load_config as _lc -_cfg = _lc() -JOPLIN = _cfg['joplin_url'] -JOPLIN_TOKENS = _cfg['joplin_tokens'] +_cfg = load_config() +DATA_API = _cfg["joplin_data_api"] # {user: {url, token}} mcp = FastMCP("Notes", stateless_http=True, transport_security={"enable_dns_rebinding_protection": False}) -def _api(user, endpoint, method="GET", json_data=None): - token = JOPLIN_TOKENS.get(user) - if not token: return None, "Kein Joplin API-Token konfiguriert. Bitte in Joplin-App generieren." - headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Origin": "https://notes.home.slohmaier.de"} +IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"} + + +def _api(user): + return DATA_API.get(user) + + +def _get(user, path, params=None): + api = _api(user) + if not api: + return None, "Kein Joplin-Zugang fuer diesen User" + p = dict(params or {}) + p["token"] = api["token"] try: - r = getattr(httpx, method.lower())(f"{JOPLIN}{endpoint}", headers=headers, json=json_data, timeout=15) - if r.status_code >= 400: return None, f"Joplin-Fehler: HTTP {r.status_code}" - return (r.json() if r.text else {}), None - except Exception as e: return None, str(e) + r = httpx.get(f"{api['url']}{path}", params=p, timeout=30) + if r.status_code >= 400: + return None, f"Joplin HTTP {r.status_code}" + return r, None + except Exception as e: + return None, str(e) + + +def _post(user, path, json_data): + api = _api(user) + if not api: + return None, "Kein Joplin-Zugang" + try: + r = httpx.post(f"{api['url']}{path}?token={api['token']}", json=json_data, timeout=30) + if r.status_code >= 400: + return None, f"Joplin HTTP {r.status_code}" + return r.json(), None + except Exception as e: + return None, str(e) + + +def _all_items(user, path, params=None): + """Fetch all pages from a Data API list endpoint.""" + items = [] + page = 1 + while True: + p = dict(params or {}) + p["page"] = page + r, err = _get(user, path, p) + if err: + return [], err + d = r.json() + items.extend(d.get("items", [])) + if not d.get("has_more"): + break + page += 1 + if page > 50: + break + return items, None @mcp.tool() def list_notebooks() -> str: - """List all Joplin notebooks (folders). Call this first to see available notebooks.""" + """List all Joplin notebooks (folders). Call this first to see notebook names and IDs.""" user = get_current_user() if not user: return "Error: not authenticated" - data, err = _api(user, "/api/folders") + items, err = _all_items(user, "/folders") if err: return f"Fehler: {err}" - return "\n".join(f"{nb['title']} (ID: {nb['id']})" for nb in data.get("items", [])) or "Keine Notizbuecher" + return "\n".join(f"{i['title']} (id: {i['id']})" for i in items) if items else "Keine Notizbuecher" @mcp.tool() def list_notes( notebook: Annotated[str, Field(description="Notebook name or ID from list_notebooks. Leave empty for all notes.")] = "", - limit: Annotated[int, Field(description="Maximum number of notes to return")] = 50, + limit: Annotated[int, Field(description="Maximum number of notes")] = 50, ) -> str: - """List notes, optionally filtered by notebook. Shows title and ID. Use read_note with the ID to get content.""" + """List notes, optionally filtered by notebook. Shows title and ID.""" user = get_current_user() if not user: return "Error: not authenticated" + nb_id = "" if notebook: - data, err = _api(user, "/api/folders") + folders, err = _all_items(user, "/folders") if err: return f"Fehler: {err}" - nb_id = next((nb["id"] for nb in data.get("items", []) if notebook.lower() in nb.get("title","").lower() or notebook == nb.get("id")), None) - if not nb_id: return f"Notizbuch '{notebook}' nicht gefunden" - data, err = _api(user, f"/api/folders/{nb_id}/notes?limit={limit}") + for f in folders: + if notebook.lower() in f["title"].lower() or notebook == f["id"]: + nb_id = f["id"] + break + if not nb_id: + return f"Notizbuch nicht gefunden: {notebook}" + path = f"/folders/{nb_id}/notes" else: - data, err = _api(user, f"/api/notes?limit={limit}&order_by=updated_time&order_dir=DESC") + path = "/notes" + items, err = _all_items(user, path, {"fields": "id,title", "order_by": "updated_time", "order_dir": "DESC"}) if err: return f"Fehler: {err}" - return "\n".join(f"{n['title']} (ID: {n['id']})" for n in data.get("items", [])) or "Keine Notizen" - - -@mcp.tool() -def read_note( - note_id: Annotated[str, Field(description="Note ID from list_notes or search_notes results")], -) -> str: - """Read the full content of a note (Markdown format).""" - user = get_current_user() - if not user: return "Error: not authenticated" - data, err = _api(user, f"/api/notes/{note_id}?fields=id,title,body") - if err: return f"Fehler: {err}" - return f"# {data.get('title','')}\n\n{data.get('body','')}" + return "\n".join(f"{i['title']} (id: {i['id']})" for i in items[:limit]) if items else "Keine Notizen" @mcp.tool() def search_notes( - query: Annotated[str, Field(description="Search term — matches note titles and content. Example: 'Einkaufsliste', 'Passwort', 'Rezept'")], + query: Annotated[str, Field(description="Search term — matches note titles and content. Example: 'Einkaufsliste', 'Rezept', 'Passwort'")], ) -> str: - """Search notes by keyword. Returns matching note titles and IDs.""" + """Search notes by keyword (full text). Returns matching note titles and IDs.""" user = get_current_user() if not user: return "Error: not authenticated" - data, err = _api(user, f"/api/search?query={query}&limit=20") + items, err = _all_items(user, "/search", {"query": query, "type": "note", "fields": "id,title"}) if err: return f"Fehler: {err}" - return "\n".join(f"{n['title']} (ID: {n['id']})" for n in data.get("items", [])) or "Keine Treffer" + return "\n".join(f"{i['title']} (id: {i['id']})" for i in items[:30]) if items else "Keine Treffer" + + +@mcp.tool() +def read_note( + note_id: Annotated[str, Field(description="Note ID from list_notes or search_notes")], +) -> str: + """Read a note's full content (Markdown). Resource links appear as :/resourceId — use list_note_resources + read_resource for attachments.""" + user = get_current_user() + if not user: return "Error: not authenticated" + r, err = _get(user, f"/notes/{note_id}", {"fields": "id,title,body"}) + if err: return f"Fehler: {err}" + d = r.json() + return f"# {d.get('title','?')}\n\n{d.get('body','')}" + + +@mcp.tool() +def list_note_resources( + note_id: Annotated[str, Field(description="Note ID from list_notes or search_notes")], +) -> str: + """List attachments (images, PDFs, files) embedded in a note. Use read_resource with the resource ID to fetch one.""" + user = get_current_user() + if not user: return "Error: not authenticated" + items, err = _all_items(user, f"/notes/{note_id}/resources", {"fields": "id,title,mime,size"}) + if err: return f"Fehler: {err}" + if not items: + return "Keine Anhaenge" + lines = [] + for i in items: + size = i.get("size", 0) + lines.append(f"{i.get('title','?')} ({i.get('mime','?')}, {size:,} bytes) id: {i['id']}") + return "\n".join(lines) + + +@mcp.tool() +def read_resource( + resource_id: Annotated[str, Field(description="Resource/attachment ID from list_note_resources")], +) -> list[TextContent | ImageContent | EmbeddedResource]: + """Read an attachment. Images shown inline, documents (PDF/docx) as binary, text directly.""" + user = get_current_user() + if not user: return [TextContent(type="text", text="Error: not authenticated")] + # Get metadata + rmeta, err = _get(user, f"/resources/{resource_id}", {"fields": "id,title,mime,size"}) + if err: return [TextContent(type="text", text=f"Fehler: {err}")] + meta = rmeta.json() + mime = meta.get("mime", "application/octet-stream") + title = meta.get("title", "attachment") + # Get blob + rfile, err = _get(user, f"/resources/{resource_id}/file") + if err: return [TextContent(type="text", text=f"Fehler: {err}")] + content = rfile.content + if mime in IMAGE_TYPES and len(content) < 10_000_000: + return [ImageContent(type="image", data=base64.b64encode(content).decode(), mimeType=mime)] + if mime.startswith("text/"): + return [TextContent(type="text", text=content.decode("utf-8", errors="replace")[:100000])] + return [EmbeddedResource(type="resource", resource=BlobResourceContents( + uri=f"joplin://resource/{resource_id}/{title}", + blob=base64.b64encode(content).decode(), mimeType=mime))] @mcp.tool() def create_note( - notebook: Annotated[str, Field(description="Notebook name or ID where the note should be created")], + notebook: Annotated[str, Field(description="Notebook name or ID where the note is created")], title: Annotated[str, Field(description="Note title")], - body: Annotated[str, Field(description="Note content in Markdown format")], + body: Annotated[str, Field(description="Note content in Markdown")], ) -> str: """Create a new note in the specified notebook.""" user = get_current_user() if not user: return "Error: not authenticated" - folders, err = _api(user, "/api/folders") + folders, err = _all_items(user, "/folders") if err: return f"Fehler: {err}" - nb_id = next((nb["id"] for nb in folders.get("items", []) if notebook.lower() in nb.get("title","").lower() or notebook == nb.get("id")), None) - if not nb_id: return f"Notizbuch '{notebook}' nicht gefunden" - data, err = _api(user, "/api/notes", "POST", {"title": title, "body": body, "parent_id": nb_id}) + nb_id = None + for f in folders: + if notebook.lower() in f["title"].lower() or notebook == f["id"]: + nb_id = f["id"] + break + if not nb_id: return f"Notizbuch nicht gefunden: {notebook}" + d, err = _post(user, "/notes", {"title": title, "body": body, "parent_id": nb_id}) if err: return f"Fehler: {err}" - return f"Notiz erstellt: {title}" + return f"Notiz erstellt: {title} (id: {d.get('id','?')})" def create_app(): diff --git a/tests/test_all.py b/tests/test_all.py index 91d86bd..fc06d9e 100644 --- a/tests/test_all.py +++ b/tests/test_all.py @@ -167,6 +167,39 @@ class TestMail: assert "From:" in body assert "Subject:" in body + def test_attachment_listing_and_fetch(self): + token = get_token(self.PORT) + # Find a mail with an attachment by scanning Rechnung search results + text = tool_call(self.PORT, token, "search_mail", + {"query": "Rechnung", "limit": 10, "account": "d370128_0-slohmaier"}) + if "No results" in text: + pytest.skip("No mails found") + # Read each result, look for one with attachments + found_att = None + for block in text.split("\n\n"): + acc = fld = k = None + for line in block.split("\n"): + if "Account:" in line: + parts = line.strip().split(", ") + acc = parts[0].split(": ")[1] + fld = parts[1].split(": ")[1] + k = parts[2].split(": ")[1] + if not k: + continue + body = tool_call(self.PORT, token, "read_mail", {"account": acc, "folder": fld, "key": k}) + if "Anhaenge" in body: + found_att = (acc, fld, k) + break + if not found_att: + pytest.skip("No mail with attachment found") + acc, fld, k = found_att + result = mcp_call(self.PORT, token, "tools/call", { + "name": "read_attachment", + "arguments": {"account": acc, "folder": fld, "key": k, "attachment_index": 1}, + }) + content = result["result"]["content"][0] + assert content["type"] in ("image", "resource", "text") + # ============================================================ # Calendar Tests (CRUD on calendar-test) @@ -354,12 +387,37 @@ class TestFiles: # ============================================================ -# Notes Tests (Joplin — likely no token configured) +# Notes Tests (Joplin Data API) # ============================================================ class TestNotes: PORT = SERVERS["notes"] + TEST_NOTEBOOK = "Inbox" def test_list_notebooks(self): text = tool_call(self.PORT, get_token(self.PORT), "list_notebooks") - assert len(text) > 0 # Either notebooks or error about token + assert "id:" in text and "Keine" not in text + + def test_list_notes(self): + text = tool_call(self.PORT, get_token(self.PORT), "list_notes", {"limit": 5}) + assert "id:" in text or "Keine Notizen" in text + + def test_create_search_read_note(self): + token = get_token(self.PORT) + tag = f"mcptest-{int(time.time())}" + # Create + result = tool_call(self.PORT, token, "create_note", { + "notebook": self.TEST_NOTEBOOK, + "title": f"Test Note {tag}", + "body": f"Automatischer Test {tag}\n\nInhalt.", + }) + assert "erstellt" in result.lower(), f"Create failed: {result}" + note_id = result.split("id: ")[1].rstrip(")").strip() + + # Read back (search FTS index may lag, so verify via read) + body = tool_call(self.PORT, token, "read_note", {"note_id": note_id}) + assert tag in body, f"Note content mismatch: {body[:200]}" + + # Verify it appears in the notebook listing + listing = tool_call(self.PORT, token, "list_notes", {"notebook": self.TEST_NOTEBOOK}) + assert note_id in listing or f"Test Note {tag}" in listing