# Source: site11/services/pipeline/shared/queue_manager.py
# Snapshot: 2025-09-28 20:41:57 +09:00 · 176 lines · 6.4 KiB · Python

"""
Queue Manager

Redis-based queue management system.
"""
import redis.asyncio as redis
import json
import logging
from typing import Optional, Dict, Any, List
from datetime import datetime
from .models import PipelineJob, QueueMessage
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class QueueManager:
    """Redis-backed job queue manager.

    Each logical queue is a Redis list. A job handed out by ``dequeue`` is
    recorded in a per-queue ``processing:<queue_name>`` hash (keyed by job id)
    until ``mark_completed`` or ``mark_failed`` removes it. Counters are kept
    in the ``stats:queues``, ``stats:completed`` and ``stats:failed`` hashes.

    ``connect()`` must be awaited before any queue operation is used.
    """

    # Logical queue name -> Redis list key. Names that are not listed here
    # fall back to ``queue:<name>`` (see ``_queue_key``).
    QUEUES = {
        "keyword_processing": "queue:keyword_processing",
        "rss_collection": "queue:rss_collection",
        "search_enrichment": "queue:search_enrichment",
        "google_search": "queue:google_search",
        "ai_article_generation": "queue:ai_article_generation",
        "image_generation": "queue:image_generation",
        "translation": "queue:translation",
        "failed": "queue:failed",
        "scheduled": "queue:scheduled",
    }

    # Top-level keys of ``get_queue_stats()`` that carry aggregate counters.
    # A queue whose name equals one of these is reported as ``<name>_queue``
    # so that its depth is not overwritten by the counters.
    _AGGREGATE_STATS_KEYS = ("completed", "failed")

    def __init__(self, redis_url: str = "redis://redis:6379"):
        """Store the connection URL; no connection is opened here."""
        self.redis_url = redis_url
        self.redis_client: Optional[redis.Redis] = None

    def _require_client(self):
        """Return the connected client, or raise if ``connect()`` was not called."""
        client = self.redis_client
        if client is None:
            raise RuntimeError(
                "QueueManager is not connected to Redis; call connect() first"
            )
        return client

    def _queue_key(self, queue_name: str) -> str:
        """Return the Redis list key that backs ``queue_name``."""
        return self.QUEUES.get(queue_name, f"queue:{queue_name}")

    @staticmethod
    def _processing_key(queue_name: str) -> str:
        """Return the Redis hash key tracking in-flight jobs of ``queue_name``."""
        return f"processing:{queue_name}"

    async def connect(self) -> None:
        """Open the Redis connection if one is not already open."""
        if not self.redis_client:
            self.redis_client = await redis.from_url(
                self.redis_url,
                encoding="utf-8",
                decode_responses=True,
            )
            logger.info("Connected to Redis")

    async def disconnect(self) -> None:
        """Close the Redis connection, if any, and forget the client."""
        client = self.redis_client
        if client is None:
            return
        # Drop the reference first so a failing close cannot leave a
        # half-closed client behind for later calls.
        self.redis_client = None
        # Newer redis-py releases expose ``aclose()`` and deprecate ``close()``;
        # use whichever the installed client provides.
        close = getattr(client, "aclose", None) or client.close
        await close()

    async def enqueue(self, queue_name: str, job: "PipelineJob", priority: int = 0) -> str:
        """Add a job to a queue.

        Args:
            queue_name: Logical queue name (see ``QUEUES``).
            job: The job to wrap in a ``QueueMessage`` and store.
            priority: When greater than 0 the job is pushed to the head of the
                list (the end ``dequeue`` pops from); otherwise to the tail.

        Returns:
            The job's ``job_id``.

        Raises:
            Exception: Any failure is logged and re-raised to the caller.
        """
        try:
            client = self._require_client()
            queue_key = self._queue_key(queue_name)
            payload = QueueMessage(queue_name=queue_name, job=job).json()

            # Head of the list for prioritized jobs, tail for everything else.
            push = client.lpush if priority > 0 else client.rpush
            await push(queue_key, payload)

            # Update statistics.
            await client.hincrby("stats:queues", queue_name, 1)
            logger.info("Job %s enqueued to %s", job.job_id, queue_name)
            return job.job_id
        except Exception as e:
            logger.exception("Failed to enqueue job: %s", e)
            raise

    async def dequeue(self, queue_name: str, timeout: int = 0) -> "Optional[PipelineJob]":
        """Take the next job from a queue.

        Args:
            queue_name: Logical queue name (see ``QUEUES``).
            timeout: When greater than 0, block for up to this long (passed to
                ``BLPOP``) waiting for a job; otherwise pop without waiting.

        Returns:
            The dequeued job, or ``None`` when the queue is empty, the wait
            timed out, or an error occurred (errors are logged, not raised).
        """
        try:
            client = self._require_client()
            queue_key = self._queue_key(queue_name)
            logger.info(
                "Attempting to dequeue from %s with timeout=%s", queue_key, timeout
            )

            if timeout > 0:
                result = await client.blpop(queue_key, timeout)
                if not result:
                    logger.debug("No item available in %s", queue_key)
                    return None
                # BLPOP yields a (key, value) pair; only the value is needed.
                _, data = result
                logger.info("Dequeued item from %s", queue_key)
            else:
                data = await client.lpop(queue_key)

            if not data:
                return None

            # NOTE: the payload has already been popped at this point, so a
            # payload that fails to parse is logged below and not returned.
            message = QueueMessage.parse_raw(data)

            # Record the job as in-flight.
            await client.hset(
                self._processing_key(queue_name),
                message.job.job_id,
                message.json(),
            )
            return message.job
        except Exception as e:
            logger.exception("Failed to dequeue job: %s", e)
            return None

    async def mark_completed(self, queue_name: str, job_id: str) -> None:
        """Remove a job from the in-flight hash and count it as completed.

        Errors are logged and swallowed.
        """
        try:
            client = self._require_client()
            await client.hdel(self._processing_key(queue_name), job_id)
            # Update statistics.
            await client.hincrby("stats:completed", queue_name, 1)
            logger.info("Job %s completed in %s", job_id, queue_name)
        except Exception as e:
            logger.exception("Failed to mark job as completed: %s", e)

    async def mark_failed(self, queue_name: str, job: "PipelineJob", error: str) -> None:
        """Handle a failed job: retry it, or move it to the ``failed`` queue.

        The job is removed from the in-flight hash. While
        ``job.retry_count < job.max_retries`` the counter is incremented and
        the job is re-enqueued on the same queue. Otherwise ``error`` and the
        failing stage are written into ``job.data``, the job is enqueued on the
        ``failed`` queue and ``stats:failed`` is incremented.

        Errors are logged and swallowed.
        """
        try:
            client = self._require_client()
            await client.hdel(self._processing_key(queue_name), job.job_id)

            # Retry while attempts remain.
            if job.retry_count < job.max_retries:
                job.retry_count += 1
                await self.enqueue(queue_name, job)
                logger.info(
                    "Job %s requeued (retry %s/%s)",
                    job.job_id,
                    job.retry_count,
                    job.max_retries,
                )
                return

            # Out of retries: move to the failed queue with failure details.
            job.data["error"] = error
            job.data["failed_stage"] = queue_name
            await self.enqueue("failed", job)
            # Update statistics.
            await client.hincrby("stats:failed", queue_name, 1)
            logger.error("Job %s failed: %s", job.job_id, error)
        except Exception as e:
            logger.exception("Failed to mark job as failed: %s", e)

    async def get_queue_stats(self) -> Dict[str, Any]:
        """Collect queue depths and completion/failure counters.

        Returns:
            A dict with one ``{"pending": ..., "processing": ...}`` entry per
            queue in ``QUEUES``, plus ``"completed"`` and ``"failed"`` entries
            holding the ``stats:completed`` / ``stats:failed`` hashes. A queue
            whose name collides with one of those two keys (the ``failed``
            queue) is reported as ``"<name>_queue"`` instead. Returns an empty
            dict if anything fails (the error is logged).
        """
        try:
            client = self._require_client()
            stats: Dict[str, Any] = {}
            for name, key in self.QUEUES.items():
                # Keep the failed queue's depth from being overwritten by the
                # aggregate "failed" counters assigned below.
                slot = f"{name}_queue" if name in self._AGGREGATE_STATS_KEYS else name
                stats[slot] = {
                    "pending": await client.llen(key),
                    "processing": await client.hlen(self._processing_key(name)),
                }

            # Completed / failed counters.
            stats["completed"] = await client.hgetall("stats:completed") or {}
            stats["failed"] = await client.hgetall("stats:failed") or {}
            return stats
        except Exception as e:
            logger.exception("Failed to get queue stats: %s", e)
            return {}

    async def clear_queue(self, queue_name: str) -> None:
        """Delete a queue's list and its in-flight hash (intended for tests)."""
        client = self._require_client()
        await client.delete(self._queue_key(queue_name))
        await client.delete(self._processing_key(queue_name))
        logger.info(f"Queue {queue_name} cleared")