Files
site11/services/pipeline/ai-article-generator/ai_article_generator.py
2025-09-28 20:41:57 +09:00

300 lines
10 KiB
Python

"""
AI Article Generator Service
Claude API를 사용한 뉴스 기사 생성 서비스
"""
import asyncio
import logging
import os
import sys
import json
from datetime import datetime
from typing import List, Dict, Any
from anthropic import AsyncAnthropic
from motor.motor_asyncio import AsyncIOMotorClient
# Import from shared module
from shared.models import PipelineJob, EnrichedItem, FinalArticle, Subtopic, Entities, NewsReference
from shared.queue_manager import QueueManager
# Module-wide logging configuration; the worker logs under this module's name.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AIArticleGeneratorWorker:
    """Queue worker that turns enriched RSS items into full news articles.

    Pipeline position: consumes jobs from the 'ai_article_generation' queue,
    generates a Korean article with the Claude API, stores it in MongoDB
    (ai_writer_db.articles_ko), then enqueues the job for 'image_generation'.
    """

    def __init__(self):
        # Redis-backed job queue shared with the other pipeline stages.
        self.queue_manager = QueueManager(
            redis_url=os.getenv("REDIS_URL", "redis://redis:6379")
        )
        self.claude_api_key = os.getenv("CLAUDE_API_KEY")
        self.claude_client = None  # created in start() once the key is confirmed
        self.mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongodb:27017")
        self.db_name = os.getenv("DB_NAME", "ai_writer_db")  # uses ai_writer_db
        # Fix: keep the Mongo client on self so stop() can close it (it was
        # previously a local in start() and the connection was never released).
        self.mongo_client = None
        self.db = None

    async def start(self):
        """Connect to Redis/MongoDB, build the Claude client, and poll forever.

        Returns early (after logging an error) when CLAUDE_API_KEY is not set,
        since the worker cannot generate anything without it.
        """
        logger.info("Starting AI Article Generator Worker")
        # Connect to Redis
        await self.queue_manager.connect()
        # Connect to MongoDB
        self.mongo_client = AsyncIOMotorClient(self.mongodb_url)
        self.db = self.mongo_client[self.db_name]
        # Initialize the Claude client
        if self.claude_api_key:
            self.claude_client = AsyncAnthropic(api_key=self.claude_api_key)
        else:
            logger.error("Claude API key not configured")
            return
        # Main processing loop
        while True:
            try:
                # Short blocking timeout so the loop stays responsive.
                job = await self.queue_manager.dequeue('ai_article_generation', timeout=5)
                if job:
                    await self.process_job(job)
            except Exception as e:
                # logger.exception keeps the traceback, unlike logger.error.
                logger.exception(f"Error in worker loop: {e}")
                await asyncio.sleep(1)  # brief back-off so a hard failure cannot spin

    async def process_job(self, job: PipelineJob):
        """Process one AI article-generation job (a single enriched RSS item).

        On success the article is saved to ai_writer_db.articles_ko and the job
        advances to the 'image_generation' stage; on any failure the job is
        marked failed on the 'ai_article_generation' queue.
        """
        try:
            start_time = datetime.now()
            logger.info(f"Processing job {job.job_id} for AI article generation")
            # Single enriched item is the current payload shape.
            enriched_item_data = job.data.get('enriched_item')
            if not enriched_item_data:
                # Backwards compatibility with the older list-based payload.
                enriched_items = job.data.get('enriched_items', [])
                if enriched_items:
                    enriched_item_data = enriched_items[0]
                else:
                    logger.warning(f"No enriched item in job {job.job_id}")
                    await self.queue_manager.mark_failed(
                        'ai_article_generation',
                        job,
                        "No enriched item to process"
                    )
                    return
            enriched_item = EnrichedItem(**enriched_item_data)
            # Generate the article
            article = await self._generate_article(job, enriched_item)
            # Record wall-clock processing time on the article
            processing_time = (datetime.now() - start_time).total_seconds()
            article.processing_time = processing_time
            # Persist to MongoDB (ai_writer_db.articles_ko)
            result = await self.db.articles_ko.insert_one(article.model_dump())
            mongodb_id = str(result.inserted_id)
            logger.info(f"Article {article.news_id} saved to MongoDB with _id: {mongodb_id}")
            # Hand off to the next stage (image generation)
            job.data['news_id'] = article.news_id
            job.data['mongodb_id'] = mongodb_id
            job.stages_completed.append('ai_article_generation')
            job.stage = 'image_generation'
            await self.queue_manager.enqueue('image_generation', job)
            await self.queue_manager.mark_completed('ai_article_generation', job.job_id)
        except Exception as e:
            logger.exception(f"Error processing job {job.job_id}: {e}")
            await self.queue_manager.mark_failed('ai_article_generation', job, str(e))

    async def _generate_article(self, job: PipelineJob, enriched_item: EnrichedItem) -> FinalArticle:
        """Build a FinalArticle from an enriched item using Claude.

        Falls back to a minimal article assembled straight from the RSS item
        when the API call or JSON parsing fails, so the pipeline never stalls
        at this stage.
        """
        # RSS item details
        rss_item = enriched_item.rss_item
        search_results = enriched_item.search_results
        # Prepare search-result context for the prompt (at most 10 entries)
        search_text = ""
        if search_results:
            search_text = "\n관련 검색 결과:\n"
            for idx, result in enumerate(search_results[:10], 1):
                search_text += f"{idx}. {result.title}\n"
                if result.snippet:
                    search_text += f" {result.snippet}\n"
        # Compose the Korean article-writing prompt; it instructs the model to
        # reply with a strict JSON object that is parsed below.
        prompt = f"""다음 뉴스 정보를 바탕으로 상세한 기사를 작성해주세요.
키워드: {job.keyword}
뉴스 정보:
제목: {rss_item.title}
요약: {rss_item.summary or '내용 없음'}
링크: {rss_item.link}
{search_text}
다음 JSON 형식으로 작성해주세요:
{{
"title": "기사 제목 (50자 이내)",
"summary": "한 줄 요약 (100자 이내)",
"subtopics": [
{{
"title": "소제목1",
"content": ["문단1", "문단2", "문단3"]
}},
{{
"title": "소제목2",
"content": ["문단1", "문단2"]
}},
{{
"title": "소제목3",
"content": ["문단1", "문단2"]
}}
],
"categories": ["카테고리1", "카테고리2"],
"entities": {{
"people": ["인물1", "인물2"],
"organizations": ["조직1", "조직2"],
"groups": ["그룹1"],
"countries": ["국가1"],
"events": ["이벤트1"]
}}
}}
요구사항:
- 3개의 소제목로 구성
- 각 소제목별로 2-3개 문단
- 전문적이고 객관적인 톤
- 한국어로 작성
- 실제 정보를 바탕으로 구체적으로 작성"""
        try:
            response = await self.claude_client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=4000,
                temperature=0.7,
                messages=[
                    {"role": "user", "content": prompt}
                ]
            )
            # Extract the JSON object from the response (the model may wrap it
            # in prose): take from the first '{' to the last '}'.
            content_text = response.content[0].text
            json_start = content_text.find('{')
            json_end = content_text.rfind('}') + 1
            if json_start != -1 and json_end > json_start:
                article_data = json.loads(content_text[json_start:json_end])
            else:
                raise ValueError("No valid JSON in response")
            # Build Subtopic objects
            subtopics = []
            for subtopic_data in article_data.get('subtopics', []):
                subtopics.append(Subtopic(
                    title=subtopic_data.get('title', ''),
                    content=subtopic_data.get('content', [])
                ))
            # Build the Entities object
            entities_data = article_data.get('entities', {})
            entities = Entities(
                people=entities_data.get('people', []),
                organizations=entities_data.get('organizations', []),
                groups=entities_data.get('groups', []),
                countries=entities_data.get('countries', []),
                events=entities_data.get('events', [])
            )
            # Build references: the RSS original goes first...
            references = []
            references.append(NewsReference(
                title=rss_item.title,
                link=rss_item.link,
                source=rss_item.source_feed,
                published=rss_item.published
            ))
            # ...then up to 9 search results (10 references total with the RSS original)
            for search_result in search_results[:9]:
                references.append(NewsReference(
                    title=search_result.title,
                    link=search_result.link,
                    source=search_result.source,
                    published=None
                ))
            # Assemble the FinalArticle (ai_writer_db.articles schema)
            article = FinalArticle(
                title=article_data.get('title', rss_item.title),
                summary=article_data.get('summary', ''),
                subtopics=subtopics,
                categories=article_data.get('categories', []),
                entities=entities,
                source_keyword=job.keyword,
                source_count=len(references),
                references=references,
                job_id=job.job_id,
                keyword_id=job.keyword_id,
                pipeline_stages=job.stages_completed.copy(),
                language='ko',
                rss_guid=rss_item.guid  # store the RSS GUID
            )
            return article
        except Exception as e:
            logger.exception(f"Error generating article: {e}")
            # Fallback: minimal article built straight from the RSS item so the
            # downstream stages still receive something to work with.
            fallback_references = [NewsReference(
                title=rss_item.title,
                link=rss_item.link,
                source=rss_item.source_feed,
                published=rss_item.published
            )]
            return FinalArticle(
                title=rss_item.title,
                summary=rss_item.summary[:100] if rss_item.summary else '',
                subtopics=[
                    Subtopic(
                        title="주요 내용",
                        content=[rss_item.summary or rss_item.title]
                    )
                ],
                categories=['자동생성'],
                entities=Entities(),
                source_keyword=job.keyword,
                source_count=1,
                references=fallback_references,
                job_id=job.job_id,
                keyword_id=job.keyword_id,
                pipeline_stages=job.stages_completed.copy(),
                language='ko',
                rss_guid=rss_item.guid  # store the RSS GUID
            )

    async def stop(self):
        """Disconnect from Redis and close the MongoDB connection if open."""
        await self.queue_manager.disconnect()
        if self.mongo_client is not None:
            # Fix: the Mongo client was previously never closed.
            self.mongo_client.close()
        logger.info("AI Article Generator Worker stopped")
async def main():
    """Entry point: run the article-generator worker until interrupted.

    Shutdown (queue disconnect) always runs, whether the worker returns,
    raises, or is interrupted from the keyboard.
    """
    generator_worker = AIArticleGeneratorWorker()
    try:
        await generator_worker.start()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
    finally:
        await generator_worker.stop()
if __name__ == "__main__":
asyncio.run(main())