131 lines
4.3 KiB
Python
131 lines
4.3 KiB
Python
import secrets
|
|
import hashlib
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, List
|
|
from passlib.context import CryptContext
|
|
from jose import JWTError, jwt
|
|
import os
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
class OAuthUtils:
|
|
@staticmethod
|
|
def generate_client_id() -> str:
|
|
"""클라이언트 ID 생성"""
|
|
return secrets.token_urlsafe(24)
|
|
|
|
@staticmethod
|
|
def generate_client_secret() -> str:
|
|
"""클라이언트 시크릿 생성"""
|
|
return secrets.token_urlsafe(32)
|
|
|
|
@staticmethod
|
|
def hash_client_secret(secret: str) -> str:
|
|
"""클라이언트 시크릿 해싱"""
|
|
return pwd_context.hash(secret)
|
|
|
|
@staticmethod
|
|
def verify_client_secret(plain_secret: str, hashed_secret: str) -> bool:
|
|
"""클라이언트 시크릿 검증"""
|
|
return pwd_context.verify(plain_secret, hashed_secret)
|
|
|
|
@staticmethod
|
|
def generate_authorization_code() -> str:
|
|
"""인증 코드 생성"""
|
|
return secrets.token_urlsafe(32)
|
|
|
|
@staticmethod
|
|
def generate_access_token() -> str:
|
|
"""액세스 토큰 생성"""
|
|
return secrets.token_urlsafe(32)
|
|
|
|
@staticmethod
|
|
def generate_refresh_token() -> str:
|
|
"""리프레시 토큰 생성"""
|
|
return secrets.token_urlsafe(48)
|
|
|
|
@staticmethod
|
|
def verify_pkce_challenge(verifier: str, challenge: str, method: str = "S256") -> bool:
|
|
"""PKCE challenge 검증"""
|
|
if method == "plain":
|
|
return verifier == challenge
|
|
elif method == "S256":
|
|
verifier_hash = hashlib.sha256(verifier.encode()).digest()
|
|
verifier_challenge = base64.urlsafe_b64encode(verifier_hash).decode().rstrip("=")
|
|
return verifier_challenge == challenge
|
|
return False
|
|
|
|
@staticmethod
|
|
def create_jwt_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
|
|
"""JWT 토큰 생성"""
|
|
to_encode = data.copy()
|
|
if expires_delta:
|
|
expire = datetime.utcnow() + expires_delta
|
|
else:
|
|
expire = datetime.utcnow() + timedelta(minutes=15)
|
|
to_encode.update({"exp": expire})
|
|
|
|
secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key")
|
|
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
|
|
|
|
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm)
|
|
return encoded_jwt
|
|
|
|
@staticmethod
|
|
def decode_jwt_token(token: str) -> Optional[dict]:
|
|
"""JWT 토큰 디코딩"""
|
|
try:
|
|
secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key")
|
|
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
|
|
|
|
payload = jwt.decode(token, secret_key, algorithms=[algorithm])
|
|
return payload
|
|
except JWTError:
|
|
return None
|
|
|
|
class TokenGenerator:
|
|
@staticmethod
|
|
def generate_token_response(
|
|
access_token: str,
|
|
token_type: str = "Bearer",
|
|
expires_in: int = 3600,
|
|
refresh_token: Optional[str] = None,
|
|
scope: Optional[str] = None,
|
|
id_token: Optional[str] = None
|
|
) -> dict:
|
|
"""OAuth 2.0 토큰 응답 생성"""
|
|
response = {
|
|
"access_token": access_token,
|
|
"token_type": token_type,
|
|
"expires_in": expires_in
|
|
}
|
|
|
|
if refresh_token:
|
|
response["refresh_token"] = refresh_token
|
|
|
|
if scope:
|
|
response["scope"] = scope
|
|
|
|
if id_token:
|
|
response["id_token"] = id_token
|
|
|
|
return response
|
|
|
|
class ScopeValidator:
|
|
@staticmethod
|
|
def validate_scopes(requested_scopes: List[str], allowed_scopes: List[str]) -> List[str]:
|
|
"""요청된 스코프가 허용된 스코프에 포함되는지 검증"""
|
|
return [scope for scope in requested_scopes if scope in allowed_scopes]
|
|
|
|
@staticmethod
|
|
def has_scope(token_scopes: List[str], required_scope: str) -> bool:
|
|
"""토큰이 특정 스코프를 가지고 있는지 확인"""
|
|
return required_scope in token_scopes
|
|
|
|
@staticmethod
|
|
def parse_scope_string(scope_string: str) -> List[str]:
|
|
"""스코프 문자열을 리스트로 파싱"""
|
|
if not scope_string:
|
|
return []
|
|
return scope_string.strip().split() |