""" Redis connection management using redis-py async client. """ import json import logging from redis.asyncio import Redis from app.core.config import get_settings logger = logging.getLogger(__name__) _redis: Redis | None = None async def connect_redis() -> None: """Establish Redis connection.""" global _redis settings = get_settings() _redis = Redis.from_url( settings.REDIS_URL, decode_responses=True, ) # Verify connection await _redis.ping() logger.info("Redis connected successfully: %s", settings.REDIS_URL) async def close_redis() -> None: """Close Redis connection.""" global _redis if _redis is not None: await _redis.close() _redis = None logger.info("Redis connection closed") def get_redis() -> Redis: """Get Redis instance.""" if _redis is None: raise RuntimeError("Redis is not connected. Call connect_redis() first.") return _redis # --- Helper functions --- PROGRESS_TTL = 300 # 5 minutes RESULT_CACHE_TTL = 3600 # 1 hour RECENT_LIST_TTL = 300 # 5 minutes async def set_inspection_status(inspection_id: str, status: str) -> None: """Set inspection status in Redis with TTL.""" r = get_redis() key = f"inspection:{inspection_id}:status" await r.set(key, status, ex=PROGRESS_TTL) async def get_inspection_status(inspection_id: str) -> str | None: """Get inspection status from Redis.""" r = get_redis() key = f"inspection:{inspection_id}:status" return await r.get(key) async def update_category_progress( inspection_id: str, category: str, progress: int, current_step: str ) -> None: """Update category progress in Redis hash.""" r = get_redis() key = f"inspection:{inspection_id}:progress" await r.hset(key, mapping={ f"{category}_progress": str(progress), f"{category}_step": current_step, f"{category}_status": "completed" if progress >= 100 else "running", }) await r.expire(key, PROGRESS_TTL) async def get_current_progress(inspection_id: str) -> dict | None: """Get current progress data from Redis.""" r = get_redis() key = f"inspection:{inspection_id}:progress" data = await r.hgetall(key) if not data: return None return data async def publish_event(inspection_id: str, event_data: dict) -> None: """Publish an SSE event via Redis Pub/Sub.""" r = get_redis() channel = f"inspection:{inspection_id}:events" await r.publish(channel, json.dumps(event_data, ensure_ascii=False)) async def cache_result(inspection_id: str, result: dict) -> None: """Cache inspection result in Redis.""" r = get_redis() key = f"inspection:result:{inspection_id}" await r.set(key, json.dumps(result, ensure_ascii=False, default=str), ex=RESULT_CACHE_TTL) async def get_cached_result(inspection_id: str) -> dict | None: """Get cached inspection result from Redis.""" r = get_redis() key = f"inspection:result:{inspection_id}" data = await r.get(key) if data: return json.loads(data) return None