ironnotify-python/src/ironnotify/queue.py

131 lines
4.3 KiB
Python

"""Offline queue for IronNotify SDK."""
import json
from pathlib import Path
from typing import Any
from .types import NotificationPayload, NotificationAction
class OfflineQueue:
"""Offline queue for storing notifications when offline."""
def __init__(self, max_size: int, debug: bool):
self.max_size = max_size
self.debug = debug
self._queue: list[NotificationPayload] = []
self._storage_path = Path.home() / ".ironnotify" / "offline_queue.json"
self._load_from_storage()
def add(self, payload: NotificationPayload) -> None:
"""Add a notification to the queue."""
if len(self._queue) >= self.max_size:
self._queue.pop(0)
if self.debug:
print("[IronNotify] Offline queue full, dropping oldest notification")
self._queue.append(payload)
self._save_to_storage()
if self.debug:
print(f"[IronNotify] Notification queued for later: {payload.event_type}")
def get_all(self) -> list[NotificationPayload]:
"""Get all queued notifications."""
return list(self._queue)
def remove(self, index: int) -> None:
"""Remove a notification from the queue."""
if 0 <= index < len(self._queue):
self._queue.pop(index)
self._save_to_storage()
def clear(self) -> None:
"""Clear the queue."""
self._queue = []
self._save_to_storage()
@property
def size(self) -> int:
"""Get queue size."""
return len(self._queue)
@property
def is_empty(self) -> bool:
"""Check if queue is empty."""
return len(self._queue) == 0
def _serialize_payload(self, payload: NotificationPayload) -> dict[str, Any]:
data: dict[str, Any] = {
"event_type": payload.event_type,
"title": payload.title,
"severity": payload.severity,
}
if payload.message:
data["message"] = payload.message
if payload.metadata:
data["metadata"] = payload.metadata
if payload.actions:
data["actions"] = [
{"label": a.label, "url": a.url, "action": a.action, "style": a.style}
for a in payload.actions
]
if payload.user_id:
data["user_id"] = payload.user_id
if payload.group_key:
data["group_key"] = payload.group_key
if payload.deduplication_key:
data["deduplication_key"] = payload.deduplication_key
if payload.expires_at:
data["expires_at"] = payload.expires_at.isoformat()
return data
def _deserialize_payload(self, data: dict[str, Any]) -> NotificationPayload:
from datetime import datetime
actions = None
if data.get("actions"):
actions = [
NotificationAction(
label=a["label"],
url=a.get("url"),
action=a.get("action"),
style=a.get("style", "default"),
)
for a in data["actions"]
]
return NotificationPayload(
event_type=data["event_type"],
title=data["title"],
message=data.get("message"),
severity=data.get("severity", "info"),
metadata=data.get("metadata"),
actions=actions,
user_id=data.get("user_id"),
group_key=data.get("group_key"),
deduplication_key=data.get("deduplication_key"),
expires_at=datetime.fromisoformat(data["expires_at"])
if data.get("expires_at")
else None,
)
def _load_from_storage(self) -> None:
"""Load queue from storage."""
try:
if self._storage_path.exists():
with open(self._storage_path, "r") as f:
data = json.load(f)
self._queue = [self._deserialize_payload(p) for p in data]
except Exception:
pass
def _save_to_storage(self) -> None:
"""Save queue to storage."""
try:
self._storage_path.parent.mkdir(parents=True, exist_ok=True)
with open(self._storage_path, "w") as f:
json.dump([self._serialize_payload(p) for p in self._queue], f)
except Exception:
pass