"""
Notification Queue Manager with priority support.

Notifications are stored in Redis as JSON strings:

* one list per priority level (producers LPUSH, consumers BRPOP, i.e. FIFO),
* a sorted set of scheduled notifications scored by their due timestamp,
* a list used as a Dead Letter Queue (DLQ) for failed notifications.
"""

import logging
import json
import asyncio
from typing import Optional, Dict, Any, List
from datetime import datetime

import redis.asyncio as redis

from models import NotificationPriority

logger = logging.getLogger(__name__)


class NotificationQueueManager:
    """Manages notification queues with priority levels"""

    # Order in which the priority queues are drained by dequeue_notification().
    _PRIORITY_ORDER = (
        NotificationPriority.URGENT,
        NotificationPriority.HIGH,
        NotificationPriority.NORMAL,
        NotificationPriority.LOW,
    )

    def __init__(self, redis_url: str = "redis://redis:6379"):
        """Store configuration only; no I/O happens until connect() is awaited.

        Args:
            redis_url: URL handed to ``redis.from_url`` by connect().
        """
        self.redis_url = redis_url
        self.redis_client: Optional["redis.Redis"] = None
        self.is_connected = False

        # Queue names by priority
        self.queue_names = {
            NotificationPriority.URGENT: "notifications:queue:urgent",
            NotificationPriority.HIGH: "notifications:queue:high",
            NotificationPriority.NORMAL: "notifications:queue:normal",
            NotificationPriority.LOW: "notifications:queue:low",
        }

        # Scheduled notifications sorted set
        self.scheduled_key = "notifications:scheduled"

        # Failed notifications queue (DLQ)
        self.dlq_key = "notifications:dlq"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_payload(notification: Any) -> Any:
        """Return ``notification.dict()`` when available, else the object itself."""
        if hasattr(notification, 'dict'):
            return notification.dict()
        return notification

    @staticmethod
    def _serialize(payload: Any) -> str:
        """Encode a payload as JSON; non-JSON values are stringified via ``str``."""
        return json.dumps(payload, default=str)

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self) -> None:
        """Connect to Redis and verify the connection with a PING.

        Raises:
            Exception: whatever the Redis client raised; the error is logged
                and ``is_connected`` is left False before re-raising.
        """
        try:
            self.redis_client = await redis.from_url(self.redis_url)
            await self.redis_client.ping()
            self.is_connected = True
            logger.info("Connected to Redis for notification queue")
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {e}")
            self.is_connected = False
            raise

    async def close(self) -> None:
        """Close the Redis connection if one was created."""
        if not self.redis_client:
            return
        try:
            await self.redis_client.close()
        finally:
            # Mark as disconnected even when close() itself fails, so other
            # methods stop using a client in an unknown state.
            self.is_connected = False
            logger.info("Disconnected from Redis")

    # ------------------------------------------------------------------
    # Priority queues
    # ------------------------------------------------------------------

    async def enqueue_notification(
        self,
        notification: Any,
        priority: Optional[NotificationPriority] = None,
    ) -> bool:
        """Add notification to queue based on priority.

        Args:
            notification: object exposing ``.dict()`` or an already
                JSON-serializable value.
            priority: explicit priority. When None, the notification's
                ``priority`` attribute is used, defaulting to NORMAL.
                A priority without a configured queue also goes to NORMAL.

        Returns:
            True when the notification was pushed, False when disconnected
            or on any error (which is logged).
        """
        if not self.is_connected:
            logger.error("Redis not connected")
            return False

        try:
            # Use notification's priority or provided priority
            if priority is None:
                priority = getattr(
                    notification, 'priority', NotificationPriority.NORMAL
                )

            queue_name = self.queue_names.get(
                priority, self.queue_names[NotificationPriority.NORMAL]
            )

            notification_json = self._serialize(self._to_payload(notification))

            # Add to appropriate queue
            await self.redis_client.lpush(queue_name, notification_json)

            logger.info(f"Enqueued notification to {queue_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to enqueue notification: {e}")
            return False

    async def dequeue_notification(self, timeout: int = 1) -> Optional[Dict[str, Any]]:
        """Dequeue notification with priority order.

        A single multi-key BRPOP is issued with the queues listed from URGENT
        to LOW. Redis pops from the first non-empty key in that order, so the
        call waits at most ``timeout`` seconds in total and a higher-priority
        item that arrives while waiting is served immediately.

        Args:
            timeout: maximum seconds to block; 0 blocks until any queue has
                an item.

        Returns:
            The decoded notification, or None when nothing arrived in time,
            when disconnected, or on error (which is logged).
        """
        if not self.is_connected:
            return None

        try:
            keys = [self.queue_names[priority] for priority in self._PRIORITY_ORDER]
            result = await self.redis_client.brpop(keys, timeout=timeout)
            if not result:
                return None

            queue_name, notification_json = result
            # The client returns bytes unless it was created with
            # decode_responses=True; decode only for a readable log line.
            if isinstance(queue_name, bytes):
                queue_name = queue_name.decode()

            notification_data = json.loads(notification_json)
            logger.debug(f"Dequeued notification from {queue_name}")
            return notification_data

        except Exception as e:
            logger.error(f"Failed to dequeue notification: {e}")
            return None

    # ------------------------------------------------------------------
    # Scheduled notifications
    # ------------------------------------------------------------------

    async def schedule_notification(self, notification: Any, scheduled_time: datetime) -> bool:
        """Schedule a notification for future delivery.

        The serialized notification is the sorted-set member, so scheduling
        a payload that serializes identically to an existing entry only
        updates that entry's due time.

        Args:
            notification: object exposing ``.dict()`` or a JSON-serializable
                value.
            scheduled_time: due time; its ``timestamp()`` becomes the score.

        Returns:
            True on success, False when disconnected or on error.
        """
        if not self.is_connected:
            return False

        try:
            notification_json = self._serialize(self._to_payload(notification))

            # Add to scheduled set with timestamp as score
            timestamp = scheduled_time.timestamp()
            await self.redis_client.zadd(self.scheduled_key, {notification_json: timestamp})

            logger.info(f"Scheduled notification for {scheduled_time}")
            return True

        except Exception as e:
            logger.error(f"Failed to schedule notification: {e}")
            return False

    async def get_due_notifications(self) -> List[Dict[str, Any]]:
        """Get notifications that are due for delivery.

        Every due entry is claimed by removing it from the scheduled set, and
        only entries this call actually removed are returned, so concurrent
        pollers do not deliver the same notification twice. If an error
        occurs part-way through, the entries already claimed are still
        returned instead of being silently lost.

        Returns:
            Decoded notifications whose score is <= the current timestamp;
            an empty list when disconnected or when nothing is due.
        """
        if not self.is_connected:
            return []

        notifications: List[Dict[str, Any]] = []
        try:
            now = datetime.now().timestamp()

            # Get all notifications with score <= now
            results = await self.redis_client.zrangebyscore(
                self.scheduled_key,
                min="-inf",
                max=now,
                withscores=False,
            )

            for notification_json in results:
                # ZREM returns the number of members removed; 0 means another
                # consumer claimed this entry first.
                removed = await self.redis_client.zrem(self.scheduled_key, notification_json)
                if not removed:
                    continue

                try:
                    notifications.append(json.loads(notification_json))
                except (TypeError, ValueError) as e:
                    # A corrupt entry is dropped (already removed above) so it
                    # cannot block every later poll.
                    logger.error(f"Failed to get due notifications: {e}")

            if notifications:
                logger.info(f"Retrieved {len(notifications)} due notifications")

        except Exception as e:
            logger.error(f"Failed to get due notifications: {e}")

        return notifications

    # ------------------------------------------------------------------
    # Dead Letter Queue
    # ------------------------------------------------------------------

    async def add_to_dlq(self, notification: Any, error_message: str) -> bool:
        """Add failed notification to Dead Letter Queue.

        The stored payload gets two extra keys, ``dlq_error`` and
        ``dlq_timestamp``. They are written to a copy, so a dict passed in
        by the caller is left untouched.

        Returns:
            True on success, False when disconnected or on error.
        """
        if not self.is_connected:
            return False

        try:
            # Add error information (on a copy, never on the caller's object)
            notification_data = dict(self._to_payload(notification))
            notification_data['dlq_error'] = error_message
            notification_data['dlq_timestamp'] = datetime.now().isoformat()

            notification_json = self._serialize(notification_data)

            # Add to DLQ
            await self.redis_client.lpush(self.dlq_key, notification_json)

            logger.info(f"Added notification to DLQ: {error_message}")
            return True

        except Exception as e:
            logger.error(f"Failed to add to DLQ: {e}")
            return False

    async def get_dlq_notifications(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get notifications from Dead Letter Queue.

        Args:
            limit: maximum number of entries to read from the head of the
                list (most recently added first). Entries are not removed.

        Returns:
            Decoded DLQ entries; an empty list when disconnected or on error.
        """
        if not self.is_connected:
            return []

        try:
            results = await self.redis_client.lrange(self.dlq_key, 0, limit - 1)
            return [json.loads(notification_json) for notification_json in results]

        except Exception as e:
            logger.error(f"Failed to get DLQ notifications: {e}")
            return []

    async def retry_dlq_notification(self, index: int) -> bool:
        """Retry a notification from DLQ.

        The entry at ``index`` is stripped of its DLQ bookkeeping keys and
        moved back to the queue named by its ``priority`` field. The push and
        the removal run in one MULTI/EXEC transaction so the entry cannot end
        up in both places (or neither) if the connection fails in between.

        Args:
            index: position in the DLQ list, as accepted by LINDEX.

        Returns:
            True when the entry was re-enqueued, False when disconnected,
            when no entry exists at ``index``, or on error.
        """
        if not self.is_connected:
            return False

        try:
            # Get notification at index
            notification_json = await self.redis_client.lindex(self.dlq_key, index)
            if not notification_json:
                return False

            # Parse and remove DLQ info
            notification_data = json.loads(notification_json)
            notification_data.pop('dlq_error', None)
            notification_data.pop('dlq_timestamp', None)

            # Resolve the target queue; an unrecognised priority falls back to
            # NORMAL (as enqueue_notification does) rather than making the
            # entry impossible to retry.
            try:
                priority = NotificationPriority(notification_data.get('priority', 'normal'))
            except ValueError:
                priority = NotificationPriority.NORMAL
            queue_name = self.queue_names.get(
                priority, self.queue_names[NotificationPriority.NORMAL]
            )

            new_json = self._serialize(notification_data)

            # Re-enqueue and remove from DLQ atomically
            async with self.redis_client.pipeline(transaction=True) as pipe:
                pipe.lpush(queue_name, new_json)
                pipe.lrem(self.dlq_key, 1, notification_json)
                await pipe.execute()

            logger.info(f"Retried DLQ notification at index {index}")
            return True

        except Exception as e:
            logger.error(f"Failed to retry DLQ notification: {e}")
            return False

    # ------------------------------------------------------------------
    # Introspection and maintenance
    # ------------------------------------------------------------------

    async def get_queue_status(self) -> Dict[str, Any]:
        """Get current queue status.

        Returns:
            ``{"status": "disconnected"}`` when not connected;
            ``{"status": "error", "error": ...}`` on failure; otherwise a
            dict with ``status``, per-priority ``queues`` lengths keyed by
            the priority's ``value``, ``scheduled`` count and ``dlq`` count.
        """
        if not self.is_connected:
            return {"status": "disconnected"}

        try:
            status = {
                "status": "connected",
                "queues": {},
                "scheduled": 0,
                "dlq": 0,
            }

            # Get queue lengths
            for priority, queue_name in self.queue_names.items():
                status["queues"][priority.value] = await self.redis_client.llen(queue_name)

            # Get scheduled count
            status["scheduled"] = await self.redis_client.zcard(self.scheduled_key)

            # Get DLQ count
            status["dlq"] = await self.redis_client.llen(self.dlq_key)

            return status

        except Exception as e:
            logger.error(f"Failed to get queue status: {e}")
            return {"status": "error", "error": str(e)}

    async def clear_queue(self, priority: NotificationPriority) -> bool:
        """Clear a specific priority queue.

        Returns:
            True when the queue key was deleted, False when disconnected,
            when ``priority`` has no configured queue, or on error.
        """
        if not self.is_connected:
            return False

        try:
            queue_name = self.queue_names[priority]
            await self.redis_client.delete(queue_name)
            logger.info(f"Cleared queue: {queue_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to clear queue: {e}")
            return False

    async def clear_all_queues(self) -> bool:
        """Clear all notification queues, the scheduled set and the DLQ.

        All keys are removed with a single DEL command, so the clear is one
        round trip and cannot stop half-way.

        Returns:
            True on success, False when disconnected or on error.
        """
        if not self.is_connected:
            return False

        try:
            keys = [*self.queue_names.values(), self.scheduled_key, self.dlq_key]
            await self.redis_client.delete(*keys)

            logger.info("Cleared all notification queues")
            return True

        except Exception as e:
            logger.error(f"Failed to clear all queues: {e}")
            return False