feat: Implement automated keyword-based news pipeline scheduler
- Add multi-threaded keyword scheduler for periodic news collection - Create Keyword Manager API for CRUD operations and monitoring - Implement automatic pipeline triggering (RSS → Google → AI → Translation) - Add thread status monitoring and dynamic keyword management - Support priority-based execution and configurable intervals - Add comprehensive scheduler documentation guide - Default keywords: AI, 테크놀로지, 경제, 블록체인 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@ -11,6 +11,7 @@ from datetime import datetime
|
||||
import feedparser
|
||||
import aiohttp
|
||||
import redis.asyncio as redis
|
||||
from motor.motor_asyncio import AsyncIOMotorClient
|
||||
from typing import List, Dict, Any
|
||||
|
||||
# Import from shared module
|
||||
@ -27,13 +28,16 @@ class RSSCollectorWorker:
|
||||
)
|
||||
self.redis_client = None
|
||||
self.redis_url = os.getenv("REDIS_URL", "redis://redis:6379")
|
||||
self.mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongodb:27017")
|
||||
self.db_name = os.getenv("DB_NAME", "ai_writer_db")
|
||||
self.db = None
|
||||
self.dedup_ttl = 86400 * 7 # 7일간 중복 방지
|
||||
self.max_items_per_feed = 10 # 피드당 최대 항목 수
|
||||
self.max_items_per_feed = 100 # 피드당 최대 항목 수 (Google News는 최대 100개)
|
||||
|
||||
async def start(self):
|
||||
"""워커 시작"""
|
||||
logger.info("Starting RSS Collector Worker")
|
||||
|
||||
|
||||
# Redis 연결
|
||||
await self.queue_manager.connect()
|
||||
self.redis_client = await redis.from_url(
|
||||
@ -41,6 +45,10 @@ class RSSCollectorWorker:
|
||||
encoding="utf-8",
|
||||
decode_responses=True
|
||||
)
|
||||
|
||||
# MongoDB 연결
|
||||
client = AsyncIOMotorClient(self.mongodb_url)
|
||||
self.db = client[self.db_name]
|
||||
|
||||
# 메인 처리 루프
|
||||
while True:
|
||||
@ -60,9 +68,20 @@ class RSSCollectorWorker:
|
||||
try:
|
||||
logger.info(f"Processing job {job.job_id} for keyword '{job.keyword}'")
|
||||
|
||||
keyword = job.data.get('keyword', '')
|
||||
keyword = job.keyword # keyword는 job의 직접 속성
|
||||
rss_feeds = job.data.get('rss_feeds', [])
|
||||
|
||||
# RSS 피드가 없으면 기본 피드 사용
|
||||
if not rss_feeds:
|
||||
# 기본 RSS 피드 추가 (Google News RSS)
|
||||
rss_feeds = [
|
||||
f"https://news.google.com/rss/search?q={keyword}&hl=en-US&gl=US&ceid=US:en",
|
||||
f"https://news.google.com/rss/search?q={keyword}&hl=ko&gl=KR&ceid=KR:ko",
|
||||
"https://feeds.bbci.co.uk/news/technology/rss.xml",
|
||||
"https://rss.nytimes.com/services/xml/rss/nyt/Technology.xml"
|
||||
]
|
||||
logger.info(f"Using default RSS feeds for keyword: {keyword}")
|
||||
|
||||
# 키워드가 포함된 RSS URL 생성
|
||||
processed_feeds = self._prepare_feeds(rss_feeds, keyword)
|
||||
|
||||
@ -78,25 +97,53 @@ class RSSCollectorWorker:
|
||||
if all_items:
|
||||
# 중복 제거
|
||||
unique_items = await self._deduplicate_items(all_items, keyword)
|
||||
|
||||
|
||||
if unique_items:
|
||||
logger.info(f"Collected {len(unique_items)} unique items for '{keyword}'")
|
||||
|
||||
# 다음 단계로 전달
|
||||
job.data['rss_items'] = [item.dict() for item in unique_items]
|
||||
job.stages_completed.append('rss_collection')
|
||||
job.stage = 'search_enrichment'
|
||||
|
||||
await self.queue_manager.enqueue('search_enrichment', job)
|
||||
|
||||
# 각 RSS 아이템별로 개별 job 생성하여 다음 단계로 전달
|
||||
# 시간 지연을 추가하여 API 호출 분산 (초기값: 1초, 점진적으로 조정 가능)
|
||||
enqueue_delay = float(os.getenv("RSS_ENQUEUE_DELAY", "1.0"))
|
||||
|
||||
for idx, item in enumerate(unique_items):
|
||||
# 각 아이템별로 새로운 job 생성
|
||||
item_job = PipelineJob(
|
||||
keyword_id=f"{job.keyword_id}_{idx}",
|
||||
keyword=job.keyword,
|
||||
stage='search_enrichment',
|
||||
data={
|
||||
'rss_item': item.dict(), # 단일 아이템
|
||||
'original_job_id': job.job_id,
|
||||
'item_index': idx,
|
||||
'total_items': len(unique_items),
|
||||
'item_hash': hashlib.md5(
|
||||
f"{keyword}:guid:{item.guid}".encode() if item.guid
|
||||
else f"{keyword}:title:{item.title}:link:{item.link}".encode()
|
||||
).hexdigest() # GUID 또는 title+link 해시
|
||||
},
|
||||
stages_completed=['rss_collection']
|
||||
)
|
||||
|
||||
# 개별 아이템을 다음 단계로 전달
|
||||
await self.queue_manager.enqueue('search_enrichment', item_job)
|
||||
logger.info(f"Enqueued item {idx+1}/{len(unique_items)} for keyword '{keyword}'")
|
||||
|
||||
# 다음 아이템 enqueue 전에 지연 추가 (마지막 아이템 제외)
|
||||
if idx < len(unique_items) - 1:
|
||||
await asyncio.sleep(enqueue_delay)
|
||||
logger.debug(f"Waiting {enqueue_delay}s before next item...")
|
||||
|
||||
# 원본 job 완료 처리
|
||||
await self.queue_manager.mark_completed('rss_collection', job.job_id)
|
||||
logger.info(f"Completed RSS collection for job {job.job_id}: {len(unique_items)} items processed")
|
||||
else:
|
||||
logger.info(f"No new items found for '{keyword}'")
|
||||
logger.info(f"No new items found for '{keyword}' after deduplication")
|
||||
await self.queue_manager.mark_completed('rss_collection', job.job_id)
|
||||
else:
|
||||
logger.warning(f"No RSS items collected for '{keyword}'")
|
||||
await self.queue_manager.mark_failed(
|
||||
'rss_collection',
|
||||
job,
|
||||
'rss_collection',
|
||||
job,
|
||||
"No RSS items collected"
|
||||
)
|
||||
|
||||
@ -126,21 +173,34 @@ class RSSCollectorWorker:
|
||||
# feedparser로 파싱
|
||||
feed = feedparser.parse(content)
|
||||
|
||||
logger.info(f"Found {len(feed.entries)} entries in feed {feed_url}")
|
||||
|
||||
for entry in feed.entries[:self.max_items_per_feed]:
|
||||
# 키워드 관련성 체크
|
||||
title = entry.get('title', '')
|
||||
summary = entry.get('summary', '')
|
||||
|
||||
# 제목이나 요약에 키워드가 포함된 경우만
|
||||
if keyword.lower() in title.lower() or keyword.lower() in summary.lower():
|
||||
# 대소문자 무시하고 키워드 매칭 (영문의 경우)
|
||||
title_lower = title.lower() if keyword.isascii() else title
|
||||
summary_lower = summary.lower() if keyword.isascii() else summary
|
||||
keyword_lower = keyword.lower() if keyword.isascii() else keyword
|
||||
|
||||
# 제목이나 요약에 키워드가 포함된 경우
|
||||
# Google News RSS는 이미 키워드 검색 결과이므로 모든 항목 포함
|
||||
if "news.google.com" in feed_url or keyword_lower in title_lower or keyword_lower in summary_lower:
|
||||
# GUID 추출 (Google RSS에서 일반적으로 사용)
|
||||
guid = entry.get('id', entry.get('guid', ''))
|
||||
|
||||
item = RSSItem(
|
||||
title=title,
|
||||
link=entry.get('link', ''),
|
||||
guid=guid, # GUID 추가
|
||||
published=entry.get('published', ''),
|
||||
summary=summary[:500] if summary else '',
|
||||
source_feed=feed_url
|
||||
)
|
||||
items.append(item)
|
||||
logger.debug(f"Added item: {title[:50]}... (guid: {guid[:30] if guid else 'no-guid'})")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching RSS feed {feed_url}: {e}")
|
||||
@ -148,26 +208,44 @@ class RSSCollectorWorker:
|
||||
return items
|
||||
|
||||
async def _deduplicate_items(self, items: List[RSSItem], keyword: str) -> List[RSSItem]:
|
||||
"""중복 항목 제거"""
|
||||
"""중복 항목 제거 - GUID 또는 링크 기준으로만 중복 체크"""
|
||||
unique_items = []
|
||||
dedup_key = f"dedup:{keyword}"
|
||||
|
||||
seen_guids = set() # 현재 배치에서 본 GUID
|
||||
seen_links = set() # 현재 배치에서 본 링크
|
||||
|
||||
for item in items:
|
||||
# 제목 해시 생성
|
||||
item_hash = hashlib.md5(
|
||||
f"{keyword}:{item.title}".encode()
|
||||
).hexdigest()
|
||||
|
||||
# Redis Set으로 중복 확인
|
||||
is_new = await self.redis_client.sadd(dedup_key, item_hash)
|
||||
|
||||
if is_new:
|
||||
unique_items.append(item)
|
||||
|
||||
# TTL 설정
|
||||
if unique_items:
|
||||
await self.redis_client.expire(dedup_key, self.dedup_ttl)
|
||||
|
||||
# GUID가 있는 경우 GUID로 중복 체크
|
||||
if item.guid:
|
||||
if item.guid in seen_guids:
|
||||
logger.debug(f"Duplicate GUID in batch: {item.guid[:30]}")
|
||||
continue
|
||||
|
||||
# MongoDB에서 이미 처리된 기사인지 확인
|
||||
existing_article = await self.db.articles_ko.find_one({"rss_guid": item.guid})
|
||||
if existing_article:
|
||||
logger.info(f"Article with GUID {item.guid[:30]} already processed, skipping")
|
||||
continue
|
||||
|
||||
seen_guids.add(item.guid)
|
||||
else:
|
||||
# GUID가 없으면 링크로 중복 체크
|
||||
if item.link in seen_links:
|
||||
logger.debug(f"Duplicate link in batch: {item.link[:50]}")
|
||||
continue
|
||||
|
||||
# MongoDB에서 링크로 중복 확인 (references 필드에서 검색)
|
||||
existing_article = await self.db.articles_ko.find_one({"references.link": item.link})
|
||||
if existing_article:
|
||||
logger.info(f"Article with link {item.link[:50]} already processed, skipping")
|
||||
continue
|
||||
|
||||
seen_links.add(item.link)
|
||||
|
||||
unique_items.append(item)
|
||||
logger.debug(f"New item added: {item.title[:50]}...")
|
||||
|
||||
logger.info(f"Deduplication result: {len(unique_items)} new items out of {len(items)} total")
|
||||
|
||||
return unique_items
|
||||
|
||||
async def stop(self):
|
||||
|
||||
Reference in New Issue
Block a user