Files
web-inspector/backend/app/engines/performance_security.py
jungwoo choi 44ad36e2ab refactor: 4개 검사 엔진을 YAML 기반 표준 규칙으로 리팩토링
- YAML 규칙 파일 4개 신규 생성 (html_css, accessibility, seo, performance_security)
  W3C, WCAG 2.0/2.1/2.2, OWASP, Google Search Essentials 공식 표준 기반
- rules/__init__.py: YAML 로더 + 캐싱 + 리로드 모듈
- html_css.py: 30개 폐기 요소, 100+개 폐기 속성을 YAML에서 동적 로드
- accessibility.py: WCAG 버전 선택 지원 (wcag_version 파라미터)
- seo.py: title/description 길이, OG 필수 태그 등 임계값 YAML 로드
- performance_security.py: COOP/COEP/CORP 검사 추가, 정보 노출 헤더 검사 추가,
  TTFB/페이지 크기 임계값 YAML 로드
- PyYAML 의존성 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 15:49:57 +09:00

572 lines
24 KiB
Python

"""
Performance/Security Checker Engine (F-005).
Checks security headers, HTTPS, SSL certificate, response time, page size, etc.
"""
import re
import ssl
import socket
import logging
import time
from datetime import datetime, timezone
from urllib.parse import urlparse
from typing import Any, Optional
import httpx
from bs4 import BeautifulSoup
from app.engines.base import BaseChecker
from app.models.schemas import CategoryResult, Issue, calculate_grade
from app.rules import get_rules
logger = logging.getLogger(__name__)
class PerformanceSecurityChecker(BaseChecker):
    """Checker engine for the performance/security inspection category.

    The rule data returned by ``get_rules("performance_security")`` is
    loaded once at construction time and kept on the instance.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to the base checker, then load rules."""
        super().__init__(**kwargs)
        rules = get_rules("performance_security")
        self._rules_data = rules
def _get_security_headers(self) -> list[dict[str, Any]]:
"""Load required security headers from YAML."""
return self._rules_data.get("security", {}).get("headers", [])
def _get_headers_to_remove(self) -> list[dict[str, Any]]:
"""Load information disclosure headers from YAML."""
return self._rules_data.get("security", {}).get("headers_to_remove", [])
def _get_ttfb_thresholds(self) -> dict[str, int]:
"""Load TTFB thresholds from YAML."""
for metric in self._rules_data.get("performance", {}).get("additional_metrics", []):
if metric.get("id") == "perf-ttfb":
return metric.get("thresholds", {})
return {"good": 800, "needs_improvement": 1800}
def _get_page_size_thresholds(self) -> dict[str, int]:
"""Load total page size thresholds from YAML."""
for check in self._rules_data.get("performance", {}).get("resource_checks", []):
if check.get("id") == "perf-total-page-size":
return check.get("thresholds", {})
return {"good": 1500, "needs_improvement": 3000, "poor": 5000}
@property
def category_name(self) -> str:
    """Identifier of the inspection category handled by this checker."""
    return "performance_security"
async def check(self, url: str, html_content: str, headers: dict) -> CategoryResult:
    """Run every performance/security check in order and build the result.

    Progress is reported through ``update_progress`` before each stage.
    Each check contributes issues to one list; some also record
    measurements in a shared ``metrics`` dict.  The composite score is
    derived from both once all checks have run.
    """
    found: list[Issue] = []
    metrics: dict = {}

    await self.update_progress(10, "HTTPS 검사 중...")
    found.extend(self._check_https(url, metrics))

    await self.update_progress(20, "SSL 인증서 검사 중...")
    found.extend(await self._check_ssl(url, metrics))

    await self.update_progress(35, "보안 헤더 검사 중...")
    # Header checks share one signature, so they are run from a table.
    header_checks = (
        self._check_hsts,
        self._check_csp,
        self._check_x_content_type,
        self._check_x_frame_options,
        self._check_x_xss_protection,
        self._check_referrer_policy,
        self._check_permissions_policy,
        self._check_coop,
        self._check_coep,
        self._check_corp,
    )
    for header_check in header_checks:
        found.extend(header_check(headers))

    await self.update_progress(50, "정보 노출 헤더 검사 중...")
    found.extend(self._check_info_disclosure(headers))

    await self.update_progress(60, "응답 시간 측정 중...")
    found.extend(await self._check_ttfb(url, metrics))

    await self.update_progress(70, "페이지 크기 분석 중...")
    found.extend(self._check_page_size(html_content, metrics))

    await self.update_progress(80, "리다이렉트 검사 중...")
    found.extend(await self._check_redirects(url, metrics))

    await self.update_progress(85, "압축 검사 중...")
    found.extend(self._check_compression(headers, metrics))

    await self.update_progress(90, "혼합 콘텐츠 검사 중...")
    found.extend(self._check_mixed_content(url, html_content))

    score, sub_scores = self._calculate_composite_score(found, metrics)
    await self.update_progress(100, "완료")

    return self._build_result(
        category="performance_security",
        score=score,
        issues=found,
        sub_scores=sub_scores,
        metrics=metrics,
    )
def _check_https(self, url: str, metrics: dict) -> list[Issue]:
    """P-01: Report a critical issue when the URL scheme is not ``https``.

    Stores the boolean outcome under ``metrics["https"]``.
    """
    uses_https = urlparse(url).scheme == "https"
    metrics["https"] = uses_https
    if uses_https:
        return []
    insecure_scheme = self._create_issue(
        code="P-01",
        severity="critical",
        message="HTTPS를 사용하지 않고 있습니다",
        suggestion="사이트 보안을 위해 HTTPS를 적용하세요",
    )
    return [insecure_scheme]
async def _check_ssl(self, url: str, metrics: dict) -> list[Issue]:
    """P-02: Check SSL certificate validity and expiry.

    Writes ``ssl_valid`` (True / False / None when undetermined) and
    ``ssl_expiry_days`` (int or None) into ``metrics``.

    Returns a list with at most one P-02 issue:
      * critical - URL is not HTTPS, the TLS layer rejected the
        certificate, or the certificate is already expired;
      * major    - the certificate expires in fewer than 30 days;
      * minor    - the check itself could not be carried out.
    """
    # Function-scope import so this method does not rely on the module's
    # import block providing asyncio.
    import asyncio

    parsed = urlparse(url)
    if parsed.scheme != "https":
        metrics["ssl_valid"] = False
        metrics["ssl_expiry_days"] = None
        return [self._create_issue(
            code="P-02",
            severity="critical",
            message="HTTPS를 사용하지 않아 SSL 인증서를 확인할 수 없습니다",
            suggestion="SSL 인증서를 설치하고 HTTPS를 적용하세요",
        )]

    hostname = parsed.hostname
    port = parsed.port or 443

    def fetch_peer_cert() -> dict:
        """Perform the blocking TLS handshake and return the peer certificate."""
        if not hostname:
            raise ValueError("URL has no hostname")
        ctx = ssl.create_default_context()
        # create_connection resolves both IPv4 and IPv6 addresses, and the
        # context managers close the sockets even when the handshake fails.
        with socket.create_connection((hostname, port), timeout=5) as raw_sock:
            with ctx.wrap_socket(raw_sock, server_hostname=hostname) as tls_sock:
                return tls_sock.getpeercert()

    try:
        # The handshake is blocking I/O; run it in a worker thread so the
        # event loop is not stalled for up to the socket timeout.
        cert = await asyncio.to_thread(fetch_peer_cert)

        not_after = cert.get("notAfter")
        if not_after:
            # notAfter is expressed in GMT, so compare against an aware UTC
            # "now" instead of naive local time.
            expiry_date = datetime.fromtimestamp(
                ssl.cert_time_to_seconds(not_after), tz=timezone.utc
            )
            days_remaining = (expiry_date - datetime.now(timezone.utc)).days
            metrics["ssl_valid"] = True
            metrics["ssl_expiry_days"] = days_remaining
            if days_remaining < 0:
                return [self._create_issue(
                    code="P-02",
                    severity="critical",
                    message="SSL 인증서가 만료되었습니다",
                    suggestion="SSL 인증서를 즉시 갱신하세요",
                )]
            if days_remaining < 30:
                return [self._create_issue(
                    code="P-02",
                    severity="major",
                    message=f"SSL 인증서가 {days_remaining}일 후 만료됩니다",
                    suggestion="인증서 만료 전에 갱신하세요",
                )]
        else:
            metrics["ssl_valid"] = True
            metrics["ssl_expiry_days"] = None
    except ssl.SSLError as e:
        # Raised by the handshake when the certificate fails verification.
        metrics["ssl_valid"] = False
        metrics["ssl_expiry_days"] = None
        return [self._create_issue(
            code="P-02",
            severity="critical",
            message=f"SSL 인증서가 유효하지 않습니다: {str(e)[:100]}",
            suggestion="유효한 SSL 인증서를 설치하세요",
        )]
    except Exception as e:
        # Best-effort check: connection errors, timeouts and parse errors
        # are reported as "could not verify" rather than aborting the run.
        logger.warning("SSL check failed for %s: %s", url, str(e))
        metrics["ssl_valid"] = None
        metrics["ssl_expiry_days"] = None
        return [self._create_issue(
            code="P-02",
            severity="minor",
            message="SSL 인증서를 확인할 수 없습니다",
            suggestion="서버의 SSL 설정을 점검하세요",
        )]
    return []
def _get_security_header_rule(self, rule_id: str) -> dict[str, Any]:
"""Find a specific security header rule from YAML."""
for h in self._get_security_headers():
if h.get("id") == rule_id:
return h
return {}
def _check_hsts(self, headers: dict) -> list[Issue]:
    """P-03: Report a major issue when Strict-Transport-Security is missing.

    The value shown in the suggestion comes from the
    ``sec-strict-transport-security`` rule, with a built-in fallback.
    """
    rule_details = self._get_security_header_rule("sec-strict-transport-security").get("details", {})
    suggested_value = rule_details.get("recommended_value", "max-age=31536000; includeSubDomains")
    if self._get_header(headers, "Strict-Transport-Security"):
        return []
    return [self._create_issue(
        code="P-03",
        severity="major",
        message="Strict-Transport-Security(HSTS) 헤더가 설정되지 않았습니다",
        suggestion=f"HSTS 헤더를 추가하세요: Strict-Transport-Security: {suggested_value}",
    )]
def _check_csp(self, headers: dict) -> list[Issue]:
    """P-04: Report a major issue when Content-Security-Policy is missing."""
    policy = self._get_header(headers, "Content-Security-Policy")
    if policy:
        return []
    missing_csp = self._create_issue(
        code="P-04",
        severity="major",
        message="Content-Security-Policy(CSP) 헤더가 설정되지 않았습니다",
        suggestion="CSP 헤더를 추가하여 XSS 공격을 방지하세요",
    )
    return [missing_csp]
def _check_x_content_type(self, headers: dict) -> list[Issue]:
    """P-05: Report a minor issue unless X-Content-Type-Options contains ``nosniff``.

    The comparison is case-insensitive; a missing header and a header
    without ``nosniff`` are reported identically.
    """
    header_value = self._get_header(headers, "X-Content-Type-Options")
    if header_value and "nosniff" in header_value.lower():
        return []
    return [self._create_issue(
        code="P-05",
        severity="minor",
        message="X-Content-Type-Options 헤더가 설정되지 않았습니다",
        suggestion="X-Content-Type-Options: nosniff 헤더를 추가하세요",
    )]
def _check_x_frame_options(self, headers: dict) -> list[Issue]:
    """P-06: Report a minor issue when X-Frame-Options is missing."""
    if self._get_header(headers, "X-Frame-Options"):
        return []
    missing_xfo = self._create_issue(
        code="P-06",
        severity="minor",
        message="X-Frame-Options 헤더가 설정되지 않았습니다",
        suggestion="클릭재킹 방지를 위해 X-Frame-Options: DENY 또는 SAMEORIGIN을 설정하세요",
    )
    return [missing_xfo]
def _check_x_xss_protection(self, headers: dict) -> list[Issue]:
    """P-07: Emit an informational notice when X-XSS-Protection is present.

    Unlike the other header checks, the issue is raised when the header
    IS set, because the header is deprecated.
    """
    if not self._get_header(headers, "X-XSS-Protection"):
        return []
    deprecated_notice = self._create_issue(
        code="P-07",
        severity="info",
        message="X-XSS-Protection 헤더가 설정되어 있습니다 (현재 deprecated)",
        suggestion="X-XSS-Protection 대신 Content-Security-Policy를 사용하세요",
    )
    return [deprecated_notice]
def _check_referrer_policy(self, headers: dict) -> list[Issue]:
    """P-08: Report a minor issue when Referrer-Policy is missing.

    The value shown in the suggestion comes from the ``sec-referrer-policy``
    rule, with a built-in fallback.
    """
    rule_details = self._get_security_header_rule("sec-referrer-policy").get("details", {})
    suggested_value = rule_details.get("recommended_value", "strict-origin-when-cross-origin")
    if self._get_header(headers, "Referrer-Policy"):
        return []
    return [self._create_issue(
        code="P-08",
        severity="minor",
        message="Referrer-Policy 헤더가 설정되지 않았습니다",
        suggestion=f"Referrer-Policy: {suggested_value}을 설정하세요",
    )]
def _check_permissions_policy(self, headers: dict) -> list[Issue]:
    """P-09: Report a minor issue when Permissions-Policy is missing."""
    if self._get_header(headers, "Permissions-Policy"):
        return []
    missing_policy = self._create_issue(
        code="P-09",
        severity="minor",
        message="Permissions-Policy 헤더가 설정되지 않았습니다",
        suggestion="Permissions-Policy 헤더를 추가하여 브라우저 기능 접근을 제한하세요",
    )
    return [missing_policy]
def _check_coop(self, headers: dict) -> list[Issue]:
    """P-15: Report a minor issue when Cross-Origin-Opener-Policy is missing.

    The value shown in the suggestion comes from the
    ``sec-cross-origin-opener-policy`` rule, with a built-in fallback.
    """
    rule_details = self._get_security_header_rule("sec-cross-origin-opener-policy").get("details", {})
    suggested_value = rule_details.get("recommended_value", "same-origin")
    if self._get_header(headers, "Cross-Origin-Opener-Policy"):
        return []
    return [self._create_issue(
        code="P-15",
        severity="minor",
        message="Cross-Origin-Opener-Policy(COOP) 헤더가 설정되지 않았습니다",
        suggestion=f"COOP 헤더를 추가하세요: Cross-Origin-Opener-Policy: {suggested_value}",
    )]
def _check_coep(self, headers: dict) -> list[Issue]:
    """P-16: Report a minor issue when Cross-Origin-Embedder-Policy is missing.

    The value shown in the suggestion comes from the
    ``sec-cross-origin-embedder-policy`` rule, with a built-in fallback.
    """
    rule_details = self._get_security_header_rule("sec-cross-origin-embedder-policy").get("details", {})
    suggested_value = rule_details.get("recommended_value", "require-corp")
    if self._get_header(headers, "Cross-Origin-Embedder-Policy"):
        return []
    return [self._create_issue(
        code="P-16",
        severity="minor",
        message="Cross-Origin-Embedder-Policy(COEP) 헤더가 설정되지 않았습니다",
        suggestion=f"COEP 헤더를 추가하세요: Cross-Origin-Embedder-Policy: {suggested_value}",
    )]
def _check_corp(self, headers: dict) -> list[Issue]:
    """P-17: Report a minor issue when Cross-Origin-Resource-Policy is missing.

    The value shown in the suggestion comes from the
    ``sec-cross-origin-resource-policy`` rule, with a built-in fallback.
    """
    rule_details = self._get_security_header_rule("sec-cross-origin-resource-policy").get("details", {})
    suggested_value = rule_details.get("recommended_value", "same-site")
    if self._get_header(headers, "Cross-Origin-Resource-Policy"):
        return []
    return [self._create_issue(
        code="P-17",
        severity="minor",
        message="Cross-Origin-Resource-Policy(CORP) 헤더가 설정되지 않았습니다",
        suggestion=f"CORP 헤더를 추가하세요: Cross-Origin-Resource-Policy: {suggested_value}",
    )]
def _check_info_disclosure(self, headers: dict) -> list[Issue]:
    """P-18: Flag response headers that the rule data lists for removal.

    One informational issue is produced per configured header that is
    present with a non-empty value; at most the first 80 characters of
    the value are echoed in the message.
    """
    disclosures = []
    for removal_rule in self._get_headers_to_remove():
        header_name = removal_rule.get("details", {}).get("header", "")
        exposed_value = self._get_header(headers, header_name)
        if not exposed_value:
            continue
        disclosures.append(self._create_issue(
            code="P-18",
            severity="info",
            message=f"{header_name} 헤더가 서버 정보를 노출하고 있습니다: {exposed_value[:80]}",
            suggestion=f"{header_name} 헤더를 제거하여 서버 기술 스택 노출을 방지하세요",
        ))
    return disclosures
async def _check_ttfb(self, url: str, metrics: dict) -> list[Issue]:
    """P-10: Check Time To First Byte (TTFB) using YAML thresholds.

    The request is streamed and the clock is stopped as soon as the
    response headers of the final response (after redirects) arrive, so
    neither HTTP client construction nor the body download is included
    in the measurement.

    Writes ``metrics["ttfb_ms"]`` (int milliseconds, or None when the
    request failed) and returns at most one P-10 issue: major above the
    ``needs_improvement`` threshold or on failure, minor above ``good``.
    """
    thresholds = self._get_ttfb_thresholds()
    good_ms = thresholds.get("good", 800)
    needs_improvement_ms = thresholds.get("needs_improvement", 1800)

    try:
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(10.0),
            follow_redirects=True,
            verify=False,  # certificate problems are reported by the SSL check
        ) as client:
            # Start timing only once the client exists, so its setup cost
            # is not attributed to the server.
            started = time.monotonic()
            async with client.stream("GET", url, headers={
                "User-Agent": "WebInspector/1.0 (Inspection Bot)",
            }):
                # Entering the stream context returns once the response
                # headers are received; the body is not read.
                ttfb_ms = round((time.monotonic() - started) * 1000)
    except Exception as e:
        # Best-effort measurement: any request failure becomes an issue
        # instead of aborting the whole inspection.
        logger.warning("TTFB check failed for %s: %s", url, str(e))
        metrics["ttfb_ms"] = None
        return [self._create_issue(
            code="P-10",
            severity="major",
            message="응답 시간(TTFB)을 측정할 수 없습니다",
            suggestion="서버 접근성을 확인하세요",
        )]

    metrics["ttfb_ms"] = ttfb_ms
    if ttfb_ms > needs_improvement_ms:
        return [self._create_issue(
            code="P-10",
            severity="major",
            message=f"응답 시간(TTFB)이 느립니다: {ttfb_ms}ms (권장 < {good_ms}ms)",
            suggestion="서버 응답 속도를 개선하세요 (캐싱, CDN, 서버 최적화)",
        )]
    if ttfb_ms > good_ms:
        return [self._create_issue(
            code="P-10",
            severity="minor",
            message=f"응답 시간(TTFB)이 다소 느립니다: {ttfb_ms}ms (권장 < {good_ms}ms)",
            suggestion="서버 응답 속도 개선을 고려하세요",
        )]
    return []
def _check_page_size(self, html_content: str, metrics: dict) -> list[Issue]:
    """P-11: Check HTML page size using YAML thresholds.

    The size is the UTF-8 encoded length of ``html_content`` and is
    stored in ``metrics["page_size_bytes"]``.  A minor issue is returned
    when it exceeds the ``poor`` threshold (configured in KB).
    """
    thresholds = self._get_page_size_thresholds()
    poor_kb = thresholds.get("poor", 5000)
    poor_bytes = poor_kb * 1024

    # Treat a missing document as empty rather than failing on None.
    size_bytes = len((html_content or "").encode("utf-8"))
    metrics["page_size_bytes"] = size_bytes

    if size_bytes <= poor_bytes:
        return []

    size_mb = round(size_bytes / 1024 / 1024, 1)
    # True division: floor division would display the 5000 KB limit as
    # "4MB", which understates the threshold that is actually applied.
    limit_mb = round(poor_kb / 1024, 1)
    return [self._create_issue(
        code="P-11",
        severity="minor",
        message=f"페이지 크기가 큽니다: {size_mb}MB (권장 < {limit_mb}MB)",
        suggestion="페이지 크기를 줄이세요 (불필요한 코드 제거, 이미지 최적화, 코드 분할)",
    )]
async def _check_redirects(self, url: str, metrics: dict) -> list[Issue]:
    """P-12: Count the redirects followed while fetching *url*.

    Stores the count in ``metrics["redirect_count"]`` (None when the
    request fails) and returns a minor issue for three or more hops.
    A failed request is logged and produces no issue.
    """
    try:
        client_options = {
            "timeout": httpx.Timeout(10.0),
            "follow_redirects": True,
            "verify": False,
        }
        async with httpx.AsyncClient(**client_options) as client:
            response = await client.get(
                url,
                headers={"User-Agent": "WebInspector/1.0 (Inspection Bot)"},
            )
            hops = len(response.history)
            metrics["redirect_count"] = hops
            if hops >= 3:
                too_many_hops = self._create_issue(
                    code="P-12",
                    severity="minor",
                    message=f"리다이렉트가 {hops}회 발생합니다 (권장 < 3회)",
                    suggestion="리다이렉트 체인을 줄여 로딩 속도를 개선하세요",
                )
                return [too_many_hops]
    except Exception as exc:
        logger.warning("Redirect check failed for %s: %s", url, str(exc))
        metrics["redirect_count"] = None
    return []
def _check_compression(self, headers: dict, metrics: dict) -> list[Issue]:
    """P-13: Report a minor issue when no Content-Encoding header is present.

    Stores the lower-cased header value (or None) in
    ``metrics["compression"]``.
    """
    content_encoding = self._get_header(headers, "Content-Encoding")
    metrics["compression"] = content_encoding.lower() if content_encoding else None
    if content_encoding:
        return []
    uncompressed = self._create_issue(
        code="P-13",
        severity="minor",
        message="응답 압축(Gzip/Brotli)이 적용되지 않았습니다",
        suggestion="서버에서 Gzip 또는 Brotli 압축을 활성화하세요",
    )
    return [uncompressed]
def _check_mixed_content(self, url: str, html_content: str) -> list[Issue]:
    """P-14: Check for mixed content (HTTP resources on an HTTPS page).

    Scans ``src``/``href`` of resource-loading elements for ``http://``
    URLs (case-insensitive, surrounding whitespace ignored).  ``<link>``
    elements are only counted when their ``rel`` makes the browser fetch
    the target, so e.g. ``rel="canonical"`` is not reported.

    Returns a single major issue carrying the total count and the first
    offending element, or an empty list (always empty for non-HTTPS pages).
    """
    if urlparse(url).scheme != "https":
        return []

    # rel values for which a <link> actually loads its href as a subresource.
    loading_rels = {
        "stylesheet", "icon", "apple-touch-icon", "apple-touch-icon-precomposed",
        "mask-icon", "manifest", "preload", "prefetch", "modulepreload", "prerender",
    }

    soup = BeautifulSoup(html_content, "html5lib")
    mixed_elements = []
    for tag in soup.find_all(["img", "script", "link", "iframe", "audio", "video", "source"]):
        if tag.name == "link":
            rel = tag.get("rel") or []
            # Accept both a token list and a plain string for rel.
            rel_tokens = rel.split() if isinstance(rel, str) else rel
            if not any(token.lower() in loading_rels for token in rel_tokens):
                continue
        target = tag.get("src") or tag.get("href")
        if isinstance(target, str) and target.strip().lower().startswith("http://"):
            mixed_elements.append(tag)

    if not mixed_elements:
        return []
    return [self._create_issue(
        code="P-14",
        severity="major",
        message=f"혼합 콘텐츠 발견: HTTPS 페이지에서 HTTP 리소스 {len(mixed_elements)}개 로드",
        element=self._truncate_element(str(mixed_elements[0])),
        suggestion="모든 리소스를 HTTPS로 변경하세요",
    )]
def _calculate_composite_score(self, issues: list[Issue], metrics: dict) -> tuple[int, dict]:
    """Combine issues and metrics into an overall score plus sub-scores.

    Security (70% of overall) = HTTPS/SSL score * 0.43 + header score * 0.57
    Performance (30% of overall) = TTFB * 0.4 + page size * 0.3 + compression * 0.3

    Returns ``(overall, {"security": ..., "performance": ...})`` with
    ``overall`` clamped to the 0-100 range.
    """
    # --- Security: HTTPS/SSL component ---------------------------------
    # Each critical P-01/P-02 issue costs 50 points, each major one 25.
    severity_penalty = {"critical": 50, "major": 25}
    transport_penalty = sum(
        severity_penalty.get(issue.severity.value, 0)
        for issue in issues
        if issue.code in ("P-01", "P-02")
    )
    https_ssl_score = max(0, 100 - transport_penalty)

    # --- Security: header component -------------------------------------
    # Share of header checks that raised no issue.
    header_codes = {"P-03", "P-04", "P-05", "P-06", "P-07", "P-08", "P-09", "P-15", "P-16", "P-17"}
    total_header_checks = len(header_codes)
    failed_headers = sum(1 for issue in issues if issue.code in header_codes)
    passed_headers = total_header_checks - failed_headers
    if total_header_checks:
        header_score = round(passed_headers / total_header_checks * 100)
    else:
        header_score = 100

    security_score = round(https_ssl_score * 0.43 + header_score * 0.57)

    # --- Performance: TTFB component ------------------------------------
    ttfb_limits = self._get_ttfb_thresholds()
    ttfb_good = ttfb_limits.get("good", 800)
    ttfb_ni = ttfb_limits.get("needs_improvement", 1800)
    ttfb = metrics.get("ttfb_ms")
    if ttfb is None:
        # Measurement unavailable: neutral middle value.
        ttfb_score = 50
    else:
        ttfb_score = (
            100 if ttfb <= ttfb_good // 2
            else 80 if ttfb <= ttfb_good
            else 60 if ttfb <= ttfb_ni
            else 30
        )

    # --- Performance: page size component -------------------------------
    size_limits = self._get_page_size_thresholds()
    good_kb = size_limits.get("good", 1500)
    ni_kb = size_limits.get("needs_improvement", 3000)
    poor_kb = size_limits.get("poor", 5000)
    page_size = metrics.get("page_size_bytes", 0)
    size_score = (
        100 if page_size <= good_kb * 1024
        else 80 if page_size <= ni_kb * 1024
        else 60 if page_size <= poor_kb * 1024
        else 30
    )

    # --- Performance: compression component -----------------------------
    compression_score = 100 if metrics.get("compression") else 50

    perf_score = round(ttfb_score * 0.4 + size_score * 0.3 + compression_score * 0.3)

    # --- Composite ------------------------------------------------------
    overall = round(security_score * 0.7 + perf_score * 0.3)
    overall = min(100, max(0, overall))
    return overall, {"security": security_score, "performance": perf_score}
@staticmethod
def _get_header(headers: dict, name: str) -> Optional[str]:
"""Case-insensitive header lookup."""
for key, value in headers.items():
if key.lower() == name.lower():
return value
return None
@staticmethod
def _truncate_element(element_str: str, max_len: int = 200) -> str:
if len(element_str) > max_len:
return element_str[:max_len] + "..."
return element_str