Files
site11/services/oauth/backend/utils.py
jungwoo choi 1467766f3d Step 8: OAuth 2.0 인증 시스템 및 프로필 기능 구현
- OAuth 2.0 서비스 구현
  * Authorization Code, Client Credentials, Refresh Token 플로우 지원
  * 애플리케이션 등록 및 관리 기능
  * 토큰 introspection 및 revocation
  * SSO 설정 지원 (Google, GitHub, SAML)
  * 실용적인 스코프 시스템 (user, app, org, api 관리)

- 사용자 프로필 기능 확장
  * 프로필 사진 및 썸네일 필드 추가
  * bio, location, website 등 추가 프로필 정보
  * 이메일 인증 및 계정 활성화 상태 관리
  * UserPublicResponse 모델 추가

- OAuth 스코프 관리
  * picture 스코프 추가 (프로필 사진 접근 제어)
  * 카테고리별 스코프 정리 (기본 인증, 사용자 데이터, 앱 관리, 조직, API)
  * 스코프별 승인 필요 여부 설정

- 인프라 개선
  * Users 서비스 포트 매핑 추가 (8001)
  * OAuth 서비스 Docker 구성 (포트 8003)
  * Kafka 이벤트 통합 (USER_CREATED, USER_UPDATED, USER_DELETED)

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-10 17:37:16 +09:00

131 lines
4.3 KiB
Python

import secrets
import hashlib
import base64
from datetime import datetime, timedelta
from typing import Optional, List
from passlib.context import CryptContext
from jose import JWTError, jwt
import os
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class OAuthUtils:
@staticmethod
def generate_client_id() -> str:
"""클라이언트 ID 생성"""
return secrets.token_urlsafe(24)
@staticmethod
def generate_client_secret() -> str:
"""클라이언트 시크릿 생성"""
return secrets.token_urlsafe(32)
@staticmethod
def hash_client_secret(secret: str) -> str:
"""클라이언트 시크릿 해싱"""
return pwd_context.hash(secret)
@staticmethod
def verify_client_secret(plain_secret: str, hashed_secret: str) -> bool:
"""클라이언트 시크릿 검증"""
return pwd_context.verify(plain_secret, hashed_secret)
@staticmethod
def generate_authorization_code() -> str:
"""인증 코드 생성"""
return secrets.token_urlsafe(32)
@staticmethod
def generate_access_token() -> str:
"""액세스 토큰 생성"""
return secrets.token_urlsafe(32)
@staticmethod
def generate_refresh_token() -> str:
"""리프레시 토큰 생성"""
return secrets.token_urlsafe(48)
@staticmethod
def verify_pkce_challenge(verifier: str, challenge: str, method: str = "S256") -> bool:
"""PKCE challenge 검증"""
if method == "plain":
return verifier == challenge
elif method == "S256":
verifier_hash = hashlib.sha256(verifier.encode()).digest()
verifier_challenge = base64.urlsafe_b64encode(verifier_hash).decode().rstrip("=")
return verifier_challenge == challenge
return False
@staticmethod
def create_jwt_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""JWT 토큰 생성"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key")
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm)
return encoded_jwt
@staticmethod
def decode_jwt_token(token: str) -> Optional[dict]:
"""JWT 토큰 디코딩"""
try:
secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key")
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
payload = jwt.decode(token, secret_key, algorithms=[algorithm])
return payload
except JWTError:
return None
class TokenGenerator:
@staticmethod
def generate_token_response(
access_token: str,
token_type: str = "Bearer",
expires_in: int = 3600,
refresh_token: Optional[str] = None,
scope: Optional[str] = None,
id_token: Optional[str] = None
) -> dict:
"""OAuth 2.0 토큰 응답 생성"""
response = {
"access_token": access_token,
"token_type": token_type,
"expires_in": expires_in
}
if refresh_token:
response["refresh_token"] = refresh_token
if scope:
response["scope"] = scope
if id_token:
response["id_token"] = id_token
return response
class ScopeValidator:
@staticmethod
def validate_scopes(requested_scopes: List[str], allowed_scopes: List[str]) -> List[str]:
"""요청된 스코프가 허용된 스코프에 포함되는지 검증"""
return [scope for scope in requested_scopes if scope in allowed_scopes]
@staticmethod
def has_scope(token_scopes: List[str], required_scope: str) -> bool:
"""토큰이 특정 스코프를 가지고 있는지 확인"""
return required_scope in token_scopes
@staticmethod
def parse_scope_string(scope_string: str) -> List[str]:
"""스코프 문자열을 리스트로 파싱"""
if not scope_string:
return []
return scope_string.strip().split()