Files
site11/backup-services/ai-writer/backend/app/worker.py
2025-09-28 20:41:57 +09:00

201 lines
6.9 KiB
Python

"""
AI Writer Consumer Worker
큐에서 작업을 가져와 기사를 생성하는 백그라운드 워커
"""
import asyncio
import logging
import signal
import sys
import uuid
from datetime import datetime
from typing import Optional
import os
from motor.motor_asyncio import AsyncIOMotorClient
from anthropic import AsyncAnthropic
from queue_manager import RedisQueueManager
from queue_models import NewsJobData, JobStatus
from article_generator import generate_article_with_claude
# Logging setup: every record is emitted at INFO level or above as
# "<timestamp> - <logger name> - <level> - <message>".
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
class AIWriterWorker:
    """Background worker that turns queued news jobs into stored articles.

    Jobs are pulled from Redis through ``RedisQueueManager``; each job is
    passed to ``generate_article_with_claude`` and the resulting article is
    inserted into the MongoDB ``articles`` collection. Connection settings
    come from the ``REDIS_URL``, ``MONGODB_URL``, ``DB_NAME`` and
    ``CLAUDE_API_KEY`` environment variables.
    """

    # Seconds between heartbeat pings sent to the queue manager.
    PING_INTERVAL = 30
    # Seconds each dequeue call blocks before the loop re-checks ``running``.
    DEQUEUE_TIMEOUT = 5

    def __init__(self, worker_id: Optional[str] = None):
        """Read configuration from the environment and build the clients.

        Redis and MongoDB connections are opened later, in ``start()``.

        Args:
            worker_id: Identifier used when registering with the queue.
                A random UUID4 string is used when omitted.
        """
        self.worker_id = worker_id or str(uuid.uuid4())
        self.queue_manager = RedisQueueManager(
            redis_url=os.getenv("REDIS_URL", "redis://redis:6379")
        )
        # MongoDB settings; the client itself is created in start().
        self.mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongodb:27017")
        self.db_name = os.getenv("DB_NAME", "ai_writer_db")
        self.mongo_client = None
        self.db = None
        # Claude client
        self.claude_api_key = os.getenv("CLAUDE_API_KEY")
        self.claude_client = AsyncAnthropic(api_key=self.claude_api_key)
        # Run state
        self.running = False
        self.tasks = []
        # Set once stop() has released resources so repeated calls are no-ops.
        self._stopped = False

    async def start(self, num_workers: int = 1):
        """Connect to Redis and MongoDB, then run the worker loops.

        Spawns ``num_workers`` job-processing loops plus one heartbeat loop
        and waits for all of them. ``stop()`` always runs on exit, including
        when this coroutine is cancelled.

        Args:
            num_workers: Number of concurrent job-processing loops.
        """
        logger.info(f"Starting AI Writer Worker {self.worker_id} with {num_workers} concurrent workers")
        self._stopped = False
        try:
            # Redis connection + worker registration
            await self.queue_manager.connect()
            await self.queue_manager.register_worker(self.worker_id)
            # MongoDB connection
            self.mongo_client = AsyncIOMotorClient(self.mongodb_url)
            self.db = self.mongo_client[self.db_name]
            logger.info("Connected to MongoDB")
            self.running = True
            for i in range(num_workers):
                self.tasks.append(
                    asyncio.create_task(self._process_jobs(f"{self.worker_id}-{i}"))
                )
            self.tasks.append(asyncio.create_task(self._ping_worker()))
            await asyncio.gather(*self.tasks)
        except Exception as e:
            # logger.exception keeps the traceback for post-mortem debugging.
            logger.exception(f"Worker error: {e}")
        finally:
            await self.stop()

    async def stop(self):
        """Stop all loops and release connections; safe to call repeatedly.

        Each cleanup step is attempted even if an earlier one fails, so a
        Redis error cannot leave the MongoDB client open.
        """
        if self._stopped:
            return
        self._stopped = True
        logger.info(f"Stopping AI Writer Worker {self.worker_id}")
        self.running = False
        tasks, self.tasks = self.tasks, []
        for task in tasks:
            task.cancel()
        # Let cancelled loops finish before the connections they use are closed.
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        try:
            await self.queue_manager.unregister_worker(self.worker_id)
        except Exception as e:
            logger.error(f"Failed to unregister worker {self.worker_id}: {e}")
        try:
            await self.queue_manager.disconnect()
        except Exception as e:
            logger.error(f"Failed to disconnect queue manager: {e}")
        if self.mongo_client:
            self.mongo_client.close()
            self.mongo_client = None
            self.db = None
        logger.info(f"Worker {self.worker_id} stopped")

    async def _process_jobs(self, sub_worker_id: str):
        """Dequeue and handle jobs until ``running`` is cleared or the task is cancelled.

        Args:
            sub_worker_id: Label used to prefix this loop's log lines.
        """
        logger.info(f"Sub-worker {sub_worker_id} started")
        while self.running:
            try:
                job = await self.queue_manager.dequeue(timeout=self.DEQUEUE_TIMEOUT)
                if job:
                    await self._handle_job(sub_worker_id, job)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"[{sub_worker_id}] Worker error: {e}")
                # Back off briefly so a persistent queue error does not spin.
                await asyncio.sleep(1)
        logger.info(f"Sub-worker {sub_worker_id} stopped")

    async def _handle_job(self, sub_worker_id: str, job: "NewsJobData"):
        """Generate and store the article for one job, then mark it completed or failed.

        Every step after dequeue is inside the ``try``, so a job that has been
        taken off the queue is always either completed or marked failed.
        """
        loop = asyncio.get_running_loop()
        # loop.time() is monotonic, so the duration is immune to clock changes.
        start_time = loop.time()
        try:
            logger.info(f"[{sub_worker_id}] Processing job {job.job_id}: {(job.rss_title or '')[:50]}")
            article = await self._generate_article(job)
            if not article:
                raise Exception("Failed to generate article")
            if self.db is None:
                raise Exception("MongoDB is not connected")
            await self.db.articles.insert_one(article.dict())
            processing_time = loop.time() - start_time
            await self.queue_manager.mark_completed(job.job_id, article.news_id)
            logger.info(f"[{sub_worker_id}] Job {job.job_id} completed in {processing_time:.2f}s")
        except Exception as e:
            logger.error(f"[{sub_worker_id}] Job {job.job_id} failed: {e}")
            await self.queue_manager.mark_failed(job.job_id, str(e))

    async def _generate_article(self, job: "NewsJobData"):
        """Convert a queued job into the legacy ``news_data`` dict and generate an article.

        Returns:
            Whatever ``generate_article_with_claude`` returns for this job.
        """
        news_data = {
            "keyword": job.keyword,
            "news_items": [{
                "rss_title": job.rss_title,
                "rss_link": job.rss_link,
                "rss_published": job.rss_published,
                "google_results": job.google_results
            }]
        }
        # Reuse the existing generator function.
        return await generate_article_with_claude(news_data, job.style)

    async def _ping_worker(self):
        """Send a heartbeat every ``PING_INTERVAL`` seconds while the worker runs.

        A failed ping is logged and still followed by the full interval, so an
        unreachable Redis cannot turn this loop into a busy spin.
        """
        while self.running:
            try:
                try:
                    await self.queue_manager.ping_worker(self.worker_id)
                except Exception as e:
                    logger.error(f"Ping error: {e}")
                await asyncio.sleep(self.PING_INTERVAL)
            except asyncio.CancelledError:
                break
def signal_handler(signum, frame):
    """Log which signal arrived, then terminate the process with exit status 0.

    Args:
        signum: Number of the delivered signal.
        frame: Interrupted stack frame (not used).
    """
    message = f"Received signal {signum}"
    logger.info(message)
    sys.exit(0)
async def main():
    """Run an AIWriterWorker until it finishes or SIGINT/SIGTERM arrives.

    The number of concurrent job loops comes from ``WORKER_COUNT``
    (default 3). A shutdown signal cancels the worker task, which makes
    ``AIWriterWorker.start`` run its own ``stop()`` cleanup; ``stop()`` is
    therefore not called a second time here.
    """
    num_workers = int(os.getenv("WORKER_COUNT", "3"))

    worker = AIWriterWorker()
    worker_task = asyncio.create_task(worker.start(num_workers=num_workers))
    shutdown_requested = False

    def _request_shutdown(signum):
        nonlocal shutdown_requested
        shutdown_requested = True
        logger.info(f"Received signal {signum}")
        worker_task.cancel()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # Handle the signal inside the event loop instead of raising
            # SystemExit at an arbitrary point in running coroutines.
            loop.add_signal_handler(sig, _request_shutdown, sig)
        except NotImplementedError:
            # Some event loops (e.g. on Windows) lack add_signal_handler.
            signal.signal(sig, signal_handler)

    try:
        await worker_task
    except asyncio.CancelledError:
        # Only swallow the cancellation we triggered ourselves.
        if not shutdown_requested:
            raise
        logger.info("Shutdown complete")


if __name__ == "__main__":
    asyncio.run(main())