Step 6: Images Service Integration

- Integrated image-service from site00 as second microservice
- Maintained proxy and caching functionality
- Added Images service to docker-compose
- Configured Console API Gateway routing to Images
- Updated environment variables in .env
- Successfully tested image proxy endpoints

Services now running:
- Console (API Gateway)
- Users Service
- Images Service (proxy & cache)
- MongoDB & Redis

Next: Kafka event system implementation

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
jungwoo choi
2025-09-10 16:49:48 +09:00
parent 315eeea2ae
commit 4451170466
11 changed files with 1469 additions and 2 deletions

View File

@ -21,6 +21,7 @@ app = FastAPI(
# Service URLs from environment
USERS_SERVICE_URL = os.getenv("USERS_SERVICE_URL", "http://users-backend:8000")
IMAGES_SERVICE_URL = os.getenv("IMAGES_SERVICE_URL", "http://images-backend:8000")
# CORS middleware
app.add_middleware(
@ -111,9 +112,16 @@ async def system_status():
except:
services_status["users"] = "offline"
# Check Images service
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{IMAGES_SERVICE_URL}/health", timeout=2.0)
services_status["images"] = "online" if response.status_code == 200 else "error"
except:
services_status["images"] = "offline"
# Other services (not yet implemented)
services_status["oauth"] = "pending"
services_status["images"] = "pending"
services_status["applications"] = "pending"
services_status["data"] = "pending"
services_status["statistics"] = "pending"
@ -133,6 +141,43 @@ async def protected_route(current_user = Depends(get_current_user)):
"user": current_user.username
}
# API Gateway - Route to Images service
@app.api_route("/api/images/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_images(path: str, request: Request):
"""Proxy requests to Images service (public for image proxy)"""
try:
async with httpx.AsyncClient() as client:
# Build the target URL
url = f"{IMAGES_SERVICE_URL}/api/v1/{path}"
# Get request body if exists
body = None
if request.method in ["POST", "PUT", "PATCH"]:
body = await request.body()
# Forward the request
response = await client.request(
method=request.method,
url=url,
headers={
key: value for key, value in request.headers.items()
if key.lower() not in ["host", "content-length"]
},
content=body,
params=request.query_params
)
# Return the response
return Response(
content=response.content,
status_code=response.status_code,
headers=dict(response.headers)
)
except httpx.ConnectError:
raise HTTPException(status_code=503, detail="Images service unavailable")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# API Gateway - Route to Users service
@app.api_route("/api/users/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_users(path: str, request: Request, current_user = Depends(get_current_user)):

View File

@ -54,6 +54,30 @@ services:
depends_on:
- mongodb
images-backend:
build:
context: ./services/images/backend
dockerfile: Dockerfile
container_name: ${COMPOSE_PROJECT_NAME}_images_backend
ports:
- "${IMAGES_SERVICE_PORT}:8000"
environment:
- ENV=${ENV}
- PORT=8000
- REDIS_URL=${REDIS_URL}
- MONGODB_URL=${MONGODB_URL}
- CACHE_DIR=/app/cache
- CONVERT_TO_WEBP=true
volumes:
- ./services/images/backend:/app
- images_cache:/app/cache
networks:
- site11_network
restart: unless-stopped
depends_on:
- redis
- mongodb
mongodb:
image: mongo:7.0
container_name: ${COMPOSE_PROJECT_NAME}_mongodb
@ -98,3 +122,4 @@ volumes:
mongodb_data:
mongodb_config:
redis_data:
images_cache:

170
docs/TEST_AUTH.md Normal file
View File

@ -0,0 +1,170 @@
# 인증 시스템 테스트 가이드
## 테스트 계정
- **관리자**: admin / admin123
- **일반 사용자**: user / user123
## 1. Terminal에서 테스트
### 로그인 테스트
```bash
# 관리자로 로그인
curl -X POST http://localhost:8011/api/auth/login \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=admin&password=admin123"
# 응답 예시:
# {"access_token":"eyJhbGci...","token_type":"bearer"}
```
### 토큰 저장 및 사용
```bash
# 토큰을 변수에 저장
export TOKEN="eyJhbGci..." # 위에서 받은 토큰
# 인증된 요청 - 사용자 정보 조회
curl -X GET http://localhost:8011/api/auth/me \
-H "Authorization: Bearer $TOKEN"
# 인증된 요청 - 보호된 엔드포인트
curl -X GET http://localhost:8011/api/protected \
-H "Authorization: Bearer $TOKEN"
# 인증된 요청 - Users 서비스 접근
curl -X GET http://localhost:8011/api/users/ \
-H "Authorization: Bearer $TOKEN"
```
### 로그아웃
```bash
curl -X POST http://localhost:8011/api/auth/logout \
-H "Authorization: Bearer $TOKEN"
```
## 2. Postman/Insomnia에서 테스트
### Postman 설정
1. **로그인 요청**
- Method: POST
- URL: `http://localhost:8011/api/auth/login`
- Body: x-www-form-urlencoded
- username: admin
- password: admin123
2. **토큰 사용**
- Authorization 탭에서 Type: Bearer Token 선택
- Token 필드에 받은 토큰 붙여넣기
## 3. Python 스크립트로 테스트
```python
import requests
# 로그인
login_response = requests.post(
"http://localhost:8011/api/auth/login",
data={"username": "admin", "password": "admin123"}
)
token = login_response.json()["access_token"]
# 인증된 요청
headers = {"Authorization": f"Bearer {token}"}
me_response = requests.get(
"http://localhost:8011/api/auth/me",
headers=headers
)
print(me_response.json())
# Users 서비스 접근
users_response = requests.get(
"http://localhost:8011/api/users/",
headers=headers
)
print(users_response.json())
```
## 4. JavaScript (브라우저 콘솔)에서 테스트
```javascript
// 로그인
const loginResponse = await fetch('http://localhost:8011/api/auth/login', {
method: 'POST',
headers: {'Content-Type': 'application/x-www-form-urlencoded'},
body: 'username=admin&password=admin123'
});
const { access_token } = await loginResponse.json();
console.log('Token:', access_token);
// 인증된 요청
const meResponse = await fetch('http://localhost:8011/api/auth/me', {
headers: {'Authorization': `Bearer ${access_token}`}
});
const userData = await meResponse.json();
console.log('User:', userData);
```
## 5. Frontend에서 테스트 (React)
브라우저에서 http://localhost:3000 접속 후 개발자 도구 콘솔에서:
```javascript
// 로그인 함수
async function testLogin() {
const response = await fetch('/api/auth/login', {
method: 'POST',
headers: {'Content-Type': 'application/x-www-form-urlencoded'},
body: 'username=admin&password=admin123'
});
const data = await response.json();
localStorage.setItem('token', data.access_token);
console.log('Logged in!', data);
return data.access_token;
}
// 인증 테스트
async function testAuth() {
const token = localStorage.getItem('token');
const response = await fetch('/api/auth/me', {
headers: {'Authorization': `Bearer ${token}`}
});
const data = await response.json();
console.log('User info:', data);
}
// 실행
await testLogin();
await testAuth();
```
## 오류 테스트
### 잘못된 비밀번호
```bash
curl -X POST http://localhost:8011/api/auth/login \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=admin&password=wrong"
# 응답: 401 Unauthorized
```
### 토큰 없이 보호된 엔드포인트 접근
```bash
curl -X GET http://localhost:8011/api/auth/me
# 응답: 401 Unauthorized
```
### 잘못된 토큰
```bash
curl -X GET http://localhost:8011/api/auth/me \
-H "Authorization: Bearer invalid_token"
# 응답: 401 Unauthorized
```
## 토큰 정보
- **유효 기간**: 30분 (환경 변수 ACCESS_TOKEN_EXPIRE_MINUTES로 설정 가능)
- **알고리즘**: HS256
- **페이로드**: username 정보 포함
## 다음 단계
Frontend에 로그인 페이지를 추가하면 UI에서 직접 테스트 가능합니다.

View File

@ -0,0 +1,26 @@
FROM python:3.11-slim
WORKDIR /app
# 시스템 패키지 설치
RUN apt-get update && apt-get install -y \
gcc \
libheif-dev \
libde265-dev \
libjpeg-dev \
libpng-dev \
&& rm -rf /var/lib/apt/lists/*
# Python 패키지 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 애플리케이션 코드 복사
COPY . .
# 캐시 디렉토리 생성
RUN mkdir -p /app/cache
EXPOSE 8000
CMD ["python", "main.py"]

View File

View File

@ -0,0 +1,192 @@
from fastapi import APIRouter, Query, HTTPException, Body
from fastapi.responses import Response
from typing import Optional, Dict
import mimetypes
from pathlib import Path
import hashlib
from ..core.cache import cache
from ..core.config import settings
router = APIRouter()
@router.get("/image")
async def get_image(
url: str = Query(..., description="원본 이미지 URL"),
size: Optional[str] = Query(None, description="이미지 크기 (thumb, card, list, detail, hero)")
):
"""
이미지 프록시 엔드포인트
- 외부 URL의 이미지를 가져와서 캐싱
- 선택적으로 리사이징 및 최적화
- WebP 포맷으로 자동 변환 (설정에 따라)
"""
try:
# 캐시 확인
cached_data = await cache.get(url, size)
if cached_data:
# 캐시된 이미지 반환
# SVG 체크
if url.lower().endswith('.svg') or cache._is_svg(cached_data):
content_type = 'image/svg+xml'
# GIF 체크 (GIF는 WebP로 변환하지 않음)
elif url.lower().endswith('.gif'):
content_type = 'image/gif'
# WebP 변환이 활성화된 경우 항상 WebP로 제공 (GIF 제외)
elif settings.convert_to_webp and size:
content_type = 'image/webp'
else:
content_type = mimetypes.guess_type(url)[0] or 'image/jpeg'
return Response(
content=cached_data,
media_type=content_type,
headers={
"Cache-Control": f"public, max-age={86400 * 7}", # 7일 브라우저 캐시
"X-Cache": "HIT",
"X-Image-Format": content_type.split('/')[-1].upper()
}
)
# 캐시 미스 - 이미지 다운로드
image_data = await cache.download_image(url)
# URL에서 MIME 타입 추측
guessed_type = mimetypes.guess_type(url)[0]
# SVG 확장자 체크 (mimetypes가 SVG를 제대로 인식하지 못할 수 있음)
if url.lower().endswith('.svg') or cache._is_svg(image_data):
content_type = 'image/svg+xml'
# GIF 체크
elif url.lower().endswith('.gif') or (guessed_type and 'gif' in guessed_type.lower()):
content_type = 'image/gif'
else:
content_type = guessed_type or 'image/jpeg'
# 리사이징 및 최적화 (SVG와 GIF는 특별 처리)
if size and content_type != 'image/svg+xml':
# GIF는 특별 처리
if content_type == 'image/gif':
image_data, content_type = cache._process_gif(image_data, settings.thumbnail_sizes[size])
else:
image_data, content_type = cache.resize_and_optimize_image(image_data, size)
# 캐시에 저장
await cache.set(url, image_data, size)
# 백그라운드에서 다른 크기들도 생성하도록 트리거
await cache.trigger_background_generation(url)
# 이미지 반환
return Response(
content=image_data,
media_type=content_type,
headers={
"Cache-Control": f"public, max-age={86400 * 7}",
"X-Cache": "MISS",
"X-Image-Format": content_type.split('/')[-1].upper()
}
)
except HTTPException:
raise
except Exception as e:
import traceback
print(f"Error processing image from {url}: {str(e)}")
traceback.print_exc()
# 403 에러를 명확히 처리
if "403" in str(e):
raise HTTPException(
status_code=403,
detail=f"이미지 접근 거부됨: {url}"
)
raise HTTPException(
status_code=500,
detail=f"이미지 처리 실패: {str(e)}"
)
@router.get("/stats")
async def get_stats():
"""캐시 통계 정보"""
cache_size = await cache.get_cache_size()
# 디렉토리 구조 통계 추가
dir_stats = await cache.get_directory_stats()
return {
"cache_size_gb": round(cache_size, 2),
"max_cache_size_gb": settings.max_cache_size_gb,
"cache_usage_percent": round((cache_size / settings.max_cache_size_gb) * 100, 2),
"directory_stats": dir_stats
}
@router.post("/cleanup")
async def cleanup_cache():
"""오래된 캐시 정리"""
await cache.cleanup_old_cache()
return {"message": "캐시 정리 완료"}
@router.post("/cache/delete")
async def delete_cache(request: Dict = Body(...)):
"""특정 URL의 캐시 삭제"""
url = request.get("url")
if not url:
raise HTTPException(status_code=400, detail="URL이 필요합니다")
try:
# URL의 모든 크기 버전 삭제
sizes = ["thumb", "card", "list", "detail", "hero", None] # None은 원본
deleted_count = 0
for size in sizes:
# 캐시 경로 계산
url_hash = hashlib.md5(url.encode()).hexdigest()
# 3단계 디렉토리 구조
level1 = url_hash[:2]
level2 = url_hash[2:4]
level3 = url_hash[4:6]
# 크기별 파일명
if size:
patterns = [
f"{url_hash}_{size}.webp",
f"{url_hash}_{size}.jpg",
f"{url_hash}_{size}.jpeg",
f"{url_hash}_{size}.png",
f"{url_hash}_{size}.gif"
]
else:
patterns = [
f"{url_hash}",
f"{url_hash}.jpg",
f"{url_hash}.jpeg",
f"{url_hash}.png",
f"{url_hash}.gif",
f"{url_hash}.webp"
]
# 각 패턴에 대해 파일 삭제 시도
for filename in patterns:
cache_path = settings.cache_dir / level1 / level2 / level3 / filename
if cache_path.exists():
cache_path.unlink()
deleted_count += 1
print(f"✅ 캐시 파일 삭제: {cache_path}")
return {
"status": "success",
"message": f"{deleted_count}개의 캐시 파일이 삭제되었습니다",
"url": url
}
except Exception as e:
print(f"❌ 캐시 삭제 오류: {e}")
raise HTTPException(
status_code=500,
detail=f"캐시 삭제 실패: {str(e)}"
)

View File

@ -0,0 +1,91 @@
import asyncio
import logging
from typing import Set, Optional
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BackgroundTaskManager:
"""백그라운드 작업 관리자"""
def __init__(self):
self.processing_urls: Set[str] = set() # 현재 처리 중인 URL 목록
self.task_queue: asyncio.Queue = None
self.worker_task: Optional[asyncio.Task] = None
async def start(self):
"""백그라운드 워커 시작"""
self.task_queue = asyncio.Queue(maxsize=100)
self.worker_task = asyncio.create_task(self._worker())
logger.info("백그라운드 작업 관리자 시작됨")
async def stop(self):
"""백그라운드 워커 정지"""
if self.worker_task:
self.worker_task.cancel()
try:
await self.worker_task
except asyncio.CancelledError:
pass
logger.info("백그라운드 작업 관리자 정지됨")
async def add_task(self, url: str):
"""작업 큐에 URL 추가"""
if url not in self.processing_urls and self.task_queue:
try:
self.processing_urls.add(url)
await self.task_queue.put(url)
logger.info(f"백그라운드 작업 추가: {url}")
except asyncio.QueueFull:
self.processing_urls.discard(url)
logger.warning(f"작업 큐가 가득 참: {url}")
async def _worker(self):
"""백그라운드 워커 - 큐에서 작업을 가져와 처리"""
from .cache import cache
while True:
try:
# 큐에서 URL 가져오기
url = await self.task_queue.get()
try:
# 원본 이미지가 캐시에 있는지 확인
original_data = await cache.get(url, None)
if not original_data:
# 원본 이미지 다운로드
original_data = await cache.download_image(url)
await cache.set(url, original_data, None)
# 모든 크기의 이미지 생성
sizes = ['thumb', 'card', 'list', 'detail', 'hero']
for size in sizes:
# 이미 존재하는지 확인
existing = await cache.get(url, size)
if not existing:
try:
# 리사이징 및 최적화 - cache.resize_and_optimize_image가 WebP를 처리함
resized_data, _ = cache.resize_and_optimize_image(original_data, size)
await cache.set(url, resized_data, size)
logger.info(f"백그라운드 생성 완료: {url} ({size})")
except Exception as e:
logger.error(f"백그라운드 리사이징 실패: {url} ({size}) - {str(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
except Exception as e:
logger.error(f"백그라운드 작업 실패: {url} - {str(e)}")
finally:
# 처리 완료된 URL 제거
self.processing_urls.discard(url)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"백그라운드 워커 오류: {str(e)}")
await asyncio.sleep(1) # 오류 발생 시 잠시 대기
# 전역 백그라운드 작업 관리자
background_manager = BackgroundTaskManager()

View File

@ -0,0 +1,796 @@
import hashlib
import aiofiles
import os
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional
import httpx
from PIL import Image
try:
from pillow_heif import register_heif_opener, register_avif_opener
register_heif_opener() # HEIF/HEIC 지원
register_avif_opener() # AVIF 지원
print("HEIF/AVIF support enabled successfully")
except ImportError:
print("Warning: pillow_heif not installed, HEIF/AVIF support disabled")
import io
import asyncio
import ssl
from .config import settings
class ImageCache:
def __init__(self):
self.cache_dir = settings.cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _get_cache_path(self, url: str, size: Optional[str] = None) -> Path:
"""URL을 기반으로 캐시 파일 경로 생성"""
# URL을 해시하여 파일명 생성
url_hash = hashlib.md5(url.encode()).hexdigest()
# 3단계 디렉토리 구조 생성
# 예: 10f8a8f96aa1377e86fdbc6bf3c631cf -> 10/f8/a8/
level1 = url_hash[:2] # 첫 2자리
level2 = url_hash[2:4] # 다음 2자리
level3 = url_hash[4:6] # 다음 2자리
# 크기별로 다른 파일명 사용
if size:
filename = f"{url_hash}_{size}"
else:
filename = url_hash
# 확장자 추출 (WebP로 저장되는 경우 .webp 사용)
if settings.convert_to_webp and size:
filename = f"{filename}.webp"
else:
ext = self._get_extension_from_url(url)
if ext:
filename = f"{filename}.{ext}"
# 3단계 디렉토리 경로 생성
path = self.cache_dir / level1 / level2 / level3 / filename
path.parent.mkdir(parents=True, exist_ok=True)
return path
def _get_extension_from_url(self, url: str) -> Optional[str]:
"""URL에서 파일 확장자 추출"""
path = url.split('?')[0] # 쿼리 파라미터 제거
parts = path.split('.')
if len(parts) > 1:
ext = parts[-1].lower()
if ext in settings.allowed_formats:
return ext
return None
def _is_svg(self, data: bytes) -> bool:
"""SVG 파일인지 확인"""
# SVG 파일의 시작 부분 확인
if len(data) < 100:
return False
# 처음 1000바이트만 확인 (성능 최적화)
header = data[:1000].lower()
# SVG 시그니처 확인
svg_signatures = [
b'<svg',
b'<?xml',
b'<!doctype svg'
]
for sig in svg_signatures:
if sig in header:
return True
return False
def _process_gif(self, gif_data: bytes, target_size: tuple) -> tuple[bytes, str]:
"""GIF 처리 - JPEG로 변환하여 안정적으로 처리"""
try:
from PIL import Image
# GIF 열기
img = Image.open(io.BytesIO(gif_data))
# 모든 GIF를 RGB로 변환 (팔레트 모드 문제 해결)
# 팔레트 모드(P)를 RGB로 직접 변환
if img.mode != 'RGB':
img = img.convert('RGB')
# 리사이징
img = img.resize(target_size, Image.Resampling.LANCZOS)
# JPEG로 저장 (안정적)
output = io.BytesIO()
img.save(output, format='JPEG', quality=85, optimize=True)
return output.getvalue(), 'image/jpeg'
except Exception as e:
print(f"GIF 처리 중 오류: {e}")
import traceback
traceback.print_exc()
# 오류 발생 시 원본 반환
return gif_data, 'image/gif'
async def get(self, url: str, size: Optional[str] = None) -> Optional[bytes]:
"""캐시에서 이미지 가져오기"""
cache_path = self._get_cache_path(url, size)
if cache_path.exists():
# 캐시 만료 확인
stat = cache_path.stat()
age = datetime.now() - datetime.fromtimestamp(stat.st_mtime)
if age < timedelta(days=settings.cache_ttl_days):
async with aiofiles.open(cache_path, 'rb') as f:
return await f.read()
else:
# 만료된 캐시 삭제
cache_path.unlink()
return None
async def set(self, url: str, data: bytes, size: Optional[str] = None):
"""캐시에 이미지 저장"""
cache_path = self._get_cache_path(url, size)
async with aiofiles.open(cache_path, 'wb') as f:
await f.write(data)
async def download_image(self, url: str) -> bytes:
"""외부 URL에서 이미지 다운로드"""
from urllib.parse import urlparse
# URL에서 도메인 추출
parsed_url = urlparse(url)
domain = parsed_url.netloc
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"
# 기본 헤더 설정
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
'Accept-Language': 'ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7',
'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
'Sec-Fetch-Dest': 'image',
'Sec-Fetch-Mode': 'no-cors',
'Sec-Fetch-Site': 'cross-site',
'Referer': base_url # 항상 기본 Referer 설정
}
# 특정 사이트별 Referer 오버라이드
if 'yna.co.kr' in url:
headers['Referer'] = 'https://www.yna.co.kr/'
client = httpx.AsyncClient(
verify=False, # SSL 검증 비활성화
timeout=30.0,
follow_redirects=True
)
elif 'investing.com' in url:
headers['Referer'] = 'https://www.investing.com/'
client = httpx.AsyncClient()
elif 'naver.com' in url:
headers['Referer'] = 'https://news.naver.com/'
client = httpx.AsyncClient()
elif 'daum.net' in url:
headers['Referer'] = 'https://news.daum.net/'
client = httpx.AsyncClient()
elif 'chosun.com' in url:
headers['Referer'] = 'https://www.chosun.com/'
client = httpx.AsyncClient()
elif 'vietnam.vn' in url or 'vstatic.vietnam.vn' in url:
headers['Referer'] = 'https://vietnam.vn/'
client = httpx.AsyncClient()
elif 'ddaily.co.kr' in url:
# ddaily는 /photos/ 경로를 사용해야 함
headers['Referer'] = 'https://www.ddaily.co.kr/'
# URL이 잘못된 경로를 사용하는 경우 수정
if '/2025/' in url and '/photos/' not in url:
url = url.replace('/2025/', '/photos/2025/')
print(f"Fixed ddaily URL: {url}")
client = httpx.AsyncClient()
else:
# 기본적으로 도메인 기반 Referer 사용
client = httpx.AsyncClient()
async with client:
try:
response = await client.get(
url,
headers=headers,
timeout=settings.request_timeout,
follow_redirects=True
)
response.raise_for_status()
except Exception as e:
# 모든 에러에 대해 Playwright 사용 시도
error_msg = str(e)
if isinstance(e, httpx.HTTPStatusError):
error_type = f"HTTP {e.response.status_code}"
elif isinstance(e, httpx.ConnectError):
error_type = "Connection Error"
elif isinstance(e, ssl.SSLError):
error_type = "SSL Error"
elif "resolve" in error_msg.lower() or "dns" in error_msg.lower():
error_type = "DNS Resolution Error"
else:
error_type = "Network Error"
print(f"{error_type} for {url}, trying with Playwright...")
# Playwright로 이미지 가져오기 시도
try:
from playwright.async_api import async_playwright
from PIL import Image
import io
async with async_playwright() as p:
# 브라우저 실행
browser = await p.chromium.launch(
headless=True,
args=['--no-sandbox', '--disable-setuid-sandbox']
)
# Referer 설정을 위한 도메인 추출
from urllib.parse import urlparse
parsed = urlparse(url)
referer_url = f"{parsed.scheme}://{parsed.netloc}/"
context = await browser.new_context(
viewport={'width': 1920, 'height': 1080},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
extra_http_headers={
'Referer': referer_url
}
)
page = await context.new_page()
try:
# Response를 가로채기 위한 설정
image_data = None
async def handle_response(response):
nonlocal image_data
# 이미지 URL에 대한 응답 가로채기
if url in response.url or response.url == url:
try:
image_data = await response.body()
print(f"✅ Image intercepted: {len(image_data)} bytes")
except:
pass
# Response 이벤트 리스너 등록
page.on('response', handle_response)
# 이미지 URL로 이동 (에러 무시)
try:
await page.goto(url, wait_until='networkidle', timeout=30000)
except Exception as goto_error:
print(f"⚠️ Direct navigation failed: {goto_error}")
# 직접 이동 실패 시 HTML에 img 태그 삽입
await page.set_content(f'''
<html>
<body style="margin:0;padding:0;">
<img src="{url}" style="max-width:100%;height:auto;"
crossorigin="anonymous" />
</body>
</html>
''')
await page.wait_for_timeout(3000) # 이미지 로딩 대기
# 이미지 데이터가 없으면 JavaScript로 직접 fetch
if not image_data:
# JavaScript로 이미지 fetch
image_data_base64 = await page.evaluate('''
async (url) => {
try {
const response = await fetch(url);
const blob = await response.blob();
return new Promise((resolve) => {
const reader = new FileReader();
reader.onloadend = () => resolve(reader.result.split(',')[1]);
reader.readAsDataURL(blob);
});
} catch (e) {
return null;
}
}
''', url)
if image_data_base64:
import base64
image_data = base64.b64decode(image_data_base64)
print(f"✅ Image fetched via JavaScript: {len(image_data)} bytes")
# 여전히 데이터가 없으면 스크린샷 사용
if not image_data:
# 이미지 요소 찾기
img_element = await page.query_selector('img')
if img_element:
# 이미지가 로드되었는지 확인
is_loaded = await img_element.evaluate('(img) => img.complete && img.naturalHeight > 0')
if is_loaded:
image_data = await img_element.screenshot()
print(f"✅ Screenshot from loaded image: {len(image_data)} bytes")
else:
# 이미지 로드 대기
try:
await img_element.evaluate('(img) => new Promise(r => img.onload = r)')
image_data = await img_element.screenshot()
print(f"✅ Screenshot after waiting: {len(image_data)} bytes")
except:
# 전체 페이지 스크린샷
image_data = await page.screenshot(full_page=True)
print(f"⚠️ Full page screenshot: {len(image_data)} bytes")
else:
image_data = await page.screenshot(full_page=True)
print(f"⚠️ No image element, full screenshot: {len(image_data)} bytes")
print(f"✅ Successfully fetched image with Playwright: {url}")
return image_data
finally:
await page.close()
await context.close()
await browser.close()
except Exception as pw_error:
print(f"Playwright failed: {pw_error}, returning placeholder")
# Playwright도 실패하면 세련된 placeholder 반환
from PIL import Image, ImageDraw, ImageFont
import io
import random
# 그라디언트 배경색 선택 (부드러운 색상)
gradients = [
('#667eea', '#764ba2'), # 보라 그라디언트
('#f093fb', '#f5576c'), # 핑크 그라디언트
('#4facfe', '#00f2fe'), # 하늘색 그라디언트
('#43e97b', '#38f9d7'), # 민트 그라디언트
('#fa709a', '#fee140'), # 선셋 그라디언트
('#30cfd0', '#330867'), # 딥 오션
('#a8edea', '#fed6e3'), # 파스텔
('#ffecd2', '#fcb69f'), # 피치
]
# 랜덤 그라디언트 선택
color1, color2 = random.choice(gradients)
# 이미지 생성 (16:9 비율)
width, height = 800, 450
img = Image.new('RGB', (width, height))
draw = ImageDraw.Draw(img)
# 그라디언트 배경 생성
def hex_to_rgb(hex_color):
hex_color = hex_color.lstrip('#')
return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))
rgb1 = hex_to_rgb(color1)
rgb2 = hex_to_rgb(color2)
# 세로 그라디언트
for y in range(height):
ratio = y / height
r = int(rgb1[0] * (1 - ratio) + rgb2[0] * ratio)
g = int(rgb1[1] * (1 - ratio) + rgb2[1] * ratio)
b = int(rgb1[2] * (1 - ratio) + rgb2[2] * ratio)
draw.rectangle([(0, y), (width, y + 1)], fill=(r, g, b))
# 반투명 오버레이 추가 (깊이감)
overlay = Image.new('RGBA', (width, height), (0, 0, 0, 0))
overlay_draw = ImageDraw.Draw(overlay)
# 중앙 원형 그라디언트 효과
center_x, center_y = width // 2, height // 2
max_radius = min(width, height) // 3
for radius in range(max_radius, 0, -2):
opacity = int(255 * (1 - radius / max_radius) * 0.3)
overlay_draw.ellipse(
[(center_x - radius, center_y - radius),
(center_x + radius, center_y + radius)],
fill=(255, 255, 255, opacity)
)
# 이미지 아이콘 그리기 (산 모양)
icon_color = (255, 255, 255, 200)
icon_size = 80
icon_x = center_x
icon_y = center_y - 20
# 산 아이콘 (사진 이미지를 나타냄)
mountain_points = [
(icon_x - icon_size, icon_y + icon_size//2),
(icon_x - icon_size//2, icon_y - icon_size//4),
(icon_x - icon_size//4, icon_y),
(icon_x + icon_size//4, icon_y - icon_size//2),
(icon_x + icon_size, icon_y + icon_size//2),
]
overlay_draw.polygon(mountain_points, fill=icon_color)
# 태양/달 원
sun_radius = icon_size // 4
overlay_draw.ellipse(
[(icon_x - icon_size//2, icon_y - icon_size//2 - sun_radius),
(icon_x - icon_size//2 + sun_radius*2, icon_y - icon_size//2 + sun_radius)],
fill=icon_color
)
# 프레임 테두리
frame_margin = 40
overlay_draw.rectangle(
[(frame_margin, frame_margin),
(width - frame_margin, height - frame_margin)],
outline=(255, 255, 255, 150),
width=3
)
# 코너 장식
corner_size = 20
corner_width = 4
corners = [
(frame_margin, frame_margin),
(width - frame_margin - corner_size, frame_margin),
(frame_margin, height - frame_margin - corner_size),
(width - frame_margin - corner_size, height - frame_margin - corner_size)
]
for x, y in corners:
# 가로선
overlay_draw.rectangle(
[(x, y), (x + corner_size, y + corner_width)],
fill=(255, 255, 255, 200)
)
# 세로선
overlay_draw.rectangle(
[(x, y), (x + corner_width, y + corner_size)],
fill=(255, 255, 255, 200)
)
# "Image Loading..." 텍스트 (작게)
try:
# 시스템 폰트 시도
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 16)
except:
font = ImageFont.load_default()
text = "Image Loading..."
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
text_x = (width - text_width) // 2
text_y = center_y + icon_size
# 텍스트 그림자
for offset in [(2, 2), (-1, -1)]:
overlay_draw.text(
(text_x + offset[0], text_y + offset[1]),
text,
font=font,
fill=(0, 0, 0, 100)
)
# 텍스트 본체
overlay_draw.text(
(text_x, text_y),
text,
font=font,
fill=(255, 255, 255, 220)
)
# 오버레이 합성
img = Image.alpha_composite(img.convert('RGBA'), overlay).convert('RGB')
# 약간의 노이즈 추가 (텍스처)
pixels = img.load()
for _ in range(1000):
x = random.randint(0, width - 1)
y = random.randint(0, height - 1)
r, g, b = pixels[x, y]
brightness = random.randint(-20, 20)
pixels[x, y] = (
max(0, min(255, r + brightness)),
max(0, min(255, g + brightness)),
max(0, min(255, b + brightness))
)
# JPEG로 변환 (높은 품질)
output = io.BytesIO()
img.save(output, format='JPEG', quality=85, optimize=True)
return output.getvalue()
raise
# 이미지 크기 확인
content_length = int(response.headers.get('content-length', 0))
max_size = settings.max_image_size_mb * 1024 * 1024
if content_length > max_size:
raise ValueError(f"Image too large: {content_length} bytes")
# 응답 데이터 확인
content = response.content
print(f"Downloaded {len(content)} bytes from {url[:50]}...")
# gzip 압축 확인 및 해제
import gzip
if len(content) > 2 and content[:2] == b'\x1f\x8b':
print("📦 Gzip compressed data detected, decompressing...")
try:
content = gzip.decompress(content)
print(f"✅ Decompressed to {len(content)} bytes")
except Exception as e:
print(f"❌ Failed to decompress gzip: {e}")
# 처음 몇 바이트로 이미지 형식 확인
if len(content) > 10:
header = content[:12]
if header[:2] == b'\xff\xd8':
print("✅ JPEG image detected")
elif header[:8] == b'\x89PNG\r\n\x1a\n':
print("✅ PNG image detected")
elif header[:6] in (b'GIF87a', b'GIF89a'):
print("✅ GIF image detected")
elif header[:4] == b'RIFF' and header[8:12] == b'WEBP':
print("✅ WebP image detected")
elif b'<svg' in header or b'<?xml' in header:
print("✅ SVG image detected")
elif header[4:12] == b'ftypavif':
print("✅ AVIF image detected")
else:
print(f"⚠️ Unknown image format. Header: {header.hex()}")
return content
def resize_and_optimize_image(self, image_data: bytes, size: str) -> tuple[bytes, str]:
"""이미지 리사이징 및 최적화"""
if size not in settings.thumbnail_sizes:
raise ValueError(f"Invalid size: {size}")
target_size = settings.thumbnail_sizes[size]
# SVG 체크 - SVG는 리사이징하지 않고 그대로 반환
if self._is_svg(image_data):
return image_data, 'image/svg+xml'
# PIL로 이미지 열기
try:
img = Image.open(io.BytesIO(image_data))
except Exception as e:
# WebP 헤더 체크 (RIFF....WEBP)
header = image_data[:12] if len(image_data) >= 12 else image_data
if header[:4] == b'RIFF' and header[8:12] == b'WEBP':
print("🎨 WebP 이미지 감지됨, 변환 시도")
# WebP 형식이지만 PIL이 열지 못하는 경우
# Pillow-SIMD 또는 추가 라이브러리가 필요할 수 있음
try:
# 재시도
from PIL import WebPImagePlugin
img = Image.open(io.BytesIO(image_data))
except:
print("❌ WebP 이미지를 열 수 없음, 원본 반환")
return image_data, 'image/webp'
else:
raise e
# GIF 애니메이션 체크 및 처리
if getattr(img, "format", None) == "GIF":
return self._process_gif(image_data, target_size)
# WebP 형식 체크
original_format = getattr(img, "format", None)
is_webp = original_format == "WEBP"
# 원본 모드와 투명도 정보 저장
original_mode = img.mode
original_has_transparency = img.mode in ('RGBA', 'LA')
original_has_palette = img.mode == 'P'
# 팔레트 모드(P) 처리 - 간단하게 PIL의 기본 변환 사용
if img.mode == 'P':
# 팔레트 모드는 RGB로 직접 변환
# PIL의 convert 메서드가 팔레트를 올바르게 처리함
img = img.convert('RGB')
# 투명도가 있는 이미지 처리
if img.mode == 'RGBA':
# RGBA는 흰색 배경과 합성
background = Image.new('RGB', img.size, (255, 255, 255))
background.paste(img, mask=img.split()[-1])
img = background
elif img.mode == 'LA':
# LA(그레이스케일+알파)는 RGBA를 거쳐 RGB로
img = img.convert('RGBA')
background = Image.new('RGB', img.size, (255, 255, 255))
background.paste(img, mask=img.split()[-1])
img = background
elif img.mode == 'L':
# 그레이스케일은 RGB로 변환
img = img.convert('RGB')
elif img.mode not in ('RGB',):
# 기타 모드는 모두 RGB로 변환
img = img.convert('RGB')
# EXIF 방향 정보 처리 (RGB 변환 후에 수행)
try:
from PIL import ImageOps
img = ImageOps.exif_transpose(img)
except:
pass
# 메타데이터 제거는 스킵 (팔레트 모드 이미지에서 문제 발생)
# RGB로 변환되었으므로 이미 메타데이터는 대부분 제거됨
# 비율 유지하며 리사이징 (크롭 없이)
img_ratio = img.width / img.height
target_width = target_size[0]
target_height = target_size[1]
# 원본 비율을 유지하면서 목표 크기에 맞추기
# 너비 또는 높이 중 하나를 기준으로 비율 계산
if img.width > target_width or img.height > target_height:
# 너비 기준 리사이징
width_ratio = target_width / img.width
# 높이 기준 리사이징
height_ratio = target_height / img.height
# 둘 중 작은 비율 사용 (목표 크기를 넘지 않도록)
ratio = min(width_ratio, height_ratio)
new_width = int(img.width * ratio)
new_height = int(img.height * ratio)
# 큰 이미지를 작게 만들 때는 2단계 리샘플링으로 품질 향상
if img.width > new_width * 2 or img.height > new_height * 2:
# 1단계: 목표 크기의 2배로 먼저 축소
intermediate_width = new_width * 2
intermediate_height = new_height * 2
img = img.resize((intermediate_width, intermediate_height), Image.Resampling.LANCZOS)
# 최종 목표 크기로 리샘플링
img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 샤프닝 적용 (작은 이미지에만)
if target_size[0] <= 400:
from PIL import ImageEnhance
enhancer = ImageEnhance.Sharpness(img)
img = enhancer.enhance(1.2)
# 바이트로 변환
output = io.BytesIO()
# 적응형 품질 계산 (이미지 크기에 따라 조정)
def get_adaptive_quality(base_quality: int, target_width: int) -> int:
"""이미지 크기에 따른 적응형 품질 계산"""
# 품질을 더 높게 설정하여 검정색 문제 해결
if target_width <= 150: # 썸네일
return min(base_quality + 10, 95)
elif target_width <= 360: # 카드
return min(base_quality + 5, 90)
elif target_width <= 800: # 상세
return base_quality # 85
else: # 히어로
return base_quality # 85
# WebP 변환 및 최적화 - 최고 압축률 설정
# WebP 입력은 JPEG로 변환 (WebP 리사이징 문제 회피)
if is_webp:
output_format = 'JPEG'
content_type = 'image/jpeg'
else:
output_format = 'WEBP' if settings.convert_to_webp else 'JPEG'
content_type = 'image/webp' if output_format == 'WEBP' else 'image/jpeg'
if output_format == 'WEBP':
# WebP 최적화: method=6(최고품질), lossless=False, exact=False
adaptive_quality = get_adaptive_quality(settings.webp_quality, target_size[0])
save_kwargs = {
'format': 'WEBP',
'quality': adaptive_quality,
'method': 6, # 최고 압축 알고리즘 (0-6)
'lossless': settings.webp_lossless,
'exact': False, # 약간의 품질 손실 허용하여 더 작은 크기
}
img.save(output, **save_kwargs)
elif original_has_transparency and not settings.convert_to_webp:
# PNG 최적화 (투명도가 있는 이미지)
save_kwargs = {
'format': 'PNG',
'optimize': settings.optimize_png,
'compress_level': settings.png_compress_level,
}
# 팔레트 모드로 변환 가능한지 확인 (256색 이하)
if settings.optimize_png:
try:
# 색상 수가 256개 이하이면 팔레트 모드로 변환
quantized = img.quantize(colors=256, method=Image.Quantize.MEDIANCUT)
if len(quantized.getcolors()) <= 256:
img = quantized
save_kwargs['format'] = 'PNG'
except:
pass
content_type = 'image/png'
img.save(output, **save_kwargs)
else:
# JPEG 최적화 설정 (기본값)
adaptive_quality = get_adaptive_quality(settings.jpeg_quality, target_size[0])
save_kwargs = {
'format': 'JPEG',
'quality': adaptive_quality,
'optimize': True,
'progressive': settings.progressive_jpeg,
}
img.save(output, **save_kwargs)
return output.getvalue(), content_type
async def get_cache_size(self) -> float:
"""현재 캐시 크기 (GB)"""
total_size = 0
for dirpath, dirnames, filenames in os.walk(self.cache_dir):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
total_size += os.path.getsize(filepath)
return total_size / (1024 ** 3) # GB로 변환
async def cleanup_old_cache(self):
"""오래된 캐시 파일 정리"""
cutoff_time = datetime.now() - timedelta(days=settings.cache_ttl_days)
for dirpath, dirnames, filenames in os.walk(self.cache_dir):
for filename in filenames:
filepath = Path(dirpath) / filename
if filepath.stat().st_mtime < cutoff_time.timestamp():
filepath.unlink()
async def trigger_background_generation(self, url: str):
"""백그라운드에서 모든 크기의 이미지 생성 트리거"""
from .background_tasks import background_manager
# 백그라운드 작업 큐에 추가
asyncio.create_task(background_manager.add_task(url))
async def get_directory_stats(self) -> dict:
"""디렉토리 구조 통계 정보"""
total_files = 0
total_dirs = 0
files_per_dir = {}
for root, dirs, files in os.walk(self.cache_dir):
total_dirs += len(dirs)
total_files += len(files)
# 각 디렉토리의 파일 수 계산
rel_path = os.path.relpath(root, self.cache_dir)
depth = len(Path(rel_path).parts) if rel_path != '.' else 0
if files and depth == 3: # 3단계 디렉토리에서만 파일 수 계산
files_per_dir[rel_path] = len(files)
# 통계 계산
avg_files_per_dir = sum(files_per_dir.values()) / len(files_per_dir) if files_per_dir else 0
max_files_in_dir = max(files_per_dir.values()) if files_per_dir else 0
return {
"total_files": total_files,
"total_directories": total_dirs,
"average_files_per_directory": round(avg_files_per_dir, 2),
"max_files_in_single_directory": max_files_in_dir,
"directory_depth": 3
}
cache = ImageCache()

View File

@ -0,0 +1,46 @@
from pydantic_settings import BaseSettings
from pathlib import Path
class Settings(BaseSettings):
# 기본 설정
app_name: str = "Image Proxy Service"
debug: bool = True
# 캐시 설정
cache_dir: Path = Path("/app/cache")
max_cache_size_gb: int = 10
cache_ttl_days: int = 30
# 이미지 설정
max_image_size_mb: int = 20
allowed_formats: list = ["jpg", "jpeg", "png", "gif", "webp", "svg"]
# 리사이징 설정 - 뉴스 카드 용도별 최적화
thumbnail_sizes: dict = {
"thumb": (150, 100), # 작은 썸네일 (3:2 비율)
"card": (360, 240), # 뉴스 카드용 (3:2 비율)
"list": (300, 200), # 리스트용 (3:2 비율)
"detail": (800, 533), # 상세 페이지용 (원본 비율 유지)
"hero": (1200, 800) # 히어로 이미지용 (원본 비율 유지)
}
# 이미지 최적화 설정 - 품질 보장하면서 최저 용량
jpeg_quality: int = 85 # JPEG 품질 (품질 향상)
webp_quality: int = 85 # WebP 품질 (품질 향상으로 검정색 문제 해결)
webp_lossless: bool = False # 무손실 압축 비활성화 (용량 최적화)
png_compress_level: int = 9 # PNG 최대 압축 (0-9, 9가 최고 압축)
convert_to_webp: bool = False # WebP 변환 임시 비활성화 (검정색 이미지 문제)
# 고급 최적화 설정
progressive_jpeg: bool = True # 점진적 JPEG (로딩 성능 향상)
strip_metadata: bool = True # EXIF 등 메타데이터 제거 (용량 절약)
optimize_png: bool = True # PNG 팔레트 최적화
# 외부 요청 설정
request_timeout: int = 30
user_agent: str = "ImageProxyService/1.0"
class Config:
env_file = ".env"
settings = Settings()

View File

@ -0,0 +1,65 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import uvicorn
from datetime import datetime
from app.api.endpoints import router
from app.core.config import settings
@asynccontextmanager
async def lifespan(app: FastAPI):
# 시작 시
print("Images service starting...")
yield
# 종료 시
print("Images service stopping...")
app = FastAPI(
title="Images Service",
description="이미지 업로드, 프록시 및 캐싱 서비스",
version="2.0.0",
lifespan=lifespan
)
# CORS 설정
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 라우터 등록
app.include_router(router, prefix="/api/v1")
@app.get("/")
async def root():
return {
"service": "Images Service",
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"endpoints": {
"proxy": "/api/v1/image?url=<image_url>&size=<optional_size>",
"upload": "/api/v1/upload",
"stats": "/api/v1/stats",
"cleanup": "/api/v1/cleanup"
}
}
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"service": "images",
"timestamp": datetime.now().isoformat()
}
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=True
)

View File

@ -0,0 +1,11 @@
fastapi==0.109.0
uvicorn[standard]==0.27.0
httpx==0.26.0
pillow==10.2.0
pillow-heif==0.20.0
aiofiles==23.2.1
python-multipart==0.0.6
pydantic==2.5.3
pydantic-settings==2.1.0
motor==3.3.2
redis==5.0.1