"""
Time Series Database Interface (Simplified for InfluxDB)
"""
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
from models import Metric, AggregatedMetric
|
|
|
|
# Module-level logger namespaced to this module, so log output can be
# filtered/configured per-module by the application's logging config.
logger = logging.getLogger(__name__)
class TimeSeriesDB:
    """Time series database interface.

    Simplified stand-in for an InfluxDB client: instead of talking to a
    server, metrics are held in an in-memory list of flat dicts
    (``self.data_store``), each with keys ``name``, ``value``,
    ``timestamp``, ``tags`` and ``service``. All query methods filter that
    list by metric-name prefix and an inclusive timestamp range.
    """

    def __init__(self, host: str, port: int, database: str):
        """Remember connection parameters; no I/O happens here.

        Args:
            host: Database host name (recorded only; not contacted).
            port: Database port (recorded only).
            database: Target database name (recorded only).
        """
        self.host = host
        self.port = port
        self.database = database
        self.is_connected = False
        # In production, would use an actual InfluxDB client.
        # Simplified in-memory storage: list of flat metric dicts.
        self.data_store: List[Dict[str, Any]] = []

    async def connect(self):
        """Connect to database (simplified: just flips the flag)."""
        self.is_connected = True
        # Lazy %-style args: the message is only formatted if the record is emitted.
        logger.info(
            "Connected to time series database at %s:%s", self.host, self.port
        )

    async def close(self):
        """Close database connection (simplified: just flips the flag)."""
        self.is_connected = False
        logger.info("Disconnected from time series database")

    async def write_metrics(self, metrics: "List[Metric]"):
        """Append each metric to the in-memory store as a flat dict.

        NOTE(review): does not check ``is_connected`` — writes succeed even
        before ``connect()``; callers appear to rely on this simplification.

        Args:
            metrics: Objects exposing ``name``, ``value``, ``timestamp``,
                ``tags`` and ``service`` attributes.
        """
        self.data_store.extend(
            {
                "name": metric.name,
                "value": metric.value,
                "timestamp": metric.timestamp,
                "tags": metric.tags,
                "service": metric.service,
            }
            for metric in metrics
        )

    async def query_metrics(
        self,
        metric_type: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Dict[str, Any]]:
        """Query metrics from the store.

        Args:
            metric_type: Metric-name prefix to match (``str.startswith``).
            start_time: Inclusive lower bound on ``timestamp``.
            end_time: Inclusive upper bound on ``timestamp``.

        Returns:
            The stored metric dicts matching both filters, in insertion order.
        """
        return [
            data
            for data in self.data_store
            if data["name"].startswith(metric_type)
            and start_time <= data["timestamp"] <= end_time
        ]

    async def get_time_series(
        self,
        metric_name: str,
        start_time: datetime,
        end_time: datetime,
        interval: str
    ) -> List[Dict[str, Any]]:
        """Get time series data.

        NOTE(review): ``interval`` is currently ignored — raw points are
        returned without bucketing. A real backend would downsample here.
        """
        return await self.query_metrics(metric_name, start_time, end_time)

    async def store_aggregated_metric(self, metric: "AggregatedMetric"):
        """Store an aggregated metric under an ``agg.``-prefixed name.

        The aggregate is recorded at its window ``end_time`` and tagged with
        its aggregation type, attributed to the "statistics" service.
        """
        self.data_store.append({
            "name": f"agg.{metric.metric_name}",
            "value": metric.value,
            "timestamp": metric.end_time,
            "tags": {"aggregation": metric.aggregation_type},
            "service": "statistics"
        })

    async def count_metrics(
        self,
        metric_type: str,
        start_time: datetime,
        end_time: datetime
    ) -> int:
        """Count metrics matching the prefix within the time range."""
        metrics = await self.query_metrics(metric_type, start_time, end_time)
        return len(metrics)

    async def get_average(
        self,
        metric_name: str,
        start_time: datetime,
        end_time: datetime
    ) -> Optional[float]:
        """Return the mean ``value`` of matching metrics, or None if none match."""
        metrics = await self.query_metrics(metric_name, start_time, end_time)
        if not metrics:
            return None
        values = [m["value"] for m in metrics]
        return sum(values) / len(values)

    async def count_distinct_tags(
        self,
        metric_type: str,
        tag_name: str,
        start_time: datetime,
        end_time: datetime
    ) -> int:
        """Count distinct values of ``tag_name`` across matching metrics.

        Metrics lacking the tag are skipped rather than counted as a value.
        """
        metrics = await self.query_metrics(metric_type, start_time, end_time)
        unique_values = {
            metric["tags"][tag_name]
            for metric in metrics
            if tag_name in metric.get("tags", {})
        }
        return len(unique_values)

    async def get_top_metrics(
        self,
        metric_type: str,
        group_by: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Get the most frequent tag values, grouped by ``group_by``.

        Metrics missing the tag are grouped under ``"unknown"``.

        Returns:
            Up to ``limit`` dicts of ``{"name": tag_value, "count": n}``,
            ordered by descending count (ties keep first-seen order, since
            ``sorted`` is stable).
        """
        metrics = await self.query_metrics(metric_type, start_time, end_time)
        grouped: Dict[str, int] = {}
        for metric in metrics:
            key = metric.get("tags", {}).get(group_by, "unknown")
            grouped[key] = grouped.get(key, 0) + 1

        ranked = sorted(grouped.items(), key=lambda item: item[1], reverse=True)
        return [{"name": name, "count": count} for name, count in ranked[:limit]]

    async def count_metrics_with_value(
        self,
        metric_name: str,
        value: float,
        start_time: datetime,
        end_time: datetime
    ) -> int:
        """Count matching metrics whose ``value`` equals ``value`` exactly.

        NOTE(review): uses ``==`` on floats — fine for sentinel values
        (e.g. 0/1 status metrics), lossy for computed floats.
        """
        metrics = await self.query_metrics(metric_name, start_time, end_time)
        return sum(1 for m in metrics if m["value"] == value)

    async def get_metric_distribution(
        self,
        metric_type: str,
        tag_name: str,
        start_time: datetime,
        end_time: datetime
    ) -> Dict[str, int]:
        """Return ``{tag_value: occurrence_count}`` for matching metrics.

        Metrics missing the tag are counted under ``"unknown"``.
        """
        metrics = await self.query_metrics(metric_type, start_time, end_time)
        distribution: Dict[str, int] = {}
        for metric in metrics:
            key = metric.get("tags", {}).get(tag_name, "unknown")
            distribution[key] = distribution.get(key, 0) + 1
        return distribution

    async def delete_old_data(self, cutoff_date: datetime):
        """Drop every stored point with ``timestamp`` strictly before ``cutoff_date``."""
        self.data_store = [
            d for d in self.data_store
            if d["timestamp"] >= cutoff_date
        ]