Joplin via Data API + Mail attachments
Notes server rewritten to use Joplin CLI Data API (joplin-cli sync clients on host, ports 41184/41185). Clean fast search, proper resource handling. New tools: list_note_resources, read_resource (attachments as inline image/document/text). Mail server: read_mail now lists attachments; new read_attachment tool returns images inline, PDFs/docs as EmbeddedResource, text directly. Tests: 54 total. Notes now real (notebooks, list, create+read). Mail attachment listing + fetch tested. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -47,5 +47,15 @@
|
|||||||
"kati": [
|
"kati": [
|
||||||
"/kati/"
|
"/kati/"
|
||||||
]
|
]
|
||||||
|
},
|
||||||
|
"joplin_data_api": {
|
||||||
|
"stefan": {
|
||||||
|
"url": "http://127.0.0.1:41184",
|
||||||
|
"token": "CHANGE_ME"
|
||||||
|
},
|
||||||
|
"kati": {
|
||||||
|
"url": "http://127.0.0.1:41185",
|
||||||
|
"token": "CHANGE_ME"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
+73
-1
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import base64
|
||||||
import contextlib
|
import contextlib
|
||||||
import imaplib
|
import imaplib
|
||||||
import mailbox
|
import mailbox
|
||||||
@@ -13,6 +14,7 @@ from typing import Annotated
|
|||||||
|
|
||||||
from pydantic import Field
|
from pydantic import Field
|
||||||
from mcp.server.fastmcp import FastMCP
|
from mcp.server.fastmcp import FastMCP
|
||||||
|
from mcp.types import TextContent, ImageContent, EmbeddedResource, BlobResourceContents
|
||||||
from starlette.applications import Starlette
|
from starlette.applications import Starlette
|
||||||
from starlette.routing import Mount
|
from starlette.routing import Mount
|
||||||
|
|
||||||
@@ -76,6 +78,35 @@ def _get_body(msg):
|
|||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def _get_attachments(msg):
|
||||||
|
"""Return list of (index, filename, mimetype, size, part) for attachments."""
|
||||||
|
attachments = []
|
||||||
|
if not msg.is_multipart():
|
||||||
|
return attachments
|
||||||
|
idx = 0
|
||||||
|
for part in msg.walk():
|
||||||
|
if part.get_content_maintype() == "multipart":
|
||||||
|
continue
|
||||||
|
disp = str(part.get("Content-Disposition", ""))
|
||||||
|
filename = part.get_filename()
|
||||||
|
ctype = part.get_content_type()
|
||||||
|
# Attachment = has filename, or explicit attachment disposition, or non-text inline
|
||||||
|
is_attachment = (
|
||||||
|
"attachment" in disp.lower()
|
||||||
|
or (filename is not None)
|
||||||
|
or (ctype not in ("text/plain", "text/html") and part.get_content_maintype() != "multipart")
|
||||||
|
)
|
||||||
|
if not is_attachment:
|
||||||
|
continue
|
||||||
|
payload = part.get_payload(decode=True)
|
||||||
|
if payload is None:
|
||||||
|
continue
|
||||||
|
idx += 1
|
||||||
|
name = _decode_hdr(filename) if filename else f"attachment_{idx}.{ctype.split('/')[-1]}"
|
||||||
|
attachments.append({"index": idx, "filename": name, "mime": ctype, "size": len(payload), "payload": payload})
|
||||||
|
return attachments
|
||||||
|
|
||||||
|
|
||||||
def _discover_accounts(user):
|
def _discover_accounts(user):
|
||||||
root = MAIL_ROOTS.get(user)
|
root = MAIL_ROOTS.get(user)
|
||||||
if not root or not os.path.isdir(root):
|
if not root or not os.path.isdir(root):
|
||||||
@@ -183,7 +214,7 @@ def read_mail(
|
|||||||
msg = md.get(key)
|
msg = md.get(key)
|
||||||
if not msg:
|
if not msg:
|
||||||
return f"Message not found: {key}"
|
return f"Message not found: {key}"
|
||||||
return (
|
out = (
|
||||||
f"From: {_decode_hdr(msg.get('From', ''))}\n"
|
f"From: {_decode_hdr(msg.get('From', ''))}\n"
|
||||||
f"To: {_decode_hdr(msg.get('To', ''))}\n"
|
f"To: {_decode_hdr(msg.get('To', ''))}\n"
|
||||||
f"Cc: {_decode_hdr(msg.get('Cc', ''))}\n"
|
f"Cc: {_decode_hdr(msg.get('Cc', ''))}\n"
|
||||||
@@ -191,6 +222,47 @@ def read_mail(
|
|||||||
f"Subject: {_decode_hdr(msg.get('Subject', ''))}\n\n"
|
f"Subject: {_decode_hdr(msg.get('Subject', ''))}\n\n"
|
||||||
f"{_get_body(msg)[:50000]}"
|
f"{_get_body(msg)[:50000]}"
|
||||||
)
|
)
|
||||||
|
attachments = _get_attachments(msg)
|
||||||
|
if attachments:
|
||||||
|
out += "\n\nAnhaenge (nutze read_attachment mit dem Index):\n"
|
||||||
|
for a in attachments:
|
||||||
|
out += f" {a['index']}. {a['filename']} ({a['mime']}, {a['size']:,} bytes)\n"
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def read_attachment(
|
||||||
|
account: Annotated[str, Field(description="Account name from search results")],
|
||||||
|
folder: Annotated[str, Field(description="Folder name from search results")],
|
||||||
|
key: Annotated[str, Field(description="Message key from search results")],
|
||||||
|
attachment_index: Annotated[int, Field(description="Attachment number from the read_mail attachment list (1-based)")],
|
||||||
|
) -> list[TextContent | ImageContent | EmbeddedResource]:
|
||||||
|
"""Read an email attachment. Images shown inline, documents (PDF/docx) as binary, text directly. Get the index from read_mail."""
|
||||||
|
user = get_current_user()
|
||||||
|
if not user:
|
||||||
|
return [TextContent(type="text", text="Error: not authenticated")]
|
||||||
|
acct_path = _discover_accounts(user).get(account)
|
||||||
|
if not acct_path:
|
||||||
|
return [TextContent(type="text", text=f"Account not found: {account}")]
|
||||||
|
md = _open_folder(acct_path, folder)
|
||||||
|
if not md:
|
||||||
|
return [TextContent(type="text", text=f"Folder not found: {folder}")]
|
||||||
|
msg = md.get(key)
|
||||||
|
if not msg:
|
||||||
|
return [TextContent(type="text", text=f"Message not found: {key}")]
|
||||||
|
attachments = _get_attachments(msg)
|
||||||
|
att = next((a for a in attachments if a["index"] == attachment_index), None)
|
||||||
|
if not att:
|
||||||
|
return [TextContent(type="text", text=f"Anhang {attachment_index} nicht gefunden. {len(attachments)} Anhaenge vorhanden.")]
|
||||||
|
mime = att["mime"]
|
||||||
|
payload = att["payload"]
|
||||||
|
if mime in ("image/jpeg", "image/png", "image/gif", "image/webp") and len(payload) < 10_000_000:
|
||||||
|
return [ImageContent(type="image", data=base64.b64encode(payload).decode(), mimeType=mime)]
|
||||||
|
if mime.startswith("text/"):
|
||||||
|
return [TextContent(type="text", text=payload.decode("utf-8", errors="replace")[:100000])]
|
||||||
|
return [EmbeddedResource(type="resource", resource=BlobResourceContents(
|
||||||
|
uri=f"mail://attachment/{att['filename']}",
|
||||||
|
blob=base64.b64encode(payload).decode(), mimeType=mime))]
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
|
|||||||
+147
-49
@@ -1,106 +1,204 @@
|
|||||||
"""MCP Notes Server — reads and writes notes via Joplin API."""
|
"""MCP Notes Server — reads/writes Joplin notes via Joplin CLI Data API."""
|
||||||
|
|
||||||
import os, sys, contextlib
|
import os, sys, contextlib, base64
|
||||||
from typing import Annotated
|
from typing import Annotated
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
from pydantic import Field
|
from pydantic import Field
|
||||||
from mcp.server.fastmcp import FastMCP
|
from mcp.server.fastmcp import FastMCP
|
||||||
|
from mcp.types import TextContent, ImageContent, EmbeddedResource, BlobResourceContents
|
||||||
from starlette.applications import Starlette
|
from starlette.applications import Starlette
|
||||||
from starlette.routing import Mount
|
from starlette.routing import Mount
|
||||||
|
|
||||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||||
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware
|
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware, load_config
|
||||||
|
|
||||||
from common import load_config as _lc
|
_cfg = load_config()
|
||||||
_cfg = _lc()
|
DATA_API = _cfg["joplin_data_api"] # {user: {url, token}}
|
||||||
JOPLIN = _cfg['joplin_url']
|
|
||||||
JOPLIN_TOKENS = _cfg['joplin_tokens']
|
|
||||||
|
|
||||||
mcp = FastMCP("Notes", stateless_http=True,
|
mcp = FastMCP("Notes", stateless_http=True,
|
||||||
transport_security={"enable_dns_rebinding_protection": False})
|
transport_security={"enable_dns_rebinding_protection": False})
|
||||||
|
|
||||||
def _api(user, endpoint, method="GET", json_data=None):
|
IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
|
||||||
token = JOPLIN_TOKENS.get(user)
|
|
||||||
if not token: return None, "Kein Joplin API-Token konfiguriert. Bitte in Joplin-App generieren."
|
|
||||||
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Origin": "https://notes.home.slohmaier.de"}
|
def _api(user):
|
||||||
|
return DATA_API.get(user)
|
||||||
|
|
||||||
|
|
||||||
|
def _get(user, path, params=None):
|
||||||
|
api = _api(user)
|
||||||
|
if not api:
|
||||||
|
return None, "Kein Joplin-Zugang fuer diesen User"
|
||||||
|
p = dict(params or {})
|
||||||
|
p["token"] = api["token"]
|
||||||
try:
|
try:
|
||||||
r = getattr(httpx, method.lower())(f"{JOPLIN}{endpoint}", headers=headers, json=json_data, timeout=15)
|
r = httpx.get(f"{api['url']}{path}", params=p, timeout=30)
|
||||||
if r.status_code >= 400: return None, f"Joplin-Fehler: HTTP {r.status_code}"
|
if r.status_code >= 400:
|
||||||
return (r.json() if r.text else {}), None
|
return None, f"Joplin HTTP {r.status_code}"
|
||||||
except Exception as e: return None, str(e)
|
return r, None
|
||||||
|
except Exception as e:
|
||||||
|
return None, str(e)
|
||||||
|
|
||||||
|
|
||||||
|
def _post(user, path, json_data):
|
||||||
|
api = _api(user)
|
||||||
|
if not api:
|
||||||
|
return None, "Kein Joplin-Zugang"
|
||||||
|
try:
|
||||||
|
r = httpx.post(f"{api['url']}{path}?token={api['token']}", json=json_data, timeout=30)
|
||||||
|
if r.status_code >= 400:
|
||||||
|
return None, f"Joplin HTTP {r.status_code}"
|
||||||
|
return r.json(), None
|
||||||
|
except Exception as e:
|
||||||
|
return None, str(e)
|
||||||
|
|
||||||
|
|
||||||
|
def _all_items(user, path, params=None):
|
||||||
|
"""Fetch all pages from a Data API list endpoint."""
|
||||||
|
items = []
|
||||||
|
page = 1
|
||||||
|
while True:
|
||||||
|
p = dict(params or {})
|
||||||
|
p["page"] = page
|
||||||
|
r, err = _get(user, path, p)
|
||||||
|
if err:
|
||||||
|
return [], err
|
||||||
|
d = r.json()
|
||||||
|
items.extend(d.get("items", []))
|
||||||
|
if not d.get("has_more"):
|
||||||
|
break
|
||||||
|
page += 1
|
||||||
|
if page > 50:
|
||||||
|
break
|
||||||
|
return items, None
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def list_notebooks() -> str:
|
def list_notebooks() -> str:
|
||||||
"""List all Joplin notebooks (folders). Call this first to see available notebooks."""
|
"""List all Joplin notebooks (folders). Call this first to see notebook names and IDs."""
|
||||||
user = get_current_user()
|
user = get_current_user()
|
||||||
if not user: return "Error: not authenticated"
|
if not user: return "Error: not authenticated"
|
||||||
data, err = _api(user, "/api/folders")
|
items, err = _all_items(user, "/folders")
|
||||||
if err: return f"Fehler: {err}"
|
if err: return f"Fehler: {err}"
|
||||||
return "\n".join(f"{nb['title']} (ID: {nb['id']})" for nb in data.get("items", [])) or "Keine Notizbuecher"
|
return "\n".join(f"{i['title']} (id: {i['id']})" for i in items) if items else "Keine Notizbuecher"
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def list_notes(
|
def list_notes(
|
||||||
notebook: Annotated[str, Field(description="Notebook name or ID from list_notebooks. Leave empty for all notes.")] = "",
|
notebook: Annotated[str, Field(description="Notebook name or ID from list_notebooks. Leave empty for all notes.")] = "",
|
||||||
limit: Annotated[int, Field(description="Maximum number of notes to return")] = 50,
|
limit: Annotated[int, Field(description="Maximum number of notes")] = 50,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""List notes, optionally filtered by notebook. Shows title and ID. Use read_note with the ID to get content."""
|
"""List notes, optionally filtered by notebook. Shows title and ID."""
|
||||||
user = get_current_user()
|
user = get_current_user()
|
||||||
if not user: return "Error: not authenticated"
|
if not user: return "Error: not authenticated"
|
||||||
|
nb_id = ""
|
||||||
if notebook:
|
if notebook:
|
||||||
data, err = _api(user, "/api/folders")
|
folders, err = _all_items(user, "/folders")
|
||||||
if err: return f"Fehler: {err}"
|
if err: return f"Fehler: {err}"
|
||||||
nb_id = next((nb["id"] for nb in data.get("items", []) if notebook.lower() in nb.get("title","").lower() or notebook == nb.get("id")), None)
|
for f in folders:
|
||||||
if not nb_id: return f"Notizbuch '{notebook}' nicht gefunden"
|
if notebook.lower() in f["title"].lower() or notebook == f["id"]:
|
||||||
data, err = _api(user, f"/api/folders/{nb_id}/notes?limit={limit}")
|
nb_id = f["id"]
|
||||||
|
break
|
||||||
|
if not nb_id:
|
||||||
|
return f"Notizbuch nicht gefunden: {notebook}"
|
||||||
|
path = f"/folders/{nb_id}/notes"
|
||||||
else:
|
else:
|
||||||
data, err = _api(user, f"/api/notes?limit={limit}&order_by=updated_time&order_dir=DESC")
|
path = "/notes"
|
||||||
|
items, err = _all_items(user, path, {"fields": "id,title", "order_by": "updated_time", "order_dir": "DESC"})
|
||||||
if err: return f"Fehler: {err}"
|
if err: return f"Fehler: {err}"
|
||||||
return "\n".join(f"{n['title']} (ID: {n['id']})" for n in data.get("items", [])) or "Keine Notizen"
|
return "\n".join(f"{i['title']} (id: {i['id']})" for i in items[:limit]) if items else "Keine Notizen"
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
def read_note(
|
|
||||||
note_id: Annotated[str, Field(description="Note ID from list_notes or search_notes results")],
|
|
||||||
) -> str:
|
|
||||||
"""Read the full content of a note (Markdown format)."""
|
|
||||||
user = get_current_user()
|
|
||||||
if not user: return "Error: not authenticated"
|
|
||||||
data, err = _api(user, f"/api/notes/{note_id}?fields=id,title,body")
|
|
||||||
if err: return f"Fehler: {err}"
|
|
||||||
return f"# {data.get('title','')}\n\n{data.get('body','')}"
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def search_notes(
|
def search_notes(
|
||||||
query: Annotated[str, Field(description="Search term — matches note titles and content. Example: 'Einkaufsliste', 'Passwort', 'Rezept'")],
|
query: Annotated[str, Field(description="Search term — matches note titles and content. Example: 'Einkaufsliste', 'Rezept', 'Passwort'")],
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Search notes by keyword. Returns matching note titles and IDs."""
|
"""Search notes by keyword (full text). Returns matching note titles and IDs."""
|
||||||
user = get_current_user()
|
user = get_current_user()
|
||||||
if not user: return "Error: not authenticated"
|
if not user: return "Error: not authenticated"
|
||||||
data, err = _api(user, f"/api/search?query={query}&limit=20")
|
items, err = _all_items(user, "/search", {"query": query, "type": "note", "fields": "id,title"})
|
||||||
if err: return f"Fehler: {err}"
|
if err: return f"Fehler: {err}"
|
||||||
return "\n".join(f"{n['title']} (ID: {n['id']})" for n in data.get("items", [])) or "Keine Treffer"
|
return "\n".join(f"{i['title']} (id: {i['id']})" for i in items[:30]) if items else "Keine Treffer"
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def read_note(
|
||||||
|
note_id: Annotated[str, Field(description="Note ID from list_notes or search_notes")],
|
||||||
|
) -> str:
|
||||||
|
"""Read a note's full content (Markdown). Resource links appear as :/resourceId — use list_note_resources + read_resource for attachments."""
|
||||||
|
user = get_current_user()
|
||||||
|
if not user: return "Error: not authenticated"
|
||||||
|
r, err = _get(user, f"/notes/{note_id}", {"fields": "id,title,body"})
|
||||||
|
if err: return f"Fehler: {err}"
|
||||||
|
d = r.json()
|
||||||
|
return f"# {d.get('title','?')}\n\n{d.get('body','')}"
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def list_note_resources(
|
||||||
|
note_id: Annotated[str, Field(description="Note ID from list_notes or search_notes")],
|
||||||
|
) -> str:
|
||||||
|
"""List attachments (images, PDFs, files) embedded in a note. Use read_resource with the resource ID to fetch one."""
|
||||||
|
user = get_current_user()
|
||||||
|
if not user: return "Error: not authenticated"
|
||||||
|
items, err = _all_items(user, f"/notes/{note_id}/resources", {"fields": "id,title,mime,size"})
|
||||||
|
if err: return f"Fehler: {err}"
|
||||||
|
if not items:
|
||||||
|
return "Keine Anhaenge"
|
||||||
|
lines = []
|
||||||
|
for i in items:
|
||||||
|
size = i.get("size", 0)
|
||||||
|
lines.append(f"{i.get('title','?')} ({i.get('mime','?')}, {size:,} bytes) id: {i['id']}")
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def read_resource(
|
||||||
|
resource_id: Annotated[str, Field(description="Resource/attachment ID from list_note_resources")],
|
||||||
|
) -> list[TextContent | ImageContent | EmbeddedResource]:
|
||||||
|
"""Read an attachment. Images shown inline, documents (PDF/docx) as binary, text directly."""
|
||||||
|
user = get_current_user()
|
||||||
|
if not user: return [TextContent(type="text", text="Error: not authenticated")]
|
||||||
|
# Get metadata
|
||||||
|
rmeta, err = _get(user, f"/resources/{resource_id}", {"fields": "id,title,mime,size"})
|
||||||
|
if err: return [TextContent(type="text", text=f"Fehler: {err}")]
|
||||||
|
meta = rmeta.json()
|
||||||
|
mime = meta.get("mime", "application/octet-stream")
|
||||||
|
title = meta.get("title", "attachment")
|
||||||
|
# Get blob
|
||||||
|
rfile, err = _get(user, f"/resources/{resource_id}/file")
|
||||||
|
if err: return [TextContent(type="text", text=f"Fehler: {err}")]
|
||||||
|
content = rfile.content
|
||||||
|
if mime in IMAGE_TYPES and len(content) < 10_000_000:
|
||||||
|
return [ImageContent(type="image", data=base64.b64encode(content).decode(), mimeType=mime)]
|
||||||
|
if mime.startswith("text/"):
|
||||||
|
return [TextContent(type="text", text=content.decode("utf-8", errors="replace")[:100000])]
|
||||||
|
return [EmbeddedResource(type="resource", resource=BlobResourceContents(
|
||||||
|
uri=f"joplin://resource/{resource_id}/{title}",
|
||||||
|
blob=base64.b64encode(content).decode(), mimeType=mime))]
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def create_note(
|
def create_note(
|
||||||
notebook: Annotated[str, Field(description="Notebook name or ID where the note should be created")],
|
notebook: Annotated[str, Field(description="Notebook name or ID where the note is created")],
|
||||||
title: Annotated[str, Field(description="Note title")],
|
title: Annotated[str, Field(description="Note title")],
|
||||||
body: Annotated[str, Field(description="Note content in Markdown format")],
|
body: Annotated[str, Field(description="Note content in Markdown")],
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Create a new note in the specified notebook."""
|
"""Create a new note in the specified notebook."""
|
||||||
user = get_current_user()
|
user = get_current_user()
|
||||||
if not user: return "Error: not authenticated"
|
if not user: return "Error: not authenticated"
|
||||||
folders, err = _api(user, "/api/folders")
|
folders, err = _all_items(user, "/folders")
|
||||||
if err: return f"Fehler: {err}"
|
if err: return f"Fehler: {err}"
|
||||||
nb_id = next((nb["id"] for nb in folders.get("items", []) if notebook.lower() in nb.get("title","").lower() or notebook == nb.get("id")), None)
|
nb_id = None
|
||||||
if not nb_id: return f"Notizbuch '{notebook}' nicht gefunden"
|
for f in folders:
|
||||||
data, err = _api(user, "/api/notes", "POST", {"title": title, "body": body, "parent_id": nb_id})
|
if notebook.lower() in f["title"].lower() or notebook == f["id"]:
|
||||||
|
nb_id = f["id"]
|
||||||
|
break
|
||||||
|
if not nb_id: return f"Notizbuch nicht gefunden: {notebook}"
|
||||||
|
d, err = _post(user, "/notes", {"title": title, "body": body, "parent_id": nb_id})
|
||||||
if err: return f"Fehler: {err}"
|
if err: return f"Fehler: {err}"
|
||||||
return f"Notiz erstellt: {title}"
|
return f"Notiz erstellt: {title} (id: {d.get('id','?')})"
|
||||||
|
|
||||||
|
|
||||||
def create_app():
|
def create_app():
|
||||||
|
|||||||
+60
-2
@@ -167,6 +167,39 @@ class TestMail:
|
|||||||
assert "From:" in body
|
assert "From:" in body
|
||||||
assert "Subject:" in body
|
assert "Subject:" in body
|
||||||
|
|
||||||
|
def test_attachment_listing_and_fetch(self):
|
||||||
|
token = get_token(self.PORT)
|
||||||
|
# Find a mail with an attachment by scanning Rechnung search results
|
||||||
|
text = tool_call(self.PORT, token, "search_mail",
|
||||||
|
{"query": "Rechnung", "limit": 10, "account": "d370128_0-slohmaier"})
|
||||||
|
if "No results" in text:
|
||||||
|
pytest.skip("No mails found")
|
||||||
|
# Read each result, look for one with attachments
|
||||||
|
found_att = None
|
||||||
|
for block in text.split("\n\n"):
|
||||||
|
acc = fld = k = None
|
||||||
|
for line in block.split("\n"):
|
||||||
|
if "Account:" in line:
|
||||||
|
parts = line.strip().split(", ")
|
||||||
|
acc = parts[0].split(": ")[1]
|
||||||
|
fld = parts[1].split(": ")[1]
|
||||||
|
k = parts[2].split(": ")[1]
|
||||||
|
if not k:
|
||||||
|
continue
|
||||||
|
body = tool_call(self.PORT, token, "read_mail", {"account": acc, "folder": fld, "key": k})
|
||||||
|
if "Anhaenge" in body:
|
||||||
|
found_att = (acc, fld, k)
|
||||||
|
break
|
||||||
|
if not found_att:
|
||||||
|
pytest.skip("No mail with attachment found")
|
||||||
|
acc, fld, k = found_att
|
||||||
|
result = mcp_call(self.PORT, token, "tools/call", {
|
||||||
|
"name": "read_attachment",
|
||||||
|
"arguments": {"account": acc, "folder": fld, "key": k, "attachment_index": 1},
|
||||||
|
})
|
||||||
|
content = result["result"]["content"][0]
|
||||||
|
assert content["type"] in ("image", "resource", "text")
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
# Calendar Tests (CRUD on calendar-test)
|
# Calendar Tests (CRUD on calendar-test)
|
||||||
@@ -354,12 +387,37 @@ class TestFiles:
|
|||||||
|
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
# Notes Tests (Joplin — likely no token configured)
|
# Notes Tests (Joplin Data API)
|
||||||
# ============================================================
|
# ============================================================
|
||||||
|
|
||||||
class TestNotes:
|
class TestNotes:
|
||||||
PORT = SERVERS["notes"]
|
PORT = SERVERS["notes"]
|
||||||
|
TEST_NOTEBOOK = "Inbox"
|
||||||
|
|
||||||
def test_list_notebooks(self):
|
def test_list_notebooks(self):
|
||||||
text = tool_call(self.PORT, get_token(self.PORT), "list_notebooks")
|
text = tool_call(self.PORT, get_token(self.PORT), "list_notebooks")
|
||||||
assert len(text) > 0 # Either notebooks or error about token
|
assert "id:" in text and "Keine" not in text
|
||||||
|
|
||||||
|
def test_list_notes(self):
|
||||||
|
text = tool_call(self.PORT, get_token(self.PORT), "list_notes", {"limit": 5})
|
||||||
|
assert "id:" in text or "Keine Notizen" in text
|
||||||
|
|
||||||
|
def test_create_search_read_note(self):
|
||||||
|
token = get_token(self.PORT)
|
||||||
|
tag = f"mcptest-{int(time.time())}"
|
||||||
|
# Create
|
||||||
|
result = tool_call(self.PORT, token, "create_note", {
|
||||||
|
"notebook": self.TEST_NOTEBOOK,
|
||||||
|
"title": f"Test Note {tag}",
|
||||||
|
"body": f"Automatischer Test {tag}\n\nInhalt.",
|
||||||
|
})
|
||||||
|
assert "erstellt" in result.lower(), f"Create failed: {result}"
|
||||||
|
note_id = result.split("id: ")[1].rstrip(")").strip()
|
||||||
|
|
||||||
|
# Read back (search FTS index may lag, so verify via read)
|
||||||
|
body = tool_call(self.PORT, token, "read_note", {"note_id": note_id})
|
||||||
|
assert tag in body, f"Note content mismatch: {body[:200]}"
|
||||||
|
|
||||||
|
# Verify it appears in the notebook listing
|
||||||
|
listing = tool_call(self.PORT, token, "list_notes", {"notebook": self.TEST_NOTEBOOK})
|
||||||
|
assert note_id in listing or f"Test Note {tag}" in listing
|
||||||
|
|||||||
Reference in New Issue
Block a user