Files
web-inspector/backend/app/services/link_crawler.py
jungwoo choi 645ec56bd1 fix: 크롤링 중복 URL 제거 + URL 정규화 강화
- normalize_url: www. prefix 제거, UTM 등 트래킹 파라미터 제거
- site inspection: 크롤링 후 검사 전 중복 URL 필터링
- batch inspection: 업로드 URL 목록 중복 제거

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 07:26:00 +09:00

312 lines
10 KiB
Python

"""
BFS link crawler for same-domain page discovery.
Crawls a root URL using BFS (Breadth-First Search), extracting same-domain
links up to configurable max_pages and max_depth limits. Used by the
site-wide inspection feature to discover pages before inspection.
"""
import logging
from collections import deque
from typing import Awaitable, Callable, Optional
from urllib.parse import parse_qs, urlencode, urljoin, urlparse, urlunparse

import httpx
from bs4 import BeautifulSoup
# Module logger; handlers and level are configured by the host application.
logger = logging.getLogger(__name__)
# Link schemes that can never resolve to a crawlable HTML page
_SKIP_SCHEMES = {"javascript", "mailto", "tel", "data", "blob", "ftp"}
# File extensions for non-HTML assets (documents, images, media, fonts, archives)
_SKIP_EXTENSIONS = {
    ".pdf", ".jpg", ".jpeg", ".png", ".gif", ".svg", ".webp", ".ico",
    ".css", ".js", ".json", ".xml", ".zip", ".tar", ".gz", ".mp4",
    ".mp3", ".wav", ".avi", ".mov", ".woff", ".woff2", ".ttf", ".eot",
}
# Type alias for the async progress callback: (pages_found, current_url) -> None
ProgressCallback = Callable[[int, str], Awaitable[None]]
# Safety limit applied when max_pages=0 ("unlimited") to prevent runaway crawls
_UNLIMITED_SAFETY_CAP = 500
# Query parameters that carry only tracking/analytics state; dropping them
# lets otherwise-identical URLs deduplicate to the same key.
# Hoisted to module level so the set is not rebuilt on every call.
_TRACKING_PARAMS = frozenset({
    "utm_source", "utm_medium", "utm_campaign", "utm_term",
    "utm_content", "ref", "fbclid", "gclid", "mc_cid", "mc_eid",
})


def normalize_url(url: str) -> str:
    """
    Normalize a URL for deduplication.

    Steps:
    - Remove the fragment (#...)
    - Lowercase the scheme and netloc, and strip a leading "www." prefix
    - Remove trailing slashes from the path (keeping the bare root "/")
    - Drop common tracking query parameters (UTM, fbclid, gclid, ...)

    Args:
        url: URL to normalize (expected absolute).

    Returns:
        The normalized URL string.
    """
    parsed = urlparse(url)
    # Lowercase host and treat example.com / www.example.com as the same site.
    netloc = parsed.netloc.lower().removeprefix("www.")
    # Strip trailing slashes, but keep the root path "/" intact.
    path = parsed.path
    if path != "/" and path.endswith("/"):
        path = path.rstrip("/")
    # Remove tracking parameters from the query string, case-insensitively.
    query = parsed.query
    if query:
        # keep_blank_values so "?flag"-style parameters survive the round trip.
        params = parse_qs(query, keep_blank_values=True)
        kept = {k: v for k, v in params.items() if k.lower() not in _TRACKING_PARAMS}
        query = urlencode(kept, doseq=True)
    return urlunparse(parsed._replace(
        scheme=parsed.scheme.lower(),
        netloc=netloc,
        path=path,
        query=query,
        fragment="",
    ))
def is_same_domain(url: str, root_domain: str) -> bool:
    """Return True when *url*'s host matches *root_domain*, ignoring a
    leading "www." on either side (www.example.com == example.com)."""
    host = urlparse(url).netloc.lower().removeprefix("www.")
    root = root_domain.lower().removeprefix("www.")
    return host == root
def should_skip_url(href: str) -> bool:
    """
    Return True when an extracted href should not be crawled.

    Skips empty/whitespace-only hrefs, fragment-only anchors, non-navigable
    schemes (javascript:, mailto:, tel:, ...), and links whose path ends in
    a known non-HTML asset extension (images, media, fonts, archives).

    Args:
        href: Raw href attribute value from an <a> tag.

    Returns:
        True if the link should be skipped, False if it is a crawl candidate.
    """
    # Empty or whitespace-only href (covers href="" and href="   ").
    if not href or not href.strip():
        return True
    # Fragment-only links point back into the current page.
    if href.startswith("#"):
        return True
    parsed = urlparse(href)
    # Non-HTTP schemes are never crawlable pages.
    if parsed.scheme and parsed.scheme.lower() in _SKIP_SCHEMES:
        return True
    # str.endswith accepts a tuple of suffixes, replacing the per-extension loop.
    return parsed.path.lower().endswith(tuple(_SKIP_EXTENSIONS))
class LinkCrawler:
    """
    BFS link crawler that discovers same-domain pages.

    Usage:
        crawler = LinkCrawler(
            root_url="https://example.com",
            max_pages=20,
            max_depth=2,
        )
        pages = await crawler.crawl(progress_callback=callback)
    """

    def __init__(
        self,
        root_url: str,
        max_pages: int = 20,
        max_depth: int = 2,
    ):
        """
        Args:
            root_url: Starting URL; normalized before crawling.
            max_pages: Maximum number of pages to record. 0 means
                "unlimited", which is clamped to _UNLIMITED_SAFETY_CAP.
            max_depth: Maximum BFS depth (the root is depth 0).
        """
        self.root_url = normalize_url(root_url)
        # 0 means unlimited -> substitute the safety cap to bound the crawl.
        self.max_pages = max_pages if max_pages > 0 else _UNLIMITED_SAFETY_CAP
        self.max_depth = max_depth
        parsed = urlparse(self.root_url)
        self.root_domain = parsed.netloc.lower()
        self.root_scheme = parsed.scheme

    async def _record_page(
        self,
        results: list[dict],
        url: str,
        depth: int,
        parent_url: Optional[str],
        title: Optional[str],
        progress_callback: Optional[ProgressCallback],
    ) -> None:
        """
        Append one page record to *results* and report progress.

        Factored out because the success, non-HTML, timeout, and
        request-error paths all record the page identically: status is
        always "discovered"; failed fetches simply carry title=None.
        """
        results.append({
            "url": url,
            "depth": depth,
            "parent_url": parent_url,
            "title": title,
            "status": "discovered",
        })
        if progress_callback:
            await progress_callback(len(results), url)

    async def crawl(
        self,
        progress_callback: Optional[ProgressCallback] = None,
    ) -> list[dict]:
        """
        BFS crawl starting from root_url.

        Pages that time out, fail, or serve non-HTML content are still
        recorded (with title=None); they just contribute no child links.

        Args:
            progress_callback: Optional async callback invoked as
                (pages_found, current_url) after each page is recorded.

        Returns:
            List of dicts:
            [
                {
                    "url": "https://example.com/",
                    "depth": 0,
                    "parent_url": None,
                    "title": "Example Page",
                    "status": "discovered",
                },
                ...
            ]
        """
        visited: set[str] = {self.root_url}
        results: list[dict] = []
        # BFS queue of (url, depth, parent_url).
        queue: deque[tuple[str, int, Optional[str]]] = deque()
        queue.append((self.root_url, 0, None))
        async with httpx.AsyncClient(
            follow_redirects=True,
            timeout=httpx.Timeout(10.0),
            # NOTE(review): TLS verification is disabled, presumably so sites
            # with broken certificates can still be inspected -- confirm this
            # is intentional before exposing the crawler to arbitrary hosts.
            verify=False,
            headers={
                "User-Agent": "WebInspector/1.0 (Site Crawler)",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
            },
        ) as client:
            while queue and len(results) < self.max_pages:
                url, depth, parent_url = queue.popleft()
                title: Optional[str] = None
                links: list[str] = []
                try:
                    response = await client.get(url)
                    content_type = response.headers.get("content-type", "")
                    if (
                        "text/html" not in content_type
                        and "application/xhtml" not in content_type
                    ):
                        # Record non-HTML responses but extract no links.
                        logger.debug(
                            "Skipping non-HTML content: %s (%s)", url, content_type
                        )
                    else:
                        title, links = self._extract_links_and_title(url, response.text)
                except httpx.TimeoutException:
                    logger.warning("Timeout crawling %s", url)
                except httpx.RequestError as e:
                    logger.warning("Request error crawling %s: %s", url, str(e))
                # Record the page (even on fetch failure) and notify progress.
                await self._record_page(
                    results, url, depth, parent_url, title, progress_callback
                )
                # Enqueue unvisited same-domain children while under the limits.
                if depth < self.max_depth:
                    for link in links:
                        normalized = normalize_url(link)
                        if normalized in visited:
                            continue
                        if not is_same_domain(normalized, self.root_domain):
                            continue
                        # visited bounds the total enqueue count; once it hits
                        # max_pages no further links can be added this crawl.
                        if len(visited) >= self.max_pages:
                            break
                        visited.add(normalized)
                        queue.append((normalized, depth + 1, url))
        logger.info(
            "Crawl completed: root=%s, pages_found=%d, max_pages=%d, max_depth=%d",
            self.root_url, len(results), self.max_pages, self.max_depth,
        )
        return results

    def _extract_links_and_title(
        self, base_url: str, html: str
    ) -> tuple[Optional[str], list[str]]:
        """
        Extract the page title and candidate links from HTML.

        Note: this returns ALL http(s) links found on the page; same-domain
        filtering happens in crawl().

        Args:
            base_url: URL of the fetched page, used to resolve relative hrefs.
            html: Raw HTML document text.

        Returns:
            (title, list_of_absolute_urls); title is None when the page has
            no usable <title>, and is truncated to 200 characters.
        """
        soup = BeautifulSoup(html, "html.parser")
        # NOTE: .string is None when <title> contains nested markup, in which
        # case the title is left as None.
        title = None
        title_tag = soup.find("title")
        if title_tag and title_tag.string:
            title = title_tag.string.strip()
            # Truncate very long titles.
            if len(title) > 200:
                title = title[:200] + "..."
        links: list[str] = []
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"].strip()
            if should_skip_url(href):
                continue
            # Resolve relative URLs against the page's own URL.
            absolute_url = urljoin(base_url, href)
            # Keep only HTTP(S) links (urljoin can yield other schemes).
            if urlparse(absolute_url).scheme not in ("http", "https"):
                continue
            links.append(absolute_url)
        return title, links