"""MCP Mail Server — searches and reads IMAP backup emails.""" import os import sys import contextlib import imaplib import mailbox from email.header import decode_header from email.mime.text import MIMEText from email.utils import formatdate from pathlib import Path from typing import Annotated from pydantic import Field from mcp.server.fastmcp import FastMCP from starlette.applications import Starlette from starlette.routing import Mount sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) from common import get_current_user, OAUTH_ROUTES, BearerAuthMiddleware from common import load_config as _lc _cfg = _lc() MAIL_ROOTS = _cfg["mail_roots"] IMAP_ACCOUNTS = { "stefan": { "host": "imap.1blu.de", "user": "d370128_0-slohmaier", "password": _cfg["imap_passwords"]["stefan"], "drafts_folder": "Drafts", "from_addr": "stefan@slohmaier.de", }, "kati": { "host": "imap.gmail.com", "user": "akpolke@gmail.com", "password": _cfg["imap_passwords"]["kati"], "drafts_folder": "[Google Mail]/Entw&APw-rfe", "from_addr": "akpolke@gmail.com", }, } mcp = FastMCP("Mail", stateless_http=True, transport_security={"enable_dns_rebinding_protection": False}) def _decode_hdr(raw): if not raw: return "" parts = decode_header(str(raw)) decoded = [] for data, charset in parts: if isinstance(data, bytes): decoded.append(data.decode(charset or "utf-8", errors="replace")) else: decoded.append(str(data)) return " ".join(decoded) def _get_body(msg): if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": payload = part.get_payload(decode=True) if payload: return payload.decode(part.get_content_charset() or "utf-8", errors="replace") for part in msg.walk(): if part.get_content_type() == "text/html": payload = part.get_payload(decode=True) if payload: return "[HTML] " + payload.decode(part.get_content_charset() or "utf-8", errors="replace") else: payload = msg.get_payload(decode=True) if payload: return payload.decode(msg.get_content_charset() or "utf-8", errors="replace") return "" def _discover_accounts(user): root = MAIL_ROOTS.get(user) if not root or not os.path.isdir(root): return {} return {a: os.path.join(root, a) for a in sorted(os.listdir(root)) if os.path.isdir(os.path.join(root, a))} def _discover_folders(acct_path): folders = [] for entry in sorted(Path(acct_path).rglob("cur")): rel = str(entry.parent.relative_to(acct_path)) if rel == ".": folders.insert(0, "INBOX") elif rel not in folders: folders.append(rel) return folders def _open_folder(acct_path, folder_name): path = acct_path if folder_name == "INBOX" else os.path.join(acct_path, folder_name) return mailbox.Maildir(path, create=False) if os.path.isdir(path) else None @mcp.tool() def list_accounts() -> str: """List all email accounts with their folder count. Call this first to see available accounts.""" user = get_current_user() if not user: return "Error: not authenticated" accounts = _discover_accounts(user) if not accounts: return "No mail accounts found" return "\n".join(f"{name}: {len(_discover_folders(path))} folders" for name, path in accounts.items()) @mcp.tool() def list_folders( account: Annotated[str, Field(description="Account name from list_accounts, e.g. 'd370128_0-slohmaier' or 'gmail'")] ) -> str: """List all IMAP folders in a mail account (INBOX, Sent, Archive, etc.).""" user = get_current_user() if not user: return "Error: not authenticated" acct_path = _discover_accounts(user).get(account) if not acct_path: return f"Account not found: {account}. Use list_accounts to see available accounts." folders = _discover_folders(acct_path) return "\n".join(folders) if folders else "No folders" @mcp.tool() def search_mail( query: Annotated[str, Field(description="Search term — matches subject, from, to, and body. Example: 'Rechnung', 'Amazon', 'meeting'")], account: Annotated[str, Field(description="Filter by account name. Leave empty to search all accounts.")] = "", folder: Annotated[str, Field(description="Filter by folder name, e.g. 'INBOX', 'Sent'. Leave empty for all folders.")] = "", limit: Annotated[int, Field(description="Maximum number of results to return")] = 20, ) -> str: """Search emails by keyword. Returns date, sender, recipient, subject, and a location key for reading the full email with read_mail.""" user = get_current_user() if not user: return "Error: not authenticated" query_lower = query.lower() results = [] for acct_name, acct_path in _discover_accounts(user).items(): if account and account not in acct_name: continue for fld in _discover_folders(acct_path): if folder and folder.lower() not in fld.lower(): continue md = _open_folder(acct_path, fld) if not md: continue for key, msg in md.items(): subj = _decode_hdr(msg.get("Subject", "")) frm = _decode_hdr(msg.get("From", "")) to = _decode_hdr(msg.get("To", "")) date_str = msg.get("Date", "") if query_lower not in f"{subj} {frm} {to}".lower(): body = _get_body(msg) if query_lower not in body.lower(): continue results.append(f"[{date_str}] {frm} -> {to}\n Subject: {subj}\n Account: {acct_name}, Folder: {fld}, Key: {key}") if len(results) >= limit: return "\n\n".join(results) return "\n\n".join(results) if results else "No results found" @mcp.tool() def read_mail( account: Annotated[str, Field(description="Account name from search results, e.g. 'd370128_0-slohmaier'")], folder: Annotated[str, Field(description="Folder name from search results, e.g. 'INBOX'")], key: Annotated[str, Field(description="Message key from search results")], ) -> str: """Read the full content of a single email. Use the account, folder, and key values from search_mail results.""" user = get_current_user() if not user: return "Error: not authenticated" acct_path = _discover_accounts(user).get(account) if not acct_path: return f"Account not found: {account}" md = _open_folder(acct_path, folder) if not md: return f"Folder not found: {folder}" msg = md.get(key) if not msg: return f"Message not found: {key}" return ( f"From: {_decode_hdr(msg.get('From', ''))}\n" f"To: {_decode_hdr(msg.get('To', ''))}\n" f"Cc: {_decode_hdr(msg.get('Cc', ''))}\n" f"Date: {msg.get('Date', '')}\n" f"Subject: {_decode_hdr(msg.get('Subject', ''))}\n\n" f"{_get_body(msg)[:50000]}" ) @mcp.tool() def create_draft( to: Annotated[str, Field(description="Recipient email address, e.g. 'max@example.com'")], subject: Annotated[str, Field(description="Email subject line")], body: Annotated[str, Field(description="Email body text (plain text)")], cc: Annotated[str, Field(description="CC recipients, comma-separated")] = "", account: Annotated[str, Field(description="Which account to create the draft in. Leave empty for default account.")] = "", ) -> str: """Create an email draft in the Drafts folder. The draft can then be reviewed and sent from Roundcube or phone. Does NOT send the email.""" user = get_current_user() if not user: return "Error: not authenticated" acct = IMAP_ACCOUNTS.get(user) if not acct: return f"Kein IMAP-Account fuer {user} konfiguriert" msg = MIMEText(body, "plain", "utf-8") msg["From"] = acct["from_addr"] msg["To"] = to if cc: msg["Cc"] = cc msg["Subject"] = subject msg["Date"] = formatdate(localtime=True) msg["X-Mailer"] = "MCP Mail Server" try: imap = imaplib.IMAP4_SSL(acct["host"]) imap.login(acct["user"], acct["password"]) status, _ = imap.append(acct["drafts_folder"], "\\Draft", None, msg.as_bytes()) imap.logout() if status == "OK": return f"Entwurf erstellt: An {to}, Betreff \"{subject}\". Oeffne Roundcube oder die Mail-App um ihn zu senden." return f"Fehler beim Erstellen des Entwurfs: {status}" except Exception as e: return f"IMAP-Fehler: {e}" def create_app(): from contextlib import asynccontextmanager mcp_app = mcp.streamable_http_app() @asynccontextmanager async def lifespan(app): async with contextlib.AsyncExitStack() as stack: await stack.enter_async_context(mcp_app.router.lifespan_context(mcp_app)) yield routes = list(OAUTH_ROUTES) + [Mount("/", app=mcp_app)] app = Starlette(routes=routes, lifespan=lifespan) app.add_middleware(BearerAuthMiddleware) return app if __name__ == "__main__": import uvicorn uvicorn.run(create_app(), host="127.0.0.1", port=5100)