ef37d1e467
Mail: INBOX is a subfolder (INBOX/cur), not the account root. Removed special-case mapping that pointed INBOX to root dir. Files: oCIS user may differ from MCP user (e.g. stefan -> admin). Added _ocis_user() mapping for WebDAV paths and auth. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
250 lines
9.2 KiB
Python
250 lines
9.2 KiB
Python
"""MCP Mail Server — searches and reads IMAP backup emails."""
|
|
|
|
import os
|
|
import sys
|
|
import contextlib
|
|
import imaplib
|
|
import mailbox
|
|
from email.header import decode_header
|
|
from email.mime.text import MIMEText
|
|
from email.utils import formatdate
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
from pydantic import Field
|
|
from mcp.server.fastmcp import FastMCP
|
|
from starlette.applications import Starlette
|
|
from starlette.routing import Mount
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
|
from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware
|
|
|
|
from common import load_config as _lc
|
|
# Shared configuration, loaded once at import time.
_cfg = _lc()

# Mapping of MCP user name -> directory holding that user's mail backups
# (looked up with MAIL_ROOTS.get(user) in _discover_accounts).
MAIL_ROOTS = _cfg["mail_roots"]

# Live IMAP credentials per MCP user; used by create_draft to append drafts.
# Passwords come from the loaded configuration rather than being hard-coded.
IMAP_ACCOUNTS = {
    "stefan": {
        "host": "imap.1blu.de",
        "user": "d370128_0-slohmaier",
        "password": _cfg["imap_passwords"]["stefan"],
        "drafts_folder": "Drafts",
        "from_addr": "stefan@slohmaier.de",
    },
    "kati": {
        "host": "imap.gmail.com",
        "user": "akpolke@gmail.com",
        "password": _cfg["imap_passwords"]["kati"],
        # Mailbox name as stored in this file (contains "&APw-" and a space).
        "drafts_folder": "[Google Mail]/Entw&APw-rfe",
        "from_addr": "akpolke@gmail.com",
    },
}

# The MCP server instance that the @mcp.tool() functions register with.
mcp = FastMCP(
    "Mail",
    stateless_http=True,
    transport_security={"enable_dns_rebinding_protection": False},
)
|
|
|
|
|
|
def _decode_hdr(raw):
|
|
if not raw:
|
|
return ""
|
|
parts = decode_header(str(raw))
|
|
decoded = []
|
|
for data, charset in parts:
|
|
if isinstance(data, bytes):
|
|
decoded.append(data.decode(charset or "utf-8", errors="replace"))
|
|
else:
|
|
decoded.append(str(data))
|
|
return " ".join(decoded)
|
|
|
|
|
|
def _get_body(msg):
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
if part.get_content_type() == "text/plain":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
return payload.decode(part.get_content_charset() or "utf-8", errors="replace")
|
|
for part in msg.walk():
|
|
if part.get_content_type() == "text/html":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
return "[HTML] " + payload.decode(part.get_content_charset() or "utf-8", errors="replace")
|
|
else:
|
|
payload = msg.get_payload(decode=True)
|
|
if payload:
|
|
return payload.decode(msg.get_content_charset() or "utf-8", errors="replace")
|
|
return ""
|
|
|
|
|
|
def _discover_accounts(user):
    """Map account names to their directories for *user*.

    Every sub-directory of the user's entry in ``MAIL_ROOTS`` counts as one
    account.  Returns an empty dict when the user has no configured root or
    the root is not a directory.  Keys are inserted in sorted order.
    """
    base = MAIL_ROOTS.get(user)
    if not (base and os.path.isdir(base)):
        return {}

    found = {}
    for entry in sorted(os.listdir(base)):
        candidate = os.path.join(base, entry)
        if os.path.isdir(candidate):
            found[entry] = candidate
    return found
|
|
|
|
|
|
def _discover_folders(acct_path):
|
|
folders = []
|
|
for entry in sorted(Path(acct_path).rglob("cur")):
|
|
rel = str(entry.parent.relative_to(acct_path))
|
|
if rel != "." and rel not in folders:
|
|
folders.append(rel)
|
|
if "INBOX" in folders:
|
|
folders.remove("INBOX")
|
|
folders.insert(0, "INBOX")
|
|
return folders
|
|
|
|
|
|
def _open_folder(acct_path, folder_name):
|
|
path = os.path.join(acct_path, folder_name)
|
|
return mailbox.Maildir(path, create=False) if os.path.isdir(path) else None
|
|
|
|
|
|
@mcp.tool()
def list_accounts() -> str:
    """List all email accounts with their folder count. Call this first to see available accounts."""
    # Tool output is plain text: one "<account>: <n> folders" line per
    # account of the authenticated user, or an explanatory message.
    current = get_current_user()
    if not current:
        return "Error: not authenticated"

    found = _discover_accounts(current)
    if not found:
        return "No mail accounts found"

    lines = []
    for acct_name, acct_dir in found.items():
        folder_count = len(_discover_folders(acct_dir))
        lines.append(f"{acct_name}: {folder_count} folders")
    return "\n".join(lines)
|
|
|
|
|
|
@mcp.tool()
def list_folders(
    account: Annotated[str, Field(description="Account name from list_accounts, e.g. 'd370128_0-slohmaier' or 'gmail'")]
) -> str:
    """List all IMAP folders in a mail account (INBOX, Sent, Archive, etc.)."""
    # Returns one folder name per line, or an explanatory message when the
    # caller is unauthenticated, the account is unknown, or it has no folders.
    current = get_current_user()
    if not current:
        return "Error: not authenticated"

    acct_dir = _discover_accounts(current).get(account)
    if not acct_dir:
        return f"Account not found: {account}. Use list_accounts to see available accounts."

    names = _discover_folders(acct_dir)
    if not names:
        return "No folders"
    return "\n".join(names)
|
|
|
|
|
|
@mcp.tool()
def search_mail(
    query: Annotated[str, Field(description="Search term — matches subject, from, to, and body. Example: 'Rechnung', 'Amazon', 'meeting'")],
    account: Annotated[str, Field(description="Filter by account name. Leave empty to search all accounts.")] = "",
    folder: Annotated[str, Field(description="Filter by folder name, e.g. 'INBOX', 'Sent'. Leave empty for all folders.")] = "",
    limit: Annotated[int, Field(description="Maximum number of results to return")] = 20,
) -> str:
    """Search emails by keyword. Returns date, sender, recipient, subject, and a location key for reading the full email with read_mail."""
    # Matching is a case-insensitive substring test: headers first, and the
    # body only when the headers do not match.  `account` is a substring
    # filter on the account name, `folder` a case-insensitive substring
    # filter on the folder name.  Results are blank-line separated blocks.
    user = get_current_user()
    if not user:
        return "Error: not authenticated"
    if limit <= 0:
        # Nothing was asked for; previously one hit was still returned.
        return "No results found"

    needle = query.lower()
    folder_filter = folder.lower()
    results = []
    for acct_name, acct_path in _discover_accounts(user).items():
        if account and account not in acct_name:
            continue
        for fld in _discover_folders(acct_path):
            if folder_filter and folder_filter not in fld.lower():
                continue
            md = _open_folder(acct_path, fld)
            # Compare with None: truth-testing a Maildir calls __len__, which
            # scans the folder (and makes empty folders look "missing").
            if md is None:
                continue
            for key, msg in md.items():
                try:
                    subj = _decode_hdr(msg.get("Subject", ""))
                    frm = _decode_hdr(msg.get("From", ""))
                    to = _decode_hdr(msg.get("To", ""))
                    date_str = msg.get("Date", "")
                    if (
                        needle not in f"{subj} {frm} {to}".lower()
                        and needle not in _get_body(msg).lower()
                    ):
                        continue
                except (LookupError, ValueError):
                    # One undecodable message (e.g. unknown charset) must not
                    # abort the whole search; skip it.
                    continue
                results.append(f"[{date_str}] {frm} -> {to}\n  Subject: {subj}\n  Account: {acct_name}, Folder: {fld}, Key: {key}")
                if len(results) >= limit:
                    return "\n\n".join(results)
    return "\n\n".join(results) if results else "No results found"
|
|
|
|
|
|
@mcp.tool()
def read_mail(
    account: Annotated[str, Field(description="Account name from search results, e.g. 'd370128_0-slohmaier'")],
    folder: Annotated[str, Field(description="Folder name from search results, e.g. 'INBOX'")],
    key: Annotated[str, Field(description="Message key from search results")],
) -> str:
    """Read the full content of a single email. Use the account, folder, and key values from search_mail results."""
    # Returns the From/To/Cc/Date/Subject headers followed by the body
    # (truncated to 50000 characters), or an explanatory message when the
    # caller is unauthenticated or the account/folder/message is unknown.
    user = get_current_user()
    if not user:
        return "Error: not authenticated"

    acct_path = _discover_accounts(user).get(account)
    if not acct_path:
        return f"Account not found: {account}"

    md = _open_folder(acct_path, folder)
    # `is None` rather than truthiness: Maildir defines __len__, so an
    # existing but empty folder would otherwise be reported as missing.
    if md is None:
        return f"Folder not found: {folder}"

    msg = md.get(key)
    # Message defines __len__ (header count); a message without headers is
    # falsy but still exists, so compare with None explicitly.
    if msg is None:
        return f"Message not found: {key}"

    return (
        f"From: {_decode_hdr(msg.get('From', ''))}\n"
        f"To: {_decode_hdr(msg.get('To', ''))}\n"
        f"Cc: {_decode_hdr(msg.get('Cc', ''))}\n"
        f"Date: {msg.get('Date', '')}\n"
        f"Subject: {_decode_hdr(msg.get('Subject', ''))}\n\n"
        f"{_get_body(msg)[:50000]}"
    )
|
|
|
|
|
|
def _quote_mailbox(name):
    """Return *name* as an IMAP quoted string (left alone if already quoted).

    Mailbox names containing spaces, such as ``[Google Mail]/...``, must be
    quoted before being passed to ``IMAP4.append``.
    """
    if len(name) >= 2 and name[0] == '"' and name[-1] == '"':
        return name
    return '"' + name.replace("\\", "\\\\").replace('"', '\\"') + '"'


@mcp.tool()
def create_draft(
    to: Annotated[str, Field(description="Recipient email address, e.g. 'max@example.com'")],
    subject: Annotated[str, Field(description="Email subject line")],
    body: Annotated[str, Field(description="Email body text (plain text)")],
    cc: Annotated[str, Field(description="CC recipients, comma-separated")] = "",
    account: Annotated[str, Field(description="Which account to create the draft in. Leave empty for default account.")] = "",
) -> str:
    """Create an email draft in the Drafts folder. The draft can then be reviewed and sent from Roundcube or phone. Does NOT send the email."""
    # Builds a UTF-8 plain-text message and APPENDs it, flagged \Draft, to the
    # drafts folder of the IMAP account configured for the authenticated
    # user.  Returns a status message; IMAP/network errors are reported as
    # text instead of being raised.
    #
    # NOTE(review): the `account` argument is currently not used; the IMAP
    # account is selected solely by the authenticated user.
    user = get_current_user()
    if not user:
        return "Error: not authenticated"
    acct = IMAP_ACCOUNTS.get(user)
    if not acct:
        return f"Kein IMAP-Account fuer {user} konfiguriert"

    msg = MIMEText(body, "plain", "utf-8")
    msg["From"] = acct["from_addr"]
    msg["To"] = to
    if cc:
        msg["Cc"] = cc
    msg["Subject"] = subject
    msg["Date"] = formatdate(localtime=True)
    msg["X-Mailer"] = "MCP Mail Server"

    try:
        # The context manager logs out even when login/append raises, so the
        # connection is not leaked; the timeout keeps a dead server from
        # blocking the tool call indefinitely.
        with imaplib.IMAP4_SSL(acct["host"], timeout=30) as imap:
            imap.login(acct["user"], acct["password"])
            status, _ = imap.append(
                _quote_mailbox(acct["drafts_folder"]), "\\Draft", None, msg.as_bytes()
            )
    except Exception as e:
        # Tool boundary: report the failure to the caller as text.
        return f"IMAP-Fehler: {e}"

    if status == "OK":
        return f"Entwurf erstellt: An {to}, Betreff \"{subject}\". Oeffne Roundcube oder die Mail-App um ihn zu senden."
    return f"Fehler beim Erstellen des Entwurfs: {status}"
|
|
|
|
|
|
def create_app():
    """Build the ASGI application.

    The result is a Starlette app that serves the OAuth routes, mounts the
    MCP streamable-HTTP app at ``/``, and is wrapped in
    ``BearerAuthMiddleware``.  The MCP app's own lifespan context is entered
    for the lifetime of the outer app.
    """
    mcp_app = mcp.streamable_http_app()

    @contextlib.asynccontextmanager
    async def lifespan(_app):
        # Run the mounted MCP app's lifespan alongside the outer app's.
        async with contextlib.AsyncExitStack() as exit_stack:
            inner = mcp_app.router.lifespan_context(mcp_app)
            await exit_stack.enter_async_context(inner)
            yield

    all_routes = [*OAUTH_ROUTES, Mount("/", app=mcp_app)]
    application = Starlette(routes=all_routes, lifespan=lifespan)
    application.add_middleware(BearerAuthMiddleware)
    return application
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on 127.0.0.1:5100.
    import uvicorn

    application = create_app()
    uvicorn.run(application, host="127.0.0.1", port=5100)
|