""" 이벤트 핸들러 모듈 각 이벤트 타입별 처리 로직 구현 """ import logging from typing import Dict, Any, Optional from datetime import datetime import json import asyncio from redis import asyncio as aioredis logger = logging.getLogger(__name__) class EventHandlers: def __init__(self, redis_client: Optional[aioredis.Redis] = None): self.redis = redis_client self.retry_counts: Dict[str, int] = {} async def handle_user_created(self, event: Dict[str, Any]): """사용자 생성 이벤트 처리""" try: user_id = event.get('data', {}).get('user_id') username = event.get('data', {}).get('username') email = event.get('data', {}).get('email') logger.info(f"Processing USER_CREATED: {username} ({user_id})") # Redis 캐시 무효화 if self.redis: await self.redis.delete(f"user:{user_id}") await self.redis.delete("users:list") # 추가 처리 로직 # - 환영 이메일 발송 준비 # - 초기 설정 생성 # - 분석 데이터 기록 await self._publish_notification({ "type": "user.welcome", "user_id": user_id, "email": email, "username": username, "timestamp": datetime.now().isoformat() }) logger.info(f"Successfully processed USER_CREATED for {username}") except Exception as e: logger.error(f"Error handling USER_CREATED: {e}") raise async def handle_user_updated(self, event: Dict[str, Any]): """사용자 업데이트 이벤트 처리""" try: user_id = event.get('data', {}).get('user_id') updated_fields = event.get('data', {}).get('updated_fields', []) logger.info(f"Processing USER_UPDATED: {user_id}, fields: {updated_fields}") # Redis 캐시 무효화 if self.redis: await self.redis.delete(f"user:{user_id}") await self.redis.delete("users:list") # 프로필 사진 변경 시 이미지 캐시도 무효화 if 'profile_picture' in updated_fields: await self.redis.delete(f"user:profile_picture:{user_id}") # 프로필 완성도 계산 if 'profile_picture' in updated_fields or 'bio' in updated_fields: await self._calculate_profile_completeness(user_id) logger.info(f"Successfully processed USER_UPDATED for {user_id}") except Exception as e: logger.error(f"Error handling USER_UPDATED: {e}") raise async def handle_user_deleted(self, event: Dict[str, Any]): """사용자 삭제 이벤트 처리""" try: user_id = event.get('data', {}).get('user_id') username = event.get('data', {}).get('username') logger.info(f"Processing USER_DELETED: {username} ({user_id})") # Redis에서 모든 관련 데이터 삭제 if self.redis: # 사용자 캐시 삭제 await self.redis.delete(f"user:{user_id}") await self.redis.delete("users:list") # 세션 삭제 session_keys = await self.redis.keys(f"session:*:{user_id}") if session_keys: await self.redis.delete(*session_keys) # 프로필 이미지 캐시 삭제 await self.redis.delete(f"user:profile_picture:{user_id}") # 관련 데이터 정리 이벤트 발행 await self._publish_cleanup_event({ "user_id": user_id, "username": username, "timestamp": datetime.now().isoformat() }) logger.info(f"Successfully processed USER_DELETED for {username}") except Exception as e: logger.error(f"Error handling USER_DELETED: {e}") raise async def handle_oauth_app_created(self, event: Dict[str, Any]): """OAuth 앱 생성 이벤트 처리""" try: app_id = event.get('data', {}).get('app_id') app_name = event.get('data', {}).get('name') owner_id = event.get('data', {}).get('owner_id') logger.info(f"Processing OAUTH_APP_CREATED: {app_name} ({app_id})") # 앱 생성 알림 await self._publish_notification({ "type": "oauth.app_created", "app_id": app_id, "app_name": app_name, "owner_id": owner_id, "timestamp": datetime.now().isoformat() }) logger.info(f"Successfully processed OAUTH_APP_CREATED for {app_name}") except Exception as e: logger.error(f"Error handling OAUTH_APP_CREATED: {e}") raise async def handle_oauth_token_issued(self, event: Dict[str, Any]): """OAuth 토큰 발급 이벤트 처리""" try: client_id = event.get('data', {}).get('client_id') user_id = event.get('data', {}).get('user_id') scopes = event.get('data', {}).get('scopes', []) logger.info(f"Processing OAUTH_TOKEN_ISSUED: client={client_id}, user={user_id}") # 보안 감사 로그 await self._log_security_event({ "type": "oauth.token_issued", "client_id": client_id, "user_id": user_id, "scopes": scopes, "timestamp": datetime.now().isoformat() }) # 사용 통계 업데이트 if self.redis: await self.redis.hincrby(f"oauth:stats:{client_id}", "tokens_issued", 1) await self.redis.sadd(f"oauth:users:{client_id}", user_id) logger.info(f"Successfully processed OAUTH_TOKEN_ISSUED") except Exception as e: logger.error(f"Error handling OAUTH_TOKEN_ISSUED: {e}") raise async def _publish_notification(self, notification: Dict[str, Any]): """알림 이벤트 발행""" # 향후 Notification 서비스로 이벤트 발행 logger.debug(f"Publishing notification: {notification}") if self.redis: await self.redis.lpush( "notifications:queue", json.dumps(notification) ) async def _publish_cleanup_event(self, cleanup_data: Dict[str, Any]): """정리 이벤트 발행""" # 향후 각 서비스로 정리 이벤트 발행 logger.debug(f"Publishing cleanup event: {cleanup_data}") if self.redis: await self.redis.lpush( "cleanup:queue", json.dumps(cleanup_data) ) async def _calculate_profile_completeness(self, user_id: str): """프로필 완성도 계산""" # 향후 프로필 완성도 계산 로직 logger.debug(f"Calculating profile completeness for user: {user_id}") if self.redis: # 임시로 Redis에 저장 await self.redis.hset( f"user:stats:{user_id}", "profile_updated_at", datetime.now().isoformat() ) async def _log_security_event(self, event_data: Dict[str, Any]): """보안 이벤트 로깅""" logger.info(f"Security event: {event_data}") if self.redis: await self.redis.lpush( "security:audit_log", json.dumps(event_data) ) # 최근 100개만 유지 await self.redis.ltrim("security:audit_log", 0, 99)