"""
Batch inspection orchestration service.

Manages batch inspection lifecycle without crawling:
1. Accept a list of URLs (from uploaded file)
2. Parallel inspection of each URL (semaphore-controlled)
3. Aggregate score computation
4. Progress tracking via Redis Pub/Sub (SSE events)
5. Result storage in MongoDB (batch_inspections collection)
"""

import asyncio
import json
import logging
import re
import uuid
from datetime import datetime, timezone
from typing import Optional

from motor.motor_asyncio import AsyncIOMotorDatabase
from redis.asyncio import Redis

from app.core.config import get_settings
from app.models.schemas import calculate_grade
from app.services.inspection_service import InspectionService

logger = logging.getLogger(__name__)

# Redis key TTLs
BATCH_RESULT_CACHE_TTL = 3600  # 1 hour

# Upper bound on the page size accepted by list queries
MAX_LIST_LIMIT = 100

# Category keys averaged into the aggregate scores (order is preserved in
# the stored aggregate document)
_CATEGORY_KEYS = ("html_css", "accessibility", "seo", "performance_security")

# Strong references to running background tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_BACKGROUND_TASKS: set = set()


class BatchInspectionService:
    """Batch inspection orchestration service."""

    def __init__(self, db: AsyncIOMotorDatabase, redis: Redis):
        """Store the MongoDB/Redis handles and build the per-page inspector."""
        self.db = db
        self.redis = redis
        self.inspection_service = InspectionService(db=db, redis=redis)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def start_batch_inspection(
        self,
        name: str,
        urls: list[str],
        concurrency: int = 4,
        accessibility_standard: str = "wcag_2.1_aa",
    ) -> str:
        """
        Start a batch inspection.

        1. Generate batch_inspection_id
        2. Create initial MongoDB document with status "inspecting"
        3. Build discovered_pages array (depth=0, parent_url=None)
        4. Launch background inspect-all task
        5. Return batch_inspection_id

        Args:
            name: Display name stored on the batch document.
            urls: URLs to inspect. Duplicates are dropped; the first
                occurrence of each URL keeps its position.
            concurrency: Requested number of parallel page inspections.
                Clamped to the range [1, settings.BATCH_CONCURRENCY].
            accessibility_standard: Passed through to each page inspection.

        Returns:
            The generated batch_inspection_id (a UUID4 string).
        """
        settings = get_settings()

        # Clamp concurrency to the server-side limit, and never below 1:
        # a semaphore initialised with 0 would block every page forever.
        concurrency = max(1, min(concurrency, settings.BATCH_CONCURRENCY))

        batch_inspection_id = str(uuid.uuid4())

        # Remove duplicate URLs while preserving input order, so that
        # source_urls, discovered_pages and page_index all agree.
        urls = list(dict.fromkeys(urls))

        # Build discovered_pages documents
        discovered_pages = [
            {
                "url": url,
                "depth": 0,
                "parent_url": None,
                "inspection_id": None,
                "status": "pending",
                "title": None,
                "overall_score": None,
                "grade": None,
            }
            for url in urls
        ]

        # Create initial document
        doc = {
            "batch_inspection_id": batch_inspection_id,
            "name": name,
            "status": "inspecting",
            "created_at": datetime.now(timezone.utc),
            "completed_at": None,
            "config": {
                "concurrency": concurrency,
                "accessibility_standard": accessibility_standard,
            },
            "source_urls": urls,
            "discovered_pages": discovered_pages,
            "aggregate_scores": None,
        }
        await self.db.batch_inspections.insert_one(doc)

        logger.info(
            "Batch inspection started: id=%s, name=%s, total_urls=%d, concurrency=%d, standard=%s",
            batch_inspection_id, name, len(urls), concurrency, accessibility_standard,
        )

        # Launch background task and hold a strong reference until it is done
        task = asyncio.create_task(
            self._inspect_all(batch_inspection_id, urls, concurrency, accessibility_standard)
        )
        _BACKGROUND_TASKS.add(task)
        task.add_done_callback(_BACKGROUND_TASKS.discard)

        return batch_inspection_id

    async def get_batch_inspection(self, batch_inspection_id: str) -> Optional[dict]:
        """
        Get batch inspection result by ID (cache-first).

        Results served from the Redis cache have been round-tripped through
        JSON; values that are not natively JSON-serializable were converted
        with ``str`` when cached.

        Returns:
            The batch document without ``_id``, or None if no document
            matches the ID.
        """
        # Try Redis cache first
        cache_key = f"batch-inspection:result:{batch_inspection_id}"
        cached = await self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Fetch from MongoDB
        doc = await self.db.batch_inspections.find_one(
            {"batch_inspection_id": batch_inspection_id},
            {"_id": 0},
        )
        if not doc:
            return None

        # Only cache results in a terminal state; in-progress documents
        # keep changing.
        if doc.get("status") in ("completed", "error"):
            await self.redis.set(
                cache_key,
                json.dumps(doc, ensure_ascii=False, default=str),
                ex=BATCH_RESULT_CACHE_TTL,
            )
        return doc

    async def get_batch_inspection_list(
        self,
        page: int = 1,
        limit: int = 20,
        name_filter: Optional[str] = None,
    ) -> dict:
        """
        Get paginated list of batch inspections, newest first.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            limit: Page size; clamped to the range [1, MAX_LIST_LIMIT].
            name_filter: Optional case-insensitive substring to match
                against the batch name. The text is matched literally.

        Returns:
            A dict with ``items`` (summary dicts), ``total``, ``page``,
            ``limit`` and ``total_pages``.
        """
        # Clamp so that skip is never negative and the ceiling division
        # below never divides by zero.
        page = max(1, page)
        limit = max(1, min(limit, MAX_LIST_LIMIT))
        skip = (page - 1) * limit

        # Build query
        query = {}
        if name_filter:
            # Escape the filter so regex metacharacters in user input are
            # matched literally instead of being interpreted as a pattern.
            query["name"] = {"$regex": re.escape(name_filter), "$options": "i"}

        total = await self.db.batch_inspections.count_documents(query)

        projection = {
            "_id": 0,
            "batch_inspection_id": 1,
            "name": 1,
            "status": 1,
            "created_at": 1,
            "discovered_pages": 1,
            "aggregate_scores": 1,
            "source_urls": 1,
        }
        cursor = (
            self.db.batch_inspections.find(query, projection)
            .sort("created_at", -1)
            .skip(skip)
            .limit(limit)
        )

        items = []
        async for doc in cursor:
            pages = doc.get("discovered_pages") or []
            agg = doc.get("aggregate_scores")
            items.append({
                "batch_inspection_id": doc.get("batch_inspection_id"),
                "name": doc.get("name"),
                "status": doc.get("status"),
                "created_at": doc.get("created_at"),
                "total_urls": len(doc.get("source_urls") or []),
                "pages_inspected": sum(
                    1 for p in pages if p.get("status") == "completed"
                ),
                "overall_score": agg.get("overall_score") if agg else None,
                "grade": agg.get("grade") if agg else None,
            })

        # Ceiling division; always report at least one page
        total_pages = max(1, -(-total // limit))

        return {
            "items": items,
            "total": total,
            "page": page,
            "limit": limit,
            "total_pages": total_pages,
        }

    # ------------------------------------------------------------------
    # Background task: Inspect All URLs
    # ------------------------------------------------------------------

    async def _inspect_all(
        self,
        batch_inspection_id: str,
        urls: list[str],
        concurrency: int = 4,
        accessibility_standard: str = "wcag_2.1_aa",
    ) -> None:
        """
        Background task that inspects all URLs in parallel.
        No crawling phase - URLs are inspected directly.

        After every page has finished (successfully or not), the aggregate
        scores are recomputed, the batch is marked "completed" and a
        "complete" event is published. If that flow itself raises, the
        batch is marked "error" and an "error" event is published instead.
        """
        try:
            logger.info(
                "Batch inspection started: %s, urls=%d",
                batch_inspection_id, len(urls),
            )

            # Guard against a zero-valued semaphore, which would never
            # let any page run.
            semaphore = asyncio.Semaphore(max(1, concurrency))
            total = len(urls)

            page_jobs = [
                self._inspect_page_with_semaphore(
                    semaphore=semaphore,
                    batch_inspection_id=batch_inspection_id,
                    page_url=url,
                    page_index=idx,
                    total_pages=total,
                    accessibility_standard=accessibility_standard,
                )
                for idx, url in enumerate(urls)
            ]

            # Per-page failures must not abort the batch, but they should
            # not vanish either: log anything that escaped the page handler.
            outcomes = await asyncio.gather(*page_jobs, return_exceptions=True)
            for url, outcome in zip(urls, outcomes):
                if isinstance(outcome, BaseException):
                    logger.error(
                        "Unhandled page inspection error: batch=%s, page=%s, error=%s",
                        batch_inspection_id, url, outcome,
                        exc_info=outcome,
                    )

            # ==============================
            # Finalize: Compute aggregates
            # ==============================
            aggregate_scores = await self._compute_and_store_aggregates(batch_inspection_id)

            # Mark as completed
            await self.db.batch_inspections.update_one(
                {"batch_inspection_id": batch_inspection_id},
                {
                    "$set": {
                        "status": "completed",
                        "completed_at": datetime.now(timezone.utc),
                    }
                },
            )

            # Publish complete event
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "complete",
                "batch_inspection_id": batch_inspection_id,
                "status": "completed",
                "aggregate_scores": aggregate_scores,
            })

            logger.info("Batch inspection completed: %s", batch_inspection_id)

        except Exception as e:
            logger.error(
                "Batch inspection %s failed: %s",
                batch_inspection_id, str(e),
                exc_info=True,
            )
            # This runs as a detached task, so nobody awaits its result;
            # a failure while recording the error state is logged here
            # rather than left as an unretrieved task exception.
            try:
                await self.db.batch_inspections.update_one(
                    {"batch_inspection_id": batch_inspection_id},
                    {
                        "$set": {
                            "status": "error",
                            "completed_at": datetime.now(timezone.utc),
                        }
                    },
                )

                await self._publish_batch_event(batch_inspection_id, {
                    "event_type": "error",
                    "batch_inspection_id": batch_inspection_id,
                    "status": "error",
                    "message": f"배치 검사 중 오류가 발생했습니다: {str(e)[:200]}",
                })
            except Exception:
                logger.exception(
                    "Failed to record error state for batch inspection %s",
                    batch_inspection_id,
                )

    async def _inspect_page_with_semaphore(
        self,
        semaphore: asyncio.Semaphore,
        batch_inspection_id: str,
        page_url: str,
        page_index: int,
        total_pages: int,
        accessibility_standard: str = "wcag_2.1_aa",
    ) -> None:
        """Inspect a single page with semaphore-controlled concurrency."""
        async with semaphore:
            await self._inspect_single_page(
                batch_inspection_id=batch_inspection_id,
                page_url=page_url,
                page_index=page_index,
                total_pages=total_pages,
                accessibility_standard=accessibility_standard,
            )

    async def _set_page_fields(
        self,
        batch_inspection_id: str,
        page_url: str,
        **fields,
    ) -> None:
        """Set fields on the discovered_pages entry whose url is page_url."""
        await self.db.batch_inspections.update_one(
            {
                "batch_inspection_id": batch_inspection_id,
                "discovered_pages.url": page_url,
            },
            {
                "$set": {
                    f"discovered_pages.$.{key}": value
                    for key, value in fields.items()
                }
            },
        )

    async def _inspect_single_page(
        self,
        batch_inspection_id: str,
        page_url: str,
        page_index: int,
        total_pages: int,
        accessibility_standard: str = "wcag_2.1_aa",
    ) -> None:
        """
        Run inspection for a single page in the batch.

        Publishes page_start / page_progress / page_complete events and an
        aggregate_update event, and keeps the page's entry in the batch
        document in sync. Inspection failures are non-fatal: the page is
        marked "error" and a page_complete event carrying the error text
        is published.
        """
        inspection_id = str(uuid.uuid4())

        # Progress callback for per-page SSE updates
        async def page_progress_callback(category: str, progress: int, current_step: str):
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "page_progress",
                "batch_inspection_id": batch_inspection_id,
                "page_url": page_url,
                "page_index": page_index,
                "category": category,
                "progress": progress,
                "current_step": current_step,
            })

        try:
            # Publish page_start event. This and the status update below
            # sit inside the try so that a failure here still marks the
            # page as errored instead of leaving it pending.
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "page_start",
                "batch_inspection_id": batch_inspection_id,
                "page_url": page_url,
                "page_index": page_index,
            })

            # Mark page as inspecting in MongoDB
            await self._set_page_fields(
                batch_inspection_id,
                page_url,
                status="inspecting",
                inspection_id=inspection_id,
            )

            # Run the inspection
            _, result = await self.inspection_service.run_inspection_inline(
                url=page_url,
                inspection_id=inspection_id,
                progress_callback=page_progress_callback,
                accessibility_standard=accessibility_standard,
            )

            overall_score = result.get("overall_score", 0)
            grade = result.get("grade", "F")

            # Update page status in MongoDB
            await self._set_page_fields(
                batch_inspection_id,
                page_url,
                status="completed",
                overall_score=overall_score,
                grade=grade,
            )

            # Publish page_complete event
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "page_complete",
                "batch_inspection_id": batch_inspection_id,
                "page_url": page_url,
                "inspection_id": inspection_id,
                "overall_score": overall_score,
                "grade": grade,
            })

            # Compute and publish aggregate update
            aggregate_scores = await self._compute_and_store_aggregates(batch_inspection_id)
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "aggregate_update",
                "batch_inspection_id": batch_inspection_id,
                "pages_inspected": aggregate_scores.get("pages_inspected", 0),
                "pages_total": aggregate_scores.get("pages_total", total_pages),
                "overall_score": aggregate_scores.get("overall_score", 0),
                "grade": aggregate_scores.get("grade", "F"),
            })

            # %s rather than %d: the score comes from an external result
            # dict and is not guaranteed to be an int.
            logger.info(
                "Page inspection completed: batch=%s, page=%s, score=%s",
                batch_inspection_id, page_url, overall_score,
            )

        except Exception as e:
            logger.error(
                "Page inspection failed: batch=%s, page=%s, error=%s",
                batch_inspection_id, page_url, str(e),
                exc_info=True,
            )

            # Mark page as error
            await self._set_page_fields(batch_inspection_id, page_url, status="error")

            # Publish page error (non-fatal, continue with other pages)
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "page_complete",
                "batch_inspection_id": batch_inspection_id,
                "page_url": page_url,
                "inspection_id": None,
                "overall_score": 0,
                "grade": "F",
                "error": str(e)[:200],
            })

    # ------------------------------------------------------------------
    # Aggregate computation
    # ------------------------------------------------------------------

    async def _compute_and_store_aggregates(self, batch_inspection_id: str) -> dict:
        """
        Compute aggregate scores from all completed page inspections.

        Fetches each completed page's inspection result from the
        inspections collection, averages the overall and per-category
        scores, sums the issue counts, and stores the aggregate in the
        batch_inspections document.

        Returns:
            The aggregate_scores dict, or an empty dict if the batch
            document does not exist.
        """
        doc = await self.db.batch_inspections.find_one(
            {"batch_inspection_id": batch_inspection_id},
        )
        if not doc:
            return {}

        pages = doc.get("discovered_pages") or []
        total_pages = len(pages)

        # Collect inspection IDs for completed pages
        completed_ids = [
            p["inspection_id"]
            for p in pages
            if p.get("status") == "completed" and p.get("inspection_id")
        ]

        if not completed_ids:
            aggregate = {
                "overall_score": 0,
                "grade": "F",
                "html_css": 0,
                "accessibility": 0,
                "seo": 0,
                "performance_security": 0,
                "total_issues": 0,
                "pages_inspected": 0,
                "pages_total": total_pages,
            }
            await self._store_aggregates(batch_inspection_id, aggregate)
            return aggregate

        # Fetch all completed inspection results (scores and issue count only)
        projection = {"_id": 0, "overall_score": 1, "summary.total_issues": 1}
        for key in _CATEGORY_KEYS:
            projection[f"categories.{key}.score"] = 1
        cursor = self.db.inspections.find(
            {"inspection_id": {"$in": completed_ids}},
            projection,
        )

        scores_overall = []
        category_scores = {key: [] for key in _CATEGORY_KEYS}
        total_issues = 0

        # `or` fallbacks cover fields that are present but null, which
        # .get(key, default) alone would pass through as None.
        async for insp in cursor:
            scores_overall.append(insp.get("overall_score") or 0)
            cats = insp.get("categories") or {}
            for key in _CATEGORY_KEYS:
                category_scores[key].append((cats.get(key) or {}).get("score") or 0)
            total_issues += (insp.get("summary") or {}).get("total_issues") or 0

        # Counts inspection documents actually found, which may be fewer
        # than the number of pages marked completed.
        pages_inspected = len(scores_overall)

        def safe_avg(values: list[int]) -> int:
            """Rounded mean of values, or 0 for an empty list."""
            return round(sum(values) / len(values)) if values else 0

        overall_score = safe_avg(scores_overall)

        aggregate = {
            "overall_score": overall_score,
            "grade": calculate_grade(overall_score),
        }
        for key in _CATEGORY_KEYS:
            aggregate[key] = safe_avg(category_scores[key])
        aggregate["total_issues"] = total_issues
        aggregate["pages_inspected"] = pages_inspected
        aggregate["pages_total"] = total_pages

        await self._store_aggregates(batch_inspection_id, aggregate)
        return aggregate

    async def _store_aggregates(self, batch_inspection_id: str, aggregate: dict) -> None:
        """Store aggregate scores in MongoDB."""
        await self.db.batch_inspections.update_one(
            {"batch_inspection_id": batch_inspection_id},
            {"$set": {"aggregate_scores": aggregate}},
        )

    # ------------------------------------------------------------------
    # SSE event publishing
    # ------------------------------------------------------------------

    async def _publish_batch_event(self, batch_inspection_id: str, event_data: dict) -> None:
        """Publish an SSE event for batch inspection via Redis Pub/Sub."""
        channel = f"batch-inspection:{batch_inspection_id}:events"
        await self.redis.publish(
            channel,
            json.dumps(event_data, ensure_ascii=False, default=str),
        )