"""
Metrics Collector - Collects metrics from Kafka and other sources
"""

import asyncio
import json
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from aiokafka import AIOKafkaConsumer

from models import Metric, MetricType

logger = logging.getLogger(__name__)


class MetricsCollector:
    """Collects and processes metrics from various sources.

    Metrics are appended to an in-memory buffer and written to ``ts_db`` in
    batches: whenever the buffer reaches ``buffer_size`` entries, every
    ``flush_interval`` seconds while the collector is running, and once more
    on ``stop()``. The most recent value per metric name is also kept in
    ``latest_metrics``.
    """

    def __init__(self, kafka_bootstrap_servers: str, ts_db, cache):
        """Store collaborators and initialise empty state; performs no I/O.

        Args:
            kafka_bootstrap_servers: Passed as ``bootstrap_servers`` to the
                Kafka consumer created in ``start()``.
            ts_db: Object exposing an awaitable ``write_metrics(metrics)``.
            cache: Stored on the instance; not used by this class itself.
        """
        self.kafka_servers = kafka_bootstrap_servers
        self.ts_db = ts_db
        self.cache = cache
        self.consumer = None
        self.is_running = False
        self.latest_metrics = {}
        self.metrics_buffer = []
        self.buffer_size = 100
        self.flush_interval = 5  # seconds
        # Strong references to the background tasks. The event loop only
        # keeps weak references, so unreferenced tasks may be collected
        # before they finish.
        self._tasks = []

    async def start(self):
        """Start the Kafka consumer and the background tasks.

        Raises:
            Exception: Any error raised while creating or starting the
                consumer is logged and re-raised.
        """
        try:
            # Kafka consumer for event metrics
            self.consumer = AIOKafkaConsumer(
                'metrics-events',
                'user-events',
                'system-metrics',
                bootstrap_servers=self.kafka_servers,
                group_id='statistics-consumer-group',
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            )
            await self.consumer.start()
            self.is_running = True

            # Background tasks; references are kept so stop() can cancel them.
            self._tasks = [
                asyncio.create_task(self._consume_metrics()),
                asyncio.create_task(self._flush_metrics_periodically()),
            ]

            logger.info("Metrics collector started")
        except Exception as e:
            logger.error("Failed to start metrics collector: %s", e)
            raise

    async def stop(self):
        """Stop background work, stop the consumer and flush what is left."""
        self.is_running = False

        # Cancel the background tasks and wait for them to unwind so that
        # neither is left sleeping or reading from a stopped consumer.
        tasks, self._tasks = self._tasks, []
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

        if self.consumer:
            await self.consumer.stop()

        # Flush remaining metrics
        if self.metrics_buffer:
            await self._flush_metrics()

        logger.info("Metrics collector stopped")

    async def _consume_metrics(self):
        """Consume messages from Kafka and record the metrics they yield.

        Runs until ``is_running`` becomes false. Any error raised while
        iterating or recording is logged, followed by a 5 second pause before
        iteration is retried.
        """
        while self.is_running:
            try:
                async for msg in self.consumer:
                    if not self.is_running:
                        break

                    metric = self._parse_kafka_message(msg)
                    if metric:
                        await self.record_metric(metric)
            except asyncio.CancelledError:
                # Never swallow cancellation (it is an Exception subclass on
                # Python 3.7), otherwise stop() would wait out the sleep.
                raise
            except Exception as e:
                logger.error("Error consuming metrics: %s", e)
                await asyncio.sleep(5)

    def _parse_kafka_message(self, msg) -> Optional[Metric]:
        """Turn a Kafka message into a Metric based on its topic.

        Returns:
            The metric, or ``None`` when the topic is not handled or building
            the metric raised (the error is logged).
        """
        try:
            data = msg.value
            topic = msg.topic

            # Create metric based on topic
            if topic == 'user-events':
                return self._create_user_metric(data)
            if topic == 'system-metrics':
                return self._create_system_metric(data)
            if topic == 'metrics-events':
                return Metric(**data)
            return None
        except Exception as e:
            logger.error("Failed to parse Kafka message: %s", e)
            return None

    @staticmethod
    def _parse_timestamp(raw) -> datetime:
        """Convert an event's ``timestamp`` field to a datetime.

        A missing or ``None`` value yields the current local time. A trailing
        ``Z`` is rewritten to ``+00:00`` because ``datetime.fromisoformat``
        only accepts it from Python 3.11 on. Any other unparsable value
        raises, exactly as ``fromisoformat`` does.
        """
        if raw is None:
            return datetime.now()
        if isinstance(raw, str) and raw.endswith(('Z', 'z')):
            raw = raw[:-1] + '+00:00'
        return datetime.fromisoformat(raw)

    def _create_user_metric(self, data: Dict) -> Metric:
        """Create a counter metric (value 1) from a user event payload."""
        event_type = data.get('event_type', 'unknown')

        return Metric(
            id=str(uuid.uuid4()),
            name=f"user.event.{event_type.lower()}",
            type=MetricType.COUNTER,
            value=1,
            tags={
                "event_type": event_type,
                "user_id": data.get('data', {}).get('user_id', 'unknown'),
                "service": data.get('service', 'unknown'),
            },
            timestamp=self._parse_timestamp(data.get('timestamp')),
            service=data.get('service', 'users'),
        )

    def _create_system_metric(self, data: Dict) -> Metric:
        """Create a gauge metric from a system event payload."""
        return Metric(
            id=str(uuid.uuid4()),
            name=data.get('metric_name', 'system.unknown'),
            type=MetricType.GAUGE,
            value=float(data.get('value', 0)),
            tags=data.get('tags', {}),
            timestamp=self._parse_timestamp(data.get('timestamp')),
            service=data.get('service', 'system'),
        )

    async def record_metric(self, metric: Metric):
        """Buffer a single metric and update the latest-value map.

        Triggers a flush once the buffer holds ``buffer_size`` entries.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            # Add to buffer
            self.metrics_buffer.append(metric)

            # Update latest metrics cache
            self.latest_metrics[metric.name] = {
                "value": metric.value,
                "timestamp": metric.timestamp.isoformat(),
                "tags": metric.tags,
            }

            # Flush if buffer is full
            if len(self.metrics_buffer) >= self.buffer_size:
                await self._flush_metrics()
        except Exception as e:
            logger.error("Failed to record metric: %s", e)
            raise

    async def record_metrics_batch(self, metrics: List[Metric]):
        """Record multiple metrics, one at a time, in the given order."""
        for metric in metrics:
            await self.record_metric(metric)

    async def _flush_metrics(self):
        """Write the buffered metrics to the time series database.

        The batch is detached from the buffer before the write is awaited, so
        metrics recorded during the write stay queued for the next flush and
        an overlapping flush cannot send the same batch twice. If the write
        fails or is cancelled, the batch is put back at the front of the
        buffer; failures are logged and not raised.
        """
        if not self.metrics_buffer:
            return

        # Detach in place (no await in between) so the list object that
        # callers may hold a reference to stays the same.
        batch = list(self.metrics_buffer)
        self.metrics_buffer.clear()

        try:
            # Write to time series database
            await self.ts_db.write_metrics(batch)
        except asyncio.CancelledError:
            self.metrics_buffer[:0] = batch
            raise
        except Exception as e:
            self.metrics_buffer[:0] = batch
            logger.error("Failed to flush metrics: %s", e)
        else:
            logger.debug("Flushed %d metrics to database", len(batch))

    async def _flush_metrics_periodically(self):
        """Flush the buffer every ``flush_interval`` seconds while running."""
        while self.is_running:
            await asyncio.sleep(self.flush_interval)
            await self._flush_metrics()

    async def get_latest_metrics(self) -> Dict[str, Any]:
        """Return the latest recorded value per metric name.

        The returned dict is the collector's own mapping, not a copy.
        """
        return self.latest_metrics

    async def collect_system_metrics(self):
        """Sample CPU, memory, disk and network usage via psutil and record it.

        Any failure is logged and not raised.
        """
        import psutil

        try:
            # CPU metrics. cpu_percent(interval=1) blocks for one second, so
            # it runs in the default executor instead of on the event loop.
            loop = asyncio.get_running_loop()
            cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)
            # NOTE(review): the metrics below are built without explicit
            # id/timestamp, unlike the Kafka-derived ones; presumably the
            # Metric model supplies defaults - confirm.
            await self.record_metric(Metric(
                name="system.cpu.usage",
                type=MetricType.GAUGE,
                value=cpu_percent,
                tags={"host": "localhost"},
                service="statistics",
            ))

            # Memory metrics
            memory = psutil.virtual_memory()
            await self.record_metric(Metric(
                name="system.memory.usage",
                type=MetricType.GAUGE,
                value=memory.percent,
                tags={"host": "localhost"},
                service="statistics",
            ))

            # Disk metrics
            disk = psutil.disk_usage('/')
            await self.record_metric(Metric(
                name="system.disk.usage",
                type=MetricType.GAUGE,
                value=disk.percent,
                tags={"host": "localhost", "mount": "/"},
                service="statistics",
            ))

            # Network metrics
            net_io = psutil.net_io_counters()
            await self.record_metric(Metric(
                name="system.network.bytes_sent",
                type=MetricType.COUNTER,
                value=net_io.bytes_sent,
                tags={"host": "localhost"},
                service="statistics",
            ))
            await self.record_metric(Metric(
                name="system.network.bytes_recv",
                type=MetricType.COUNTER,
                value=net_io.bytes_recv,
                tags={"host": "localhost"},
                service="statistics",
            ))
        except Exception as e:
            logger.error("Failed to collect system metrics: %s", e)

    async def collect_application_metrics(self):
        """Collect application-level metrics (currently a no-op)."""
        # This would be called by other services to report their metrics
        pass