""" 이벤트 스키마 레지스트리 이벤트 스키마 정의 및 버전 관리 """ from typing import Dict, Any, Optional, List, Literal from enum import Enum from pydantic import BaseModel, Field, field_validator from datetime import datetime import json class SchemaVersion(str, Enum): V1 = "1.0.0" V2 = "2.0.0" class EventSchemaBase(BaseModel): """이벤트 스키마 베이스""" event_id: str = Field(..., description="고유 이벤트 ID") event_type: str = Field(..., description="이벤트 타입") timestamp: datetime = Field(default_factory=datetime.now, description="이벤트 발생 시간") version: str = Field(default=SchemaVersion.V1, description="스키마 버전") service: str = Field(..., description="이벤트 발생 서비스") class Config: json_encoders = { datetime: lambda v: v.isoformat() } # User Events Schemas class UserCreatedSchema(EventSchemaBase): """사용자 생성 이벤트 스키마""" event_type: Literal["USER_CREATED"] = "USER_CREATED" data: Dict[str, Any] = Field(..., description="이벤트 데이터") @field_validator('data') @classmethod def validate_data(cls, v): required_fields = ['user_id', 'username', 'email'] for field in required_fields: if field not in v: raise ValueError(f"Missing required field: {field}") return v class UserUpdatedSchema(EventSchemaBase): """사용자 업데이트 이벤트 스키마""" event_type: Literal["USER_UPDATED"] = "USER_UPDATED" data: Dict[str, Any] = Field(..., description="이벤트 데이터") @field_validator('data') @classmethod def validate_data(cls, v): required_fields = ['user_id'] optional_fields = ['username', 'email', 'full_name', 'profile_picture', 'bio', 'location', 'website', 'updated_fields'] for field in required_fields: if field not in v: raise ValueError(f"Missing required field: {field}") # updated_fields가 있으면 검증 if 'updated_fields' in v and not isinstance(v['updated_fields'], list): raise ValueError("updated_fields must be a list") return v class UserDeletedSchema(EventSchemaBase): """사용자 삭제 이벤트 스키마""" event_type: Literal["USER_DELETED"] = "USER_DELETED" data: Dict[str, Any] = Field(..., description="이벤트 데이터") @field_validator('data') @classmethod def validate_data(cls, v): required_fields = ['user_id', 'username'] for field in required_fields: if field not in v: raise ValueError(f"Missing required field: {field}") return v # OAuth Events Schemas class OAuthAppCreatedSchema(EventSchemaBase): """OAuth 앱 생성 이벤트 스키마""" event_type: Literal["OAUTH_APP_CREATED"] = "OAUTH_APP_CREATED" data: Dict[str, Any] = Field(..., description="이벤트 데이터") @field_validator('data') @classmethod def validate_data(cls, v): required_fields = ['app_id', 'name', 'owner_id', 'client_id'] for field in required_fields: if field not in v: raise ValueError(f"Missing required field: {field}") return v class OAuthTokenIssuedSchema(EventSchemaBase): """OAuth 토큰 발급 이벤트 스키마""" event_type: Literal["OAUTH_TOKEN_ISSUED"] = "OAUTH_TOKEN_ISSUED" data: Dict[str, Any] = Field(..., description="이벤트 데이터") @field_validator('data') @classmethod def validate_data(cls, v): required_fields = ['client_id', 'grant_type'] optional_fields = ['user_id', 'scopes', 'expires_in'] for field in required_fields: if field not in v: raise ValueError(f"Missing required field: {field}") # scopes가 있으면 리스트여야 함 if 'scopes' in v and not isinstance(v['scopes'], list): raise ValueError("scopes must be a list") return v class OAuthTokenRevokedSchema(EventSchemaBase): """OAuth 토큰 폐기 이벤트 스키마""" event_type: Literal["OAUTH_TOKEN_REVOKED"] = "OAUTH_TOKEN_REVOKED" data: Dict[str, Any] = Field(..., description="이벤트 데이터") @field_validator('data') @classmethod def validate_data(cls, v): required_fields = ['token_id', 'client_id'] optional_fields = ['user_id', 'revoked_by'] for field in required_fields: if field not in v: raise ValueError(f"Missing required field: {field}") return v # Image Events Schemas class ImageUploadedSchema(EventSchemaBase): """이미지 업로드 이벤트 스키마""" event_type: Literal["IMAGE_UPLOADED"] = "IMAGE_UPLOADED" data: Dict[str, Any] = Field(..., description="이벤트 데이터") @field_validator('data') @classmethod def validate_data(cls, v): required_fields = ['image_id', 'user_id', 'url'] optional_fields = ['size', 'mime_type', 'width', 'height', 'thumbnail_url'] for field in required_fields: if field not in v: raise ValueError(f"Missing required field: {field}") return v class ImageProcessedSchema(EventSchemaBase): """이미지 처리 완료 이벤트 스키마""" event_type: Literal["IMAGE_PROCESSED"] = "IMAGE_PROCESSED" data: Dict[str, Any] = Field(..., description="이벤트 데이터") @field_validator('data') @classmethod def validate_data(cls, v): required_fields = ['image_id', 'process_type'] optional_fields = ['original_url', 'processed_url', 'processing_time_ms'] for field in required_fields: if field not in v: raise ValueError(f"Missing required field: {field}") return v class SchemaRegistry: """스키마 레지스트리""" # 스키마 매핑 SCHEMAS = { "USER_CREATED": UserCreatedSchema, "USER_UPDATED": UserUpdatedSchema, "USER_DELETED": UserDeletedSchema, "OAUTH_APP_CREATED": OAuthAppCreatedSchema, "OAUTH_TOKEN_ISSUED": OAuthTokenIssuedSchema, "OAUTH_TOKEN_REVOKED": OAuthTokenRevokedSchema, "IMAGE_UPLOADED": ImageUploadedSchema, "IMAGE_PROCESSED": ImageProcessedSchema, } # 스키마 버전 호환성 매트릭스 COMPATIBILITY_MATRIX = { SchemaVersion.V1: [SchemaVersion.V1], SchemaVersion.V2: [SchemaVersion.V1, SchemaVersion.V2], # V2는 V1과 호환 } @classmethod def get_schema(cls, event_type: str) -> Optional[type]: """이벤트 타입에 대한 스키마 반환""" return cls.SCHEMAS.get(event_type) @classmethod def validate_event(cls, event_data: Dict[str, Any]) -> tuple[bool, Optional[str]]: """이벤트 데이터 검증""" try: event_type = event_data.get('event_type') if not event_type: return False, "Missing event_type" schema_class = cls.get_schema(event_type) if not schema_class: return False, f"Unknown event type: {event_type}" # 스키마 검증 schema_class(**event_data) return True, None except Exception as e: return False, str(e) @classmethod def is_compatible(cls, from_version: str, to_version: str) -> bool: """버전 호환성 확인""" from_v = SchemaVersion(from_version) to_v = SchemaVersion(to_version) compatible_versions = cls.COMPATIBILITY_MATRIX.get(to_v, []) return from_v in compatible_versions @classmethod def migrate_event( cls, event_data: Dict[str, Any], from_version: str, to_version: str ) -> Dict[str, Any]: """이벤트 데이터 마이그레이션""" if from_version == to_version: return event_data if not cls.is_compatible(from_version, to_version): raise ValueError(f"Cannot migrate from {from_version} to {to_version}") # 버전별 마이그레이션 로직 if from_version == SchemaVersion.V1 and to_version == SchemaVersion.V2: # V1 -> V2 마이그레이션 예시 event_data['version'] = SchemaVersion.V2 # 새로운 필드 추가 (기본값) if 'metadata' not in event_data: event_data['metadata'] = {} return event_data @classmethod def get_all_schemas(cls) -> Dict[str, Dict[str, Any]]: """모든 스키마 정보 반환 (문서화용)""" schemas_info = {} for event_type, schema_class in cls.SCHEMAS.items(): schemas_info[event_type] = { "description": schema_class.__doc__, "fields": schema_class.schema(), "version": SchemaVersion.V1, "example": cls._generate_example(schema_class) } return schemas_info @classmethod def _generate_example(cls, schema_class: type) -> Dict[str, Any]: """스키마 예시 생성""" examples = { "USER_CREATED": { "event_id": "evt_123456", "event_type": "USER_CREATED", "timestamp": datetime.now().isoformat(), "version": "1.0.0", "service": "users", "data": { "user_id": "usr_abc123", "username": "johndoe", "email": "john@example.com" } }, "USER_UPDATED": { "event_id": "evt_123457", "event_type": "USER_UPDATED", "timestamp": datetime.now().isoformat(), "version": "1.0.0", "service": "users", "data": { "user_id": "usr_abc123", "updated_fields": ["profile_picture", "bio"], "profile_picture": "https://example.com/pic.jpg", "bio": "Updated bio" } }, "OAUTH_TOKEN_ISSUED": { "event_id": "evt_123458", "event_type": "OAUTH_TOKEN_ISSUED", "timestamp": datetime.now().isoformat(), "version": "1.0.0", "service": "oauth", "data": { "client_id": "app_xyz789", "user_id": "usr_abc123", "grant_type": "authorization_code", "scopes": ["profile", "email"], "expires_in": 3600 } } } return examples.get(schema_class.__fields__['event_type'].default, {}) @classmethod def export_schemas(cls, format: str = "json") -> str: """스키마 내보내기""" schemas = cls.get_all_schemas() if format == "json": return json.dumps(schemas, indent=2, default=str) elif format == "markdown": return cls._export_as_markdown(schemas) else: raise ValueError(f"Unsupported format: {format}") @classmethod def _export_as_markdown(cls, schemas: Dict[str, Dict[str, Any]]) -> str: """마크다운 형식으로 내보내기""" md = "# Event Schema Registry\n\n" for event_type, info in schemas.items(): md += f"## {event_type}\n\n" md += f"{info['description']}\n\n" md += f"**Version:** {info['version']}\n\n" md += "**Example:**\n```json\n" md += json.dumps(info['example'], indent=2, default=str) md += "\n```\n\n" return md