import asyncio
import inspect
import json
import logging
from typing import Any, Callable, Dict, List, Optional

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError

from .events import Event, EventType

logger = logging.getLogger(__name__)


class KafkaConsumer:
    """Asynchronous Kafka consumer that dispatches events to registered handlers.

    Wraps an ``AIOKafkaConsumer``: ``start()`` connects and spawns a
    background task that polls for message batches, converts every message
    value into an ``Event`` and invokes the handlers registered for that
    event's ``event_type``. ``stop()`` shuts the background task down and
    closes the underlying consumer.
    """

    # Upper bound (seconds) that stop() waits for the consume loop to finish
    # its current iteration before cancelling it.
    _STOP_TIMEOUT_SECONDS = 5.0

    def __init__(
        self,
        topics: List[str],
        group_id: str,
        bootstrap_servers: str = "kafka:9092"
    ):
        """Store the connection settings; no network I/O happens here.

        Args:
            topics: Topic names to subscribe to.
            group_id: Kafka consumer group id.
            bootstrap_servers: Broker address passed to ``AIOKafkaConsumer``.
        """
        self.topics = topics
        self.group_id = group_id
        self.bootstrap_servers = bootstrap_servers
        self._consumer: Optional[AIOKafkaConsumer] = None
        self._handlers: Dict[EventType, List[Callable[..., Any]]] = {}
        self._running = False
        # Strong reference to the background consume task. The event loop
        # only keeps weak references to tasks, so without this the loop could
        # be garbage-collected while still running.
        self._task: Optional[asyncio.Task] = None

    def register_handler(
        self, event_type: EventType, handler: Callable[..., Any]
    ) -> None:
        """Register a handler for the given event type.

        Several handlers may be registered for the same type; they are
        invoked in registration order.

        Args:
            event_type: Event type the handler should receive.
            handler: Sync or async callable that is called with the ``Event``.
        """
        self._handlers.setdefault(event_type, []).append(handler)
        logger.info(f"Registered handler for {event_type}")

    async def start(self) -> None:
        """Start the Kafka consumer and the background consume loop.

        Calling ``start()`` while the consumer is already running is a no-op
        (a warning is logged), so the existing consumer is not overwritten
        and no second consume loop is spawned.

        Raises:
            Exception: Whatever creating or starting ``AIOKafkaConsumer``
                raised; the error is logged and re-raised unchanged.
        """
        if self._running:
            logger.warning("Kafka Consumer already running; start() ignored")
            return

        try:
            self._consumer = AIOKafkaConsumer(
                *self.topics,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
                value_deserializer=lambda v: json.loads(v.decode()),
                auto_offset_reset='earliest',
                enable_auto_commit=True,
                auto_commit_interval_ms=1000,
                session_timeout_ms=30000,
                heartbeat_interval_ms=10000
            )

            await self._consumer.start()
            self._running = True

            logger.info(f"Kafka Consumer started: {self.topics} (group: {self.group_id})")

            # Start the message processing loop. Keep the task handle so the
            # task stays alive and stop() can wait for it.
            self._task = asyncio.create_task(self._consume_messages())

        except Exception as e:
            logger.error(f"Failed to start Kafka Consumer: {e}")
            raise

    async def stop(self) -> None:
        """Stop the consume loop, then close the Kafka consumer.

        The loop is first given up to ``_STOP_TIMEOUT_SECONDS`` to finish its
        current iteration on its own (it re-checks ``_running`` after every
        batch). If it is still busy after that, it is cancelled. The
        underlying consumer is closed afterwards, even if this coroutine is
        itself cancelled while waiting.
        """
        self._running = False
        task, self._task = self._task, None

        try:
            # Skip the wait when stop() is invoked from inside a handler,
            # i.e. from the consume task itself: a task cannot wait on itself.
            if task is not None and task is not asyncio.current_task():
                _, pending = await asyncio.wait(
                    {task}, timeout=self._STOP_TIMEOUT_SECONDS
                )
                if pending:
                    task.cancel()
                    # Bounded wait so a handler that suppresses cancellation
                    # cannot block shutdown forever.
                    await asyncio.wait(
                        {task}, timeout=self._STOP_TIMEOUT_SECONDS
                    )
        finally:
            if self._consumer:
                await self._consumer.stop()
                logger.info("Kafka Consumer stopped")

    async def _consume_messages(self) -> None:
        """Poll Kafka for message batches until the consumer is stopped.

        Any error raised while fetching or processing a batch is logged and
        followed by a one-second pause before polling resumes, so a transient
        failure does not end the loop.
        """
        if not self._consumer:
            return

        while self._running:
            try:
                # Fetch a batch of messages (waits at most 100 ms).
                msg_batch = await self._consumer.getmany(timeout_ms=100)

                for messages in msg_batch.values():
                    for msg in messages:
                        await self._process_message(msg.value)

            except KafkaError as e:
                logger.error(f"Kafka error: {e}")
                await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Error processing messages: {e}")
                await asyncio.sleep(1)

    async def _process_message(self, message: Dict[str, Any]) -> None:
        """Convert one decoded message into an ``Event`` and run its handlers.

        A failing handler is logged and does not prevent the remaining
        handlers from running. A message that cannot be turned into an
        ``Event`` is logged and dropped.

        Args:
            message: Deserialized message value. Its keys are passed as
                keyword arguments to ``Event`` (declared in ``.events``).
        """
        try:
            # Convert to an Event object.
            event = Event(**message)

            # Run the registered handlers.
            handlers = self._handlers.get(event.event_type, [])
            for handler in handlers:
                try:
                    # Call first and await only if the result is awaitable.
                    # This covers plain functions, coroutine functions and
                    # callable objects whose __call__ is async.
                    result = handler(event)
                    if inspect.isawaitable(result):
                        await result
                except Exception as e:
                    logger.error(f"Handler error for {event.event_type}: {e}")

            if not handlers:
                logger.debug(f"No handlers for event type: {event.event_type}")

        except Exception as e:
            logger.error(f"Failed to process message: {e}")

    async def consume_one(self, timeout: float = 1.0) -> Optional[Event]:
        """Consume a single message (intended for tests and debugging).

        NOTE(review): the background loop started by ``start()`` reads from
        the same underlying consumer, so while it is running a given message
        is delivered to either the loop or this method, not both.

        Args:
            timeout: Maximum number of seconds to wait for a message.

        Returns:
            The message converted to an ``Event``, or ``None`` if the
            consumer has not been created, the wait timed out, or an error
            occurred (errors are logged).
        """
        if not self._consumer:
            return None

        try:
            msg = await asyncio.wait_for(
                self._consumer.getone(),
                timeout=timeout
            )
            return Event(**msg.value)
        except asyncio.TimeoutError:
            return None
        except Exception as e:
            logger.error(f"Error consuming message: {e}")
            return None