Initial commit - cleaned repository

This commit is contained in:
jungwoo choi
2025-09-28 20:41:57 +09:00
commit e3c28f796a
188 changed files with 28102 additions and 0 deletions

View File

@ -0,0 +1,131 @@
import secrets
import hashlib
import base64
from datetime import datetime, timedelta
from typing import Optional, List
from passlib.context import CryptContext
from jose import JWTError, jwt
import os
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class OAuthUtils:
@staticmethod
def generate_client_id() -> str:
"""클라이언트 ID 생성"""
return secrets.token_urlsafe(24)
@staticmethod
def generate_client_secret() -> str:
"""클라이언트 시크릿 생성"""
return secrets.token_urlsafe(32)
@staticmethod
def hash_client_secret(secret: str) -> str:
"""클라이언트 시크릿 해싱"""
return pwd_context.hash(secret)
@staticmethod
def verify_client_secret(plain_secret: str, hashed_secret: str) -> bool:
"""클라이언트 시크릿 검증"""
return pwd_context.verify(plain_secret, hashed_secret)
@staticmethod
def generate_authorization_code() -> str:
"""인증 코드 생성"""
return secrets.token_urlsafe(32)
@staticmethod
def generate_access_token() -> str:
"""액세스 토큰 생성"""
return secrets.token_urlsafe(32)
@staticmethod
def generate_refresh_token() -> str:
"""리프레시 토큰 생성"""
return secrets.token_urlsafe(48)
@staticmethod
def verify_pkce_challenge(verifier: str, challenge: str, method: str = "S256") -> bool:
"""PKCE challenge 검증"""
if method == "plain":
return verifier == challenge
elif method == "S256":
verifier_hash = hashlib.sha256(verifier.encode()).digest()
verifier_challenge = base64.urlsafe_b64encode(verifier_hash).decode().rstrip("=")
return verifier_challenge == challenge
return False
@staticmethod
def create_jwt_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""JWT 토큰 생성"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key")
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm)
return encoded_jwt
@staticmethod
def decode_jwt_token(token: str) -> Optional[dict]:
"""JWT 토큰 디코딩"""
try:
secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key")
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
payload = jwt.decode(token, secret_key, algorithms=[algorithm])
return payload
except JWTError:
return None
class TokenGenerator:
@staticmethod
def generate_token_response(
access_token: str,
token_type: str = "Bearer",
expires_in: int = 3600,
refresh_token: Optional[str] = None,
scope: Optional[str] = None,
id_token: Optional[str] = None
) -> dict:
"""OAuth 2.0 토큰 응답 생성"""
response = {
"access_token": access_token,
"token_type": token_type,
"expires_in": expires_in
}
if refresh_token:
response["refresh_token"] = refresh_token
if scope:
response["scope"] = scope
if id_token:
response["id_token"] = id_token
return response
class ScopeValidator:
@staticmethod
def validate_scopes(requested_scopes: List[str], allowed_scopes: List[str]) -> List[str]:
"""요청된 스코프가 허용된 스코프에 포함되는지 검증"""
return [scope for scope in requested_scopes if scope in allowed_scopes]
@staticmethod
def has_scope(token_scopes: List[str], required_scope: str) -> bool:
"""토큰이 특정 스코프를 가지고 있는지 확인"""
return required_scope in token_scopes
@staticmethod
def parse_scope_string(scope_string: str) -> List[str]:
"""스코프 문자열을 리스트로 파싱"""
if not scope_string:
return []
return scope_string.strip().split()