Files
web-inspector/backend/app/services/link_crawler.py
jungwoo choi 81b9104aea feat: 사이트 전체 검사 기능 추가
도메인 하위 링크를 BFS로 자동 크롤링하여 페이지별 검사 수행.
- BFS 링크 크롤러 (같은 도메인 필터링, max_pages/max_depth 설정)
- 사이트 검사 오케스트레이션 (크롤링→순차 검사→집계)
- SSE 실시간 진행 상태 (크롤링/검사/완료)
- 페이지 트리 + 집계 결과 UI
- UrlInputForm에 "사이트 전체 검사" 버튼 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 16:46:49 +09:00

292 lines
9.1 KiB
Python

"""
BFS link crawler for same-domain page discovery.
Crawls a root URL using BFS (Breadth-First Search), extracting same-domain
links up to configurable max_pages and max_depth limits. Used by the
site-wide inspection feature to discover pages before inspection.
"""
import logging
from collections import deque
from typing import Callable, Awaitable, Optional
from urllib.parse import urljoin, urlparse, urlunparse
import httpx
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
# URL schemes that can never resolve to a fetchable HTML page; links with
# these schemes are dropped during extraction.
_SKIP_SCHEMES = {"javascript", "mailto", "tel", "data", "blob", "ftp"}
# File extensions that are not HTML pages; links whose path ends in one of
# these are skipped (matched case-insensitively against the lowered path).
_SKIP_EXTENSIONS = {
    ".pdf", ".jpg", ".jpeg", ".png", ".gif", ".svg", ".webp", ".ico",
    ".css", ".js", ".json", ".xml", ".zip", ".tar", ".gz", ".mp4",
    ".mp3", ".wav", ".avi", ".mov", ".woff", ".woff2", ".ttf", ".eot",
}
# Async progress callback signature: (pages_found, current_url) -> None
ProgressCallback = Callable[[int, str], Awaitable[None]]
def normalize_url(url: str) -> str:
    """
    Normalize a URL so equivalent URLs deduplicate to the same string.

    Steps:
    - Remove the fragment (``#...``) — it never changes the fetched page.
    - Lowercase scheme and netloc (case-insensitive per RFC 3986).
    - Collapse an empty path to ``/`` so ``http://x.com`` and
      ``http://x.com/`` compare equal (fixes double-crawling the root).
    - Strip trailing slashes from non-root paths.

    Args:
        url: Absolute URL to normalize.

    Returns:
        The normalized URL string. Path case and query string are preserved.
    """
    parsed = urlparse(url)
    parsed = parsed._replace(
        fragment="",
        scheme=parsed.scheme.lower(),
        netloc=parsed.netloc.lower(),
    )
    path = parsed.path
    if not path:
        # "http://x.com" (empty path) is the same resource as "http://x.com/".
        path = "/"
    elif path != "/" and path.endswith("/"):
        path = path.rstrip("/")
    return urlunparse(parsed._replace(path=path))
def is_same_domain(url: str, root_domain: str) -> bool:
    """Return True when *url*'s host matches *root_domain*.

    Comparison is case-insensitive and ignores a leading ``www.`` on either
    side, so ``example.com`` and ``www.example.com`` count as the same domain.
    """
    host = urlparse(url).netloc.lower().removeprefix("www.")
    return host == root_domain.lower().removeprefix("www.")
def should_skip_url(href: str) -> bool:
    """Return True when *href* should not be crawled.

    Skipped: empty or whitespace-only hrefs, fragment-only anchors,
    schemes listed in ``_SKIP_SCHEMES``, and paths ending in a known
    non-HTML extension from ``_SKIP_EXTENSIONS``.
    """
    if not href or not href.strip():
        return True
    if href.startswith("#"):
        # Fragment-only link: points into the current page, nothing to fetch.
        return True
    parsed = urlparse(href)
    scheme = parsed.scheme
    if scheme and scheme.lower() in _SKIP_SCHEMES:
        return True
    # Compare against the lowered path so ".PDF" etc. are also caught.
    lowered = parsed.path.lower()
    return any(lowered.endswith(ext) for ext in _SKIP_EXTENSIONS)
class LinkCrawler:
    """
    BFS link crawler that discovers same-domain pages.

    Starting from ``root_url``, pages are visited breadth-first; same-domain
    links found on each page are enqueued until ``max_pages`` pages have been
    recorded or ``max_depth`` levels have been explored.

    Usage:
        crawler = LinkCrawler(
            root_url="https://example.com",
            max_pages=20,
            max_depth=2,
        )
        pages = await crawler.crawl(progress_callback=callback)
    """

    def __init__(
        self,
        root_url: str,
        max_pages: int = 20,
        max_depth: int = 2,
    ):
        """
        Args:
            root_url: Starting URL; normalized before use.
            max_pages: Hard cap on the number of pages recorded.
            max_depth: Maximum link depth from the root (root is depth 0).
        """
        self.root_url = normalize_url(root_url)
        self.max_pages = max_pages
        self.max_depth = max_depth
        parsed = urlparse(self.root_url)
        self.root_domain = parsed.netloc.lower()
        self.root_scheme = parsed.scheme

    async def _record(
        self,
        results: list[dict],
        url: str,
        depth: int,
        parent_url: Optional[str],
        title: Optional[str],
        progress_callback: Optional[ProgressCallback],
    ) -> None:
        """Append one discovered-page entry to *results* and notify progress.

        Centralizes the record dict that was previously duplicated across the
        success, non-HTML, timeout, and request-error paths.
        """
        results.append({
            "url": url,
            "depth": depth,
            "parent_url": parent_url,
            "title": title,
            "status": "discovered",
        })
        if progress_callback:
            await progress_callback(len(results), url)

    async def crawl(
        self,
        progress_callback: Optional[ProgressCallback] = None,
    ) -> list[dict]:
        """
        BFS crawl starting from root_url.

        Args:
            progress_callback: Optional async callback invoked after each
                page is recorded, with (pages_found, current_url).

        Returns list of dicts:
            [
                {
                    "url": "https://example.com/",
                    "depth": 0,
                    "parent_url": None,
                    "title": "Example Page",
                    "status": "discovered",
                },
                ...
            ]

        Pages that time out, fail, or serve non-HTML content are still
        recorded (with ``title=None``) but contribute no child links.
        """
        visited: set[str] = {self.root_url}
        results: list[dict] = []
        # BFS queue: (url, depth, parent_url)
        queue: deque[tuple[str, int, Optional[str]]] = deque()
        queue.append((self.root_url, 0, None))
        # NOTE(review): verify=False disables TLS certificate checking —
        # presumably to tolerate target sites with broken certs; confirm this
        # is intentional before pointing the crawler at untrusted hosts.
        async with httpx.AsyncClient(
            follow_redirects=True,
            timeout=httpx.Timeout(10.0),
            verify=False,
            headers={
                "User-Agent": "WebInspector/1.0 (Site Crawler)",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
            },
        ) as client:
            while queue and len(results) < self.max_pages:
                url, depth, parent_url = queue.popleft()
                title: Optional[str] = None
                links: list[str] = []
                try:
                    response = await client.get(url)
                    # Only parse HTML content; record anything else as-is.
                    content_type = response.headers.get("content-type", "")
                    if ("text/html" not in content_type
                            and "application/xhtml" not in content_type):
                        logger.debug("Skipping non-HTML content: %s (%s)", url, content_type)
                        await self._record(
                            results, url, depth, parent_url, None, progress_callback)
                        continue
                    title, links = self._extract_links_and_title(url, response.text)
                except httpx.TimeoutException:
                    logger.warning("Timeout crawling %s", url)
                    await self._record(
                        results, url, depth, parent_url, None, progress_callback)
                    continue
                except httpx.RequestError as e:
                    logger.warning("Request error crawling %s: %s", url, str(e))
                    await self._record(
                        results, url, depth, parent_url, None, progress_callback)
                    continue
                # Successfully fetched and parsed: record with its title.
                await self._record(
                    results, url, depth, parent_url, title, progress_callback)
                # Only enqueue child links while below the depth limit.
                if depth < self.max_depth:
                    for link in links:
                        normalized = normalize_url(link)
                        if normalized in visited:
                            continue
                        if not is_same_domain(normalized, self.root_domain):
                            continue
                        # Cap total discovery at max_pages (visited includes
                        # everything ever enqueued, so this bounds the queue).
                        if len(visited) >= self.max_pages:
                            break
                        visited.add(normalized)
                        queue.append((normalized, depth + 1, url))
        logger.info(
            "Crawl completed: root=%s, pages_found=%d, max_pages=%d, max_depth=%d",
            self.root_url, len(results), self.max_pages, self.max_depth,
        )
        return results

    def _extract_links_and_title(
        self, base_url: str, html: str
    ) -> tuple[Optional[str], list[str]]:
        """
        Extract the page title and candidate links from *html*.

        Relative hrefs are resolved against *base_url*; non-HTTP(S) and
        skip-listed links are filtered out. Same-domain filtering happens
        later in ``crawl``.

        Returns:
            (title, list_of_absolute_urls) — title is None when missing,
            and is truncated to 200 characters.
        """
        soup = BeautifulSoup(html, "html.parser")
        title = None
        title_tag = soup.find("title")
        # .string is None when <title> has nested markup; title stays None then.
        if title_tag and title_tag.string:
            title = title_tag.string.strip()
            # Truncate very long titles
            if len(title) > 200:
                title = title[:200] + "..."
        links: list[str] = []
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"].strip()
            if should_skip_url(href):
                continue
            # Resolve relative URLs against the page they appeared on.
            absolute_url = urljoin(base_url, href)
            # Keep only fetchable HTTP(S) targets.
            parsed = urlparse(absolute_url)
            if parsed.scheme not in ("http", "https"):
                continue
            links.append(absolute_url)
        return title, links