import json from datetime import datetime, timezone from bson import ObjectId from motor.motor_asyncio import AsyncIOMotorDatabase import redis.asyncio as aioredis from app.models.todo import ( DashboardStats, OverviewStats, CategoryStats, PriorityStats, UpcomingDeadline, ) DASHBOARD_CACHE_KEY = "dashboard:stats" DASHBOARD_CACHE_TTL = 60 # 60초 class DashboardService: """대시보드 통계 + Redis 캐싱""" def __init__(self, db: AsyncIOMotorDatabase, redis_client: aioredis.Redis): self.todos = db["todos"] self.categories = db["categories"] self.redis = redis_client async def get_stats(self) -> DashboardStats: """F-018: 대시보드 통계 1. Redis 캐시 확인 2. 캐시 히트 -> JSON 파싱 후 반환 3. 캐시 미스 -> MongoDB 집계 -> Redis 저장 (TTL 60s) -> 반환 """ # Redis 캐시 확인 try: cached = await self.redis.get(DASHBOARD_CACHE_KEY) if cached: data = json.loads(cached) return DashboardStats(**data) except Exception: pass # Redis 에러 시 DB에서 직접 조회 # 캐시 미스: MongoDB 집계 stats = await self._compute_stats() # Redis에 캐싱 try: stats_json = stats.model_dump_json() await self.redis.setex(DASHBOARD_CACHE_KEY, DASHBOARD_CACHE_TTL, stats_json) except Exception: pass # Redis 저장 실패는 무시 return stats async def _compute_stats(self) -> DashboardStats: """MongoDB 집계 파이프라인으로 통계 계산""" # 1. Overview (전체/완료/미완료/완료율) total = await self.todos.count_documents({}) completed = await self.todos.count_documents({"completed": True}) incomplete = total - completed completion_rate = round((completed / total * 100), 1) if total > 0 else 0.0 overview = OverviewStats( total=total, completed=completed, incomplete=incomplete, completion_rate=completion_rate, ) # 2. By Category (카테고리별 할일 분포) by_category = await self._compute_by_category() # 3. By Priority (우선순위별 현황) by_priority = await self._compute_by_priority() # 4. Upcoming Deadlines (마감 임박 할일 상위 5개) upcoming_deadlines = await self._compute_upcoming_deadlines() return DashboardStats( overview=overview, by_category=by_category, by_priority=by_priority, upcoming_deadlines=upcoming_deadlines, ) async def _compute_by_category(self) -> list[CategoryStats]: """카테고리별 할일 분포 집계""" pipeline = [ {"$group": { "_id": "$category_id", "count": {"$sum": 1}, }}, {"$sort": {"count": -1}}, ] results = await self.todos.aggregate(pipeline).to_list(None) # 카테고리 정보 조회 cat_ids = set() for r in results: if r["_id"]: try: cat_ids.add(ObjectId(r["_id"])) except Exception: pass cat_map: dict[str, dict] = {} if cat_ids: async for cat in self.categories.find({"_id": {"$in": list(cat_ids)}}): cat_map[str(cat["_id"])] = cat category_stats = [] for r in results: cat_id = r["_id"] if cat_id and cat_id in cat_map: cat = cat_map[cat_id] category_stats.append(CategoryStats( category_id=cat_id, name=cat["name"], color=cat.get("color", "#6B7280"), count=r["count"], )) else: category_stats.append(CategoryStats( category_id=None, name="미분류", color="#9CA3AF", count=r["count"], )) return category_stats async def _compute_by_priority(self) -> PriorityStats: """우선순위별 현황 집계""" pipeline = [ {"$group": { "_id": "$priority", "count": {"$sum": 1}, }}, ] results = await self.todos.aggregate(pipeline).to_list(None) priority_map = {"high": 0, "medium": 0, "low": 0} for r in results: if r["_id"] in priority_map: priority_map[r["_id"]] = r["count"] return PriorityStats( high=priority_map["high"], medium=priority_map["medium"], low=priority_map["low"], ) async def _compute_upcoming_deadlines(self) -> list[UpcomingDeadline]: """마감 임박 할일 상위 5개 (미완료, 마감일 있는 것, 마감일 오름차순)""" now = datetime.now(timezone.utc) cursor = self.todos.find({ "completed": False, "due_date": {"$ne": None, "$gte": now}, }).sort("due_date", 1).limit(5) docs = await cursor.to_list(5) deadlines = [] for doc in docs: deadlines.append(UpcomingDeadline( id=str(doc["_id"]), title=doc["title"], due_date=doc["due_date"], priority=doc["priority"], )) return deadlines async def invalidate_cache(self) -> None: """캐시 무효화: 할일 CUD 작업 후 호출""" try: await self.redis.delete(DASHBOARD_CACHE_KEY) except Exception: pass