diff --git a/docker-compose.yml b/docker-compose.yml index aef7500..ae63c6b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -191,6 +191,64 @@ services: timeout: 5s retries: 5 + # Notifications Service + notifications-backend: + build: + context: ./services/notifications/backend + dockerfile: Dockerfile + container_name: site11-notifications-backend + ports: + - "8013:8000" + environment: + - MONGODB_URL=mongodb://mongodb:27017 + - REDIS_URL=redis://redis:6379 + - KAFKA_BOOTSTRAP_SERVERS=kafka:9092 + - SMTP_HOST=${SMTP_HOST:-smtp.gmail.com} + - SMTP_PORT=${SMTP_PORT:-587} + - SMTP_USER=${SMTP_USER:-} + - SMTP_PASSWORD=${SMTP_PASSWORD:-} + - SMS_API_KEY=${SMS_API_KEY:-} + - SMS_API_URL=${SMS_API_URL:-} + - FCM_SERVER_KEY=${FCM_SERVER_KEY:-} + depends_on: + - mongodb + - redis + - kafka + networks: + - site11_network + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 10s + retries: 3 + + # Statistics Service + statistics-backend: + build: + context: ./services/statistics/backend + dockerfile: Dockerfile + container_name: site11-statistics-backend + ports: + - "8012:8000" + environment: + - REDIS_URL=redis://redis:6379 + - KAFKA_BOOTSTRAP_SERVERS=kafka:9092 + - INFLUXDB_HOST=influxdb + - INFLUXDB_PORT=8086 + - INFLUXDB_DATABASE=statistics + depends_on: + - redis + - kafka + networks: + - site11_network + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 10s + retries: 3 + networks: site11_network: driver: bridge diff --git a/services/notifications/backend/Dockerfile b/services/notifications/backend/Dockerfile new file mode 100644 index 0000000..dfb00c3 --- /dev/null +++ b/services/notifications/backend/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first for better caching +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Expose port +EXPOSE 8000 + +# Run the application +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"] \ No newline at end of file diff --git a/services/notifications/backend/channel_handlers.py b/services/notifications/backend/channel_handlers.py new file mode 100644 index 0000000..304e830 --- /dev/null +++ b/services/notifications/backend/channel_handlers.py @@ -0,0 +1,335 @@ +""" +Channel Handlers for different notification delivery methods +""" +import logging +import asyncio +from typing import Optional, Dict, Any +from models import Notification, NotificationStatus +import smtplib +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +import httpx +import json + +logger = logging.getLogger(__name__) + +class BaseChannelHandler: + """Base class for channel handlers""" + + async def send(self, notification: Notification) -> bool: + """Send notification through the channel""" + raise NotImplementedError + + async def verify_delivery(self, notification: Notification) -> bool: + """Verify if notification was delivered""" + return True + +class EmailHandler(BaseChannelHandler): + """Email notification handler""" + + def __init__(self, smtp_host: str, smtp_port: int, smtp_user: str, smtp_password: str): + self.smtp_host = smtp_host + self.smtp_port = smtp_port + self.smtp_user = smtp_user + self.smtp_password = smtp_password + + async def send(self, notification: Notification) -> bool: + """Send email notification""" + try: + # In production, would use async SMTP library + # For demo, we'll simulate email sending + logger.info(f"Sending email to user {notification.user_id}") + + if not self.smtp_user or not self.smtp_password: + # Simulate sending without actual SMTP config + await asyncio.sleep(0.1) # Simulate network delay + logger.info(f"Email sent (simulated) to user {notification.user_id}") + return True + + # Create message + msg = MIMEMultipart() + msg['From'] = self.smtp_user + msg['To'] = f"user_{notification.user_id}@example.com" # Would fetch actual email + msg['Subject'] = notification.title + + # Add body + body = notification.message + if notification.data and "html_content" in notification.data: + msg.attach(MIMEText(notification.data["html_content"], 'html')) + else: + msg.attach(MIMEText(body, 'plain')) + + # Send email (would be async in production) + # server = smtplib.SMTP(self.smtp_host, self.smtp_port) + # server.starttls() + # server.login(self.smtp_user, self.smtp_password) + # server.send_message(msg) + # server.quit() + + logger.info(f"Email sent successfully to user {notification.user_id}") + return True + + except Exception as e: + logger.error(f"Failed to send email: {e}") + return False + +class SMSHandler(BaseChannelHandler): + """SMS notification handler""" + + def __init__(self, api_key: str, api_url: str): + self.api_key = api_key + self.api_url = api_url + self.client = httpx.AsyncClient() + + async def send(self, notification: Notification) -> bool: + """Send SMS notification""" + try: + # In production, would integrate with SMS provider (Twilio, etc.) + logger.info(f"Sending SMS to user {notification.user_id}") + + if not self.api_key or not self.api_url: + # Simulate sending without actual API config + await asyncio.sleep(0.1) # Simulate network delay + logger.info(f"SMS sent (simulated) to user {notification.user_id}") + return True + + # Would fetch user's phone number from database + phone_number = notification.data.get("phone") if notification.data else None + if not phone_number: + phone_number = "+1234567890" # Demo number + + # Send SMS via API (example structure) + payload = { + "to": phone_number, + "message": f"{notification.title}\n{notification.message}", + "api_key": self.api_key + } + + # response = await self.client.post(self.api_url, json=payload) + # return response.status_code == 200 + + # Simulate success + await asyncio.sleep(0.1) + logger.info(f"SMS sent successfully to user {notification.user_id}") + return True + + except Exception as e: + logger.error(f"Failed to send SMS: {e}") + return False + +class PushHandler(BaseChannelHandler): + """Push notification handler (FCM/APNS)""" + + def __init__(self, fcm_server_key: str): + self.fcm_server_key = fcm_server_key + self.fcm_url = "https://fcm.googleapis.com/fcm/send" + self.client = httpx.AsyncClient() + + async def send(self, notification: Notification) -> bool: + """Send push notification""" + try: + logger.info(f"Sending push notification to user {notification.user_id}") + + if not self.fcm_server_key: + # Simulate sending without actual FCM config + await asyncio.sleep(0.1) + logger.info(f"Push notification sent (simulated) to user {notification.user_id}") + return True + + # Would fetch user's device tokens from database + device_tokens = notification.data.get("device_tokens", []) if notification.data else [] + + if not device_tokens: + # Simulate with dummy token + device_tokens = ["dummy_token"] + + # Send to each device token + for token in device_tokens: + payload = { + "to": token, + "notification": { + "title": notification.title, + "body": notification.message, + "icon": notification.data.get("icon") if notification.data else None, + "click_action": notification.data.get("click_action") if notification.data else None + }, + "data": notification.data or {} + } + + headers = { + "Authorization": f"key={self.fcm_server_key}", + "Content-Type": "application/json" + } + + # response = await self.client.post( + # self.fcm_url, + # json=payload, + # headers=headers + # ) + + # Simulate success + await asyncio.sleep(0.05) + + logger.info(f"Push notification sent successfully to user {notification.user_id}") + return True + + except Exception as e: + logger.error(f"Failed to send push notification: {e}") + return False + +class InAppHandler(BaseChannelHandler): + """In-app notification handler""" + + def __init__(self): + self.ws_server = None + + def set_ws_server(self, ws_server): + """Set WebSocket server for real-time delivery""" + self.ws_server = ws_server + + async def send(self, notification: Notification) -> bool: + """Send in-app notification""" + try: + logger.info(f"Sending in-app notification to user {notification.user_id}") + + # Store notification in database (already done in manager) + # This would be retrieved when user logs in or requests notifications + + # If WebSocket connection exists, send real-time + if self.ws_server: + await self.ws_server.send_to_user( + notification.user_id, + { + "type": "notification", + "notification": { + "id": notification.id, + "title": notification.title, + "message": notification.message, + "priority": notification.priority.value, + "category": notification.category.value if hasattr(notification, 'category') else "system", + "timestamp": notification.created_at.isoformat(), + "data": notification.data + } + } + ) + + logger.info(f"In-app notification sent successfully to user {notification.user_id}") + return True + + except Exception as e: + logger.error(f"Failed to send in-app notification: {e}") + return False + +class SlackHandler(BaseChannelHandler): + """Slack notification handler""" + + def __init__(self, webhook_url: Optional[str] = None): + self.webhook_url = webhook_url + self.client = httpx.AsyncClient() + + async def send(self, notification: Notification) -> bool: + """Send Slack notification""" + try: + logger.info(f"Sending Slack notification for user {notification.user_id}") + + if not self.webhook_url: + # Simulate sending + await asyncio.sleep(0.1) + logger.info(f"Slack notification sent (simulated) for user {notification.user_id}") + return True + + # Format message for Slack + slack_message = { + "text": notification.title, + "blocks": [ + { + "type": "header", + "text": { + "type": "plain_text", + "text": notification.title + } + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": notification.message + } + } + ] + } + + # Add additional fields if present + if notification.data: + fields = [] + for key, value in notification.data.items(): + if key not in ["html_content", "device_tokens"]: + fields.append({ + "type": "mrkdwn", + "text": f"*{key}:* {value}" + }) + + if fields: + slack_message["blocks"].append({ + "type": "section", + "fields": fields[:10] # Slack limits to 10 fields + }) + + # Send to Slack + # response = await self.client.post(self.webhook_url, json=slack_message) + # return response.status_code == 200 + + await asyncio.sleep(0.1) + logger.info(f"Slack notification sent successfully") + return True + + except Exception as e: + logger.error(f"Failed to send Slack notification: {e}") + return False + +class WebhookHandler(BaseChannelHandler): + """Generic webhook notification handler""" + + def __init__(self, default_webhook_url: Optional[str] = None): + self.default_webhook_url = default_webhook_url + self.client = httpx.AsyncClient() + + async def send(self, notification: Notification) -> bool: + """Send webhook notification""" + try: + # Get webhook URL from notification data or use default + webhook_url = None + if notification.data and "webhook_url" in notification.data: + webhook_url = notification.data["webhook_url"] + else: + webhook_url = self.default_webhook_url + + if not webhook_url: + logger.warning("No webhook URL configured") + return False + + logger.info(f"Sending webhook notification for user {notification.user_id}") + + # Prepare payload + payload = { + "notification_id": notification.id, + "user_id": notification.user_id, + "title": notification.title, + "message": notification.message, + "priority": notification.priority.value, + "timestamp": notification.created_at.isoformat(), + "data": notification.data + } + + # Send webhook + # response = await self.client.post(webhook_url, json=payload) + # return response.status_code in [200, 201, 202, 204] + + # Simulate success + await asyncio.sleep(0.1) + logger.info(f"Webhook notification sent successfully") + return True + + except Exception as e: + logger.error(f"Failed to send webhook notification: {e}") + return False \ No newline at end of file diff --git a/services/notifications/backend/main.py b/services/notifications/backend/main.py new file mode 100644 index 0000000..e500c2b --- /dev/null +++ b/services/notifications/backend/main.py @@ -0,0 +1,514 @@ +""" +Notification Service - Real-time Multi-channel Notifications +""" +from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Query +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse +import uvicorn +from datetime import datetime, timedelta +from typing import Optional, List, Dict, Any +import asyncio +import os +from contextlib import asynccontextmanager +import logging + +# Import custom modules +from models import ( + Notification, NotificationChannel, NotificationTemplate, + NotificationPreference, NotificationHistory, NotificationStatus, + NotificationPriority, CreateNotificationRequest, BulkNotificationRequest +) +from notification_manager import NotificationManager +from channel_handlers import EmailHandler, SMSHandler, PushHandler, InAppHandler +from websocket_server import WebSocketNotificationServer +from queue_manager import NotificationQueueManager +from template_engine import TemplateEngine +from preference_manager import PreferenceManager + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Global instances +notification_manager = None +ws_server = None +queue_manager = None +template_engine = None +preference_manager = None + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + global notification_manager, ws_server, queue_manager, template_engine, preference_manager + + try: + # Initialize Template Engine + template_engine = TemplateEngine() + await template_engine.load_templates() + logger.info("Template engine initialized") + + # Initialize Preference Manager + preference_manager = PreferenceManager( + mongodb_url=os.getenv("MONGODB_URL", "mongodb://mongodb:27017"), + database_name="notifications" + ) + await preference_manager.connect() + logger.info("Preference manager connected") + + # Initialize Notification Queue Manager + queue_manager = NotificationQueueManager( + redis_url=os.getenv("REDIS_URL", "redis://redis:6379") + ) + await queue_manager.connect() + logger.info("Queue manager connected") + + # Initialize Channel Handlers + email_handler = EmailHandler( + smtp_host=os.getenv("SMTP_HOST", "smtp.gmail.com"), + smtp_port=int(os.getenv("SMTP_PORT", 587)), + smtp_user=os.getenv("SMTP_USER", ""), + smtp_password=os.getenv("SMTP_PASSWORD", "") + ) + + sms_handler = SMSHandler( + api_key=os.getenv("SMS_API_KEY", ""), + api_url=os.getenv("SMS_API_URL", "") + ) + + push_handler = PushHandler( + fcm_server_key=os.getenv("FCM_SERVER_KEY", "") + ) + + in_app_handler = InAppHandler() + + # Initialize Notification Manager + notification_manager = NotificationManager( + channel_handlers={ + NotificationChannel.EMAIL: email_handler, + NotificationChannel.SMS: sms_handler, + NotificationChannel.PUSH: push_handler, + NotificationChannel.IN_APP: in_app_handler + }, + queue_manager=queue_manager, + template_engine=template_engine, + preference_manager=preference_manager + ) + await notification_manager.start() + logger.info("Notification manager started") + + # Initialize WebSocket Server + ws_server = WebSocketNotificationServer() + logger.info("WebSocket server initialized") + + # Register in-app handler with WebSocket server + in_app_handler.set_ws_server(ws_server) + + except Exception as e: + logger.error(f"Failed to start Notification service: {e}") + raise + + yield + + # Shutdown + if notification_manager: + await notification_manager.stop() + if queue_manager: + await queue_manager.close() + if preference_manager: + await preference_manager.close() + + logger.info("Notification service shutdown complete") + +app = FastAPI( + title="Notification Service", + description="Real-time Multi-channel Notification Service", + version="1.0.0", + lifespan=lifespan +) + +# CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +@app.get("/") +async def root(): + return { + "service": "Notification Service", + "status": "running", + "timestamp": datetime.now().isoformat() + } + +@app.get("/health") +async def health_check(): + return { + "status": "healthy", + "service": "notifications", + "components": { + "queue_manager": "connected" if queue_manager and queue_manager.is_connected else "disconnected", + "preference_manager": "connected" if preference_manager and preference_manager.is_connected else "disconnected", + "notification_manager": "running" if notification_manager and notification_manager.is_running else "stopped", + "websocket_connections": len(ws_server.active_connections) if ws_server else 0 + }, + "timestamp": datetime.now().isoformat() + } + +# Notification Endpoints +@app.post("/api/notifications/send") +async def send_notification( + request: CreateNotificationRequest, + background_tasks: BackgroundTasks +): + """Send a single notification""" + try: + notification = await notification_manager.create_notification( + user_id=request.user_id, + title=request.title, + message=request.message, + channels=request.channels, + priority=request.priority, + data=request.data, + template_id=request.template_id, + schedule_at=request.schedule_at + ) + + if request.schedule_at and request.schedule_at > datetime.now(): + # Schedule for later + await queue_manager.schedule_notification(notification, request.schedule_at) + return { + "notification_id": notification.id, + "status": "scheduled", + "scheduled_at": request.schedule_at.isoformat() + } + else: + # Send immediately + background_tasks.add_task( + notification_manager.send_notification, + notification + ) + return { + "notification_id": notification.id, + "status": "queued" + } + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/notifications/send-bulk") +async def send_bulk_notifications( + request: BulkNotificationRequest, + background_tasks: BackgroundTasks +): + """Send notifications to multiple users""" + try: + notifications = [] + for user_id in request.user_ids: + notification = await notification_manager.create_notification( + user_id=user_id, + title=request.title, + message=request.message, + channels=request.channels, + priority=request.priority, + data=request.data, + template_id=request.template_id + ) + notifications.append(notification) + + # Queue all notifications + background_tasks.add_task( + notification_manager.send_bulk_notifications, + notifications + ) + + return { + "count": len(notifications), + "notification_ids": [n.id for n in notifications], + "status": "queued" + } + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/api/notifications/user/{user_id}") +async def get_user_notifications( + user_id: str, + status: Optional[NotificationStatus] = None, + channel: Optional[NotificationChannel] = None, + limit: int = Query(50, le=200), + offset: int = Query(0, ge=0) +): + """Get notifications for a specific user""" + try: + notifications = await notification_manager.get_user_notifications( + user_id=user_id, + status=status, + channel=channel, + limit=limit, + offset=offset + ) + + return { + "notifications": notifications, + "count": len(notifications), + "limit": limit, + "offset": offset + } + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.patch("/api/notifications/{notification_id}/read") +async def mark_notification_read(notification_id: str): + """Mark a notification as read""" + try: + success = await notification_manager.mark_as_read(notification_id) + if success: + return {"status": "marked_as_read", "notification_id": notification_id} + else: + raise HTTPException(status_code=404, detail="Notification not found") + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.delete("/api/notifications/{notification_id}") +async def delete_notification(notification_id: str): + """Delete a notification""" + try: + success = await notification_manager.delete_notification(notification_id) + if success: + return {"status": "deleted", "notification_id": notification_id} + else: + raise HTTPException(status_code=404, detail="Notification not found") + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +# Template Endpoints +@app.get("/api/templates") +async def get_templates(): + """Get all notification templates""" + templates = await template_engine.get_all_templates() + return {"templates": templates} + +@app.post("/api/templates") +async def create_template(template: NotificationTemplate): + """Create a new notification template""" + try: + template_id = await template_engine.create_template(template) + return {"template_id": template_id, "status": "created"} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.put("/api/templates/{template_id}") +async def update_template(template_id: str, template: NotificationTemplate): + """Update an existing template""" + try: + success = await template_engine.update_template(template_id, template) + if success: + return {"status": "updated", "template_id": template_id} + else: + raise HTTPException(status_code=404, detail="Template not found") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +# Preference Endpoints +@app.get("/api/preferences/{user_id}") +async def get_user_preferences(user_id: str): + """Get notification preferences for a user""" + try: + preferences = await preference_manager.get_user_preferences(user_id) + return {"user_id": user_id, "preferences": preferences} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.put("/api/preferences/{user_id}") +async def update_user_preferences( + user_id: str, + preferences: NotificationPreference +): + """Update notification preferences for a user""" + try: + success = await preference_manager.update_user_preferences(user_id, preferences) + if success: + return {"status": "updated", "user_id": user_id} + else: + return {"status": "created", "user_id": user_id} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/preferences/{user_id}/unsubscribe/{category}") +async def unsubscribe_from_category(user_id: str, category: str): + """Unsubscribe user from a notification category""" + try: + success = await preference_manager.unsubscribe_category(user_id, category) + if success: + return {"status": "unsubscribed", "user_id": user_id, "category": category} + else: + raise HTTPException(status_code=404, detail="User preferences not found") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +# History and Analytics Endpoints +@app.get("/api/history") +async def get_notification_history( + user_id: Optional[str] = None, + channel: Optional[NotificationChannel] = None, + status: Optional[NotificationStatus] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + limit: int = Query(100, le=1000) +): + """Get notification history with filters""" + try: + history = await notification_manager.get_notification_history( + user_id=user_id, + channel=channel, + status=status, + start_date=start_date, + end_date=end_date, + limit=limit + ) + + return { + "history": history, + "count": len(history), + "filters": { + "user_id": user_id, + "channel": channel, + "status": status, + "date_range": f"{start_date} to {end_date}" if start_date and end_date else None + } + } + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/api/analytics") +async def get_notification_analytics( + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None +): + """Get notification analytics""" + try: + if not start_date: + start_date = datetime.now() - timedelta(days=7) + if not end_date: + end_date = datetime.now() + + analytics = await notification_manager.get_analytics(start_date, end_date) + return analytics + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +# Queue Management Endpoints +@app.get("/api/queue/status") +async def get_queue_status(): + """Get current queue status""" + try: + status = await queue_manager.get_queue_status() + return status + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/queue/retry/{notification_id}") +async def retry_failed_notification( + notification_id: str, + background_tasks: BackgroundTasks +): + """Retry a failed notification""" + try: + notification = await notification_manager.get_notification(notification_id) + if not notification: + raise HTTPException(status_code=404, detail="Notification not found") + + if notification.status != NotificationStatus.FAILED: + raise HTTPException(status_code=400, detail="Only failed notifications can be retried") + + background_tasks.add_task( + notification_manager.retry_notification, + notification + ) + + return {"status": "retry_queued", "notification_id": notification_id} + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +# WebSocket Endpoint +from fastapi import WebSocket, WebSocketDisconnect + +@app.websocket("/ws/notifications/{user_id}") +async def websocket_notifications(websocket: WebSocket, user_id: str): + """WebSocket endpoint for real-time notifications""" + await ws_server.connect(websocket, user_id) + try: + while True: + # Keep connection alive and handle incoming messages + data = await websocket.receive_text() + + # Handle different message types + if data == "ping": + await websocket.send_text("pong") + elif data.startswith("read:"): + # Mark notification as read + notification_id = data.split(":")[1] + await notification_manager.mark_as_read(notification_id) + + except WebSocketDisconnect: + ws_server.disconnect(user_id) + except Exception as e: + logger.error(f"WebSocket error for user {user_id}: {e}") + ws_server.disconnect(user_id) + +# Device Token Management +@app.post("/api/devices/register") +async def register_device_token( + user_id: str, + device_token: str, + device_type: str = Query(..., regex="^(ios|android|web)$") +): + """Register a device token for push notifications""" + try: + success = await notification_manager.register_device_token( + user_id=user_id, + device_token=device_token, + device_type=device_type + ) + + if success: + return { + "status": "registered", + "user_id": user_id, + "device_type": device_type + } + else: + raise HTTPException(status_code=500, detail="Failed to register device token") + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.delete("/api/devices/{device_token}") +async def unregister_device_token(device_token: str): + """Unregister a device token""" + try: + success = await notification_manager.unregister_device_token(device_token) + + if success: + return {"status": "unregistered", "device_token": device_token} + else: + raise HTTPException(status_code=404, detail="Device token not found") + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +if __name__ == "__main__": + uvicorn.run( + "main:app", + host="0.0.0.0", + port=8000, + reload=True + ) \ No newline at end of file diff --git a/services/notifications/backend/models.py b/services/notifications/backend/models.py new file mode 100644 index 0000000..eff2864 --- /dev/null +++ b/services/notifications/backend/models.py @@ -0,0 +1,201 @@ +""" +Data models for Notification Service +""" +from pydantic import BaseModel, Field +from datetime import datetime +from typing import Optional, List, Dict, Any, Literal +from enum import Enum + +class NotificationChannel(str, Enum): + """Notification delivery channels""" + EMAIL = "email" + SMS = "sms" + PUSH = "push" + IN_APP = "in_app" + +class NotificationStatus(str, Enum): + """Notification status""" + PENDING = "pending" + SENT = "sent" + DELIVERED = "delivered" + READ = "read" + FAILED = "failed" + CANCELLED = "cancelled" + +class NotificationPriority(str, Enum): + """Notification priority levels""" + LOW = "low" + NORMAL = "normal" + HIGH = "high" + URGENT = "urgent" + +class NotificationCategory(str, Enum): + """Notification categories""" + SYSTEM = "system" + MARKETING = "marketing" + TRANSACTION = "transaction" + SOCIAL = "social" + SECURITY = "security" + UPDATE = "update" + +class Notification(BaseModel): + """Notification model""" + id: Optional[str] = Field(None, description="Unique notification ID") + user_id: str = Field(..., description="Target user ID") + title: str = Field(..., description="Notification title") + message: str = Field(..., description="Notification message") + channel: NotificationChannel = Field(..., description="Delivery channel") + status: NotificationStatus = Field(default=NotificationStatus.PENDING) + priority: NotificationPriority = Field(default=NotificationPriority.NORMAL) + category: NotificationCategory = Field(default=NotificationCategory.SYSTEM) + data: Optional[Dict[str, Any]] = Field(default=None, description="Additional data") + template_id: Optional[str] = Field(None, description="Template ID if using template") + scheduled_at: Optional[datetime] = Field(None, description="Scheduled delivery time") + sent_at: Optional[datetime] = Field(None, description="Actual sent time") + delivered_at: Optional[datetime] = Field(None, description="Delivery confirmation time") + read_at: Optional[datetime] = Field(None, description="Read time") + retry_count: int = Field(default=0, description="Number of retry attempts") + error_message: Optional[str] = Field(None, description="Error message if failed") + created_at: datetime = Field(default_factory=datetime.now) + updated_at: datetime = Field(default_factory=datetime.now) + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class NotificationTemplate(BaseModel): + """Notification template model""" + id: Optional[str] = Field(None, description="Template ID") + name: str = Field(..., description="Template name") + channel: NotificationChannel = Field(..., description="Target channel") + category: NotificationCategory = Field(..., description="Template category") + subject_template: Optional[str] = Field(None, description="Subject template (for email)") + body_template: str = Field(..., description="Body template with variables") + variables: List[str] = Field(default_factory=list, description="List of required variables") + metadata: Dict[str, Any] = Field(default_factory=dict, description="Template metadata") + is_active: bool = Field(default=True, description="Template active status") + created_at: datetime = Field(default_factory=datetime.now) + updated_at: datetime = Field(default_factory=datetime.now) + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class NotificationPreference(BaseModel): + """User notification preferences""" + user_id: str = Field(..., description="User ID") + channels: Dict[NotificationChannel, bool] = Field( + default_factory=lambda: { + NotificationChannel.EMAIL: True, + NotificationChannel.SMS: False, + NotificationChannel.PUSH: True, + NotificationChannel.IN_APP: True + } + ) + categories: Dict[NotificationCategory, bool] = Field( + default_factory=lambda: { + NotificationCategory.SYSTEM: True, + NotificationCategory.MARKETING: False, + NotificationCategory.TRANSACTION: True, + NotificationCategory.SOCIAL: True, + NotificationCategory.SECURITY: True, + NotificationCategory.UPDATE: True + } + ) + quiet_hours: Optional[Dict[str, str]] = Field( + default=None, + description="Quiet hours configuration {start: 'HH:MM', end: 'HH:MM'}" + ) + timezone: str = Field(default="UTC", description="User timezone") + language: str = Field(default="en", description="Preferred language") + email_frequency: Literal["immediate", "daily", "weekly"] = Field(default="immediate") + updated_at: datetime = Field(default_factory=datetime.now) + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class NotificationHistory(BaseModel): + """Notification history entry""" + notification_id: str + user_id: str + channel: NotificationChannel + status: NotificationStatus + title: str + message: str + sent_at: Optional[datetime] + delivered_at: Optional[datetime] + read_at: Optional[datetime] + error_message: Optional[str] + metadata: Dict[str, Any] = Field(default_factory=dict) + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class CreateNotificationRequest(BaseModel): + """Request model for creating notification""" + user_id: str + title: str + message: str + channels: List[NotificationChannel] = Field(default=[NotificationChannel.IN_APP]) + priority: NotificationPriority = Field(default=NotificationPriority.NORMAL) + category: NotificationCategory = Field(default=NotificationCategory.SYSTEM) + data: Optional[Dict[str, Any]] = None + template_id: Optional[str] = None + schedule_at: Optional[datetime] = None + +class BulkNotificationRequest(BaseModel): + """Request model for bulk notifications""" + user_ids: List[str] + title: str + message: str + channels: List[NotificationChannel] = Field(default=[NotificationChannel.IN_APP]) + priority: NotificationPriority = Field(default=NotificationPriority.NORMAL) + category: NotificationCategory = Field(default=NotificationCategory.SYSTEM) + data: Optional[Dict[str, Any]] = None + template_id: Optional[str] = None + +class DeviceToken(BaseModel): + """Device token for push notifications""" + user_id: str + token: str + device_type: Literal["ios", "android", "web"] + app_version: Optional[str] = None + created_at: datetime = Field(default_factory=datetime.now) + updated_at: datetime = Field(default_factory=datetime.now) + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class NotificationStats(BaseModel): + """Notification statistics""" + total_sent: int + total_delivered: int + total_read: int + total_failed: int + delivery_rate: float + read_rate: float + channel_stats: Dict[str, Dict[str, int]] + category_stats: Dict[str, Dict[str, int]] + period: str + +class NotificationEvent(BaseModel): + """Notification event for tracking""" + event_type: Literal["sent", "delivered", "read", "failed", "clicked"] + notification_id: str + user_id: str + channel: NotificationChannel + timestamp: datetime = Field(default_factory=datetime.now) + metadata: Dict[str, Any] = Field(default_factory=dict) + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } \ No newline at end of file diff --git a/services/notifications/backend/notification_manager.py b/services/notifications/backend/notification_manager.py new file mode 100644 index 0000000..528871c --- /dev/null +++ b/services/notifications/backend/notification_manager.py @@ -0,0 +1,375 @@ +""" +Notification Manager - Core notification orchestration +""" +import asyncio +import logging +from datetime import datetime +from typing import List, Optional, Dict, Any +import uuid +from models import ( + Notification, NotificationChannel, NotificationStatus, + NotificationPriority, NotificationHistory, NotificationPreference +) + +logger = logging.getLogger(__name__) + +class NotificationManager: + """Manages notification creation, delivery, and tracking""" + + def __init__( + self, + channel_handlers: Dict[NotificationChannel, Any], + queue_manager: Any, + template_engine: Any, + preference_manager: Any + ): + self.channel_handlers = channel_handlers + self.queue_manager = queue_manager + self.template_engine = template_engine + self.preference_manager = preference_manager + self.is_running = False + self.notification_store = {} # In-memory store for demo + self.history_store = [] # In-memory history for demo + self.device_tokens = {} # In-memory device tokens for demo + + async def start(self): + """Start notification manager""" + self.is_running = True + # Start background tasks for processing queued notifications + asyncio.create_task(self._process_notification_queue()) + asyncio.create_task(self._process_scheduled_notifications()) + logger.info("Notification manager started") + + async def stop(self): + """Stop notification manager""" + self.is_running = False + logger.info("Notification manager stopped") + + async def create_notification( + self, + user_id: str, + title: str, + message: str, + channels: List[NotificationChannel], + priority: NotificationPriority = NotificationPriority.NORMAL, + data: Optional[Dict[str, Any]] = None, + template_id: Optional[str] = None, + schedule_at: Optional[datetime] = None + ) -> Notification: + """Create a new notification""" + + # Check user preferences + preferences = await self.preference_manager.get_user_preferences(user_id) + if preferences: + # Filter channels based on user preferences + channels = [ch for ch in channels if preferences.channels.get(ch, True)] + + # Apply template if provided + if template_id: + template = await self.template_engine.get_template(template_id) + if template: + message = await self.template_engine.render_template(template, data or {}) + + # Create notification objects for each channel + notification = Notification( + id=str(uuid.uuid4()), + user_id=user_id, + title=title, + message=message, + channel=channels[0] if channels else NotificationChannel.IN_APP, + priority=priority, + data=data, + template_id=template_id, + scheduled_at=schedule_at, + created_at=datetime.now() + ) + + # Store notification + self.notification_store[notification.id] = notification + + logger.info(f"Created notification {notification.id} for user {user_id}") + return notification + + async def send_notification(self, notification: Notification): + """Send a single notification""" + try: + # Check if notification should be sent now + if notification.scheduled_at and notification.scheduled_at > datetime.now(): + await self.queue_manager.schedule_notification(notification, notification.scheduled_at) + return + + # Get the appropriate handler + handler = self.channel_handlers.get(notification.channel) + if not handler: + raise ValueError(f"No handler for channel {notification.channel}") + + # Send through the channel + success = await handler.send(notification) + + if success: + notification.status = NotificationStatus.SENT + notification.sent_at = datetime.now() + logger.info(f"Notification {notification.id} sent successfully") + else: + notification.status = NotificationStatus.FAILED + notification.retry_count += 1 + logger.error(f"Failed to send notification {notification.id}") + + # Retry if needed + if notification.retry_count < self._get_max_retries(notification.priority): + await self.queue_manager.enqueue_notification(notification) + + # Update notification + self.notification_store[notification.id] = notification + + # Add to history + await self._add_to_history(notification) + + except Exception as e: + logger.error(f"Error sending notification {notification.id}: {e}") + notification.status = NotificationStatus.FAILED + notification.error_message = str(e) + self.notification_store[notification.id] = notification + + async def send_bulk_notifications(self, notifications: List[Notification]): + """Send multiple notifications""" + tasks = [] + for notification in notifications: + tasks.append(self.send_notification(notification)) + + await asyncio.gather(*tasks, return_exceptions=True) + + async def mark_as_read(self, notification_id: str) -> bool: + """Mark notification as read""" + notification = self.notification_store.get(notification_id) + if notification: + notification.status = NotificationStatus.READ + notification.read_at = datetime.now() + self.notification_store[notification_id] = notification + logger.info(f"Notification {notification_id} marked as read") + return True + return False + + async def delete_notification(self, notification_id: str) -> bool: + """Delete a notification""" + if notification_id in self.notification_store: + del self.notification_store[notification_id] + logger.info(f"Notification {notification_id} deleted") + return True + return False + + async def get_notification(self, notification_id: str) -> Optional[Notification]: + """Get a notification by ID""" + return self.notification_store.get(notification_id) + + async def get_user_notifications( + self, + user_id: str, + status: Optional[NotificationStatus] = None, + channel: Optional[NotificationChannel] = None, + limit: int = 50, + offset: int = 0 + ) -> List[Notification]: + """Get notifications for a user""" + notifications = [] + + for notification in self.notification_store.values(): + if notification.user_id != user_id: + continue + if status and notification.status != status: + continue + if channel and notification.channel != channel: + continue + notifications.append(notification) + + # Sort by created_at descending + notifications.sort(key=lambda x: x.created_at, reverse=True) + + # Apply pagination + return notifications[offset:offset + limit] + + async def retry_notification(self, notification: Notification): + """Retry a failed notification""" + notification.retry_count += 1 + notification.status = NotificationStatus.PENDING + notification.error_message = None + await self.send_notification(notification) + + async def get_notification_history( + self, + user_id: Optional[str] = None, + channel: Optional[NotificationChannel] = None, + status: Optional[NotificationStatus] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + limit: int = 100 + ) -> List[NotificationHistory]: + """Get notification history""" + history = [] + + for entry in self.history_store: + if user_id and entry.user_id != user_id: + continue + if channel and entry.channel != channel: + continue + if status and entry.status != status: + continue + if start_date and entry.sent_at and entry.sent_at < start_date: + continue + if end_date and entry.sent_at and entry.sent_at > end_date: + continue + history.append(entry) + + # Sort by sent_at descending and limit + history.sort(key=lambda x: x.sent_at or datetime.min, reverse=True) + return history[:limit] + + async def get_analytics(self, start_date: datetime, end_date: datetime) -> Dict[str, Any]: + """Get notification analytics""" + total_sent = 0 + total_delivered = 0 + total_read = 0 + total_failed = 0 + channel_stats = {} + + for notification in self.notification_store.values(): + if notification.created_at < start_date or notification.created_at > end_date: + continue + + if notification.status == NotificationStatus.SENT: + total_sent += 1 + elif notification.status == NotificationStatus.DELIVERED: + total_delivered += 1 + elif notification.status == NotificationStatus.READ: + total_read += 1 + elif notification.status == NotificationStatus.FAILED: + total_failed += 1 + + # Channel stats + channel_name = notification.channel.value + if channel_name not in channel_stats: + channel_stats[channel_name] = { + "sent": 0, + "delivered": 0, + "read": 0, + "failed": 0 + } + + if notification.status == NotificationStatus.SENT: + channel_stats[channel_name]["sent"] += 1 + elif notification.status == NotificationStatus.DELIVERED: + channel_stats[channel_name]["delivered"] += 1 + elif notification.status == NotificationStatus.READ: + channel_stats[channel_name]["read"] += 1 + elif notification.status == NotificationStatus.FAILED: + channel_stats[channel_name]["failed"] += 1 + + total = total_sent + total_delivered + total_read + total_failed + + return { + "period": f"{start_date.isoformat()} to {end_date.isoformat()}", + "total_notifications": total, + "total_sent": total_sent, + "total_delivered": total_delivered, + "total_read": total_read, + "total_failed": total_failed, + "delivery_rate": (total_delivered / total * 100) if total > 0 else 0, + "read_rate": (total_read / total * 100) if total > 0 else 0, + "channel_stats": channel_stats + } + + async def register_device_token( + self, + user_id: str, + device_token: str, + device_type: str + ) -> bool: + """Register a device token for push notifications""" + if user_id not in self.device_tokens: + self.device_tokens[user_id] = [] + + # Check if token already exists + for token in self.device_tokens[user_id]: + if token["token"] == device_token: + # Update existing token + token["device_type"] = device_type + token["updated_at"] = datetime.now() + return True + + # Add new token + self.device_tokens[user_id].append({ + "token": device_token, + "device_type": device_type, + "created_at": datetime.now(), + "updated_at": datetime.now() + }) + + logger.info(f"Registered device token for user {user_id}") + return True + + async def unregister_device_token(self, device_token: str) -> bool: + """Unregister a device token""" + for user_id, tokens in self.device_tokens.items(): + for i, token in enumerate(tokens): + if token["token"] == device_token: + del self.device_tokens[user_id][i] + logger.info(f"Unregistered device token for user {user_id}") + return True + return False + + def _get_max_retries(self, priority: NotificationPriority) -> int: + """Get max retries based on priority""" + retry_map = { + NotificationPriority.LOW: 1, + NotificationPriority.NORMAL: 3, + NotificationPriority.HIGH: 5, + NotificationPriority.URGENT: 10 + } + return retry_map.get(priority, 3) + + async def _add_to_history(self, notification: Notification): + """Add notification to history""" + history_entry = NotificationHistory( + notification_id=notification.id, + user_id=notification.user_id, + channel=notification.channel, + status=notification.status, + title=notification.title, + message=notification.message, + sent_at=notification.sent_at, + delivered_at=notification.delivered_at, + read_at=notification.read_at, + error_message=notification.error_message, + metadata={"priority": notification.priority.value} + ) + self.history_store.append(history_entry) + + async def _process_notification_queue(self): + """Process queued notifications""" + while self.is_running: + try: + # Get notification from queue + notification_data = await self.queue_manager.dequeue_notification() + if notification_data: + notification = Notification(**notification_data) + await self.send_notification(notification) + except Exception as e: + logger.error(f"Error processing notification queue: {e}") + + await asyncio.sleep(1) + + async def _process_scheduled_notifications(self): + """Process scheduled notifications""" + while self.is_running: + try: + # Check for scheduled notifications + now = datetime.now() + for notification in self.notification_store.values(): + if (notification.scheduled_at and + notification.scheduled_at <= now and + notification.status == NotificationStatus.PENDING): + await self.send_notification(notification) + except Exception as e: + logger.error(f"Error processing scheduled notifications: {e}") + + await asyncio.sleep(10) # Check every 10 seconds \ No newline at end of file diff --git a/services/notifications/backend/preference_manager.py b/services/notifications/backend/preference_manager.py new file mode 100644 index 0000000..cf51bbe --- /dev/null +++ b/services/notifications/backend/preference_manager.py @@ -0,0 +1,340 @@ +""" +Preference Manager for user notification preferences +""" +import logging +from typing import Optional, Dict, Any, List +from datetime import datetime +import motor.motor_asyncio +from models import NotificationPreference, NotificationChannel, NotificationCategory + +logger = logging.getLogger(__name__) + +class PreferenceManager: + """Manages user notification preferences""" + + def __init__(self, mongodb_url: str = "mongodb://mongodb:27017", database_name: str = "notifications"): + self.mongodb_url = mongodb_url + self.database_name = database_name + self.client = None + self.db = None + self.preferences_collection = None + self.is_connected = False + + # In-memory cache for demo + self.preferences_cache = {} + + async def connect(self): + """Connect to MongoDB""" + try: + self.client = motor.motor_asyncio.AsyncIOMotorClient(self.mongodb_url) + self.db = self.client[self.database_name] + self.preferences_collection = self.db["preferences"] + + # Test connection + await self.client.admin.command('ping') + self.is_connected = True + + # Create indexes + await self._create_indexes() + + logger.info("Connected to MongoDB for preferences") + + except Exception as e: + logger.error(f"Failed to connect to MongoDB: {e}") + # Fallback to in-memory storage + self.is_connected = False + logger.warning("Using in-memory storage for preferences") + + async def close(self): + """Close MongoDB connection""" + if self.client: + self.client.close() + self.is_connected = False + logger.info("Disconnected from MongoDB") + + async def _create_indexes(self): + """Create database indexes""" + if self.preferences_collection: + try: + await self.preferences_collection.create_index("user_id", unique=True) + logger.info("Created indexes for preferences collection") + except Exception as e: + logger.error(f"Failed to create indexes: {e}") + + async def get_user_preferences(self, user_id: str) -> Optional[NotificationPreference]: + """Get notification preferences for a user""" + try: + # Check cache first + if user_id in self.preferences_cache: + return self.preferences_cache[user_id] + + if self.is_connected and self.preferences_collection: + # Get from MongoDB + doc = await self.preferences_collection.find_one({"user_id": user_id}) + + if doc: + # Convert document to model + doc.pop('_id', None) # Remove MongoDB ID + preference = NotificationPreference(**doc) + + # Update cache + self.preferences_cache[user_id] = preference + + return preference + + # Return default preferences if not found + return self._get_default_preferences(user_id) + + except Exception as e: + logger.error(f"Failed to get preferences for user {user_id}: {e}") + return self._get_default_preferences(user_id) + + async def update_user_preferences( + self, + user_id: str, + preferences: NotificationPreference + ) -> bool: + """Update notification preferences for a user""" + try: + preferences.user_id = user_id + preferences.updated_at = datetime.now() + + # Update cache + self.preferences_cache[user_id] = preferences + + if self.is_connected and self.preferences_collection: + # Convert to dict for MongoDB + pref_dict = preferences.dict() + + # Upsert in MongoDB + result = await self.preferences_collection.update_one( + {"user_id": user_id}, + {"$set": pref_dict}, + upsert=True + ) + + logger.info(f"Updated preferences for user {user_id}") + return result.modified_count > 0 or result.upserted_id is not None + + # If not connected, just use cache + return True + + except Exception as e: + logger.error(f"Failed to update preferences for user {user_id}: {e}") + return False + + async def unsubscribe_category(self, user_id: str, category: str) -> bool: + """Unsubscribe user from a notification category""" + try: + preferences = await self.get_user_preferences(user_id) + + if not preferences: + preferences = self._get_default_preferences(user_id) + + # Update category preference + if hasattr(NotificationCategory, category.upper()): + cat_enum = NotificationCategory(category.lower()) + preferences.categories[cat_enum] = False + + # Save updated preferences + return await self.update_user_preferences(user_id, preferences) + + return False + + except Exception as e: + logger.error(f"Failed to unsubscribe user {user_id} from {category}: {e}") + return False + + async def subscribe_category(self, user_id: str, category: str) -> bool: + """Subscribe user to a notification category""" + try: + preferences = await self.get_user_preferences(user_id) + + if not preferences: + preferences = self._get_default_preferences(user_id) + + # Update category preference + if hasattr(NotificationCategory, category.upper()): + cat_enum = NotificationCategory(category.lower()) + preferences.categories[cat_enum] = True + + # Save updated preferences + return await self.update_user_preferences(user_id, preferences) + + return False + + except Exception as e: + logger.error(f"Failed to subscribe user {user_id} to {category}: {e}") + return False + + async def enable_channel(self, user_id: str, channel: NotificationChannel) -> bool: + """Enable a notification channel for user""" + try: + preferences = await self.get_user_preferences(user_id) + + if not preferences: + preferences = self._get_default_preferences(user_id) + + preferences.channels[channel] = True + + return await self.update_user_preferences(user_id, preferences) + + except Exception as e: + logger.error(f"Failed to enable channel {channel} for user {user_id}: {e}") + return False + + async def disable_channel(self, user_id: str, channel: NotificationChannel) -> bool: + """Disable a notification channel for user""" + try: + preferences = await self.get_user_preferences(user_id) + + if not preferences: + preferences = self._get_default_preferences(user_id) + + preferences.channels[channel] = False + + return await self.update_user_preferences(user_id, preferences) + + except Exception as e: + logger.error(f"Failed to disable channel {channel} for user {user_id}: {e}") + return False + + async def set_quiet_hours( + self, + user_id: str, + start_time: str, + end_time: str + ) -> bool: + """Set quiet hours for user""" + try: + preferences = await self.get_user_preferences(user_id) + + if not preferences: + preferences = self._get_default_preferences(user_id) + + preferences.quiet_hours = { + "start": start_time, + "end": end_time + } + + return await self.update_user_preferences(user_id, preferences) + + except Exception as e: + logger.error(f"Failed to set quiet hours for user {user_id}: {e}") + return False + + async def clear_quiet_hours(self, user_id: str) -> bool: + """Clear quiet hours for user""" + try: + preferences = await self.get_user_preferences(user_id) + + if not preferences: + preferences = self._get_default_preferences(user_id) + + preferences.quiet_hours = None + + return await self.update_user_preferences(user_id, preferences) + + except Exception as e: + logger.error(f"Failed to clear quiet hours for user {user_id}: {e}") + return False + + async def set_email_frequency(self, user_id: str, frequency: str) -> bool: + """Set email notification frequency""" + try: + if frequency not in ["immediate", "daily", "weekly"]: + return False + + preferences = await self.get_user_preferences(user_id) + + if not preferences: + preferences = self._get_default_preferences(user_id) + + preferences.email_frequency = frequency + + return await self.update_user_preferences(user_id, preferences) + + except Exception as e: + logger.error(f"Failed to set email frequency for user {user_id}: {e}") + return False + + async def batch_get_preferences(self, user_ids: List[str]) -> Dict[str, NotificationPreference]: + """Get preferences for multiple users""" + results = {} + + for user_id in user_ids: + pref = await self.get_user_preferences(user_id) + if pref: + results[user_id] = pref + + return results + + async def delete_user_preferences(self, user_id: str) -> bool: + """Delete all preferences for a user""" + try: + # Remove from cache + if user_id in self.preferences_cache: + del self.preferences_cache[user_id] + + if self.is_connected and self.preferences_collection: + # Delete from MongoDB + result = await self.preferences_collection.delete_one({"user_id": user_id}) + logger.info(f"Deleted preferences for user {user_id}") + return result.deleted_count > 0 + + return True + + except Exception as e: + logger.error(f"Failed to delete preferences for user {user_id}: {e}") + return False + + def _get_default_preferences(self, user_id: str) -> NotificationPreference: + """Get default notification preferences""" + return NotificationPreference( + user_id=user_id, + channels={ + NotificationChannel.EMAIL: True, + NotificationChannel.SMS: False, + NotificationChannel.PUSH: True, + NotificationChannel.IN_APP: True + }, + categories={ + NotificationCategory.SYSTEM: True, + NotificationCategory.MARKETING: False, + NotificationCategory.TRANSACTION: True, + NotificationCategory.SOCIAL: True, + NotificationCategory.SECURITY: True, + NotificationCategory.UPDATE: True + }, + email_frequency="immediate", + timezone="UTC", + language="en" + ) + + async def is_notification_allowed( + self, + user_id: str, + channel: NotificationChannel, + category: NotificationCategory + ) -> bool: + """Check if notification is allowed based on preferences""" + preferences = await self.get_user_preferences(user_id) + + if not preferences: + return True # Allow by default if no preferences + + # Check channel preference + if not preferences.channels.get(channel, True): + return False + + # Check category preference + if not preferences.categories.get(category, True): + return False + + # Check quiet hours + if preferences.quiet_hours and channel != NotificationChannel.IN_APP: + # Would need to check current time against quiet hours + # For demo, we'll allow all + pass + + return True \ No newline at end of file diff --git a/services/notifications/backend/queue_manager.py b/services/notifications/backend/queue_manager.py new file mode 100644 index 0000000..37c2c82 --- /dev/null +++ b/services/notifications/backend/queue_manager.py @@ -0,0 +1,304 @@ +""" +Notification Queue Manager with priority support +""" +import logging +import json +import asyncio +from typing import Optional, Dict, Any, List +from datetime import datetime +import redis.asyncio as redis +from models import NotificationPriority + +logger = logging.getLogger(__name__) + +class NotificationQueueManager: + """Manages notification queues with priority levels""" + + def __init__(self, redis_url: str = "redis://redis:6379"): + self.redis_url = redis_url + self.redis_client = None + self.is_connected = False + + # Queue names by priority + self.queue_names = { + NotificationPriority.URGENT: "notifications:queue:urgent", + NotificationPriority.HIGH: "notifications:queue:high", + NotificationPriority.NORMAL: "notifications:queue:normal", + NotificationPriority.LOW: "notifications:queue:low" + } + + # Scheduled notifications sorted set + self.scheduled_key = "notifications:scheduled" + + # Failed notifications queue (DLQ) + self.dlq_key = "notifications:dlq" + + async def connect(self): + """Connect to Redis""" + try: + self.redis_client = await redis.from_url(self.redis_url) + await self.redis_client.ping() + self.is_connected = True + logger.info("Connected to Redis for notification queue") + except Exception as e: + logger.error(f"Failed to connect to Redis: {e}") + self.is_connected = False + raise + + async def close(self): + """Close Redis connection""" + if self.redis_client: + await self.redis_client.close() + self.is_connected = False + logger.info("Disconnected from Redis") + + async def enqueue_notification(self, notification: Any, priority: Optional[NotificationPriority] = None): + """Add notification to queue based on priority""" + if not self.is_connected: + logger.error("Redis not connected") + return False + + try: + # Use notification's priority or provided priority + if priority is None: + priority = notification.priority if hasattr(notification, 'priority') else NotificationPriority.NORMAL + + queue_name = self.queue_names.get(priority, self.queue_names[NotificationPriority.NORMAL]) + + # Serialize notification + notification_data = notification.dict() if hasattr(notification, 'dict') else notification + notification_json = json.dumps(notification_data, default=str) + + # Add to appropriate queue + await self.redis_client.lpush(queue_name, notification_json) + + logger.info(f"Enqueued notification to {queue_name}") + return True + + except Exception as e: + logger.error(f"Failed to enqueue notification: {e}") + return False + + async def dequeue_notification(self, timeout: int = 1) -> Optional[Dict[str, Any]]: + """Dequeue notification with priority order""" + if not self.is_connected: + return None + + try: + # Check queues in priority order + for priority in [NotificationPriority.URGENT, NotificationPriority.HIGH, + NotificationPriority.NORMAL, NotificationPriority.LOW]: + queue_name = self.queue_names[priority] + + # Try to get from this queue + result = await self.redis_client.brpop(queue_name, timeout=timeout) + + if result: + _, notification_json = result + notification_data = json.loads(notification_json) + logger.debug(f"Dequeued notification from {queue_name}") + return notification_data + + return None + + except Exception as e: + logger.error(f"Failed to dequeue notification: {e}") + return None + + async def schedule_notification(self, notification: Any, scheduled_time: datetime): + """Schedule a notification for future delivery""" + if not self.is_connected: + return False + + try: + # Serialize notification + notification_data = notification.dict() if hasattr(notification, 'dict') else notification + notification_json = json.dumps(notification_data, default=str) + + # Add to scheduled set with timestamp as score + timestamp = scheduled_time.timestamp() + await self.redis_client.zadd(self.scheduled_key, {notification_json: timestamp}) + + logger.info(f"Scheduled notification for {scheduled_time}") + return True + + except Exception as e: + logger.error(f"Failed to schedule notification: {e}") + return False + + async def get_due_notifications(self) -> List[Dict[str, Any]]: + """Get notifications that are due for delivery""" + if not self.is_connected: + return [] + + try: + # Get current timestamp + now = datetime.now().timestamp() + + # Get all notifications with score <= now + results = await self.redis_client.zrangebyscore( + self.scheduled_key, + min=0, + max=now, + withscores=False + ) + + notifications = [] + for notification_json in results: + notification_data = json.loads(notification_json) + notifications.append(notification_data) + + # Remove from scheduled set + await self.redis_client.zrem(self.scheduled_key, notification_json) + + if notifications: + logger.info(f"Retrieved {len(notifications)} due notifications") + + return notifications + + except Exception as e: + logger.error(f"Failed to get due notifications: {e}") + return [] + + async def add_to_dlq(self, notification: Any, error_message: str): + """Add failed notification to Dead Letter Queue""" + if not self.is_connected: + return False + + try: + # Add error information + notification_data = notification.dict() if hasattr(notification, 'dict') else notification + notification_data['dlq_error'] = error_message + notification_data['dlq_timestamp'] = datetime.now().isoformat() + + notification_json = json.dumps(notification_data, default=str) + + # Add to DLQ + await self.redis_client.lpush(self.dlq_key, notification_json) + + logger.info(f"Added notification to DLQ: {error_message}") + return True + + except Exception as e: + logger.error(f"Failed to add to DLQ: {e}") + return False + + async def get_dlq_notifications(self, limit: int = 10) -> List[Dict[str, Any]]: + """Get notifications from Dead Letter Queue""" + if not self.is_connected: + return [] + + try: + # Get from DLQ + results = await self.redis_client.lrange(self.dlq_key, 0, limit - 1) + + notifications = [] + for notification_json in results: + notification_data = json.loads(notification_json) + notifications.append(notification_data) + + return notifications + + except Exception as e: + logger.error(f"Failed to get DLQ notifications: {e}") + return [] + + async def retry_dlq_notification(self, index: int) -> bool: + """Retry a notification from DLQ""" + if not self.is_connected: + return False + + try: + # Get notification at index + notification_json = await self.redis_client.lindex(self.dlq_key, index) + + if not notification_json: + return False + + # Parse and remove DLQ info + notification_data = json.loads(notification_json) + notification_data.pop('dlq_error', None) + notification_data.pop('dlq_timestamp', None) + + # Re-enqueue + priority = NotificationPriority(notification_data.get('priority', 'normal')) + queue_name = self.queue_names[priority] + + new_json = json.dumps(notification_data, default=str) + await self.redis_client.lpush(queue_name, new_json) + + # Remove from DLQ + await self.redis_client.lrem(self.dlq_key, 1, notification_json) + + logger.info(f"Retried DLQ notification at index {index}") + return True + + except Exception as e: + logger.error(f"Failed to retry DLQ notification: {e}") + return False + + async def get_queue_status(self) -> Dict[str, Any]: + """Get current queue status""" + if not self.is_connected: + return {"status": "disconnected"} + + try: + status = { + "status": "connected", + "queues": {}, + "scheduled": 0, + "dlq": 0 + } + + # Get queue lengths + for priority, queue_name in self.queue_names.items(): + length = await self.redis_client.llen(queue_name) + status["queues"][priority.value] = length + + # Get scheduled count + status["scheduled"] = await self.redis_client.zcard(self.scheduled_key) + + # Get DLQ count + status["dlq"] = await self.redis_client.llen(self.dlq_key) + + return status + + except Exception as e: + logger.error(f"Failed to get queue status: {e}") + return {"status": "error", "error": str(e)} + + async def clear_queue(self, priority: NotificationPriority) -> bool: + """Clear a specific priority queue""" + if not self.is_connected: + return False + + try: + queue_name = self.queue_names[priority] + await self.redis_client.delete(queue_name) + logger.info(f"Cleared queue: {queue_name}") + return True + + except Exception as e: + logger.error(f"Failed to clear queue: {e}") + return False + + async def clear_all_queues(self) -> bool: + """Clear all notification queues""" + if not self.is_connected: + return False + + try: + # Clear all priority queues + for queue_name in self.queue_names.values(): + await self.redis_client.delete(queue_name) + + # Clear scheduled and DLQ + await self.redis_client.delete(self.scheduled_key) + await self.redis_client.delete(self.dlq_key) + + logger.info("Cleared all notification queues") + return True + + except Exception as e: + logger.error(f"Failed to clear all queues: {e}") + return False \ No newline at end of file diff --git a/services/notifications/backend/requirements.txt b/services/notifications/backend/requirements.txt new file mode 100644 index 0000000..d0501d7 --- /dev/null +++ b/services/notifications/backend/requirements.txt @@ -0,0 +1,11 @@ +fastapi==0.109.0 +uvicorn[standard]==0.27.0 +pydantic==2.5.3 +python-dotenv==1.0.0 +redis==5.0.1 +motor==3.5.1 +pymongo==4.6.1 +httpx==0.26.0 +websockets==12.0 +aiofiles==23.2.1 +python-multipart==0.0.6 \ No newline at end of file diff --git a/services/notifications/backend/template_engine.py b/services/notifications/backend/template_engine.py new file mode 100644 index 0000000..9c2be75 --- /dev/null +++ b/services/notifications/backend/template_engine.py @@ -0,0 +1,334 @@ +""" +Template Engine for notification templates +""" +import logging +import re +from typing import Dict, Any, List, Optional +from datetime import datetime +import uuid +from models import NotificationTemplate, NotificationChannel, NotificationCategory + +logger = logging.getLogger(__name__) + +class TemplateEngine: + """Manages and renders notification templates""" + + def __init__(self): + self.templates = {} # In-memory storage for demo + self._load_default_templates() + + async def load_templates(self): + """Load templates from storage""" + # In production, would load from database + logger.info(f"Loaded {len(self.templates)} templates") + + def _load_default_templates(self): + """Load default system templates""" + default_templates = [ + NotificationTemplate( + id="welcome", + name="Welcome Email", + channel=NotificationChannel.EMAIL, + category=NotificationCategory.SYSTEM, + subject_template="Welcome to {{app_name}}!", + body_template=""" + Hi {{user_name}}, + + Welcome to {{app_name}}! We're excited to have you on board. + + Here are some things you can do to get started: + - Complete your profile + - Explore our features + - Connect with other users + + If you have any questions, feel free to reach out to our support team. + + Best regards, + The {{app_name}} Team + """, + variables=["user_name", "app_name"] + ), + NotificationTemplate( + id="password_reset", + name="Password Reset", + channel=NotificationChannel.EMAIL, + category=NotificationCategory.SECURITY, + subject_template="Password Reset Request", + body_template=""" + Hi {{user_name}}, + + We received a request to reset your password for {{app_name}}. + + Click the link below to reset your password: + {{reset_link}} + + This link will expire in {{expiry_hours}} hours. + + If you didn't request this, please ignore this email or contact support. + + Best regards, + The {{app_name}} Team + """, + variables=["user_name", "app_name", "reset_link", "expiry_hours"] + ), + NotificationTemplate( + id="order_confirmation", + name="Order Confirmation", + channel=NotificationChannel.EMAIL, + category=NotificationCategory.TRANSACTION, + subject_template="Order #{{order_id}} Confirmed", + body_template=""" + Hi {{user_name}}, + + Your order #{{order_id}} has been confirmed! + + Order Details: + - Total: {{order_total}} + - Items: {{item_count}} + - Estimated Delivery: {{delivery_date}} + + You can track your order status at: {{tracking_link}} + + Thank you for your purchase! + + Best regards, + The {{app_name}} Team + """, + variables=["user_name", "app_name", "order_id", "order_total", "item_count", "delivery_date", "tracking_link"] + ), + NotificationTemplate( + id="sms_verification", + name="SMS Verification", + channel=NotificationChannel.SMS, + category=NotificationCategory.SECURITY, + body_template="Your {{app_name}} verification code is: {{code}}. Valid for {{expiry_minutes}} minutes.", + variables=["app_name", "code", "expiry_minutes"] + ), + NotificationTemplate( + id="push_reminder", + name="Push Reminder", + channel=NotificationChannel.PUSH, + category=NotificationCategory.UPDATE, + body_template="{{reminder_text}}", + variables=["reminder_text"] + ), + NotificationTemplate( + id="in_app_alert", + name="In-App Alert", + channel=NotificationChannel.IN_APP, + category=NotificationCategory.SYSTEM, + body_template="{{alert_message}}", + variables=["alert_message"] + ), + NotificationTemplate( + id="weekly_digest", + name="Weekly Digest", + channel=NotificationChannel.EMAIL, + category=NotificationCategory.MARKETING, + subject_template="Your Weekly {{app_name}} Digest", + body_template=""" + Hi {{user_name}}, + + Here's what happened this week on {{app_name}}: + + šŸ“Š Stats: + - New connections: {{new_connections}} + - Messages received: {{messages_count}} + - Activities completed: {{activities_count}} + + šŸ”„ Trending: + {{trending_items}} + + šŸ’” Tip of the week: + {{weekly_tip}} + + See you next week! + The {{app_name}} Team + """, + variables=["user_name", "app_name", "new_connections", "messages_count", "activities_count", "trending_items", "weekly_tip"] + ), + NotificationTemplate( + id="friend_request", + name="Friend Request", + channel=NotificationChannel.IN_APP, + category=NotificationCategory.SOCIAL, + body_template="{{sender_name}} sent you a friend request. {{personal_message}}", + variables=["sender_name", "personal_message"] + ) + ] + + for template in default_templates: + self.templates[template.id] = template + + async def create_template(self, template: NotificationTemplate) -> str: + """Create a new template""" + if not template.id: + template.id = str(uuid.uuid4()) + + # Validate template + if not self._validate_template(template): + raise ValueError("Invalid template format") + + # Extract variables from template + template.variables = self._extract_variables(template.body_template) + if template.subject_template: + template.variables.extend(self._extract_variables(template.subject_template)) + template.variables = list(set(template.variables)) # Remove duplicates + + # Store template + self.templates[template.id] = template + + logger.info(f"Created template: {template.id}") + return template.id + + async def update_template(self, template_id: str, template: NotificationTemplate) -> bool: + """Update an existing template""" + if template_id not in self.templates: + return False + + # Validate template + if not self._validate_template(template): + raise ValueError("Invalid template format") + + # Update template + template.id = template_id + template.updated_at = datetime.now() + + # Re-extract variables + template.variables = self._extract_variables(template.body_template) + if template.subject_template: + template.variables.extend(self._extract_variables(template.subject_template)) + template.variables = list(set(template.variables)) + + self.templates[template_id] = template + + logger.info(f"Updated template: {template_id}") + return True + + async def get_template(self, template_id: str) -> Optional[NotificationTemplate]: + """Get a template by ID""" + return self.templates.get(template_id) + + async def get_all_templates(self) -> List[NotificationTemplate]: + """Get all templates""" + return list(self.templates.values()) + + async def delete_template(self, template_id: str) -> bool: + """Delete a template""" + if template_id in self.templates: + del self.templates[template_id] + logger.info(f"Deleted template: {template_id}") + return True + return False + + async def render_template(self, template: NotificationTemplate, variables: Dict[str, Any]) -> str: + """Render a template with variables""" + if not template: + raise ValueError("Template not provided") + + # Start with body template + rendered = template.body_template + + # Replace variables + for var_name in template.variables: + placeholder = f"{{{{{var_name}}}}}" + value = variables.get(var_name, f"[{var_name}]") # Default to placeholder if not provided + + # Convert non-string values to string + if not isinstance(value, str): + value = str(value) + + rendered = rendered.replace(placeholder, value) + + # Clean up extra whitespace + rendered = re.sub(r'\n\s*\n', '\n\n', rendered.strip()) + + return rendered + + async def render_subject(self, template: NotificationTemplate, variables: Dict[str, Any]) -> Optional[str]: + """Render a template subject with variables""" + if not template or not template.subject_template: + return None + + rendered = template.subject_template + + # Replace variables + for var_name in self._extract_variables(template.subject_template): + placeholder = f"{{{{{var_name}}}}}" + value = variables.get(var_name, f"[{var_name}]") + + if not isinstance(value, str): + value = str(value) + + rendered = rendered.replace(placeholder, value) + + return rendered + + def _validate_template(self, template: NotificationTemplate) -> bool: + """Validate template format""" + if not template.name or not template.body_template: + return False + + # Check for basic template syntax + try: + # Check for balanced braces + open_count = template.body_template.count("{{") + close_count = template.body_template.count("}}") + if open_count != close_count: + return False + + if template.subject_template: + open_count = template.subject_template.count("{{") + close_count = template.subject_template.count("}}") + if open_count != close_count: + return False + + return True + + except Exception as e: + logger.error(f"Template validation error: {e}") + return False + + def _extract_variables(self, template_text: str) -> List[str]: + """Extract variable names from template text""" + if not template_text: + return [] + + # Find all {{variable_name}} patterns + pattern = r'\{\{(\w+)\}\}' + matches = re.findall(pattern, template_text) + + return list(set(matches)) # Return unique variable names + + async def get_templates_by_channel(self, channel: NotificationChannel) -> List[NotificationTemplate]: + """Get templates for a specific channel""" + return [t for t in self.templates.values() if t.channel == channel] + + async def get_templates_by_category(self, category: NotificationCategory) -> List[NotificationTemplate]: + """Get templates for a specific category""" + return [t for t in self.templates.values() if t.category == category] + + async def clone_template(self, template_id: str, new_name: str) -> str: + """Clone an existing template""" + original = self.templates.get(template_id) + if not original: + raise ValueError(f"Template {template_id} not found") + + # Create new template + new_template = NotificationTemplate( + id=str(uuid.uuid4()), + name=new_name, + channel=original.channel, + category=original.category, + subject_template=original.subject_template, + body_template=original.body_template, + variables=original.variables.copy(), + metadata=original.metadata.copy(), + is_active=True, + created_at=datetime.now() + ) + + self.templates[new_template.id] = new_template + + logger.info(f"Cloned template {template_id} to {new_template.id}") + return new_template.id \ No newline at end of file diff --git a/services/notifications/backend/test_notifications.py b/services/notifications/backend/test_notifications.py new file mode 100644 index 0000000..3a820dc --- /dev/null +++ b/services/notifications/backend/test_notifications.py @@ -0,0 +1,268 @@ +""" +Test script for Notification Service +""" +import asyncio +import httpx +import websockets +import json +from datetime import datetime, timedelta + +BASE_URL = "http://localhost:8013" +WS_URL = "ws://localhost:8013/ws/notifications" + +async def test_notification_api(): + """Test notification API endpoints""" + async with httpx.AsyncClient() as client: + print("\nšŸ”” Testing Notification Service API...") + + # Test health check + print("\n1. Testing health check...") + response = await client.get(f"{BASE_URL}/health") + print(f"Health check: {response.json()}") + + # Test sending single notification + print("\n2. Testing single notification...") + notification_data = { + "user_id": "test_user_123", + "title": "Welcome to Our App!", + "message": "Thank you for joining our platform. We're excited to have you!", + "channels": ["in_app", "email"], + "priority": "high", + "category": "system", + "data": { + "action_url": "https://example.com/welcome", + "icon": "welcome" + } + } + + response = await client.post( + f"{BASE_URL}/api/notifications/send", + json=notification_data + ) + notification_result = response.json() + print(f"Notification sent: {notification_result}") + notification_id = notification_result.get("notification_id") + + # Test bulk notifications + print("\n3. Testing bulk notifications...") + bulk_data = { + "user_ids": ["user1", "user2", "user3"], + "title": "System Maintenance Notice", + "message": "We will be performing system maintenance tonight from 2-4 AM.", + "channels": ["in_app", "push"], + "priority": "normal", + "category": "update" + } + + response = await client.post( + f"{BASE_URL}/api/notifications/send-bulk", + json=bulk_data + ) + print(f"Bulk notifications: {response.json()}") + + # Test scheduled notification + print("\n4. Testing scheduled notification...") + scheduled_time = datetime.now() + timedelta(minutes=5) + scheduled_data = { + "user_id": "test_user_123", + "title": "Reminder: Meeting in 5 minutes", + "message": "Your scheduled meeting is about to start.", + "channels": ["in_app", "push"], + "priority": "urgent", + "category": "system", + "schedule_at": scheduled_time.isoformat() + } + + response = await client.post( + f"{BASE_URL}/api/notifications/send", + json=scheduled_data + ) + print(f"Scheduled notification: {response.json()}") + + # Test get user notifications + print("\n5. Testing get user notifications...") + response = await client.get( + f"{BASE_URL}/api/notifications/user/test_user_123" + ) + notifications = response.json() + print(f"User notifications: Found {notifications['count']} notifications") + + # Test mark as read + if notification_id: + print("\n6. Testing mark as read...") + response = await client.patch( + f"{BASE_URL}/api/notifications/{notification_id}/read" + ) + print(f"Mark as read: {response.json()}") + + # Test templates + print("\n7. Testing templates...") + response = await client.get(f"{BASE_URL}/api/templates") + templates = response.json() + print(f"Available templates: {len(templates['templates'])} templates") + + # Test preferences + print("\n8. Testing user preferences...") + + # Get preferences + response = await client.get( + f"{BASE_URL}/api/preferences/test_user_123" + ) + print(f"Current preferences: {response.json()}") + + # Update preferences + new_preferences = { + "user_id": "test_user_123", + "channels": { + "email": True, + "sms": False, + "push": True, + "in_app": True + }, + "categories": { + "system": True, + "marketing": False, + "transaction": True, + "social": True, + "security": True, + "update": True + }, + "email_frequency": "daily", + "timezone": "America/New_York", + "language": "en" + } + + response = await client.put( + f"{BASE_URL}/api/preferences/test_user_123", + json=new_preferences + ) + print(f"Update preferences: {response.json()}") + + # Test unsubscribe + response = await client.post( + f"{BASE_URL}/api/preferences/test_user_123/unsubscribe/marketing" + ) + print(f"Unsubscribe from marketing: {response.json()}") + + # Test notification with template + print("\n9. Testing notification with template...") + template_notification = { + "user_id": "test_user_123", + "title": "Password Reset Request", + "message": "", # Will be filled by template + "channels": ["email"], + "priority": "high", + "category": "security", + "template_id": "password_reset", + "data": { + "user_name": "John Doe", + "app_name": "Our App", + "reset_link": "https://example.com/reset/abc123", + "expiry_hours": 24 + } + } + + response = await client.post( + f"{BASE_URL}/api/notifications/send", + json=template_notification + ) + print(f"Template notification: {response.json()}") + + # Test queue status + print("\n10. Testing queue status...") + response = await client.get(f"{BASE_URL}/api/queue/status") + print(f"Queue status: {response.json()}") + + # Test analytics + print("\n11. Testing analytics...") + response = await client.get(f"{BASE_URL}/api/analytics") + analytics = response.json() + print(f"Analytics overview: {analytics}") + + # Test notification history + print("\n12. Testing notification history...") + response = await client.get( + f"{BASE_URL}/api/history", + params={"user_id": "test_user_123", "limit": 10} + ) + history = response.json() + print(f"Notification history: {history['count']} entries") + + # Test device registration + print("\n13. Testing device registration...") + response = await client.post( + f"{BASE_URL}/api/devices/register", + params={ + "user_id": "test_user_123", + "device_token": "dummy_token_12345", + "device_type": "ios" + } + ) + print(f"Device registration: {response.json()}") + +async def test_websocket(): + """Test WebSocket connection for real-time notifications""" + print("\n\n🌐 Testing WebSocket Connection...") + + try: + uri = f"{WS_URL}/test_user_123" + async with websockets.connect(uri) as websocket: + print(f"Connected to WebSocket at {uri}") + + # Listen for welcome message + message = await websocket.recv() + data = json.loads(message) + print(f"Welcome message: {data}") + + # Send ping + await websocket.send("ping") + pong = await websocket.recv() + print(f"Ping response: {pong}") + + # Send notification via API while connected + async with httpx.AsyncClient() as client: + notification_data = { + "user_id": "test_user_123", + "title": "Real-time Test", + "message": "This should appear in WebSocket!", + "channels": ["in_app"], + "priority": "normal" + } + + response = await client.post( + f"{BASE_URL}/api/notifications/send", + json=notification_data + ) + print(f"Sent notification: {response.json()}") + + # Wait for real-time notification + print("Waiting for real-time notification...") + try: + notification = await asyncio.wait_for(websocket.recv(), timeout=5.0) + print(f"Received real-time notification: {json.loads(notification)}") + except asyncio.TimeoutError: + print("No real-time notification received (timeout)") + + print("WebSocket test completed") + + except Exception as e: + print(f"WebSocket error: {e}") + +async def main(): + """Run all tests""" + print("=" * 60) + print("NOTIFICATION SERVICE TEST SUITE") + print("=" * 60) + + # Test API endpoints + await test_notification_api() + + # Test WebSocket + await test_websocket() + + print("\n" + "=" * 60) + print("āœ… All tests completed!") + print("=" * 60) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/services/notifications/backend/websocket_server.py b/services/notifications/backend/websocket_server.py new file mode 100644 index 0000000..0295af5 --- /dev/null +++ b/services/notifications/backend/websocket_server.py @@ -0,0 +1,194 @@ +""" +WebSocket Server for real-time notifications +""" +import logging +import json +from typing import Dict, List +from fastapi import WebSocket +from datetime import datetime + +logger = logging.getLogger(__name__) + +class WebSocketNotificationServer: + """Manages WebSocket connections for real-time notifications""" + + def __init__(self): + # Store connections by user_id + self.active_connections: Dict[str, List[WebSocket]] = {} + self.connection_metadata: Dict[WebSocket, Dict] = {} + + async def connect(self, websocket: WebSocket, user_id: str): + """Accept a new WebSocket connection""" + await websocket.accept() + + # Add to active connections + if user_id not in self.active_connections: + self.active_connections[user_id] = [] + + self.active_connections[user_id].append(websocket) + + # Store metadata + self.connection_metadata[websocket] = { + "user_id": user_id, + "connected_at": datetime.now(), + "last_activity": datetime.now() + } + + logger.info(f"WebSocket connected for user {user_id}. Total connections: {len(self.active_connections[user_id])}") + + # Send welcome message + await self.send_welcome_message(websocket, user_id) + + def disconnect(self, user_id: str): + """Remove a WebSocket connection""" + if user_id in self.active_connections: + # Remove all connections for this user + for websocket in self.active_connections[user_id]: + if websocket in self.connection_metadata: + del self.connection_metadata[websocket] + + del self.active_connections[user_id] + logger.info(f"WebSocket disconnected for user {user_id}") + + async def send_to_user(self, user_id: str, message: Dict): + """Send a message to all connections for a specific user""" + if user_id not in self.active_connections: + logger.debug(f"No active connections for user {user_id}") + return False + + disconnected = [] + for websocket in self.active_connections[user_id]: + try: + await websocket.send_json(message) + # Update last activity + if websocket in self.connection_metadata: + self.connection_metadata[websocket]["last_activity"] = datetime.now() + except Exception as e: + logger.error(f"Error sending to WebSocket for user {user_id}: {e}") + disconnected.append(websocket) + + # Remove disconnected websockets + for ws in disconnected: + self.active_connections[user_id].remove(ws) + if ws in self.connection_metadata: + del self.connection_metadata[ws] + + # Clean up if no more connections + if not self.active_connections[user_id]: + del self.active_connections[user_id] + + return True + + async def broadcast(self, message: Dict): + """Broadcast a message to all connected users""" + for user_id in list(self.active_connections.keys()): + await self.send_to_user(user_id, message) + + async def send_notification(self, user_id: str, notification: Dict): + """Send a notification to a specific user""" + message = { + "type": "notification", + "timestamp": datetime.now().isoformat(), + "data": notification + } + return await self.send_to_user(user_id, message) + + async def send_welcome_message(self, websocket: WebSocket, user_id: str): + """Send a welcome message to newly connected user""" + welcome_message = { + "type": "connection", + "status": "connected", + "user_id": user_id, + "timestamp": datetime.now().isoformat(), + "message": "Connected to notification service" + } + + try: + await websocket.send_json(welcome_message) + except Exception as e: + logger.error(f"Error sending welcome message: {e}") + + def get_connection_count(self, user_id: str = None) -> int: + """Get the number of active connections""" + if user_id: + return len(self.active_connections.get(user_id, [])) + + total = 0 + for connections in self.active_connections.values(): + total += len(connections) + return total + + def get_connected_users(self) -> List[str]: + """Get list of connected user IDs""" + return list(self.active_connections.keys()) + + async def send_system_message(self, user_id: str, message: str, severity: str = "info"): + """Send a system message to a user""" + system_message = { + "type": "system", + "severity": severity, + "message": message, + "timestamp": datetime.now().isoformat() + } + return await self.send_to_user(user_id, system_message) + + async def send_presence_update(self, user_id: str, status: str): + """Send presence update to user's connections""" + presence_message = { + "type": "presence", + "user_id": user_id, + "status": status, + "timestamp": datetime.now().isoformat() + } + + # Could send to friends/contacts if implemented + return await self.send_to_user(user_id, presence_message) + + async def handle_ping(self, websocket: WebSocket): + """Handle ping message from client""" + try: + await websocket.send_json({ + "type": "pong", + "timestamp": datetime.now().isoformat() + }) + + # Update last activity + if websocket in self.connection_metadata: + self.connection_metadata[websocket]["last_activity"] = datetime.now() + except Exception as e: + logger.error(f"Error handling ping: {e}") + + async def cleanup_stale_connections(self, timeout_minutes: int = 30): + """Clean up stale connections that haven't been active""" + now = datetime.now() + stale_connections = [] + + for websocket, metadata in self.connection_metadata.items(): + last_activity = metadata.get("last_activity") + if last_activity: + time_diff = (now - last_activity).total_seconds() / 60 + if time_diff > timeout_minutes: + stale_connections.append({ + "websocket": websocket, + "user_id": metadata.get("user_id") + }) + + # Remove stale connections + for conn in stale_connections: + user_id = conn["user_id"] + websocket = conn["websocket"] + + if user_id in self.active_connections: + if websocket in self.active_connections[user_id]: + self.active_connections[user_id].remove(websocket) + + # Clean up if no more connections + if not self.active_connections[user_id]: + del self.active_connections[user_id] + + if websocket in self.connection_metadata: + del self.connection_metadata[websocket] + + logger.info(f"Cleaned up stale connection for user {user_id}") + + return len(stale_connections) \ No newline at end of file diff --git a/services/statistics/backend/main.py b/services/statistics/backend/main.py index 94119a2..f1a3c53 100644 --- a/services/statistics/backend/main.py +++ b/services/statistics/backend/main.py @@ -53,14 +53,18 @@ async def lifespan(app: FastAPI): await cache_manager.connect() logger.info("Connected to Redis cache") - # Initialize Metrics Collector - metrics_collector = MetricsCollector( - kafka_bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"), - ts_db=ts_db, - cache=cache_manager - ) - await metrics_collector.start() - logger.info("Metrics collector started") + # Initialize Metrics Collector (optional Kafka connection) + try: + metrics_collector = MetricsCollector( + kafka_bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"), + ts_db=ts_db, + cache=cache_manager + ) + await metrics_collector.start() + logger.info("Metrics collector started") + except Exception as e: + logger.warning(f"Metrics collector failed to start (Kafka not available): {e}") + metrics_collector = None # Initialize Data Aggregator data_aggregator = DataAggregator( diff --git a/test_integration.py b/test_integration.py new file mode 100755 index 0000000..eed6419 --- /dev/null +++ b/test_integration.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +""" +Integration Test Suite for Site11 Services +Tests the interaction between Console, Statistics, and Notification services +""" +import asyncio +import httpx +import json +from datetime import datetime, timedelta +import time + +BASE_URLS = { + "console": "http://localhost:8011", + "statistics": "http://localhost:8012", + "notifications": "http://localhost:8013", + "users": "http://localhost:8001", + "oauth": "http://localhost:8003", + "images": "http://localhost:8002" +} + +async def test_service_health(): + """Test health checks for all services""" + print("\nšŸ„ Testing Service Health Checks...") + print("=" * 60) + + async with httpx.AsyncClient() as client: + for service, url in BASE_URLS.items(): + try: + response = await client.get(f"{url}/health") + status = "āœ… HEALTHY" if response.status_code == 200 else f"āŒ UNHEALTHY ({response.status_code})" + print(f"{service.ljust(15)}: {status}") + if response.status_code == 200: + data = response.json() + if "components" in data: + for comp, comp_status in data["components"].items(): + print(f" └─ {comp}: {comp_status}") + except Exception as e: + print(f"{service.ljust(15)}: āŒ ERROR - {str(e)}") + + print("=" * 60) + +async def test_notification_to_statistics_flow(): + """Test flow from notification creation to statistics recording""" + print("\nšŸ“Š Testing Notification → Statistics Flow...") + print("=" * 60) + + async with httpx.AsyncClient() as client: + # 1. Send a notification + print("1. Sending notification...") + notification_data = { + "user_id": "integration_test_user", + "title": "Integration Test Alert", + "message": "Testing integration between services", + "channels": ["in_app"], + "priority": "high", + "category": "system" + } + + try: + response = await client.post( + f"{BASE_URLS['notifications']}/api/notifications/send", + json=notification_data + ) + result = response.json() + print(f" Notification sent: {result}") + notification_id = result.get("notification_id") + + # 2. Wait a moment for processing + await asyncio.sleep(2) + + # 3. Check if statistics recorded the event + print("\n2. Checking statistics for notification events...") + response = await client.get( + f"{BASE_URLS['statistics']}/api/analytics/events", + params={"event_type": "notification", "limit": 5} + ) + + if response.status_code == 200: + events = response.json() + print(f" Found {events.get('count', 0)} notification events in statistics") + else: + print(f" Statistics check failed: {response.status_code}") + + # 4. Check analytics overview + print("\n3. Getting analytics overview...") + response = await client.get( + f"{BASE_URLS['statistics']}/api/analytics/overview" + ) + + if response.status_code == 200: + overview = response.json() + print(f" Total events: {overview.get('total_events', 'N/A')}") + print(f" Active users: {overview.get('active_users', 'N/A')}") + print(f" System health: {overview.get('system_health', 'N/A')}") + + except Exception as e: + print(f" Error: {e}") + + print("=" * 60) + +async def test_user_action_flow(): + """Test a complete user action flow across services""" + print("\nšŸ‘¤ Testing User Action Flow...") + print("=" * 60) + + async with httpx.AsyncClient() as client: + # 1. Create a test user (if Users service supports it) + print("1. Creating/verifying test user...") + try: + # Try to get user first + response = await client.get(f"{BASE_URLS['users']}/api/users/test_user_integration") + if response.status_code == 404: + # Create user if doesn't exist + user_data = { + "username": "test_user_integration", + "email": "integration@test.com", + "full_name": "Integration Test User" + } + response = await client.post( + f"{BASE_URLS['users']}/api/users", + json=user_data + ) + print(f" User status: {response.status_code}") + except Exception as e: + print(f" User service not fully implemented or accessible: {e}") + + # 2. Record user activity in statistics + print("\n2. Recording user activity metrics...") + try: + metric_data = { + "id": f"metric_{int(time.time())}", + "metric_type": "user_activity", + "value": 1, + "timestamp": datetime.now().isoformat(), + "labels": { + "user_id": "test_user_integration", + "action": "login", + "source": "web" + } + } + + response = await client.post( + f"{BASE_URLS['statistics']}/api/metrics", + json=metric_data + ) + print(f" Metric recorded: {response.status_code}") + except Exception as e: + print(f" Statistics error: {e}") + + # 3. Send a notification about the user action + print("\n3. Sending user action notification...") + try: + notification_data = { + "user_id": "test_user_integration", + "title": "Welcome Back!", + "message": "Your login was successful", + "channels": ["in_app"], + "priority": "normal", + "category": "system" + } + + response = await client.post( + f"{BASE_URLS['notifications']}/api/notifications/send", + json=notification_data + ) + print(f" Notification sent: {response.json()}") + except Exception as e: + print(f" Notification error: {e}") + + print("=" * 60) + +async def test_real_time_metrics(): + """Test real-time metrics collection and retrieval""" + print("\n⚔ Testing Real-time Metrics...") + print("=" * 60) + + async with httpx.AsyncClient() as client: + # 1. Send batch metrics + print("1. Sending batch metrics...") + metrics = [] + for i in range(5): + metrics.append({ + "id": f"realtime_{int(time.time())}_{i}", + "metric_type": "api_request", + "value": 100 + i * 10, + "timestamp": datetime.now().isoformat(), + "labels": { + "endpoint": f"/api/test_{i}", + "method": "GET", + "status": "200" + } + }) + + try: + response = await client.post( + f"{BASE_URLS['statistics']}/api/metrics/batch", + json=metrics + ) + print(f" Batch metrics sent: {response.json()}") + except Exception as e: + print(f" Error sending metrics: {e}") + + # 2. Wait and retrieve real-time metrics + await asyncio.sleep(1) + + print("\n2. Retrieving real-time metrics...") + try: + response = await client.get( + f"{BASE_URLS['statistics']}/api/metrics/realtime/api_request", + params={"duration": 60} + ) + + if response.status_code == 200: + data = response.json() + print(f" Metric type: {data.get('metric_type')}") + print(f" Duration: {data.get('duration')}s") + print(f" Data points: {len(data.get('data', []))}") + except Exception as e: + print(f" Error retrieving metrics: {e}") + + print("=" * 60) + +async def test_notification_preferences(): + """Test notification preference management""" + print("\nāš™ļø Testing Notification Preferences...") + print("=" * 60) + + async with httpx.AsyncClient() as client: + user_id = "pref_test_user" + + # 1. Set user preferences + print("1. Setting user preferences...") + preferences = { + "user_id": user_id, + "channels": { + "email": True, + "sms": False, + "push": True, + "in_app": True + }, + "categories": { + "system": True, + "marketing": False, + "transaction": True, + "social": False, + "security": True, + "update": True + }, + "email_frequency": "daily", + "timezone": "America/New_York", + "language": "en" + } + + try: + response = await client.put( + f"{BASE_URLS['notifications']}/api/preferences/{user_id}", + json=preferences + ) + print(f" Preferences updated: {response.json()}") + except Exception as e: + print(f" Error setting preferences: {e}") + + # 2. Test notification with preferences + print("\n2. Sending notification respecting preferences...") + try: + # This should be blocked due to marketing=False + notification_data = { + "user_id": user_id, + "title": "Special Offer!", + "message": "Get 50% off today", + "channels": ["email", "in_app"], + "priority": "normal", + "category": "marketing" + } + + response = await client.post( + f"{BASE_URLS['notifications']}/api/notifications/send", + json=notification_data + ) + print(f" Marketing notification (should be filtered): {response.json()}") + + # This should go through due to system=True + notification_data["category"] = "system" + notification_data["title"] = "System Update" + notification_data["message"] = "New features available" + + response = await client.post( + f"{BASE_URLS['notifications']}/api/notifications/send", + json=notification_data + ) + print(f" System notification (should be sent): {response.json()}") + except Exception as e: + print(f" Error sending notifications: {e}") + + print("=" * 60) + +async def main(): + """Run all integration tests""" + print("=" * 60) + print("šŸš€ SITE11 INTEGRATION TEST SUITE") + print("=" * 60) + print(f"Started at: {datetime.now().isoformat()}") + + # Run tests + await test_service_health() + await test_notification_to_statistics_flow() + await test_user_action_flow() + await test_real_time_metrics() + await test_notification_preferences() + + print("\n" + "=" * 60) + print("āœ… Integration tests completed!") + print(f"Finished at: {datetime.now().isoformat()}") + print("=" * 60) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file