diff --git a/console/backend/event_consumer.py b/console/backend/event_consumer.py new file mode 100644 index 0000000..9c4c0f3 --- /dev/null +++ b/console/backend/event_consumer.py @@ -0,0 +1,358 @@ +""" +고급 이벤트 컨슈머 with DLQ and Retry +""" +import asyncio +import json +import logging +from typing import Dict, Any, Optional, List +from datetime import datetime, timedelta +from redis import asyncio as aioredis +from aiokafka import AIOKafkaProducer + +import sys +sys.path.append('/app') +from shared.kafka import KafkaConsumer, Event, EventType +from event_handlers import EventHandlers + +logger = logging.getLogger(__name__) + +class RetryPolicy: + """재시도 정책""" + def __init__( + self, + max_retries: int = 3, + initial_delay: float = 1.0, + max_delay: float = 60.0, + exponential_base: float = 2.0 + ): + self.max_retries = max_retries + self.initial_delay = initial_delay + self.max_delay = max_delay + self.exponential_base = exponential_base + + def get_delay(self, retry_count: int) -> float: + """재시도 지연 시간 계산 (exponential backoff)""" + delay = self.initial_delay * (self.exponential_base ** retry_count) + return min(delay, self.max_delay) + +class AdvancedEventConsumer: + def __init__( + self, + topics: List[str], + group_id: str, + redis_url: str = "redis://redis:6379", + bootstrap_servers: str = "kafka:9092", + enable_dlq: bool = True, + dlq_topic: str = "dead-letter-queue" + ): + self.topics = topics + self.group_id = group_id + self.bootstrap_servers = bootstrap_servers + self.enable_dlq = enable_dlq + self.dlq_topic = dlq_topic + + # Kafka Consumer + self.consumer = KafkaConsumer( + topics=topics, + group_id=group_id, + bootstrap_servers=bootstrap_servers + ) + + # DLQ Producer + self.dlq_producer: Optional[AIOKafkaProducer] = None + + # Redis for retry tracking + self.redis: Optional[aioredis.Redis] = None + self.redis_url = redis_url + + # Event handlers + self.handlers: Optional[EventHandlers] = None + + # Retry policies per event type + self.retry_policies = { + EventType.USER_CREATED: RetryPolicy(max_retries=3), + EventType.USER_UPDATED: RetryPolicy(max_retries=2), + EventType.USER_DELETED: RetryPolicy(max_retries=5), # 중요한 이벤트 + EventType.OAUTH_APP_CREATED: RetryPolicy(max_retries=3), + EventType.OAUTH_TOKEN_ISSUED: RetryPolicy(max_retries=1), + } + + # Processing statistics + self.stats = { + "processed": 0, + "failed": 0, + "retried": 0, + "dlq_sent": 0 + } + + async def start(self): + """컨슈머 시작""" + try: + # Redis 연결 + self.redis = await aioredis.from_url( + self.redis_url, + encoding="utf-8", + decode_responses=True + ) + + # Event handlers 초기화 + self.handlers = EventHandlers(redis_client=self.redis) + + # DLQ Producer 초기화 + if self.enable_dlq: + self.dlq_producer = AIOKafkaProducer( + bootstrap_servers=self.bootstrap_servers, + value_serializer=lambda v: json.dumps(v).encode() + ) + await self.dlq_producer.start() + logger.info(f"DLQ Producer started for topic: {self.dlq_topic}") + + # 이벤트 핸들러 등록 + self._register_event_handlers() + + # Kafka Consumer 시작 + await self.consumer.start() + + logger.info(f"Advanced Event Consumer started: {self.topics}") + + # 통계 리포팅 태스크 시작 + asyncio.create_task(self._report_stats()) + + except Exception as e: + logger.error(f"Failed to start Advanced Event Consumer: {e}") + raise + + async def stop(self): + """컨슈머 종료""" + await self.consumer.stop() + + if self.dlq_producer: + await self.dlq_producer.stop() + + if self.redis: + await self.redis.close() + + logger.info("Advanced Event Consumer stopped") + + def _register_event_handlers(self): + """이벤트 핸들러 등록""" + # 각 이벤트 타입에 대한 핸들러를 래퍼로 감싸서 등록 + self.consumer.register_handler( + EventType.USER_CREATED, + self._create_handler_with_retry( + self.handlers.handle_user_created, + EventType.USER_CREATED + ) + ) + + self.consumer.register_handler( + EventType.USER_UPDATED, + self._create_handler_with_retry( + self.handlers.handle_user_updated, + EventType.USER_UPDATED + ) + ) + + self.consumer.register_handler( + EventType.USER_DELETED, + self._create_handler_with_retry( + self.handlers.handle_user_deleted, + EventType.USER_DELETED + ) + ) + + self.consumer.register_handler( + EventType.OAUTH_APP_CREATED, + self._create_handler_with_retry( + self.handlers.handle_oauth_app_created, + EventType.OAUTH_APP_CREATED + ) + ) + + self.consumer.register_handler( + EventType.OAUTH_TOKEN_ISSUED, + self._create_handler_with_retry( + self.handlers.handle_oauth_token_issued, + EventType.OAUTH_TOKEN_ISSUED + ) + ) + + def _create_handler_with_retry(self, handler_func, event_type: EventType): + """재시도 로직이 포함된 핸들러 래퍼 생성""" + async def wrapper(event: Event): + event_id = f"{event.event_id}:{event.event_type}" + retry_key = f"retry:{event_id}" + + try: + # 재시도 횟수 확인 + retry_count = 0 + if self.redis: + retry_count_str = await self.redis.get(retry_key) + retry_count = int(retry_count_str) if retry_count_str else 0 + + # 핸들러 실행 + await handler_func(event.dict()) + + # 성공 시 재시도 카운터 삭제 + if self.redis and retry_count > 0: + await self.redis.delete(retry_key) + + self.stats["processed"] += 1 + + except Exception as e: + logger.error(f"Error processing {event_type}: {e}") + self.stats["failed"] += 1 + + # 재시도 처리 + retry_policy = self.retry_policies.get(event_type) + if retry_policy and retry_count < retry_policy.max_retries: + await self._handle_retry(event, retry_count, retry_policy, retry_key) + else: + # 최대 재시도 초과 -> DLQ로 전송 + await self._send_to_dlq(event, str(e), retry_count) + + return wrapper + + async def _handle_retry( + self, + event: Event, + retry_count: int, + retry_policy: RetryPolicy, + retry_key: str + ): + """재시도 처리""" + retry_count += 1 + delay = retry_policy.get_delay(retry_count) + + logger.warning( + f"Retrying event {event.event_id} " + f"(attempt {retry_count}/{retry_policy.max_retries}) " + f"after {delay}s" + ) + + # 재시도 카운터 저장 + if self.redis: + await self.redis.setex( + retry_key, + timedelta(hours=24), # 24시간 후 자동 삭제 + retry_count + ) + + # 지연 후 재처리를 위해 다시 큐에 추가 + # 실제 프로덕션에서는 별도의 재시도 토픽 사용 권장 + self.stats["retried"] += 1 + + # 지연 실행 + await asyncio.sleep(delay) + + # 이벤트 재발행 (재시도 토픽으로) + if hasattr(self, 'retry_producer'): + await self._republish_for_retry(event, retry_count) + + async def _send_to_dlq(self, event: Event, error: str, retry_count: int): + """Dead Letter Queue로 전송""" + if not self.enable_dlq or not self.dlq_producer: + logger.error(f"Failed to process event {event.event_id} after {retry_count} retries") + return + + try: + dlq_message = { + "original_event": event.dict(), + "error": error, + "retry_count": retry_count, + "failed_at": datetime.now().isoformat(), + "consumer_group": self.group_id, + "topic": self.topics[0] if self.topics else None + } + + await self.dlq_producer.send( + self.dlq_topic, + value=dlq_message + ) + + self.stats["dlq_sent"] += 1 + + logger.error( + f"Event {event.event_id} sent to DLQ after {retry_count} retries. " + f"Error: {error}" + ) + + # Redis에 DLQ 전송 기록 + if self.redis: + dlq_key = f"dlq:{event.event_id}" + await self.redis.setex( + dlq_key, + timedelta(days=7), # 7일 보관 + json.dumps({ + "error": error, + "retry_count": retry_count, + "sent_at": datetime.now().isoformat() + }) + ) + + except Exception as e: + logger.critical(f"Failed to send event to DLQ: {e}") + + async def _republish_for_retry(self, event: Event, retry_count: int): + """재시도를 위한 이벤트 재발행""" + # 실제 구현에서는 별도의 재시도 토픽 사용 + # 여기서는 로깅만 수행 + logger.info(f"Would republish event {event.event_id} for retry #{retry_count}") + + async def _report_stats(self): + """통계 리포팅 (1분마다)""" + while True: + await asyncio.sleep(60) + + logger.info( + f"Event Consumer Stats - " + f"Processed: {self.stats['processed']}, " + f"Failed: {self.stats['failed']}, " + f"Retried: {self.stats['retried']}, " + f"DLQ: {self.stats['dlq_sent']}" + ) + + # Redis에 통계 저장 + if self.redis: + stats_key = f"consumer:stats:{self.group_id}" + await self.redis.hset( + stats_key, + mapping={ + **self.stats, + "updated_at": datetime.now().isoformat() + } + ) + + async def get_dlq_messages(self, limit: int = 10) -> List[Dict[str, Any]]: + """DLQ 메시지 조회 (관리 목적)""" + if not self.redis: + return [] + + dlq_keys = await self.redis.keys("dlq:*") + messages = [] + + for key in dlq_keys[:limit]: + data = await self.redis.get(key) + if data: + event_id = key.replace("dlq:", "") + message = json.loads(data) + message["event_id"] = event_id + messages.append(message) + + return messages + + async def retry_dlq_message(self, event_id: str) -> bool: + """DLQ 메시지 수동 재시도""" + # 실제 구현에서는 DLQ에서 메시지를 읽어 재처리 + logger.info(f"Manual retry requested for event: {event_id}") + + if self.redis: + # 재시도 카운터 리셋 + retry_key = f"retry:{event_id}:*" + keys = await self.redis.keys(retry_key) + if keys: + await self.redis.delete(*keys) + + return True + + return False \ No newline at end of file diff --git a/console/backend/event_handlers.py b/console/backend/event_handlers.py new file mode 100644 index 0000000..78895ea --- /dev/null +++ b/console/backend/event_handlers.py @@ -0,0 +1,213 @@ +""" +이벤트 핸들러 모듈 +각 이벤트 타입별 처리 로직 구현 +""" +import logging +from typing import Dict, Any, Optional +from datetime import datetime +import json +import asyncio +from redis import asyncio as aioredis + +logger = logging.getLogger(__name__) + +class EventHandlers: + def __init__(self, redis_client: Optional[aioredis.Redis] = None): + self.redis = redis_client + self.retry_counts: Dict[str, int] = {} + + async def handle_user_created(self, event: Dict[str, Any]): + """사용자 생성 이벤트 처리""" + try: + user_id = event.get('data', {}).get('user_id') + username = event.get('data', {}).get('username') + email = event.get('data', {}).get('email') + + logger.info(f"Processing USER_CREATED: {username} ({user_id})") + + # Redis 캐시 무효화 + if self.redis: + await self.redis.delete(f"user:{user_id}") + await self.redis.delete("users:list") + + # 추가 처리 로직 + # - 환영 이메일 발송 준비 + # - 초기 설정 생성 + # - 분석 데이터 기록 + + await self._publish_notification({ + "type": "user.welcome", + "user_id": user_id, + "email": email, + "username": username, + "timestamp": datetime.now().isoformat() + }) + + logger.info(f"Successfully processed USER_CREATED for {username}") + + except Exception as e: + logger.error(f"Error handling USER_CREATED: {e}") + raise + + async def handle_user_updated(self, event: Dict[str, Any]): + """사용자 업데이트 이벤트 처리""" + try: + user_id = event.get('data', {}).get('user_id') + updated_fields = event.get('data', {}).get('updated_fields', []) + + logger.info(f"Processing USER_UPDATED: {user_id}, fields: {updated_fields}") + + # Redis 캐시 무효화 + if self.redis: + await self.redis.delete(f"user:{user_id}") + await self.redis.delete("users:list") + + # 프로필 사진 변경 시 이미지 캐시도 무효화 + if 'profile_picture' in updated_fields: + await self.redis.delete(f"user:profile_picture:{user_id}") + + # 프로필 완성도 계산 + if 'profile_picture' in updated_fields or 'bio' in updated_fields: + await self._calculate_profile_completeness(user_id) + + logger.info(f"Successfully processed USER_UPDATED for {user_id}") + + except Exception as e: + logger.error(f"Error handling USER_UPDATED: {e}") + raise + + async def handle_user_deleted(self, event: Dict[str, Any]): + """사용자 삭제 이벤트 처리""" + try: + user_id = event.get('data', {}).get('user_id') + username = event.get('data', {}).get('username') + + logger.info(f"Processing USER_DELETED: {username} ({user_id})") + + # Redis에서 모든 관련 데이터 삭제 + if self.redis: + # 사용자 캐시 삭제 + await self.redis.delete(f"user:{user_id}") + await self.redis.delete("users:list") + + # 세션 삭제 + session_keys = await self.redis.keys(f"session:*:{user_id}") + if session_keys: + await self.redis.delete(*session_keys) + + # 프로필 이미지 캐시 삭제 + await self.redis.delete(f"user:profile_picture:{user_id}") + + # 관련 데이터 정리 이벤트 발행 + await self._publish_cleanup_event({ + "user_id": user_id, + "username": username, + "timestamp": datetime.now().isoformat() + }) + + logger.info(f"Successfully processed USER_DELETED for {username}") + + except Exception as e: + logger.error(f"Error handling USER_DELETED: {e}") + raise + + async def handle_oauth_app_created(self, event: Dict[str, Any]): + """OAuth 앱 생성 이벤트 처리""" + try: + app_id = event.get('data', {}).get('app_id') + app_name = event.get('data', {}).get('name') + owner_id = event.get('data', {}).get('owner_id') + + logger.info(f"Processing OAUTH_APP_CREATED: {app_name} ({app_id})") + + # 앱 생성 알림 + await self._publish_notification({ + "type": "oauth.app_created", + "app_id": app_id, + "app_name": app_name, + "owner_id": owner_id, + "timestamp": datetime.now().isoformat() + }) + + logger.info(f"Successfully processed OAUTH_APP_CREATED for {app_name}") + + except Exception as e: + logger.error(f"Error handling OAUTH_APP_CREATED: {e}") + raise + + async def handle_oauth_token_issued(self, event: Dict[str, Any]): + """OAuth 토큰 발급 이벤트 처리""" + try: + client_id = event.get('data', {}).get('client_id') + user_id = event.get('data', {}).get('user_id') + scopes = event.get('data', {}).get('scopes', []) + + logger.info(f"Processing OAUTH_TOKEN_ISSUED: client={client_id}, user={user_id}") + + # 보안 감사 로그 + await self._log_security_event({ + "type": "oauth.token_issued", + "client_id": client_id, + "user_id": user_id, + "scopes": scopes, + "timestamp": datetime.now().isoformat() + }) + + # 사용 통계 업데이트 + if self.redis: + await self.redis.hincrby(f"oauth:stats:{client_id}", "tokens_issued", 1) + await self.redis.sadd(f"oauth:users:{client_id}", user_id) + + logger.info(f"Successfully processed OAUTH_TOKEN_ISSUED") + + except Exception as e: + logger.error(f"Error handling OAUTH_TOKEN_ISSUED: {e}") + raise + + async def _publish_notification(self, notification: Dict[str, Any]): + """알림 이벤트 발행""" + # 향후 Notification 서비스로 이벤트 발행 + logger.debug(f"Publishing notification: {notification}") + + if self.redis: + await self.redis.lpush( + "notifications:queue", + json.dumps(notification) + ) + + async def _publish_cleanup_event(self, cleanup_data: Dict[str, Any]): + """정리 이벤트 발행""" + # 향후 각 서비스로 정리 이벤트 발행 + logger.debug(f"Publishing cleanup event: {cleanup_data}") + + if self.redis: + await self.redis.lpush( + "cleanup:queue", + json.dumps(cleanup_data) + ) + + async def _calculate_profile_completeness(self, user_id: str): + """프로필 완성도 계산""" + # 향후 프로필 완성도 계산 로직 + logger.debug(f"Calculating profile completeness for user: {user_id}") + + if self.redis: + # 임시로 Redis에 저장 + await self.redis.hset( + f"user:stats:{user_id}", + "profile_updated_at", + datetime.now().isoformat() + ) + + async def _log_security_event(self, event_data: Dict[str, Any]): + """보안 이벤트 로깅""" + logger.info(f"Security event: {event_data}") + + if self.redis: + await self.redis.lpush( + "security:audit_log", + json.dumps(event_data) + ) + + # 최근 100개만 유지 + await self.redis.ltrim("security:audit_log", 0, 99) \ No newline at end of file diff --git a/console/backend/main.py b/console/backend/main.py index 0f22f9a..2a55a32 100644 --- a/console/backend/main.py +++ b/console/backend/main.py @@ -5,7 +5,10 @@ import uvicorn from datetime import datetime, timedelta import httpx import os +import asyncio +import logging from typing import Any +from contextlib import asynccontextmanager from auth import ( Token, UserLogin, UserInDB, verify_password, get_password_hash, @@ -13,10 +16,51 @@ from auth import ( ACCESS_TOKEN_EXPIRE_MINUTES ) +# Import event consumer +from event_consumer import AdvancedEventConsumer + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Global event consumer instance +event_consumer = None + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + global event_consumer + + try: + # Initialize and start event consumer + event_consumer = AdvancedEventConsumer( + topics=["user-events", "oauth-events"], + group_id="console-consumer-group", + redis_url=os.getenv("REDIS_URL", "redis://redis:6379"), + bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"), + enable_dlq=True, + dlq_topic="dead-letter-queue" + ) + + await event_consumer.start() + logger.info("Event consumer started successfully") + + except Exception as e: + logger.error(f"Failed to start event consumer: {e}") + # Continue without event consumer (degraded mode) + event_consumer = None + + yield + + # Shutdown + if event_consumer: + await event_consumer.stop() + logger.info("Event consumer stopped") + app = FastAPI( title="Console API Gateway", description="Central orchestrator for microservices", - version="0.1.0" + version="0.1.0", + lifespan=lifespan ) # Service URLs from environment @@ -45,6 +89,66 @@ async def health_check(): return { "status": "healthy", "service": "console", + "timestamp": datetime.now().isoformat(), + "event_consumer": "running" if event_consumer else "not running" + } + +# Event Management Endpoints +@app.get("/api/events/stats") +async def get_event_stats(current_user = Depends(get_current_user)): + """Get event consumer statistics""" + if not event_consumer: + raise HTTPException(status_code=503, detail="Event consumer not available") + + return { + "stats": event_consumer.stats, + "timestamp": datetime.now().isoformat() + } + +@app.get("/api/events/dlq") +async def get_dlq_messages( + limit: int = 10, + current_user = Depends(get_current_user) +): + """Get messages from Dead Letter Queue""" + if not event_consumer: + raise HTTPException(status_code=503, detail="Event consumer not available") + + messages = await event_consumer.get_dlq_messages(limit=limit) + return { + "messages": messages, + "count": len(messages), + "timestamp": datetime.now().isoformat() + } + +@app.post("/api/events/dlq/{event_id}/retry") +async def retry_dlq_message( + event_id: str, + current_user = Depends(get_current_user) +): + """Manually retry a message from DLQ""" + if not event_consumer: + raise HTTPException(status_code=503, detail="Event consumer not available") + + success = await event_consumer.retry_dlq_message(event_id) + if not success: + raise HTTPException(status_code=404, detail="Event not found in DLQ") + + return { + "status": "retry_initiated", + "event_id": event_id, + "timestamp": datetime.now().isoformat() + } + +@app.get("/api/events/schemas") +async def get_event_schemas(): + """Get all event schemas documentation""" + from shared.kafka.schema_registry import SchemaRegistry + + schemas = SchemaRegistry.get_all_schemas() + return { + "schemas": schemas, + "version": "1.0.0", "timestamp": datetime.now().isoformat() } diff --git a/console/backend/requirements.txt b/console/backend/requirements.txt index 2d35a3a..37d8b9b 100644 --- a/console/backend/requirements.txt +++ b/console/backend/requirements.txt @@ -5,4 +5,6 @@ pydantic==2.5.3 httpx==0.26.0 python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4 -python-multipart==0.0.6 \ No newline at end of file +python-multipart==0.0.6 +redis==5.0.1 +aiokafka==0.10.0 \ No newline at end of file diff --git a/console/backend/shared/kafka/__init__.py b/console/backend/shared/kafka/__init__.py new file mode 100644 index 0000000..7c78f53 --- /dev/null +++ b/console/backend/shared/kafka/__init__.py @@ -0,0 +1,6 @@ +from .producer import KafkaProducer +from .consumer import KafkaConsumer +from .events import Event, EventType +from .schema_registry import SchemaRegistry + +__all__ = ['KafkaProducer', 'KafkaConsumer', 'Event', 'EventType', 'SchemaRegistry'] \ No newline at end of file diff --git a/console/backend/shared/kafka/consumer.py b/console/backend/shared/kafka/consumer.py new file mode 100644 index 0000000..746e79b --- /dev/null +++ b/console/backend/shared/kafka/consumer.py @@ -0,0 +1,125 @@ +import json +import asyncio +from typing import Optional, Callable, Dict, Any, List +from aiokafka import AIOKafkaConsumer +from aiokafka.errors import KafkaError +import logging + +from .events import Event, EventType + +logger = logging.getLogger(__name__) + +class KafkaConsumer: + def __init__( + self, + topics: List[str], + group_id: str, + bootstrap_servers: str = "kafka:9092" + ): + self.topics = topics + self.group_id = group_id + self.bootstrap_servers = bootstrap_servers + self._consumer: Optional[AIOKafkaConsumer] = None + self._handlers: Dict[EventType, List[Callable]] = {} + self._running = False + + def register_handler(self, event_type: EventType, handler: Callable): + """이벤트 타입별 핸들러 등록""" + if event_type not in self._handlers: + self._handlers[event_type] = [] + self._handlers[event_type].append(handler) + logger.info(f"Registered handler for {event_type}") + + async def start(self): + """Kafka Consumer 시작""" + try: + self._consumer = AIOKafkaConsumer( + *self.topics, + bootstrap_servers=self.bootstrap_servers, + group_id=self.group_id, + value_deserializer=lambda v: json.loads(v.decode()), + auto_offset_reset='earliest', + enable_auto_commit=True, + auto_commit_interval_ms=1000, + session_timeout_ms=30000, + heartbeat_interval_ms=10000 + ) + await self._consumer.start() + self._running = True + logger.info(f"Kafka Consumer started: {self.topics} (group: {self.group_id})") + + # 메시지 처리 루프 시작 + asyncio.create_task(self._consume_messages()) + + except Exception as e: + logger.error(f"Failed to start Kafka Consumer: {e}") + raise + + async def stop(self): + """Kafka Consumer 종료""" + self._running = False + if self._consumer: + await self._consumer.stop() + logger.info("Kafka Consumer stopped") + + async def _consume_messages(self): + """메시지 소비 루프""" + if not self._consumer: + return + + while self._running: + try: + # 메시지 배치로 가져오기 (최대 100ms 대기) + msg_batch = await self._consumer.getmany(timeout_ms=100) + + for tp, messages in msg_batch.items(): + for msg in messages: + await self._process_message(msg.value) + + except KafkaError as e: + logger.error(f"Kafka error: {e}") + await asyncio.sleep(1) + except Exception as e: + logger.error(f"Error processing messages: {e}") + await asyncio.sleep(1) + + async def _process_message(self, message: Dict[str, Any]): + """개별 메시지 처리""" + try: + # Event 객체로 변환 + event = Event(**message) + + # 등록된 핸들러 실행 + handlers = self._handlers.get(event.event_type, []) + + for handler in handlers: + try: + if asyncio.iscoroutinefunction(handler): + await handler(event) + else: + handler(event) + except Exception as e: + logger.error(f"Handler error for {event.event_type}: {e}") + + if not handlers: + logger.debug(f"No handlers for event type: {event.event_type}") + + except Exception as e: + logger.error(f"Failed to process message: {e}") + + async def consume_one(self, timeout: float = 1.0) -> Optional[Event]: + """단일 메시지 소비 (테스트/디버깅용)""" + if not self._consumer: + return None + + try: + msg = await asyncio.wait_for( + self._consumer.getone(), + timeout=timeout + ) + return Event(**msg.value) + except asyncio.TimeoutError: + return None + except Exception as e: + logger.error(f"Error consuming message: {e}") + return None \ No newline at end of file diff --git a/console/backend/shared/kafka/events.py b/console/backend/shared/kafka/events.py new file mode 100644 index 0000000..2121a2f --- /dev/null +++ b/console/backend/shared/kafka/events.py @@ -0,0 +1,31 @@ +from enum import Enum +from pydantic import BaseModel, Field +from datetime import datetime +from typing import Any, Optional, Dict + +class EventType(str, Enum): + USER_CREATED = "user.created" + USER_UPDATED = "user.updated" + USER_DELETED = "user.deleted" + USER_LOGIN = "user.login" + + IMAGE_UPLOADED = "image.uploaded" + IMAGE_CACHED = "image.cached" + IMAGE_DELETED = "image.deleted" + + TASK_CREATED = "task.created" + TASK_COMPLETED = "task.completed" + TASK_FAILED = "task.failed" + +class Event(BaseModel): + event_type: EventType + timestamp: datetime = Field(default_factory=datetime.now) + service: str + data: Dict[str, Any] + correlation_id: Optional[str] = None + user_id: Optional[str] = None + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } \ No newline at end of file diff --git a/console/backend/shared/kafka/producer.py b/console/backend/shared/kafka/producer.py new file mode 100644 index 0000000..0a33ba0 --- /dev/null +++ b/console/backend/shared/kafka/producer.py @@ -0,0 +1,101 @@ +import json +import asyncio +from typing import Optional, Dict, Any +from aiokafka import AIOKafkaProducer +from aiokafka.errors import KafkaError +import logging + +from .events import Event + +logger = logging.getLogger(__name__) + +class KafkaProducer: + def __init__(self, bootstrap_servers: str = "kafka:9092"): + self.bootstrap_servers = bootstrap_servers + self._producer: Optional[AIOKafkaProducer] = None + + async def start(self): + """Kafka Producer 시작""" + try: + self._producer = AIOKafkaProducer( + bootstrap_servers=self.bootstrap_servers, + value_serializer=lambda v: json.dumps(v).encode(), + compression_type="gzip", + acks='all', + retry_backoff_ms=100 + ) + await self._producer.start() + logger.info(f"Kafka Producer started: {self.bootstrap_servers}") + except Exception as e: + logger.error(f"Failed to start Kafka Producer: {e}") + raise + + async def stop(self): + """Kafka Producer 종료""" + if self._producer: + await self._producer.stop() + logger.info("Kafka Producer stopped") + + async def send_event(self, topic: str, event: Event) -> bool: + """이벤트 전송""" + if not self._producer: + logger.error("Producer not started") + return False + + try: + event_dict = event.dict() + event_dict['timestamp'] = event.timestamp.isoformat() + + await self._producer.send_and_wait( + topic, + value=event_dict, + key=event.correlation_id.encode() if event.correlation_id else None + ) + + logger.info(f"Event sent to {topic}: {event.event_type}") + return True + + except KafkaError as e: + logger.error(f"Failed to send event to {topic}: {e}") + return False + except Exception as e: + logger.error(f"Unexpected error sending event: {e}") + return False + + async def send_batch(self, topic: str, events: list[Event]) -> int: + """여러 이벤트를 배치로 전송""" + if not self._producer: + logger.error("Producer not started") + return 0 + + sent_count = 0 + batch = self._producer.create_batch() + + for event in events: + event_dict = event.dict() + event_dict['timestamp'] = event.timestamp.isoformat() + + metadata = batch.append( + key=event.correlation_id.encode() if event.correlation_id else None, + value=json.dumps(event_dict).encode(), + timestamp=None + ) + + if metadata is None: + # 배치가 가득 찼으면 전송하고 새 배치 생성 + await self._producer.send_batch(batch, topic) + sent_count += len(batch) + batch = self._producer.create_batch() + batch.append( + key=event.correlation_id.encode() if event.correlation_id else None, + value=json.dumps(event_dict).encode(), + timestamp=None + ) + + # 남은 배치 전송 + if batch: + await self._producer.send_batch(batch, topic) + sent_count += len(batch) + + logger.info(f"Sent {sent_count} events to {topic}") + return sent_count \ No newline at end of file diff --git a/console/backend/shared/kafka/schema_registry.py b/console/backend/shared/kafka/schema_registry.py new file mode 100644 index 0000000..676306d --- /dev/null +++ b/console/backend/shared/kafka/schema_registry.py @@ -0,0 +1,333 @@ +""" +이벤트 스키마 레지스트리 +이벤트 스키마 정의 및 버전 관리 +""" +from typing import Dict, Any, Optional, List, Literal +from enum import Enum +from pydantic import BaseModel, Field, field_validator +from datetime import datetime +import json + +class SchemaVersion(str, Enum): + V1 = "1.0.0" + V2 = "2.0.0" + +class EventSchemaBase(BaseModel): + """이벤트 스키마 베이스""" + event_id: str = Field(..., description="고유 이벤트 ID") + event_type: str = Field(..., description="이벤트 타입") + timestamp: datetime = Field(default_factory=datetime.now, description="이벤트 발생 시간") + version: str = Field(default=SchemaVersion.V1, description="스키마 버전") + service: str = Field(..., description="이벤트 발생 서비스") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +# User Events Schemas +class UserCreatedSchema(EventSchemaBase): + """사용자 생성 이벤트 스키마""" + event_type: Literal["USER_CREATED"] = "USER_CREATED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['user_id', 'username', 'email'] + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + return v + +class UserUpdatedSchema(EventSchemaBase): + """사용자 업데이트 이벤트 스키마""" + event_type: Literal["USER_UPDATED"] = "USER_UPDATED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['user_id'] + optional_fields = ['username', 'email', 'full_name', 'profile_picture', + 'bio', 'location', 'website', 'updated_fields'] + + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + + # updated_fields가 있으면 검증 + if 'updated_fields' in v and not isinstance(v['updated_fields'], list): + raise ValueError("updated_fields must be a list") + + return v + +class UserDeletedSchema(EventSchemaBase): + """사용자 삭제 이벤트 스키마""" + event_type: Literal["USER_DELETED"] = "USER_DELETED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['user_id', 'username'] + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + return v + +# OAuth Events Schemas +class OAuthAppCreatedSchema(EventSchemaBase): + """OAuth 앱 생성 이벤트 스키마""" + event_type: Literal["OAUTH_APP_CREATED"] = "OAUTH_APP_CREATED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['app_id', 'name', 'owner_id', 'client_id'] + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + return v + +class OAuthTokenIssuedSchema(EventSchemaBase): + """OAuth 토큰 발급 이벤트 스키마""" + event_type: Literal["OAUTH_TOKEN_ISSUED"] = "OAUTH_TOKEN_ISSUED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['client_id', 'grant_type'] + optional_fields = ['user_id', 'scopes', 'expires_in'] + + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + + # scopes가 있으면 리스트여야 함 + if 'scopes' in v and not isinstance(v['scopes'], list): + raise ValueError("scopes must be a list") + + return v + +class OAuthTokenRevokedSchema(EventSchemaBase): + """OAuth 토큰 폐기 이벤트 스키마""" + event_type: Literal["OAUTH_TOKEN_REVOKED"] = "OAUTH_TOKEN_REVOKED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['token_id', 'client_id'] + optional_fields = ['user_id', 'revoked_by'] + + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + + return v + +# Image Events Schemas +class ImageUploadedSchema(EventSchemaBase): + """이미지 업로드 이벤트 스키마""" + event_type: Literal["IMAGE_UPLOADED"] = "IMAGE_UPLOADED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['image_id', 'user_id', 'url'] + optional_fields = ['size', 'mime_type', 'width', 'height', 'thumbnail_url'] + + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + + return v + +class ImageProcessedSchema(EventSchemaBase): + """이미지 처리 완료 이벤트 스키마""" + event_type: Literal["IMAGE_PROCESSED"] = "IMAGE_PROCESSED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['image_id', 'process_type'] + optional_fields = ['original_url', 'processed_url', 'processing_time_ms'] + + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + + return v + +class SchemaRegistry: + """스키마 레지스트리""" + + # 스키마 매핑 + SCHEMAS = { + "USER_CREATED": UserCreatedSchema, + "USER_UPDATED": UserUpdatedSchema, + "USER_DELETED": UserDeletedSchema, + "OAUTH_APP_CREATED": OAuthAppCreatedSchema, + "OAUTH_TOKEN_ISSUED": OAuthTokenIssuedSchema, + "OAUTH_TOKEN_REVOKED": OAuthTokenRevokedSchema, + "IMAGE_UPLOADED": ImageUploadedSchema, + "IMAGE_PROCESSED": ImageProcessedSchema, + } + + # 스키마 버전 호환성 매트릭스 + COMPATIBILITY_MATRIX = { + SchemaVersion.V1: [SchemaVersion.V1], + SchemaVersion.V2: [SchemaVersion.V1, SchemaVersion.V2], # V2는 V1과 호환 + } + + @classmethod + def get_schema(cls, event_type: str) -> Optional[type]: + """이벤트 타입에 대한 스키마 반환""" + return cls.SCHEMAS.get(event_type) + + @classmethod + def validate_event(cls, event_data: Dict[str, Any]) -> tuple[bool, Optional[str]]: + """이벤트 데이터 검증""" + try: + event_type = event_data.get('event_type') + if not event_type: + return False, "Missing event_type" + + schema_class = cls.get_schema(event_type) + if not schema_class: + return False, f"Unknown event type: {event_type}" + + # 스키마 검증 + schema_class(**event_data) + return True, None + + except Exception as e: + return False, str(e) + + @classmethod + def is_compatible(cls, from_version: str, to_version: str) -> bool: + """버전 호환성 확인""" + from_v = SchemaVersion(from_version) + to_v = SchemaVersion(to_version) + + compatible_versions = cls.COMPATIBILITY_MATRIX.get(to_v, []) + return from_v in compatible_versions + + @classmethod + def migrate_event( + cls, + event_data: Dict[str, Any], + from_version: str, + to_version: str + ) -> Dict[str, Any]: + """이벤트 데이터 마이그레이션""" + if from_version == to_version: + return event_data + + if not cls.is_compatible(from_version, to_version): + raise ValueError(f"Cannot migrate from {from_version} to {to_version}") + + # 버전별 마이그레이션 로직 + if from_version == SchemaVersion.V1 and to_version == SchemaVersion.V2: + # V1 -> V2 마이그레이션 예시 + event_data['version'] = SchemaVersion.V2 + + # 새로운 필드 추가 (기본값) + if 'metadata' not in event_data: + event_data['metadata'] = {} + + return event_data + + @classmethod + def get_all_schemas(cls) -> Dict[str, Dict[str, Any]]: + """모든 스키마 정보 반환 (문서화용)""" + schemas_info = {} + + for event_type, schema_class in cls.SCHEMAS.items(): + schemas_info[event_type] = { + "description": schema_class.__doc__, + "fields": schema_class.schema(), + "version": SchemaVersion.V1, + "example": cls._generate_example(schema_class) + } + + return schemas_info + + @classmethod + def _generate_example(cls, schema_class: type) -> Dict[str, Any]: + """스키마 예시 생성""" + examples = { + "USER_CREATED": { + "event_id": "evt_123456", + "event_type": "USER_CREATED", + "timestamp": datetime.now().isoformat(), + "version": "1.0.0", + "service": "users", + "data": { + "user_id": "usr_abc123", + "username": "johndoe", + "email": "john@example.com" + } + }, + "USER_UPDATED": { + "event_id": "evt_123457", + "event_type": "USER_UPDATED", + "timestamp": datetime.now().isoformat(), + "version": "1.0.0", + "service": "users", + "data": { + "user_id": "usr_abc123", + "updated_fields": ["profile_picture", "bio"], + "profile_picture": "https://example.com/pic.jpg", + "bio": "Updated bio" + } + }, + "OAUTH_TOKEN_ISSUED": { + "event_id": "evt_123458", + "event_type": "OAUTH_TOKEN_ISSUED", + "timestamp": datetime.now().isoformat(), + "version": "1.0.0", + "service": "oauth", + "data": { + "client_id": "app_xyz789", + "user_id": "usr_abc123", + "grant_type": "authorization_code", + "scopes": ["profile", "email"], + "expires_in": 3600 + } + } + } + + return examples.get(schema_class.__fields__['event_type'].default, {}) + + @classmethod + def export_schemas(cls, format: str = "json") -> str: + """스키마 내보내기""" + schemas = cls.get_all_schemas() + + if format == "json": + return json.dumps(schemas, indent=2, default=str) + elif format == "markdown": + return cls._export_as_markdown(schemas) + else: + raise ValueError(f"Unsupported format: {format}") + + @classmethod + def _export_as_markdown(cls, schemas: Dict[str, Dict[str, Any]]) -> str: + """마크다운 형식으로 내보내기""" + md = "# Event Schema Registry\n\n" + + for event_type, info in schemas.items(): + md += f"## {event_type}\n\n" + md += f"{info['description']}\n\n" + md += f"**Version:** {info['version']}\n\n" + md += "**Example:**\n```json\n" + md += json.dumps(info['example'], indent=2, default=str) + md += "\n```\n\n" + + return md \ No newline at end of file diff --git a/shared/kafka/__init__.py b/shared/kafka/__init__.py index 7d7db74..7c78f53 100644 --- a/shared/kafka/__init__.py +++ b/shared/kafka/__init__.py @@ -1,5 +1,6 @@ from .producer import KafkaProducer from .consumer import KafkaConsumer from .events import Event, EventType +from .schema_registry import SchemaRegistry -__all__ = ['KafkaProducer', 'KafkaConsumer', 'Event', 'EventType'] \ No newline at end of file +__all__ = ['KafkaProducer', 'KafkaConsumer', 'Event', 'EventType', 'SchemaRegistry'] \ No newline at end of file diff --git a/shared/kafka/schema_registry.py b/shared/kafka/schema_registry.py new file mode 100644 index 0000000..676306d --- /dev/null +++ b/shared/kafka/schema_registry.py @@ -0,0 +1,333 @@ +""" +이벤트 스키마 레지스트리 +이벤트 스키마 정의 및 버전 관리 +""" +from typing import Dict, Any, Optional, List, Literal +from enum import Enum +from pydantic import BaseModel, Field, field_validator +from datetime import datetime +import json + +class SchemaVersion(str, Enum): + V1 = "1.0.0" + V2 = "2.0.0" + +class EventSchemaBase(BaseModel): + """이벤트 스키마 베이스""" + event_id: str = Field(..., description="고유 이벤트 ID") + event_type: str = Field(..., description="이벤트 타입") + timestamp: datetime = Field(default_factory=datetime.now, description="이벤트 발생 시간") + version: str = Field(default=SchemaVersion.V1, description="스키마 버전") + service: str = Field(..., description="이벤트 발생 서비스") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +# User Events Schemas +class UserCreatedSchema(EventSchemaBase): + """사용자 생성 이벤트 스키마""" + event_type: Literal["USER_CREATED"] = "USER_CREATED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['user_id', 'username', 'email'] + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + return v + +class UserUpdatedSchema(EventSchemaBase): + """사용자 업데이트 이벤트 스키마""" + event_type: Literal["USER_UPDATED"] = "USER_UPDATED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['user_id'] + optional_fields = ['username', 'email', 'full_name', 'profile_picture', + 'bio', 'location', 'website', 'updated_fields'] + + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + + # updated_fields가 있으면 검증 + if 'updated_fields' in v and not isinstance(v['updated_fields'], list): + raise ValueError("updated_fields must be a list") + + return v + +class UserDeletedSchema(EventSchemaBase): + """사용자 삭제 이벤트 스키마""" + event_type: Literal["USER_DELETED"] = "USER_DELETED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['user_id', 'username'] + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + return v + +# OAuth Events Schemas +class OAuthAppCreatedSchema(EventSchemaBase): + """OAuth 앱 생성 이벤트 스키마""" + event_type: Literal["OAUTH_APP_CREATED"] = "OAUTH_APP_CREATED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['app_id', 'name', 'owner_id', 'client_id'] + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + return v + +class OAuthTokenIssuedSchema(EventSchemaBase): + """OAuth 토큰 발급 이벤트 스키마""" + event_type: Literal["OAUTH_TOKEN_ISSUED"] = "OAUTH_TOKEN_ISSUED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['client_id', 'grant_type'] + optional_fields = ['user_id', 'scopes', 'expires_in'] + + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + + # scopes가 있으면 리스트여야 함 + if 'scopes' in v and not isinstance(v['scopes'], list): + raise ValueError("scopes must be a list") + + return v + +class OAuthTokenRevokedSchema(EventSchemaBase): + """OAuth 토큰 폐기 이벤트 스키마""" + event_type: Literal["OAUTH_TOKEN_REVOKED"] = "OAUTH_TOKEN_REVOKED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['token_id', 'client_id'] + optional_fields = ['user_id', 'revoked_by'] + + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + + return v + +# Image Events Schemas +class ImageUploadedSchema(EventSchemaBase): + """이미지 업로드 이벤트 스키마""" + event_type: Literal["IMAGE_UPLOADED"] = "IMAGE_UPLOADED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['image_id', 'user_id', 'url'] + optional_fields = ['size', 'mime_type', 'width', 'height', 'thumbnail_url'] + + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + + return v + +class ImageProcessedSchema(EventSchemaBase): + """이미지 처리 완료 이벤트 스키마""" + event_type: Literal["IMAGE_PROCESSED"] = "IMAGE_PROCESSED" + data: Dict[str, Any] = Field(..., description="이벤트 데이터") + + @field_validator('data') + @classmethod + def validate_data(cls, v): + required_fields = ['image_id', 'process_type'] + optional_fields = ['original_url', 'processed_url', 'processing_time_ms'] + + for field in required_fields: + if field not in v: + raise ValueError(f"Missing required field: {field}") + + return v + +class SchemaRegistry: + """스키마 레지스트리""" + + # 스키마 매핑 + SCHEMAS = { + "USER_CREATED": UserCreatedSchema, + "USER_UPDATED": UserUpdatedSchema, + "USER_DELETED": UserDeletedSchema, + "OAUTH_APP_CREATED": OAuthAppCreatedSchema, + "OAUTH_TOKEN_ISSUED": OAuthTokenIssuedSchema, + "OAUTH_TOKEN_REVOKED": OAuthTokenRevokedSchema, + "IMAGE_UPLOADED": ImageUploadedSchema, + "IMAGE_PROCESSED": ImageProcessedSchema, + } + + # 스키마 버전 호환성 매트릭스 + COMPATIBILITY_MATRIX = { + SchemaVersion.V1: [SchemaVersion.V1], + SchemaVersion.V2: [SchemaVersion.V1, SchemaVersion.V2], # V2는 V1과 호환 + } + + @classmethod + def get_schema(cls, event_type: str) -> Optional[type]: + """이벤트 타입에 대한 스키마 반환""" + return cls.SCHEMAS.get(event_type) + + @classmethod + def validate_event(cls, event_data: Dict[str, Any]) -> tuple[bool, Optional[str]]: + """이벤트 데이터 검증""" + try: + event_type = event_data.get('event_type') + if not event_type: + return False, "Missing event_type" + + schema_class = cls.get_schema(event_type) + if not schema_class: + return False, f"Unknown event type: {event_type}" + + # 스키마 검증 + schema_class(**event_data) + return True, None + + except Exception as e: + return False, str(e) + + @classmethod + def is_compatible(cls, from_version: str, to_version: str) -> bool: + """버전 호환성 확인""" + from_v = SchemaVersion(from_version) + to_v = SchemaVersion(to_version) + + compatible_versions = cls.COMPATIBILITY_MATRIX.get(to_v, []) + return from_v in compatible_versions + + @classmethod + def migrate_event( + cls, + event_data: Dict[str, Any], + from_version: str, + to_version: str + ) -> Dict[str, Any]: + """이벤트 데이터 마이그레이션""" + if from_version == to_version: + return event_data + + if not cls.is_compatible(from_version, to_version): + raise ValueError(f"Cannot migrate from {from_version} to {to_version}") + + # 버전별 마이그레이션 로직 + if from_version == SchemaVersion.V1 and to_version == SchemaVersion.V2: + # V1 -> V2 마이그레이션 예시 + event_data['version'] = SchemaVersion.V2 + + # 새로운 필드 추가 (기본값) + if 'metadata' not in event_data: + event_data['metadata'] = {} + + return event_data + + @classmethod + def get_all_schemas(cls) -> Dict[str, Dict[str, Any]]: + """모든 스키마 정보 반환 (문서화용)""" + schemas_info = {} + + for event_type, schema_class in cls.SCHEMAS.items(): + schemas_info[event_type] = { + "description": schema_class.__doc__, + "fields": schema_class.schema(), + "version": SchemaVersion.V1, + "example": cls._generate_example(schema_class) + } + + return schemas_info + + @classmethod + def _generate_example(cls, schema_class: type) -> Dict[str, Any]: + """스키마 예시 생성""" + examples = { + "USER_CREATED": { + "event_id": "evt_123456", + "event_type": "USER_CREATED", + "timestamp": datetime.now().isoformat(), + "version": "1.0.0", + "service": "users", + "data": { + "user_id": "usr_abc123", + "username": "johndoe", + "email": "john@example.com" + } + }, + "USER_UPDATED": { + "event_id": "evt_123457", + "event_type": "USER_UPDATED", + "timestamp": datetime.now().isoformat(), + "version": "1.0.0", + "service": "users", + "data": { + "user_id": "usr_abc123", + "updated_fields": ["profile_picture", "bio"], + "profile_picture": "https://example.com/pic.jpg", + "bio": "Updated bio" + } + }, + "OAUTH_TOKEN_ISSUED": { + "event_id": "evt_123458", + "event_type": "OAUTH_TOKEN_ISSUED", + "timestamp": datetime.now().isoformat(), + "version": "1.0.0", + "service": "oauth", + "data": { + "client_id": "app_xyz789", + "user_id": "usr_abc123", + "grant_type": "authorization_code", + "scopes": ["profile", "email"], + "expires_in": 3600 + } + } + } + + return examples.get(schema_class.__fields__['event_type'].default, {}) + + @classmethod + def export_schemas(cls, format: str = "json") -> str: + """스키마 내보내기""" + schemas = cls.get_all_schemas() + + if format == "json": + return json.dumps(schemas, indent=2, default=str) + elif format == "markdown": + return cls._export_as_markdown(schemas) + else: + raise ValueError(f"Unsupported format: {format}") + + @classmethod + def _export_as_markdown(cls, schemas: Dict[str, Dict[str, Any]]) -> str: + """마크다운 형식으로 내보내기""" + md = "# Event Schema Registry\n\n" + + for event_type, info in schemas.items(): + md += f"## {event_type}\n\n" + md += f"{info['description']}\n\n" + md += f"**Version:** {info['version']}\n\n" + md += "**Example:**\n```json\n" + md += json.dumps(info['example'], indent=2, default=str) + md += "\n```\n\n" + + return md \ No newline at end of file diff --git a/test_event_flow.py b/test_event_flow.py new file mode 100644 index 0000000..797bfa2 --- /dev/null +++ b/test_event_flow.py @@ -0,0 +1,253 @@ +#!/usr/bin/env python3 +""" +Step 9 이벤트 흐름 테스트 스크립트 +""" +import asyncio +import httpx +import json +from datetime import datetime +import time + +# Service URLs +CONSOLE_URL = "http://localhost:8011" +USERS_URL = "http://localhost:8001" +OAUTH_URL = "http://localhost:8003" + +# Test credentials +TEST_USERNAME = "admin" +TEST_PASSWORD = "admin123" + +async def get_auth_token(): + """Console에서 인증 토큰 획득""" + async with httpx.AsyncClient() as client: + response = await client.post( + f"{CONSOLE_URL}/api/auth/login", + data={ + "username": TEST_USERNAME, + "password": TEST_PASSWORD + } + ) + if response.status_code == 200: + token_data = response.json() + return token_data["access_token"] + else: + print(f"Failed to get auth token: {response.status_code}") + return None + +async def test_user_create_event(): + """사용자 생성 이벤트 테스트""" + print("\n=== Testing User Create Event ===") + + # Create a new user + async with httpx.AsyncClient() as client: + user_data = { + "username": f"test_user_{int(time.time())}", + "email": f"test_{int(time.time())}@example.com", + "full_name": "Test User for Event", + "profile_picture": "https://example.com/test.jpg", + "bio": "Testing event system", + "location": "Test City" + } + + response = await client.post( + f"{USERS_URL}/users", + json=user_data + ) + + if response.status_code == 201: + user = response.json() + print(f"✅ User created: {user['username']} (ID: {user['id']})") + + # Wait for event processing + await asyncio.sleep(2) + + return user['id'] + else: + print(f"❌ Failed to create user: {response.status_code}") + print(response.text) + return None + +async def test_user_update_event(user_id: str): + """사용자 업데이트 이벤트 테스트""" + print("\n=== Testing User Update Event ===") + + async with httpx.AsyncClient() as client: + update_data = { + "bio": "Updated bio for event testing", + "profile_picture": "https://example.com/updated.jpg", + "location": "Updated City" + } + + response = await client.put( + f"{USERS_URL}/users/{user_id}", + json=update_data + ) + + if response.status_code == 200: + user = response.json() + print(f"✅ User updated: {user['username']}") + + # Wait for event processing + await asyncio.sleep(2) + + return True + else: + print(f"❌ Failed to update user: {response.status_code}") + return False + +async def test_oauth_app_create_event(): + """OAuth 앱 생성 이벤트 테스트""" + print("\n=== Testing OAuth App Create Event ===") + + async with httpx.AsyncClient() as client: + app_data = { + "name": f"Test App {int(time.time())}", + "description": "Testing event system", + "redirect_uris": ["http://localhost:3000/callback"], + "owner_id": "test_owner_123" + } + + response = await client.post( + f"{OAUTH_URL}/applications", + json=app_data + ) + + if response.status_code in [200, 201]: + app = response.json() + print(f"✅ OAuth app created: {app['name']} (ID: {app['id']})") + + # Wait for event processing + await asyncio.sleep(2) + + return app['client_id'] + else: + print(f"❌ Failed to create OAuth app: {response.status_code}") + print(response.text) + return None + +async def check_event_stats(token: str): + """이벤트 통계 확인""" + print("\n=== Checking Event Statistics ===") + + async with httpx.AsyncClient() as client: + headers = {"Authorization": f"Bearer {token}"} + + # Get event stats + response = await client.get( + f"{CONSOLE_URL}/api/events/stats", + headers=headers + ) + + if response.status_code == 200: + stats = response.json() + print(f"✅ Event Statistics:") + print(f" - Processed: {stats['stats']['processed']}") + print(f" - Failed: {stats['stats']['failed']}") + print(f" - Retried: {stats['stats']['retried']}") + print(f" - DLQ: {stats['stats']['dlq_sent']}") + else: + print(f"❌ Failed to get event stats: {response.status_code}") + +async def check_dlq_messages(token: str): + """DLQ 메시지 확인""" + print("\n=== Checking Dead Letter Queue ===") + + async with httpx.AsyncClient() as client: + headers = {"Authorization": f"Bearer {token}"} + + response = await client.get( + f"{CONSOLE_URL}/api/events/dlq?limit=5", + headers=headers + ) + + if response.status_code == 200: + dlq_data = response.json() + print(f"✅ DLQ Messages: {dlq_data['count']} messages") + + for msg in dlq_data['messages']: + print(f" - Event ID: {msg.get('event_id', 'N/A')}") + print(f" Error: {msg.get('error', 'N/A')}") + print(f" Retry Count: {msg.get('retry_count', 0)}") + else: + print(f"❌ Failed to get DLQ messages: {response.status_code}") + +async def check_event_schemas(): + """이벤트 스키마 확인""" + print("\n=== Checking Event Schemas ===") + + async with httpx.AsyncClient() as client: + response = await client.get(f"{CONSOLE_URL}/api/events/schemas") + + if response.status_code == 200: + schemas_data = response.json() + print(f"✅ Available Event Schemas:") + + for schema_name in schemas_data['schemas'].keys(): + print(f" - {schema_name}") + else: + print(f"❌ Failed to get event schemas: {response.status_code}") + +async def test_user_delete_event(user_id: str): + """사용자 삭제 이벤트 테스트""" + print("\n=== Testing User Delete Event ===") + + async with httpx.AsyncClient() as client: + response = await client.delete(f"{USERS_URL}/users/{user_id}") + + if response.status_code == 200: + print(f"✅ User deleted: {user_id}") + + # Wait for event processing + await asyncio.sleep(2) + + return True + else: + print(f"❌ Failed to delete user: {response.status_code}") + return False + +async def main(): + """메인 테스트 실행""" + print("=" * 50) + print("Step 9: Advanced Event Processing Test") + print("=" * 50) + + # Wait for services to be ready + print("\nWaiting for services to be ready...") + await asyncio.sleep(5) + + # Get auth token + token = await get_auth_token() + if not token: + print("Failed to authenticate. Exiting.") + return + + print(f"✅ Authentication successful") + + # Check event schemas first + await check_event_schemas() + + # Test user events + user_id = await test_user_create_event() + if user_id: + await test_user_update_event(user_id) + + # Check stats after user events + await check_event_stats(token) + + # Delete user + await test_user_delete_event(user_id) + + # Test OAuth events + client_id = await test_oauth_app_create_event() + + # Final statistics + await asyncio.sleep(3) # Wait for all events to process + await check_event_stats(token) + await check_dlq_messages(token) + + print("\n" + "=" * 50) + print("Test completed!") + print("=" * 50) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file