#!/usr/bin/env python3 """Sendet genau eine Nachricht ueber die Admin-Message-API. Pflichtparameter: - --sender - --receiver - --body """ from __future__ import annotations import argparse import json import os import sys import time import urllib.error import urllib.request from typing import Any, Dict def request_json( url: str, method: str, token: str, payload: Dict[str, Any] | None, timeout: int, retries: int, ) -> Dict[str, Any]: data = None if payload is None else json.dumps(payload).encode("utf-8") headers = {"Authorization": f"Bearer {token}"} if payload is not None: headers["Content-Type"] = "application/json" last_err = "" for attempt in range(1, retries + 1): req = urllib.request.Request(url, data=data, headers=headers, method=method) try: with urllib.request.urlopen(req, timeout=timeout) as resp: body = resp.read().decode("utf-8", errors="replace") if not body.strip(): return {} return json.loads(body) except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8", errors="replace") last_err = f"HTTP {exc.code}: {body}" if 500 <= exc.code < 600 and attempt < retries: time.sleep(min(2 * attempt, 5)) continue raise RuntimeError(last_err) except Exception as exc: # noqa: BLE001 last_err = str(exc) if attempt < retries: time.sleep(min(2 * attempt, 5)) continue raise RuntimeError(last_err) raise RuntimeError(last_err) def main() -> int: parser = argparse.ArgumentParser(description="Send one admin message") parser.add_argument("--sender", required=True, help="Username that sends the message") parser.add_argument("--receiver", required=True, help="Username that receives the message") parser.add_argument("--body", required=True, help="Message body") args = parser.parse_args() token = os.environ.get("BOLLWERK_ADMIN_TOKEN", "").strip() if not token: print("ERROR: Missing env BOLLWERK_ADMIN_TOKEN.", file=sys.stderr) return 1 url = "https://bollwerk.online/api/admin/send-message" payload = { "senderUsername": args.sender.strip(), "receiverUsername": args.receiver.strip(), "body": args.body, "sentAt": int(time.time() * 1000), } if not payload["senderUsername"] or not payload["receiverUsername"] or not payload["body"]: print("ERROR: sender, receiver and body must not be blank.", file=sys.stderr) return 1 response = request_json(url, "POST", token, payload, timeout=15, retries=3) msg_id = response.get("message", {}).get("id", "") print(json.dumps({"ok": True, "messageId": msg_id}, ensure_ascii=True)) return 0 if __name__ == "__main__": raise SystemExit(main())