import json import asyncio from typing import Optional, Dict, Any from aiokafka import AIOKafkaProducer from aiokafka.errors import KafkaError import logging from .events import Event logger = logging.getLogger(__name__) class KafkaProducer: def __init__(self, bootstrap_servers: str = "kafka:9092"): self.bootstrap_servers = bootstrap_servers self._producer: Optional[AIOKafkaProducer] = None async def start(self): """Kafka Producer 시작""" try: self._producer = AIOKafkaProducer( bootstrap_servers=self.bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode(), compression_type="gzip", acks='all', retry_backoff_ms=100 ) await self._producer.start() logger.info(f"Kafka Producer started: {self.bootstrap_servers}") except Exception as e: logger.error(f"Failed to start Kafka Producer: {e}") raise async def stop(self): """Kafka Producer 종료""" if self._producer: await self._producer.stop() logger.info("Kafka Producer stopped") async def send_event(self, topic: str, event: Event) -> bool: """이벤트 전송""" if not self._producer: logger.error("Producer not started") return False try: event_dict = event.dict() event_dict['timestamp'] = event.timestamp.isoformat() await self._producer.send_and_wait( topic, value=event_dict, key=event.correlation_id.encode() if event.correlation_id else None ) logger.info(f"Event sent to {topic}: {event.event_type}") return True except KafkaError as e: logger.error(f"Failed to send event to {topic}: {e}") return False except Exception as e: logger.error(f"Unexpected error sending event: {e}") return False async def send_batch(self, topic: str, events: list[Event]) -> int: """여러 이벤트를 배치로 전송""" if not self._producer: logger.error("Producer not started") return 0 sent_count = 0 batch = self._producer.create_batch() for event in events: event_dict = event.dict() event_dict['timestamp'] = event.timestamp.isoformat() metadata = batch.append( key=event.correlation_id.encode() if event.correlation_id else None, value=json.dumps(event_dict).encode(), timestamp=None ) if metadata is None: # 배치가 가득 찼으면 전송하고 새 배치 생성 await self._producer.send_batch(batch, topic) sent_count += len(batch) batch = self._producer.create_batch() batch.append( key=event.correlation_id.encode() if event.correlation_id else None, value=json.dumps(event_dict).encode(), timestamp=None ) # 남은 배치 전송 if batch: await self._producer.send_batch(batch, topic) sent_count += len(batch) logger.info(f"Sent {sent_count} events to {topic}") return sent_count