""" Statistics Service - Real-time Analytics and Metrics """ from fastapi import FastAPI, HTTPException, Depends, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse import uvicorn from datetime import datetime, timedelta from typing import Optional, List, Dict, Any import asyncio import json import os from contextlib import asynccontextmanager import logging # Import custom modules from models import Metric, AggregatedMetric, TimeSeriesData, DashboardConfig from metrics_collector import MetricsCollector from aggregator import DataAggregator from websocket_manager import WebSocketManager from time_series_db import TimeSeriesDB from cache_manager import CacheManager logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Global instances metrics_collector = None data_aggregator = None ws_manager = None ts_db = None cache_manager = None @asynccontextmanager async def lifespan(app: FastAPI): # Startup global metrics_collector, data_aggregator, ws_manager, ts_db, cache_manager try: # Initialize TimeSeriesDB (using InfluxDB) ts_db = TimeSeriesDB( host=os.getenv("INFLUXDB_HOST", "influxdb"), port=int(os.getenv("INFLUXDB_PORT", 8086)), database=os.getenv("INFLUXDB_DATABASE", "statistics") ) await ts_db.connect() logger.info("Connected to InfluxDB") # Initialize Cache Manager cache_manager = CacheManager( redis_url=os.getenv("REDIS_URL", "redis://redis:6379") ) await cache_manager.connect() logger.info("Connected to Redis cache") # Initialize Metrics Collector metrics_collector = MetricsCollector( kafka_bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"), ts_db=ts_db, cache=cache_manager ) await metrics_collector.start() logger.info("Metrics collector started") # Initialize Data Aggregator data_aggregator = DataAggregator( ts_db=ts_db, cache=cache_manager ) asyncio.create_task(data_aggregator.start_aggregation_jobs()) logger.info("Data aggregator started") # Initialize WebSocket 
Manager ws_manager = WebSocketManager() logger.info("WebSocket manager initialized") except Exception as e: logger.error(f"Failed to start Statistics service: {e}") raise yield # Shutdown if metrics_collector: await metrics_collector.stop() if data_aggregator: await data_aggregator.stop() if ts_db: await ts_db.close() if cache_manager: await cache_manager.close() logger.info("Statistics service shutdown complete") app = FastAPI( title="Statistics Service", description="Real-time Analytics and Metrics Service", version="1.0.0", lifespan=lifespan ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") async def root(): return { "service": "Statistics Service", "status": "running", "timestamp": datetime.now().isoformat() } @app.get("/health") async def health_check(): return { "status": "healthy", "service": "statistics", "components": { "influxdb": "connected" if ts_db and ts_db.is_connected else "disconnected", "redis": "connected" if cache_manager and cache_manager.is_connected else "disconnected", "metrics_collector": "running" if metrics_collector and metrics_collector.is_running else "stopped", "aggregator": "running" if data_aggregator and data_aggregator.is_running else "stopped" }, "timestamp": datetime.now().isoformat() } # Metrics Endpoints @app.post("/api/metrics") async def record_metric(metric: Metric): """Record a single metric""" try: await metrics_collector.record_metric(metric) return {"status": "recorded", "metric_id": metric.id} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/metrics/batch") async def record_metrics_batch(metrics: List[Metric]): """Record multiple metrics in batch""" try: await metrics_collector.record_metrics_batch(metrics) return {"status": "recorded", "count": len(metrics)} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) 
@app.get("/api/metrics/realtime/{metric_type}")
async def get_realtime_metrics(
    metric_type: str,
    duration: int = Query(60, description="Duration in seconds")
):
    """Get real-time metrics for the specified type"""
    try:
        # Query the trailing `duration`-second window ending now.
        end_time = datetime.now()
        start_time = end_time - timedelta(seconds=duration)
        metrics = await ts_db.query_metrics(
            metric_type=metric_type,
            start_time=start_time,
            end_time=end_time,
        )
        return {
            "metric_type": metric_type,
            "duration": duration,
            "data": metrics,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# Analytics Endpoints
@app.get("/api/analytics/overview")
async def get_analytics_overview():
    """Get overall analytics overview"""
    try:
        # Try to get from cache first
        cached = await cache_manager.get("analytics:overview")
        if cached:
            return json.loads(cached)

        # Cache miss: compute the overview, then cache it for 1 minute.
        overview = await data_aggregator.get_overview()
        await cache_manager.set(
            "analytics:overview",
            json.dumps(overview),
            expire=60,
        )
        return overview
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/analytics/users")
async def get_user_analytics(
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    granularity: str = Query("hour", regex="^(minute|hour|day|week|month)$")
):
    """Get user analytics"""
    try:
        # Default window: the last 7 days up to now.
        if not start_date:
            start_date = datetime.now() - timedelta(days=7)
        if not end_date:
            end_date = datetime.now()

        analytics = await data_aggregator.get_user_analytics(
            start_date=start_date,
            end_date=end_date,
            granularity=granularity,
        )
        return analytics
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/analytics/system")
async def get_system_analytics():
    """Get system performance analytics"""
    try:
        analytics = await data_aggregator.get_system_analytics()
        return analytics
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/analytics/events")
async def get_event_analytics(
    event_type: Optional[str] = None,
    limit: int = Query(100, le=1000)
):
    """Get event analytics"""
    try:
        analytics = await data_aggregator.get_event_analytics(
            event_type=event_type,
            limit=limit,
        )
        return analytics
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# Time Series Endpoints
@app.get("/api/timeseries/{metric_name}")
async def get_time_series(
    metric_name: str,
    start_time: datetime,
    end_time: datetime,
    interval: str = Query("1m", regex="^\\d+[smhd]$")
):
    """Get time series data for a specific metric"""
    try:
        data = await ts_db.get_time_series(
            metric_name=metric_name,
            start_time=start_time,
            end_time=end_time,
            interval=interval,
        )
        return TimeSeriesData(
            metric_name=metric_name,
            start_time=start_time,
            end_time=end_time,
            interval=interval,
            data=data,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# Aggregation Endpoints
@app.get("/api/aggregates/{metric_type}")
async def get_aggregated_metrics(
    metric_type: str,
    aggregation: str = Query("avg", regex="^(avg|sum|min|max|count)$"),
    group_by: Optional[str] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None
):
    """Get aggregated metrics"""
    try:
        # Default window: the last 24 hours up to now.
        if not start_time:
            start_time = datetime.now() - timedelta(hours=24)
        if not end_time:
            end_time = datetime.now()

        result = await data_aggregator.aggregate_metrics(
            metric_type=metric_type,
            aggregation=aggregation,
            group_by=group_by,
            start_time=start_time,
            end_time=end_time,
        )
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# Dashboard Endpoints
@app.get("/api/dashboard/configs")
async def get_dashboard_configs():
    """Get available dashboard configurations"""
    # Wrapped like every other endpoint so aggregator failures surface as a
    # clean HTTP 500 instead of an unhandled exception.
    try:
        configs = await data_aggregator.get_dashboard_configs()
        return {"configs": configs}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/dashboard/{dashboard_id}")
async def get_dashboard_data(dashboard_id: str):
    """Get data for a specific dashboard"""
    try:
        data = await data_aggregator.get_dashboard_data(dashboard_id)
        return data
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# WebSocket Endpoint for Real-time Updates
from fastapi import WebSocket, WebSocketDisconnect


@app.websocket("/ws/metrics")
async def websocket_metrics(websocket: WebSocket):
    """WebSocket endpoint for real-time metrics streaming"""
    await ws_manager.connect(websocket)
    try:
        while True:
            # Push the latest metrics snapshot to the client once per second.
            metrics = await metrics_collector.get_latest_metrics()
            await websocket.send_json({
                "type": "metrics_update",
                "data": metrics,
                "timestamp": datetime.now().isoformat(),
            })
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        ws_manager.disconnect(websocket)
    except Exception as e:
        # Unexpected failure: log it, then drop the connection from the manager.
        logger.error(f"WebSocket error: {e}")
        ws_manager.disconnect(websocket)


# Alert Management Endpoints
@app.post("/api/alerts/rules")
async def create_alert_rule(rule: Dict[str, Any]):
    """Create a new alert rule"""
    try:
        rule_id = await data_aggregator.create_alert_rule(rule)
        return {"rule_id": rule_id, "status": "created"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/alerts/active")
async def get_active_alerts():
    """Get currently active alerts"""
    try:
        alerts = await data_aggregator.get_active_alerts()
        return {"alerts": alerts, "count": len(alerts)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# Export Endpoints
@app.get("/api/export/csv")
async def export_metrics_csv(
    metric_type: str,
    start_time: datetime,
    end_time: datetime
):
    """Export metrics as CSV"""
    try:
        csv_data = await data_aggregator.export_to_csv(
            metric_type=metric_type,
            start_time=start_time,
            end_time=end_time,
        )
        # Stream the CSV with a timestamped download filename.
        return StreamingResponse(
            csv_data,
            media_type="text/csv",
            headers={
                "Content-Disposition": f"attachment; filename=metrics_{metric_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            },
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )