import hashlib import aiofiles import os from pathlib import Path from datetime import datetime, timedelta from typing import Optional import httpx from PIL import Image try: from pillow_heif import register_heif_opener, register_avif_opener register_heif_opener() # HEIF/HEIC 지원 register_avif_opener() # AVIF 지원 print("HEIF/AVIF support enabled successfully") except ImportError: print("Warning: pillow_heif not installed, HEIF/AVIF support disabled") import io import asyncio import ssl from .config import settings class ImageCache: def __init__(self): self.cache_dir = settings.cache_dir self.cache_dir.mkdir(parents=True, exist_ok=True) def _get_cache_path(self, url: str, size: Optional[str] = None) -> Path: """URL을 기반으로 캐시 파일 경로 생성""" # URL을 해시하여 파일명 생성 url_hash = hashlib.md5(url.encode()).hexdigest() # 3단계 디렉토리 구조 생성 # 예: 10f8a8f96aa1377e86fdbc6bf3c631cf -> 10/f8/a8/ level1 = url_hash[:2] # 첫 2자리 level2 = url_hash[2:4] # 다음 2자리 level3 = url_hash[4:6] # 다음 2자리 # 크기별로 다른 파일명 사용 if size: filename = f"{url_hash}_{size}" else: filename = url_hash # 확장자 추출 (WebP로 저장되는 경우 .webp 사용) if settings.convert_to_webp and size: filename = f"{filename}.webp" else: ext = self._get_extension_from_url(url) if ext: filename = f"{filename}.{ext}" # 3단계 디렉토리 경로 생성 path = self.cache_dir / level1 / level2 / level3 / filename path.parent.mkdir(parents=True, exist_ok=True) return path def _get_extension_from_url(self, url: str) -> Optional[str]: """URL에서 파일 확장자 추출""" path = url.split('?')[0] # 쿼리 파라미터 제거 parts = path.split('.') if len(parts) > 1: ext = parts[-1].lower() if ext in settings.allowed_formats: return ext return None def _is_svg(self, data: bytes) -> bool: """SVG 파일인지 확인""" # SVG 파일의 시작 부분 확인 if len(data) < 100: return False # 처음 1000바이트만 확인 (성능 최적화) header = data[:1000].lower() # SVG 시그니처 확인 svg_signatures = [ b' tuple[bytes, str]: """GIF 처리 - JPEG로 변환하여 안정적으로 처리""" try: from PIL import Image # GIF 열기 img = Image.open(io.BytesIO(gif_data)) # 모든 GIF를 RGB로 변환 (팔레트 모드 문제 해결) # 팔레트 모드(P)를 RGB로 직접 변환 if img.mode != 'RGB': img = img.convert('RGB') # 리사이징 img = img.resize(target_size, Image.Resampling.LANCZOS) # JPEG로 저장 (안정적) output = io.BytesIO() img.save(output, format='JPEG', quality=85, optimize=True) return output.getvalue(), 'image/jpeg' except Exception as e: print(f"GIF 처리 중 오류: {e}") import traceback traceback.print_exc() # 오류 발생 시 원본 반환 return gif_data, 'image/gif' async def get(self, url: str, size: Optional[str] = None) -> Optional[bytes]: """캐시에서 이미지 가져오기""" cache_path = self._get_cache_path(url, size) if cache_path.exists(): # 캐시 만료 확인 stat = cache_path.stat() age = datetime.now() - datetime.fromtimestamp(stat.st_mtime) if age < timedelta(days=settings.cache_ttl_days): async with aiofiles.open(cache_path, 'rb') as f: return await f.read() else: # 만료된 캐시 삭제 cache_path.unlink() return None async def set(self, url: str, data: bytes, size: Optional[str] = None): """캐시에 이미지 저장""" cache_path = self._get_cache_path(url, size) async with aiofiles.open(cache_path, 'wb') as f: await f.write(data) async def download_image(self, url: str) -> bytes: """외부 URL에서 이미지 다운로드""" from urllib.parse import urlparse # URL에서 도메인 추출 parsed_url = urlparse(url) domain = parsed_url.netloc base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/" # 기본 헤더 설정 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8', 'Accept-Language': 'ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', 'Sec-Fetch-Dest': 'image', 'Sec-Fetch-Mode': 'no-cors', 'Sec-Fetch-Site': 'cross-site', 'Referer': base_url # 항상 기본 Referer 설정 } # 특정 사이트별 Referer 오버라이드 if 'yna.co.kr' in url: headers['Referer'] = 'https://www.yna.co.kr/' client = httpx.AsyncClient( verify=False, # SSL 검증 비활성화 timeout=30.0, follow_redirects=True ) elif 'investing.com' in url: headers['Referer'] = 'https://www.investing.com/' client = httpx.AsyncClient() elif 'naver.com' in url: headers['Referer'] = 'https://news.naver.com/' client = httpx.AsyncClient() elif 'daum.net' in url: headers['Referer'] = 'https://news.daum.net/' client = httpx.AsyncClient() elif 'chosun.com' in url: headers['Referer'] = 'https://www.chosun.com/' client = httpx.AsyncClient() elif 'vietnam.vn' in url or 'vstatic.vietnam.vn' in url: headers['Referer'] = 'https://vietnam.vn/' client = httpx.AsyncClient() elif 'ddaily.co.kr' in url: # ddaily는 /photos/ 경로를 사용해야 함 headers['Referer'] = 'https://www.ddaily.co.kr/' # URL이 잘못된 경로를 사용하는 경우 수정 if '/2025/' in url and '/photos/' not in url: url = url.replace('/2025/', '/photos/2025/') print(f"Fixed ddaily URL: {url}") client = httpx.AsyncClient() else: # 기본적으로 도메인 기반 Referer 사용 client = httpx.AsyncClient() async with client: try: response = await client.get( url, headers=headers, timeout=settings.request_timeout, follow_redirects=True ) response.raise_for_status() except Exception as e: # 모든 에러에 대해 Playwright 사용 시도 error_msg = str(e) if isinstance(e, httpx.HTTPStatusError): error_type = f"HTTP {e.response.status_code}" elif isinstance(e, httpx.ConnectError): error_type = "Connection Error" elif isinstance(e, ssl.SSLError): error_type = "SSL Error" elif "resolve" in error_msg.lower() or "dns" in error_msg.lower(): error_type = "DNS Resolution Error" else: error_type = "Network Error" print(f"{error_type} for {url}, trying with Playwright...") # Playwright로 이미지 가져오기 시도 try: from playwright.async_api import async_playwright from PIL import Image import io async with async_playwright() as p: # 브라우저 실행 browser = await p.chromium.launch( headless=True, args=['--no-sandbox', '--disable-setuid-sandbox'] ) # Referer 설정을 위한 도메인 추출 from urllib.parse import urlparse parsed = urlparse(url) referer_url = f"{parsed.scheme}://{parsed.netloc}/" context = await browser.new_context( viewport={'width': 1920, 'height': 1080}, user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', extra_http_headers={ 'Referer': referer_url } ) page = await context.new_page() try: # Response를 가로채기 위한 설정 image_data = None async def handle_response(response): nonlocal image_data # 이미지 URL에 대한 응답 가로채기 if url in response.url or response.url == url: try: image_data = await response.body() print(f"✅ Image intercepted: {len(image_data)} bytes") except: pass # Response 이벤트 리스너 등록 page.on('response', handle_response) # 이미지 URL로 이동 (에러 무시) try: await page.goto(url, wait_until='networkidle', timeout=30000) except Exception as goto_error: print(f"⚠️ Direct navigation failed: {goto_error}") # 직접 이동 실패 시 HTML에 img 태그 삽입 await page.set_content(f''' ''') await page.wait_for_timeout(3000) # 이미지 로딩 대기 # 이미지 데이터가 없으면 JavaScript로 직접 fetch if not image_data: # JavaScript로 이미지 fetch image_data_base64 = await page.evaluate(''' async (url) => { try { const response = await fetch(url); const blob = await response.blob(); return new Promise((resolve) => { const reader = new FileReader(); reader.onloadend = () => resolve(reader.result.split(',')[1]); reader.readAsDataURL(blob); }); } catch (e) { return null; } } ''', url) if image_data_base64: import base64 image_data = base64.b64decode(image_data_base64) print(f"✅ Image fetched via JavaScript: {len(image_data)} bytes") # 여전히 데이터가 없으면 스크린샷 사용 if not image_data: # 이미지 요소 찾기 img_element = await page.query_selector('img') if img_element: # 이미지가 로드되었는지 확인 is_loaded = await img_element.evaluate('(img) => img.complete && img.naturalHeight > 0') if is_loaded: image_data = await img_element.screenshot() print(f"✅ Screenshot from loaded image: {len(image_data)} bytes") else: # 이미지 로드 대기 try: await img_element.evaluate('(img) => new Promise(r => img.onload = r)') image_data = await img_element.screenshot() print(f"✅ Screenshot after waiting: {len(image_data)} bytes") except: # 전체 페이지 스크린샷 image_data = await page.screenshot(full_page=True) print(f"⚠️ Full page screenshot: {len(image_data)} bytes") else: image_data = await page.screenshot(full_page=True) print(f"⚠️ No image element, full screenshot: {len(image_data)} bytes") print(f"✅ Successfully fetched image with Playwright: {url}") return image_data finally: await page.close() await context.close() await browser.close() except Exception as pw_error: print(f"Playwright failed: {pw_error}, returning placeholder") # Playwright도 실패하면 세련된 placeholder 반환 from PIL import Image, ImageDraw, ImageFont import io import random # 그라디언트 배경색 선택 (부드러운 색상) gradients = [ ('#667eea', '#764ba2'), # 보라 그라디언트 ('#f093fb', '#f5576c'), # 핑크 그라디언트 ('#4facfe', '#00f2fe'), # 하늘색 그라디언트 ('#43e97b', '#38f9d7'), # 민트 그라디언트 ('#fa709a', '#fee140'), # 선셋 그라디언트 ('#30cfd0', '#330867'), # 딥 오션 ('#a8edea', '#fed6e3'), # 파스텔 ('#ffecd2', '#fcb69f'), # 피치 ] # 랜덤 그라디언트 선택 color1, color2 = random.choice(gradients) # 이미지 생성 (16:9 비율) width, height = 800, 450 img = Image.new('RGB', (width, height)) draw = ImageDraw.Draw(img) # 그라디언트 배경 생성 def hex_to_rgb(hex_color): hex_color = hex_color.lstrip('#') return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4)) rgb1 = hex_to_rgb(color1) rgb2 = hex_to_rgb(color2) # 세로 그라디언트 for y in range(height): ratio = y / height r = int(rgb1[0] * (1 - ratio) + rgb2[0] * ratio) g = int(rgb1[1] * (1 - ratio) + rgb2[1] * ratio) b = int(rgb1[2] * (1 - ratio) + rgb2[2] * ratio) draw.rectangle([(0, y), (width, y + 1)], fill=(r, g, b)) # 반투명 오버레이 추가 (깊이감) overlay = Image.new('RGBA', (width, height), (0, 0, 0, 0)) overlay_draw = ImageDraw.Draw(overlay) # 중앙 원형 그라디언트 효과 center_x, center_y = width // 2, height // 2 max_radius = min(width, height) // 3 for radius in range(max_radius, 0, -2): opacity = int(255 * (1 - radius / max_radius) * 0.3) overlay_draw.ellipse( [(center_x - radius, center_y - radius), (center_x + radius, center_y + radius)], fill=(255, 255, 255, opacity) ) # 이미지 아이콘 그리기 (산 모양) icon_color = (255, 255, 255, 200) icon_size = 80 icon_x = center_x icon_y = center_y - 20 # 산 아이콘 (사진 이미지를 나타냄) mountain_points = [ (icon_x - icon_size, icon_y + icon_size//2), (icon_x - icon_size//2, icon_y - icon_size//4), (icon_x - icon_size//4, icon_y), (icon_x + icon_size//4, icon_y - icon_size//2), (icon_x + icon_size, icon_y + icon_size//2), ] overlay_draw.polygon(mountain_points, fill=icon_color) # 태양/달 원 sun_radius = icon_size // 4 overlay_draw.ellipse( [(icon_x - icon_size//2, icon_y - icon_size//2 - sun_radius), (icon_x - icon_size//2 + sun_radius*2, icon_y - icon_size//2 + sun_radius)], fill=icon_color ) # 프레임 테두리 frame_margin = 40 overlay_draw.rectangle( [(frame_margin, frame_margin), (width - frame_margin, height - frame_margin)], outline=(255, 255, 255, 150), width=3 ) # 코너 장식 corner_size = 20 corner_width = 4 corners = [ (frame_margin, frame_margin), (width - frame_margin - corner_size, frame_margin), (frame_margin, height - frame_margin - corner_size), (width - frame_margin - corner_size, height - frame_margin - corner_size) ] for x, y in corners: # 가로선 overlay_draw.rectangle( [(x, y), (x + corner_size, y + corner_width)], fill=(255, 255, 255, 200) ) # 세로선 overlay_draw.rectangle( [(x, y), (x + corner_width, y + corner_size)], fill=(255, 255, 255, 200) ) # "Image Loading..." 텍스트 (작게) try: # 시스템 폰트 시도 font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 16) except: font = ImageFont.load_default() text = "Image Loading..." bbox = draw.textbbox((0, 0), text, font=font) text_width = bbox[2] - bbox[0] text_height = bbox[3] - bbox[1] text_x = (width - text_width) // 2 text_y = center_y + icon_size # 텍스트 그림자 for offset in [(2, 2), (-1, -1)]: overlay_draw.text( (text_x + offset[0], text_y + offset[1]), text, font=font, fill=(0, 0, 0, 100) ) # 텍스트 본체 overlay_draw.text( (text_x, text_y), text, font=font, fill=(255, 255, 255, 220) ) # 오버레이 합성 img = Image.alpha_composite(img.convert('RGBA'), overlay).convert('RGB') # 약간의 노이즈 추가 (텍스처) pixels = img.load() for _ in range(1000): x = random.randint(0, width - 1) y = random.randint(0, height - 1) r, g, b = pixels[x, y] brightness = random.randint(-20, 20) pixels[x, y] = ( max(0, min(255, r + brightness)), max(0, min(255, g + brightness)), max(0, min(255, b + brightness)) ) # JPEG로 변환 (높은 품질) output = io.BytesIO() img.save(output, format='JPEG', quality=85, optimize=True) return output.getvalue() raise # 이미지 크기 확인 content_length = int(response.headers.get('content-length', 0)) max_size = settings.max_image_size_mb * 1024 * 1024 if content_length > max_size: raise ValueError(f"Image too large: {content_length} bytes") # 응답 데이터 확인 content = response.content print(f"Downloaded {len(content)} bytes from {url[:50]}...") # gzip 압축 확인 및 해제 import gzip if len(content) > 2 and content[:2] == b'\x1f\x8b': print("📦 Gzip compressed data detected, decompressing...") try: content = gzip.decompress(content) print(f"✅ Decompressed to {len(content)} bytes") except Exception as e: print(f"❌ Failed to decompress gzip: {e}") # 처음 몇 바이트로 이미지 형식 확인 if len(content) > 10: header = content[:12] if header[:2] == b'\xff\xd8': print("✅ JPEG image detected") elif header[:8] == b'\x89PNG\r\n\x1a\n': print("✅ PNG image detected") elif header[:6] in (b'GIF87a', b'GIF89a'): print("✅ GIF image detected") elif header[:4] == b'RIFF' and header[8:12] == b'WEBP': print("✅ WebP image detected") elif b' tuple[bytes, str]: """이미지 리사이징 및 최적화""" if size not in settings.thumbnail_sizes: raise ValueError(f"Invalid size: {size}") target_size = settings.thumbnail_sizes[size] # SVG 체크 - SVG는 리사이징하지 않고 그대로 반환 if self._is_svg(image_data): return image_data, 'image/svg+xml' # PIL로 이미지 열기 try: img = Image.open(io.BytesIO(image_data)) except Exception as e: # WebP 헤더 체크 (RIFF....WEBP) header = image_data[:12] if len(image_data) >= 12 else image_data if header[:4] == b'RIFF' and header[8:12] == b'WEBP': print("🎨 WebP 이미지 감지됨, 변환 시도") # WebP 형식이지만 PIL이 열지 못하는 경우 # Pillow-SIMD 또는 추가 라이브러리가 필요할 수 있음 try: # 재시도 from PIL import WebPImagePlugin img = Image.open(io.BytesIO(image_data)) except: print("❌ WebP 이미지를 열 수 없음, 원본 반환") return image_data, 'image/webp' else: raise e # GIF 애니메이션 체크 및 처리 if getattr(img, "format", None) == "GIF": return self._process_gif(image_data, target_size) # WebP 형식 체크 original_format = getattr(img, "format", None) is_webp = original_format == "WEBP" # 원본 모드와 투명도 정보 저장 original_mode = img.mode original_has_transparency = img.mode in ('RGBA', 'LA') original_has_palette = img.mode == 'P' # 팔레트 모드(P) 처리 - 간단하게 PIL의 기본 변환 사용 if img.mode == 'P': # 팔레트 모드는 RGB로 직접 변환 # PIL의 convert 메서드가 팔레트를 올바르게 처리함 img = img.convert('RGB') # 투명도가 있는 이미지 처리 if img.mode == 'RGBA': # RGBA는 흰색 배경과 합성 background = Image.new('RGB', img.size, (255, 255, 255)) background.paste(img, mask=img.split()[-1]) img = background elif img.mode == 'LA': # LA(그레이스케일+알파)는 RGBA를 거쳐 RGB로 img = img.convert('RGBA') background = Image.new('RGB', img.size, (255, 255, 255)) background.paste(img, mask=img.split()[-1]) img = background elif img.mode == 'L': # 그레이스케일은 RGB로 변환 img = img.convert('RGB') elif img.mode not in ('RGB',): # 기타 모드는 모두 RGB로 변환 img = img.convert('RGB') # EXIF 방향 정보 처리 (RGB 변환 후에 수행) try: from PIL import ImageOps img = ImageOps.exif_transpose(img) except: pass # 메타데이터 제거는 스킵 (팔레트 모드 이미지에서 문제 발생) # RGB로 변환되었으므로 이미 메타데이터는 대부분 제거됨 # 비율 유지하며 리사이징 (크롭 없이) img_ratio = img.width / img.height target_width = target_size[0] target_height = target_size[1] # 원본 비율을 유지하면서 목표 크기에 맞추기 # 너비 또는 높이 중 하나를 기준으로 비율 계산 if img.width > target_width or img.height > target_height: # 너비 기준 리사이징 width_ratio = target_width / img.width # 높이 기준 리사이징 height_ratio = target_height / img.height # 둘 중 작은 비율 사용 (목표 크기를 넘지 않도록) ratio = min(width_ratio, height_ratio) new_width = int(img.width * ratio) new_height = int(img.height * ratio) # 큰 이미지를 작게 만들 때는 2단계 리샘플링으로 품질 향상 if img.width > new_width * 2 or img.height > new_height * 2: # 1단계: 목표 크기의 2배로 먼저 축소 intermediate_width = new_width * 2 intermediate_height = new_height * 2 img = img.resize((intermediate_width, intermediate_height), Image.Resampling.LANCZOS) # 최종 목표 크기로 리샘플링 img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) # 샤프닝 적용 (작은 이미지에만) if target_size[0] <= 400: from PIL import ImageEnhance enhancer = ImageEnhance.Sharpness(img) img = enhancer.enhance(1.2) # 바이트로 변환 output = io.BytesIO() # 적응형 품질 계산 (이미지 크기에 따라 조정) def get_adaptive_quality(base_quality: int, target_width: int) -> int: """이미지 크기에 따른 적응형 품질 계산""" # 품질을 더 높게 설정하여 검정색 문제 해결 if target_width <= 150: # 썸네일 return min(base_quality + 10, 95) elif target_width <= 360: # 카드 return min(base_quality + 5, 90) elif target_width <= 800: # 상세 return base_quality # 85 else: # 히어로 return base_quality # 85 # WebP 변환 및 최적화 - 최고 압축률 설정 # WebP 입력은 JPEG로 변환 (WebP 리사이징 문제 회피) if is_webp: output_format = 'JPEG' content_type = 'image/jpeg' else: output_format = 'WEBP' if settings.convert_to_webp else 'JPEG' content_type = 'image/webp' if output_format == 'WEBP' else 'image/jpeg' if output_format == 'WEBP': # WebP 최적화: method=6(최고품질), lossless=False, exact=False adaptive_quality = get_adaptive_quality(settings.webp_quality, target_size[0]) save_kwargs = { 'format': 'WEBP', 'quality': adaptive_quality, 'method': 6, # 최고 압축 알고리즘 (0-6) 'lossless': settings.webp_lossless, 'exact': False, # 약간의 품질 손실 허용하여 더 작은 크기 } img.save(output, **save_kwargs) elif original_has_transparency and not settings.convert_to_webp: # PNG 최적화 (투명도가 있는 이미지) save_kwargs = { 'format': 'PNG', 'optimize': settings.optimize_png, 'compress_level': settings.png_compress_level, } # 팔레트 모드로 변환 가능한지 확인 (256색 이하) if settings.optimize_png: try: # 색상 수가 256개 이하이면 팔레트 모드로 변환 quantized = img.quantize(colors=256, method=Image.Quantize.MEDIANCUT) if len(quantized.getcolors()) <= 256: img = quantized save_kwargs['format'] = 'PNG' except: pass content_type = 'image/png' img.save(output, **save_kwargs) else: # JPEG 최적화 설정 (기본값) adaptive_quality = get_adaptive_quality(settings.jpeg_quality, target_size[0]) save_kwargs = { 'format': 'JPEG', 'quality': adaptive_quality, 'optimize': True, 'progressive': settings.progressive_jpeg, } img.save(output, **save_kwargs) return output.getvalue(), content_type async def get_cache_size(self) -> float: """현재 캐시 크기 (GB)""" total_size = 0 for dirpath, dirnames, filenames in os.walk(self.cache_dir): for filename in filenames: filepath = os.path.join(dirpath, filename) total_size += os.path.getsize(filepath) return total_size / (1024 ** 3) # GB로 변환 async def cleanup_old_cache(self): """오래된 캐시 파일 정리""" cutoff_time = datetime.now() - timedelta(days=settings.cache_ttl_days) for dirpath, dirnames, filenames in os.walk(self.cache_dir): for filename in filenames: filepath = Path(dirpath) / filename if filepath.stat().st_mtime < cutoff_time.timestamp(): filepath.unlink() async def trigger_background_generation(self, url: str): """백그라운드에서 모든 크기의 이미지 생성 트리거""" from .background_tasks import background_manager # 백그라운드 작업 큐에 추가 asyncio.create_task(background_manager.add_task(url)) async def get_directory_stats(self) -> dict: """디렉토리 구조 통계 정보""" total_files = 0 total_dirs = 0 files_per_dir = {} for root, dirs, files in os.walk(self.cache_dir): total_dirs += len(dirs) total_files += len(files) # 각 디렉토리의 파일 수 계산 rel_path = os.path.relpath(root, self.cache_dir) depth = len(Path(rel_path).parts) if rel_path != '.' else 0 if files and depth == 3: # 3단계 디렉토리에서만 파일 수 계산 files_per_dir[rel_path] = len(files) # 통계 계산 avg_files_per_dir = sum(files_per_dir.values()) / len(files_per_dir) if files_per_dir else 0 max_files_in_dir = max(files_per_dir.values()) if files_per_dir else 0 return { "total_files": total_files, "total_directories": total_dirs, "average_files_per_directory": round(avg_files_per_dir, 2), "max_files_in_single_directory": max_files_in_dir, "directory_depth": 3 } cache = ImageCache()