Files
todos2/backend/app/services/dashboard_service.py
jungwoo choi 074b5133bf feat: 풀스택 할일관리 앱 구현 (통합 모달 + 간트차트)
- Backend: FastAPI + MongoDB + Redis (카테고리, 할일 CRUD, 파일 첨부, 검색, 대시보드)
- Frontend: Next.js 15 + Tailwind + React Query + Zustand
- 통합 TodoModal: 생성/수정 모달 통합, 탭 구조 (기본/태그와 첨부)
- 간트차트: 카테고리별 할일 타임라인 시각화
- TodoCard: 제목/카테고리/우선순위/태그/첨부 한줄 표시
- Docker Compose 배포 (Frontend:3010, Backend:8010, MongoDB:27021, Redis:6391)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 15:45:03 +09:00

183 lines
5.7 KiB
Python

import json
from datetime import datetime, timezone
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorDatabase
import redis.asyncio as aioredis
from app.models.todo import (
DashboardStats,
OverviewStats,
CategoryStats,
PriorityStats,
UpcomingDeadline,
)
DASHBOARD_CACHE_KEY = "dashboard:stats"
DASHBOARD_CACHE_TTL = 60 # 60초
class DashboardService:
"""대시보드 통계 + Redis 캐싱"""
def __init__(self, db: AsyncIOMotorDatabase, redis_client: aioredis.Redis):
self.todos = db["todos"]
self.categories = db["categories"]
self.redis = redis_client
async def get_stats(self) -> DashboardStats:
"""F-018: 대시보드 통계
1. Redis 캐시 확인
2. 캐시 히트 -> JSON 파싱 후 반환
3. 캐시 미스 -> MongoDB 집계 -> Redis 저장 (TTL 60s) -> 반환
"""
# Redis 캐시 확인
try:
cached = await self.redis.get(DASHBOARD_CACHE_KEY)
if cached:
data = json.loads(cached)
return DashboardStats(**data)
except Exception:
pass # Redis 에러 시 DB에서 직접 조회
# 캐시 미스: MongoDB 집계
stats = await self._compute_stats()
# Redis에 캐싱
try:
stats_json = stats.model_dump_json()
await self.redis.setex(DASHBOARD_CACHE_KEY, DASHBOARD_CACHE_TTL, stats_json)
except Exception:
pass # Redis 저장 실패는 무시
return stats
async def _compute_stats(self) -> DashboardStats:
"""MongoDB 집계 파이프라인으로 통계 계산"""
# 1. Overview (전체/완료/미완료/완료율)
total = await self.todos.count_documents({})
completed = await self.todos.count_documents({"completed": True})
incomplete = total - completed
completion_rate = round((completed / total * 100), 1) if total > 0 else 0.0
overview = OverviewStats(
total=total,
completed=completed,
incomplete=incomplete,
completion_rate=completion_rate,
)
# 2. By Category (카테고리별 할일 분포)
by_category = await self._compute_by_category()
# 3. By Priority (우선순위별 현황)
by_priority = await self._compute_by_priority()
# 4. Upcoming Deadlines (마감 임박 할일 상위 5개)
upcoming_deadlines = await self._compute_upcoming_deadlines()
return DashboardStats(
overview=overview,
by_category=by_category,
by_priority=by_priority,
upcoming_deadlines=upcoming_deadlines,
)
async def _compute_by_category(self) -> list[CategoryStats]:
"""카테고리별 할일 분포 집계"""
pipeline = [
{"$group": {
"_id": "$category_id",
"count": {"$sum": 1},
}},
{"$sort": {"count": -1}},
]
results = await self.todos.aggregate(pipeline).to_list(None)
# 카테고리 정보 조회
cat_ids = set()
for r in results:
if r["_id"]:
try:
cat_ids.add(ObjectId(r["_id"]))
except Exception:
pass
cat_map: dict[str, dict] = {}
if cat_ids:
async for cat in self.categories.find({"_id": {"$in": list(cat_ids)}}):
cat_map[str(cat["_id"])] = cat
category_stats = []
for r in results:
cat_id = r["_id"]
if cat_id and cat_id in cat_map:
cat = cat_map[cat_id]
category_stats.append(CategoryStats(
category_id=cat_id,
name=cat["name"],
color=cat.get("color", "#6B7280"),
count=r["count"],
))
else:
category_stats.append(CategoryStats(
category_id=None,
name="미분류",
color="#9CA3AF",
count=r["count"],
))
return category_stats
async def _compute_by_priority(self) -> PriorityStats:
"""우선순위별 현황 집계"""
pipeline = [
{"$group": {
"_id": "$priority",
"count": {"$sum": 1},
}},
]
results = await self.todos.aggregate(pipeline).to_list(None)
priority_map = {"high": 0, "medium": 0, "low": 0}
for r in results:
if r["_id"] in priority_map:
priority_map[r["_id"]] = r["count"]
return PriorityStats(
high=priority_map["high"],
medium=priority_map["medium"],
low=priority_map["low"],
)
async def _compute_upcoming_deadlines(self) -> list[UpcomingDeadline]:
"""마감 임박 할일 상위 5개 (미완료, 마감일 있는 것, 마감일 오름차순)"""
now = datetime.now(timezone.utc)
cursor = self.todos.find({
"completed": False,
"due_date": {"$ne": None, "$gte": now},
}).sort("due_date", 1).limit(5)
docs = await cursor.to_list(5)
deadlines = []
for doc in docs:
deadlines.append(UpcomingDeadline(
id=str(doc["_id"]),
title=doc["title"],
due_date=doc["due_date"],
priority=doc["priority"],
))
return deadlines
async def invalidate_cache(self) -> None:
"""캐시 무효화: 할일 CUD 작업 후 호출"""
try:
await self.redis.delete(DASHBOARD_CACHE_KEY)
except Exception:
pass