""" Notification Manager - Core notification orchestration """ import asyncio import logging from datetime import datetime from typing import List, Optional, Dict, Any import uuid from models import ( Notification, NotificationChannel, NotificationStatus, NotificationPriority, NotificationHistory, NotificationPreference ) logger = logging.getLogger(__name__) class NotificationManager: """Manages notification creation, delivery, and tracking""" def __init__( self, channel_handlers: Dict[NotificationChannel, Any], queue_manager: Any, template_engine: Any, preference_manager: Any ): self.channel_handlers = channel_handlers self.queue_manager = queue_manager self.template_engine = template_engine self.preference_manager = preference_manager self.is_running = False self.notification_store = {} # In-memory store for demo self.history_store = [] # In-memory history for demo self.device_tokens = {} # In-memory device tokens for demo async def start(self): """Start notification manager""" self.is_running = True # Start background tasks for processing queued notifications asyncio.create_task(self._process_notification_queue()) asyncio.create_task(self._process_scheduled_notifications()) logger.info("Notification manager started") async def stop(self): """Stop notification manager""" self.is_running = False logger.info("Notification manager stopped") async def create_notification( self, user_id: str, title: str, message: str, channels: List[NotificationChannel], priority: NotificationPriority = NotificationPriority.NORMAL, data: Optional[Dict[str, Any]] = None, template_id: Optional[str] = None, schedule_at: Optional[datetime] = None ) -> Notification: """Create a new notification""" # Check user preferences preferences = await self.preference_manager.get_user_preferences(user_id) if preferences: # Filter channels based on user preferences channels = [ch for ch in channels if preferences.channels.get(ch, True)] # Apply template if provided if template_id: template = await self.template_engine.get_template(template_id) if template: message = await self.template_engine.render_template(template, data or {}) # Create notification objects for each channel notification = Notification( id=str(uuid.uuid4()), user_id=user_id, title=title, message=message, channel=channels[0] if channels else NotificationChannel.IN_APP, priority=priority, data=data, template_id=template_id, scheduled_at=schedule_at, created_at=datetime.now() ) # Store notification self.notification_store[notification.id] = notification logger.info(f"Created notification {notification.id} for user {user_id}") return notification async def send_notification(self, notification: Notification): """Send a single notification""" try: # Check if notification should be sent now if notification.scheduled_at and notification.scheduled_at > datetime.now(): await self.queue_manager.schedule_notification(notification, notification.scheduled_at) return # Get the appropriate handler handler = self.channel_handlers.get(notification.channel) if not handler: raise ValueError(f"No handler for channel {notification.channel}") # Send through the channel success = await handler.send(notification) if success: notification.status = NotificationStatus.SENT notification.sent_at = datetime.now() logger.info(f"Notification {notification.id} sent successfully") else: notification.status = NotificationStatus.FAILED notification.retry_count += 1 logger.error(f"Failed to send notification {notification.id}") # Retry if needed if notification.retry_count < self._get_max_retries(notification.priority): await self.queue_manager.enqueue_notification(notification) # Update notification self.notification_store[notification.id] = notification # Add to history await self._add_to_history(notification) except Exception as e: logger.error(f"Error sending notification {notification.id}: {e}") notification.status = NotificationStatus.FAILED notification.error_message = str(e) self.notification_store[notification.id] = notification async def send_bulk_notifications(self, notifications: List[Notification]): """Send multiple notifications""" tasks = [] for notification in notifications: tasks.append(self.send_notification(notification)) await asyncio.gather(*tasks, return_exceptions=True) async def mark_as_read(self, notification_id: str) -> bool: """Mark notification as read""" notification = self.notification_store.get(notification_id) if notification: notification.status = NotificationStatus.READ notification.read_at = datetime.now() self.notification_store[notification_id] = notification logger.info(f"Notification {notification_id} marked as read") return True return False async def delete_notification(self, notification_id: str) -> bool: """Delete a notification""" if notification_id in self.notification_store: del self.notification_store[notification_id] logger.info(f"Notification {notification_id} deleted") return True return False async def get_notification(self, notification_id: str) -> Optional[Notification]: """Get a notification by ID""" return self.notification_store.get(notification_id) async def get_user_notifications( self, user_id: str, status: Optional[NotificationStatus] = None, channel: Optional[NotificationChannel] = None, limit: int = 50, offset: int = 0 ) -> List[Notification]: """Get notifications for a user""" notifications = [] for notification in self.notification_store.values(): if notification.user_id != user_id: continue if status and notification.status != status: continue if channel and notification.channel != channel: continue notifications.append(notification) # Sort by created_at descending notifications.sort(key=lambda x: x.created_at, reverse=True) # Apply pagination return notifications[offset:offset + limit] async def retry_notification(self, notification: Notification): """Retry a failed notification""" notification.retry_count += 1 notification.status = NotificationStatus.PENDING notification.error_message = None await self.send_notification(notification) async def get_notification_history( self, user_id: Optional[str] = None, channel: Optional[NotificationChannel] = None, status: Optional[NotificationStatus] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, limit: int = 100 ) -> List[NotificationHistory]: """Get notification history""" history = [] for entry in self.history_store: if user_id and entry.user_id != user_id: continue if channel and entry.channel != channel: continue if status and entry.status != status: continue if start_date and entry.sent_at and entry.sent_at < start_date: continue if end_date and entry.sent_at and entry.sent_at > end_date: continue history.append(entry) # Sort by sent_at descending and limit history.sort(key=lambda x: x.sent_at or datetime.min, reverse=True) return history[:limit] async def get_analytics(self, start_date: datetime, end_date: datetime) -> Dict[str, Any]: """Get notification analytics""" total_sent = 0 total_delivered = 0 total_read = 0 total_failed = 0 channel_stats = {} for notification in self.notification_store.values(): if notification.created_at < start_date or notification.created_at > end_date: continue if notification.status == NotificationStatus.SENT: total_sent += 1 elif notification.status == NotificationStatus.DELIVERED: total_delivered += 1 elif notification.status == NotificationStatus.READ: total_read += 1 elif notification.status == NotificationStatus.FAILED: total_failed += 1 # Channel stats channel_name = notification.channel.value if channel_name not in channel_stats: channel_stats[channel_name] = { "sent": 0, "delivered": 0, "read": 0, "failed": 0 } if notification.status == NotificationStatus.SENT: channel_stats[channel_name]["sent"] += 1 elif notification.status == NotificationStatus.DELIVERED: channel_stats[channel_name]["delivered"] += 1 elif notification.status == NotificationStatus.READ: channel_stats[channel_name]["read"] += 1 elif notification.status == NotificationStatus.FAILED: channel_stats[channel_name]["failed"] += 1 total = total_sent + total_delivered + total_read + total_failed return { "period": f"{start_date.isoformat()} to {end_date.isoformat()}", "total_notifications": total, "total_sent": total_sent, "total_delivered": total_delivered, "total_read": total_read, "total_failed": total_failed, "delivery_rate": (total_delivered / total * 100) if total > 0 else 0, "read_rate": (total_read / total * 100) if total > 0 else 0, "channel_stats": channel_stats } async def register_device_token( self, user_id: str, device_token: str, device_type: str ) -> bool: """Register a device token for push notifications""" if user_id not in self.device_tokens: self.device_tokens[user_id] = [] # Check if token already exists for token in self.device_tokens[user_id]: if token["token"] == device_token: # Update existing token token["device_type"] = device_type token["updated_at"] = datetime.now() return True # Add new token self.device_tokens[user_id].append({ "token": device_token, "device_type": device_type, "created_at": datetime.now(), "updated_at": datetime.now() }) logger.info(f"Registered device token for user {user_id}") return True async def unregister_device_token(self, device_token: str) -> bool: """Unregister a device token""" for user_id, tokens in self.device_tokens.items(): for i, token in enumerate(tokens): if token["token"] == device_token: del self.device_tokens[user_id][i] logger.info(f"Unregistered device token for user {user_id}") return True return False def _get_max_retries(self, priority: NotificationPriority) -> int: """Get max retries based on priority""" retry_map = { NotificationPriority.LOW: 1, NotificationPriority.NORMAL: 3, NotificationPriority.HIGH: 5, NotificationPriority.URGENT: 10 } return retry_map.get(priority, 3) async def _add_to_history(self, notification: Notification): """Add notification to history""" history_entry = NotificationHistory( notification_id=notification.id, user_id=notification.user_id, channel=notification.channel, status=notification.status, title=notification.title, message=notification.message, sent_at=notification.sent_at, delivered_at=notification.delivered_at, read_at=notification.read_at, error_message=notification.error_message, metadata={"priority": notification.priority.value} ) self.history_store.append(history_entry) async def _process_notification_queue(self): """Process queued notifications""" while self.is_running: try: # Get notification from queue notification_data = await self.queue_manager.dequeue_notification() if notification_data: notification = Notification(**notification_data) await self.send_notification(notification) except Exception as e: logger.error(f"Error processing notification queue: {e}") await asyncio.sleep(1) async def _process_scheduled_notifications(self): """Process scheduled notifications""" while self.is_running: try: # Check for scheduled notifications now = datetime.now() for notification in self.notification_store.values(): if (notification.scheduled_at and notification.scheduled_at <= now and notification.status == NotificationStatus.PENDING): await self.send_notification(notification) except Exception as e: logger.error(f"Error processing scheduled notifications: {e}") await asyncio.sleep(10) # Check every 10 seconds