Files
site11/services/rss-feed/backend/app/main.py
jungwoo choi 1d90af7c3c Add RSS Feed Subscription Service
- RSS/Atom 피드 구독 및 관리 서비스 구현
- 자동 업데이트 스케줄러 포함 (기본 15분 주기)
- 피드 엔트리 읽음/별표 상태 관리
- 카테고리별 분류 기능
- OPML 내보내기 지원
- MongoDB 데이터 저장, Redis 캐싱
- Docker 컨테이너 구성 (포트 8017)

🤖 Generated with Claude Code (https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-12 13:16:07 +09:00

442 lines
14 KiB
Python

from fastapi import FastAPI, HTTPException, Query, Path, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Optional
from datetime import datetime
from contextlib import asynccontextmanager
import motor.motor_asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
import pytz
import redis.asyncio as redis
import json
from .config import settings
from .models import (
FeedSubscription, FeedEntry, CreateFeedRequest,
UpdateFeedRequest, FeedStatistics, FeedStatus
)
from .feed_parser import FeedParser
# Shared service handles. Each one starts as None and is assigned inside
# lifespan() when the application starts.
# MongoDB client.
db_client = None
# Database handle selected from db_client.
db = None
# Redis client.
redis_client = None
# Scheduler instance; only created when settings.enable_scheduler is truthy.
scheduler = None
# FeedParser instance used to fetch and parse feeds.
parser = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage startup and shutdown of the service's shared resources.

    On startup this connects to MongoDB and Redis, creates the feed parser
    and, when ``settings.enable_scheduler`` is set, starts a scheduler job
    that calls ``update_all_feeds`` every 60 seconds. On shutdown the
    scheduler, parser, Redis client and MongoDB client are released.

    Args:
        app: The FastAPI application this lifespan is attached to.

    Yields:
        None. Control returns to the framework while requests are served.
    """
    global db_client, db, redis_client, scheduler, parser

    # Connect to MongoDB
    db_client = motor.motor_asyncio.AsyncIOMotorClient(settings.mongodb_url)
    db = db_client[settings.db_name]

    # Connect to Redis
    redis_client = redis.from_url(settings.redis_url, db=settings.redis_db)

    # Initialize feed parser
    parser = FeedParser()

    # Initialize scheduler
    if settings.enable_scheduler:
        scheduler = AsyncIOScheduler(timezone=pytz.timezone(settings.scheduler_timezone))
        # The job fires every minute; update_all_feeds() decides per feed
        # whether its own update interval has elapsed.
        scheduler.add_job(
            update_all_feeds,
            trigger=IntervalTrigger(seconds=60),
            id="update_feeds",
            replace_existing=True,
        )
        scheduler.start()
        print("RSS Feed scheduler started")

    print("RSS Feed Service starting...")

    try:
        yield
    finally:
        # Cleanup runs in ``finally`` so that an exception or cancellation
        # delivered at the ``yield`` does not leak connections or leave the
        # scheduler running.
        if scheduler:
            scheduler.shutdown()
        if parser:
            await parser.close()
        if redis_client:
            await redis_client.close()
        db_client.close()
        print("RSS Feed Service stopping...")
# Application instance; startup and shutdown are delegated to lifespan().
app = FastAPI(
    lifespan=lifespan,
    version="1.0.0",
    title="RSS Feed Service",
    description="RSS/Atom 피드 구독 및 관리 서비스",
)

# CORS configuration: every origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
# Helper functions
async def update_feed(feed_id: str):
    """Fetch a single feed and synchronise its entries into the database.

    Does nothing when no feed document matches ``feed_id``. When parsing
    fails, the feed is flagged with ``FeedStatus.ERROR`` and its
    ``error_count`` is incremented. When parsing succeeds, the feed's fetch
    metadata is reset and up to ``settings.max_entries_per_feed`` entries are
    inserted, or updated when the parsed entry carries a newer ``updated``
    value than the stored one.

    Args:
        feed_id: Value matched against the ``_id`` field of the feed document.
    """
    # NOTE(review): feed_id is compared to ``_id`` as a plain string. If feed
    # documents are stored with ObjectId ``_id`` values this lookup will never
    # match — confirm against the model and insert path.
    feed_doc = await db.feeds.find_one({"_id": feed_id})
    if not feed_doc:
        return

    parsed = await parser.parse_feed(feed_doc["url"])

    if not parsed["success"]:
        # Record the failure and stop; entries are left untouched.
        await db.feeds.update_one(
            {"_id": feed_id},
            {
                "$set": {
                    "status": FeedStatus.ERROR,
                    "last_error": parsed["error"],
                    "updated_at": datetime.now(),
                },
                "$inc": {"error_count": 1},
            },
        )
        return

    # Successful fetch: reset the error bookkeeping on the feed document.
    success_fields = {
        "last_fetch": datetime.now(),
        "status": FeedStatus.ACTIVE,
        "error_count": 0,
        "last_error": None,
        "updated_at": datetime.now(),
    }
    await db.feeds.update_one({"_id": feed_id}, {"$set": success_fields})

    # Only the first max_entries_per_feed parsed entries are considered.
    for raw_entry in parsed["entries"][:settings.max_entries_per_feed]:
        entry = parser.extract_entry_data(raw_entry, feed_id)

        stored = await db.entries.find_one(
            {"feed_id": feed_id, "entry_id": entry.entry_id}
        )
        if not stored:
            await db.entries.insert_one(entry.dict())
            continue

        # Overwrite the stored entry only when both sides have an ``updated``
        # value and the parsed one is strictly newer.
        is_newer = (
            entry.updated
            and stored.get("updated")
            and entry.updated > stored["updated"]
        )
        if is_newer:
            await db.entries.update_one(
                {"_id": stored["_id"]},
                {"$set": entry.dict(exclude={"id", "created_at"})},
            )
async def update_all_feeds():
    """Refresh every active feed whose update interval has elapsed.

    Loads all feeds with ``FeedStatus.ACTIVE`` that have never been fetched
    or were last fetched before now. A feed is skipped while fewer than
    ``update_interval`` seconds (falling back to
    ``settings.default_update_interval``) have passed since its
    ``last_fetch``. Due feeds are refreshed one after another by awaiting
    ``update_feed``.

    A failure while refreshing one feed is reported and does not prevent the
    remaining feeds from being processed.
    """
    now = datetime.now()

    # Load every candidate instead of only the first 100. The date filter
    # matches practically all active feeds, so a fixed cap without ordering
    # would keep returning the same documents and never reach the rest.
    feeds = await db.feeds.find({
        "status": FeedStatus.ACTIVE,
        "$or": [
            {"last_fetch": None},
            {"last_fetch": {"$lt": now}},
        ],
    }).to_list(length=None)

    for feed in feeds:
        # Check whether this feed is due yet.
        last_fetch = feed.get("last_fetch")
        if last_fetch:
            elapsed = (now - last_fetch).total_seconds()
            if elapsed < feed.get("update_interval", settings.default_update_interval):
                continue

        # Feeds are refreshed sequentially. Isolate failures so a single bad
        # feed cannot abort the whole pass.
        try:
            await update_feed(str(feed["_id"]))
        except Exception as exc:
            print(f"Failed to update feed {feed['_id']}: {exc}")
# API Endpoints
@app.get("/")
async def root():
    """Return service identification and a summary of the main endpoints."""
    endpoint_index = {
        "subscribe": "POST /api/feeds",
        "list_feeds": "GET /api/feeds",
        "get_entries": "GET /api/entries",
        "mark_read": "PUT /api/entries/{entry_id}/read",
        "mark_starred": "PUT /api/entries/{entry_id}/star",
        "statistics": "GET /api/stats",
    }
    payload = {
        "service": "RSS Feed Service",
        "version": "1.0.0",
        "timestamp": datetime.now().isoformat(),
        "endpoints": endpoint_index,
    }
    return payload
@app.get("/health")
async def health_check():
    """Report a static "healthy" status together with the current timestamp."""
    current_time = datetime.now().isoformat()
    return dict(status="healthy", service="rss-feed", timestamp=current_time)
@app.post("/api/feeds", response_model=FeedSubscription)
async def subscribe_to_feed(request: CreateFeedRequest, background_tasks: BackgroundTasks):
    """Subscribe to an RSS/Atom feed.

    Rejects URLs that are already subscribed, parses the feed once to obtain
    its metadata, stores the new subscription and queues an initial entry
    fetch as a background task.

    Args:
        request: Subscription details (URL, optional title, category and
            update interval).
        background_tasks: Task queue used to schedule the first entry fetch.

    Returns:
        The stored subscription, with ``id`` set to the inserted document id.

    Raises:
        HTTPException: 400 when the URL is already subscribed or the feed
            cannot be parsed.
    """
    feed_url = str(request.url)

    # Refuse duplicates of an already subscribed URL.
    if await db.feeds.find_one({"url": feed_url}):
        raise HTTPException(status_code=400, detail="이미 구독 중인 피드입니다")

    # Parse the feed up front so its metadata can seed the subscription.
    parsed = await parser.parse_feed(feed_url)
    if not parsed["success"]:
        raise HTTPException(status_code=400, detail=f"피드 파싱 실패: {parsed['error']}")

    feed_meta = parsed["feed"]
    subscription = FeedSubscription(
        title=request.title or feed_meta.get("title", "Untitled Feed"),
        url=request.url,
        description=feed_meta.get("description", ""),
        category=request.category,
        update_interval=request.update_interval or settings.default_update_interval,
    )

    # The URL is stored as a plain string in the database document.
    document = subscription.dict()
    document["url"] = str(document["url"])
    inserted = await db.feeds.insert_one(document)
    subscription.id = str(inserted.inserted_id)

    # Entries are fetched after the response has been sent.
    background_tasks.add_task(update_feed, subscription.id)
    return subscription
@app.get("/api/feeds", response_model=List[FeedSubscription])
async def list_feeds(
    category: Optional[str] = Query(None, description="카테고리 필터"),
    status: Optional[FeedStatus] = Query(None, description="상태 필터")
):
    """List subscribed feeds, optionally filtered by category and status.

    Args:
        category: When given, only feeds in this category are returned.
        status: When given, only feeds with this status are returned.

    Returns:
        Up to 100 feed documents with ``_id`` converted to a string.
    """
    # Only filters that were actually supplied become query conditions.
    candidates = (("category", category), ("status", status))
    criteria = {field: value for field, value in candidates if value}

    documents = await db.feeds.find(criteria).to_list(100)
    for document in documents:
        document["_id"] = str(document["_id"])
    return documents
@app.get("/api/feeds/{feed_id}", response_model=FeedSubscription)
async def get_feed(feed_id: str = Path(..., description="피드 ID")):
    """Return a single feed by id.

    Args:
        feed_id: Value matched against the feed document's ``_id``.

    Returns:
        The feed document with ``_id`` converted to a string.

    Raises:
        HTTPException: 404 when no feed matches.
    """
    document = await db.feeds.find_one({"_id": feed_id})
    if document:
        document["_id"] = str(document["_id"])
        return document
    raise HTTPException(status_code=404, detail="피드를 찾을 수 없습니다")
@app.put("/api/feeds/{feed_id}", response_model=FeedSubscription)
async def update_feed_subscription(
    feed_id: str = Path(..., description="피드 ID"),
    request: UpdateFeedRequest = ...
):
    """Update the stored settings of a feed subscription.

    Only fields explicitly present in the request body are written, and
    ``updated_at`` is refreshed whenever at least one field changes.

    Args:
        feed_id: Value matched against the feed document's ``_id``.
        request: Partial set of subscription fields to overwrite.

    Returns:
        The feed document after the update, with ``_id`` as a string.

    Raises:
        HTTPException: 404 when no feed matches ``feed_id``.
    """
    changes = request.dict(exclude_unset=True)
    if changes:
        changes["updated_at"] = datetime.now()
        outcome = await db.feeds.update_one(
            {"_id": feed_id},
            {"$set": changes}
        )
        if outcome.matched_count == 0:
            raise HTTPException(status_code=404, detail="피드를 찾을 수 없습니다")

    feed = await db.feeds.find_one({"_id": feed_id})
    # With an empty body the update above is skipped, so existence has not
    # been verified yet. The feed may also have been removed in between.
    # Answer 404 instead of failing on a None document.
    if feed is None:
        raise HTTPException(status_code=404, detail="피드를 찾을 수 없습니다")
    feed["_id"] = str(feed["_id"])
    return feed
@app.delete("/api/feeds/{feed_id}")
async def unsubscribe_from_feed(feed_id: str = Path(..., description="피드 ID")):
    """Remove a feed subscription together with its stored entries.

    Args:
        feed_id: Value matched against the feed document's ``_id``.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 404 when no feed matches.
    """
    removal = await db.feeds.delete_one({"_id": feed_id})
    if not removal.deleted_count:
        raise HTTPException(status_code=404, detail="피드를 찾을 수 없습니다")

    # Entries reference their feed through ``feed_id``; drop them as well.
    await db.entries.delete_many({"feed_id": feed_id})

    return {"message": "구독이 취소되었습니다"}
@app.post("/api/feeds/{feed_id}/refresh")
async def refresh_feed(
    feed_id: str = Path(..., description="피드 ID"),
    background_tasks: BackgroundTasks = ...
):
    """Queue a manual refresh of one feed.

    Args:
        feed_id: Value matched against the feed document's ``_id``.
        background_tasks: Task queue that will run ``update_feed``.

    Returns:
        A message confirming the refresh has been scheduled.

    Raises:
        HTTPException: 404 when no feed matches.
    """
    found = await db.feeds.find_one({"_id": feed_id})
    if found:
        # The refresh itself runs after the response is returned.
        background_tasks.add_task(update_feed, feed_id)
        return {"message": "피드 새로고침이 시작되었습니다"}
    raise HTTPException(status_code=404, detail="피드를 찾을 수 없습니다")
@app.get("/api/entries", response_model=List[FeedEntry])
async def get_entries(
    feed_id: Optional[str] = Query(None, description="피드 ID"),
    is_read: Optional[bool] = Query(None, description="읽음 상태 필터"),
    is_starred: Optional[bool] = Query(None, description="별표 상태 필터"),
    limit: int = Query(50, ge=1, le=100, description="결과 개수"),
    offset: int = Query(0, ge=0, description="오프셋")
):
    """List feed entries, sorted by ``published`` descending.

    Args:
        feed_id: Restrict results to one feed when given.
        is_read: Restrict results by read flag when not None.
        is_starred: Restrict results by starred flag when not None.
        limit: Maximum number of entries to return (1-100).
        offset: Number of matching entries to skip.

    Returns:
        The matching entry documents with ``_id`` converted to a string.
    """
    filters = {}
    if feed_id:
        filters["feed_id"] = feed_id
    # The boolean filters must distinguish False from "not supplied".
    flag_filters = (("is_read", is_read), ("is_starred", is_starred))
    filters.update({name: flag for name, flag in flag_filters if flag is not None})

    cursor = db.entries.find(filters)
    cursor = cursor.sort("published", -1).skip(offset).limit(limit)
    documents = await cursor.to_list(limit)

    for document in documents:
        document["_id"] = str(document["_id"])
    return documents
@app.get("/api/entries/{entry_id}", response_model=FeedEntry)
async def get_entry(entry_id: str = Path(..., description="엔트리 ID")):
    """Return a single entry by id.

    Args:
        entry_id: Value matched against the entry document's ``_id``.

    Returns:
        The entry document with ``_id`` converted to a string.

    Raises:
        HTTPException: 404 when no entry matches.
    """
    document = await db.entries.find_one({"_id": entry_id})
    if document:
        document["_id"] = str(document["_id"])
        return document
    raise HTTPException(status_code=404, detail="엔트리를 찾을 수 없습니다")
@app.put("/api/entries/{entry_id}/read")
async def mark_entry_as_read(
    entry_id: str = Path(..., description="엔트리 ID"),
    is_read: bool = Query(True, description="읽음 상태")
):
    """Set the read flag of an entry.

    Args:
        entry_id: Value matched against the entry document's ``_id``.
        is_read: New value of the read flag.

    Returns:
        A message echoing the new read state.

    Raises:
        HTTPException: 404 when no entry matches.
    """
    selector = {"_id": entry_id}
    change = {"$set": {"is_read": is_read}}
    outcome = await db.entries.update_one(selector, change)

    if not outcome.matched_count:
        raise HTTPException(status_code=404, detail="엔트리를 찾을 수 없습니다")

    return {"message": f"읽음 상태가 {is_read}로 변경되었습니다"}
@app.put("/api/entries/{entry_id}/star")
async def mark_entry_as_starred(
    entry_id: str = Path(..., description="엔트리 ID"),
    is_starred: bool = Query(True, description="별표 상태")
):
    """Set the starred flag of an entry.

    Args:
        entry_id: Value matched against the entry document's ``_id``.
        is_starred: New value of the starred flag.

    Returns:
        A message echoing the new starred state.

    Raises:
        HTTPException: 404 when no entry matches.
    """
    selector = {"_id": entry_id}
    change = {"$set": {"is_starred": is_starred}}
    outcome = await db.entries.update_one(selector, change)

    if not outcome.matched_count:
        raise HTTPException(status_code=404, detail="엔트리를 찾을 수 없습니다")

    return {"message": f"별표 상태가 {is_starred}로 변경되었습니다"}
@app.post("/api/entries/mark-all-read")
async def mark_all_as_read(feed_id: Optional[str] = Query(None, description="피드 ID")):
    """Mark every entry as read, optionally limited to one feed.

    Args:
        feed_id: When given, only entries of this feed are updated;
            otherwise all entries are.

    Returns:
        A message reporting how many entries were modified.
    """
    # An empty selector matches every entry.
    selector = {"feed_id": feed_id} if feed_id else {}
    outcome = await db.entries.update_many(selector, {"$set": {"is_read": True}})
    return {"message": f"{outcome.modified_count}개 엔트리가 읽음으로 표시되었습니다"}
@app.get("/api/stats", response_model=List[FeedStatistics])
async def get_statistics(feed_id: Optional[str] = Query(None, description="피드 ID")):
    """Return per-feed entry counts and an error-rate estimate.

    Args:
        feed_id: When given, statistics are computed for that feed only;
            otherwise for up to 100 feeds.

    Returns:
        One ``FeedStatistics`` item per feed.

    Raises:
        HTTPException: 404 when ``feed_id`` is given but no feed matches.
    """
    if not feed_id:
        feed_docs = await db.feeds.find().to_list(100)
    else:
        single = await db.feeds.find_one({"_id": feed_id})
        if not single:
            raise HTTPException(status_code=404, detail="피드를 찾을 수 없습니다")
        feed_docs = [single]

    collected = []
    for feed_doc in feed_docs:
        current_id = str(feed_doc["_id"])

        # Entry counters for this feed.
        total_count = await db.entries.count_documents({"feed_id": current_id})
        unread_count = await db.entries.count_documents(
            {"feed_id": current_id, "is_read": False}
        )
        starred_count = await db.entries.count_documents(
            {"feed_id": current_id, "is_starred": True}
        )

        # Error rate: recorded errors over errors plus one more fetch when
        # ``last_fetch`` is set. Stays 0 while no errors are recorded.
        errors = feed_doc.get("error_count", 0)
        rate = 0
        if errors > 0:
            fetch_total = errors + (1 if feed_doc.get("last_fetch") else 0)
            rate = errors / fetch_total

        collected.append(
            FeedStatistics(
                feed_id=current_id,
                total_entries=total_count,
                unread_entries=unread_count,
                starred_entries=starred_count,
                last_update=feed_doc.get("last_fetch"),
                error_rate=rate,
            )
        )
    return collected
@app.get("/api/export/opml")
async def export_opml():
    """Export the feed list as an OPML 2.0 document.

    Each feed becomes one ``<outline>`` element carrying its title, URL and
    category as attributes. Attribute values are XML-escaped so that titles
    or URLs containing ``&``, ``<`` or quotes still yield well-formed XML.

    Returns:
        A dict with the OPML text under ``"opml"`` and the number of exported
        feeds under ``"feed_count"``.
    """
    # Imported locally so this block does not depend on the module's import list.
    from xml.sax.saxutils import quoteattr

    feeds = await db.feeds.find().to_list(100)

    header = """<?xml version="1.0" encoding="UTF-8"?>
<opml version="2.0">
<head>
<title>RSS Feed Subscriptions</title>
<dateCreated>{}</dateCreated>
</head>
<body>""".format(datetime.now().isoformat())

    # quoteattr() escapes the value and supplies the surrounding quotes.
    # A missing or None category is exported as an empty string rather than
    # the literal text "None".
    outlines = [
        "\n <outline text={} xmlUrl={} type=\"rss\" category={} />".format(
            quoteattr(str(feed["title"])),
            quoteattr(str(feed["url"])),
            quoteattr(str(feed.get("category") or "")),
        )
        for feed in feeds
    ]

    opml = header + "".join(outlines) + "\n</body>\n</opml>"
    return {
        "opml": opml,
        "feed_count": len(feeds)
    }