"""
Metrics Collector - Collects metrics from Kafka and other sources.
"""
|
|
import asyncio
import json
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from aiokafka import AIOKafkaConsumer

from models import Metric, MetricType
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MetricsCollector:
    """Collects and processes metrics from Kafka and periodic system probes.

    Incoming metrics are buffered in memory and flushed to the time-series
    database either when the buffer reaches ``buffer_size`` entries or every
    ``flush_interval`` seconds, whichever comes first. A snapshot of the most
    recent value per metric name is kept in ``latest_metrics`` for real-time
    display.
    """

    def __init__(self, kafka_bootstrap_servers: str, ts_db, cache):
        """
        Args:
            kafka_bootstrap_servers: Kafka broker address(es) for the consumer.
            ts_db: Time-series DB client exposing ``await write_metrics(list)``.
            cache: Cache client (held for callers; not read by this class).
        """
        self.kafka_servers = kafka_bootstrap_servers
        self.ts_db = ts_db
        self.cache = cache
        self.consumer: Optional["AIOKafkaConsumer"] = None
        self.is_running = False
        # name -> {"value", "timestamp", "tags"} snapshot of the newest sample.
        self.latest_metrics: Dict[str, Any] = {}
        # Metrics pending a flush to the time-series database.
        self.metrics_buffer: List["Metric"] = []
        self.buffer_size = 100
        self.flush_interval = 5  # seconds between periodic flushes
        # Strong references to background tasks: asyncio only keeps weak
        # references to tasks, so fire-and-forget tasks can be GC'd mid-flight.
        self._background_tasks: List["asyncio.Task"] = []

    async def start(self):
        """Start the Kafka consumer and launch the background loops.

        Raises:
            Exception: re-raised after logging if the consumer fails to start.
        """
        try:
            self.consumer = AIOKafkaConsumer(
                'metrics-events',
                'user-events',
                'system-metrics',
                bootstrap_servers=self.kafka_servers,
                group_id='statistics-consumer-group',
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            )
            await self.consumer.start()
            self.is_running = True

            # Keep references so the event loop cannot garbage-collect the
            # tasks; they exit on their own once is_running flips to False.
            self._background_tasks = [
                asyncio.create_task(self._consume_metrics()),
                asyncio.create_task(self._flush_metrics_periodically()),
            ]

            logger.info("Metrics collector started")
        except Exception as e:
            logger.error(f"Failed to start metrics collector: {e}")
            raise

    async def stop(self):
        """Stop consuming and flush whatever is still buffered."""
        self.is_running = False
        if self.consumer:
            await self.consumer.stop()

        # Final flush so no metrics are lost on shutdown.
        if self.metrics_buffer:
            await self._flush_metrics()

        logger.info("Metrics collector stopped")

    async def _consume_metrics(self):
        """Background loop: consume Kafka messages and record parsed metrics."""
        while self.is_running:
            try:
                async for msg in self.consumer:
                    if not self.is_running:
                        break

                    metric = self._parse_kafka_message(msg)
                    if metric:
                        await self.record_metric(metric)

            except Exception as e:
                # Back off briefly so a persistent broker error does not spin.
                logger.error(f"Error consuming metrics: {e}")
                await asyncio.sleep(5)

    def _parse_kafka_message(self, msg) -> Optional["Metric"]:
        """Parse a Kafka message into a Metric, dispatching on topic.

        Returns:
            A Metric instance, or None for unknown topics / unparseable payloads.
        """
        try:
            data = msg.value
            topic = msg.topic

            # Each topic carries a different payload schema.
            if topic == 'user-events':
                return self._create_user_metric(data)
            elif topic == 'system-metrics':
                return self._create_system_metric(data)
            elif topic == 'metrics-events':
                return Metric(**data)
            else:
                return None

        except Exception as e:
            logger.error(f"Failed to parse Kafka message: {e}")
            return None

    @staticmethod
    def _parse_timestamp(data: Dict) -> datetime:
        """Best-effort parse of the event's ISO timestamp.

        Falls back to the current time when the field is missing or malformed,
        so one bad timestamp does not cause the whole event to be dropped.
        """
        raw = data.get('timestamp')
        if raw:
            try:
                return datetime.fromisoformat(raw)
            except (TypeError, ValueError):
                pass
        return datetime.now()

    def _create_user_metric(self, data: Dict) -> "Metric":
        """Create a counter metric from a user event payload."""
        event_type = data.get('event_type', 'unknown')

        return Metric(
            id=str(uuid.uuid4()),
            name=f"user.event.{event_type.lower()}",
            type=MetricType.COUNTER,
            value=1,  # each user event counts once
            tags={
                "event_type": event_type,
                "user_id": data.get('data', {}).get('user_id', 'unknown'),
                "service": data.get('service', 'unknown')
            },
            timestamp=self._parse_timestamp(data),
            service=data.get('service', 'users')
        )

    def _create_system_metric(self, data: Dict) -> "Metric":
        """Create a gauge metric from a system event payload."""
        return Metric(
            id=str(uuid.uuid4()),
            name=data.get('metric_name', 'system.unknown'),
            type=MetricType.GAUGE,
            value=float(data.get('value', 0)),
            tags=data.get('tags', {}),
            timestamp=self._parse_timestamp(data),
            service=data.get('service', 'system')
        )

    async def record_metric(self, metric: "Metric"):
        """Record a single metric: buffer it and update the latest snapshot.

        Raises:
            Exception: re-raised after logging so callers can react.
        """
        try:
            self.metrics_buffer.append(metric)

            # Keep the newest sample per metric name for real-time display.
            self.latest_metrics[metric.name] = {
                "value": metric.value,
                "timestamp": metric.timestamp.isoformat(),
                "tags": metric.tags
            }

            # Flush eagerly once the buffer is full.
            if len(self.metrics_buffer) >= self.buffer_size:
                await self._flush_metrics()

        except Exception as e:
            logger.error(f"Failed to record metric: {e}")
            raise

    async def record_metrics_batch(self, metrics: List["Metric"]):
        """Record multiple metrics sequentially."""
        for metric in metrics:
            await self.record_metric(metric)

    async def _flush_metrics(self):
        """Flush the metrics buffer to the time-series database."""
        if not self.metrics_buffer:
            return

        # Swap the buffer out BEFORE awaiting the write: metrics appended
        # concurrently during the await must not be discarded by a clear().
        batch = self.metrics_buffer
        self.metrics_buffer = []

        try:
            await self.ts_db.write_metrics(batch)
            # Count the flushed batch (the old code cleared the buffer first
            # and therefore always logged "Flushed 0 metrics").
            logger.debug(f"Flushed {len(batch)} metrics to database")

        except Exception as e:
            # Re-queue the batch so a transient DB error does not lose data;
            # it will be retried on the next flush.
            self.metrics_buffer = batch + self.metrics_buffer
            logger.error(f"Failed to flush metrics: {e}")

    async def _flush_metrics_periodically(self):
        """Background loop: flush the buffer every ``flush_interval`` seconds."""
        while self.is_running:
            await asyncio.sleep(self.flush_interval)
            await self._flush_metrics()

    async def get_latest_metrics(self) -> Dict[str, Any]:
        """Return the latest-value snapshot for real-time display.

        NOTE: returns the live internal dict (not a copy), matching the
        original contract; callers should treat it as read-only.
        """
        return self.latest_metrics

    async def collect_system_metrics(self):
        """Sample host CPU/memory/disk/network via psutil and record them."""
        import psutil  # local import: only needed for this optional probe

        try:
            await self._record_system_metric(
                "system.cpu.usage", MetricType.GAUGE,
                psutil.cpu_percent(interval=1), {"host": "localhost"})

            await self._record_system_metric(
                "system.memory.usage", MetricType.GAUGE,
                psutil.virtual_memory().percent, {"host": "localhost"})

            await self._record_system_metric(
                "system.disk.usage", MetricType.GAUGE,
                psutil.disk_usage('/').percent,
                {"host": "localhost", "mount": "/"})

            net_io = psutil.net_io_counters()
            await self._record_system_metric(
                "system.network.bytes_sent", MetricType.COUNTER,
                net_io.bytes_sent, {"host": "localhost"})
            await self._record_system_metric(
                "system.network.bytes_recv", MetricType.COUNTER,
                net_io.bytes_recv, {"host": "localhost"})

        except Exception as e:
            logger.error(f"Failed to collect system metrics: {e}")

    async def _record_system_metric(self, name: str, metric_type, value, tags: Dict):
        """Build and record one self-reported system metric.

        ``id`` and ``timestamp`` are set explicitly for consistency with the
        Kafka-derived creators above (which always pass both).
        """
        await self.record_metric(Metric(
            id=str(uuid.uuid4()),
            name=name,
            type=metric_type,
            value=value,
            tags=tags,
            timestamp=datetime.now(),
            service="statistics"
        ))

    async def collect_application_metrics(self):
        """Collect application-level metrics.

        Placeholder: other services are expected to report their metrics
        through this hook; intentionally a no-op for now.
        """
        pass