""" Data Aggregator - Performs data aggregation and analytics """ import asyncio import logging from typing import Dict, Any, List, Optional from datetime import datetime, timedelta from models import ( AggregatedMetric, AggregationType, Granularity, UserAnalytics, SystemAnalytics, EventAnalytics, AlertRule, Alert ) import uuid import io import csv logger = logging.getLogger(__name__) class DataAggregator: """Performs data aggregation and analytics operations""" def __init__(self, ts_db, cache): self.ts_db = ts_db self.cache = cache self.is_running = False self.alert_rules = {} self.active_alerts = {} self.aggregation_jobs = [] async def start_aggregation_jobs(self): """Start background aggregation jobs""" self.is_running = True # Schedule periodic aggregation jobs self.aggregation_jobs = [ asyncio.create_task(self._aggregate_hourly_metrics()), asyncio.create_task(self._aggregate_daily_metrics()), asyncio.create_task(self._check_alert_rules()), asyncio.create_task(self._cleanup_old_data()) ] logger.info("Data aggregation jobs started") async def stop(self): """Stop aggregation jobs""" self.is_running = False # Cancel all jobs for job in self.aggregation_jobs: job.cancel() # Wait for jobs to complete await asyncio.gather(*self.aggregation_jobs, return_exceptions=True) logger.info("Data aggregation jobs stopped") async def _aggregate_hourly_metrics(self): """Aggregate metrics every hour""" while self.is_running: try: await asyncio.sleep(3600) # Run every hour end_time = datetime.now() start_time = end_time - timedelta(hours=1) # Aggregate different metric types await self._aggregate_metric_type("user.event", start_time, end_time, Granularity.HOUR) await self._aggregate_metric_type("system.cpu", start_time, end_time, Granularity.HOUR) await self._aggregate_metric_type("system.memory", start_time, end_time, Granularity.HOUR) logger.info("Completed hourly metrics aggregation") except Exception as e: logger.error(f"Error in hourly aggregation: {e}") async def _aggregate_daily_metrics(self): """Aggregate metrics every day""" while self.is_running: try: await asyncio.sleep(86400) # Run every 24 hours end_time = datetime.now() start_time = end_time - timedelta(days=1) # Aggregate different metric types await self._aggregate_metric_type("user.event", start_time, end_time, Granularity.DAY) await self._aggregate_metric_type("system", start_time, end_time, Granularity.DAY) logger.info("Completed daily metrics aggregation") except Exception as e: logger.error(f"Error in daily aggregation: {e}") async def _aggregate_metric_type( self, metric_prefix: str, start_time: datetime, end_time: datetime, granularity: Granularity ): """Aggregate a specific metric type""" try: # Query raw metrics metrics = await self.ts_db.query_metrics( metric_type=metric_prefix, start_time=start_time, end_time=end_time ) if not metrics: return # Calculate aggregations aggregations = { AggregationType.AVG: sum(m['value'] for m in metrics) / len(metrics), AggregationType.SUM: sum(m['value'] for m in metrics), AggregationType.MIN: min(m['value'] for m in metrics), AggregationType.MAX: max(m['value'] for m in metrics), AggregationType.COUNT: len(metrics) } # Store aggregated results for agg_type, value in aggregations.items(): aggregated = AggregatedMetric( metric_name=metric_prefix, aggregation_type=agg_type, value=value, start_time=start_time, end_time=end_time, granularity=granularity, count=len(metrics) ) await self.ts_db.store_aggregated_metric(aggregated) except Exception as e: logger.error(f"Error aggregating {metric_prefix}: 
{e}") async def aggregate_metrics( self, metric_type: str, aggregation: str, group_by: Optional[str], start_time: datetime, end_time: datetime ) -> Dict[str, Any]: """Perform custom metric aggregation""" try: # Query metrics metrics = await self.ts_db.query_metrics( metric_type=metric_type, start_time=start_time, end_time=end_time ) if not metrics: return {"result": 0, "count": 0} # Group metrics if requested if group_by: grouped = {} for metric in metrics: key = metric.get('tags', {}).get(group_by, 'unknown') if key not in grouped: grouped[key] = [] grouped[key].append(metric['value']) # Aggregate each group results = {} for key, values in grouped.items(): results[key] = self._calculate_aggregation(values, aggregation) return {"grouped_results": results, "count": len(metrics)} else: # Single aggregation values = [m['value'] for m in metrics] result = self._calculate_aggregation(values, aggregation) return {"result": result, "count": len(metrics)} except Exception as e: logger.error(f"Error in custom aggregation: {e}") raise def _calculate_aggregation(self, values: List[float], aggregation: str) -> float: """Calculate aggregation on values""" if not values: return 0 if aggregation == "avg": return sum(values) / len(values) elif aggregation == "sum": return sum(values) elif aggregation == "min": return min(values) elif aggregation == "max": return max(values) elif aggregation == "count": return len(values) else: return 0 async def get_overview(self) -> Dict[str, Any]: """Get analytics overview""" try: now = datetime.now() last_hour = now - timedelta(hours=1) last_day = now - timedelta(days=1) last_week = now - timedelta(weeks=1) # Get various metrics hourly_events = await self.ts_db.count_metrics("user.event", last_hour, now) daily_events = await self.ts_db.count_metrics("user.event", last_day, now) weekly_events = await self.ts_db.count_metrics("user.event", last_week, now) # Get system status cpu_avg = await self.ts_db.get_average("system.cpu.usage", last_hour, now) memory_avg = await self.ts_db.get_average("system.memory.usage", last_hour, now) # Get active users (approximate from events) active_users = await self.ts_db.count_distinct_tags("user.event", "user_id", last_day, now) return { "events": { "last_hour": hourly_events, "last_day": daily_events, "last_week": weekly_events }, "system": { "cpu_usage": cpu_avg, "memory_usage": memory_avg }, "users": { "active_daily": active_users }, "alerts": { "active": len(self.active_alerts) }, "timestamp": now.isoformat() } except Exception as e: logger.error(f"Error getting overview: {e}") return {} async def get_user_analytics( self, start_date: datetime, end_date: datetime, granularity: str ) -> UserAnalytics: """Get user analytics""" try: # Get user metrics total_users = await self.ts_db.count_distinct_tags( "user.event.user_created", "user_id", datetime.min, end_date ) active_users = await self.ts_db.count_distinct_tags( "user.event", "user_id", start_date, end_date ) new_users = await self.ts_db.count_metrics( "user.event.user_created", start_date, end_date ) # Calculate growth rate prev_period_start = start_date - (end_date - start_date) prev_users = await self.ts_db.count_distinct_tags( "user.event", "user_id", prev_period_start, start_date ) growth_rate = ((active_users - prev_users) / max(prev_users, 1)) * 100 # Get top actions top_actions = await self.ts_db.get_top_metrics( "user.event", "event_type", start_date, end_date, limit=10 ) return UserAnalytics( total_users=total_users, active_users=active_users, new_users=new_users, 

    async def get_system_analytics(self) -> SystemAnalytics:
        """Get system performance analytics"""
        try:
            now = datetime.now()
            last_hour = now - timedelta(hours=1)
            last_day = now - timedelta(days=1)

            # Calculate uptime (simplified - would need actual downtime tracking)
            total_checks = await self.ts_db.count_metrics("system.health", last_day, now)
            successful_checks = await self.ts_db.count_metrics_with_value(
                "system.health", 1, last_day, now
            )
            uptime = (successful_checks / max(total_checks, 1)) * 100

            # Get averages
            cpu_usage = await self.ts_db.get_average("system.cpu.usage", last_hour, now)
            memory_usage = await self.ts_db.get_average("system.memory.usage", last_hour, now)
            disk_usage = await self.ts_db.get_average("system.disk.usage", last_hour, now)
            response_time = await self.ts_db.get_average("api.response_time", last_hour, now)

            # Get error rate
            total_requests = await self.ts_db.count_metrics("api.request", last_hour, now)
            error_requests = await self.ts_db.count_metrics("api.error", last_hour, now)
            error_rate = (error_requests / max(total_requests, 1)) * 100

            # Throughput over the one-hour window
            throughput = total_requests / 3600  # requests per second

            return SystemAnalytics(
                uptime_percentage=uptime,
                average_response_time=response_time or 0,
                error_rate=error_rate,
                throughput=throughput,
                cpu_usage=cpu_usage or 0,
                memory_usage=memory_usage or 0,
                disk_usage=disk_usage or 0,
                active_connections=0,  # Would need connection tracking
                services_health={}  # Would need service health checks
            )
        except Exception as e:
            logger.error(f"Error getting system analytics: {e}")
            raise

    async def get_event_analytics(
        self,
        event_type: Optional[str],
        limit: int
    ) -> EventAnalytics:
        """Get event analytics"""
        try:
            now = datetime.now()
            last_hour = now - timedelta(hours=1)

            # Get total events
            total_events = await self.ts_db.count_metrics(
                event_type or "user.event", last_hour, now
            )

            # Events per second over the one-hour window
            events_per_second = total_events / 3600

            # Get event types distribution
            event_types = await self.ts_db.get_metric_distribution(
                "user.event", "event_type", last_hour, now
            )

            # Top events
            top_events = await self.ts_db.get_top_metrics(
                event_type or "user.event", "event_type", last_hour, now, limit=limit
            )

            # Error events
            error_events = await self.ts_db.count_metrics(
                "user.event.error", last_hour, now
            )

            # Success rate
            success_rate = ((total_events - error_events) / max(total_events, 1)) * 100

            return EventAnalytics(
                total_events=total_events,
                events_per_second=events_per_second,
                event_types=event_types,
                top_events=top_events,
                error_events=error_events,
                success_rate=success_rate,
                processing_time={}  # Would need timing metrics
            )
        except Exception as e:
            logger.error(f"Error getting event analytics: {e}")
            raise

    async def get_dashboard_configs(self) -> List[Dict[str, Any]]:
        """Get available dashboard configurations"""
        return [
            {
                "id": "overview",
                "name": "Overview Dashboard",
                "description": "General system overview"
            },
            {
                "id": "users",
                "name": "User Analytics",
                "description": "User behavior and statistics"
            },
            {
                "id": "system",
                "name": "System Performance",
                "description": "System health and performance metrics"
            },
            {
                "id": "events",
                "name": "Event Analytics",
                "description": "Event processing and statistics"
            }
        ]
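
    # Hedged usage sketch: the "id" values returned by get_dashboard_configs()
    # are the only dashboard_id values get_dashboard_data() below accepts;
    # anything else raises ValueError.
    #
    #   for config in await aggregator.get_dashboard_configs():
    #       data = await aggregator.get_dashboard_data(config["id"])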

    async def get_dashboard_data(self, dashboard_id: str) -> Dict[str, Any]:
        """Get data for a specific dashboard"""
        if dashboard_id == "overview":
            return await self.get_overview()
        elif dashboard_id == "users":
            end_date = datetime.now()
            start_date = end_date - timedelta(days=7)
            analytics = await self.get_user_analytics(start_date, end_date, "day")
            return analytics.dict()
        elif dashboard_id == "system":
            analytics = await self.get_system_analytics()
            return analytics.dict()
        elif dashboard_id == "events":
            analytics = await self.get_event_analytics(None, 100)
            return analytics.dict()
        else:
            raise ValueError(f"Unknown dashboard: {dashboard_id}")

    async def create_alert_rule(self, rule_data: Dict[str, Any]) -> str:
        """Create a new alert rule"""
        rule = AlertRule(**rule_data)
        rule.id = str(uuid.uuid4())

        self.alert_rules[rule.id] = rule

        # Store in cache
        await self.cache.set(
            f"alert_rule:{rule.id}",
            rule.json(),
            expire=None  # Permanent
        )

        return rule.id

    async def _check_alert_rules(self):
        """Check alert rules periodically"""
        while self.is_running:
            try:
                await asyncio.sleep(60)  # Check every minute

                # Iterate over a snapshot so create_alert_rule() can add
                # rules concurrently without breaking this loop
                for rule_id, rule in list(self.alert_rules.items()):
                    if not rule.enabled:
                        continue
                    await self._evaluate_alert_rule(rule)
            except Exception as e:
                logger.error(f"Error checking alert rules: {e}")

    async def _evaluate_alert_rule(self, rule: AlertRule):
        """Evaluate a single alert rule"""
        try:
            # Get recent metric values
            end_time = datetime.now()
            start_time = end_time - timedelta(seconds=rule.duration)

            avg_value = await self.ts_db.get_average(
                rule.metric_name, start_time, end_time
            )

            if avg_value is None:
                return

            # Check condition
            triggered = False
            if rule.condition == "gt" and avg_value > rule.threshold:
                triggered = True
            elif rule.condition == "lt" and avg_value < rule.threshold:
                triggered = True
            elif rule.condition == "gte" and avg_value >= rule.threshold:
                triggered = True
            elif rule.condition == "lte" and avg_value <= rule.threshold:
                triggered = True
            elif rule.condition == "eq" and avg_value == rule.threshold:
                triggered = True
            elif rule.condition == "neq" and avg_value != rule.threshold:
                triggered = True

            # Handle alert state
            alert_key = f"{rule.id}:{rule.metric_name}"

            if triggered:
                if alert_key not in self.active_alerts:
                    # New alert
                    alert = Alert(
                        id=str(uuid.uuid4()),
                        rule_id=rule.id,
                        rule_name=rule.name,
                        metric_name=rule.metric_name,
                        current_value=avg_value,
                        threshold=rule.threshold,
                        severity=rule.severity,
                        triggered_at=datetime.now(),
                        status="active"
                    )
                    self.active_alerts[alert_key] = alert

                    # Send notifications
                    await self._send_alert_notifications(alert, rule)
            else:
                if alert_key in self.active_alerts:
                    # Alert resolved
                    alert = self.active_alerts[alert_key]
                    alert.resolved_at = datetime.now()
                    alert.status = "resolved"
                    del self.active_alerts[alert_key]

                    logger.info(f"Alert resolved: {rule.name}")
        except Exception as e:
            logger.error(f"Error evaluating alert rule {rule.id}: {e}")

    async def _send_alert_notifications(self, alert: Alert, rule: AlertRule):
        """Send alert notifications"""
        logger.warning(
            f"ALERT: {rule.name} - {alert.metric_name} = {alert.current_value} "
            f"(threshold: {alert.threshold})"
        )
        # Would implement actual notification channels here

    async def get_active_alerts(self) -> List[Dict[str, Any]]:
        """Get currently active alerts"""
        return [alert.dict() for alert in self.active_alerts.values()]
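
    # Hedged example payload for create_alert_rule() above; the exact required
    # fields are defined by the AlertRule model, but the names below are the
    # ones this module reads in _evaluate_alert_rule (condition is one of
    # gt/lt/gte/lte/eq/neq, duration is the averaging window in seconds):
    #
    #   rule_id = await aggregator.create_alert_rule({
    #       "name": "High CPU",
    #       "metric_name": "system.cpu.usage",
    #       "condition": "gt",
    #       "threshold": 90.0,
    #       "duration": 300,
    #       "severity": "critical",
    #       "enabled": True,
    #   })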

    async def export_to_csv(
        self,
        metric_type: str,
        start_time: datetime,
        end_time: datetime
    ):
        """Export metrics to CSV"""
        try:
            # Get metrics
            metrics = await self.ts_db.query_metrics(
                metric_type=metric_type,
                start_time=start_time,
                end_time=end_time
            )

            # Create CSV
            output = io.StringIO()
            writer = csv.DictWriter(
                output,
                fieldnames=['timestamp', 'metric_name', 'value', 'tags', 'service']
            )

            writer.writeheader()
            for metric in metrics:
                writer.writerow({
                    'timestamp': metric.get('timestamp'),
                    'metric_name': metric.get('name'),
                    'value': metric.get('value'),
                    'tags': str(metric.get('tags', {})),
                    'service': metric.get('service')
                })

            output.seek(0)
            return output
        except Exception as e:
            logger.error(f"Error exporting to CSV: {e}")
            raise

    async def _cleanup_old_data(self):
        """Clean up old data periodically"""
        while self.is_running:
            try:
                await asyncio.sleep(86400)  # Run daily

                # Delete data older than 30 days
                cutoff_date = datetime.now() - timedelta(days=30)
                await self.ts_db.delete_old_data(cutoff_date)

                logger.info("Completed old data cleanup")
            except Exception as e:
                logger.error(f"Error in data cleanup: {e}")
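

# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the service: the real ts_db and cache
# backends live elsewhere, so the stub below implements only the single
# query_metrics() call that aggregate_metrics() makes, returning dicts in the
# {"value": ..., "tags": {...}} shape this module reads. The names _StubTSDB
# and _demo are illustrative, not part of the module API.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class _StubTSDB:
        async def query_metrics(self, metric_type, start_time, end_time):
            # Canned points; a real backend would filter by type and window.
            return [
                {"value": 10.0, "tags": {"endpoint": "/users"}},
                {"value": 30.0, "tags": {"endpoint": "/users"}},
                {"value": 50.0, "tags": {"endpoint": "/orders"}},
            ]

    async def _demo():
        # cache=None is fine here: aggregate_metrics() never touches the cache
        aggregator = DataAggregator(ts_db=_StubTSDB(), cache=None)
        end = datetime.now()
        result = await aggregator.aggregate_metrics(
            metric_type="api.response_time",
            aggregation="avg",
            group_by="endpoint",
            start_time=end - timedelta(hours=1),
            end_time=end,
        )
        # Expected: {'grouped_results': {'/users': 20.0, '/orders': 50.0}, 'count': 3}
        print(result)

    asyncio.run(_demo())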