84 lines
2.8 KiB
Python
84 lines
2.8 KiB
Python
#!/usr/bin/env python3
|
|
"""Empfaengt Inbox-Nachrichten fuer genau einen User.
|
|
|
|
Pflichtparameter:
|
|
- --username
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from typing import Any, Dict, List
|
|
|
|
|
|
def request_json(url: str, token: str, timeout: int, retries: int) -> List[Dict[str, Any]]:
|
|
headers = {"Authorization": f"Bearer {token}"}
|
|
last_err = ""
|
|
for attempt in range(1, retries + 1):
|
|
req = urllib.request.Request(url, headers=headers, method="GET")
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
|
body = resp.read().decode("utf-8", errors="replace")
|
|
if not body.strip():
|
|
return []
|
|
parsed = json.loads(body)
|
|
if isinstance(parsed, list):
|
|
return parsed
|
|
return []
|
|
except urllib.error.HTTPError as exc:
|
|
body = exc.read().decode("utf-8", errors="replace")
|
|
last_err = f"HTTP {exc.code}: {body}"
|
|
if 500 <= exc.code < 600 and attempt < retries:
|
|
time.sleep(min(2 * attempt, 5))
|
|
continue
|
|
raise RuntimeError(last_err)
|
|
except Exception as exc: # noqa: BLE001
|
|
last_err = str(exc)
|
|
if attempt < retries:
|
|
time.sleep(min(2 * attempt, 5))
|
|
continue
|
|
raise RuntimeError(last_err)
|
|
raise RuntimeError(last_err)
|
|
|
|
|
|
def format_line(msg: Dict[str, Any]) -> str:
|
|
sent_at = msg.get("sentAt", "")
|
|
sender = msg.get("senderUsername", "?")
|
|
receiver = msg.get("receiverId", "?")
|
|
msg_id = msg.get("id", "")
|
|
body = str(msg.get("body", ""))
|
|
short_body = body if len(body) <= 80 else body[:77] + "..."
|
|
return f"{sent_at} id={msg_id} sender={sender} receiverId={receiver} body={short_body}"
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Receive inbox messages for one user")
|
|
parser.add_argument("--username", required=True, help="Username of receiver")
|
|
args = parser.parse_args()
|
|
|
|
token = os.environ.get("BOLLWERK_ADMIN_TOKEN", "").strip()
|
|
if not token:
|
|
print("ERROR: Missing env BOLLWERK_ADMIN_TOKEN.", file=sys.stderr)
|
|
return 1
|
|
|
|
params = {"username": args.username.strip()}
|
|
if not params["username"]:
|
|
print("ERROR: username must not be blank.", file=sys.stderr)
|
|
return 1
|
|
|
|
url = "https://bollwerk.online/api/admin/send-message/inbox?" + urllib.parse.urlencode(params)
|
|
messages = request_json(url, token, timeout=15, retries=3)
|
|
print(f"RECEIVE_DONE count={len(messages)}")
|
|
for msg in messages:
|
|
print(format_line(msg))
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|