# 3가지 검사 모드(한 페이지, 사이트 크롤링, 목록 업로드) 모두에서 접근성 표준을
# 선택할 수 있도록 추가. WCAG 2.0 A/AA, 2.1 AA, 2.2 AA와 KWCAG 2.1, 2.2를
# 지원하며, KWCAG 선택 시 axe-core 결과를 KWCAG 검사항목으로 자동 매핑.
# - KWCAG 2.2 (33항목) / 2.1 (24항목) ↔ WCAG 매핑 테이블 (kwcag_mapping.py)
# - AccessibilityChecker에 표준 파싱 및 KWCAG 변환 로직 추가
# - 전체 API 파이프라인에 accessibility_standard 파라미터 전파
# - 프론트엔드 3개 폼에 공용 표준 선택 드롭다운 추가
"""
|
|
Site-wide inspection orchestration service.
|
|
|
|
Manages the full site inspection lifecycle:
|
|
1. BFS crawling to discover same-domain pages
|
|
2. Sequential/parallel inspection of each discovered page
|
|
3. Aggregate score computation
|
|
4. Progress tracking via Redis Pub/Sub (SSE events)
|
|
5. Result storage in MongoDB (site_inspections collection)
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
from urllib.parse import urlparse
|
|
|
|
from motor.motor_asyncio import AsyncIOMotorDatabase
|
|
from redis.asyncio import Redis
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.redis import get_redis
|
|
from app.models.schemas import calculate_grade
|
|
from app.services.link_crawler import LinkCrawler
|
|
from app.services.inspection_service import InspectionService
|
|
|
|
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# Redis key TTLs
SITE_RESULT_CACHE_TTL = 3600  # 1 hour — lifetime of cached terminal-state site results
|
|
|
|
|
|
class SiteInspectionService:
    """Site-wide inspection orchestration service.

    Coordinates crawling, per-page inspection, aggregate scoring, and
    SSE progress publishing for a whole-site check.
    """

    def __init__(self, db: AsyncIOMotorDatabase, redis: Redis):
        """Keep handles to MongoDB, Redis, and a per-page inspection delegate."""
        # Shared handles used by every operation in this service.
        self.redis = redis
        self.db = db
        # Delegate that runs the actual per-page inspections.
        self.inspection_service = InspectionService(db=db, redis=redis)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public API
|
|
# ------------------------------------------------------------------
|
|
|
|
async def start_site_inspection(
|
|
self,
|
|
url: str,
|
|
max_pages: int = 20,
|
|
max_depth: int = 2,
|
|
concurrency: int = 4,
|
|
accessibility_standard: str = "wcag_2.1_aa",
|
|
) -> str:
|
|
"""
|
|
Start a site-wide inspection.
|
|
|
|
1. Validate URL
|
|
2. Generate site_inspection_id
|
|
3. Create initial MongoDB document with status "crawling"
|
|
4. Launch background crawl-and-inspect task
|
|
5. Return site_inspection_id
|
|
"""
|
|
settings = get_settings()
|
|
|
|
# Clamp to server-side limits (0 = unlimited, don't clamp)
|
|
if max_pages > 0:
|
|
max_pages = min(max_pages, settings.SITE_MAX_PAGES)
|
|
max_depth = min(max_depth, settings.SITE_MAX_DEPTH)
|
|
concurrency = min(concurrency, settings.SITE_CONCURRENCY)
|
|
|
|
site_inspection_id = str(uuid.uuid4())
|
|
parsed = urlparse(url)
|
|
domain = parsed.netloc.lower()
|
|
|
|
# Create initial document
|
|
doc = {
|
|
"site_inspection_id": site_inspection_id,
|
|
"root_url": url,
|
|
"domain": domain,
|
|
"status": "crawling",
|
|
"created_at": datetime.now(timezone.utc),
|
|
"completed_at": None,
|
|
"config": {
|
|
"max_pages": max_pages,
|
|
"max_depth": max_depth,
|
|
"concurrency": concurrency,
|
|
"accessibility_standard": accessibility_standard,
|
|
},
|
|
"discovered_pages": [],
|
|
"aggregate_scores": None,
|
|
}
|
|
await self.db.site_inspections.insert_one(doc)
|
|
|
|
logger.info(
|
|
"Site inspection started: id=%s, url=%s, max_pages=%d, max_depth=%d, concurrency=%d, standard=%s",
|
|
site_inspection_id, url, max_pages, max_depth, concurrency, accessibility_standard,
|
|
)
|
|
|
|
# Launch background task
|
|
asyncio.create_task(
|
|
self._crawl_and_inspect(
|
|
site_inspection_id, url, max_pages, max_depth, concurrency,
|
|
accessibility_standard=accessibility_standard,
|
|
)
|
|
)
|
|
|
|
return site_inspection_id
|
|
|
|
async def get_site_inspection(self, site_inspection_id: str) -> Optional[dict]:
|
|
"""Get site inspection result by ID (cache-first)."""
|
|
# Try Redis cache first
|
|
cache_key = f"site-inspection:result:{site_inspection_id}"
|
|
cached = await self.redis.get(cache_key)
|
|
if cached:
|
|
return json.loads(cached)
|
|
|
|
# Fetch from MongoDB
|
|
doc = await self.db.site_inspections.find_one(
|
|
{"site_inspection_id": site_inspection_id},
|
|
{"_id": 0},
|
|
)
|
|
if doc:
|
|
# Only cache completed results
|
|
if doc.get("status") in ("completed", "error"):
|
|
await self.redis.set(
|
|
cache_key,
|
|
json.dumps(doc, ensure_ascii=False, default=str),
|
|
ex=SITE_RESULT_CACHE_TTL,
|
|
)
|
|
return doc
|
|
return None
|
|
|
|
async def get_site_inspection_list(
|
|
self,
|
|
page: int = 1,
|
|
limit: int = 20,
|
|
) -> dict:
|
|
"""Get paginated list of site inspections."""
|
|
limit = min(limit, 100)
|
|
skip = (page - 1) * limit
|
|
|
|
total = await self.db.site_inspections.count_documents({})
|
|
|
|
cursor = self.db.site_inspections.find(
|
|
{},
|
|
{
|
|
"_id": 0,
|
|
"site_inspection_id": 1,
|
|
"root_url": 1,
|
|
"domain": 1,
|
|
"status": 1,
|
|
"created_at": 1,
|
|
"discovered_pages": 1,
|
|
"aggregate_scores": 1,
|
|
},
|
|
).sort("created_at", -1).skip(skip).limit(limit)
|
|
|
|
items = []
|
|
async for doc in cursor:
|
|
pages = doc.get("discovered_pages", [])
|
|
pages_total = len(pages)
|
|
pages_inspected = sum(
|
|
1 for p in pages if p.get("status") == "completed"
|
|
)
|
|
agg = doc.get("aggregate_scores")
|
|
|
|
items.append({
|
|
"site_inspection_id": doc.get("site_inspection_id"),
|
|
"root_url": doc.get("root_url"),
|
|
"domain": doc.get("domain"),
|
|
"status": doc.get("status"),
|
|
"created_at": doc.get("created_at"),
|
|
"pages_total": pages_total,
|
|
"pages_inspected": pages_inspected,
|
|
"overall_score": agg.get("overall_score") if agg else None,
|
|
"grade": agg.get("grade") if agg else None,
|
|
})
|
|
|
|
total_pages = max(1, -(-total // limit))
|
|
|
|
return {
|
|
"items": items,
|
|
"total": total,
|
|
"page": page,
|
|
"limit": limit,
|
|
"total_pages": total_pages,
|
|
}
|
|
|
|
async def inspect_single_page(
|
|
self,
|
|
site_inspection_id: str,
|
|
page_url: str,
|
|
) -> Optional[str]:
|
|
"""
|
|
Trigger inspection for a single page within a site inspection.
|
|
Returns the inspection_id if successful, None if site inspection not found.
|
|
"""
|
|
doc = await self.db.site_inspections.find_one(
|
|
{"site_inspection_id": site_inspection_id},
|
|
)
|
|
if not doc:
|
|
return None
|
|
|
|
# Find the page in discovered_pages
|
|
page_found = False
|
|
for page in doc.get("discovered_pages", []):
|
|
if page["url"] == page_url:
|
|
page_found = True
|
|
break
|
|
|
|
if not page_found:
|
|
return None
|
|
|
|
# Run inspection inline
|
|
inspection_id = str(uuid.uuid4())
|
|
try:
|
|
inspection_id, result = await self.inspection_service.run_inspection_inline(
|
|
url=page_url,
|
|
inspection_id=inspection_id,
|
|
)
|
|
|
|
# Update page status in site inspection
|
|
overall_score = result.get("overall_score", 0)
|
|
grade = result.get("grade", "F")
|
|
|
|
await self.db.site_inspections.update_one(
|
|
{
|
|
"site_inspection_id": site_inspection_id,
|
|
"discovered_pages.url": page_url,
|
|
},
|
|
{
|
|
"$set": {
|
|
"discovered_pages.$.inspection_id": inspection_id,
|
|
"discovered_pages.$.status": "completed",
|
|
"discovered_pages.$.overall_score": overall_score,
|
|
"discovered_pages.$.grade": grade,
|
|
}
|
|
},
|
|
)
|
|
|
|
# Recompute aggregates
|
|
await self._compute_and_store_aggregates(site_inspection_id)
|
|
|
|
# Invalidate cache
|
|
cache_key = f"site-inspection:result:{site_inspection_id}"
|
|
await self.redis.delete(cache_key)
|
|
|
|
return inspection_id
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Failed to inspect page %s in site %s: %s",
|
|
page_url, site_inspection_id, str(e),
|
|
)
|
|
await self.db.site_inspections.update_one(
|
|
{
|
|
"site_inspection_id": site_inspection_id,
|
|
"discovered_pages.url": page_url,
|
|
},
|
|
{
|
|
"$set": {
|
|
"discovered_pages.$.status": "error",
|
|
}
|
|
},
|
|
)
|
|
return None
|
|
|
|
# ------------------------------------------------------------------
|
|
# Background task: Crawl + Inspect
|
|
# ------------------------------------------------------------------
|
|
|
|
    async def _crawl_and_inspect(
        self,
        site_inspection_id: str,
        url: str,
        max_pages: int,
        max_depth: int,
        concurrency: int = 4,
        accessibility_standard: str = "wcag_2.1_aa",
    ) -> None:
        """
        Background task that runs in two phases:
        Phase 1: BFS crawling to discover pages
        Phase 2: Parallel inspection of discovered pages (with semaphore)

        Runs as a detached asyncio task, so exceptions are never re-raised:
        any failure marks the site inspection "error" and publishes an
        error SSE event instead.
        """
        try:
            # ==============================
            # Phase 1: Crawling
            # ==============================
            logger.info("Phase 1 (crawling) started: %s", site_inspection_id)

            # Relay live crawl progress to SSE subscribers.
            async def crawl_progress(pages_found: int, current_url: str):
                await self._publish_site_event(site_inspection_id, {
                    "event_type": "crawl_progress",
                    "site_inspection_id": site_inspection_id,
                    "pages_found": pages_found,
                    "current_url": current_url,
                })

            crawler = LinkCrawler(
                root_url=url,
                max_pages=max_pages,
                max_depth=max_depth,
            )
            discovered = await crawler.crawl(progress_callback=crawl_progress)

            if not discovered:
                # User-facing Korean message: "No crawl results. Please check the URL."
                raise ValueError("크롤링 결과가 없습니다. URL을 확인해주세요.")

            # Build discovered_pages documents (deduplicate URLs)
            discovered_pages = []
            seen_urls: set[str] = set()
            for page in discovered:
                if page["url"] in seen_urls:
                    continue
                seen_urls.add(page["url"])
                discovered_pages.append({
                    "url": page["url"],
                    "depth": page["depth"],
                    "parent_url": page["parent_url"],
                    "inspection_id": None,  # assigned when inspection starts
                    "status": "pending",
                    "title": page.get("title"),
                    "overall_score": None,  # filled in on completion
                    "grade": None,
                })

            # Store discovered pages in MongoDB
            await self.db.site_inspections.update_one(
                {"site_inspection_id": site_inspection_id},
                {
                    "$set": {
                        "status": "inspecting",
                        "discovered_pages": discovered_pages,
                    }
                },
            )

            # Publish crawl_complete event
            await self._publish_site_event(site_inspection_id, {
                "event_type": "crawl_complete",
                "site_inspection_id": site_inspection_id,
                "total_pages": len(discovered_pages),
                "pages": [
                    {
                        "url": p["url"],
                        "depth": p["depth"],
                        "parent_url": p["parent_url"],
                        "title": p.get("title"),
                    }
                    for p in discovered_pages
                ],
            })

            logger.info(
                "Phase 1 completed: %s, pages=%d",
                site_inspection_id, len(discovered_pages),
            )

            # ==============================
            # Phase 2: Page-by-page inspection
            # ==============================
            logger.info("Phase 2 (inspection) started: %s", site_inspection_id)

            # Cap concurrent page inspections at the configured level.
            semaphore = asyncio.Semaphore(concurrency)

            tasks = [
                self._inspect_page_with_semaphore(
                    semaphore=semaphore,
                    site_inspection_id=site_inspection_id,
                    page_url=page["url"],
                    page_index=idx,
                    total_pages=len(discovered_pages),
                    accessibility_standard=accessibility_standard,
                )
                for idx, page in enumerate(discovered_pages)
            ]

            # return_exceptions=True: one failed page must not abort the rest.
            await asyncio.gather(*tasks, return_exceptions=True)

            # ==============================
            # Finalize: Compute aggregates
            # ==============================
            aggregate_scores = await self._compute_and_store_aggregates(site_inspection_id)

            # Mark as completed
            await self.db.site_inspections.update_one(
                {"site_inspection_id": site_inspection_id},
                {
                    "$set": {
                        "status": "completed",
                        "completed_at": datetime.now(timezone.utc),
                    }
                },
            )

            # Publish complete event
            await self._publish_site_event(site_inspection_id, {
                "event_type": "complete",
                "site_inspection_id": site_inspection_id,
                "status": "completed",
                "aggregate_scores": aggregate_scores,
            })

            logger.info("Site inspection completed: %s", site_inspection_id)

        except Exception as e:
            logger.error(
                "Site inspection %s failed: %s",
                site_inspection_id, str(e), exc_info=True,
            )

            # Record the failure state with a completion timestamp.
            await self.db.site_inspections.update_one(
                {"site_inspection_id": site_inspection_id},
                {
                    "$set": {
                        "status": "error",
                        "completed_at": datetime.now(timezone.utc),
                    }
                },
            )

            # User-facing Korean error message, truncated to 200 chars.
            await self._publish_site_event(site_inspection_id, {
                "event_type": "error",
                "site_inspection_id": site_inspection_id,
                "status": "error",
                "message": f"사이트 검사 중 오류가 발생했습니다: {str(e)[:200]}",
            })
|
|
|
async def _inspect_page_with_semaphore(
|
|
self,
|
|
semaphore: asyncio.Semaphore,
|
|
site_inspection_id: str,
|
|
page_url: str,
|
|
page_index: int,
|
|
total_pages: int,
|
|
accessibility_standard: str = "wcag_2.1_aa",
|
|
) -> None:
|
|
"""Inspect a single page with semaphore-controlled concurrency."""
|
|
async with semaphore:
|
|
await self._inspect_single_page(
|
|
site_inspection_id=site_inspection_id,
|
|
page_url=page_url,
|
|
page_index=page_index,
|
|
total_pages=total_pages,
|
|
accessibility_standard=accessibility_standard,
|
|
)
|
|
|
|
    async def _inspect_single_page(
        self,
        site_inspection_id: str,
        page_url: str,
        page_index: int,
        total_pages: int,
        accessibility_standard: str = "wcag_2.1_aa",
    ) -> None:
        """Run inspection for a single discovered page.

        Drives the page's entry in discovered_pages through the states
        "inspecting" -> "completed"/"error", publishes page_progress /
        page_complete / aggregate_update SSE events, and refreshes the
        site-level aggregates after the page finishes. Failures are logged
        and recorded but never re-raised, so sibling page tasks continue.
        """
        inspection_id = str(uuid.uuid4())

        # Mark page as inspecting in MongoDB (positional $ updates the
        # matched discovered_pages element).
        await self.db.site_inspections.update_one(
            {
                "site_inspection_id": site_inspection_id,
                "discovered_pages.url": page_url,
            },
            {
                "$set": {
                    "discovered_pages.$.status": "inspecting",
                    "discovered_pages.$.inspection_id": inspection_id,
                }
            },
        )

        try:
            # Progress callback for per-page SSE updates
            async def page_progress_callback(category: str, progress: int, current_step: str):
                await self._publish_site_event(site_inspection_id, {
                    "event_type": "page_progress",
                    "site_inspection_id": site_inspection_id,
                    "page_url": page_url,
                    "page_index": page_index,
                    "category": category,
                    "progress": progress,
                    "current_step": current_step,
                })

            # Run the inspection (the returned id echoes the one passed in,
            # so it is discarded here).
            _, result = await self.inspection_service.run_inspection_inline(
                url=page_url,
                inspection_id=inspection_id,
                progress_callback=page_progress_callback,
                accessibility_standard=accessibility_standard,
            )

            overall_score = result.get("overall_score", 0)
            grade = result.get("grade", "F")

            # Update page status in MongoDB
            await self.db.site_inspections.update_one(
                {
                    "site_inspection_id": site_inspection_id,
                    "discovered_pages.url": page_url,
                },
                {
                    "$set": {
                        "discovered_pages.$.status": "completed",
                        "discovered_pages.$.overall_score": overall_score,
                        "discovered_pages.$.grade": grade,
                    }
                },
            )

            # Publish page_complete event
            await self._publish_site_event(site_inspection_id, {
                "event_type": "page_complete",
                "site_inspection_id": site_inspection_id,
                "page_url": page_url,
                "page_index": page_index,
                "inspection_id": inspection_id,
                "overall_score": overall_score,
                "grade": grade,
            })

            # Compute and publish aggregate update
            aggregate_scores = await self._compute_and_store_aggregates(site_inspection_id)
            await self._publish_site_event(site_inspection_id, {
                "event_type": "aggregate_update",
                "site_inspection_id": site_inspection_id,
                "pages_inspected": aggregate_scores.get("pages_inspected", 0),
                "pages_total": aggregate_scores.get("pages_total", total_pages),
                "overall_score": aggregate_scores.get("overall_score", 0),
                "grade": aggregate_scores.get("grade", "F"),
            })

            logger.info(
                "Page inspection completed: site=%s, page=%s, score=%d",
                site_inspection_id, page_url, overall_score,
            )

        except Exception as e:
            logger.error(
                "Page inspection failed: site=%s, page=%s, error=%s",
                site_inspection_id, page_url, str(e),
            )

            # Mark page as error
            await self.db.site_inspections.update_one(
                {
                    "site_inspection_id": site_inspection_id,
                    "discovered_pages.url": page_url,
                },
                {
                    "$set": {
                        "discovered_pages.$.status": "error",
                    }
                },
            )

            # Publish page error (non-fatal, continue with other pages);
            # reported as a page_complete event with score 0 / grade F.
            await self._publish_site_event(site_inspection_id, {
                "event_type": "page_complete",
                "site_inspection_id": site_inspection_id,
                "page_url": page_url,
                "page_index": page_index,
                "inspection_id": None,
                "overall_score": 0,
                "grade": "F",
                "error": str(e)[:200],
            })
|
|
|
|
# ------------------------------------------------------------------
|
|
# Aggregate computation
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _compute_and_store_aggregates(self, site_inspection_id: str) -> dict:
|
|
"""
|
|
Compute aggregate scores from all completed page inspections.
|
|
|
|
Fetches each completed page's full inspection result from the
|
|
inspections collection, averages category scores, and stores
|
|
the aggregate in the site_inspections document.
|
|
|
|
Returns the aggregate_scores dict.
|
|
"""
|
|
doc = await self.db.site_inspections.find_one(
|
|
{"site_inspection_id": site_inspection_id},
|
|
)
|
|
if not doc:
|
|
return {}
|
|
|
|
pages = doc.get("discovered_pages", [])
|
|
total_pages = len(pages)
|
|
|
|
# Collect inspection IDs for completed pages
|
|
completed_ids = [
|
|
p["inspection_id"]
|
|
for p in pages
|
|
if p.get("status") == "completed" and p.get("inspection_id")
|
|
]
|
|
|
|
if not completed_ids:
|
|
aggregate = {
|
|
"overall_score": 0,
|
|
"grade": "F",
|
|
"html_css": 0,
|
|
"accessibility": 0,
|
|
"seo": 0,
|
|
"performance_security": 0,
|
|
"total_issues": 0,
|
|
"pages_inspected": 0,
|
|
"pages_total": total_pages,
|
|
}
|
|
await self._store_aggregates(site_inspection_id, aggregate)
|
|
return aggregate
|
|
|
|
# Fetch all completed inspection results
|
|
cursor = self.db.inspections.find(
|
|
{"inspection_id": {"$in": completed_ids}},
|
|
{
|
|
"_id": 0,
|
|
"overall_score": 1,
|
|
"categories.html_css.score": 1,
|
|
"categories.accessibility.score": 1,
|
|
"categories.seo.score": 1,
|
|
"categories.performance_security.score": 1,
|
|
"summary.total_issues": 1,
|
|
},
|
|
)
|
|
|
|
scores_overall = []
|
|
scores_html_css = []
|
|
scores_accessibility = []
|
|
scores_seo = []
|
|
scores_perf = []
|
|
total_issues = 0
|
|
|
|
async for insp in cursor:
|
|
scores_overall.append(insp.get("overall_score", 0))
|
|
|
|
cats = insp.get("categories", {})
|
|
scores_html_css.append(cats.get("html_css", {}).get("score", 0))
|
|
scores_accessibility.append(cats.get("accessibility", {}).get("score", 0))
|
|
scores_seo.append(cats.get("seo", {}).get("score", 0))
|
|
scores_perf.append(cats.get("performance_security", {}).get("score", 0))
|
|
|
|
total_issues += insp.get("summary", {}).get("total_issues", 0)
|
|
|
|
pages_inspected = len(scores_overall)
|
|
|
|
def safe_avg(values: list[int]) -> int:
|
|
return round(sum(values) / len(values)) if values else 0
|
|
|
|
overall_score = safe_avg(scores_overall)
|
|
grade = calculate_grade(overall_score)
|
|
|
|
aggregate = {
|
|
"overall_score": overall_score,
|
|
"grade": grade,
|
|
"html_css": safe_avg(scores_html_css),
|
|
"accessibility": safe_avg(scores_accessibility),
|
|
"seo": safe_avg(scores_seo),
|
|
"performance_security": safe_avg(scores_perf),
|
|
"total_issues": total_issues,
|
|
"pages_inspected": pages_inspected,
|
|
"pages_total": total_pages,
|
|
}
|
|
|
|
await self._store_aggregates(site_inspection_id, aggregate)
|
|
return aggregate
|
|
|
|
async def _store_aggregates(self, site_inspection_id: str, aggregate: dict) -> None:
|
|
"""Store aggregate scores in MongoDB."""
|
|
await self.db.site_inspections.update_one(
|
|
{"site_inspection_id": site_inspection_id},
|
|
{"$set": {"aggregate_scores": aggregate}},
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# SSE event publishing
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _publish_site_event(self, site_inspection_id: str, event_data: dict) -> None:
|
|
"""Publish an SSE event for site inspection via Redis Pub/Sub."""
|
|
channel = f"site-inspection:{site_inspection_id}:events"
|
|
await self.redis.publish(
|
|
channel,
|
|
json.dumps(event_data, ensure_ascii=False, default=str),
|
|
)
|