""" RSS Collector Service RSS 피드 수집 및 중복 제거 서비스 """ import asyncio import logging import os import sys import hashlib from datetime import datetime import feedparser import aiohttp import redis.asyncio as redis from motor.motor_asyncio import AsyncIOMotorClient from typing import List, Dict, Any # Import from shared module from shared.models import PipelineJob, RSSItem, EnrichedItem from shared.queue_manager import QueueManager logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class RSSCollectorWorker: def __init__(self): self.queue_manager = QueueManager( redis_url=os.getenv("REDIS_URL", "redis://redis:6379") ) self.redis_client = None self.redis_url = os.getenv("REDIS_URL", "redis://redis:6379") self.mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongodb:27017") self.db_name = os.getenv("DB_NAME", "ai_writer_db") self.db = None self.dedup_ttl = 86400 * 7 # 7일간 중복 방지 self.max_items_per_feed = 100 # 피드당 최대 항목 수 (Google News는 최대 100개) async def start(self): """워커 시작""" logger.info("Starting RSS Collector Worker") # Redis 연결 await self.queue_manager.connect() self.redis_client = await redis.from_url( self.redis_url, encoding="utf-8", decode_responses=True ) # MongoDB 연결 client = AsyncIOMotorClient(self.mongodb_url) self.db = client[self.db_name] # 메인 처리 루프 while True: try: # 큐에서 작업 가져오기 (5초 대기) job = await self.queue_manager.dequeue('rss_collection', timeout=5) if job: await self.process_job(job) except Exception as e: logger.error(f"Error in worker loop: {e}") await asyncio.sleep(1) async def process_job(self, job: PipelineJob): """RSS 수집 작업 처리""" try: logger.info(f"Processing job {job.job_id} for keyword '{job.keyword}'") keyword = job.keyword # keyword는 job의 직접 속성 rss_feeds = job.data.get('rss_feeds', []) # RSS 피드가 없으면 기본 피드 사용 if not rss_feeds: # 기본 RSS 피드 추가 (Google News RSS) rss_feeds = [ f"https://news.google.com/rss/search?q={keyword}&hl=en-US&gl=US&ceid=US:en", f"https://news.google.com/rss/search?q={keyword}&hl=ko&gl=KR&ceid=KR:ko", "https://feeds.bbci.co.uk/news/technology/rss.xml", "https://rss.nytimes.com/services/xml/rss/nyt/Technology.xml" ] logger.info(f"Using default RSS feeds for keyword: {keyword}") # 키워드가 포함된 RSS URL 생성 processed_feeds = self._prepare_feeds(rss_feeds, keyword) all_items = [] for feed_url in processed_feeds: try: items = await self._fetch_rss_feed(feed_url, keyword) all_items.extend(items) except Exception as e: logger.error(f"Error fetching feed {feed_url}: {e}") if all_items: # 중복 제거 unique_items = await self._deduplicate_items(all_items, keyword) if unique_items: logger.info(f"Collected {len(unique_items)} unique items for '{keyword}'") # 각 RSS 아이템별로 개별 job 생성하여 다음 단계로 전달 # 시간 지연을 추가하여 API 호출 분산 (초기값: 1초, 점진적으로 조정 가능) enqueue_delay = float(os.getenv("RSS_ENQUEUE_DELAY", "1.0")) for idx, item in enumerate(unique_items): # 각 아이템별로 새로운 job 생성 item_job = PipelineJob( keyword_id=f"{job.keyword_id}_{idx}", keyword=job.keyword, stage='search_enrichment', data={ 'rss_item': item.dict(), # 단일 아이템 'original_job_id': job.job_id, 'item_index': idx, 'total_items': len(unique_items), 'item_hash': hashlib.md5( f"{keyword}:guid:{item.guid}".encode() if item.guid else f"{keyword}:title:{item.title}:link:{item.link}".encode() ).hexdigest() # GUID 또는 title+link 해시 }, stages_completed=['rss_collection'] ) # 개별 아이템을 다음 단계로 전달 await self.queue_manager.enqueue('search_enrichment', item_job) logger.info(f"Enqueued item {idx+1}/{len(unique_items)} for keyword '{keyword}'") # 다음 아이템 enqueue 전에 지연 추가 (마지막 아이템 제외) if idx < len(unique_items) - 1: await asyncio.sleep(enqueue_delay) logger.debug(f"Waiting {enqueue_delay}s before next item...") # 원본 job 완료 처리 await self.queue_manager.mark_completed('rss_collection', job.job_id) logger.info(f"Completed RSS collection for job {job.job_id}: {len(unique_items)} items processed") else: logger.info(f"No new items found for '{keyword}' after deduplication") await self.queue_manager.mark_completed('rss_collection', job.job_id) else: logger.warning(f"No RSS items collected for '{keyword}'") await self.queue_manager.mark_failed( 'rss_collection', job, "No RSS items collected" ) except Exception as e: logger.error(f"Error processing job {job.job_id}: {e}") await self.queue_manager.mark_failed('rss_collection', job, str(e)) def _prepare_feeds(self, feeds: List[str], keyword: str) -> List[str]: """RSS 피드 URL 준비 (키워드 치환)""" processed = [] for feed in feeds: if '{keyword}' in feed: processed.append(feed.replace('{keyword}', keyword)) else: processed.append(feed) return processed async def _fetch_rss_feed(self, feed_url: str, keyword: str) -> List[RSSItem]: """RSS 피드 가져오기""" items = [] try: async with aiohttp.ClientSession() as session: async with session.get(feed_url, timeout=30) as response: content = await response.text() # feedparser로 파싱 feed = feedparser.parse(content) logger.info(f"Found {len(feed.entries)} entries in feed {feed_url}") for entry in feed.entries[:self.max_items_per_feed]: # 키워드 관련성 체크 title = entry.get('title', '') summary = entry.get('summary', '') # 대소문자 무시하고 키워드 매칭 (영문의 경우) title_lower = title.lower() if keyword.isascii() else title summary_lower = summary.lower() if keyword.isascii() else summary keyword_lower = keyword.lower() if keyword.isascii() else keyword # 제목이나 요약에 키워드가 포함된 경우 # Google News RSS는 이미 키워드 검색 결과이므로 모든 항목 포함 if "news.google.com" in feed_url or keyword_lower in title_lower or keyword_lower in summary_lower: # GUID 추출 (Google RSS에서 일반적으로 사용) guid = entry.get('id', entry.get('guid', '')) item = RSSItem( title=title, link=entry.get('link', ''), guid=guid, # GUID 추가 published=entry.get('published', ''), summary=summary[:500] if summary else '', source_feed=feed_url ) items.append(item) logger.debug(f"Added item: {title[:50]}... (guid: {guid[:30] if guid else 'no-guid'})") except Exception as e: logger.error(f"Error fetching RSS feed {feed_url}: {e}") return items async def _deduplicate_items(self, items: List[RSSItem], keyword: str) -> List[RSSItem]: """중복 항목 제거 - GUID 또는 링크 기준으로만 중복 체크""" unique_items = [] seen_guids = set() # 현재 배치에서 본 GUID seen_links = set() # 현재 배치에서 본 링크 for item in items: # GUID가 있는 경우 GUID로 중복 체크 if item.guid: if item.guid in seen_guids: logger.debug(f"Duplicate GUID in batch: {item.guid[:30]}") continue # MongoDB에서 이미 처리된 기사인지 확인 existing_article = await self.db.articles_ko.find_one({"rss_guid": item.guid}) if existing_article: logger.info(f"Article with GUID {item.guid[:30]} already processed, skipping") continue seen_guids.add(item.guid) else: # GUID가 없으면 링크로 중복 체크 if item.link in seen_links: logger.debug(f"Duplicate link in batch: {item.link[:50]}") continue # MongoDB에서 링크로 중복 확인 (references 필드에서 검색) existing_article = await self.db.articles_ko.find_one({"references.link": item.link}) if existing_article: logger.info(f"Article with link {item.link[:50]} already processed, skipping") continue seen_links.add(item.link) unique_items.append(item) logger.debug(f"New item added: {item.title[:50]}...") logger.info(f"Deduplication result: {len(unique_items)} new items out of {len(items)} total") return unique_items async def stop(self): """워커 중지""" await self.queue_manager.disconnect() if self.redis_client: await self.redis_client.close() logger.info("RSS Collector Worker stopped") async def main(): """메인 함수""" worker = RSSCollectorWorker() try: await worker.start() except KeyboardInterrupt: logger.info("Received interrupt signal") finally: await worker.stop() if __name__ == "__main__": asyncio.run(main())