Switch to per-service subdomains, shared token store

URLs: mail.mcp.home.slohmaier.de, calendar.mcp..., etc.
No more path-prefix routing — each service has its own domain.
OAuth tokens stored in shared .active_tokens.json file so all
services can validate tokens issued by any service.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Stefan Lohmaier
2026-06-12 09:13:54 +02:00
parent 7f0b03606a
commit 30351f1bcf
2 changed files with 26 additions and 6 deletions
+1
View File
@@ -3,3 +3,4 @@ __pycache__/
*.pyc
tokens.json
config.json
.active_tokens.json
+25 -6
View File
@@ -36,10 +36,23 @@ def load_config():
_tokens_cache = None
_current_user: contextvars.ContextVar[str | None] = contextvars.ContextVar("current_user", default=None)
_access_tokens: dict[str, dict] = {}
_TOKEN_STORE = os.path.join(BASE_DIR, ".active_tokens.json")
_auth_codes: dict[str, dict] = {}
def _load_access_tokens():
try:
with open(_TOKEN_STORE) as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def _save_access_tokens(tokens):
with open(_TOKEN_STORE, "w") as f:
json.dump(tokens, f)
def _load_tokens():
global _tokens_cache
if _tokens_cache is None:
@@ -65,11 +78,13 @@ def _resolve_client(client_id, client_secret):
def _resolve_access_token(token):
info = _access_tokens.get(token)
tokens = _load_access_tokens()
info = tokens.get(token)
if not info:
return None
if info.get("expires_at", 0) < time.time():
del _access_tokens[token]
del tokens[token]
_save_access_tokens(tokens)
return None
return info["user"]
@@ -87,8 +102,7 @@ def _verify_pkce(code_verifier, code_challenge, method):
async def oauth_metadata(request: Request):
proto = request.headers.get("x-forwarded-proto", request.url.scheme)
host = request.headers.get("host", "")
prefix = request.headers.get("x-forwarded-prefix", "")
base = f"{proto}://{host}{prefix}"
base = f"{proto}://{host}"
return JSONResponse({
"issuer": base,
"authorization_endpoint": base + "/authorize",
@@ -171,7 +185,12 @@ async def oauth_token(request: Request):
access_token = secrets.token_urlsafe(48)
expires_in = 86400
_access_tokens[access_token] = {"user": user, "expires_at": time.time() + expires_in}
tokens = _load_access_tokens()
# Cleanup expired
now = time.time()
tokens = {k: v for k, v in tokens.items() if v.get("expires_at", 0) > now}
tokens[access_token] = {"user": user, "expires_at": now + expires_in}
_save_access_tokens(tokens)
return JSONResponse({
"access_token": access_token,