# Pasted-source metadata (page chrome from the original paste):
# Python, 222 lines, 7.1 KiB, captured 2025-09-28 20:41:57 +09:00

import hashlib
import re
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import feedparser
import httpx
from bs4 import BeautifulSoup
from dateutil import parser as date_parser

from .models import FeedEntry
class FeedParser:
def __init__(self):
self.client = httpx.AsyncClient(
timeout=30.0,
follow_redirects=True,
headers={
"User-Agent": "Mozilla/5.0 (compatible; RSS Feed Reader/1.0)"
}
)
async def parse_feed(self, url: str) -> Dict[str, Any]:
"""Parse RSS/Atom feed from URL"""
try:
response = await self.client.get(url)
response.raise_for_status()
# Parse the feed
feed = feedparser.parse(response.content)
if feed.bozo and feed.bozo_exception:
raise Exception(f"Feed parsing error: {feed.bozo_exception}")
return {
"success": True,
"feed": feed.feed,
"entries": feed.entries,
"error": None
}
except Exception as e:
return {
"success": False,
"feed": None,
"entries": [],
"error": str(e)
}
def extract_entry_data(self, entry: Any, feed_id: str) -> FeedEntry:
"""Extract and normalize entry data"""
# Generate unique entry ID
entry_id = self._generate_entry_id(entry)
# Extract title
title = entry.get("title", "Untitled")
# Extract link
link = entry.get("link", "")
# Extract summary/description
summary = self._extract_summary(entry)
# Extract content
content = self._extract_content(entry)
# Extract author
author = entry.get("author", "")
# Extract published date
published = self._parse_date(entry.get("published", entry.get("updated")))
# Extract updated date
updated = self._parse_date(entry.get("updated", entry.get("published")))
# Extract categories
categories = self._extract_categories(entry)
# Extract thumbnail
thumbnail = self._extract_thumbnail(entry)
# Extract enclosures (media attachments)
enclosures = self._extract_enclosures(entry)
return FeedEntry(
feed_id=feed_id,
entry_id=entry_id,
title=title,
link=link,
summary=summary,
content=content,
author=author,
published=published,
updated=updated,
categories=categories,
thumbnail=thumbnail,
enclosures=enclosures
)
def _generate_entry_id(self, entry: Any) -> str:
"""Generate unique ID for entry"""
# Try to use entry's unique ID first
if hasattr(entry, "id"):
return entry.id
# Generate from link and title
unique_str = f"{entry.get('link', '')}{entry.get('title', '')}"
return hashlib.md5(unique_str.encode()).hexdigest()
def _extract_summary(self, entry: Any) -> Optional[str]:
"""Extract and clean summary"""
summary = entry.get("summary", entry.get("description", ""))
if summary:
# Clean HTML tags
soup = BeautifulSoup(summary, "html.parser")
text = soup.get_text(separator=" ", strip=True)
# Limit length
if len(text) > 500:
text = text[:497] + "..."
return text
return None
def _extract_content(self, entry: Any) -> Optional[str]:
"""Extract full content"""
content = ""
# Try content field
if hasattr(entry, "content"):
for c in entry.content:
if c.get("type") in ["text/html", "text/plain"]:
content = c.get("value", "")
break
# Fallback to summary detail
if not content and hasattr(entry, "summary_detail"):
content = entry.summary_detail.get("value", "")
# Clean excessive whitespace
if content:
content = re.sub(r'\s+', ' ', content).strip()
return content
return None
def _parse_date(self, date_str: Optional[str]) -> Optional[datetime]:
"""Parse date string to datetime"""
if not date_str:
return None
try:
# Try parsing with dateutil
return date_parser.parse(date_str)
except:
try:
# Try feedparser's time structure
if hasattr(date_str, "tm_year"):
import time
return datetime.fromtimestamp(time.mktime(date_str))
except:
pass
return None
def _extract_categories(self, entry: Any) -> List[str]:
"""Extract categories/tags"""
categories = []
if hasattr(entry, "tags"):
for tag in entry.tags:
if hasattr(tag, "term"):
categories.append(tag.term)
elif isinstance(tag, str):
categories.append(tag)
return categories
def _extract_thumbnail(self, entry: Any) -> Optional[str]:
"""Extract thumbnail image URL"""
# Check media thumbnail
if hasattr(entry, "media_thumbnail"):
for thumb in entry.media_thumbnail:
if thumb.get("url"):
return thumb["url"]
# Check media content
if hasattr(entry, "media_content"):
for media in entry.media_content:
if media.get("type", "").startswith("image/"):
return media.get("url")
# Check enclosures
if hasattr(entry, "enclosures"):
for enc in entry.enclosures:
if enc.get("type", "").startswith("image/"):
return enc.get("href", enc.get("url"))
# Extract from content/summary
content = entry.get("summary", "") + entry.get("content", [{}])[0].get("value", "") if hasattr(entry, "content") else ""
if content:
soup = BeautifulSoup(content, "html.parser")
img = soup.find("img")
if img and img.get("src"):
return img["src"]
return None
def _extract_enclosures(self, entry: Any) -> List[Dict[str, Any]]:
"""Extract media enclosures"""
enclosures = []
if hasattr(entry, "enclosures"):
for enc in entry.enclosures:
enclosure = {
"url": enc.get("href", enc.get("url", "")),
"type": enc.get("type", ""),
"length": enc.get("length", 0)
}
if enclosure["url"]:
enclosures.append(enclosure)
return enclosures
async def close(self):
"""Close HTTP client"""
await self.client.aclose()