Files
site11/services/console/backend/shared/kafka/consumer.py
jungwoo choi f4b75b96a5 feat: Phase 1 - Complete authentication system with JWT
Backend Implementation (FastAPI + MongoDB):
- JWT authentication with access/refresh tokens
- User registration and login endpoints
- Password hashing with bcrypt (fixed 72-byte limit)
- Protected endpoints with JWT middleware
- Token refresh mechanism
- Role-Based Access Control (RBAC) structure
- Pydantic v2 models and async MongoDB with Motor
- API endpoints: /api/auth/register, /api/auth/login, /api/auth/me, /api/auth/refresh

Frontend Implementation (React + TypeScript + Material-UI):
- Login and Register pages with validation
- AuthContext for global authentication state
- API client with Axios interceptors for token refresh
- Protected routes with automatic redirect
- User profile display in navigation
- Logout functionality

Technical Achievements:
- Resolved bcrypt 72-byte limit (replaced passlib with native bcrypt)
- Fixed Pydantic v2 compatibility (PyObjectId, ConfigDict)
- Implemented automatic token refresh on 401 errors
- Created comprehensive test suite for all auth endpoints

Docker & Kubernetes:
- Backend image: yakenator/site11-console-backend:latest
- Frontend image: yakenator/site11-console-frontend:latest
- Deployed to site11-pipeline namespace
- Nginx reverse proxy configuration

Documentation:
- CONSOLE_ARCHITECTURE.md - Complete system architecture
- PHASE1_COMPLETION.md - Detailed completion report
- PROGRESS.md - Updated with Phase 1 status

All authentication endpoints tested and verified working.

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-28 16:23:07 +09:00

125 lines
4.3 KiB
Python

import json
import asyncio
from typing import Optional, Callable, Dict, Any, List
from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError
import logging
from .events import Event, EventType
logger = logging.getLogger(__name__)
class KafkaConsumer:
def __init__(
self,
topics: List[str],
group_id: str,
bootstrap_servers: str = "kafka:9092"
):
self.topics = topics
self.group_id = group_id
self.bootstrap_servers = bootstrap_servers
self._consumer: Optional[AIOKafkaConsumer] = None
self._handlers: Dict[EventType, List[Callable]] = {}
self._running = False
def register_handler(self, event_type: EventType, handler: Callable):
"""이벤트 타입별 핸들러 등록"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
logger.info(f"Registered handler for {event_type}")
async def start(self):
"""Kafka Consumer 시작"""
try:
self._consumer = AIOKafkaConsumer(
*self.topics,
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
value_deserializer=lambda v: json.loads(v.decode()),
auto_offset_reset='earliest',
enable_auto_commit=True,
auto_commit_interval_ms=1000,
session_timeout_ms=30000,
heartbeat_interval_ms=10000
)
await self._consumer.start()
self._running = True
logger.info(f"Kafka Consumer started: {self.topics} (group: {self.group_id})")
# 메시지 처리 루프 시작
asyncio.create_task(self._consume_messages())
except Exception as e:
logger.error(f"Failed to start Kafka Consumer: {e}")
raise
async def stop(self):
"""Kafka Consumer 종료"""
self._running = False
if self._consumer:
await self._consumer.stop()
logger.info("Kafka Consumer stopped")
async def _consume_messages(self):
"""메시지 소비 루프"""
if not self._consumer:
return
while self._running:
try:
# 메시지 배치로 가져오기 (최대 100ms 대기)
msg_batch = await self._consumer.getmany(timeout_ms=100)
for tp, messages in msg_batch.items():
for msg in messages:
await self._process_message(msg.value)
except KafkaError as e:
logger.error(f"Kafka error: {e}")
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error processing messages: {e}")
await asyncio.sleep(1)
async def _process_message(self, message: Dict[str, Any]):
"""개별 메시지 처리"""
try:
# Event 객체로 변환
event = Event(**message)
# 등록된 핸들러 실행
handlers = self._handlers.get(event.event_type, [])
for handler in handlers:
try:
if asyncio.iscoroutinefunction(handler):
await handler(event)
else:
handler(event)
except Exception as e:
logger.error(f"Handler error for {event.event_type}: {e}")
if not handlers:
logger.debug(f"No handlers for event type: {event.event_type}")
except Exception as e:
logger.error(f"Failed to process message: {e}")
async def consume_one(self, timeout: float = 1.0) -> Optional[Event]:
"""단일 메시지 소비 (테스트/디버깅용)"""
if not self._consumer:
return None
try:
msg = await asyncio.wait_for(
self._consumer.getone(),
timeout=timeout
)
return Event(**msg.value)
except asyncio.TimeoutError:
return None
except Exception as e:
logger.error(f"Error consuming message: {e}")
return None