91 lines
2.9 KiB
Python
91 lines
2.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Sendet genau eine Nachricht ueber die Admin-Message-API.
|
|
|
|
Pflichtparameter:
|
|
- --sender
|
|
- --receiver
|
|
- --body
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from typing import Any, Dict
|
|
|
|
|
|
def request_json(
    url: str,
    method: str,
    token: str,
    payload: Dict[str, Any] | None,
    timeout: int,
    retries: int,
) -> Dict[str, Any]:
    """Send one bearer-authenticated request and return the decoded JSON object.

    Args:
        url: Target URL.
        method: HTTP method, e.g. ``"POST"``.
        token: Sent as ``Authorization: Bearer <token>``.
        payload: JSON-serialisable request body, or ``None`` for no body. When
            given, ``Content-Type: application/json`` is added.
        timeout: Per-attempt timeout in seconds, passed to ``urlopen``.
        retries: Total number of attempts. Values below 1 are treated as 1 so
            that at least one request is always made.

    Returns:
        The parsed JSON object, or ``{}`` when the response body is empty or
        whitespace only.

    Raises:
        RuntimeError: On an HTTP error status (non-5xx immediately, 5xx once
            the attempts are exhausted), on any other failure while sending
            once the attempts are exhausted, or when a received response is
            not a valid JSON object.
    """
    data = None if payload is None else json.dumps(payload).encode("utf-8")
    headers = {"Authorization": f"Bearer {token}"}
    if payload is not None:
        headers["Content-Type"] = "application/json"

    attempts = max(1, retries)
    body = ""
    for attempt in range(1, attempts + 1):
        req = urllib.request.Request(url, data=data, headers=headers, method=method)
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                body = resp.read().decode("utf-8", errors="replace")
            break
        except urllib.error.HTTPError as exc:
            detail = exc.read().decode("utf-8", errors="replace")
            # Only server-side (5xx) failures are worth another attempt.
            if 500 <= exc.code < 600 and attempt < attempts:
                time.sleep(min(2 * attempt, 5))
                continue
            raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
        except Exception as exc:  # noqa: BLE001
            if attempt < attempts:
                # Linear back-off (2s, 4s, ...) capped at 5 seconds.
                time.sleep(min(2 * attempt, 5))
                continue
            raise RuntimeError(str(exc)) from exc

    # Decoding happens outside the retry loop on purpose: once the server has
    # answered successfully the request has been processed, so an unreadable
    # body must not cause the same request to be sent again.
    if not body.strip():
        return {}
    try:
        parsed = json.loads(body)
    except ValueError as exc:
        raise RuntimeError(f"Invalid JSON in response: {exc}") from exc
    if not isinstance(parsed, dict):
        raise RuntimeError(f"Unexpected JSON response (expected an object): {body}")
    return parsed
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Parse the CLI arguments, send one admin message and print the result.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse
            read the process command line.

    Returns:
        0 after a successful send; 1 when the token is missing, when sender,
        receiver or body is blank, or when the request fails. Invalid command
        line arguments are reported by argparse itself, which exits the
        process.
    """
    parser = argparse.ArgumentParser(description="Send one admin message")
    parser.add_argument("--sender", required=True, help="Username that sends the message")
    parser.add_argument("--receiver", required=True, help="Username that receives the message")
    parser.add_argument("--body", required=True, help="Message body")
    args = parser.parse_args(argv)

    token = os.environ.get("BOLLWERK_ADMIN_TOKEN", "").strip()
    if not token:
        print("ERROR: Missing env BOLLWERK_ADMIN_TOKEN.", file=sys.stderr)
        return 1

    sender = args.sender.strip()
    receiver = args.receiver.strip()
    # The body is sent unmodified, but a whitespace-only body counts as blank.
    if not sender or not receiver or not args.body.strip():
        print("ERROR: sender, receiver and body must not be blank.", file=sys.stderr)
        return 1

    url = "https://bollwerk.online/api/admin/send-message"
    payload = {
        "senderUsername": sender,
        "receiverUsername": receiver,
        "body": args.body,
        "sentAt": int(time.time() * 1000),  # current time in milliseconds
    }

    try:
        response = request_json(url, "POST", token, payload, timeout=15, retries=3)
    except RuntimeError as exc:
        # Report request failures like the other errors instead of a traceback.
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1

    # Tolerate a missing, null or non-object "message" entry in the response.
    message = response.get("message") if isinstance(response, dict) else None
    msg_id = message.get("id", "") if isinstance(message, dict) else ""
    print(json.dumps({"ok": True, "messageId": msg_id}, ensure_ascii=True))
    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
|