""" Redis Queue Manager for AI Writer Service Redis를 사용한 작업 큐 관리 """ import redis.asyncio as redis import json import uuid from typing import Optional, List, Dict, Any from datetime import datetime, timedelta import logging from queue_models import NewsJobData, JobResult, JobStatus, QueueStats logger = logging.getLogger(__name__) class RedisQueueManager: """Redis 기반 작업 큐 매니저""" def __init__(self, redis_url: str = "redis://redis:6379"): self.redis_url = redis_url self.redis_client: Optional[redis.Redis] = None # Redis 키 정의 self.QUEUE_KEY = "ai_writer:queue:pending" self.PROCESSING_KEY = "ai_writer:queue:processing" self.COMPLETED_KEY = "ai_writer:queue:completed" self.FAILED_KEY = "ai_writer:queue:failed" self.STATS_KEY = "ai_writer:stats" self.WORKERS_KEY = "ai_writer:workers" self.LOCK_PREFIX = "ai_writer:lock:" async def connect(self): """Redis 연결""" if not self.redis_client: self.redis_client = await redis.from_url( self.redis_url, encoding="utf-8", decode_responses=True ) logger.info("Connected to Redis queue") async def disconnect(self): """Redis 연결 해제""" if self.redis_client: await self.redis_client.close() self.redis_client = None logger.info("Disconnected from Redis queue") async def enqueue(self, job_data: NewsJobData) -> str: """작업을 큐에 추가""" try: if not job_data.job_id: job_data.job_id = str(uuid.uuid4()) # JSON으로 직렬화 job_json = job_data.json() # 우선순위에 따라 큐에 추가 if job_data.priority > 0: # 높은 우선순위는 앞쪽에 await self.redis_client.lpush(self.QUEUE_KEY, job_json) else: # 일반 우선순위는 뒤쪽에 await self.redis_client.rpush(self.QUEUE_KEY, job_json) # 통계 업데이트 await self.redis_client.hincrby(self.STATS_KEY, "total_jobs", 1) await self.redis_client.hincrby(self.STATS_KEY, "pending_jobs", 1) logger.info(f"Job {job_data.job_id} enqueued") return job_data.job_id except Exception as e: logger.error(f"Failed to enqueue job: {e}") raise async def dequeue(self, timeout: int = 0) -> Optional[NewsJobData]: """큐에서 작업 가져오기 (블로킹 가능)""" try: # 대기 중인 작업을 가져와서 처리 중 목록으로 이동 if timeout > 0: result = await self.redis_client.blmove( self.QUEUE_KEY, self.PROCESSING_KEY, timeout, "LEFT", "RIGHT" ) else: result = await self.redis_client.lmove( self.QUEUE_KEY, self.PROCESSING_KEY, "LEFT", "RIGHT" ) if result: # 통계 업데이트 await self.redis_client.hincrby(self.STATS_KEY, "pending_jobs", -1) await self.redis_client.hincrby(self.STATS_KEY, "processing_jobs", 1) return NewsJobData.parse_raw(result) return None except Exception as e: logger.error(f"Failed to dequeue job: {e}") return None async def mark_completed(self, job_id: str, article_id: str): """작업을 완료로 표시""" try: # 처리 중 목록에서 작업 찾기 processing_jobs = await self.redis_client.lrange(self.PROCESSING_KEY, 0, -1) for job_json in processing_jobs: job = NewsJobData.parse_raw(job_json) if job.job_id == job_id: # 처리 중 목록에서 제거 await self.redis_client.lrem(self.PROCESSING_KEY, 1, job_json) # 완료 결과 생성 result = JobResult( job_id=job_id, status=JobStatus.COMPLETED, article_id=article_id, completed_at=datetime.now() ) # 완료 목록에 추가 (최대 1000개 유지) await self.redis_client.lpush(self.COMPLETED_KEY, result.json()) await self.redis_client.ltrim(self.COMPLETED_KEY, 0, 999) # 통계 업데이트 await self.redis_client.hincrby(self.STATS_KEY, "processing_jobs", -1) await self.redis_client.hincrby(self.STATS_KEY, "completed_jobs", 1) logger.info(f"Job {job_id} marked as completed") break except Exception as e: logger.error(f"Failed to mark job as completed: {e}") async def mark_failed(self, job_id: str, error_message: str): """작업을 실패로 표시""" try: # 처리 중 목록에서 작업 찾기 processing_jobs = await self.redis_client.lrange(self.PROCESSING_KEY, 0, -1) for job_json in processing_jobs: job = NewsJobData.parse_raw(job_json) if job.job_id == job_id: # 처리 중 목록에서 제거 await self.redis_client.lrem(self.PROCESSING_KEY, 1, job_json) # 재시도 확인 if job.retry_count < job.max_retries: job.retry_count += 1 # 다시 큐에 추가 await self.redis_client.rpush(self.QUEUE_KEY, job.json()) await self.redis_client.hincrby(self.STATS_KEY, "pending_jobs", 1) logger.info(f"Job {job_id} requeued (retry {job.retry_count}/{job.max_retries})") else: # 실패 결과 생성 result = JobResult( job_id=job_id, status=JobStatus.FAILED, error_message=error_message, completed_at=datetime.now() ) # 실패 목록에 추가 await self.redis_client.lpush(self.FAILED_KEY, result.json()) await self.redis_client.ltrim(self.FAILED_KEY, 0, 999) # 통계 업데이트 await self.redis_client.hincrby(self.STATS_KEY, "failed_jobs", 1) logger.error(f"Job {job_id} marked as failed: {error_message}") await self.redis_client.hincrby(self.STATS_KEY, "processing_jobs", -1) break except Exception as e: logger.error(f"Failed to mark job as failed: {e}") async def get_stats(self) -> QueueStats: """큐 통계 조회""" try: stats_data = await self.redis_client.hgetall(self.STATS_KEY) # 활성 워커 수 계산 workers = await self.redis_client.smembers(self.WORKERS_KEY) active_workers = 0 for worker_id in workers: # 워커가 최근 1분 이내에 활동했는지 확인 last_ping = await self.redis_client.get(f"{self.WORKERS_KEY}:{worker_id}") if last_ping: last_ping_time = datetime.fromisoformat(last_ping) if datetime.now() - last_ping_time < timedelta(minutes=1): active_workers += 1 return QueueStats( pending_jobs=int(stats_data.get("pending_jobs", 0)), processing_jobs=int(stats_data.get("processing_jobs", 0)), completed_jobs=int(stats_data.get("completed_jobs", 0)), failed_jobs=int(stats_data.get("failed_jobs", 0)), total_jobs=int(stats_data.get("total_jobs", 0)), workers_active=active_workers ) except Exception as e: logger.error(f"Failed to get stats: {e}") return QueueStats( pending_jobs=0, processing_jobs=0, completed_jobs=0, failed_jobs=0, total_jobs=0, workers_active=0 ) async def register_worker(self, worker_id: str): """워커 등록""" await self.redis_client.sadd(self.WORKERS_KEY, worker_id) await self.redis_client.set( f"{self.WORKERS_KEY}:{worker_id}", datetime.now().isoformat(), ex=300 # 5분 후 자동 만료 ) async def ping_worker(self, worker_id: str): """워커 활동 업데이트""" await self.redis_client.set( f"{self.WORKERS_KEY}:{worker_id}", datetime.now().isoformat(), ex=300 ) async def unregister_worker(self, worker_id: str): """워커 등록 해제""" await self.redis_client.srem(self.WORKERS_KEY, worker_id) await self.redis_client.delete(f"{self.WORKERS_KEY}:{worker_id}") async def clear_queue(self): """큐 초기화 (테스트용)""" await self.redis_client.delete(self.QUEUE_KEY) await self.redis_client.delete(self.PROCESSING_KEY) await self.redis_client.delete(self.COMPLETED_KEY) await self.redis_client.delete(self.FAILED_KEY) await self.redis_client.delete(self.STATS_KEY) logger.info("Queue cleared")