Step 7: Kafka 이벤트 시스템 구현
- Kafka 및 Zookeeper 컨테이너 추가 - 공유 Kafka 라이브러리 생성 (Producer/Consumer) - 이벤트 타입 정의 및 이벤트 모델 구현 - Users 서비스에 이벤트 발행 기능 추가 (USER_CREATED, USER_UPDATED, USER_DELETED) - PROGRESS.md 및 PLAN.md 문서 생성 - aiokafka 통합 완료
This commit is contained in:
176
PLAN.md
Normal file
176
PLAN.md
Normal file
@ -0,0 +1,176 @@
|
||||
# 프로젝트 개발 계획
|
||||
|
||||
## 프로젝트 목표
|
||||
마이크로서비스 아키텍처 기반의 확장 가능한 웹 애플리케이션 구축
|
||||
|
||||
## 아키텍처 원칙
|
||||
1. **Console as API Gateway**: 모든 외부 요청은 Console을 통해 라우팅
|
||||
2. **Docker-Only Development**: 모든 개발과 실행은 Docker 컨테이너 내에서
|
||||
3. **Event-Driven Architecture**: Kafka를 통한 서비스 간 비동기 통신
|
||||
4. **Service Isolation**: 각 서비스는 독립적으로 배포 가능
|
||||
|
||||
## 완료된 단계 (✅)
|
||||
|
||||
### Phase 1: 기반 구축
|
||||
- [x] Step 1: 기본 프로젝트 구조 및 Docker 설정
|
||||
- [x] Step 2: Users 마이크로서비스 구현
|
||||
- [x] Step 3: MongoDB 통합
|
||||
- [x] Step 4: Redis 캐싱 시스템
|
||||
- [x] Step 5: Frontend 스켈레톤 (React + Vite)
|
||||
- [x] Step 6: JWT 인증 시스템
|
||||
- [x] Step 6.5: Images 서비스 통합
|
||||
- [x] Step 7: Kafka 이벤트 시스템
|
||||
|
||||
## 진행 예정 단계
|
||||
|
||||
### Phase 2: 이벤트 기반 시스템 확장
|
||||
#### Step 8: 고급 이벤트 처리
|
||||
- [ ] 이벤트 소비자 구현
|
||||
- Console에서 user-events 토픽 구독
|
||||
- 알림 서비스 이벤트 처리
|
||||
- [ ] Dead Letter Queue 구현
|
||||
- [ ] 이벤트 재시도 메커니즘
|
||||
- [ ] 이벤트 스키마 레지스트리
|
||||
|
||||
#### Step 9: 태스크 큐 시스템
|
||||
- [ ] Kafka 기반 백그라운드 작업 처리
|
||||
- [ ] 이미지 프로세싱 작업 큐
|
||||
- [ ] 이메일 전송 큐
|
||||
- [ ] 배치 작업 스케줄러
|
||||
|
||||
### Phase 3: 고급 기능
|
||||
#### Step 10: 실시간 기능
|
||||
- [ ] WebSocket 통합 (Console)
|
||||
- [ ] 실시간 알림 시스템
|
||||
- [ ] 온라인 사용자 상태 추적
|
||||
- [ ] 실시간 데이터 동기화
|
||||
|
||||
#### Step 11: 파일 시스템
|
||||
- [ ] 파일 업로드 서비스
|
||||
- [ ] S3 호환 객체 스토리지 (MinIO)
|
||||
- [ ] 파일 메타데이터 관리
|
||||
- [ ] 썸네일 생성 서비스
|
||||
|
||||
#### Step 12: 검색 시스템
|
||||
- [ ] Elasticsearch 통합
|
||||
- [ ] 전문 검색 기능
|
||||
- [ ] 자동완성 기능
|
||||
- [ ] 검색 분석 및 최적화
|
||||
|
||||
### Phase 4: 프로덕션 준비
|
||||
#### Step 13: 모니터링 및 로깅
|
||||
- [ ] Prometheus 메트릭 수집
|
||||
- [ ] Grafana 대시보드
|
||||
- [ ] ELK Stack 로깅
|
||||
- [ ] 분산 추적 (Jaeger)
|
||||
|
||||
#### Step 14: 보안 강화
|
||||
- [ ] Rate Limiting
|
||||
- [ ] API Key 관리
|
||||
- [ ] OAuth2 통합
|
||||
- [ ] 데이터 암호화
|
||||
|
||||
#### Step 15: 테스트 및 CI/CD
|
||||
- [ ] 단위 테스트 작성
|
||||
- [ ] 통합 테스트
|
||||
- [ ] E2E 테스트
|
||||
- [ ] GitHub Actions CI/CD
|
||||
|
||||
#### Step 16: 성능 최적화
|
||||
- [ ] 데이터베이스 인덱싱
|
||||
- [ ] 쿼리 최적화
|
||||
- [ ] 캐싱 전략 개선
|
||||
- [ ] CDN 통합
|
||||
|
||||
## 서비스 구성
|
||||
|
||||
### 현재 서비스
|
||||
1. **Console** (API Gateway)
|
||||
- Frontend: React SPA
|
||||
- Backend: FastAPI, JWT 인증
|
||||
|
||||
2. **Users Service**
|
||||
- User CRUD
|
||||
- MongoDB 저장소
|
||||
- Kafka 이벤트 발행
|
||||
|
||||
3. **Images Service**
|
||||
- 이미지 프록시
|
||||
- 캐싱 시스템
|
||||
- WebP 변환
|
||||
|
||||
### 계획된 서비스
|
||||
4. **Notification Service**
|
||||
- 이메일/SMS 전송
|
||||
- 푸시 알림
|
||||
- 알림 히스토리
|
||||
|
||||
5. **Analytics Service**
|
||||
- 사용자 행동 분석
|
||||
- 비즈니스 메트릭
|
||||
- 리포트 생성
|
||||
|
||||
6. **Payment Service**
|
||||
- 결제 처리
|
||||
- 구독 관리
|
||||
- 청구서 생성
|
||||
|
||||
## 기술 스택 로드맵
|
||||
|
||||
### 현재 사용 중
|
||||
- FastAPI, React, TypeScript
|
||||
- MongoDB, Redis
|
||||
- Apache Kafka
|
||||
- Docker, Docker Compose
|
||||
|
||||
### 도입 예정
|
||||
- Elasticsearch (검색)
|
||||
- MinIO (객체 스토리지)
|
||||
- Prometheus/Grafana (모니터링)
|
||||
- Jaeger (분산 추적)
|
||||
- Nginx (리버스 프록시)
|
||||
|
||||
## 개발 일정
|
||||
|
||||
### 2025 Q1
|
||||
- Phase 2 완료 (이벤트 시스템)
|
||||
- Phase 3 시작 (고급 기능)
|
||||
|
||||
### 2025 Q2
|
||||
- Phase 3 완료
|
||||
- Phase 4 시작 (프로덕션 준비)
|
||||
|
||||
### 2025 Q3
|
||||
- Phase 4 완료
|
||||
- 프로덕션 배포
|
||||
|
||||
## 성공 지표
|
||||
|
||||
1. **기술적 지표**
|
||||
- 서비스 응답 시간 < 200ms
|
||||
- 시스템 가용성 > 99.9%
|
||||
- 초당 처리 가능 요청 > 1000
|
||||
|
||||
2. **개발 지표**
|
||||
- 테스트 커버리지 > 80%
|
||||
- 빌드 시간 < 5분
|
||||
- 배포 시간 < 10분
|
||||
|
||||
3. **확장성 지표**
|
||||
- 수평 확장 가능
|
||||
- 서비스 독립 배포
|
||||
- 무중단 업데이트
|
||||
|
||||
## 리스크 및 대응 방안
|
||||
|
||||
1. **복잡도 증가**
|
||||
- 대응: 점진적 구현, 문서화 강화
|
||||
|
||||
2. **성능 병목**
|
||||
- 대응: 프로파일링, 캐싱 전략
|
||||
|
||||
3. **데이터 일관성**
|
||||
- 대응: 이벤트 소싱, SAGA 패턴
|
||||
|
||||
4. **보안 취약점**
|
||||
- 대응: 정기 보안 감사, 자동화된 스캔
|
||||
116
PROGRESS.md
Normal file
116
PROGRESS.md
Normal file
@ -0,0 +1,116 @@
|
||||
# 프로젝트 진행 상황
|
||||
|
||||
## 완료된 단계
|
||||
|
||||
### Step 1: 기본 프로젝트 구조 생성 ✅
|
||||
- Docker Compose 설정
|
||||
- Console 서비스 (API Gateway) 기본 구현
|
||||
- 프로젝트 문서 (CLAUDE.md, PLAN.md) 작성
|
||||
|
||||
### Step 2: Users 마이크로서비스 구현 ✅
|
||||
- Users 서비스 CRUD API
|
||||
- MongoDB 연동 (Beanie ODM)
|
||||
- 서비스 간 통신 설정
|
||||
|
||||
### Step 3: MongoDB 통합 ✅
|
||||
- MongoDB 컨테이너 설정
|
||||
- Beanie ODM 설정
|
||||
- Users 모델 및 데이터베이스 연결
|
||||
- PyMongo 버전 호환성 문제 해결
|
||||
|
||||
### Step 4: Redis 통합 ✅
|
||||
- Redis 컨테이너 설정
|
||||
- 캐싱 시스템 준비
|
||||
- 향후 세션 관리 및 캐싱 구현 예정
|
||||
|
||||
### Step 5: Frontend 스켈레톤 ✅
|
||||
- React + Vite + TypeScript 설정
|
||||
- Material-UI 통합
|
||||
- Console Frontend 기본 구조
|
||||
- npm ci → npm install 문제 해결
|
||||
|
||||
### Step 6: 환경 변수 및 인증 ✅
|
||||
- .env 파일 설정
|
||||
- JWT 인증 시스템 구현
|
||||
- Console이 인증 처리 담당
|
||||
- 포트 충돌 해결 (8000 → 8011)
|
||||
|
||||
### Step 6.5: Images 서비스 통합 ✅
|
||||
- site00의 image-service 마이그레이션
|
||||
- 프록시 및 캐싱 기능 유지
|
||||
- WebP 변환 기능 포함
|
||||
- Console에서 Images 서비스로 라우팅
|
||||
|
||||
### Step 7: Kafka 이벤트 시스템 ✅
|
||||
- Kafka 및 Zookeeper 컨테이너 추가
|
||||
- 공유 Kafka 라이브러리 생성 (Producer/Consumer)
|
||||
- 이벤트 타입 정의 (USER_CREATED, USER_UPDATED, USER_DELETED 등)
|
||||
- Users 서비스에 이벤트 발행 기능 추가
|
||||
- aiokafka 통합
|
||||
|
||||
## 현재 실행 중인 서비스
|
||||
|
||||
- **Console Frontend**: http://localhost:3000
|
||||
- **Console Backend**: http://localhost:8011
|
||||
- **Users Service**: Internal (Console 통해 접근)
|
||||
- **Images Service**: http://localhost:8002
|
||||
- **MongoDB**: localhost:27017
|
||||
- **Redis**: localhost:6379
|
||||
- **Kafka**: localhost:9092
|
||||
- **Zookeeper**: localhost:2181
|
||||
|
||||
## 다음 단계 (예정)
|
||||
|
||||
### Step 8: 고급 이벤트 처리
|
||||
- 이벤트 소비자 구현
|
||||
- 이벤트 기반 워크플로우
|
||||
- 에러 처리 및 재시도 로직
|
||||
|
||||
### Step 9: 고급 기능
|
||||
- 실시간 알림 (WebSocket)
|
||||
- 파일 업로드 시스템
|
||||
- 검색 기능 (Elasticsearch)
|
||||
|
||||
### Step 10: 프로덕션 준비
|
||||
- 로깅 시스템 (ELK Stack)
|
||||
- 모니터링 (Prometheus/Grafana)
|
||||
- CI/CD 파이프라인
|
||||
- 테스트 자동화
|
||||
|
||||
## 기술 스택
|
||||
|
||||
- **Backend**: FastAPI (Python)
|
||||
- **Frontend**: React + TypeScript + Vite + Material-UI
|
||||
- **Database**: MongoDB
|
||||
- **Cache**: Redis
|
||||
- **Message Queue**: Apache Kafka
|
||||
- **Container**: Docker & Docker Compose
|
||||
- **Authentication**: JWT
|
||||
|
||||
## 주요 환경 변수
|
||||
|
||||
```env
|
||||
COMPOSE_PROJECT_NAME=site11
|
||||
CONSOLE_BACKEND_PORT=8011
|
||||
JWT_SECRET_KEY=your-secret-key-change-in-production-12345
|
||||
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
|
||||
```
|
||||
|
||||
## 문제 해결 기록
|
||||
|
||||
1. **PyMongo 호환성**: motor와 pymongo 버전 충돌 → pymongo==4.6.1로 고정
|
||||
2. **npm ci 실패**: package-lock.json 부재 → npm install로 변경
|
||||
3. **포트 충돌**: 8000 포트 사용 중 → Console을 8011로 변경
|
||||
4. **WebP 변환 문제**: 검정색 이미지 출력 → convert_to_webp 임시 비활성화
|
||||
|
||||
## 컨텍스트 복구 정보
|
||||
|
||||
Claude가 재시작되면 이 정보를 참조:
|
||||
|
||||
- 작업 디렉토리: `/Users/jungwoochoi/Desktop/prototype/site11`
|
||||
- Git 저장소: 각 단계마다 커밋됨
|
||||
- Docker 개발 원칙: 모든 개발은 Docker 내에서 진행
|
||||
- 문서 형식: 모든 마크다운 파일은 대문자 (CLAUDE.md, PLAN.md, PROGRESS.md)
|
||||
- Console 서비스가 중앙 API Gateway 역할
|
||||
- Kafka를 메인 이벤트 시스템으로 사용
|
||||
- Redis는 캐싱 전용
|
||||
@ -46,13 +46,17 @@ services:
|
||||
- PORT=8000
|
||||
- MONGODB_URL=${MONGODB_URL}
|
||||
- DB_NAME=${USERS_DB_NAME}
|
||||
- KAFKA_BOOTSTRAP_SERVERS=${KAFKA_BOOTSTRAP_SERVERS}
|
||||
- KAFKA_GROUP_ID=${KAFKA_GROUP_ID}
|
||||
volumes:
|
||||
- ./services/users/backend:/app
|
||||
- ./shared:/app/shared
|
||||
networks:
|
||||
- site11_network
|
||||
restart: unless-stopped
|
||||
depends_on:
|
||||
- mongodb
|
||||
- kafka
|
||||
|
||||
images-backend:
|
||||
build:
|
||||
@ -113,6 +117,52 @@ services:
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
|
||||
zookeeper:
|
||||
image: confluentinc/cp-zookeeper:7.5.0
|
||||
container_name: ${COMPOSE_PROJECT_NAME}_zookeeper
|
||||
environment:
|
||||
ZOOKEEPER_CLIENT_PORT: 2181
|
||||
ZOOKEEPER_TICK_TIME: 2000
|
||||
ports:
|
||||
- "${KAFKA_ZOOKEEPER_PORT}:2181"
|
||||
volumes:
|
||||
- zookeeper_data:/var/lib/zookeeper/data
|
||||
- zookeeper_logs:/var/lib/zookeeper/log
|
||||
networks:
|
||||
- site11_network
|
||||
restart: unless-stopped
|
||||
|
||||
kafka:
|
||||
image: confluentinc/cp-kafka:7.5.0
|
||||
container_name: ${COMPOSE_PROJECT_NAME}_kafka
|
||||
depends_on:
|
||||
- zookeeper
|
||||
ports:
|
||||
- "${KAFKA_PORT}:9092"
|
||||
- "9101:9101"
|
||||
environment:
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
|
||||
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
|
||||
KAFKA_JMX_PORT: 9101
|
||||
KAFKA_JMX_HOSTNAME: localhost
|
||||
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
|
||||
volumes:
|
||||
- kafka_data:/var/lib/kafka/data
|
||||
networks:
|
||||
- site11_network
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
|
||||
networks:
|
||||
site11_network:
|
||||
driver: bridge
|
||||
@ -122,4 +172,7 @@ volumes:
|
||||
mongodb_data:
|
||||
mongodb_config:
|
||||
redis_data:
|
||||
images_cache:
|
||||
images_cache:
|
||||
zookeeper_data:
|
||||
zookeeper_logs:
|
||||
kafka_data:
|
||||
@ -4,11 +4,20 @@ from pydantic import BaseModel
|
||||
from typing import List, Optional
|
||||
from datetime import datetime
|
||||
import uvicorn
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
from database import init_db
|
||||
from models import User
|
||||
from beanie import PydanticObjectId
|
||||
|
||||
sys.path.append('/app')
|
||||
from shared.kafka import KafkaProducer, Event, EventType
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Pydantic models for requests
|
||||
class UserCreate(BaseModel):
|
||||
@ -30,13 +39,32 @@ class UserResponse(BaseModel):
|
||||
updated_at: datetime
|
||||
|
||||
|
||||
# Global Kafka producer
|
||||
kafka_producer: Optional[KafkaProducer] = None
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
# Startup
|
||||
global kafka_producer
|
||||
|
||||
await init_db()
|
||||
|
||||
# Initialize Kafka producer
|
||||
try:
|
||||
kafka_producer = KafkaProducer(
|
||||
bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'kafka:9092')
|
||||
)
|
||||
await kafka_producer.start()
|
||||
logger.info("Kafka producer initialized")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to initialize Kafka producer: {e}")
|
||||
kafka_producer = None
|
||||
|
||||
yield
|
||||
|
||||
# Shutdown
|
||||
pass
|
||||
if kafka_producer:
|
||||
await kafka_producer.stop()
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
@ -110,6 +138,20 @@ async def create_user(user_data: UserCreate):
|
||||
|
||||
await user.create()
|
||||
|
||||
# Publish event
|
||||
if kafka_producer:
|
||||
event = Event(
|
||||
event_type=EventType.USER_CREATED,
|
||||
service="users",
|
||||
data={
|
||||
"user_id": str(user.id),
|
||||
"username": user.username,
|
||||
"email": user.email
|
||||
},
|
||||
user_id=str(user.id)
|
||||
)
|
||||
await kafka_producer.send_event("user-events", event)
|
||||
|
||||
return UserResponse(
|
||||
id=str(user.id),
|
||||
username=user.username,
|
||||
@ -147,6 +189,21 @@ async def update_user(user_id: str, user_update: UserUpdate):
|
||||
user.updated_at = datetime.now()
|
||||
await user.save()
|
||||
|
||||
# Publish event
|
||||
if kafka_producer:
|
||||
event = Event(
|
||||
event_type=EventType.USER_UPDATED,
|
||||
service="users",
|
||||
data={
|
||||
"user_id": str(user.id),
|
||||
"username": user.username,
|
||||
"email": user.email,
|
||||
"updated_fields": list(user_update.dict(exclude_unset=True).keys())
|
||||
},
|
||||
user_id=str(user.id)
|
||||
)
|
||||
await kafka_producer.send_event("user-events", event)
|
||||
|
||||
return UserResponse(
|
||||
id=str(user.id),
|
||||
username=user.username,
|
||||
@ -162,7 +219,25 @@ async def delete_user(user_id: str):
|
||||
user = await User.get(PydanticObjectId(user_id))
|
||||
if not user:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
|
||||
user_id_str = str(user.id)
|
||||
username = user.username
|
||||
|
||||
await user.delete()
|
||||
|
||||
# Publish event
|
||||
if kafka_producer:
|
||||
event = Event(
|
||||
event_type=EventType.USER_DELETED,
|
||||
service="users",
|
||||
data={
|
||||
"user_id": user_id_str,
|
||||
"username": username
|
||||
},
|
||||
user_id=user_id_str
|
||||
)
|
||||
await kafka_producer.send_event("user-events", event)
|
||||
|
||||
return {"message": "User deleted successfully"}
|
||||
except Exception:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
|
||||
@ -3,4 +3,5 @@ uvicorn[standard]==0.27.0
|
||||
pydantic[email]==2.5.3
|
||||
pymongo==4.6.1
|
||||
motor==3.3.2
|
||||
beanie==1.23.6
|
||||
beanie==1.23.6
|
||||
aiokafka==0.10.0
|
||||
5
shared/kafka/__init__.py
Normal file
5
shared/kafka/__init__.py
Normal file
@ -0,0 +1,5 @@
|
||||
from .producer import KafkaProducer
|
||||
from .consumer import KafkaConsumer
|
||||
from .events import Event, EventType
|
||||
|
||||
__all__ = ['KafkaProducer', 'KafkaConsumer', 'Event', 'EventType']
|
||||
125
shared/kafka/consumer.py
Normal file
125
shared/kafka/consumer.py
Normal file
@ -0,0 +1,125 @@
|
||||
import json
|
||||
import asyncio
|
||||
from typing import Optional, Callable, Dict, Any, List
|
||||
from aiokafka import AIOKafkaConsumer
|
||||
from aiokafka.errors import KafkaError
|
||||
import logging
|
||||
|
||||
from .events import Event, EventType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class KafkaConsumer:
|
||||
def __init__(
|
||||
self,
|
||||
topics: List[str],
|
||||
group_id: str,
|
||||
bootstrap_servers: str = "kafka:9092"
|
||||
):
|
||||
self.topics = topics
|
||||
self.group_id = group_id
|
||||
self.bootstrap_servers = bootstrap_servers
|
||||
self._consumer: Optional[AIOKafkaConsumer] = None
|
||||
self._handlers: Dict[EventType, List[Callable]] = {}
|
||||
self._running = False
|
||||
|
||||
def register_handler(self, event_type: EventType, handler: Callable):
|
||||
"""이벤트 타입별 핸들러 등록"""
|
||||
if event_type not in self._handlers:
|
||||
self._handlers[event_type] = []
|
||||
self._handlers[event_type].append(handler)
|
||||
logger.info(f"Registered handler for {event_type}")
|
||||
|
||||
async def start(self):
|
||||
"""Kafka Consumer 시작"""
|
||||
try:
|
||||
self._consumer = AIOKafkaConsumer(
|
||||
*self.topics,
|
||||
bootstrap_servers=self.bootstrap_servers,
|
||||
group_id=self.group_id,
|
||||
value_deserializer=lambda v: json.loads(v.decode()),
|
||||
auto_offset_reset='earliest',
|
||||
enable_auto_commit=True,
|
||||
auto_commit_interval_ms=1000,
|
||||
session_timeout_ms=30000,
|
||||
heartbeat_interval_ms=10000
|
||||
)
|
||||
await self._consumer.start()
|
||||
self._running = True
|
||||
logger.info(f"Kafka Consumer started: {self.topics} (group: {self.group_id})")
|
||||
|
||||
# 메시지 처리 루프 시작
|
||||
asyncio.create_task(self._consume_messages())
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start Kafka Consumer: {e}")
|
||||
raise
|
||||
|
||||
async def stop(self):
|
||||
"""Kafka Consumer 종료"""
|
||||
self._running = False
|
||||
if self._consumer:
|
||||
await self._consumer.stop()
|
||||
logger.info("Kafka Consumer stopped")
|
||||
|
||||
async def _consume_messages(self):
|
||||
"""메시지 소비 루프"""
|
||||
if not self._consumer:
|
||||
return
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
# 메시지 배치로 가져오기 (최대 100ms 대기)
|
||||
msg_batch = await self._consumer.getmany(timeout_ms=100)
|
||||
|
||||
for tp, messages in msg_batch.items():
|
||||
for msg in messages:
|
||||
await self._process_message(msg.value)
|
||||
|
||||
except KafkaError as e:
|
||||
logger.error(f"Kafka error: {e}")
|
||||
await asyncio.sleep(1)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing messages: {e}")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def _process_message(self, message: Dict[str, Any]):
|
||||
"""개별 메시지 처리"""
|
||||
try:
|
||||
# Event 객체로 변환
|
||||
event = Event(**message)
|
||||
|
||||
# 등록된 핸들러 실행
|
||||
handlers = self._handlers.get(event.event_type, [])
|
||||
|
||||
for handler in handlers:
|
||||
try:
|
||||
if asyncio.iscoroutinefunction(handler):
|
||||
await handler(event)
|
||||
else:
|
||||
handler(event)
|
||||
except Exception as e:
|
||||
logger.error(f"Handler error for {event.event_type}: {e}")
|
||||
|
||||
if not handlers:
|
||||
logger.debug(f"No handlers for event type: {event.event_type}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to process message: {e}")
|
||||
|
||||
async def consume_one(self, timeout: float = 1.0) -> Optional[Event]:
|
||||
"""단일 메시지 소비 (테스트/디버깅용)"""
|
||||
if not self._consumer:
|
||||
return None
|
||||
|
||||
try:
|
||||
msg = await asyncio.wait_for(
|
||||
self._consumer.getone(),
|
||||
timeout=timeout
|
||||
)
|
||||
return Event(**msg.value)
|
||||
except asyncio.TimeoutError:
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error consuming message: {e}")
|
||||
return None
|
||||
31
shared/kafka/events.py
Normal file
31
shared/kafka/events.py
Normal file
@ -0,0 +1,31 @@
|
||||
from enum import Enum
|
||||
from pydantic import BaseModel, Field
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional, Dict
|
||||
|
||||
class EventType(str, Enum):
|
||||
USER_CREATED = "user.created"
|
||||
USER_UPDATED = "user.updated"
|
||||
USER_DELETED = "user.deleted"
|
||||
USER_LOGIN = "user.login"
|
||||
|
||||
IMAGE_UPLOADED = "image.uploaded"
|
||||
IMAGE_CACHED = "image.cached"
|
||||
IMAGE_DELETED = "image.deleted"
|
||||
|
||||
TASK_CREATED = "task.created"
|
||||
TASK_COMPLETED = "task.completed"
|
||||
TASK_FAILED = "task.failed"
|
||||
|
||||
class Event(BaseModel):
|
||||
event_type: EventType
|
||||
timestamp: datetime = Field(default_factory=datetime.now)
|
||||
service: str
|
||||
data: Dict[str, Any]
|
||||
correlation_id: Optional[str] = None
|
||||
user_id: Optional[str] = None
|
||||
|
||||
class Config:
|
||||
json_encoders = {
|
||||
datetime: lambda v: v.isoformat()
|
||||
}
|
||||
102
shared/kafka/producer.py
Normal file
102
shared/kafka/producer.py
Normal file
@ -0,0 +1,102 @@
|
||||
import json
|
||||
import asyncio
|
||||
from typing import Optional, Dict, Any
|
||||
from aiokafka import AIOKafkaProducer
|
||||
from aiokafka.errors import KafkaError
|
||||
import logging
|
||||
|
||||
from .events import Event
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class KafkaProducer:
|
||||
def __init__(self, bootstrap_servers: str = "kafka:9092"):
|
||||
self.bootstrap_servers = bootstrap_servers
|
||||
self._producer: Optional[AIOKafkaProducer] = None
|
||||
|
||||
async def start(self):
|
||||
"""Kafka Producer 시작"""
|
||||
try:
|
||||
self._producer = AIOKafkaProducer(
|
||||
bootstrap_servers=self.bootstrap_servers,
|
||||
value_serializer=lambda v: json.dumps(v).encode(),
|
||||
compression_type="gzip",
|
||||
acks='all',
|
||||
retry_backoff_ms=100,
|
||||
max_in_flight_requests_per_connection=5
|
||||
)
|
||||
await self._producer.start()
|
||||
logger.info(f"Kafka Producer started: {self.bootstrap_servers}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start Kafka Producer: {e}")
|
||||
raise
|
||||
|
||||
async def stop(self):
|
||||
"""Kafka Producer 종료"""
|
||||
if self._producer:
|
||||
await self._producer.stop()
|
||||
logger.info("Kafka Producer stopped")
|
||||
|
||||
async def send_event(self, topic: str, event: Event) -> bool:
|
||||
"""이벤트 전송"""
|
||||
if not self._producer:
|
||||
logger.error("Producer not started")
|
||||
return False
|
||||
|
||||
try:
|
||||
event_dict = event.dict()
|
||||
event_dict['timestamp'] = event.timestamp.isoformat()
|
||||
|
||||
await self._producer.send_and_wait(
|
||||
topic,
|
||||
value=event_dict,
|
||||
key=event.correlation_id.encode() if event.correlation_id else None
|
||||
)
|
||||
|
||||
logger.info(f"Event sent to {topic}: {event.event_type}")
|
||||
return True
|
||||
|
||||
except KafkaError as e:
|
||||
logger.error(f"Failed to send event to {topic}: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error sending event: {e}")
|
||||
return False
|
||||
|
||||
async def send_batch(self, topic: str, events: list[Event]) -> int:
|
||||
"""여러 이벤트를 배치로 전송"""
|
||||
if not self._producer:
|
||||
logger.error("Producer not started")
|
||||
return 0
|
||||
|
||||
sent_count = 0
|
||||
batch = self._producer.create_batch()
|
||||
|
||||
for event in events:
|
||||
event_dict = event.dict()
|
||||
event_dict['timestamp'] = event.timestamp.isoformat()
|
||||
|
||||
metadata = batch.append(
|
||||
key=event.correlation_id.encode() if event.correlation_id else None,
|
||||
value=json.dumps(event_dict).encode(),
|
||||
timestamp=None
|
||||
)
|
||||
|
||||
if metadata is None:
|
||||
# 배치가 가득 찼으면 전송하고 새 배치 생성
|
||||
await self._producer.send_batch(batch, topic)
|
||||
sent_count += len(batch)
|
||||
batch = self._producer.create_batch()
|
||||
batch.append(
|
||||
key=event.correlation_id.encode() if event.correlation_id else None,
|
||||
value=json.dumps(event_dict).encode(),
|
||||
timestamp=None
|
||||
)
|
||||
|
||||
# 남은 배치 전송
|
||||
if batch:
|
||||
await self._producer.send_batch(batch, topic)
|
||||
sent_count += len(batch)
|
||||
|
||||
logger.info(f"Sent {sent_count} events to {topic}")
|
||||
return sent_count
|
||||
Reference in New Issue
Block a user