bollwerk/.github/skills/admin-message-e2ee/receive_admin_messages.py

84 lines
2.8 KiB
Python

#!/usr/bin/env python3
"""Receive inbox messages for exactly one user.

Required parameter:
- --username

Required environment variable:
- BOLLWERK_ADMIN_TOKEN
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Dict, List
def request_json(url: str, token: str, timeout: int, retries: int) -> List[Dict[str, Any]]:
    """GET *url* with a bearer token and return the decoded JSON list.

    The request is attempted up to ``retries`` times. HTTP 5xx responses and
    non-HTTP failures (network errors, timeouts, undecodable JSON) are retried
    after a short backoff; any other HTTP error status fails immediately.

    Args:
        url: Fully built request URL.
        token: Value sent as ``Authorization: Bearer <token>``.
        timeout: Per-attempt timeout in seconds, passed to ``urlopen``.
        retries: Maximum number of attempts.

    Returns:
        The parsed response when it is a JSON list; an empty list when the
        body is blank or the JSON document is not a list.

    Raises:
        RuntimeError: If the final attempt fails, or if ``retries`` is less
            than 1 so that no attempt is made at all.
    """
    headers = {"Authorization": f"Bearer {token}"}
    last_err = ""
    for attempt in range(1, retries + 1):
        req = urllib.request.Request(url, headers=headers, method="GET")
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                body = resp.read().decode("utf-8", errors="replace")
            if not body.strip():
                return []
            parsed = json.loads(body)
            # Anything other than a JSON list is treated as "no messages".
            return parsed if isinstance(parsed, list) else []
        except urllib.error.HTTPError as exc:
            err_body = exc.read().decode("utf-8", errors="replace")
            last_err = f"HTTP {exc.code}: {err_body}"
            # Only server-side errors are retried; other statuses will not
            # change on a second attempt.
            if 500 <= exc.code < 600 and attempt < retries:
                time.sleep(min(2 * attempt, 5))  # linear backoff, capped at 5 s
                continue
            raise RuntimeError(last_err) from exc
        except Exception as exc:  # noqa: BLE001 - retry boundary for any transport/parse failure
            last_err = str(exc)
            if attempt < retries:
                time.sleep(min(2 * attempt, 5))  # linear backoff, capped at 5 s
                continue
            raise RuntimeError(last_err) from exc
    # Reached only when the loop body never ran (retries < 1).
    raise RuntimeError(last_err or "no request attempt was made (retries must be >= 1)")
def format_line(msg: Dict[str, Any]) -> str:
    """Render one inbox message as a single summary line.

    Missing ``senderUsername`` and ``receiverId`` are shown as ``?``; missing
    ``sentAt``, ``id`` and ``body`` are shown as empty text. A body longer
    than 80 characters is cut to its first 77 characters followed by ``...``.
    """
    preview = str(msg.get("body", ""))
    if len(preview) > 80:
        preview = preview[:77] + "..."
    return "{} id={} sender={} receiverId={} body={}".format(
        msg.get("sentAt", ""),
        msg.get("id", ""),
        msg.get("senderUsername", "?"),
        msg.get("receiverId", "?"),
        preview,
    )
def main() -> int:
    """Fetch and print the inbox of the user given via ``--username``.

    Reads the bearer token from the ``BOLLWERK_ADMIN_TOKEN`` environment
    variable, queries the inbox endpoint and prints a
    ``RECEIVE_DONE count=N`` header followed by one formatted line per
    message.

    Returns:
        ``0`` on success; ``1`` when the token is missing, the username is
        blank, or the request fails.
    """
    parser = argparse.ArgumentParser(description="Receive inbox messages for one user")
    parser.add_argument("--username", required=True, help="Username of receiver")
    args = parser.parse_args()

    token = os.environ.get("BOLLWERK_ADMIN_TOKEN", "").strip()
    if not token:
        print("ERROR: Missing env BOLLWERK_ADMIN_TOKEN.", file=sys.stderr)
        return 1

    username = args.username.strip()
    if not username:
        print("ERROR: username must not be blank.", file=sys.stderr)
        return 1

    query = urllib.parse.urlencode({"username": username})
    url = "https://bollwerk.online/api/admin/send-message/inbox?" + query
    try:
        messages = request_json(url, token, timeout=15, retries=3)
    except RuntimeError as exc:
        # Report request failures like the other error paths instead of
        # letting a traceback escape to the caller of the script.
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1

    print(f"RECEIVE_DONE count={len(messages)}")
    for msg in messages:
        print(format_line(msg))
    return 0
# Script entry point: run main() and hand its return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)