# - normalize_url: strip "www." prefix and remove tracking query params (UTM, etc.)
# - site inspection: filter duplicate URLs after crawling, before inspection
# - batch inspection: deduplicate uploaded URL list
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
"""
|
|
BFS link crawler for same-domain page discovery.
|
|
|
|
Crawls a root URL using BFS (Breadth-First Search), extracting same-domain
|
|
links up to configurable max_pages and max_depth limits. Used by the
|
|
site-wide inspection feature to discover pages before inspection.
|
|
"""
|
|
|
|
import logging
from collections import deque
from typing import Awaitable, Callable, Optional
from urllib.parse import parse_qs, urlencode, urljoin, urlparse, urlunparse

import httpx
from bs4 import BeautifulSoup
|
|
|
|
# Module-level logger; handlers/levels are configured by the application.
logger = logging.getLogger(__name__)

# URL schemes whose links can never resolve to a crawlable HTML page.
_SKIP_SCHEMES = {"javascript", "mailto", "tel", "data", "blob", "ftp"}

# File extensions for static assets (images, scripts, media, fonts, archives)
# that are not HTML pages; links ending in these are skipped.
_SKIP_EXTENSIONS = {
    ".pdf", ".jpg", ".jpeg", ".png", ".gif", ".svg", ".webp", ".ico",
    ".css", ".js", ".json", ".xml", ".zip", ".tar", ".gz", ".mp4",
    ".mp3", ".wav", ".avi", ".mov", ".woff", ".woff2", ".ttf", ".eot",
}

# Type alias for the async progress callback: (pages_found, current_url) -> None
ProgressCallback = Callable[[int, str], Awaitable[None]]

# Hard ceiling applied when max_pages=0 ("unlimited") to prevent runaway crawls.
_UNLIMITED_SAFETY_CAP = 500
|
|
|
|
|
|
def normalize_url(url: str) -> str:
    """
    Normalize a URL so that equivalent URLs deduplicate to the same string.

    Normalization steps:
    - Remove the fragment (#...)
    - Lowercase the scheme and netloc
    - Strip a leading "www." from the host
    - Remove a trailing slash from the path (except the root path "/")
    - Drop common tracking query parameters (utm_*, fbclid, gclid, ...)

    Args:
        url: Absolute URL to normalize.

    Returns:
        The normalized URL string.
    """
    # Query-parameter names that carry tracking state, not content identity.
    tracking_params = frozenset({
        "utm_source", "utm_medium", "utm_campaign", "utm_term",
        "utm_content", "ref", "fbclid", "gclid", "mc_cid", "mc_eid",
    })

    parsed = urlparse(url)

    # Drop the fragment — it never changes the document served.
    normalized = parsed._replace(fragment="")

    # Case-fold scheme/host and strip "www." so www/non-www collapse together.
    netloc = normalized.netloc.lower().removeprefix("www.")
    normalized = normalized._replace(
        scheme=normalized.scheme.lower(),
        netloc=netloc,
    )

    # "/path/" and "/path" are the same page; keep the bare root "/" intact.
    path = normalized.path
    if path != "/" and path.endswith("/"):
        normalized = normalized._replace(path=path.rstrip("/"))

    # Remove tracking parameters while preserving the rest of the query.
    if normalized.query:
        params = parse_qs(normalized.query, keep_blank_values=True)
        filtered = {k: v for k, v in params.items() if k.lower() not in tracking_params}
        normalized = normalized._replace(query=urlencode(filtered, doseq=True))

    return urlunparse(normalized)
|
|
|
|
|
|
def is_same_domain(url: str, root_domain: str) -> bool:
    """Return True if *url* points at the same domain as *root_domain*.

    The comparison is case-insensitive and treats "example.com" and
    "www.example.com" as the same domain.
    """
    url_host = urlparse(url).netloc.lower().removeprefix("www.")
    root_host = root_domain.lower().removeprefix("www.")
    return url_host == root_host
|
|
|
|
|
|
def should_skip_url(href: str) -> bool:
    """Return True for hrefs that cannot lead to a crawlable HTML page.

    Skips empty/whitespace hrefs, pure fragment anchors, links with
    non-HTTP schemes (mailto:, javascript:, tel:, ...), and links to
    non-HTML assets (images, scripts, archives, fonts, media).
    """
    # Empty or whitespace-only href carries no destination.
    if not href or not href.strip():
        return True

    # A bare fragment stays on the current page.
    if href.startswith("#"):
        return True

    parsed = urlparse(href)

    # Scheme-based filtering (relative links have an empty scheme).
    scheme = parsed.scheme.lower()
    if scheme and scheme in _SKIP_SCHEMES:
        return True

    # Extension-based filtering for static (non-HTML) assets.
    lowered_path = parsed.path.lower()
    return any(lowered_path.endswith(ext) for ext in _SKIP_EXTENSIONS)
|
|
|
|
|
|
class LinkCrawler:
    """
    BFS link crawler that discovers same-domain pages.

    Starting from ``root_url``, performs a breadth-first traversal over
    same-domain links, up to ``max_pages`` pages and ``max_depth`` levels.

    Usage:
        crawler = LinkCrawler(
            root_url="https://example.com",
            max_pages=20,
            max_depth=2,
        )
        pages = await crawler.crawl(progress_callback=callback)
    """

    def __init__(
        self,
        root_url: str,
        max_pages: int = 20,
        max_depth: int = 2,
    ):
        """
        Args:
            root_url: Root URL to start crawling from (normalized on init).
            max_pages: Maximum number of pages to return. 0 means
                "unlimited", which is clamped to _UNLIMITED_SAFETY_CAP.
            max_depth: Maximum BFS depth; the root page is depth 0.
        """
        self.root_url = normalize_url(root_url)
        # 0 means unlimited → apply the safety cap to prevent runaway crawls
        self.max_pages = max_pages if max_pages > 0 else _UNLIMITED_SAFETY_CAP
        self.max_depth = max_depth

        parsed = urlparse(self.root_url)
        self.root_domain = parsed.netloc.lower()
        self.root_scheme = parsed.scheme

    @staticmethod
    def _make_record(
        url: str,
        depth: int,
        parent_url: Optional[str],
        title: Optional[str],
    ) -> dict:
        """Build the result dict for one discovered page."""
        return {
            "url": url,
            "depth": depth,
            "parent_url": parent_url,
            "title": title,
            "status": "discovered",
        }

    async def crawl(
        self,
        progress_callback: Optional[ProgressCallback] = None,
    ) -> list[dict]:
        """
        BFS crawl starting from root_url.

        Args:
            progress_callback: Optional async callback invoked after each
                page is recorded, as (pages_found, current_url).

        Returns list of dicts:
            [
                {
                    "url": "https://example.com/",
                    "depth": 0,
                    "parent_url": None,
                    "title": "Example Page",
                    "status": "discovered",
                },
                ...
            ]
        """
        visited: set[str] = set()
        results: list[dict] = []

        async def record(
            url: str, depth: int, parent: Optional[str], title: Optional[str]
        ) -> None:
            # Append one page record and notify the progress callback.
            results.append(self._make_record(url, depth, parent, title))
            if progress_callback:
                await progress_callback(len(results), url)

        # BFS queue: (url, depth, parent_url)
        queue: deque[tuple[str, int, Optional[str]]] = deque()
        queue.append((self.root_url, 0, None))
        visited.add(self.root_url)

        async with httpx.AsyncClient(
            follow_redirects=True,
            timeout=httpx.Timeout(10.0),
            # NOTE(review): TLS verification is disabled, presumably so
            # sites with broken certificates can still be inspected —
            # confirm this is intentional.
            verify=False,
            headers={
                "User-Agent": "WebInspector/1.0 (Site Crawler)",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
            },
        ) as client:
            while queue and len(results) < self.max_pages:
                url, depth, parent_url = queue.popleft()

                title: Optional[str] = None
                links: list[str] = []

                try:
                    response = await client.get(url)

                    # Only parse HTML content; other content types are
                    # recorded but yield no links.
                    content_type = response.headers.get("content-type", "")
                    if "text/html" not in content_type and "application/xhtml" not in content_type:
                        logger.debug("Skipping non-HTML content: %s (%s)", url, content_type)
                        await record(url, depth, parent_url, None)
                        continue

                    title, links = self._extract_links_and_title(url, response.text)

                except httpx.TimeoutException:
                    # Slow/unresponsive page: record it without links.
                    logger.warning("Timeout crawling %s", url)
                    await record(url, depth, parent_url, None)
                    continue

                except httpx.RequestError as e:
                    # Connection/protocol failure: record it without links.
                    logger.warning("Request error crawling %s: %s", url, str(e))
                    await record(url, depth, parent_url, None)
                    continue

                # Record the successfully fetched page.
                await record(url, depth, parent_url, title)

                # Only enqueue child links if we haven't reached max_depth.
                if depth < self.max_depth:
                    for link in links:
                        normalized = normalize_url(link)

                        if normalized in visited:
                            continue

                        if not is_same_domain(normalized, self.root_domain):
                            continue

                        # Stop enqueueing once enough URLs are queued/visited.
                        if len(visited) >= self.max_pages:
                            break

                        visited.add(normalized)
                        queue.append((normalized, depth + 1, url))

        logger.info(
            "Crawl completed: root=%s, pages_found=%d, max_pages=%d, max_depth=%d",
            self.root_url, len(results), self.max_pages, self.max_depth,
        )

        return results

    def _extract_links_and_title(
        self, base_url: str, html: str
    ) -> tuple[Optional[str], list[str]]:
        """
        Extract page title and candidate links from HTML.

        Args:
            base_url: URL of the page, used to resolve relative hrefs.
            html: Raw HTML text.

        Returns:
            (title, list_of_absolute_urls) — title is truncated to 200
            chars; links are absolute http(s) URLs, not yet deduplicated
            or domain-filtered (the caller does that).
        """
        soup = BeautifulSoup(html, "html.parser")

        # Extract the <title> text, if present and non-empty.
        title = None
        title_tag = soup.find("title")
        if title_tag and title_tag.string:
            title = title_tag.string.strip()
            # Truncate very long titles
            if len(title) > 200:
                title = title[:200] + "..."

        # Extract candidate links from <a href="...">.
        links: list[str] = []
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"].strip()

            if should_skip_url(href):
                continue

            # Resolve relative URLs against the page URL.
            absolute_url = urljoin(base_url, href)

            # Keep only http(s) targets.
            parsed = urlparse(absolute_url)
            if parsed.scheme not in ("http", "https"):
                continue

            links.append(absolute_url)

        return title, links