""" Performance/Security Checker Engine (F-005). Checks security headers, HTTPS, SSL certificate, response time, page size, etc. """ import re import ssl import socket import logging import time from datetime import datetime, timezone from urllib.parse import urlparse from typing import Optional import httpx from bs4 import BeautifulSoup from app.engines.base import BaseChecker from app.models.schemas import CategoryResult, Issue, calculate_grade logger = logging.getLogger(__name__) class PerformanceSecurityChecker(BaseChecker): """Performance and security checker engine.""" @property def category_name(self) -> str: return "performance_security" async def check(self, url: str, html_content: str, headers: dict) -> CategoryResult: issues: list[Issue] = [] metrics: dict = {} await self.update_progress(10, "HTTPS 검사 중...") issues += self._check_https(url, metrics) await self.update_progress(20, "SSL 인증서 검사 중...") issues += await self._check_ssl(url, metrics) await self.update_progress(35, "보안 헤더 검사 중...") issues += self._check_hsts(headers) issues += self._check_csp(headers) issues += self._check_x_content_type(headers) issues += self._check_x_frame_options(headers) issues += self._check_x_xss_protection(headers) issues += self._check_referrer_policy(headers) issues += self._check_permissions_policy(headers) await self.update_progress(60, "응답 시간 측정 중...") issues += await self._check_ttfb(url, metrics) await self.update_progress(70, "페이지 크기 분석 중...") issues += self._check_page_size(html_content, metrics) await self.update_progress(80, "리다이렉트 검사 중...") issues += await self._check_redirects(url, metrics) await self.update_progress(85, "압축 검사 중...") issues += self._check_compression(headers, metrics) await self.update_progress(90, "혼합 콘텐츠 검사 중...") issues += self._check_mixed_content(url, html_content) score, sub_scores = self._calculate_composite_score(issues, metrics) await self.update_progress(100, "완료") return self._build_result( category="performance_security", score=score, issues=issues, sub_scores=sub_scores, metrics=metrics, ) def _check_https(self, url: str, metrics: dict) -> list[Issue]: """P-01: Check HTTPS usage.""" parsed = urlparse(url) is_https = parsed.scheme == "https" metrics["https"] = is_https if not is_https: return [self._create_issue( code="P-01", severity="critical", message="HTTPS를 사용하지 않고 있습니다", suggestion="사이트 보안을 위해 HTTPS를 적용하세요", )] return [] async def _check_ssl(self, url: str, metrics: dict) -> list[Issue]: """P-02: Check SSL certificate validity and expiry.""" parsed = urlparse(url) if parsed.scheme != "https": metrics["ssl_valid"] = False metrics["ssl_expiry_days"] = None return [self._create_issue( code="P-02", severity="critical", message="HTTPS를 사용하지 않아 SSL 인증서를 확인할 수 없습니다", suggestion="SSL 인증서를 설치하고 HTTPS를 적용하세요", )] hostname = parsed.hostname port = parsed.port or 443 try: ctx = ssl.create_default_context() conn = ctx.wrap_socket( socket.socket(socket.AF_INET), server_hostname=hostname, ) conn.settimeout(5) conn.connect((hostname, port)) cert = conn.getpeercert() conn.close() # Check expiry not_after = cert.get("notAfter") if not_after: expiry_date = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z") days_remaining = (expiry_date - datetime.now()).days metrics["ssl_valid"] = True metrics["ssl_expiry_days"] = days_remaining if days_remaining < 0: return [self._create_issue( code="P-02", severity="critical", message="SSL 인증서가 만료되었습니다", suggestion="SSL 인증서를 즉시 갱신하세요", )] elif days_remaining < 30: return [self._create_issue( code="P-02", severity="major", message=f"SSL 인증서가 {days_remaining}일 후 만료됩니다", suggestion="인증서 만료 전에 갱신하세요", )] else: metrics["ssl_valid"] = True metrics["ssl_expiry_days"] = None except ssl.SSLError as e: metrics["ssl_valid"] = False metrics["ssl_expiry_days"] = None return [self._create_issue( code="P-02", severity="critical", message=f"SSL 인증서가 유효하지 않습니다: {str(e)[:100]}", suggestion="유효한 SSL 인증서를 설치하세요", )] except Exception as e: logger.warning("SSL check failed for %s: %s", url, str(e)) metrics["ssl_valid"] = None metrics["ssl_expiry_days"] = None return [self._create_issue( code="P-02", severity="minor", message="SSL 인증서를 확인할 수 없습니다", suggestion="서버의 SSL 설정을 점검하세요", )] return [] def _check_hsts(self, headers: dict) -> list[Issue]: """P-03: Check Strict-Transport-Security header.""" hsts = self._get_header(headers, "Strict-Transport-Security") if not hsts: return [self._create_issue( code="P-03", severity="major", message="Strict-Transport-Security(HSTS) 헤더가 설정되지 않았습니다", suggestion="HSTS 헤더를 추가하세요: Strict-Transport-Security: max-age=31536000; includeSubDomains", )] return [] def _check_csp(self, headers: dict) -> list[Issue]: """P-04: Check Content-Security-Policy header.""" csp = self._get_header(headers, "Content-Security-Policy") if not csp: return [self._create_issue( code="P-04", severity="major", message="Content-Security-Policy(CSP) 헤더가 설정되지 않았습니다", suggestion="CSP 헤더를 추가하여 XSS 공격을 방지하세요", )] return [] def _check_x_content_type(self, headers: dict) -> list[Issue]: """P-05: Check X-Content-Type-Options header.""" xcto = self._get_header(headers, "X-Content-Type-Options") if not xcto or "nosniff" not in xcto.lower(): return [self._create_issue( code="P-05", severity="minor", message="X-Content-Type-Options 헤더가 설정되지 않았습니다", suggestion="X-Content-Type-Options: nosniff 헤더를 추가하세요", )] return [] def _check_x_frame_options(self, headers: dict) -> list[Issue]: """P-06: Check X-Frame-Options header.""" xfo = self._get_header(headers, "X-Frame-Options") if not xfo: return [self._create_issue( code="P-06", severity="minor", message="X-Frame-Options 헤더가 설정되지 않았습니다", suggestion="클릭재킹 방지를 위해 X-Frame-Options: DENY 또는 SAMEORIGIN을 설정하세요", )] return [] def _check_x_xss_protection(self, headers: dict) -> list[Issue]: """P-07: Check X-XSS-Protection header (deprecated notice).""" xxp = self._get_header(headers, "X-XSS-Protection") if xxp: return [self._create_issue( code="P-07", severity="info", message="X-XSS-Protection 헤더가 설정되어 있습니다 (현재 deprecated)", suggestion="X-XSS-Protection 대신 Content-Security-Policy를 사용하세요", )] return [] def _check_referrer_policy(self, headers: dict) -> list[Issue]: """P-08: Check Referrer-Policy header.""" rp = self._get_header(headers, "Referrer-Policy") if not rp: return [self._create_issue( code="P-08", severity="minor", message="Referrer-Policy 헤더가 설정되지 않았습니다", suggestion="Referrer-Policy: strict-origin-when-cross-origin을 설정하세요", )] return [] def _check_permissions_policy(self, headers: dict) -> list[Issue]: """P-09: Check Permissions-Policy header.""" pp = self._get_header(headers, "Permissions-Policy") if not pp: return [self._create_issue( code="P-09", severity="minor", message="Permissions-Policy 헤더가 설정되지 않았습니다", suggestion="Permissions-Policy 헤더를 추가하여 브라우저 기능 접근을 제한하세요", )] return [] async def _check_ttfb(self, url: str, metrics: dict) -> list[Issue]: """P-10: Check Time To First Byte (TTFB).""" try: start = time.monotonic() async with httpx.AsyncClient( timeout=httpx.Timeout(10.0), follow_redirects=True, verify=False, ) as client: resp = await client.get(url, headers={ "User-Agent": "WebInspector/1.0 (Inspection Bot)", }) ttfb_ms = round((time.monotonic() - start) * 1000) metrics["ttfb_ms"] = ttfb_ms if ttfb_ms > 2000: return [self._create_issue( code="P-10", severity="major", message=f"응답 시간(TTFB)이 느립니다: {ttfb_ms}ms (권장 < 1000ms)", suggestion="서버 응답 속도를 개선하세요 (캐싱, CDN, 서버 최적화)", )] elif ttfb_ms > 1000: return [self._create_issue( code="P-10", severity="minor", message=f"응답 시간(TTFB)이 다소 느립니다: {ttfb_ms}ms (권장 < 1000ms)", suggestion="서버 응답 속도 개선을 고려하세요", )] except Exception as e: logger.warning("TTFB check failed for %s: %s", url, str(e)) metrics["ttfb_ms"] = None return [self._create_issue( code="P-10", severity="major", message="응답 시간(TTFB)을 측정할 수 없습니다", suggestion="서버 접근성을 확인하세요", )] return [] def _check_page_size(self, html_content: str, metrics: dict) -> list[Issue]: """P-11: Check HTML page size.""" size_bytes = len(html_content.encode("utf-8")) metrics["page_size_bytes"] = size_bytes if size_bytes > 3 * 1024 * 1024: # 3MB return [self._create_issue( code="P-11", severity="minor", message=f"페이지 크기가 큽니다: {round(size_bytes / 1024 / 1024, 1)}MB (권장 < 3MB)", suggestion="페이지 크기를 줄이세요 (불필요한 코드 제거, 이미지 최적화, 코드 분할)", )] return [] async def _check_redirects(self, url: str, metrics: dict) -> list[Issue]: """P-12: Check redirect chain length.""" try: async with httpx.AsyncClient( timeout=httpx.Timeout(10.0), follow_redirects=True, verify=False, ) as client: resp = await client.get(url, headers={ "User-Agent": "WebInspector/1.0 (Inspection Bot)", }) redirect_count = len(resp.history) metrics["redirect_count"] = redirect_count if redirect_count >= 3: return [self._create_issue( code="P-12", severity="minor", message=f"리다이렉트가 {redirect_count}회 발생합니다 (권장 < 3회)", suggestion="리다이렉트 체인을 줄여 로딩 속도를 개선하세요", )] except Exception as e: logger.warning("Redirect check failed for %s: %s", url, str(e)) metrics["redirect_count"] = None return [] def _check_compression(self, headers: dict, metrics: dict) -> list[Issue]: """P-13: Check response compression (Gzip/Brotli).""" encoding = self._get_header(headers, "Content-Encoding") if encoding: metrics["compression"] = encoding.lower() return [] metrics["compression"] = None return [self._create_issue( code="P-13", severity="minor", message="응답 압축(Gzip/Brotli)이 적용되지 않았습니다", suggestion="서버에서 Gzip 또는 Brotli 압축을 활성화하세요", )] def _check_mixed_content(self, url: str, html_content: str) -> list[Issue]: """P-14: Check for mixed content (HTTP resources on HTTPS page).""" parsed = urlparse(url) if parsed.scheme != "https": return [] soup = BeautifulSoup(html_content, "html5lib") mixed_elements = [] # Check src attributes for tag in soup.find_all(["img", "script", "link", "iframe", "audio", "video", "source"]): src = tag.get("src") or tag.get("href") if src and src.startswith("http://"): mixed_elements.append(tag) if mixed_elements: return [self._create_issue( code="P-14", severity="major", message=f"혼합 콘텐츠 발견: HTTPS 페이지에서 HTTP 리소스 {len(mixed_elements)}개 로드", element=self._truncate_element(str(mixed_elements[0])) if mixed_elements else None, suggestion="모든 리소스를 HTTPS로 변경하세요", )] return [] def _calculate_composite_score(self, issues: list[Issue], metrics: dict) -> tuple[int, dict]: """ Calculate composite score: Security (70%): HTTPS/SSL (30%) + Security Headers (40%) Performance (30%): Response time (40%) + Page size (30%) + Compression (30%) """ # Security score security_score = 100 # HTTPS/SSL component (30% of security) https_ssl_score = 100 for issue in issues: if issue.code in ("P-01", "P-02"): if issue.severity.value == "critical": https_ssl_score -= 50 elif issue.severity.value == "major": https_ssl_score -= 25 https_ssl_score = max(0, https_ssl_score) # Security headers component (40% of security) header_issues = [i for i in issues if i.code in ("P-03", "P-04", "P-05", "P-06", "P-07", "P-08", "P-09")] total_header_checks = 7 passed_headers = total_header_checks - len(header_issues) header_score = round(passed_headers / total_header_checks * 100) if total_header_checks else 100 security_score = round(https_ssl_score * 0.43 + header_score * 0.57) # Performance score perf_score = 100 # TTFB component (40% of performance) ttfb = metrics.get("ttfb_ms") if ttfb is not None: if ttfb <= 500: ttfb_score = 100 elif ttfb <= 1000: ttfb_score = 80 elif ttfb <= 2000: ttfb_score = 60 else: ttfb_score = 30 else: ttfb_score = 50 # Page size component (30% of performance) page_size = metrics.get("page_size_bytes", 0) if page_size <= 1024 * 1024: # 1MB size_score = 100 elif page_size <= 2 * 1024 * 1024: # 2MB size_score = 80 elif page_size <= 3 * 1024 * 1024: # 3MB size_score = 60 else: size_score = 30 # Compression component (30% of performance) compression = metrics.get("compression") compression_score = 100 if compression else 50 perf_score = round(ttfb_score * 0.4 + size_score * 0.3 + compression_score * 0.3) # Composite overall = round(security_score * 0.7 + perf_score * 0.3) overall = max(0, min(100, overall)) sub_scores = { "security": security_score, "performance": perf_score, } return overall, sub_scores @staticmethod def _get_header(headers: dict, name: str) -> Optional[str]: """Case-insensitive header lookup.""" for key, value in headers.items(): if key.lower() == name.lower(): return value return None @staticmethod def _truncate_element(element_str: str, max_len: int = 200) -> str: if len(element_str) > max_len: return element_str[:max_len] + "..." return element_str