feat: 3-mode inspection with tabbed UI + batch upload

- Add batch inspection backend (multipart upload, SSE streaming, MongoDB)
- Add tabbed UI (single page / site crawling / batch upload) on home and history pages
- Add batch inspection progress, result pages with 2-panel layout
- Rename "사이트 전체" to "사이트 크롤링" across codebase
- Add python-multipart dependency for file upload
- Consolidate nginx SSE location for all inspection types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jungwoo choi
2026-02-13 19:15:27 +09:00
parent 9bb844c5e1
commit 8326c84be9
32 changed files with 3700 additions and 61 deletions

View File

@ -0,0 +1,537 @@
"""
Batch inspection orchestration service.
Manages batch inspection lifecycle without crawling:
1. Accept a list of URLs (from uploaded file)
2. Parallel inspection of each URL (semaphore-controlled)
3. Aggregate score computation
4. Progress tracking via Redis Pub/Sub (SSE events)
5. Result storage in MongoDB (batch_inspections collection)
"""
import asyncio
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Optional
from motor.motor_asyncio import AsyncIOMotorDatabase
from redis.asyncio import Redis
from app.core.config import get_settings
from app.models.schemas import calculate_grade
from app.services.inspection_service import InspectionService
logger = logging.getLogger(__name__)
# Redis key TTLs — lifetime (seconds) of cached terminal ("completed"/"error")
# batch results; see BatchInspectionService.get_batch_inspection.
BATCH_RESULT_CACHE_TTL = 3600 # 1 hour
class BatchInspectionService:
    """Batch inspection orchestration service.

    Coordinates batch inspections without a crawling phase:

    1. Accepts a list of URLs (typically parsed from an uploaded file).
    2. Inspects each URL in parallel, bounded by an asyncio semaphore.
    3. Computes aggregate scores across completed pages.
    4. Publishes progress via Redis Pub/Sub (relayed to clients as SSE).
    5. Stores results in the ``batch_inspections`` MongoDB collection.
    """

    def __init__(self, db: AsyncIOMotorDatabase, redis: Redis):
        """
        Args:
            db: Async MongoDB database handle.
            redis: Async Redis client (used for result caching and Pub/Sub).
        """
        self.db = db
        self.redis = redis
        self.inspection_service = InspectionService(db=db, redis=redis)
        # asyncio keeps only a weak reference to tasks created with
        # create_task(); hold strong references here so an in-flight batch
        # task cannot be garbage-collected mid-execution.
        self._background_tasks: set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def start_batch_inspection(
        self,
        name: str,
        urls: list[str],
        concurrency: int = 4,
    ) -> str:
        """
        Start a batch inspection and return its ID immediately.

        1. Generate batch_inspection_id
        2. Create initial MongoDB document with status "inspecting"
        3. Build discovered_pages array (depth=0, parent_url=None)
        4. Launch the background inspect-all task
        5. Return batch_inspection_id

        Args:
            name: Human-readable name for the batch.
            urls: Page URLs to inspect (no crawling is performed).
            concurrency: Requested parallelism; clamped server-side.

        Returns:
            The generated batch_inspection_id (UUID4 string).
        """
        settings = get_settings()
        # Clamp to [1, BATCH_CONCURRENCY]: the upper bound protects the
        # server; the lower bound prevents a zero-permit semaphore that
        # would never grant a slot and hang the batch forever.
        concurrency = max(1, min(concurrency, settings.BATCH_CONCURRENCY))
        batch_inspection_id = str(uuid.uuid4())

        # One sub-document per URL. depth/parent_url mirror the crawling
        # mode's page schema so result rendering can be shared.
        discovered_pages = [
            {
                "url": url,
                "depth": 0,
                "parent_url": None,
                "inspection_id": None,
                "status": "pending",
                "title": None,
                "overall_score": None,
                "grade": None,
            }
            for url in urls
        ]

        # Initial document: status "inspecting", aggregates filled in later.
        doc = {
            "batch_inspection_id": batch_inspection_id,
            "name": name,
            "status": "inspecting",
            "created_at": datetime.now(timezone.utc),
            "completed_at": None,
            "config": {
                "concurrency": concurrency,
            },
            "source_urls": urls,
            "discovered_pages": discovered_pages,
            "aggregate_scores": None,
        }
        await self.db.batch_inspections.insert_one(doc)
        logger.info(
            "Batch inspection started: id=%s, name=%s, total_urls=%d, concurrency=%d",
            batch_inspection_id, name, len(urls), concurrency,
        )

        # Fire-and-forget background task. Keep a strong reference (see
        # __init__) and drop it automatically once the task finishes.
        task = asyncio.create_task(
            self._inspect_all(batch_inspection_id, urls, concurrency)
        )
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return batch_inspection_id

    async def get_batch_inspection(self, batch_inspection_id: str) -> Optional[dict]:
        """Get a batch inspection result by ID (cache-first).

        Returns:
            The result document (without ``_id``), or None when unknown.
        """
        # Try Redis cache first.
        cache_key = f"batch-inspection:result:{batch_inspection_id}"
        cached = await self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Fall back to MongoDB.
        doc = await self.db.batch_inspections.find_one(
            {"batch_inspection_id": batch_inspection_id},
            {"_id": 0},
        )
        if doc:
            # Cache only terminal states; in-progress documents keep
            # changing and must always be read fresh from MongoDB.
            if doc.get("status") in ("completed", "error"):
                await self.redis.set(
                    cache_key,
                    json.dumps(doc, ensure_ascii=False, default=str),
                    ex=BATCH_RESULT_CACHE_TTL,
                )
            return doc
        return None

    async def get_batch_inspection_list(
        self,
        page: int = 1,
        limit: int = 20,
        name_filter: Optional[str] = None,
    ) -> dict:
        """Get a paginated list of batch inspections.

        Args:
            page: 1-based page number.
            limit: Page size, capped at 100.
            name_filter: Optional case-insensitive substring match on name.

        Returns:
            Dict with "items", "total", "page", "limit", "total_pages".
        """
        limit = min(limit, 100)
        skip = (page - 1) * limit

        # Build query
        query = {}
        if name_filter:
            query["name"] = {"$regex": name_filter, "$options": "i"}

        total = await self.db.batch_inspections.count_documents(query)
        cursor = self.db.batch_inspections.find(
            query,
            {
                "_id": 0,
                "batch_inspection_id": 1,
                "name": 1,
                "status": 1,
                "created_at": 1,
                "discovered_pages": 1,
                "aggregate_scores": 1,
                "source_urls": 1,
            },
        ).sort("created_at", -1).skip(skip).limit(limit)

        items = []
        async for doc in cursor:
            pages = doc.get("discovered_pages", [])
            total_urls = len(doc.get("source_urls", []))
            pages_inspected = sum(
                1 for p in pages if p.get("status") == "completed"
            )
            agg = doc.get("aggregate_scores")
            items.append({
                "batch_inspection_id": doc.get("batch_inspection_id"),
                "name": doc.get("name"),
                "status": doc.get("status"),
                "created_at": doc.get("created_at"),
                "total_urls": total_urls,
                "pages_inspected": pages_inspected,
                "overall_score": agg.get("overall_score") if agg else None,
                "grade": agg.get("grade") if agg else None,
            })

        # Ceiling division via -(-a // b); report at least one page.
        total_pages = max(1, -(-total // limit))
        return {
            "items": items,
            "total": total,
            "page": page,
            "limit": limit,
            "total_pages": total_pages,
        }

    # ------------------------------------------------------------------
    # Background task: Inspect All URLs
    # ------------------------------------------------------------------
    async def _inspect_all(
        self,
        batch_inspection_id: str,
        urls: list[str],
        concurrency: int = 4,
    ) -> None:
        """
        Background task that inspects all URLs in parallel.

        No crawling phase — URLs are inspected directly. Per-page failures
        are handled inside each page task; only unexpected orchestration
        errors reach the except-branch here, which marks the whole batch
        as "error".
        """
        try:
            logger.info(
                "Batch inspection started: %s, urls=%d",
                batch_inspection_id, len(urls),
            )
            semaphore = asyncio.Semaphore(concurrency)
            tasks = [
                self._inspect_page_with_semaphore(
                    semaphore=semaphore,
                    batch_inspection_id=batch_inspection_id,
                    page_url=url,
                    page_index=idx,
                    total_pages=len(urls),
                )
                for idx, url in enumerate(urls)
            ]
            # return_exceptions=True: one crashing page task must not
            # cancel the rest of the batch.
            await asyncio.gather(*tasks, return_exceptions=True)

            # ==============================
            # Finalize: Compute aggregates
            # ==============================
            aggregate_scores = await self._compute_and_store_aggregates(batch_inspection_id)

            # Mark as completed
            await self.db.batch_inspections.update_one(
                {"batch_inspection_id": batch_inspection_id},
                {
                    "$set": {
                        "status": "completed",
                        "completed_at": datetime.now(timezone.utc),
                    }
                },
            )

            # Publish complete event
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "complete",
                "batch_inspection_id": batch_inspection_id,
                "status": "completed",
                "aggregate_scores": aggregate_scores,
            })
            logger.info("Batch inspection completed: %s", batch_inspection_id)
        except Exception as e:
            logger.error(
                "Batch inspection %s failed: %s",
                batch_inspection_id, str(e), exc_info=True,
            )
            await self.db.batch_inspections.update_one(
                {"batch_inspection_id": batch_inspection_id},
                {
                    "$set": {
                        "status": "error",
                        "completed_at": datetime.now(timezone.utc),
                    }
                },
            )
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "error",
                "batch_inspection_id": batch_inspection_id,
                "status": "error",
                "message": f"배치 검사 중 오류가 발생했습니다: {str(e)[:200]}",
            })

    async def _inspect_page_with_semaphore(
        self,
        semaphore: asyncio.Semaphore,
        batch_inspection_id: str,
        page_url: str,
        page_index: int,
        total_pages: int,
    ) -> None:
        """Inspect a single page with semaphore-controlled concurrency."""
        async with semaphore:
            await self._inspect_single_page(
                batch_inspection_id=batch_inspection_id,
                page_url=page_url,
                page_index=page_index,
                total_pages=total_pages,
            )

    async def _inspect_single_page(
        self,
        batch_inspection_id: str,
        page_url: str,
        page_index: int,
        total_pages: int,
    ) -> None:
        """Run inspection for a single page in the batch.

        Publishes page_start / page_progress / page_complete SSE events and
        keeps the page's sub-document in MongoDB in sync. Failures are
        non-fatal: the page is marked "error" and the batch continues.
        """
        inspection_id = str(uuid.uuid4())

        # Publish page_start event
        await self._publish_batch_event(batch_inspection_id, {
            "event_type": "page_start",
            "batch_inspection_id": batch_inspection_id,
            "page_url": page_url,
            "page_index": page_index,
        })

        # Mark page as inspecting (positional $ targets the matched URL).
        await self.db.batch_inspections.update_one(
            {
                "batch_inspection_id": batch_inspection_id,
                "discovered_pages.url": page_url,
            },
            {
                "$set": {
                    "discovered_pages.$.status": "inspecting",
                    "discovered_pages.$.inspection_id": inspection_id,
                }
            },
        )

        try:
            # Progress callback for per-page SSE updates
            async def page_progress_callback(category: str, progress: int, current_step: str):
                await self._publish_batch_event(batch_inspection_id, {
                    "event_type": "page_progress",
                    "batch_inspection_id": batch_inspection_id,
                    "page_url": page_url,
                    "page_index": page_index,
                    "category": category,
                    "progress": progress,
                    "current_step": current_step,
                })

            # Run the inspection
            _, result = await self.inspection_service.run_inspection_inline(
                url=page_url,
                inspection_id=inspection_id,
                progress_callback=page_progress_callback,
            )
            overall_score = result.get("overall_score", 0)
            grade = result.get("grade", "F")

            # Update page status in MongoDB
            await self.db.batch_inspections.update_one(
                {
                    "batch_inspection_id": batch_inspection_id,
                    "discovered_pages.url": page_url,
                },
                {
                    "$set": {
                        "discovered_pages.$.status": "completed",
                        "discovered_pages.$.overall_score": overall_score,
                        "discovered_pages.$.grade": grade,
                    }
                },
            )

            # Publish page_complete event
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "page_complete",
                "batch_inspection_id": batch_inspection_id,
                "page_url": page_url,
                "inspection_id": inspection_id,
                "overall_score": overall_score,
                "grade": grade,
            })

            # Recompute aggregates after every completed page so clients
            # watching the SSE stream see running totals.
            aggregate_scores = await self._compute_and_store_aggregates(batch_inspection_id)
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "aggregate_update",
                "batch_inspection_id": batch_inspection_id,
                "pages_inspected": aggregate_scores.get("pages_inspected", 0),
                "pages_total": aggregate_scores.get("pages_total", total_pages),
                "overall_score": aggregate_scores.get("overall_score", 0),
                "grade": aggregate_scores.get("grade", "F"),
            })
            logger.info(
                "Page inspection completed: batch=%s, page=%s, score=%d",
                batch_inspection_id, page_url, overall_score,
            )
        except Exception as e:
            logger.error(
                "Page inspection failed: batch=%s, page=%s, error=%s",
                batch_inspection_id, page_url, str(e),
            )
            # Mark page as error
            await self.db.batch_inspections.update_one(
                {
                    "batch_inspection_id": batch_inspection_id,
                    "discovered_pages.url": page_url,
                },
                {
                    "$set": {
                        "discovered_pages.$.status": "error",
                    }
                },
            )
            # Publish page error (non-fatal, continue with other pages)
            await self._publish_batch_event(batch_inspection_id, {
                "event_type": "page_complete",
                "batch_inspection_id": batch_inspection_id,
                "page_url": page_url,
                "inspection_id": None,
                "overall_score": 0,
                "grade": "F",
                "error": str(e)[:200],
            })

    # ------------------------------------------------------------------
    # Aggregate computation
    # ------------------------------------------------------------------
    async def _compute_and_store_aggregates(self, batch_inspection_id: str) -> dict:
        """
        Compute aggregate scores from all completed page inspections.

        Fetches each completed page's full inspection result from the
        inspections collection, averages category scores, and stores
        the aggregate in the batch_inspections document.

        Returns:
            The aggregate_scores dict (empty dict when the batch document
            does not exist).
        """
        doc = await self.db.batch_inspections.find_one(
            {"batch_inspection_id": batch_inspection_id},
        )
        if not doc:
            return {}

        pages = doc.get("discovered_pages", [])
        total_pages = len(pages)

        # Collect inspection IDs for completed pages
        completed_ids = [
            p["inspection_id"]
            for p in pages
            if p.get("status") == "completed" and p.get("inspection_id")
        ]
        if not completed_ids:
            # Nothing finished yet: store an all-zero aggregate so event
            # consumers always have a well-formed payload.
            aggregate = {
                "overall_score": 0,
                "grade": "F",
                "html_css": 0,
                "accessibility": 0,
                "seo": 0,
                "performance_security": 0,
                "total_issues": 0,
                "pages_inspected": 0,
                "pages_total": total_pages,
            }
            await self._store_aggregates(batch_inspection_id, aggregate)
            return aggregate

        # Fetch all completed inspection results (projection keeps only
        # the score fields needed for averaging).
        cursor = self.db.inspections.find(
            {"inspection_id": {"$in": completed_ids}},
            {
                "_id": 0,
                "overall_score": 1,
                "categories.html_css.score": 1,
                "categories.accessibility.score": 1,
                "categories.seo.score": 1,
                "categories.performance_security.score": 1,
                "summary.total_issues": 1,
            },
        )
        scores_overall = []
        scores_html_css = []
        scores_accessibility = []
        scores_seo = []
        scores_perf = []
        total_issues = 0
        async for insp in cursor:
            scores_overall.append(insp.get("overall_score", 0))
            cats = insp.get("categories", {})
            scores_html_css.append(cats.get("html_css", {}).get("score", 0))
            scores_accessibility.append(cats.get("accessibility", {}).get("score", 0))
            scores_seo.append(cats.get("seo", {}).get("score", 0))
            scores_perf.append(cats.get("performance_security", {}).get("score", 0))
            total_issues += insp.get("summary", {}).get("total_issues", 0)

        pages_inspected = len(scores_overall)

        def safe_avg(values: list[int]) -> int:
            # Rounded mean; 0 for an empty list (avoids ZeroDivisionError).
            return round(sum(values) / len(values)) if values else 0

        overall_score = safe_avg(scores_overall)
        grade = calculate_grade(overall_score)
        aggregate = {
            "overall_score": overall_score,
            "grade": grade,
            "html_css": safe_avg(scores_html_css),
            "accessibility": safe_avg(scores_accessibility),
            "seo": safe_avg(scores_seo),
            "performance_security": safe_avg(scores_perf),
            "total_issues": total_issues,
            "pages_inspected": pages_inspected,
            "pages_total": total_pages,
        }
        await self._store_aggregates(batch_inspection_id, aggregate)
        return aggregate

    async def _store_aggregates(self, batch_inspection_id: str, aggregate: dict) -> None:
        """Store aggregate scores in the batch's MongoDB document."""
        await self.db.batch_inspections.update_one(
            {"batch_inspection_id": batch_inspection_id},
            {"$set": {"aggregate_scores": aggregate}},
        )

    # ------------------------------------------------------------------
    # SSE event publishing
    # ------------------------------------------------------------------
    async def _publish_batch_event(self, batch_inspection_id: str, event_data: dict) -> None:
        """Publish an SSE event for batch inspection via Redis Pub/Sub."""
        channel = f"batch-inspection:{batch_inspection_id}:events"
        await self.redis.publish(
            channel,
            json.dumps(event_data, ensure_ascii=False, default=str),
        )