Files
jungwoo choi b5fa5d96b9 feat: 웹사이트 표준화 검사 도구 구현
- 4개 검사 엔진: HTML/CSS, 접근성(WCAG), SEO, 성능/보안 (총 50개 항목)
- FastAPI 백엔드 (9개 API, SSE 실시간 진행, PDF/JSON 리포트)
- Next.js 15 프론트엔드 (6개 페이지, 29개 컴포넌트, 반원 게이지 차트)
- Docker Compose 배포 (Backend:8011, Frontend:3011, MongoDB:27022, Redis:6392)
- 전체 테스트 32/32 PASS

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 13:57:27 +09:00

111 lines
3.0 KiB
Python

"""
Redis connection management using redis-py async client.
"""
import json
import logging
from redis.asyncio import Redis
from app.core.config import get_settings
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Module-wide async Redis client: assigned by connect_redis(), reset to None
# by close_redis(), and handed out to callers by get_redis().
_redis: Redis | None = None
async def connect_redis() -> None:
    """Establish the module-wide Redis connection.

    Builds an async client from ``settings.REDIS_URL`` (with
    ``decode_responses=True``) and verifies it with a ``PING``. The client is
    stored in the module-level ``_redis`` only after the ping succeeds, so a
    failed attempt never leaves an unverified client behind for
    ``get_redis()`` to hand out.

    Raises:
        Exception: Any error raised while creating or pinging the client is
            propagated unchanged, after the client has been closed.
    """
    global _redis
    settings = get_settings()
    client = Redis.from_url(
        settings.REDIS_URL,
        decode_responses=True,
    )
    try:
        # Verify the connection before exposing the client to the module.
        await client.ping()
    except Exception:
        # Release the unusable client's connections, then surface the error.
        await client.close()
        raise
    _redis = client
    # Log only what follows the last "@" so credentials embedded in the URL
    # (redis://user:password@host:port/db) never reach the log output.
    safe_target = str(settings.REDIS_URL).rsplit("@", 1)[-1]
    logger.info("Redis connected successfully: %s", safe_target)
async def close_redis() -> None:
    """Shut down the module-wide Redis client, if one is stored.

    Does nothing when no client is set. Otherwise closes the client, clears
    the module-level reference, and logs the shutdown.
    """
    global _redis
    if _redis is None:
        return
    await _redis.close()
    _redis = None
    logger.info("Redis connection closed")
def get_redis() -> Redis:
    """Return the module-wide Redis client.

    Raises:
        RuntimeError: If no client is currently stored, i.e. ``_redis`` is
            ``None``.
    """
    client = _redis
    if client is not None:
        return client
    raise RuntimeError("Redis is not connected. Call connect_redis() first.")
# --- Helper functions ---
# Expiry, in seconds, applied to per-inspection status and progress keys.
PROGRESS_TTL = 5 * 60  # 5 minutes
# Expiry, in seconds, applied to cached inspection results.
RESULT_CACHE_TTL = 60 * 60  # 1 hour
# NOTE(review): the consumer of this value is not in this part of the file;
# presumably the expiry for a cached "recent" list. Confirm against its user.
RECENT_LIST_TTL = 5 * 60  # 5 minutes
async def set_inspection_status(inspection_id: str, status: str) -> None:
    """Store an inspection's status string under its status key.

    The value is written to ``inspection:{inspection_id}:status`` with an
    expiry of ``PROGRESS_TTL`` seconds.
    """
    await get_redis().set(
        f"inspection:{inspection_id}:status",
        status,
        ex=PROGRESS_TTL,
    )
async def get_inspection_status(inspection_id: str) -> str | None:
    """Read an inspection's status from its status key.

    Returns:
        Whatever Redis holds under ``inspection:{inspection_id}:status``;
        per the return annotation this is ``None`` when nothing is stored.
    """
    client = get_redis()
    status_key = f"inspection:{inspection_id}:status"
    stored_status = await client.get(status_key)
    return stored_status
async def update_category_progress(
    inspection_id: str, category: str, progress: int, current_step: str
) -> None:
    """Record one category's progress in the inspection's Redis hash.

    Writes three fields to ``inspection:{inspection_id}:progress``:
    ``{category}_progress``, ``{category}_step`` and ``{category}_status``,
    then refreshes the hash's expiry to ``PROGRESS_TTL`` seconds.

    Both commands are queued on a single MULTI/EXEC pipeline. Issuing them as
    two separate awaits would let a task cancellation or connection error
    between them leave the hash without any expiry, so it would never be
    cleaned up.

    Args:
        inspection_id: Identifier used to build the hash key.
        category: Prefix for the three field names.
        progress: Progress value, stored as a string; a value of 100 or more
            marks the category ``"completed"``, anything lower ``"running"``.
        current_step: Text stored in the ``{category}_step`` field.
    """
    r = get_redis()
    key = f"inspection:{inspection_id}:progress"
    fields = {
        f"{category}_progress": str(progress),
        f"{category}_step": current_step,
        f"{category}_status": "completed" if progress >= 100 else "running",
    }
    async with r.pipeline(transaction=True) as pipe:
        # Pipeline command methods only buffer the command; nothing is sent
        # until execute() is awaited.
        pipe.hset(key, mapping=fields)
        pipe.expire(key, PROGRESS_TTL)
        await pipe.execute()
async def get_current_progress(inspection_id: str) -> dict | None:
    """Fetch every field of an inspection's progress hash.

    Returns:
        The mapping read from ``inspection:{inspection_id}:progress``, or
        ``None`` when that mapping is empty.
    """
    client = get_redis()
    fields = await client.hgetall(f"inspection:{inspection_id}:progress")
    # An empty result is reported to callers as None rather than {}.
    return fields or None
async def publish_event(inspection_id: str, event_data: dict) -> None:
    """Publish an event on the inspection's Redis Pub/Sub channel.

    ``event_data`` is serialized to JSON (non-ASCII characters are kept
    as-is) and published on ``inspection:{inspection_id}:events``.
    """
    client = get_redis()
    payload = json.dumps(event_data, ensure_ascii=False)
    await client.publish(f"inspection:{inspection_id}:events", payload)
async def cache_result(inspection_id: str, result: dict) -> None:
    """Store an inspection result in Redis as JSON.

    The result is serialized with ``default=str``, so values the JSON encoder
    cannot handle natively are stored as their ``str()`` form. It is written
    to ``inspection:result:{inspection_id}`` with an expiry of
    ``RESULT_CACHE_TTL`` seconds.
    """
    client = get_redis()
    serialized = json.dumps(result, ensure_ascii=False, default=str)
    await client.set(
        f"inspection:result:{inspection_id}",
        serialized,
        ex=RESULT_CACHE_TTL,
    )
async def get_cached_result(inspection_id: str) -> dict | None:
    """Load a cached inspection result from Redis.

    Returns:
        The JSON-decoded value stored under
        ``inspection:result:{inspection_id}``, or ``None`` when the stored
        value is missing or empty.
    """
    client = get_redis()
    raw = await client.get(f"inspection:result:{inspection_id}")
    return json.loads(raw) if raw else None