""" Notification Service - Real-time Multi-channel Notifications """ from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse import uvicorn from datetime import datetime, timedelta from typing import Optional, List, Dict, Any import asyncio import os from contextlib import asynccontextmanager import logging # Import custom modules from models import ( Notification, NotificationChannel, NotificationTemplate, NotificationPreference, NotificationHistory, NotificationStatus, NotificationPriority, CreateNotificationRequest, BulkNotificationRequest ) from notification_manager import NotificationManager from channel_handlers import EmailHandler, SMSHandler, PushHandler, InAppHandler from websocket_server import WebSocketNotificationServer from queue_manager import NotificationQueueManager from template_engine import TemplateEngine from preference_manager import PreferenceManager logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Global instances notification_manager = None ws_server = None queue_manager = None template_engine = None preference_manager = None @asynccontextmanager async def lifespan(app: FastAPI): # Startup global notification_manager, ws_server, queue_manager, template_engine, preference_manager try: # Initialize Template Engine template_engine = TemplateEngine() await template_engine.load_templates() logger.info("Template engine initialized") # Initialize Preference Manager preference_manager = PreferenceManager( mongodb_url=os.getenv("MONGODB_URL", "mongodb://mongodb:27017"), database_name="notifications" ) await preference_manager.connect() logger.info("Preference manager connected") # Initialize Notification Queue Manager queue_manager = NotificationQueueManager( redis_url=os.getenv("REDIS_URL", "redis://redis:6379") ) await queue_manager.connect() logger.info("Queue manager connected") # Initialize Channel Handlers email_handler = EmailHandler( smtp_host=os.getenv("SMTP_HOST", "smtp.gmail.com"), smtp_port=int(os.getenv("SMTP_PORT", 587)), smtp_user=os.getenv("SMTP_USER", ""), smtp_password=os.getenv("SMTP_PASSWORD", "") ) sms_handler = SMSHandler( api_key=os.getenv("SMS_API_KEY", ""), api_url=os.getenv("SMS_API_URL", "") ) push_handler = PushHandler( fcm_server_key=os.getenv("FCM_SERVER_KEY", "") ) in_app_handler = InAppHandler() # Initialize Notification Manager notification_manager = NotificationManager( channel_handlers={ NotificationChannel.EMAIL: email_handler, NotificationChannel.SMS: sms_handler, NotificationChannel.PUSH: push_handler, NotificationChannel.IN_APP: in_app_handler }, queue_manager=queue_manager, template_engine=template_engine, preference_manager=preference_manager ) await notification_manager.start() logger.info("Notification manager started") # Initialize WebSocket Server ws_server = WebSocketNotificationServer() logger.info("WebSocket server initialized") # Register in-app handler with WebSocket server in_app_handler.set_ws_server(ws_server) except Exception as e: logger.error(f"Failed to start Notification service: {e}") raise yield # Shutdown if notification_manager: await notification_manager.stop() if queue_manager: await queue_manager.close() if preference_manager: await preference_manager.close() logger.info("Notification service shutdown complete") app = FastAPI( title="Notification Service", description="Real-time Multi-channel Notification Service", version="1.0.0", lifespan=lifespan ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") async def root(): return { "service": "Notification Service", "status": "running", "timestamp": datetime.now().isoformat() } @app.get("/health") async def health_check(): return { "status": "healthy", "service": "notifications", "components": { "queue_manager": "connected" if queue_manager and queue_manager.is_connected else "disconnected", "preference_manager": "connected" if preference_manager and preference_manager.is_connected else "disconnected", "notification_manager": "running" if notification_manager and notification_manager.is_running else "stopped", "websocket_connections": len(ws_server.active_connections) if ws_server else 0 }, "timestamp": datetime.now().isoformat() } # Notification Endpoints @app.post("/api/notifications/send") async def send_notification( request: CreateNotificationRequest, background_tasks: BackgroundTasks ): """Send a single notification""" try: notification = await notification_manager.create_notification( user_id=request.user_id, title=request.title, message=request.message, channels=request.channels, priority=request.priority, data=request.data, template_id=request.template_id, schedule_at=request.schedule_at ) if request.schedule_at and request.schedule_at > datetime.now(): # Schedule for later await queue_manager.schedule_notification(notification, request.schedule_at) return { "notification_id": notification.id, "status": "scheduled", "scheduled_at": request.schedule_at.isoformat() } else: # Send immediately background_tasks.add_task( notification_manager.send_notification, notification ) return { "notification_id": notification.id, "status": "queued" } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/notifications/send-bulk") async def send_bulk_notifications( request: BulkNotificationRequest, background_tasks: BackgroundTasks ): """Send notifications to multiple users""" try: notifications = [] for user_id in request.user_ids: notification = await notification_manager.create_notification( user_id=user_id, title=request.title, message=request.message, channels=request.channels, priority=request.priority, data=request.data, template_id=request.template_id ) notifications.append(notification) # Queue all notifications background_tasks.add_task( notification_manager.send_bulk_notifications, notifications ) return { "count": len(notifications), "notification_ids": [n.id for n in notifications], "status": "queued" } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/notifications/user/{user_id}") async def get_user_notifications( user_id: str, status: Optional[NotificationStatus] = None, channel: Optional[NotificationChannel] = None, limit: int = Query(50, le=200), offset: int = Query(0, ge=0) ): """Get notifications for a specific user""" try: notifications = await notification_manager.get_user_notifications( user_id=user_id, status=status, channel=channel, limit=limit, offset=offset ) return { "notifications": notifications, "count": len(notifications), "limit": limit, "offset": offset } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.patch("/api/notifications/{notification_id}/read") async def mark_notification_read(notification_id: str): """Mark a notification as read""" try: success = await notification_manager.mark_as_read(notification_id) if success: return {"status": "marked_as_read", "notification_id": notification_id} else: raise HTTPException(status_code=404, detail="Notification not found") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.delete("/api/notifications/{notification_id}") async def delete_notification(notification_id: str): """Delete a notification""" try: success = await notification_manager.delete_notification(notification_id) if success: return {"status": "deleted", "notification_id": notification_id} else: raise HTTPException(status_code=404, detail="Notification not found") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # Template Endpoints @app.get("/api/templates") async def get_templates(): """Get all notification templates""" templates = await template_engine.get_all_templates() return {"templates": templates} @app.post("/api/templates") async def create_template(template: NotificationTemplate): """Create a new notification template""" try: template_id = await template_engine.create_template(template) return {"template_id": template_id, "status": "created"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.put("/api/templates/{template_id}") async def update_template(template_id: str, template: NotificationTemplate): """Update an existing template""" try: success = await template_engine.update_template(template_id, template) if success: return {"status": "updated", "template_id": template_id} else: raise HTTPException(status_code=404, detail="Template not found") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # Preference Endpoints @app.get("/api/preferences/{user_id}") async def get_user_preferences(user_id: str): """Get notification preferences for a user""" try: preferences = await preference_manager.get_user_preferences(user_id) return {"user_id": user_id, "preferences": preferences} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.put("/api/preferences/{user_id}") async def update_user_preferences( user_id: str, preferences: NotificationPreference ): """Update notification preferences for a user""" try: success = await preference_manager.update_user_preferences(user_id, preferences) if success: return {"status": "updated", "user_id": user_id} else: return {"status": "created", "user_id": user_id} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/preferences/{user_id}/unsubscribe/{category}") async def unsubscribe_from_category(user_id: str, category: str): """Unsubscribe user from a notification category""" try: success = await preference_manager.unsubscribe_category(user_id, category) if success: return {"status": "unsubscribed", "user_id": user_id, "category": category} else: raise HTTPException(status_code=404, detail="User preferences not found") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # History and Analytics Endpoints @app.get("/api/history") async def get_notification_history( user_id: Optional[str] = None, channel: Optional[NotificationChannel] = None, status: Optional[NotificationStatus] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, limit: int = Query(100, le=1000) ): """Get notification history with filters""" try: history = await notification_manager.get_notification_history( user_id=user_id, channel=channel, status=status, start_date=start_date, end_date=end_date, limit=limit ) return { "history": history, "count": len(history), "filters": { "user_id": user_id, "channel": channel, "status": status, "date_range": f"{start_date} to {end_date}" if start_date and end_date else None } } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/analytics") async def get_notification_analytics( start_date: Optional[datetime] = None, end_date: Optional[datetime] = None ): """Get notification analytics""" try: if not start_date: start_date = datetime.now() - timedelta(days=7) if not end_date: end_date = datetime.now() analytics = await notification_manager.get_analytics(start_date, end_date) return analytics except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # Queue Management Endpoints @app.get("/api/queue/status") async def get_queue_status(): """Get current queue status""" try: status = await queue_manager.get_queue_status() return status except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/queue/retry/{notification_id}") async def retry_failed_notification( notification_id: str, background_tasks: BackgroundTasks ): """Retry a failed notification""" try: notification = await notification_manager.get_notification(notification_id) if not notification: raise HTTPException(status_code=404, detail="Notification not found") if notification.status != NotificationStatus.FAILED: raise HTTPException(status_code=400, detail="Only failed notifications can be retried") background_tasks.add_task( notification_manager.retry_notification, notification ) return {"status": "retry_queued", "notification_id": notification_id} except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # WebSocket Endpoint from fastapi import WebSocket, WebSocketDisconnect @app.websocket("/ws/notifications/{user_id}") async def websocket_notifications(websocket: WebSocket, user_id: str): """WebSocket endpoint for real-time notifications""" await ws_server.connect(websocket, user_id) try: while True: # Keep connection alive and handle incoming messages data = await websocket.receive_text() # Handle different message types if data == "ping": await websocket.send_text("pong") elif data.startswith("read:"): # Mark notification as read notification_id = data.split(":")[1] await notification_manager.mark_as_read(notification_id) except WebSocketDisconnect: ws_server.disconnect(user_id) except Exception as e: logger.error(f"WebSocket error for user {user_id}: {e}") ws_server.disconnect(user_id) # Device Token Management @app.post("/api/devices/register") async def register_device_token( user_id: str, device_token: str, device_type: str = Query(..., regex="^(ios|android|web)$") ): """Register a device token for push notifications""" try: success = await notification_manager.register_device_token( user_id=user_id, device_token=device_token, device_type=device_type ) if success: return { "status": "registered", "user_id": user_id, "device_type": device_type } else: raise HTTPException(status_code=500, detail="Failed to register device token") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.delete("/api/devices/{device_token}") async def unregister_device_token(device_token: str): """Unregister a device token""" try: success = await notification_manager.unregister_device_token(device_token) if success: return {"status": "unregistered", "device_token": device_token} else: raise HTTPException(status_code=404, detail="Device token not found") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": uvicorn.run( "main:app", host="0.0.0.0", port=8000, reload=True )