""" 고급 이벤트 컨슈머 with DLQ and Retry """ import asyncio import json import logging from typing import Dict, Any, Optional, List from datetime import datetime, timedelta from redis import asyncio as aioredis from aiokafka import AIOKafkaProducer import sys sys.path.append('/app') from shared.kafka import KafkaConsumer, Event, EventType from event_handlers import EventHandlers logger = logging.getLogger(__name__) class RetryPolicy: """재시도 정책""" def __init__( self, max_retries: int = 3, initial_delay: float = 1.0, max_delay: float = 60.0, exponential_base: float = 2.0 ): self.max_retries = max_retries self.initial_delay = initial_delay self.max_delay = max_delay self.exponential_base = exponential_base def get_delay(self, retry_count: int) -> float: """재시도 지연 시간 계산 (exponential backoff)""" delay = self.initial_delay * (self.exponential_base ** retry_count) return min(delay, self.max_delay) class AdvancedEventConsumer: def __init__( self, topics: List[str], group_id: str, redis_url: str = "redis://redis:6379", bootstrap_servers: str = "kafka:9092", enable_dlq: bool = True, dlq_topic: str = "dead-letter-queue" ): self.topics = topics self.group_id = group_id self.bootstrap_servers = bootstrap_servers self.enable_dlq = enable_dlq self.dlq_topic = dlq_topic # Kafka Consumer self.consumer = KafkaConsumer( topics=topics, group_id=group_id, bootstrap_servers=bootstrap_servers ) # DLQ Producer self.dlq_producer: Optional[AIOKafkaProducer] = None # Redis for retry tracking self.redis: Optional[aioredis.Redis] = None self.redis_url = redis_url # Event handlers self.handlers: Optional[EventHandlers] = None # Retry policies per event type self.retry_policies = { EventType.USER_CREATED: RetryPolicy(max_retries=3), EventType.USER_UPDATED: RetryPolicy(max_retries=2), EventType.USER_DELETED: RetryPolicy(max_retries=5), # 중요한 이벤트 EventType.OAUTH_APP_CREATED: RetryPolicy(max_retries=3), EventType.OAUTH_TOKEN_ISSUED: RetryPolicy(max_retries=1), } # Processing statistics self.stats = { "processed": 0, "failed": 0, "retried": 0, "dlq_sent": 0 } async def start(self): """컨슈머 시작""" try: # Redis 연결 self.redis = await aioredis.from_url( self.redis_url, encoding="utf-8", decode_responses=True ) # Event handlers 초기화 self.handlers = EventHandlers(redis_client=self.redis) # DLQ Producer 초기화 if self.enable_dlq: self.dlq_producer = AIOKafkaProducer( bootstrap_servers=self.bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode() ) await self.dlq_producer.start() logger.info(f"DLQ Producer started for topic: {self.dlq_topic}") # 이벤트 핸들러 등록 self._register_event_handlers() # Kafka Consumer 시작 await self.consumer.start() logger.info(f"Advanced Event Consumer started: {self.topics}") # 통계 리포팅 태스크 시작 asyncio.create_task(self._report_stats()) except Exception as e: logger.error(f"Failed to start Advanced Event Consumer: {e}") raise async def stop(self): """컨슈머 종료""" await self.consumer.stop() if self.dlq_producer: await self.dlq_producer.stop() if self.redis: await self.redis.close() logger.info("Advanced Event Consumer stopped") def _register_event_handlers(self): """이벤트 핸들러 등록""" # 각 이벤트 타입에 대한 핸들러를 래퍼로 감싸서 등록 self.consumer.register_handler( EventType.USER_CREATED, self._create_handler_with_retry( self.handlers.handle_user_created, EventType.USER_CREATED ) ) self.consumer.register_handler( EventType.USER_UPDATED, self._create_handler_with_retry( self.handlers.handle_user_updated, EventType.USER_UPDATED ) ) self.consumer.register_handler( EventType.USER_DELETED, self._create_handler_with_retry( self.handlers.handle_user_deleted, EventType.USER_DELETED ) ) self.consumer.register_handler( EventType.OAUTH_APP_CREATED, self._create_handler_with_retry( self.handlers.handle_oauth_app_created, EventType.OAUTH_APP_CREATED ) ) self.consumer.register_handler( EventType.OAUTH_TOKEN_ISSUED, self._create_handler_with_retry( self.handlers.handle_oauth_token_issued, EventType.OAUTH_TOKEN_ISSUED ) ) def _create_handler_with_retry(self, handler_func, event_type: EventType): """재시도 로직이 포함된 핸들러 래퍼 생성""" async def wrapper(event: Event): event_id = f"{event.event_id}:{event.event_type}" retry_key = f"retry:{event_id}" try: # 재시도 횟수 확인 retry_count = 0 if self.redis: retry_count_str = await self.redis.get(retry_key) retry_count = int(retry_count_str) if retry_count_str else 0 # 핸들러 실행 await handler_func(event.dict()) # 성공 시 재시도 카운터 삭제 if self.redis and retry_count > 0: await self.redis.delete(retry_key) self.stats["processed"] += 1 except Exception as e: logger.error(f"Error processing {event_type}: {e}") self.stats["failed"] += 1 # 재시도 처리 retry_policy = self.retry_policies.get(event_type) if retry_policy and retry_count < retry_policy.max_retries: await self._handle_retry(event, retry_count, retry_policy, retry_key) else: # 최대 재시도 초과 -> DLQ로 전송 await self._send_to_dlq(event, str(e), retry_count) return wrapper async def _handle_retry( self, event: Event, retry_count: int, retry_policy: RetryPolicy, retry_key: str ): """재시도 처리""" retry_count += 1 delay = retry_policy.get_delay(retry_count) logger.warning( f"Retrying event {event.event_id} " f"(attempt {retry_count}/{retry_policy.max_retries}) " f"after {delay}s" ) # 재시도 카운터 저장 if self.redis: await self.redis.setex( retry_key, timedelta(hours=24), # 24시간 후 자동 삭제 retry_count ) # 지연 후 재처리를 위해 다시 큐에 추가 # 실제 프로덕션에서는 별도의 재시도 토픽 사용 권장 self.stats["retried"] += 1 # 지연 실행 await asyncio.sleep(delay) # 이벤트 재발행 (재시도 토픽으로) if hasattr(self, 'retry_producer'): await self._republish_for_retry(event, retry_count) async def _send_to_dlq(self, event: Event, error: str, retry_count: int): """Dead Letter Queue로 전송""" if not self.enable_dlq or not self.dlq_producer: logger.error(f"Failed to process event {event.event_id} after {retry_count} retries") return try: dlq_message = { "original_event": event.dict(), "error": error, "retry_count": retry_count, "failed_at": datetime.now().isoformat(), "consumer_group": self.group_id, "topic": self.topics[0] if self.topics else None } await self.dlq_producer.send( self.dlq_topic, value=dlq_message ) self.stats["dlq_sent"] += 1 logger.error( f"Event {event.event_id} sent to DLQ after {retry_count} retries. " f"Error: {error}" ) # Redis에 DLQ 전송 기록 if self.redis: dlq_key = f"dlq:{event.event_id}" await self.redis.setex( dlq_key, timedelta(days=7), # 7일 보관 json.dumps({ "error": error, "retry_count": retry_count, "sent_at": datetime.now().isoformat() }) ) except Exception as e: logger.critical(f"Failed to send event to DLQ: {e}") async def _republish_for_retry(self, event: Event, retry_count: int): """재시도를 위한 이벤트 재발행""" # 실제 구현에서는 별도의 재시도 토픽 사용 # 여기서는 로깅만 수행 logger.info(f"Would republish event {event.event_id} for retry #{retry_count}") async def _report_stats(self): """통계 리포팅 (1분마다)""" while True: await asyncio.sleep(60) logger.info( f"Event Consumer Stats - " f"Processed: {self.stats['processed']}, " f"Failed: {self.stats['failed']}, " f"Retried: {self.stats['retried']}, " f"DLQ: {self.stats['dlq_sent']}" ) # Redis에 통계 저장 if self.redis: stats_key = f"consumer:stats:{self.group_id}" await self.redis.hset( stats_key, mapping={ **self.stats, "updated_at": datetime.now().isoformat() } ) async def get_dlq_messages(self, limit: int = 10) -> List[Dict[str, Any]]: """DLQ 메시지 조회 (관리 목적)""" if not self.redis: return [] dlq_keys = await self.redis.keys("dlq:*") messages = [] for key in dlq_keys[:limit]: data = await self.redis.get(key) if data: event_id = key.replace("dlq:", "") message = json.loads(data) message["event_id"] = event_id messages.append(message) return messages async def retry_dlq_message(self, event_id: str) -> bool: """DLQ 메시지 수동 재시도""" # 실제 구현에서는 DLQ에서 메시지를 읽어 재처리 logger.info(f"Manual retry requested for event: {event_id}") if self.redis: # 재시도 카운터 리셋 retry_key = f"retry:{event_id}:*" keys = await self.redis.keys(retry_key) if keys: await self.redis.delete(*keys) return True return False