Files
2025-09-28 20:41:57 +09:00

514 lines
17 KiB
Python

"""
Notification Service - Real-time Multi-channel Notifications
"""
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
import asyncio
import os
from contextlib import asynccontextmanager
import logging
# Import custom modules
from models import (
Notification, NotificationChannel, NotificationTemplate,
NotificationPreference, NotificationHistory, NotificationStatus,
NotificationPriority, CreateNotificationRequest, BulkNotificationRequest
)
from notification_manager import NotificationManager
from channel_handlers import EmailHandler, SMSHandler, PushHandler, InAppHandler
from websocket_server import WebSocketNotificationServer
from queue_manager import NotificationQueueManager
from template_engine import TemplateEngine
from preference_manager import PreferenceManager
# Root logging is configured once, at import time, at INFO level; the
# module-level logger below is shared by the code in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global instances
# Every singleton starts out as None and is assigned by lifespan() during
# application startup; request handlers read them as module globals.
notification_manager: Optional["NotificationManager"] = None
ws_server: Optional["WebSocketNotificationServer"] = None
queue_manager: Optional["NotificationQueueManager"] = None
template_engine: Optional["TemplateEngine"] = None
preference_manager: Optional["PreferenceManager"] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage startup and shutdown of the service-wide singletons.

    Startup builds, in order: the template engine, the preference manager
    (``MONGODB_URL``), the queue manager (``REDIS_URL``), the e-mail / SMS /
    push / in-app channel handlers (``SMTP_*``, ``SMS_API_*``,
    ``FCM_SERVER_KEY``), the notification manager and finally the WebSocket
    server, which is handed to the in-app handler.

    If any startup step fails, whatever was already created is shut down
    before the exception is re-raised. Shutdown also runs when the
    application context exits with an error.
    """
    # Startup
    global notification_manager, ws_server, queue_manager, template_engine, preference_manager
    try:
        # Initialize Template Engine
        template_engine = TemplateEngine()
        await template_engine.load_templates()
        logger.info("Template engine initialized")

        # Initialize Preference Manager
        preference_manager = PreferenceManager(
            mongodb_url=os.getenv("MONGODB_URL", "mongodb://mongodb:27017"),
            database_name="notifications"
        )
        await preference_manager.connect()
        logger.info("Preference manager connected")

        # Initialize Notification Queue Manager
        queue_manager = NotificationQueueManager(
            redis_url=os.getenv("REDIS_URL", "redis://redis:6379")
        )
        await queue_manager.connect()
        logger.info("Queue manager connected")

        # Initialize Channel Handlers
        email_handler = EmailHandler(
            smtp_host=os.getenv("SMTP_HOST", "smtp.gmail.com"),
            smtp_port=int(os.getenv("SMTP_PORT", 587)),
            smtp_user=os.getenv("SMTP_USER", ""),
            smtp_password=os.getenv("SMTP_PASSWORD", "")
        )
        sms_handler = SMSHandler(
            api_key=os.getenv("SMS_API_KEY", ""),
            api_url=os.getenv("SMS_API_URL", "")
        )
        push_handler = PushHandler(
            fcm_server_key=os.getenv("FCM_SERVER_KEY", "")
        )
        in_app_handler = InAppHandler()

        # Initialize Notification Manager
        notification_manager = NotificationManager(
            channel_handlers={
                NotificationChannel.EMAIL: email_handler,
                NotificationChannel.SMS: sms_handler,
                NotificationChannel.PUSH: push_handler,
                NotificationChannel.IN_APP: in_app_handler
            },
            queue_manager=queue_manager,
            template_engine=template_engine,
            preference_manager=preference_manager
        )
        await notification_manager.start()
        logger.info("Notification manager started")

        # Initialize WebSocket Server
        ws_server = WebSocketNotificationServer()
        logger.info("WebSocket server initialized")

        # Register in-app handler with WebSocket server
        in_app_handler.set_ws_server(ws_server)
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error log loses.
        logger.exception("Failed to start Notification service: %s", e)
        # Release whatever was already created so a half-started service
        # does not leak connections.
        await _shutdown_services()
        raise

    try:
        yield
    finally:
        # Shutdown (also reached when the application exits with an error)
        await _shutdown_services()
        logger.info("Notification service shutdown complete")


async def _shutdown_services():
    """Stop the notification manager, then close the queue and preference managers.

    Components that were never created (still falsy) are skipped. Each step
    is isolated: a failure is logged and the remaining components are still
    shut down.
    """
    steps = (
        ("notification manager", notification_manager, "stop"),
        ("queue manager", queue_manager, "close"),
        ("preference manager", preference_manager, "close"),
    )
    for label, component, method_name in steps:
        if not component:
            continue
        try:
            await getattr(component, method_name)()
        except Exception:
            logger.exception("Error while shutting down %s", label)
app = FastAPI(
    title="Notification Service",
    description="Real-time Multi-channel Notification Service",
    version="1.0.0",
    lifespan=lifespan
)

# CORS middleware
# Allowed origins are read from the comma-separated CORS_ALLOW_ORIGINS
# environment variable and default to the wildcard. Credentialed requests
# are only enabled when explicit origins are configured: combining "*" with
# credentials would let any website make authenticated cross-origin calls.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def root():
    """Return a small banner: service name, a fixed "running" status and the current local time."""
    banner = {"service": "Notification Service", "status": "running"}
    banner["timestamp"] = datetime.now().isoformat()
    return banner
@app.get("/health")
async def health_check():
    """Report the state of each service component.

    The top-level ``status`` is always ``"healthy"``; the per-component
    entries reflect whether each singleton exists and reports itself as
    connected / running, plus the number of active WebSocket connections.
    """
    if queue_manager and queue_manager.is_connected:
        queue_state = "connected"
    else:
        queue_state = "disconnected"

    if preference_manager and preference_manager.is_connected:
        preference_state = "connected"
    else:
        preference_state = "disconnected"

    if notification_manager and notification_manager.is_running:
        manager_state = "running"
    else:
        manager_state = "stopped"

    # No WebSocket server yet means no connections to count.
    if ws_server:
        open_sockets = len(ws_server.active_connections)
    else:
        open_sockets = 0

    components = {
        "queue_manager": queue_state,
        "preference_manager": preference_state,
        "notification_manager": manager_state,
        "websocket_connections": open_sockets,
    }
    return {
        "status": "healthy",
        "service": "notifications",
        "components": components,
        "timestamp": datetime.now().isoformat(),
    }
# Notification Endpoints
@app.post("/api/notifications/send")
async def send_notification(
    request: CreateNotificationRequest,
    background_tasks: BackgroundTasks
):
    """Send a single notification.

    The notification is always created first. If ``request.schedule_at`` lies
    in the future it is handed to the queue manager and the response has
    status ``"scheduled"``; otherwise delivery is added as a background task
    and the response has status ``"queued"``.

    Raises:
        HTTPException: 500 with the error text if creation or scheduling fails.
    """
    try:
        notification = await notification_manager.create_notification(
            user_id=request.user_id,
            title=request.title,
            message=request.message,
            channels=request.channels,
            priority=request.priority,
            data=request.data,
            template_id=request.template_id,
            schedule_at=request.schedule_at
        )

        schedule_at = request.schedule_at
        # Compare against "now" in the same timezone flavour as schedule_at:
        # datetime.now(None) is naive, datetime.now(tz) is aware. Comparing an
        # aware schedule_at with a naive now() would raise TypeError.
        if schedule_at and schedule_at > datetime.now(schedule_at.tzinfo):
            # Schedule for later
            await queue_manager.schedule_notification(notification, schedule_at)
            return {
                "notification_id": notification.id,
                "status": "scheduled",
                "scheduled_at": schedule_at.isoformat()
            }

        # Send immediately
        background_tasks.add_task(
            notification_manager.send_notification,
            notification
        )
        return {
            "notification_id": notification.id,
            "status": "queued"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/notifications/send-bulk")
async def send_bulk_notifications(
    request: BulkNotificationRequest,
    background_tasks: BackgroundTasks
):
    """Send notifications to multiple users.

    One notification is created per entry of ``request.user_ids``, one after
    another, all sharing the request's title, message, channels, priority,
    data and template. Delivery of the whole batch is added as a single
    background task.

    Raises:
        HTTPException: 500 with the error text if any creation fails.
    """
    try:
        # Awaiting inside the comprehension keeps creation sequential.
        batch = [
            await notification_manager.create_notification(
                user_id=recipient,
                title=request.title,
                message=request.message,
                channels=request.channels,
                priority=request.priority,
                data=request.data,
                template_id=request.template_id
            )
            for recipient in request.user_ids
        ]

        # Queue all notifications
        background_tasks.add_task(notification_manager.send_bulk_notifications, batch)

        created_ids = [item.id for item in batch]
        return {"count": len(batch), "notification_ids": created_ids, "status": "queued"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/notifications/user/{user_id}")
async def get_user_notifications(
    user_id: str,
    status: Optional[NotificationStatus] = None,
    channel: Optional[NotificationChannel] = None,
    limit: int = Query(50, le=200),
    offset: int = Query(0, ge=0)
):
    """Get notifications for a specific user.

    ``status`` and ``channel`` are optional filters; ``limit`` (at most 200)
    and ``offset`` (non-negative) are passed through to the notification
    manager and echoed back in the response.

    Raises:
        HTTPException: 500 with the error text if the lookup fails.
    """
    try:
        found = await notification_manager.get_user_notifications(
            user_id=user_id,
            status=status,
            channel=channel,
            limit=limit,
            offset=offset
        )
        page = {"notifications": found, "count": len(found)}
        page["limit"] = limit
        page["offset"] = offset
        return page
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.patch("/api/notifications/{notification_id}/read")
async def mark_notification_read(notification_id: str):
    """Mark a notification as read.

    Raises:
        HTTPException: 404 when the notification manager reports that nothing
            was marked; 500 with the error text for any other failure.
    """
    try:
        success = await notification_manager.mark_as_read(notification_id)
        if not success:
            raise HTTPException(status_code=404, detail="Notification not found")
        return {"status": "marked_as_read", "notification_id": notification_id}
    except HTTPException:
        # Let the deliberate 404 through; the generic handler below would
        # otherwise turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.delete("/api/notifications/{notification_id}")
async def delete_notification(notification_id: str):
    """Delete a notification.

    Raises:
        HTTPException: 404 when the notification manager reports that nothing
            was deleted; 500 with the error text for any other failure.
    """
    try:
        success = await notification_manager.delete_notification(notification_id)
        if not success:
            raise HTTPException(status_code=404, detail="Notification not found")
        return {"status": "deleted", "notification_id": notification_id}
    except HTTPException:
        # Let the deliberate 404 through; the generic handler below would
        # otherwise turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Template Endpoints
@app.get("/api/templates")
async def get_templates():
    """Return every notification template known to the template engine."""
    return {"templates": await template_engine.get_all_templates()}
@app.post("/api/templates")
async def create_template(template: NotificationTemplate):
    """Create a new notification template and return its id.

    Raises:
        HTTPException: 500 with the error text if creation fails.
    """
    try:
        new_id = await template_engine.create_template(template)
        response = {"template_id": new_id}
        response["status"] = "created"
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.put("/api/templates/{template_id}")
async def update_template(template_id: str, template: NotificationTemplate):
    """Update an existing template.

    Raises:
        HTTPException: 404 when the template engine reports that nothing was
            updated; 500 with the error text for any other failure.
    """
    try:
        success = await template_engine.update_template(template_id, template)
        if not success:
            raise HTTPException(status_code=404, detail="Template not found")
        return {"status": "updated", "template_id": template_id}
    except HTTPException:
        # Let the deliberate 404 through; the generic handler below would
        # otherwise turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Preference Endpoints
@app.get("/api/preferences/{user_id}")
async def get_user_preferences(user_id: str):
    """Return the notification preferences stored for ``user_id``.

    Raises:
        HTTPException: 500 with the error text if the lookup fails.
    """
    try:
        stored = await preference_manager.get_user_preferences(user_id)
        response = {"user_id": user_id}
        response["preferences"] = stored
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.put("/api/preferences/{user_id}")
async def update_user_preferences(
    user_id: str,
    preferences: NotificationPreference
):
    """Store notification preferences for a user.

    The response status is ``"updated"`` when the preference manager returns
    a truthy result and ``"created"`` otherwise.

    Raises:
        HTTPException: 500 with the error text if the update fails.
    """
    try:
        was_updated = await preference_manager.update_user_preferences(user_id, preferences)
        outcome = "updated" if was_updated else "created"
        return {"status": outcome, "user_id": user_id}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/preferences/{user_id}/unsubscribe/{category}")
async def unsubscribe_from_category(user_id: str, category: str):
    """Unsubscribe a user from a notification category.

    Raises:
        HTTPException: 404 when the preference manager reports that nothing
            was changed; 500 with the error text for any other failure.
    """
    try:
        success = await preference_manager.unsubscribe_category(user_id, category)
        if not success:
            raise HTTPException(status_code=404, detail="User preferences not found")
        return {"status": "unsubscribed", "user_id": user_id, "category": category}
    except HTTPException:
        # Let the deliberate 404 through; the generic handler below would
        # otherwise turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# History and Analytics Endpoints
@app.get("/api/history")
async def get_notification_history(
    user_id: Optional[str] = None,
    channel: Optional[NotificationChannel] = None,
    status: Optional[NotificationStatus] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    limit: int = Query(100, le=1000)
):
    """Get notification history with filters.

    All filters are optional and are forwarded unchanged to the notification
    manager. The response echoes the filters; ``date_range`` is only filled
    in when both ``start_date`` and ``end_date`` were supplied.

    Raises:
        HTTPException: 500 with the error text if the lookup fails.
    """
    try:
        records = await notification_manager.get_notification_history(
            user_id=user_id,
            channel=channel,
            status=status,
            start_date=start_date,
            end_date=end_date,
            limit=limit
        )

        if start_date and end_date:
            date_range = f"{start_date} to {end_date}"
        else:
            date_range = None

        applied_filters = {
            "user_id": user_id,
            "channel": channel,
            "status": status,
            "date_range": date_range,
        }
        return {"history": records, "count": len(records), "filters": applied_filters}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/analytics")
async def get_notification_analytics(
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None
):
    """Get notification analytics for a date window.

    A missing ``start_date`` defaults to seven days before the current local
    time and a missing ``end_date`` to the current local time.

    Raises:
        HTTPException: 500 with the error text if the query fails.
    """
    try:
        window_start = start_date or (datetime.now() - timedelta(days=7))
        window_end = end_date or datetime.now()
        return await notification_manager.get_analytics(window_start, window_end)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Queue Management Endpoints
@app.get("/api/queue/status")
async def get_queue_status():
    """Return whatever the queue manager reports as its current status.

    Raises:
        HTTPException: 500 with the error text if the query fails.
    """
    try:
        return await queue_manager.get_queue_status()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/queue/retry/{notification_id}")
async def retry_failed_notification(
    notification_id: str,
    background_tasks: BackgroundTasks
):
    """Retry a failed notification.

    The notification is looked up first; the retry itself is added as a
    background task.

    Raises:
        HTTPException: 404 if the notification does not exist, 400 if its
            status is not FAILED, 500 with the error text for any other
            failure.
    """
    try:
        target = await notification_manager.get_notification(notification_id)

        if not target:
            raise HTTPException(status_code=404, detail="Notification not found")

        is_failed = target.status == NotificationStatus.FAILED
        if not is_failed:
            raise HTTPException(status_code=400, detail="Only failed notifications can be retried")

        background_tasks.add_task(notification_manager.retry_notification, target)
        response = {"status": "retry_queued"}
        response["notification_id"] = notification_id
        return response
    except HTTPException:
        # The 404/400 above must reach the client unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# WebSocket Endpoint
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/notifications/{user_id}")
async def websocket_notifications(websocket: WebSocket, user_id: str):
    """WebSocket endpoint for real-time notifications.

    After registering the socket with the WebSocket server, the handler
    loops over incoming text frames:

    * ``"ping"`` is answered with ``"pong"``;
    * ``"read:<notification_id>"`` marks that notification as read;
    * anything else is ignored.

    The user is always unregistered from the WebSocket server when the loop
    ends, whether through a client disconnect, an error or task cancellation.
    """
    await ws_server.connect(websocket, user_id)
    read_prefix = "read:"
    try:
        while True:
            # Keep connection alive and handle incoming messages
            data = await websocket.receive_text()

            # Handle different message types
            if data == "ping":
                await websocket.send_text("pong")
            elif data.startswith(read_prefix):
                # Take everything after the prefix so ids that themselves
                # contain ":" are not truncated.
                notification_id = data[len(read_prefix):]
                if notification_id:
                    await notification_manager.mark_as_read(notification_id)
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in the finally clause.
        pass
    except Exception as e:
        logger.exception("WebSocket error for user %s: %s", user_id, e)
    finally:
        ws_server.disconnect(user_id)
# Device Token Management
@app.post("/api/devices/register")
async def register_device_token(
    user_id: str,
    device_token: str,
    device_type: str = Query(..., regex="^(ios|android|web)$")
):
    """Register a device token for push notifications.

    ``device_type`` must be one of ``ios``, ``android`` or ``web``.

    Raises:
        HTTPException: 500 with "Failed to register device token" when the
            notification manager reports failure; 500 with the error text
            for any other failure.
    """
    try:
        success = await notification_manager.register_device_token(
            user_id=user_id,
            device_token=device_token,
            device_type=device_type
        )
        if not success:
            raise HTTPException(status_code=500, detail="Failed to register device token")
        return {
            "status": "registered",
            "user_id": user_id,
            "device_type": device_type
        }
    except HTTPException:
        # Keep the explicit detail message instead of re-wrapping it below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.delete("/api/devices/{device_token}")
async def unregister_device_token(device_token: str):
    """Unregister a device token.

    Raises:
        HTTPException: 404 when the notification manager reports that nothing
            was unregistered; 500 with the error text for any other failure.
    """
    try:
        success = await notification_manager.unregister_device_token(device_token)
        if not success:
            raise HTTPException(status_code=404, detail="Device token not found")
        return {"status": "unregistered", "device_token": device_token}
    except HTTPException:
        # Let the deliberate 404 through; the generic handler below would
        # otherwise turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Direct execution: serve the app on all interfaces, port 8000, with
    # uvicorn's auto-reload switched on.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)