Files
site11/services/pipeline/rss-collector/rss_collector.py
jungwoo choi 070032006e feat: Implement async queue-based news pipeline with microservices
Major architectural transformation from synchronous to asynchronous processing:

## Pipeline Services (8 microservices)
- pipeline-scheduler: APScheduler for 30-minute periodic job triggers
- pipeline-rss-collector: RSS feed collection with deduplication (7-day TTL)
- pipeline-google-search: Content enrichment via Google Search API
- pipeline-ai-summarizer: AI summarization using Claude API (claude-sonnet-4-20250514)
- pipeline-translator: Translation using DeepL Pro API
- pipeline-image-generator: Image generation with Replicate API (Stable Diffusion)
- pipeline-article-assembly: Final article assembly and MongoDB storage
- pipeline-monitor: Real-time monitoring dashboard (port 8100)

## Key Features
- Redis-based job queue with deduplication
- Asynchronous processing with Python asyncio
- Shared models and queue manager for inter-service communication
- Docker containerization for all services
- Container names standardized with site11_ prefix

## Removed Services
- Moved to backup: google-search, rss-feed, news-aggregator, ai-writer

## Configuration
- DeepL Pro API: 3abbc796-2515-44a8-972d-22dcf27ab54a
- Claude Model: claude-sonnet-4-20250514
- Redis Queue TTL: 7 days for deduplication

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-13 19:22:14 +09:00

192 lines
6.8 KiB
Python

"""
RSS Collector Service
RSS 피드 수집 및 중복 제거 서비스
"""
import asyncio
import hashlib
import logging
import os
import sys
from datetime import datetime
from typing import List, Dict, Any
from urllib.parse import quote

import aiohttp
import feedparser
import redis.asyncio as redis

# Import from shared module
from shared.models import PipelineJob, RSSItem, EnrichedItem
from shared.queue_manager import QueueManager

# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RSSCollectorWorker:
    """Queue worker that collects RSS entries for a keyword and forwards new ones.

    Jobs are taken from the ``rss_collection`` queue. For each job the feeds
    listed in the job data are fetched, entries mentioning the keyword are
    kept, titles already recorded for that keyword in a Redis set are dropped,
    and the job is then enqueued on ``search_enrichment``.
    """

    def __init__(self):
        """Read connection settings; no network I/O happens until ``start()``."""
        self.redis_url = os.getenv("REDIS_URL", "redis://redis:6379")
        self.queue_manager = QueueManager(redis_url=self.redis_url)
        self.redis_client = None  # created in start()
        self.dedup_ttl = 86400 * 7  # seconds (7 days) applied to the dedup set
        self.max_items_per_feed = 10  # entries inspected per feed

    async def start(self):
        """Connect to Redis and process ``rss_collection`` jobs in an endless loop."""
        logger.info("Starting RSS Collector Worker")

        # Open the queue connection and a separate client used for dedup sets.
        await self.queue_manager.connect()
        self.redis_client = await redis.from_url(
            self.redis_url,
            encoding="utf-8",
            decode_responses=True,
        )

        # Main processing loop.
        while True:
            try:
                # Fetch the next job, passing a 5 second timeout to the queue.
                job = await self.queue_manager.dequeue('rss_collection', timeout=5)
                if job:
                    await self.process_job(job)
            except Exception as e:
                logger.error(f"Error in worker loop: {e}")
                # Brief back-off so a persistent failure does not spin the loop.
                await asyncio.sleep(1)

    async def process_job(self, job: "PipelineJob"):
        """Collect, deduplicate and forward RSS items for one job.

        Reads ``keyword`` and ``rss_feeds`` from ``job.data``. On success the
        unique items are stored under ``job.data['rss_items']`` and the job is
        enqueued on ``search_enrichment``. The job is marked failed when no
        items were collected at all (or on any unexpected error), and marked
        completed without forwarding when every collected item was a duplicate.
        """
        try:
            logger.info(f"Processing job {job.job_id} for keyword '{job.keyword}'")

            keyword = job.data.get('keyword', '')
            rss_feeds = job.data.get('rss_feeds', [])

            # Build the concrete feed URLs for this keyword.
            processed_feeds = self._prepare_feeds(rss_feeds, keyword)

            all_items = []
            for feed_url in processed_feeds:
                try:
                    all_items.extend(await self._fetch_rss_feed(feed_url, keyword))
                except Exception as e:
                    logger.error(f"Error fetching feed {feed_url}: {e}")

            if not all_items:
                logger.warning(f"No RSS items collected for '{keyword}'")
                await self.queue_manager.mark_failed(
                    'rss_collection',
                    job,
                    "No RSS items collected"
                )
                return

            # Drop items whose title was already recorded for this keyword.
            unique_items = await self._deduplicate_items(all_items, keyword)
            if not unique_items:
                logger.info(f"No new items found for '{keyword}'")
                await self.queue_manager.mark_completed('rss_collection', job.job_id)
                return

            logger.info(f"Collected {len(unique_items)} unique items for '{keyword}'")

            # Hand the job over to the next pipeline stage.
            job.data['rss_items'] = [item.dict() for item in unique_items]
            job.stages_completed.append('rss_collection')
            job.stage = 'search_enrichment'
            await self.queue_manager.enqueue('search_enrichment', job)
            await self.queue_manager.mark_completed('rss_collection', job.job_id)
        except Exception as e:
            logger.error(f"Error processing job {job.job_id}: {e}")
            await self.queue_manager.mark_failed('rss_collection', job, str(e))

    def _prepare_feeds(self, feeds: List[str], keyword: str) -> List[str]:
        """Return the feed URLs with every ``{keyword}`` placeholder filled in.

        The keyword is percent-encoded before substitution so that spaces,
        non-ASCII text and reserved characters such as ``&``, ``#`` or ``?``
        cannot change the structure of the resulting URL. Feeds without a
        placeholder are returned unchanged.
        """
        encoded_keyword = quote(keyword, safe="")
        return [feed.replace('{keyword}', encoded_keyword) for feed in feeds]

    async def _fetch_rss_feed(self, feed_url: str, keyword: str) -> List["RSSItem"]:
        """Download one feed and return the entries that mention ``keyword``.

        Only the first ``max_items_per_feed`` entries are inspected. An entry
        is kept when the keyword occurs (case-insensitively) in its title or
        summary; summaries are truncated to 500 characters. Any error,
        including a non-2xx HTTP status, is logged and results in the items
        gathered so far (normally an empty list) being returned.
        """
        items = []
        try:
            keyword_lower = keyword.lower()

            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(feed_url) as response:
                    # Treat HTTP error pages as failures instead of parsing
                    # them as if they were (empty) feeds.
                    response.raise_for_status()
                    content = await response.text()

            # Parse the downloaded document with feedparser.
            feed = feedparser.parse(content)

            for entry in feed.entries[:self.max_items_per_feed]:
                title = entry.get('title', '')
                summary = entry.get('summary', '')

                # Keep only entries whose title or summary mentions the keyword.
                if keyword_lower in title.lower() or keyword_lower in summary.lower():
                    items.append(
                        RSSItem(
                            title=title,
                            link=entry.get('link', ''),
                            published=entry.get('published', ''),
                            summary=summary[:500],
                            source_feed=feed_url,
                        )
                    )
        except Exception as e:
            logger.error(f"Error fetching RSS feed {feed_url}: {e}")

        return items

    async def _deduplicate_items(self, items: List["RSSItem"], keyword: str) -> List["RSSItem"]:
        """Return the items whose title has not been recorded for ``keyword`` yet.

        Each title is hashed together with the keyword and added to the Redis
        set ``dedup:{keyword}``; an item counts as new when the add actually
        inserted a member. Duplicates within ``items`` are therefore removed
        as well.
        """
        unique_items = []
        dedup_key = f"dedup:{keyword}"

        for item in items:
            # MD5 is used only as a compact fingerprint, not for security.
            item_hash = hashlib.md5(
                f"{keyword}:{item.title}".encode()
            ).hexdigest()

            # SADD reports how many members were added: 0 means already seen.
            if await self.redis_client.sadd(dedup_key, item_hash):
                unique_items.append(item)

        # NOTE(review): the TTL is set on the whole set and is refreshed every
        # time new items arrive, so individual hashes do not expire on their
        # own schedule — confirm this is the intended dedup window.
        if unique_items:
            await self.redis_client.expire(dedup_key, self.dedup_ttl)

        return unique_items

    async def stop(self):
        """Disconnect the queue manager and close the dedup Redis client."""
        await self.queue_manager.disconnect()
        if self.redis_client:
            await self.redis_client.close()
        logger.info("RSS Collector Worker stopped")
async def main():
    """Run the RSS collector worker until interrupted, then release its connections."""
    worker = RSSCollectorWorker()
    try:
        await worker.start()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
    except asyncio.CancelledError:
        # asyncio.run() usually delivers Ctrl-C to this coroutine as a task
        # cancellation rather than a KeyboardInterrupt, so log it here too.
        # Re-raise so the runner still sees the cancellation.
        logger.info("Received interrupt signal")
        raise
    finally:
        await worker.stop()


if __name__ == "__main__":
    asyncio.run(main())