diff --git a/PLAN.md b/PLAN.md new file mode 100644 index 0000000..382a955 --- /dev/null +++ b/PLAN.md @@ -0,0 +1,176 @@ +# 프로젝트 개발 계획 + +## 프로젝트 목표 +마이크로서비스 아키텍처 기반의 확장 가능한 웹 애플리케이션 구축 + +## 아키텍처 원칙 +1. **Console as API Gateway**: 모든 외부 요청은 Console을 통해 라우팅 +2. **Docker-Only Development**: 모든 개발과 실행은 Docker 컨테이너 내에서 +3. **Event-Driven Architecture**: Kafka를 통한 서비스 간 비동기 통신 +4. **Service Isolation**: 각 서비스는 독립적으로 배포 가능 + +## 완료된 단계 (✅) + +### Phase 1: 기반 구축 +- [x] Step 1: 기본 프로젝트 구조 및 Docker 설정 +- [x] Step 2: Users 마이크로서비스 구현 +- [x] Step 3: MongoDB 통합 +- [x] Step 4: Redis 캐싱 시스템 +- [x] Step 5: Frontend 스켈레톤 (React + Vite) +- [x] Step 6: JWT 인증 시스템 +- [x] Step 6.5: Images 서비스 통합 +- [x] Step 7: Kafka 이벤트 시스템 + +## 진행 예정 단계 + +### Phase 2: 이벤트 기반 시스템 확장 +#### Step 8: 고급 이벤트 처리 +- [ ] 이벤트 소비자 구현 + - Console에서 user-events 토픽 구독 + - 알림 서비스 이벤트 처리 +- [ ] Dead Letter Queue 구현 +- [ ] 이벤트 재시도 메커니즘 +- [ ] 이벤트 스키마 레지스트리 + +#### Step 9: 태스크 큐 시스템 +- [ ] Kafka 기반 백그라운드 작업 처리 +- [ ] 이미지 프로세싱 작업 큐 +- [ ] 이메일 전송 큐 +- [ ] 배치 작업 스케줄러 + +### Phase 3: 고급 기능 +#### Step 10: 실시간 기능 +- [ ] WebSocket 통합 (Console) +- [ ] 실시간 알림 시스템 +- [ ] 온라인 사용자 상태 추적 +- [ ] 실시간 데이터 동기화 + +#### Step 11: 파일 시스템 +- [ ] 파일 업로드 서비스 +- [ ] S3 호환 객체 스토리지 (MinIO) +- [ ] 파일 메타데이터 관리 +- [ ] 썸네일 생성 서비스 + +#### Step 12: 검색 시스템 +- [ ] Elasticsearch 통합 +- [ ] 전문 검색 기능 +- [ ] 자동완성 기능 +- [ ] 검색 분석 및 최적화 + +### Phase 4: 프로덕션 준비 +#### Step 13: 모니터링 및 로깅 +- [ ] Prometheus 메트릭 수집 +- [ ] Grafana 대시보드 +- [ ] ELK Stack 로깅 +- [ ] 분산 추적 (Jaeger) + +#### Step 14: 보안 강화 +- [ ] Rate Limiting +- [ ] API Key 관리 +- [ ] OAuth2 통합 +- [ ] 데이터 암호화 + +#### Step 15: 테스트 및 CI/CD +- [ ] 단위 테스트 작성 +- [ ] 통합 테스트 +- [ ] E2E 테스트 +- [ ] GitHub Actions CI/CD + +#### Step 16: 성능 최적화 +- [ ] 데이터베이스 인덱싱 +- [ ] 쿼리 최적화 +- [ ] 캐싱 전략 개선 +- [ ] CDN 통합 + +## 서비스 구성 + +### 현재 서비스 +1. **Console** (API Gateway) + - Frontend: React SPA + - Backend: FastAPI, JWT 인증 + +2. **Users Service** + - User CRUD + - MongoDB 저장소 + - Kafka 이벤트 발행 + +3. **Images Service** + - 이미지 프록시 + - 캐싱 시스템 + - WebP 변환 + +### 계획된 서비스 +4. **Notification Service** + - 이메일/SMS 전송 + - 푸시 알림 + - 알림 히스토리 + +5. **Analytics Service** + - 사용자 행동 분석 + - 비즈니스 메트릭 + - 리포트 생성 + +6. **Payment Service** + - 결제 처리 + - 구독 관리 + - 청구서 생성 + +## 기술 스택 로드맵 + +### 현재 사용 중 +- FastAPI, React, TypeScript +- MongoDB, Redis +- Apache Kafka +- Docker, Docker Compose + +### 도입 예정 +- Elasticsearch (검색) +- MinIO (객체 스토리지) +- Prometheus/Grafana (모니터링) +- Jaeger (분산 추적) +- Nginx (리버스 프록시) + +## 개발 일정 + +### 2025 Q1 +- Phase 2 완료 (이벤트 시스템) +- Phase 3 시작 (고급 기능) + +### 2025 Q2 +- Phase 3 완료 +- Phase 4 시작 (프로덕션 준비) + +### 2025 Q3 +- Phase 4 완료 +- 프로덕션 배포 + +## 성공 지표 + +1. **기술적 지표** + - 서비스 응답 시간 < 200ms + - 시스템 가용성 > 99.9% + - 초당 처리 가능 요청 > 1000 + +2. **개발 지표** + - 테스트 커버리지 > 80% + - 빌드 시간 < 5분 + - 배포 시간 < 10분 + +3. **확장성 지표** + - 수평 확장 가능 + - 서비스 독립 배포 + - 무중단 업데이트 + +## 리스크 및 대응 방안 + +1. **복잡도 증가** + - 대응: 점진적 구현, 문서화 강화 + +2. **성능 병목** + - 대응: 프로파일링, 캐싱 전략 + +3. **데이터 일관성** + - 대응: 이벤트 소싱, SAGA 패턴 + +4. **보안 취약점** + - 대응: 정기 보안 감사, 자동화된 스캔 \ No newline at end of file diff --git a/PROGRESS.md b/PROGRESS.md new file mode 100644 index 0000000..66c693b --- /dev/null +++ b/PROGRESS.md @@ -0,0 +1,116 @@ +# 프로젝트 진행 상황 + +## 완료된 단계 + +### Step 1: 기본 프로젝트 구조 생성 ✅ +- Docker Compose 설정 +- Console 서비스 (API Gateway) 기본 구현 +- 프로젝트 문서 (CLAUDE.md, PLAN.md) 작성 + +### Step 2: Users 마이크로서비스 구현 ✅ +- Users 서비스 CRUD API +- MongoDB 연동 (Beanie ODM) +- 서비스 간 통신 설정 + +### Step 3: MongoDB 통합 ✅ +- MongoDB 컨테이너 설정 +- Beanie ODM 설정 +- Users 모델 및 데이터베이스 연결 +- PyMongo 버전 호환성 문제 해결 + +### Step 4: Redis 통합 ✅ +- Redis 컨테이너 설정 +- 캐싱 시스템 준비 +- 향후 세션 관리 및 캐싱 구현 예정 + +### Step 5: Frontend 스켈레톤 ✅ +- React + Vite + TypeScript 설정 +- Material-UI 통합 +- Console Frontend 기본 구조 +- npm ci → npm install 문제 해결 + +### Step 6: 환경 변수 및 인증 ✅ +- .env 파일 설정 +- JWT 인증 시스템 구현 +- Console이 인증 처리 담당 +- 포트 충돌 해결 (8000 → 8011) + +### Step 6.5: Images 서비스 통합 ✅ +- site00의 image-service 마이그레이션 +- 프록시 및 캐싱 기능 유지 +- WebP 변환 기능 포함 +- Console에서 Images 서비스로 라우팅 + +### Step 7: Kafka 이벤트 시스템 ✅ +- Kafka 및 Zookeeper 컨테이너 추가 +- 공유 Kafka 라이브러리 생성 (Producer/Consumer) +- 이벤트 타입 정의 (USER_CREATED, USER_UPDATED, USER_DELETED 등) +- Users 서비스에 이벤트 발행 기능 추가 +- aiokafka 통합 + +## 현재 실행 중인 서비스 + +- **Console Frontend**: http://localhost:3000 +- **Console Backend**: http://localhost:8011 +- **Users Service**: Internal (Console 통해 접근) +- **Images Service**: http://localhost:8002 +- **MongoDB**: localhost:27017 +- **Redis**: localhost:6379 +- **Kafka**: localhost:9092 +- **Zookeeper**: localhost:2181 + +## 다음 단계 (예정) + +### Step 8: 고급 이벤트 처리 +- 이벤트 소비자 구현 +- 이벤트 기반 워크플로우 +- 에러 처리 및 재시도 로직 + +### Step 9: 고급 기능 +- 실시간 알림 (WebSocket) +- 파일 업로드 시스템 +- 검색 기능 (Elasticsearch) + +### Step 10: 프로덕션 준비 +- 로깅 시스템 (ELK Stack) +- 모니터링 (Prometheus/Grafana) +- CI/CD 파이프라인 +- 테스트 자동화 + +## 기술 스택 + +- **Backend**: FastAPI (Python) +- **Frontend**: React + TypeScript + Vite + Material-UI +- **Database**: MongoDB +- **Cache**: Redis +- **Message Queue**: Apache Kafka +- **Container**: Docker & Docker Compose +- **Authentication**: JWT + +## 주요 환경 변수 + +```env +COMPOSE_PROJECT_NAME=site11 +CONSOLE_BACKEND_PORT=8011 +JWT_SECRET_KEY=your-secret-key-change-in-production-12345 +KAFKA_BOOTSTRAP_SERVERS=kafka:9092 +``` + +## 문제 해결 기록 + +1. **PyMongo 호환성**: motor와 pymongo 버전 충돌 → pymongo==4.6.1로 고정 +2. **npm ci 실패**: package-lock.json 부재 → npm install로 변경 +3. **포트 충돌**: 8000 포트 사용 중 → Console을 8011로 변경 +4. **WebP 변환 문제**: 검정색 이미지 출력 → convert_to_webp 임시 비활성화 + +## 컨텍스트 복구 정보 + +Claude가 재시작되면 이 정보를 참조: + +- 작업 디렉토리: `/Users/jungwoochoi/Desktop/prototype/site11` +- Git 저장소: 각 단계마다 커밋됨 +- Docker 개발 원칙: 모든 개발은 Docker 내에서 진행 +- 문서 형식: 모든 마크다운 파일은 대문자 (CLAUDE.md, PLAN.md, PROGRESS.md) +- Console 서비스가 중앙 API Gateway 역할 +- Kafka를 메인 이벤트 시스템으로 사용 +- Redis는 캐싱 전용 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index c595d67..86b275e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -46,13 +46,17 @@ services: - PORT=8000 - MONGODB_URL=${MONGODB_URL} - DB_NAME=${USERS_DB_NAME} + - KAFKA_BOOTSTRAP_SERVERS=${KAFKA_BOOTSTRAP_SERVERS} + - KAFKA_GROUP_ID=${KAFKA_GROUP_ID} volumes: - ./services/users/backend:/app + - ./shared:/app/shared networks: - site11_network restart: unless-stopped depends_on: - mongodb + - kafka images-backend: build: @@ -113,6 +117,52 @@ services: timeout: 5s retries: 5 + zookeeper: + image: confluentinc/cp-zookeeper:7.5.0 + container_name: ${COMPOSE_PROJECT_NAME}_zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - "${KAFKA_ZOOKEEPER_PORT}:2181" + volumes: + - zookeeper_data:/var/lib/zookeeper/data + - zookeeper_logs:/var/lib/zookeeper/log + networks: + - site11_network + restart: unless-stopped + + kafka: + image: confluentinc/cp-kafka:7.5.0 + container_name: ${COMPOSE_PROJECT_NAME}_kafka + depends_on: + - zookeeper + ports: + - "${KAFKA_PORT}:9092" + - "9101:9101" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + volumes: + - kafka_data:/var/lib/kafka/data + networks: + - site11_network + restart: unless-stopped + healthcheck: + test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"] + interval: 10s + timeout: 5s + retries: 5 + networks: site11_network: driver: bridge @@ -122,4 +172,7 @@ volumes: mongodb_data: mongodb_config: redis_data: - images_cache: \ No newline at end of file + images_cache: + zookeeper_data: + zookeeper_logs: + kafka_data: \ No newline at end of file diff --git a/services/users/backend/main.py b/services/users/backend/main.py index 2996f08..01472c4 100644 --- a/services/users/backend/main.py +++ b/services/users/backend/main.py @@ -4,11 +4,20 @@ from pydantic import BaseModel from typing import List, Optional from datetime import datetime import uvicorn +import os +import sys +import logging from contextlib import asynccontextmanager from database import init_db from models import User from beanie import PydanticObjectId +sys.path.append('/app') +from shared.kafka import KafkaProducer, Event, EventType + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + # Pydantic models for requests class UserCreate(BaseModel): @@ -30,13 +39,32 @@ class UserResponse(BaseModel): updated_at: datetime +# Global Kafka producer +kafka_producer: Optional[KafkaProducer] = None + @asynccontextmanager async def lifespan(app: FastAPI): # Startup + global kafka_producer + await init_db() + + # Initialize Kafka producer + try: + kafka_producer = KafkaProducer( + bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'kafka:9092') + ) + await kafka_producer.start() + logger.info("Kafka producer initialized") + except Exception as e: + logger.warning(f"Failed to initialize Kafka producer: {e}") + kafka_producer = None + yield + # Shutdown - pass + if kafka_producer: + await kafka_producer.stop() app = FastAPI( @@ -110,6 +138,20 @@ async def create_user(user_data: UserCreate): await user.create() + # Publish event + if kafka_producer: + event = Event( + event_type=EventType.USER_CREATED, + service="users", + data={ + "user_id": str(user.id), + "username": user.username, + "email": user.email + }, + user_id=str(user.id) + ) + await kafka_producer.send_event("user-events", event) + return UserResponse( id=str(user.id), username=user.username, @@ -147,6 +189,21 @@ async def update_user(user_id: str, user_update: UserUpdate): user.updated_at = datetime.now() await user.save() + # Publish event + if kafka_producer: + event = Event( + event_type=EventType.USER_UPDATED, + service="users", + data={ + "user_id": str(user.id), + "username": user.username, + "email": user.email, + "updated_fields": list(user_update.dict(exclude_unset=True).keys()) + }, + user_id=str(user.id) + ) + await kafka_producer.send_event("user-events", event) + return UserResponse( id=str(user.id), username=user.username, @@ -162,7 +219,25 @@ async def delete_user(user_id: str): user = await User.get(PydanticObjectId(user_id)) if not user: raise HTTPException(status_code=404, detail="User not found") + + user_id_str = str(user.id) + username = user.username + await user.delete() + + # Publish event + if kafka_producer: + event = Event( + event_type=EventType.USER_DELETED, + service="users", + data={ + "user_id": user_id_str, + "username": username + }, + user_id=user_id_str + ) + await kafka_producer.send_event("user-events", event) + return {"message": "User deleted successfully"} except Exception: raise HTTPException(status_code=404, detail="User not found") diff --git a/services/users/backend/requirements.txt b/services/users/backend/requirements.txt index 5be6a5f..6d4db67 100644 --- a/services/users/backend/requirements.txt +++ b/services/users/backend/requirements.txt @@ -3,4 +3,5 @@ uvicorn[standard]==0.27.0 pydantic[email]==2.5.3 pymongo==4.6.1 motor==3.3.2 -beanie==1.23.6 \ No newline at end of file +beanie==1.23.6 +aiokafka==0.10.0 \ No newline at end of file diff --git a/shared/kafka/__init__.py b/shared/kafka/__init__.py new file mode 100644 index 0000000..7d7db74 --- /dev/null +++ b/shared/kafka/__init__.py @@ -0,0 +1,5 @@ +from .producer import KafkaProducer +from .consumer import KafkaConsumer +from .events import Event, EventType + +__all__ = ['KafkaProducer', 'KafkaConsumer', 'Event', 'EventType'] \ No newline at end of file diff --git a/shared/kafka/consumer.py b/shared/kafka/consumer.py new file mode 100644 index 0000000..746e79b --- /dev/null +++ b/shared/kafka/consumer.py @@ -0,0 +1,125 @@ +import json +import asyncio +from typing import Optional, Callable, Dict, Any, List +from aiokafka import AIOKafkaConsumer +from aiokafka.errors import KafkaError +import logging + +from .events import Event, EventType + +logger = logging.getLogger(__name__) + +class KafkaConsumer: + def __init__( + self, + topics: List[str], + group_id: str, + bootstrap_servers: str = "kafka:9092" + ): + self.topics = topics + self.group_id = group_id + self.bootstrap_servers = bootstrap_servers + self._consumer: Optional[AIOKafkaConsumer] = None + self._handlers: Dict[EventType, List[Callable]] = {} + self._running = False + + def register_handler(self, event_type: EventType, handler: Callable): + """이벤트 타입별 핸들러 등록""" + if event_type not in self._handlers: + self._handlers[event_type] = [] + self._handlers[event_type].append(handler) + logger.info(f"Registered handler for {event_type}") + + async def start(self): + """Kafka Consumer 시작""" + try: + self._consumer = AIOKafkaConsumer( + *self.topics, + bootstrap_servers=self.bootstrap_servers, + group_id=self.group_id, + value_deserializer=lambda v: json.loads(v.decode()), + auto_offset_reset='earliest', + enable_auto_commit=True, + auto_commit_interval_ms=1000, + session_timeout_ms=30000, + heartbeat_interval_ms=10000 + ) + await self._consumer.start() + self._running = True + logger.info(f"Kafka Consumer started: {self.topics} (group: {self.group_id})") + + # 메시지 처리 루프 시작 + asyncio.create_task(self._consume_messages()) + + except Exception as e: + logger.error(f"Failed to start Kafka Consumer: {e}") + raise + + async def stop(self): + """Kafka Consumer 종료""" + self._running = False + if self._consumer: + await self._consumer.stop() + logger.info("Kafka Consumer stopped") + + async def _consume_messages(self): + """메시지 소비 루프""" + if not self._consumer: + return + + while self._running: + try: + # 메시지 배치로 가져오기 (최대 100ms 대기) + msg_batch = await self._consumer.getmany(timeout_ms=100) + + for tp, messages in msg_batch.items(): + for msg in messages: + await self._process_message(msg.value) + + except KafkaError as e: + logger.error(f"Kafka error: {e}") + await asyncio.sleep(1) + except Exception as e: + logger.error(f"Error processing messages: {e}") + await asyncio.sleep(1) + + async def _process_message(self, message: Dict[str, Any]): + """개별 메시지 처리""" + try: + # Event 객체로 변환 + event = Event(**message) + + # 등록된 핸들러 실행 + handlers = self._handlers.get(event.event_type, []) + + for handler in handlers: + try: + if asyncio.iscoroutinefunction(handler): + await handler(event) + else: + handler(event) + except Exception as e: + logger.error(f"Handler error for {event.event_type}: {e}") + + if not handlers: + logger.debug(f"No handlers for event type: {event.event_type}") + + except Exception as e: + logger.error(f"Failed to process message: {e}") + + async def consume_one(self, timeout: float = 1.0) -> Optional[Event]: + """단일 메시지 소비 (테스트/디버깅용)""" + if not self._consumer: + return None + + try: + msg = await asyncio.wait_for( + self._consumer.getone(), + timeout=timeout + ) + return Event(**msg.value) + except asyncio.TimeoutError: + return None + except Exception as e: + logger.error(f"Error consuming message: {e}") + return None \ No newline at end of file diff --git a/shared/kafka/events.py b/shared/kafka/events.py new file mode 100644 index 0000000..2121a2f --- /dev/null +++ b/shared/kafka/events.py @@ -0,0 +1,31 @@ +from enum import Enum +from pydantic import BaseModel, Field +from datetime import datetime +from typing import Any, Optional, Dict + +class EventType(str, Enum): + USER_CREATED = "user.created" + USER_UPDATED = "user.updated" + USER_DELETED = "user.deleted" + USER_LOGIN = "user.login" + + IMAGE_UPLOADED = "image.uploaded" + IMAGE_CACHED = "image.cached" + IMAGE_DELETED = "image.deleted" + + TASK_CREATED = "task.created" + TASK_COMPLETED = "task.completed" + TASK_FAILED = "task.failed" + +class Event(BaseModel): + event_type: EventType + timestamp: datetime = Field(default_factory=datetime.now) + service: str + data: Dict[str, Any] + correlation_id: Optional[str] = None + user_id: Optional[str] = None + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } \ No newline at end of file diff --git a/shared/kafka/producer.py b/shared/kafka/producer.py new file mode 100644 index 0000000..4f525f6 --- /dev/null +++ b/shared/kafka/producer.py @@ -0,0 +1,102 @@ +import json +import asyncio +from typing import Optional, Dict, Any +from aiokafka import AIOKafkaProducer +from aiokafka.errors import KafkaError +import logging + +from .events import Event + +logger = logging.getLogger(__name__) + +class KafkaProducer: + def __init__(self, bootstrap_servers: str = "kafka:9092"): + self.bootstrap_servers = bootstrap_servers + self._producer: Optional[AIOKafkaProducer] = None + + async def start(self): + """Kafka Producer 시작""" + try: + self._producer = AIOKafkaProducer( + bootstrap_servers=self.bootstrap_servers, + value_serializer=lambda v: json.dumps(v).encode(), + compression_type="gzip", + acks='all', + retry_backoff_ms=100, + max_in_flight_requests_per_connection=5 + ) + await self._producer.start() + logger.info(f"Kafka Producer started: {self.bootstrap_servers}") + except Exception as e: + logger.error(f"Failed to start Kafka Producer: {e}") + raise + + async def stop(self): + """Kafka Producer 종료""" + if self._producer: + await self._producer.stop() + logger.info("Kafka Producer stopped") + + async def send_event(self, topic: str, event: Event) -> bool: + """이벤트 전송""" + if not self._producer: + logger.error("Producer not started") + return False + + try: + event_dict = event.dict() + event_dict['timestamp'] = event.timestamp.isoformat() + + await self._producer.send_and_wait( + topic, + value=event_dict, + key=event.correlation_id.encode() if event.correlation_id else None + ) + + logger.info(f"Event sent to {topic}: {event.event_type}") + return True + + except KafkaError as e: + logger.error(f"Failed to send event to {topic}: {e}") + return False + except Exception as e: + logger.error(f"Unexpected error sending event: {e}") + return False + + async def send_batch(self, topic: str, events: list[Event]) -> int: + """여러 이벤트를 배치로 전송""" + if not self._producer: + logger.error("Producer not started") + return 0 + + sent_count = 0 + batch = self._producer.create_batch() + + for event in events: + event_dict = event.dict() + event_dict['timestamp'] = event.timestamp.isoformat() + + metadata = batch.append( + key=event.correlation_id.encode() if event.correlation_id else None, + value=json.dumps(event_dict).encode(), + timestamp=None + ) + + if metadata is None: + # 배치가 가득 찼으면 전송하고 새 배치 생성 + await self._producer.send_batch(batch, topic) + sent_count += len(batch) + batch = self._producer.create_batch() + batch.append( + key=event.correlation_id.encode() if event.correlation_id else None, + value=json.dumps(event_dict).encode(), + timestamp=None + ) + + # 남은 배치 전송 + if batch: + await self._producer.send_batch(batch, topic) + sent_count += len(batch) + + logger.info(f"Sent {sent_count} events to {topic}") + return sent_count \ No newline at end of file