import secrets import hashlib import base64 from datetime import datetime, timedelta from typing import Optional, List from passlib.context import CryptContext from jose import JWTError, jwt import os pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class OAuthUtils: @staticmethod def generate_client_id() -> str: """클라이언트 ID 생성""" return secrets.token_urlsafe(24) @staticmethod def generate_client_secret() -> str: """클라이언트 시크릿 생성""" return secrets.token_urlsafe(32) @staticmethod def hash_client_secret(secret: str) -> str: """클라이언트 시크릿 해싱""" return pwd_context.hash(secret) @staticmethod def verify_client_secret(plain_secret: str, hashed_secret: str) -> bool: """클라이언트 시크릿 검증""" return pwd_context.verify(plain_secret, hashed_secret) @staticmethod def generate_authorization_code() -> str: """인증 코드 생성""" return secrets.token_urlsafe(32) @staticmethod def generate_access_token() -> str: """액세스 토큰 생성""" return secrets.token_urlsafe(32) @staticmethod def generate_refresh_token() -> str: """리프레시 토큰 생성""" return secrets.token_urlsafe(48) @staticmethod def verify_pkce_challenge(verifier: str, challenge: str, method: str = "S256") -> bool: """PKCE challenge 검증""" if method == "plain": return verifier == challenge elif method == "S256": verifier_hash = hashlib.sha256(verifier.encode()).digest() verifier_challenge = base64.urlsafe_b64encode(verifier_hash).decode().rstrip("=") return verifier_challenge == challenge return False @staticmethod def create_jwt_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """JWT 토큰 생성""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key") algorithm = os.getenv("JWT_ALGORITHM", "HS256") encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm) return encoded_jwt @staticmethod def decode_jwt_token(token: str) -> Optional[dict]: """JWT 토큰 디코딩""" try: secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key") algorithm = os.getenv("JWT_ALGORITHM", "HS256") payload = jwt.decode(token, secret_key, algorithms=[algorithm]) return payload except JWTError: return None class TokenGenerator: @staticmethod def generate_token_response( access_token: str, token_type: str = "Bearer", expires_in: int = 3600, refresh_token: Optional[str] = None, scope: Optional[str] = None, id_token: Optional[str] = None ) -> dict: """OAuth 2.0 토큰 응답 생성""" response = { "access_token": access_token, "token_type": token_type, "expires_in": expires_in } if refresh_token: response["refresh_token"] = refresh_token if scope: response["scope"] = scope if id_token: response["id_token"] = id_token return response class ScopeValidator: @staticmethod def validate_scopes(requested_scopes: List[str], allowed_scopes: List[str]) -> List[str]: """요청된 스코프가 허용된 스코프에 포함되는지 검증""" return [scope for scope in requested_scopes if scope in allowed_scopes] @staticmethod def has_scope(token_scopes: List[str], required_scope: str) -> bool: """토큰이 특정 스코프를 가지고 있는지 확인""" return required_scope in token_scopes @staticmethod def parse_scope_string(scope_string: str) -> List[str]: """스코프 문자열을 리스트로 파싱""" if not scope_string: return [] return scope_string.strip().split()