"""
Site-wide inspection orchestration service.

Manages the full site inspection lifecycle:
1. BFS crawling to discover same-domain pages
2. Sequential/parallel inspection of each discovered page
3. Aggregate score computation
4. Progress tracking via Redis Pub/Sub (SSE events)
5. Result storage in MongoDB (site_inspections collection)
"""

import asyncio
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlparse

from motor.motor_asyncio import AsyncIOMotorDatabase
from redis.asyncio import Redis

from app.core.config import get_settings
from app.core.redis import get_redis
from app.models.schemas import calculate_grade
from app.services.link_crawler import LinkCrawler
from app.services.inspection_service import InspectionService

logger = logging.getLogger(__name__)

# Redis key TTLs
SITE_RESULT_CACHE_TTL = 3600  # 1 hour

# Strong references to in-flight background tasks. The event loop only keeps
# weak references to tasks, so a fire-and-forget task may be garbage-collected
# before it finishes unless something else holds on to it.
_background_tasks: set[asyncio.Task] = set()


class SiteInspectionService:
    """Site-wide inspection orchestration service."""

    def __init__(self, db: AsyncIOMotorDatabase, redis: Redis):
        self.db = db
        self.redis = redis
        self.inspection_service = InspectionService(db=db, redis=redis)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def start_site_inspection(
        self,
        url: str,
        max_pages: int = 20,
        max_depth: int = 2,
    ) -> str:
        """
        Start a site-wide inspection.

        1. Clamp max_pages / max_depth to the server-side limits
        2. Generate site_inspection_id
        3. Create initial MongoDB document with status "crawling"
        4. Launch background crawl-and-inspect task
        5. Return site_inspection_id

        The URL is not validated here; only its network location is
        extracted (lower-cased) and stored as the document's "domain".

        Args:
            url: Root URL to crawl from.
            max_pages: Page limit; 0 means unlimited and is not clamped,
                any positive value is capped at settings.SITE_MAX_PAGES.
            max_depth: Crawl depth; always capped at settings.SITE_MAX_DEPTH.

        Returns:
            The newly generated site_inspection_id (UUID4 string).
        """
        settings = get_settings()

        # Clamp to server-side limits (0 = unlimited, don't clamp)
        if max_pages > 0:
            max_pages = min(max_pages, settings.SITE_MAX_PAGES)
        max_depth = min(max_depth, settings.SITE_MAX_DEPTH)

        site_inspection_id = str(uuid.uuid4())
        parsed = urlparse(url)
        domain = parsed.netloc.lower()

        # Create initial document
        doc = {
            "site_inspection_id": site_inspection_id,
            "root_url": url,
            "domain": domain,
            "status": "crawling",
            "created_at": datetime.now(timezone.utc),
            "completed_at": None,
            "config": {
                "max_pages": max_pages,
                "max_depth": max_depth,
            },
            "discovered_pages": [],
            "aggregate_scores": None,
        }
        await self.db.site_inspections.insert_one(doc)

        logger.info(
            "Site inspection started: id=%s, url=%s, max_pages=%d, max_depth=%d",
            site_inspection_id, url, max_pages, max_depth,
        )

        # Launch background task. Keep a strong reference until it finishes so
        # the task cannot be garbage-collected mid-execution.
        task = asyncio.create_task(
            self._crawl_and_inspect(site_inspection_id, url, max_pages, max_depth)
        )
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)

        return site_inspection_id

    async def get_site_inspection(self, site_inspection_id: str) -> Optional[dict]:
        """
        Get site inspection result by ID (cache-first).

        Returns the JSON-decoded cached document when present in Redis,
        otherwise the MongoDB document (without "_id"), or None if no
        document matches. Only documents whose status is "completed" or
        "error" are written to the cache.
        """
        # Try Redis cache first
        cache_key = f"site-inspection:result:{site_inspection_id}"
        cached = await self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Fetch from MongoDB
        doc = await self.db.site_inspections.find_one(
            {"site_inspection_id": site_inspection_id},
            {"_id": 0},
        )
        if not doc:
            return None

        # Only cache finished results; in-progress documents keep changing
        if doc.get("status") in ("completed", "error"):
            await self.redis.set(
                cache_key,
                json.dumps(doc, ensure_ascii=False, default=str),
                ex=SITE_RESULT_CACHE_TTL,
            )
        return doc

    async def get_site_inspection_list(
        self,
        page: int = 1,
        limit: int = 20,
    ) -> dict:
        """
        Get paginated list of site inspections, newest first.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            limit: Page size; clamped to the range 1..100.

        Returns:
            Dict with "items", "total", "page", "limit" and "total_pages"
            ("page" and "limit" are the effective, clamped values).
        """
        # Guard against non-positive values: a zero limit would divide by zero
        # below and a negative skip/limit is rejected by MongoDB.
        page = max(page, 1)
        limit = max(1, min(limit, 100))
        skip = (page - 1) * limit

        total = await self.db.site_inspections.count_documents({})

        cursor = self.db.site_inspections.find(
            {},
            {
                "_id": 0,
                "site_inspection_id": 1,
                "root_url": 1,
                "domain": 1,
                "status": 1,
                "created_at": 1,
                "discovered_pages": 1,
                "aggregate_scores": 1,
            },
        ).sort("created_at", -1).skip(skip).limit(limit)

        items = []
        async for doc in cursor:
            pages = doc.get("discovered_pages", [])
            pages_total = len(pages)
            pages_inspected = sum(
                1 for p in pages if p.get("status") == "completed"
            )
            agg = doc.get("aggregate_scores")

            items.append({
                "site_inspection_id": doc.get("site_inspection_id"),
                "root_url": doc.get("root_url"),
                "domain": doc.get("domain"),
                "status": doc.get("status"),
                "created_at": doc.get("created_at"),
                "pages_total": pages_total,
                "pages_inspected": pages_inspected,
                "overall_score": agg.get("overall_score") if agg else None,
                "grade": agg.get("grade") if agg else None,
            })

        # Ceiling division; always report at least one page
        total_pages = max(1, -(-total // limit))

        return {
            "items": items,
            "total": total,
            "page": page,
            "limit": limit,
            "total_pages": total_pages,
        }

    async def inspect_single_page(
        self,
        site_inspection_id: str,
        page_url: str,
    ) -> Optional[str]:
        """
        Trigger inspection for a single page within a site inspection.

        Returns the inspection_id if successful. Returns None if the site
        inspection is not found, the page is not among its discovered
        pages, or the inspection fails (in which case the page is marked
        "error").
        """
        doc = await self.db.site_inspections.find_one(
            {"site_inspection_id": site_inspection_id},
        )
        if not doc:
            return None

        # The page must be one of the discovered pages
        page_found = any(
            page["url"] == page_url
            for page in doc.get("discovered_pages", [])
        )
        if not page_found:
            return None

        cache_key = f"site-inspection:result:{site_inspection_id}"

        # Run inspection inline
        inspection_id = str(uuid.uuid4())
        try:
            inspection_id, result = await self.inspection_service.run_inspection_inline(
                url=page_url,
                inspection_id=inspection_id,
            )

            # Update page status in site inspection
            overall_score = result.get("overall_score", 0)
            grade = result.get("grade", "F")

            await self.db.site_inspections.update_one(
                {
                    "site_inspection_id": site_inspection_id,
                    "discovered_pages.url": page_url,
                },
                {
                    "$set": {
                        "discovered_pages.$.inspection_id": inspection_id,
                        "discovered_pages.$.status": "completed",
                        "discovered_pages.$.overall_score": overall_score,
                        "discovered_pages.$.grade": grade,
                    }
                },
            )

            # Recompute aggregates
            await self._compute_and_store_aggregates(site_inspection_id)

            # Invalidate cache
            await self.redis.delete(cache_key)

            return inspection_id

        except Exception as e:
            logger.error(
                "Failed to inspect page %s in site %s: %s",
                page_url, site_inspection_id, str(e),
            )
            await self.db.site_inspections.update_one(
                {
                    "site_inspection_id": site_inspection_id,
                    "discovered_pages.url": page_url,
                },
                {
                    "$set": {
                        "discovered_pages.$.status": "error",
                    }
                },
            )
            # A cached copy of the site result would still show the page's
            # previous state; drop it so readers see the "error" status.
            await self.redis.delete(cache_key)
            return None

    # ------------------------------------------------------------------
    # Background task: Crawl + Inspect
    # ------------------------------------------------------------------

    async def _crawl_and_inspect(
        self,
        site_inspection_id: str,
        url: str,
        max_pages: int,
        max_depth: int,
    ) -> None:
        """
        Background task that runs in two phases:

        Phase 1: BFS crawling to discover pages
        Phase 2: Parallel inspection of discovered pages (with semaphore)

        On success the document is marked "completed" and a "complete"
        event is published; on any failure it is marked "error" and an
        "error" event is published. Exceptions are never propagated to
        the caller because nothing awaits this task.
        """
        try:
            # ==============================
            # Phase 1: Crawling
            # ==============================
            logger.info("Phase 1 (crawling) started: %s", site_inspection_id)

            async def crawl_progress(pages_found: int, current_url: str) -> None:
                await self._publish_site_event(site_inspection_id, {
                    "event_type": "crawl_progress",
                    "site_inspection_id": site_inspection_id,
                    "pages_found": pages_found,
                    "current_url": current_url,
                })

            crawler = LinkCrawler(
                root_url=url,
                max_pages=max_pages,
                max_depth=max_depth,
            )
            discovered = await crawler.crawl(progress_callback=crawl_progress)

            if not discovered:
                raise ValueError("크롤링 결과가 없습니다. URL을 확인해주세요.")

            # Build discovered_pages documents
            discovered_pages = [
                {
                    "url": page["url"],
                    "depth": page["depth"],
                    "parent_url": page["parent_url"],
                    "inspection_id": None,
                    "status": "pending",
                    "title": page.get("title"),
                    "overall_score": None,
                    "grade": None,
                }
                for page in discovered
            ]

            # Store discovered pages in MongoDB
            await self.db.site_inspections.update_one(
                {"site_inspection_id": site_inspection_id},
                {
                    "$set": {
                        "status": "inspecting",
                        "discovered_pages": discovered_pages,
                    }
                },
            )

            # Publish crawl_complete event
            await self._publish_site_event(site_inspection_id, {
                "event_type": "crawl_complete",
                "site_inspection_id": site_inspection_id,
                "total_pages": len(discovered_pages),
                "pages": [
                    {
                        "url": p["url"],
                        "depth": p["depth"],
                        "parent_url": p["parent_url"],
                        "title": p.get("title"),
                    }
                    for p in discovered_pages
                ],
            })

            logger.info(
                "Phase 1 completed: %s, pages=%d",
                site_inspection_id, len(discovered_pages),
            )

            # ==============================
            # Phase 2: Page-by-page inspection
            # ==============================
            logger.info("Phase 2 (inspection) started: %s", site_inspection_id)

            settings = get_settings()
            semaphore = asyncio.Semaphore(settings.SITE_CONCURRENCY)

            tasks = [
                self._inspect_page_with_semaphore(
                    semaphore=semaphore,
                    site_inspection_id=site_inspection_id,
                    page_url=page["url"],
                    page_index=idx,
                    total_pages=len(discovered_pages),
                )
                for idx, page in enumerate(discovered_pages)
            ]

            outcomes = await asyncio.gather(*tasks, return_exceptions=True)

            # Per-page failures are non-fatal, but anything that escaped the
            # page-level handler must at least be visible in the logs.
            for page, outcome in zip(discovered_pages, outcomes):
                if isinstance(outcome, BaseException):
                    logger.error(
                        "Unhandled page inspection error: site=%s, page=%s, error=%r",
                        site_inspection_id, page["url"], outcome,
                    )

            # ==============================
            # Finalize: Compute aggregates
            # ==============================
            aggregate_scores = await self._compute_and_store_aggregates(site_inspection_id)

            # Mark as completed
            await self.db.site_inspections.update_one(
                {"site_inspection_id": site_inspection_id},
                {
                    "$set": {
                        "status": "completed",
                        "completed_at": datetime.now(timezone.utc),
                    }
                },
            )

            # Publish complete event
            await self._publish_site_event(site_inspection_id, {
                "event_type": "complete",
                "site_inspection_id": site_inspection_id,
                "status": "completed",
                "aggregate_scores": aggregate_scores,
            })

            logger.info("Site inspection completed: %s", site_inspection_id)

        except Exception as e:
            logger.error(
                "Site inspection %s failed: %s",
                site_inspection_id, str(e),
                exc_info=True,
            )

            # Reporting the failure uses the same MongoDB/Redis connections
            # that may have caused it; guard it so this un-awaited task does
            # not end with an unretrieved exception.
            try:
                await self.db.site_inspections.update_one(
                    {"site_inspection_id": site_inspection_id},
                    {
                        "$set": {
                            "status": "error",
                            "completed_at": datetime.now(timezone.utc),
                        }
                    },
                )

                await self._publish_site_event(site_inspection_id, {
                    "event_type": "error",
                    "site_inspection_id": site_inspection_id,
                    "status": "error",
                    "message": f"사이트 검사 중 오류가 발생했습니다: {str(e)[:200]}",
                })
            except Exception:
                logger.exception(
                    "Failed to record error state for site inspection %s",
                    site_inspection_id,
                )

    async def _inspect_page_with_semaphore(
        self,
        semaphore: asyncio.Semaphore,
        site_inspection_id: str,
        page_url: str,
        page_index: int,
        total_pages: int,
    ) -> None:
        """Inspect a single page with semaphore-controlled concurrency."""
        async with semaphore:
            await self._inspect_single_page(
                site_inspection_id=site_inspection_id,
                page_url=page_url,
                page_index=page_index,
                total_pages=total_pages,
            )

    async def _inspect_single_page(
        self,
        site_inspection_id: str,
        page_url: str,
        page_index: int,
        total_pages: int,
    ) -> None:
        """
        Run inspection for a single discovered page.

        Publishes "page_start", "page_progress", "page_complete" and
        "aggregate_update" events and keeps the page's entry in
        discovered_pages up to date. A failing inspection marks the page
        "error" and publishes a "page_complete" event carrying an "error"
        field instead of raising.
        """
        inspection_id = str(uuid.uuid4())

        # Publish page_start event
        await self._publish_site_event(site_inspection_id, {
            "event_type": "page_start",
            "site_inspection_id": site_inspection_id,
            "page_url": page_url,
            "page_index": page_index,
        })

        # Mark page as inspecting in MongoDB
        await self.db.site_inspections.update_one(
            {
                "site_inspection_id": site_inspection_id,
                "discovered_pages.url": page_url,
            },
            {
                "$set": {
                    "discovered_pages.$.status": "inspecting",
                    "discovered_pages.$.inspection_id": inspection_id,
                }
            },
        )

        try:
            # Progress callback for per-page SSE updates
            async def page_progress_callback(
                category: str, progress: int, current_step: str
            ) -> None:
                await self._publish_site_event(site_inspection_id, {
                    "event_type": "page_progress",
                    "site_inspection_id": site_inspection_id,
                    "page_url": page_url,
                    "page_index": page_index,
                    "category": category,
                    "progress": progress,
                    "current_step": current_step,
                })

            # Run the inspection
            _, result = await self.inspection_service.run_inspection_inline(
                url=page_url,
                inspection_id=inspection_id,
                progress_callback=page_progress_callback,
            )

            overall_score = result.get("overall_score", 0)
            grade = result.get("grade", "F")

            # Update page status in MongoDB
            await self.db.site_inspections.update_one(
                {
                    "site_inspection_id": site_inspection_id,
                    "discovered_pages.url": page_url,
                },
                {
                    "$set": {
                        "discovered_pages.$.status": "completed",
                        "discovered_pages.$.overall_score": overall_score,
                        "discovered_pages.$.grade": grade,
                    }
                },
            )

            # Publish page_complete event
            await self._publish_site_event(site_inspection_id, {
                "event_type": "page_complete",
                "site_inspection_id": site_inspection_id,
                "page_url": page_url,
                "page_index": page_index,
                "inspection_id": inspection_id,
                "overall_score": overall_score,
                "grade": grade,
            })

            # Compute and publish aggregate update
            aggregate_scores = await self._compute_and_store_aggregates(site_inspection_id)
            await self._publish_site_event(site_inspection_id, {
                "event_type": "aggregate_update",
                "site_inspection_id": site_inspection_id,
                "pages_inspected": aggregate_scores.get("pages_inspected", 0),
                "pages_total": aggregate_scores.get("pages_total", total_pages),
                "overall_score": aggregate_scores.get("overall_score", 0),
                "grade": aggregate_scores.get("grade", "F"),
            })

            logger.info(
                "Page inspection completed: site=%s, page=%s, score=%d",
                site_inspection_id, page_url, overall_score,
            )

        except Exception as e:
            logger.error(
                "Page inspection failed: site=%s, page=%s, error=%s",
                site_inspection_id, page_url, str(e),
            )

            # Mark page as error
            await self.db.site_inspections.update_one(
                {
                    "site_inspection_id": site_inspection_id,
                    "discovered_pages.url": page_url,
                },
                {
                    "$set": {
                        "discovered_pages.$.status": "error",
                    }
                },
            )

            # Publish page error (non-fatal, continue with other pages)
            await self._publish_site_event(site_inspection_id, {
                "event_type": "page_complete",
                "site_inspection_id": site_inspection_id,
                "page_url": page_url,
                "page_index": page_index,
                "inspection_id": None,
                "overall_score": 0,
                "grade": "F",
                "error": str(e)[:200],
            })

    # ------------------------------------------------------------------
    # Aggregate computation
    # ------------------------------------------------------------------

    async def _compute_and_store_aggregates(self, site_inspection_id: str) -> dict:
        """
        Compute aggregate scores from all completed page inspections.

        Fetches each completed page's inspection result from the
        inspections collection, averages the overall and per-category
        scores (rounded to the nearest integer), sums the issue counts
        and stores the aggregate in the site_inspections document.

        Returns the aggregate_scores dict, or an empty dict when the site
        inspection document does not exist.
        """
        doc = await self.db.site_inspections.find_one(
            {"site_inspection_id": site_inspection_id},
        )
        if not doc:
            return {}

        pages = doc.get("discovered_pages", [])
        total_pages = len(pages)

        # Collect inspection IDs for completed pages
        completed_ids = [
            p["inspection_id"]
            for p in pages
            if p.get("status") == "completed" and p.get("inspection_id")
        ]

        if not completed_ids:
            # Nothing inspected yet: store an all-zero aggregate
            aggregate = {
                "overall_score": 0,
                "grade": "F",
                "html_css": 0,
                "accessibility": 0,
                "seo": 0,
                "performance_security": 0,
                "total_issues": 0,
                "pages_inspected": 0,
                "pages_total": total_pages,
            }
            await self._store_aggregates(site_inspection_id, aggregate)
            return aggregate

        # Fetch all completed inspection results (scores only)
        cursor = self.db.inspections.find(
            {"inspection_id": {"$in": completed_ids}},
            {
                "_id": 0,
                "overall_score": 1,
                "categories.html_css.score": 1,
                "categories.accessibility.score": 1,
                "categories.seo.score": 1,
                "categories.performance_security.score": 1,
                "summary.total_issues": 1,
            },
        )

        scores_overall = []
        scores_html_css = []
        scores_accessibility = []
        scores_seo = []
        scores_perf = []
        total_issues = 0

        async for insp in cursor:
            scores_overall.append(insp.get("overall_score", 0))

            cats = insp.get("categories", {})
            scores_html_css.append(cats.get("html_css", {}).get("score", 0))
            scores_accessibility.append(cats.get("accessibility", {}).get("score", 0))
            scores_seo.append(cats.get("seo", {}).get("score", 0))
            scores_perf.append(cats.get("performance_security", {}).get("score", 0))

            total_issues += insp.get("summary", {}).get("total_issues", 0)

        # Counts the inspection documents actually found, which may be fewer
        # than the number of pages flagged as completed
        pages_inspected = len(scores_overall)

        def safe_avg(values: list[int]) -> int:
            return round(sum(values) / len(values)) if values else 0

        overall_score = safe_avg(scores_overall)
        grade = calculate_grade(overall_score)

        aggregate = {
            "overall_score": overall_score,
            "grade": grade,
            "html_css": safe_avg(scores_html_css),
            "accessibility": safe_avg(scores_accessibility),
            "seo": safe_avg(scores_seo),
            "performance_security": safe_avg(scores_perf),
            "total_issues": total_issues,
            "pages_inspected": pages_inspected,
            "pages_total": total_pages,
        }

        await self._store_aggregates(site_inspection_id, aggregate)
        return aggregate

    async def _store_aggregates(self, site_inspection_id: str, aggregate: dict) -> None:
        """Store aggregate scores in MongoDB."""
        await self.db.site_inspections.update_one(
            {"site_inspection_id": site_inspection_id},
            {"$set": {"aggregate_scores": aggregate}},
        )

    # ------------------------------------------------------------------
    # SSE event publishing
    # ------------------------------------------------------------------

    async def _publish_site_event(self, site_inspection_id: str, event_data: dict) -> None:
        """Publish an SSE event for site inspection via Redis Pub/Sub."""
        channel = f"site-inspection:{site_inspection_id}:events"
        await self.redis.publish(
            channel,
            json.dumps(event_data, ensure_ascii=False, default=str),
        )