import hashlib
import re
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import feedparser
import httpx
from bs4 import BeautifulSoup
from dateutil import parser as date_parser

from .models import FeedEntry


class FeedParser:
    def __init__(self):
        self.client = httpx.AsyncClient(
            timeout=30.0,
            follow_redirects=True,
            headers={
                "User-Agent": "Mozilla/5.0 (compatible; RSS Feed Reader/1.0)"
            },
        )

    async def parse_feed(self, url: str) -> Dict[str, Any]:
        """Parse an RSS/Atom feed from a URL."""
        try:
            response = await self.client.get(url)
            response.raise_for_status()

            # Parse the feed
            feed = feedparser.parse(response.content)

            # feedparser sets bozo for any well-formedness issue, but many
            # "bozo" feeds (e.g. encoding mismatches) still parse usably.
            # Treat the error as fatal only when no entries were recovered.
            if feed.bozo and feed.bozo_exception and not feed.entries:
                raise ValueError(f"Feed parsing error: {feed.bozo_exception}")

            return {
                "success": True,
                "feed": feed.feed,
                "entries": feed.entries,
                "error": None,
            }
        except Exception as e:
            return {
                "success": False,
                "feed": None,
                "entries": [],
                "error": str(e),
            }

    def extract_entry_data(self, entry: Any, feed_id: str) -> FeedEntry:
        """Extract and normalize entry data."""
        entry_id = self._generate_entry_id(entry)
        title = entry.get("title", "Untitled")
        link = entry.get("link", "")
        summary = self._extract_summary(entry)
        content = self._extract_content(entry)
        author = entry.get("author", "")
        published = self._parse_date(entry.get("published", entry.get("updated")))
        updated = self._parse_date(entry.get("updated", entry.get("published")))
        categories = self._extract_categories(entry)
        thumbnail = self._extract_thumbnail(entry)
        enclosures = self._extract_enclosures(entry)

        return FeedEntry(
            feed_id=feed_id,
            entry_id=entry_id,
            title=title,
            link=link,
            summary=summary,
            content=content,
            author=author,
            published=published,
            updated=updated,
            categories=categories,
            thumbnail=thumbnail,
            enclosures=enclosures,
        )

    def _generate_entry_id(self, entry: Any) -> str:
        """Generate a unique ID for an entry."""
        # Prefer the feed's own unique ID (guid / atom:id) when present
        # and non-empty.
        if entry.get("id"):
            return entry.id

        # Otherwise derive a stable hash from the link and title.
        unique_str = f"{entry.get('link', '')}{entry.get('title', '')}"
        return hashlib.md5(unique_str.encode()).hexdigest()

    def _extract_summary(self, entry: Any) -> Optional[str]:
        """Extract and clean the summary."""
        summary = entry.get("summary", entry.get("description", ""))
        if summary:
            # Strip HTML tags, keeping only the text
            soup = BeautifulSoup(summary, "html.parser")
            text = soup.get_text(separator=" ", strip=True)

            # Truncate overly long summaries
            if len(text) > 500:
                text = text[:497] + "..."
            return text
        return None

    def _extract_content(self, entry: Any) -> Optional[str]:
        """Extract the full content."""
        content = ""

        # Prefer the content field (Atom <content>, RSS content:encoded)
        if hasattr(entry, "content"):
            for c in entry.content:
                if c.get("type") in ("text/html", "text/plain"):
                    content = c.get("value", "")
                    break

        # Fall back to the summary detail
        if not content and hasattr(entry, "summary_detail"):
            content = entry.summary_detail.get("value", "")

        if content:
            # Collapse runs of whitespace
            return re.sub(r"\s+", " ", content).strip()
        return None

    def _parse_date(self, date_str: Optional[str]) -> Optional[datetime]:
        """Parse a date string (or feedparser time struct) to a datetime."""
        if not date_str:
            return None
        try:
            # Try parsing with dateutil first
            return date_parser.parse(date_str)
        except (ValueError, TypeError, OverflowError):
            # feedparser may hand back a time.struct_time instead of a string
            if hasattr(date_str, "tm_year"):
                try:
                    return datetime.fromtimestamp(time.mktime(date_str))
                except (ValueError, OverflowError):
                    pass
            return None

    def _extract_categories(self, entry: Any) -> List[str]:
        """Extract categories/tags."""
        categories = []
        if hasattr(entry, "tags"):
            for tag in entry.tags:
                if hasattr(tag, "term"):
                    categories.append(tag.term)
                elif isinstance(tag, str):
                    categories.append(tag)
        return categories

    def _extract_thumbnail(self, entry: Any) -> Optional[str]:
        """Extract a thumbnail image URL."""
        # Check media:thumbnail
        if hasattr(entry, "media_thumbnail"):
            for thumb in entry.media_thumbnail:
                if thumb.get("url"):
                    return thumb["url"]

        # Check media:content with an image MIME type
        if hasattr(entry, "media_content"):
            for media in entry.media_content:
                if media.get("type", "").startswith("image/"):
                    return media.get("url")

        # Check image enclosures
        if hasattr(entry, "enclosures"):
            for enc in entry.enclosures:
                if enc.get("type", "").startswith("image/"):
                    return enc.get("href", enc.get("url"))

        # Fall back to the first <img> in the summary or content.
        # (Built stepwise: the original one-liner's conditional expression
        # silently discarded the summary when the entry had no content.)
        content = entry.get("summary", "")
        if hasattr(entry, "content"):
            content += entry.get("content", [{}])[0].get("value", "")
        if content:
            soup = BeautifulSoup(content, "html.parser")
            img = soup.find("img")
            if img and img.get("src"):
                return img["src"]
        return None

    def _extract_enclosures(self, entry: Any) -> List[Dict[str, Any]]:
        """Extract media enclosures."""
        enclosures = []
        if hasattr(entry, "enclosures"):
            for enc in entry.enclosures:
                enclosure = {
                    "url": enc.get("href", enc.get("url", "")),
                    "type": enc.get("type", ""),
                    "length": enc.get("length", 0),
                }
                if enclosure["url"]:
                    enclosures.append(enclosure)
        return enclosures

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()
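

# --- Example usage -----------------------------------------------------------
# A minimal sketch of how this class might be driven, not part of the original
# module. Assumptions: the module lives inside a package (the relative import
# of FeedEntry above prevents standalone execution, so run it via e.g.
# `python -m yourpackage.feed_parser`, where `yourpackage` is hypothetical);
# the feed URL is illustrative only; and FeedEntry exposes title/link/published
# as attributes.
if __name__ == "__main__":
    import asyncio

    async def main() -> None:
        parser = FeedParser()
        try:
            result = await parser.parse_feed("https://example.com/feed.xml")
            if result["success"]:
                for raw in result["entries"]:
                    entry = parser.extract_entry_data(raw, feed_id="demo-feed")
                    print(entry.title, entry.link, entry.published)
            else:
                print("Failed to parse feed:", result["error"])
        finally:
            # Always release the underlying httpx.AsyncClient
            await parser.close()

    asyncio.run(main())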