"""
Batch inspections router.

Handles batch inspection lifecycle:
- Start batch inspection (multipart file upload + inspect all URLs)
- SSE stream for real-time progress
- Get batch inspection result
- List batch inspections (history)

IMPORTANT: Static paths (/batch-inspections) must be registered BEFORE
dynamic paths (/batch-inspections/{id}) to avoid routing conflicts.
"""

import json
import logging
from typing import Optional
from urllib.parse import urlparse

from fastapi import APIRouter, File, Form, HTTPException, Query, UploadFile
from sse_starlette.sse import EventSourceResponse

from app.core.config import get_settings
from app.core.database import get_db
from app.core.redis import get_redis
from app.models.batch_schemas import StartBatchInspectionResponse
from app.services.batch_inspection_service import BatchInspectionService

logger = logging.getLogger(__name__)
router = APIRouter()

# Maximum upload file size: 1MB
MAX_FILE_SIZE = 1 * 1024 * 1024  # 1MB


def _get_service() -> BatchInspectionService:
    """Build a BatchInspectionService from the shared DB and Redis handles."""
    db = get_db()
    redis = get_redis()
    return BatchInspectionService(db=db, redis=redis)


def _validate_url(url: str) -> bool:
    """Return True when *url* has an http/https scheme and a network location."""
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse raises ValueError for malformed input (e.g. a broken
        # IPv6 literal); such a URL is simply invalid.
        return False
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)


def _parse_url_file(content: str) -> list[str]:
    """
    Parse URL file content into a list of candidate URLs.

    - One URL per line
    - Skip empty lines
    - Skip lines starting with # (comments)
    - Strip whitespace

    No URL validation happens here; callers validate each entry separately.
    """
    stripped_lines = (line.strip() for line in content.splitlines())
    return [
        line for line in stripped_lines
        if line and not line.startswith("#")
    ]


# ============================================================
# POST /api/batch-inspections -- Start batch inspection
# ============================================================

@router.post("/batch-inspections", status_code=202)
async def start_batch_inspection(
    file: UploadFile = File(...),
    name: str = Form(...),
    concurrency: int = Form(default=4),
):
    """
    Start a new batch inspection from an uploaded URL file.

    Returns 202 Accepted with batch_inspection_id immediately.
    Inspection runs asynchronously in the background.

    File format: .txt, UTF-8, one URL per line, max 1MB. The maximum number
    of URLs is taken from settings.BATCH_MAX_URLS.
    Lines starting with # are treated as comments and ignored.

    Raises HTTPException 422 when the file, URLs or name fail validation,
    and HTTPException 400 when the service cannot start the batch.
    """
    settings = get_settings()

    # Validate file extension (skipped when the client sent no filename)
    if file.filename and not file.filename.lower().endswith(".txt"):
        raise HTTPException(
            status_code=422,
            detail="텍스트 파일(.txt)만 업로드 가능합니다",
        )

    # Validate file size. Read one byte past the limit at most, so an
    # oversized upload is detected without buffering all of it in memory.
    content_bytes = await file.read(MAX_FILE_SIZE + 1)
    if len(content_bytes) > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=422,
            detail="파일 크기가 1MB를 초과합니다",
        )

    # Parse file content. "utf-8-sig" is plain UTF-8 that also drops a
    # leading BOM; otherwise the BOM would stick to the first URL and make
    # it fail validation.
    try:
        content = content_bytes.decode("utf-8-sig")
    except UnicodeDecodeError:
        raise HTTPException(
            status_code=422,
            detail="파일 인코딩이 올바르지 않습니다 (UTF-8만 지원)",
        )

    urls = _parse_url_file(content)

    if not urls:
        raise HTTPException(
            status_code=422,
            detail="파일에 유효한 URL이 없습니다",
        )

    # Validate URL count
    if len(urls) > settings.BATCH_MAX_URLS:
        raise HTTPException(
            status_code=422,
            detail=f"URL 수가 최대 허용치({settings.BATCH_MAX_URLS}개)를 초과합니다 (입력: {len(urls)}개)",
        )

    # Validate each URL
    invalid_urls = [url for url in urls if not _validate_url(url)]
    if invalid_urls:
        # Show first 5 invalid URLs
        sample = invalid_urls[:5]
        detail = f"유효하지 않은 URL이 포함되어 있습니다: {', '.join(sample)}"
        if len(invalid_urls) > 5:
            detail += f" 외 {len(invalid_urls) - 5}건"
        raise HTTPException(
            status_code=422,
            detail=detail,
        )

    # Clamp concurrency into [1, settings.BATCH_CONCURRENCY]
    concurrency = max(1, min(concurrency, settings.BATCH_CONCURRENCY))

    # Validate name
    name = name.strip()
    if not name:
        raise HTTPException(
            status_code=422,
            detail="배치 검사 이름을 입력해주세요",
        )

    service = _get_service()

    try:
        batch_inspection_id = await service.start_batch_inspection(
            name=name,
            urls=urls,
            concurrency=concurrency,
        )
    except Exception as exc:
        # logger.exception keeps the traceback, which the plain message lost.
        logger.exception("Failed to start batch inspection: %s", exc)
        raise HTTPException(
            status_code=400,
            detail="배치 검사를 시작할 수 없습니다",
        ) from exc

    return StartBatchInspectionResponse(
        batch_inspection_id=batch_inspection_id,
        status="inspecting",
        name=name,
        total_urls=len(urls),
        stream_url=f"/api/batch-inspections/{batch_inspection_id}/stream",
    )


# ============================================================
# GET /api/batch-inspections -- List batch inspections (history)
# IMPORTANT: This MUST be before /{batch_inspection_id} routes
# ============================================================

@router.get("/batch-inspections")
async def list_batch_inspections(
    page: int = Query(default=1, ge=1),
    limit: int = Query(default=20, ge=1, le=100),
    name: Optional[str] = Query(default=None, description="이름 검색 필터"),
):
    """Get paginated batch inspection history, optionally filtered by name."""
    service = _get_service()
    result = await service.get_batch_inspection_list(
        page=page,
        limit=limit,
        name_filter=name,
    )
    return result


# ============================================================
# GET /api/batch-inspections/{batch_inspection_id}/stream -- SSE
# ============================================================

@router.get("/batch-inspections/{batch_inspection_id}/stream")
async def batch_inspection_stream(batch_inspection_id: str):
    """
    Stream batch inspection progress via Server-Sent Events.

    Events:
    - connected: { batch_inspection_id, message }
    - page_start: { batch_inspection_id, page_url, page_index }
    - page_complete: { batch_inspection_id, page_url, inspection_id, overall_score, grade }
    - aggregate_update: { batch_inspection_id, pages_inspected, pages_total, overall_score, grade }
    - complete: { batch_inspection_id, status: "completed", aggregate_scores: {...} }
    - error: { batch_inspection_id, message }
    """

    async def event_generator():
        redis = get_redis()
        pubsub = redis.pubsub()
        channel = f"batch-inspection:{batch_inspection_id}:events"

        try:
            # Subscribing inside the try means a failed subscribe still
            # reaches the cleanup below and is reported as an error event.
            await pubsub.subscribe(channel)

            # Send initial connected event
            yield {
                "event": "connected",
                "data": json.dumps({
                    "batch_inspection_id": batch_inspection_id,
                    "message": "SSE 연결 완료",
                }, ensure_ascii=False),
            }

            # Listen for Pub/Sub messages
            async for message in pubsub.listen():
                # Skip non-payload messages (e.g. subscribe confirmations)
                if message["type"] != "message":
                    continue

                event_data = json.loads(message["data"])
                event_type = event_data.pop("event_type", "progress")

                yield {
                    "event": event_type,
                    "data": json.dumps(event_data, ensure_ascii=False),
                }

                # End stream on complete or error
                if event_type in ("complete", "error"):
                    break

        except Exception as exc:
            logger.exception(
                "SSE stream error for batch %s: %s",
                batch_inspection_id, exc,
            )
            yield {
                "event": "error",
                "data": json.dumps({
                    "batch_inspection_id": batch_inspection_id,
                    "status": "error",
                    "message": "스트리밍 중 오류가 발생했습니다",
                }, ensure_ascii=False),
            }
        finally:
            # Always close the pubsub, even if unsubscribe itself fails.
            try:
                await pubsub.unsubscribe(channel)
            finally:
                await pubsub.aclose()

    return EventSourceResponse(
        event_generator(),
        media_type="text/event-stream",
    )


# ============================================================
# GET /api/batch-inspections/{batch_inspection_id} -- Get result
# ============================================================

@router.get("/batch-inspections/{batch_inspection_id}")
async def get_batch_inspection(batch_inspection_id: str):
    """Get batch inspection result by ID; raises HTTPException 404 if absent."""
    service = _get_service()
    result = await service.get_batch_inspection(batch_inspection_id)

    if result is None:
        raise HTTPException(
            status_code=404,
            detail="배치 검사 결과를 찾을 수 없습니다",
        )

    # Remove MongoDB _id field if present
    result.pop("_id", None)
    return result