diff --git a/services/statistics/backend/Dockerfile b/services/statistics/backend/Dockerfile new file mode 100644 index 0000000..2515968 --- /dev/null +++ b/services/statistics/backend/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first for better caching +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Expose port +EXPOSE 8000 + +# Run the application +CMD ["python", "main.py"] \ No newline at end of file diff --git a/services/statistics/backend/aggregator.py b/services/statistics/backend/aggregator.py new file mode 100644 index 0000000..df20f60 --- /dev/null +++ b/services/statistics/backend/aggregator.py @@ -0,0 +1,617 @@ +""" +Data Aggregator - Performs data aggregation and analytics +""" +import asyncio +import logging +from typing import Dict, Any, List, Optional +from datetime import datetime, timedelta +from models import ( + AggregatedMetric, AggregationType, Granularity, + UserAnalytics, SystemAnalytics, EventAnalytics, + AlertRule, Alert +) +import uuid +import io +import csv + +logger = logging.getLogger(__name__) + +class DataAggregator: + """Performs data aggregation and analytics operations""" + + def __init__(self, ts_db, cache): + self.ts_db = ts_db + self.cache = cache + self.is_running = False + self.alert_rules = {} + self.active_alerts = {} + self.aggregation_jobs = [] + + async def start_aggregation_jobs(self): + """Start background aggregation jobs""" + self.is_running = True + + # Schedule periodic aggregation jobs + self.aggregation_jobs = [ + asyncio.create_task(self._aggregate_hourly_metrics()), + asyncio.create_task(self._aggregate_daily_metrics()), + asyncio.create_task(self._check_alert_rules()), + asyncio.create_task(self._cleanup_old_data()) + ] + + logger.info("Data aggregation jobs started") + + async 
def stop(self): + """Stop aggregation jobs""" + self.is_running = False + + # Cancel all jobs + for job in self.aggregation_jobs: + job.cancel() + + # Wait for jobs to complete + await asyncio.gather(*self.aggregation_jobs, return_exceptions=True) + + logger.info("Data aggregation jobs stopped") + + async def _aggregate_hourly_metrics(self): + """Aggregate metrics every hour""" + while self.is_running: + try: + await asyncio.sleep(3600) # Run every hour + + end_time = datetime.now() + start_time = end_time - timedelta(hours=1) + + # Aggregate different metric types + await self._aggregate_metric_type("user.event", start_time, end_time, Granularity.HOUR) + await self._aggregate_metric_type("system.cpu", start_time, end_time, Granularity.HOUR) + await self._aggregate_metric_type("system.memory", start_time, end_time, Granularity.HOUR) + + logger.info("Completed hourly metrics aggregation") + + except Exception as e: + logger.error(f"Error in hourly aggregation: {e}") + + async def _aggregate_daily_metrics(self): + """Aggregate metrics every day""" + while self.is_running: + try: + await asyncio.sleep(86400) # Run every 24 hours + + end_time = datetime.now() + start_time = end_time - timedelta(days=1) + + # Aggregate different metric types + await self._aggregate_metric_type("user.event", start_time, end_time, Granularity.DAY) + await self._aggregate_metric_type("system", start_time, end_time, Granularity.DAY) + + logger.info("Completed daily metrics aggregation") + + except Exception as e: + logger.error(f"Error in daily aggregation: {e}") + + async def _aggregate_metric_type( + self, + metric_prefix: str, + start_time: datetime, + end_time: datetime, + granularity: Granularity + ): + """Aggregate a specific metric type""" + try: + # Query raw metrics + metrics = await self.ts_db.query_metrics( + metric_type=metric_prefix, + start_time=start_time, + end_time=end_time + ) + + if not metrics: + return + + # Calculate aggregations + aggregations = { + 
AggregationType.AVG: sum(m['value'] for m in metrics) / len(metrics), + AggregationType.SUM: sum(m['value'] for m in metrics), + AggregationType.MIN: min(m['value'] for m in metrics), + AggregationType.MAX: max(m['value'] for m in metrics), + AggregationType.COUNT: len(metrics) + } + + # Store aggregated results + for agg_type, value in aggregations.items(): + aggregated = AggregatedMetric( + metric_name=metric_prefix, + aggregation_type=agg_type, + value=value, + start_time=start_time, + end_time=end_time, + granularity=granularity, + count=len(metrics) + ) + + await self.ts_db.store_aggregated_metric(aggregated) + + except Exception as e: + logger.error(f"Error aggregating {metric_prefix}: {e}") + + async def aggregate_metrics( + self, + metric_type: str, + aggregation: str, + group_by: Optional[str], + start_time: datetime, + end_time: datetime + ) -> Dict[str, Any]: + """Perform custom metric aggregation""" + try: + # Query metrics + metrics = await self.ts_db.query_metrics( + metric_type=metric_type, + start_time=start_time, + end_time=end_time + ) + + if not metrics: + return {"result": 0, "count": 0} + + # Group metrics if requested + if group_by: + grouped = {} + for metric in metrics: + key = metric.get('tags', {}).get(group_by, 'unknown') + if key not in grouped: + grouped[key] = [] + grouped[key].append(metric['value']) + + # Aggregate each group + results = {} + for key, values in grouped.items(): + results[key] = self._calculate_aggregation(values, aggregation) + + return {"grouped_results": results, "count": len(metrics)} + else: + # Single aggregation + values = [m['value'] for m in metrics] + result = self._calculate_aggregation(values, aggregation) + return {"result": result, "count": len(metrics)} + + except Exception as e: + logger.error(f"Error in custom aggregation: {e}") + raise + + def _calculate_aggregation(self, values: List[float], aggregation: str) -> float: + """Calculate aggregation on values""" + if not values: + return 0 + + if 
aggregation == "avg": + return sum(values) / len(values) + elif aggregation == "sum": + return sum(values) + elif aggregation == "min": + return min(values) + elif aggregation == "max": + return max(values) + elif aggregation == "count": + return len(values) + else: + return 0 + + async def get_overview(self) -> Dict[str, Any]: + """Get analytics overview""" + try: + now = datetime.now() + last_hour = now - timedelta(hours=1) + last_day = now - timedelta(days=1) + last_week = now - timedelta(weeks=1) + + # Get various metrics + hourly_events = await self.ts_db.count_metrics("user.event", last_hour, now) + daily_events = await self.ts_db.count_metrics("user.event", last_day, now) + weekly_events = await self.ts_db.count_metrics("user.event", last_week, now) + + # Get system status + cpu_avg = await self.ts_db.get_average("system.cpu.usage", last_hour, now) + memory_avg = await self.ts_db.get_average("system.memory.usage", last_hour, now) + + # Get active users (approximate from events) + active_users = await self.ts_db.count_distinct_tags("user.event", "user_id", last_day, now) + + return { + "events": { + "last_hour": hourly_events, + "last_day": daily_events, + "last_week": weekly_events + }, + "system": { + "cpu_usage": cpu_avg, + "memory_usage": memory_avg + }, + "users": { + "active_daily": active_users + }, + "alerts": { + "active": len(self.active_alerts) + }, + "timestamp": now.isoformat() + } + + except Exception as e: + logger.error(f"Error getting overview: {e}") + return {} + + async def get_user_analytics( + self, + start_date: datetime, + end_date: datetime, + granularity: str + ) -> UserAnalytics: + """Get user analytics""" + try: + # Get user metrics + total_users = await self.ts_db.count_distinct_tags( + "user.event.user_created", + "user_id", + datetime.min, + end_date + ) + + active_users = await self.ts_db.count_distinct_tags( + "user.event", + "user_id", + start_date, + end_date + ) + + new_users = await self.ts_db.count_metrics( + 
"user.event.user_created", + start_date, + end_date + ) + + # Calculate growth rate + prev_period_start = start_date - (end_date - start_date) + prev_users = await self.ts_db.count_distinct_tags( + "user.event", + "user_id", + prev_period_start, + start_date + ) + + growth_rate = ((active_users - prev_users) / max(prev_users, 1)) * 100 + + # Get top actions + top_actions = await self.ts_db.get_top_metrics( + "user.event", + "event_type", + start_date, + end_date, + limit=10 + ) + + return UserAnalytics( + total_users=total_users, + active_users=active_users, + new_users=new_users, + user_growth_rate=growth_rate, + average_session_duration=0, # Would need session tracking + top_actions=top_actions, + user_distribution={}, # Would need geographic data + period=f"{start_date.date()} to {end_date.date()}" + ) + + except Exception as e: + logger.error(f"Error getting user analytics: {e}") + raise + + async def get_system_analytics(self) -> SystemAnalytics: + """Get system performance analytics""" + try: + now = datetime.now() + last_hour = now - timedelta(hours=1) + last_day = now - timedelta(days=1) + + # Calculate uptime (simplified - would need actual downtime tracking) + total_checks = await self.ts_db.count_metrics("system.health", last_day, now) + successful_checks = await self.ts_db.count_metrics_with_value( + "system.health", + 1, + last_day, + now + ) + uptime = (successful_checks / max(total_checks, 1)) * 100 + + # Get averages + cpu_usage = await self.ts_db.get_average("system.cpu.usage", last_hour, now) + memory_usage = await self.ts_db.get_average("system.memory.usage", last_hour, now) + disk_usage = await self.ts_db.get_average("system.disk.usage", last_hour, now) + response_time = await self.ts_db.get_average("api.response_time", last_hour, now) + + # Get error rate + total_requests = await self.ts_db.count_metrics("api.request", last_hour, now) + error_requests = await self.ts_db.count_metrics("api.error", last_hour, now) + error_rate = (error_requests / 
max(total_requests, 1)) * 100 + + # Throughput + throughput = total_requests / 3600 # requests per second + + return SystemAnalytics( + uptime_percentage=uptime, + average_response_time=response_time or 0, + error_rate=error_rate, + throughput=throughput, + cpu_usage=cpu_usage or 0, + memory_usage=memory_usage or 0, + disk_usage=disk_usage or 0, + active_connections=0, # Would need connection tracking + services_health={} # Would need service health checks + ) + + except Exception as e: + logger.error(f"Error getting system analytics: {e}") + raise + + async def get_event_analytics( + self, + event_type: Optional[str], + limit: int + ) -> EventAnalytics: + """Get event analytics""" + try: + now = datetime.now() + last_hour = now - timedelta(hours=1) + + # Get total events + total_events = await self.ts_db.count_metrics( + event_type or "user.event", + last_hour, + now + ) + + # Events per second + events_per_second = total_events / 3600 + + # Get event types distribution + event_types = await self.ts_db.get_metric_distribution( + "user.event", + "event_type", + last_hour, + now + ) + + # Top events + top_events = await self.ts_db.get_top_metrics( + event_type or "user.event", + "event_type", + last_hour, + now, + limit=limit + ) + + # Error events + error_events = await self.ts_db.count_metrics( + "user.event.error", + last_hour, + now + ) + + # Success rate + success_rate = ((total_events - error_events) / max(total_events, 1)) * 100 + + return EventAnalytics( + total_events=total_events, + events_per_second=events_per_second, + event_types=event_types, + top_events=top_events, + error_events=error_events, + success_rate=success_rate, + processing_time={} # Would need timing metrics + ) + + except Exception as e: + logger.error(f"Error getting event analytics: {e}") + raise + + async def get_dashboard_configs(self) -> List[Dict[str, Any]]: + """Get available dashboard configurations""" + return [ + { + "id": "overview", + "name": "Overview Dashboard", + 
"description": "General system overview" + }, + { + "id": "users", + "name": "User Analytics", + "description": "User behavior and statistics" + }, + { + "id": "system", + "name": "System Performance", + "description": "System health and performance metrics" + }, + { + "id": "events", + "name": "Event Analytics", + "description": "Event processing and statistics" + } + ] + + async def get_dashboard_data(self, dashboard_id: str) -> Dict[str, Any]: + """Get data for a specific dashboard""" + if dashboard_id == "overview": + return await self.get_overview() + elif dashboard_id == "users": + end_date = datetime.now() + start_date = end_date - timedelta(days=7) + analytics = await self.get_user_analytics(start_date, end_date, "day") + return analytics.dict() + elif dashboard_id == "system": + analytics = await self.get_system_analytics() + return analytics.dict() + elif dashboard_id == "events": + analytics = await self.get_event_analytics(None, 100) + return analytics.dict() + else: + raise ValueError(f"Unknown dashboard: {dashboard_id}") + + async def create_alert_rule(self, rule_data: Dict[str, Any]) -> str: + """Create a new alert rule""" + rule = AlertRule(**rule_data) + rule.id = str(uuid.uuid4()) + self.alert_rules[rule.id] = rule + + # Store in cache + await self.cache.set( + f"alert_rule:{rule.id}", + rule.json(), + expire=None # Permanent + ) + + return rule.id + + async def _check_alert_rules(self): + """Check alert rules periodically""" + while self.is_running: + try: + await asyncio.sleep(60) # Check every minute + + for rule_id, rule in self.alert_rules.items(): + if not rule.enabled: + continue + + await self._evaluate_alert_rule(rule) + + except Exception as e: + logger.error(f"Error checking alert rules: {e}") + + async def _evaluate_alert_rule(self, rule: AlertRule): + """Evaluate a single alert rule""" + try: + # Get recent metric values + end_time = datetime.now() + start_time = end_time - timedelta(seconds=rule.duration) + + avg_value = await 
self.ts_db.get_average( + rule.metric_name, + start_time, + end_time + ) + + if avg_value is None: + return + + # Check condition + triggered = False + if rule.condition == "gt" and avg_value > rule.threshold: + triggered = True + elif rule.condition == "lt" and avg_value < rule.threshold: + triggered = True + elif rule.condition == "gte" and avg_value >= rule.threshold: + triggered = True + elif rule.condition == "lte" and avg_value <= rule.threshold: + triggered = True + elif rule.condition == "eq" and avg_value == rule.threshold: + triggered = True + elif rule.condition == "neq" and avg_value != rule.threshold: + triggered = True + + # Handle alert state + alert_key = f"{rule.id}:{rule.metric_name}" + + if triggered: + if alert_key not in self.active_alerts: + # New alert + alert = Alert( + id=str(uuid.uuid4()), + rule_id=rule.id, + rule_name=rule.name, + metric_name=rule.metric_name, + current_value=avg_value, + threshold=rule.threshold, + severity=rule.severity, + triggered_at=datetime.now(), + status="active" + ) + self.active_alerts[alert_key] = alert + + # Send notifications + await self._send_alert_notifications(alert, rule) + + else: + if alert_key in self.active_alerts: + # Alert resolved + alert = self.active_alerts[alert_key] + alert.resolved_at = datetime.now() + alert.status = "resolved" + del self.active_alerts[alert_key] + + logger.info(f"Alert resolved: {rule.name}") + + except Exception as e: + logger.error(f"Error evaluating alert rule {rule.id}: {e}") + + async def _send_alert_notifications(self, alert: Alert, rule: AlertRule): + """Send alert notifications""" + logger.warning(f"ALERT: {rule.name} - {alert.metric_name} = {alert.current_value} (threshold: {alert.threshold})") + # Would implement actual notification channels here + + async def get_active_alerts(self) -> List[Dict[str, Any]]: + """Get currently active alerts""" + return [alert.dict() for alert in self.active_alerts.values()] + + async def export_to_csv( + self, + metric_type: str, 
+ start_time: datetime, + end_time: datetime + ): + """Export metrics to CSV""" + try: + # Get metrics + metrics = await self.ts_db.query_metrics( + metric_type=metric_type, + start_time=start_time, + end_time=end_time + ) + + # Create CSV + output = io.StringIO() + writer = csv.DictWriter( + output, + fieldnames=['timestamp', 'metric_name', 'value', 'tags', 'service'] + ) + writer.writeheader() + + for metric in metrics: + writer.writerow({ + 'timestamp': metric.get('timestamp'), + 'metric_name': metric.get('name'), + 'value': metric.get('value'), + 'tags': str(metric.get('tags', {})), + 'service': metric.get('service') + }) + + output.seek(0) + return output + + except Exception as e: + logger.error(f"Error exporting to CSV: {e}") + raise + + async def _cleanup_old_data(self): + """Clean up old data periodically""" + while self.is_running: + try: + await asyncio.sleep(86400) # Run daily + + # Delete data older than 30 days + cutoff_date = datetime.now() - timedelta(days=30) + await self.ts_db.delete_old_data(cutoff_date) + + logger.info("Completed old data cleanup") + + except Exception as e: + logger.error(f"Error in data cleanup: {e}") \ No newline at end of file diff --git a/services/statistics/backend/cache_manager.py b/services/statistics/backend/cache_manager.py new file mode 100644 index 0000000..51d5981 --- /dev/null +++ b/services/statistics/backend/cache_manager.py @@ -0,0 +1,32 @@ +"""Cache Manager for Redis""" +import json +import logging +from typing import Optional, Any + +logger = logging.getLogger(__name__) + +class CacheManager: + """Redis cache manager""" + + def __init__(self, redis_url: str): + self.redis_url = redis_url + self.is_connected = False + self.cache = {} # Simplified in-memory cache + + async def connect(self): + """Connect to Redis""" + self.is_connected = True + logger.info("Connected to cache") + + async def close(self): + """Close Redis connection""" + self.is_connected = False + logger.info("Disconnected from cache") + + async 
def get(self, key: str) -> Optional[str]: + """Get value from cache""" + return self.cache.get(key) + + async def set(self, key: str, value: str, expire: Optional[int] = None): + """Set value in cache""" + self.cache[key] = value \ No newline at end of file diff --git a/services/statistics/backend/main.py b/services/statistics/backend/main.py new file mode 100644 index 0000000..94119a2 --- /dev/null +++ b/services/statistics/backend/main.py @@ -0,0 +1,392 @@ +""" +Statistics Service - Real-time Analytics and Metrics +""" +from fastapi import FastAPI, HTTPException, Depends, Query +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import StreamingResponse +import uvicorn +from datetime import datetime, timedelta +from typing import Optional, List, Dict, Any +import asyncio +import json +import os +from contextlib import asynccontextmanager +import logging + +# Import custom modules +from models import Metric, AggregatedMetric, TimeSeriesData, DashboardConfig +from metrics_collector import MetricsCollector +from aggregator import DataAggregator +from websocket_manager import WebSocketManager +from time_series_db import TimeSeriesDB +from cache_manager import CacheManager + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Global instances +metrics_collector = None +data_aggregator = None +ws_manager = None +ts_db = None +cache_manager = None + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + global metrics_collector, data_aggregator, ws_manager, ts_db, cache_manager + + try: + # Initialize TimeSeriesDB (using InfluxDB) + ts_db = TimeSeriesDB( + host=os.getenv("INFLUXDB_HOST", "influxdb"), + port=int(os.getenv("INFLUXDB_PORT", 8086)), + database=os.getenv("INFLUXDB_DATABASE", "statistics") + ) + await ts_db.connect() + logger.info("Connected to InfluxDB") + + # Initialize Cache Manager + cache_manager = CacheManager( + redis_url=os.getenv("REDIS_URL", "redis://redis:6379") + ) + await 
# main.py continued: application object and basic endpoints

app = FastAPI(
    title="Statistics Service",
    description="Real-time Analytics and Metrics Service",
    version="1.0.0",
    lifespan=lifespan
)

# CORS is wide open — NOTE(review): restrict allow_origins for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
async def root():
    """Service banner with a server-side timestamp."""
    return {
        "service": "Statistics Service",
        "status": "running",
        "timestamp": datetime.now().isoformat()
    }

@app.get("/health")
async def health_check():
    """Component-level health report for liveness/readiness probes."""
    return {
        "status": "healthy",
        "service": "statistics",
        "components": {
            "influxdb": "connected" if ts_db and ts_db.is_connected else "disconnected",
            "redis": "connected" if cache_manager and cache_manager.is_connected else "disconnected",
            "metrics_collector": "running" if metrics_collector and metrics_collector.is_running else "stopped",
            "aggregator": "running" if data_aggregator and data_aggregator.is_running else "stopped"
        },
        "timestamp": datetime.now().isoformat()
    }
# main.py continued: metrics ingestion and analytics endpoints

@app.post("/api/metrics")
async def record_metric(metric: Metric):
    """Record a single metric."""
    try:
        await metrics_collector.record_metric(metric)
        return {"status": "recorded", "metric_id": metric.id}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/metrics/batch")
async def record_metrics_batch(metrics: List[Metric]):
    """Record multiple metrics in batch."""
    try:
        await metrics_collector.record_metrics_batch(metrics)
        return {"status": "recorded", "count": len(metrics)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/metrics/realtime/{metric_type}")
async def get_realtime_metrics(
    metric_type: str,
    duration: int = Query(60, description="Duration in seconds")
):
    """Raw metrics of one type over the trailing *duration* seconds."""
    try:
        end_time = datetime.now()
        start_time = end_time - timedelta(seconds=duration)
        metrics = await ts_db.query_metrics(
            metric_type=metric_type,
            start_time=start_time,
            end_time=end_time
        )
        return {
            "metric_type": metric_type,
            "duration": duration,
            "data": metrics,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/analytics/overview")
async def get_analytics_overview():
    """Overview analytics, served from cache when fresh (60s TTL)."""
    try:
        cached = await cache_manager.get("analytics:overview")
        if cached:
            return json.loads(cached)
        overview = await data_aggregator.get_overview()
        await cache_manager.set(
            "analytics:overview",
            json.dumps(overview),
            expire=60
        )
        return overview
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/analytics/users")
async def get_user_analytics(
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    granularity: str = Query("hour", regex="^(minute|hour|day|week|month)$")
):
    """User analytics; defaults to the trailing 7 days."""
    try:
        if not start_date:
            start_date = datetime.now() - timedelta(days=7)
        if not end_date:
            end_date = datetime.now()
        return await data_aggregator.get_user_analytics(
            start_date=start_date,
            end_date=end_date,
            granularity=granularity
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# main.py continued: system/event analytics, time-series and aggregation endpoints

@app.get("/api/analytics/system")
async def get_system_analytics():
    """System performance analytics."""
    try:
        return await data_aggregator.get_system_analytics()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/analytics/events")
async def get_event_analytics(
    event_type: Optional[str] = None,
    limit: int = Query(100, le=1000)
):
    """Event analytics, optionally filtered to one event type."""
    try:
        return await data_aggregator.get_event_analytics(
            event_type=event_type,
            limit=limit
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/timeseries/{metric_name}")
async def get_time_series(
    metric_name: str,
    start_time: datetime,
    end_time: datetime,
    interval: str = Query("1m", regex="^\\d+[smhd]$")
):
    """Time series for one metric at the requested resampling interval."""
    try:
        data = await ts_db.get_time_series(
            metric_name=metric_name,
            start_time=start_time,
            end_time=end_time,
            interval=interval
        )
        return TimeSeriesData(
            metric_name=metric_name,
            start_time=start_time,
            end_time=end_time,
            interval=interval,
            data=data
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/aggregates/{metric_type}")
async def get_aggregated_metrics(
    metric_type: str,
    aggregation: str = Query("avg", regex="^(avg|sum|min|max|count)$"),
    group_by: Optional[str] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None
):
    """Ad-hoc aggregation over a window (defaults to the trailing 24 hours)."""
    try:
        if not start_time:
            start_time = datetime.now() - timedelta(hours=24)
        if not end_time:
            end_time = datetime.now()
        return await data_aggregator.aggregate_metrics(
            metric_type=metric_type,
            aggregation=aggregation,
            group_by=group_by,
            start_time=start_time,
            end_time=end_time
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# main.py continued: dashboards, websocket streaming, alert-rule creation

@app.get("/api/dashboard/configs")
async def get_dashboard_configs():
    """List the available dashboard configurations."""
    configs = await data_aggregator.get_dashboard_configs()
    return {"configs": configs}

@app.get("/api/dashboard/{dashboard_id}")
async def get_dashboard_data(dashboard_id: str):
    """Data payload for a single dashboard."""
    try:
        return await data_aggregator.get_dashboard_data(dashboard_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/metrics")
async def websocket_metrics(websocket: WebSocket):
    """Push the latest metrics snapshot to the client once per second."""
    await ws_manager.connect(websocket)
    try:
        while True:
            metrics = await metrics_collector.get_latest_metrics()
            await websocket.send_json({
                "type": "metrics_update",
                "data": metrics,
                "timestamp": datetime.now().isoformat()
            })
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        ws_manager.disconnect(websocket)
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        ws_manager.disconnect(websocket)

@app.post("/api/alerts/rules")
async def create_alert_rule(rule: Dict[str, Any]):
    """Create a new alert rule; returns the generated rule id."""
    try:
        rule_id = await data_aggregator.create_alert_rule(rule)
        return {"rule_id": rule_id, "status": "created"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# main.py continued: alert listing, CSV export, entrypoint

@app.get("/api/alerts/active")
async def get_active_alerts():
    """Currently firing alerts."""
    try:
        alerts = await data_aggregator.get_active_alerts()
        return {"alerts": alerts, "count": len(alerts)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/export/csv")
async def export_metrics_csv(
    metric_type: str,
    start_time: datetime,
    end_time: datetime
):
    """Stream a metric range back to the client as a CSV attachment."""
    try:
        csv_data = await data_aggregator.export_to_csv(
            metric_type=metric_type,
            start_time=start_time,
            end_time=end_time
        )
        return StreamingResponse(
            csv_data,
            media_type="text/csv",
            headers={
                "Content-Disposition": f"attachment; filename=metrics_{metric_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    # NOTE(review): reload=True is a development setting — disable for the
    # containerized deployment launched by the Dockerfile's CMD.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )

# --- services/statistics/backend/metrics_collector.py (new file) ---
"""
Metrics Collector - Collects metrics from Kafka and other sources
"""
import asyncio
import json
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime
from aiokafka import AIOKafkaConsumer
from models import Metric, MetricType
import uuid

logger = logging.getLogger(__name__)

class MetricsCollector:
    """Consumes metrics from Kafka topics and buffers writes to the TSDB."""

    def __init__(self, kafka_bootstrap_servers: str, ts_db, cache):
        self.kafka_servers = kafka_bootstrap_servers
        self.ts_db = ts_db
        self.cache = cache
        self.consumer = None        # AIOKafkaConsumer, created in start()
        self.is_running = False
        self.latest_metrics = {}    # metric name -> latest value snapshot
        self.metrics_buffer = []    # pending Metric objects awaiting flush
        self.buffer_size = 100      # flush threshold (number of metrics)
        self.flush_interval = 5     # seconds between periodic flushes
self.is_running = False + self.latest_metrics = {} + self.metrics_buffer = [] + self.buffer_size = 100 + self.flush_interval = 5 # seconds + + async def start(self): + """Start the metrics collector""" + try: + # Start Kafka consumer for event metrics + self.consumer = AIOKafkaConsumer( + 'metrics-events', + 'user-events', + 'system-metrics', + bootstrap_servers=self.kafka_servers, + group_id='statistics-consumer-group', + value_deserializer=lambda m: json.loads(m.decode('utf-8')) + ) + await self.consumer.start() + self.is_running = True + + # Start background tasks + asyncio.create_task(self._consume_metrics()) + asyncio.create_task(self._flush_metrics_periodically()) + + logger.info("Metrics collector started") + except Exception as e: + logger.error(f"Failed to start metrics collector: {e}") + raise + + async def stop(self): + """Stop the metrics collector""" + self.is_running = False + if self.consumer: + await self.consumer.stop() + + # Flush remaining metrics + if self.metrics_buffer: + await self._flush_metrics() + + logger.info("Metrics collector stopped") + + async def _consume_metrics(self): + """Consume metrics from Kafka""" + while self.is_running: + try: + async for msg in self.consumer: + if not self.is_running: + break + + metric = self._parse_kafka_message(msg) + if metric: + await self.record_metric(metric) + + except Exception as e: + logger.error(f"Error consuming metrics: {e}") + await asyncio.sleep(5) + + def _parse_kafka_message(self, msg) -> Optional[Metric]: + """Parse Kafka message into Metric""" + try: + data = msg.value + topic = msg.topic + + # Create metric based on topic + if topic == 'user-events': + return self._create_user_metric(data) + elif topic == 'system-metrics': + return self._create_system_metric(data) + elif topic == 'metrics-events': + return Metric(**data) + else: + return None + + except Exception as e: + logger.error(f"Failed to parse Kafka message: {e}") + return None + + def _create_user_metric(self, data: Dict) -> 
Metric: + """Create metric from user event""" + event_type = data.get('event_type', 'unknown') + + return Metric( + id=str(uuid.uuid4()), + name=f"user.event.{event_type.lower()}", + type=MetricType.COUNTER, + value=1, + tags={ + "event_type": event_type, + "user_id": data.get('data', {}).get('user_id', 'unknown'), + "service": data.get('service', 'unknown') + }, + timestamp=datetime.fromisoformat(data.get('timestamp', datetime.now().isoformat())), + service=data.get('service', 'users') + ) + + def _create_system_metric(self, data: Dict) -> Metric: + """Create metric from system event""" + return Metric( + id=str(uuid.uuid4()), + name=data.get('metric_name', 'system.unknown'), + type=MetricType.GAUGE, + value=float(data.get('value', 0)), + tags=data.get('tags', {}), + timestamp=datetime.fromisoformat(data.get('timestamp', datetime.now().isoformat())), + service=data.get('service', 'system') + ) + + async def record_metric(self, metric: Metric): + """Record a single metric""" + try: + # Add to buffer + self.metrics_buffer.append(metric) + + # Update latest metrics cache + self.latest_metrics[metric.name] = { + "value": metric.value, + "timestamp": metric.timestamp.isoformat(), + "tags": metric.tags + } + + # Flush if buffer is full + if len(self.metrics_buffer) >= self.buffer_size: + await self._flush_metrics() + + except Exception as e: + logger.error(f"Failed to record metric: {e}") + raise + + async def record_metrics_batch(self, metrics: List[Metric]): + """Record multiple metrics""" + for metric in metrics: + await self.record_metric(metric) + + async def _flush_metrics(self): + """Flush metrics buffer to time series database""" + if not self.metrics_buffer: + return + + try: + # Write to time series database + await self.ts_db.write_metrics(self.metrics_buffer) + + # Clear buffer + self.metrics_buffer.clear() + + logger.debug(f"Flushed {len(self.metrics_buffer)} metrics to database") + + except Exception as e: + logger.error(f"Failed to flush metrics: {e}") + 
+ async def _flush_metrics_periodically(self): + """Periodically flush metrics buffer""" + while self.is_running: + await asyncio.sleep(self.flush_interval) + await self._flush_metrics() + + async def get_latest_metrics(self) -> Dict[str, Any]: + """Get latest metrics for real-time display""" + return self.latest_metrics + + async def collect_system_metrics(self): + """Collect system-level metrics""" + import psutil + + try: + # CPU metrics + cpu_percent = psutil.cpu_percent(interval=1) + await self.record_metric(Metric( + name="system.cpu.usage", + type=MetricType.GAUGE, + value=cpu_percent, + tags={"host": "localhost"}, + service="statistics" + )) + + # Memory metrics + memory = psutil.virtual_memory() + await self.record_metric(Metric( + name="system.memory.usage", + type=MetricType.GAUGE, + value=memory.percent, + tags={"host": "localhost"}, + service="statistics" + )) + + # Disk metrics + disk = psutil.disk_usage('/') + await self.record_metric(Metric( + name="system.disk.usage", + type=MetricType.GAUGE, + value=disk.percent, + tags={"host": "localhost", "mount": "/"}, + service="statistics" + )) + + # Network metrics + net_io = psutil.net_io_counters() + await self.record_metric(Metric( + name="system.network.bytes_sent", + type=MetricType.COUNTER, + value=net_io.bytes_sent, + tags={"host": "localhost"}, + service="statistics" + )) + await self.record_metric(Metric( + name="system.network.bytes_recv", + type=MetricType.COUNTER, + value=net_io.bytes_recv, + tags={"host": "localhost"}, + service="statistics" + )) + + except Exception as e: + logger.error(f"Failed to collect system metrics: {e}") + + async def collect_application_metrics(self): + """Collect application-level metrics""" + # This would be called by other services to report their metrics + pass \ No newline at end of file diff --git a/services/statistics/backend/models.py b/services/statistics/backend/models.py new file mode 100644 index 0000000..a772e69 --- /dev/null +++ 
# --- new file: services/statistics/backend/models.py ---
"""
Data models for Statistics Service
"""
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, List, Dict, Any, Literal
from enum import Enum

class MetricType(str, Enum):
    """Kinds of metric a collector may report."""
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    SUMMARY = "summary"

class AggregationType(str, Enum):
    """Aggregation functions supported by the aggregator."""
    AVG = "avg"
    SUM = "sum"
    MIN = "min"
    MAX = "max"
    COUNT = "count"
    PERCENTILE = "percentile"

class Granularity(str, Enum):
    """Time buckets used when aggregating raw points."""
    MINUTE = "minute"
    HOUR = "hour"
    DAY = "day"
    WEEK = "week"
    MONTH = "month"

class Metric(BaseModel):
    """A single metric data point reported by a service."""
    id: Optional[str] = Field(None, description="Unique metric ID")
    name: str = Field(..., description="Metric name")
    type: MetricType = Field(..., description="Metric type")
    value: float = Field(..., description="Metric value")
    tags: Dict[str, str] = Field(default_factory=dict, description="Metric tags")
    timestamp: datetime = Field(default_factory=datetime.now, description="Metric timestamp")
    service: str = Field(..., description="Source service")

    class Config:
        # NOTE(review): class-based Config with json_encoders is deprecated
        # in pydantic v2 (requirements pin pydantic==2.5.3), and v2 already
        # serializes datetime as ISO-8601 — consider ConfigDict or removal.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

class AggregatedMetric(BaseModel):
    """Result of aggregating raw points over a time window."""
    metric_name: str
    aggregation_type: AggregationType
    value: float
    start_time: datetime
    end_time: datetime
    granularity: Optional[Granularity] = None
    group_by: Optional[str] = None
    count: int = Field(..., description="Number of data points aggregated")

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

class TimeSeriesData(BaseModel):
    """Time-series query response: raw points plus the window they cover."""
    metric_name: str
    start_time: datetime
    end_time: datetime
    interval: str
    data: List[Dict[str, Any]]

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
class DashboardConfig(BaseModel):
    """Dashboard configuration: widget layout plus refresh policy."""
    id: str
    name: str
    description: Optional[str] = None
    widgets: List[Dict[str, Any]]
    refresh_interval: int = Field(60, description="Refresh interval in seconds")
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

class AlertRule(BaseModel):
    """Declarative threshold rule evaluated against a metric stream."""
    id: Optional[str] = None
    name: str
    metric_name: str
    condition: Literal["gt", "lt", "gte", "lte", "eq", "neq"]
    threshold: float
    duration: int = Field(..., description="Duration in seconds")
    severity: Literal["low", "medium", "high", "critical"]
    enabled: bool = True
    notification_channels: List[str] = Field(default_factory=list)
    created_at: datetime = Field(default_factory=datetime.now)

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

class Alert(BaseModel):
    """An alert instance raised when an AlertRule condition fires."""
    id: str
    rule_id: str
    rule_name: str
    metric_name: str
    current_value: float
    threshold: float
    severity: str
    triggered_at: datetime
    resolved_at: Optional[datetime] = None
    status: Literal["active", "resolved", "acknowledged"]

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

class UserAnalytics(BaseModel):
    """Aggregated user-behaviour figures for a reporting period."""
    total_users: int
    active_users: int
    new_users: int
    user_growth_rate: float
    average_session_duration: float
    top_actions: List[Dict[str, Any]]
    user_distribution: Dict[str, int]
    period: str

class SystemAnalytics(BaseModel):
    """System performance analytics snapshot."""
    uptime_percentage: float
    average_response_time: float
    error_rate: float
    throughput: float
    cpu_usage: float
    memory_usage: float
    disk_usage: float
    active_connections: int
    services_health: Dict[str, str]

class EventAnalytics(BaseModel):
    """Event throughput and outcome statistics."""
    total_events: int
events_per_second: float + event_types: Dict[str, int] + top_events: List[Dict[str, Any]] + error_events: int + success_rate: float + processing_time: Dict[str, float] \ No newline at end of file diff --git a/services/statistics/backend/requirements.txt b/services/statistics/backend/requirements.txt new file mode 100644 index 0000000..7f2930c --- /dev/null +++ b/services/statistics/backend/requirements.txt @@ -0,0 +1,9 @@ +fastapi==0.109.0 +uvicorn[standard]==0.27.0 +pydantic==2.5.3 +python-dotenv==1.0.0 +aiokafka==0.10.0 +redis==5.0.1 +psutil==5.9.8 +httpx==0.26.0 +websockets==12.0 \ No newline at end of file diff --git a/services/statistics/backend/time_series_db.py b/services/statistics/backend/time_series_db.py new file mode 100644 index 0000000..de3e593 --- /dev/null +++ b/services/statistics/backend/time_series_db.py @@ -0,0 +1,165 @@ +""" +Time Series Database Interface (Simplified for InfluxDB) +""" +import logging +from typing import List, Dict, Any, Optional +from datetime import datetime +from models import Metric, AggregatedMetric + +logger = logging.getLogger(__name__) + +class TimeSeriesDB: + """Time series database interface""" + + def __init__(self, host: str, port: int, database: str): + self.host = host + self.port = port + self.database = database + self.is_connected = False + # In production, would use actual InfluxDB client + self.data_store = [] # Simplified in-memory storage + + async def connect(self): + """Connect to database""" + # Simplified connection + self.is_connected = True + logger.info(f"Connected to time series database at {self.host}:{self.port}") + + async def close(self): + """Close database connection""" + self.is_connected = False + logger.info("Disconnected from time series database") + + async def write_metrics(self, metrics: List[Metric]): + """Write metrics to database""" + for metric in metrics: + self.data_store.append({ + "name": metric.name, + "value": metric.value, + "timestamp": metric.timestamp, + "tags": 
metric.tags, + "service": metric.service + }) + + async def query_metrics( + self, + metric_type: str, + start_time: datetime, + end_time: datetime + ) -> List[Dict[str, Any]]: + """Query metrics from database""" + results = [] + for data in self.data_store: + if (data["name"].startswith(metric_type) and + start_time <= data["timestamp"] <= end_time): + results.append(data) + return results + + async def get_time_series( + self, + metric_name: str, + start_time: datetime, + end_time: datetime, + interval: str + ) -> List[Dict[str, Any]]: + """Get time series data""" + return await self.query_metrics(metric_name, start_time, end_time) + + async def store_aggregated_metric(self, metric: AggregatedMetric): + """Store aggregated metric""" + self.data_store.append({ + "name": f"agg.{metric.metric_name}", + "value": metric.value, + "timestamp": metric.end_time, + "tags": {"aggregation": metric.aggregation_type}, + "service": "statistics" + }) + + async def count_metrics( + self, + metric_type: str, + start_time: datetime, + end_time: datetime + ) -> int: + """Count metrics""" + metrics = await self.query_metrics(metric_type, start_time, end_time) + return len(metrics) + + async def get_average( + self, + metric_name: str, + start_time: datetime, + end_time: datetime + ) -> Optional[float]: + """Get average value""" + metrics = await self.query_metrics(metric_name, start_time, end_time) + if not metrics: + return None + values = [m["value"] for m in metrics] + return sum(values) / len(values) + + async def count_distinct_tags( + self, + metric_type: str, + tag_name: str, + start_time: datetime, + end_time: datetime + ) -> int: + """Count distinct tag values""" + metrics = await self.query_metrics(metric_type, start_time, end_time) + unique_values = set() + for metric in metrics: + if tag_name in metric.get("tags", {}): + unique_values.add(metric["tags"][tag_name]) + return len(unique_values) + + async def get_top_metrics( + self, + metric_type: str, + group_by: str, + 
start_time: datetime, + end_time: datetime, + limit: int = 10 + ) -> List[Dict[str, Any]]: + """Get top metrics grouped by tag""" + metrics = await self.query_metrics(metric_type, start_time, end_time) + grouped = {} + for metric in metrics: + key = metric.get("tags", {}).get(group_by, "unknown") + grouped[key] = grouped.get(key, 0) + 1 + + sorted_items = sorted(grouped.items(), key=lambda x: x[1], reverse=True) + return [{"name": k, "count": v} for k, v in sorted_items[:limit]] + + async def count_metrics_with_value( + self, + metric_name: str, + value: float, + start_time: datetime, + end_time: datetime + ) -> int: + """Count metrics with specific value""" + metrics = await self.query_metrics(metric_name, start_time, end_time) + return sum(1 for m in metrics if m["value"] == value) + + async def get_metric_distribution( + self, + metric_type: str, + tag_name: str, + start_time: datetime, + end_time: datetime + ) -> Dict[str, int]: + """Get metric distribution by tag""" + metrics = await self.query_metrics(metric_type, start_time, end_time) + distribution = {} + for metric in metrics: + key = metric.get("tags", {}).get(tag_name, "unknown") + distribution[key] = distribution.get(key, 0) + 1 + return distribution + + async def delete_old_data(self, cutoff_date: datetime): + """Delete old data""" + self.data_store = [ + d for d in self.data_store + if d["timestamp"] >= cutoff_date + ] \ No newline at end of file diff --git a/services/statistics/backend/websocket_manager.py b/services/statistics/backend/websocket_manager.py new file mode 100644 index 0000000..7db10ee --- /dev/null +++ b/services/statistics/backend/websocket_manager.py @@ -0,0 +1,33 @@ +"""WebSocket Manager for real-time updates""" +from typing import List +from fastapi import WebSocket +import logging + +logger = logging.getLogger(__name__) + +class WebSocketManager: + """Manages WebSocket connections""" + + def __init__(self): + self.active_connections: List[WebSocket] = [] + + async def 
connect(self, websocket: WebSocket): + """Accept WebSocket connection""" + await websocket.accept() + self.active_connections.append(websocket) + logger.info(f"WebSocket connected. Total connections: {len(self.active_connections)}") + + def disconnect(self, websocket: WebSocket): + """Remove WebSocket connection""" + if websocket in self.active_connections: + self.active_connections.remove(websocket) + logger.info(f"WebSocket disconnected. Total connections: {len(self.active_connections)}") + + async def broadcast(self, message: dict): + """Broadcast message to all connected clients""" + for connection in self.active_connections: + try: + await connection.send_json(message) + except Exception as e: + logger.error(f"Error broadcasting to WebSocket: {e}") + self.disconnect(connection) \ No newline at end of file