Phase 1 & 2 완료: - 프로젝트 기본 구조 설정 - Docker Compose 환경 구성 (MongoDB, Redis, Backend, Frontend) - FastAPI 기반 OAuth 2.0 백엔드 구현 주요 기능: - JWT 기반 인증 시스템 - 3단계 권한 체계 (System Admin/Group Admin/User) - 사용자 관리 CRUD API - 애플리케이션 관리 CRUD API - OAuth 2.0 Authorization Code Flow - Refresh Token 관리 - 인증 히스토리 추적 API 엔드포인트: - /auth/* - 인증 관련 (register, login, logout, refresh) - /users/* - 사용자 관리 - /applications/* - 애플리케이션 관리 - /oauth/* - OAuth 2.0 플로우 보안 기능: - bcrypt 비밀번호 해싱 - JWT 토큰 인증 - CORS 설정 - Rate limiting 준비 🤖 Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
93 lines
2.7 KiB
Python
93 lines
2.7 KiB
Python
"""Security utilities for password hashing and JWT tokens"""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional, Dict, Any
|
|
from passlib.context import CryptContext
|
|
from jose import JWTError, jwt
|
|
from app.config import settings
|
|
import secrets
|
|
import string
|
|
|
|
# Password hashing context
|
|
# Module-wide passlib context shared by verify_password() and hash_password();
# bcrypt is the only scheme configured here.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a candidate password against a stored hash.

    Args:
        plain_password: The password as supplied by the user.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
|
|
|
|
|
|
def hash_password(password: str) -> str:
    """Hash a plain-text password with the module's ``pwd_context``.

    Args:
        password: The plain-text password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
|
|
|
|
|
|
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The mapping is shallow-copied,
            so the caller's dict is not modified.
        expires_delta: Token lifetime measured from the current UTC time.
            When ``None``, ``settings.jwt_access_token_expire_minutes`` is
            used instead.

    Returns:
        The encoded token returned by ``jwt.encode``, signed with
        ``settings.secret_key`` using ``settings.jwt_algorithm``.
    """
    # Compare against None explicitly: ``timedelta(0)`` is falsy, so a plain
    # truthiness test would silently swap an explicit zero lifetime for the
    # configured default.
    if expires_delta is None:
        expires_delta = timedelta(
            minutes=settings.jwt_access_token_expire_minutes
        )

    claims = data.copy()
    # "exp" and "type" are applied last, so they override any same-named
    # keys present in ``data``.
    claims.update({
        "exp": datetime.now(timezone.utc) + expires_delta,
        "type": "access",
    })
    return jwt.encode(
        claims,
        settings.secret_key,
        algorithm=settings.jwt_algorithm,
    )
|
|
|
|
|
|
def create_refresh_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT refresh token.

    Args:
        data: Claims to embed in the token. The mapping is shallow-copied,
            so the caller's dict is not modified.
        expires_delta: Token lifetime measured from the current UTC time.
            When ``None``, ``settings.jwt_refresh_token_expire_days`` is
            used instead.

    Returns:
        The encoded token returned by ``jwt.encode``, signed with
        ``settings.secret_key`` using ``settings.jwt_algorithm``.
    """
    # Compare against None explicitly: ``timedelta(0)`` is falsy, so a plain
    # truthiness test would silently swap an explicit zero lifetime for the
    # configured default.
    if expires_delta is None:
        expires_delta = timedelta(
            days=settings.jwt_refresh_token_expire_days
        )

    claims = data.copy()
    # "exp" and "type" are applied last, so they override any same-named
    # keys present in ``data``.
    claims.update({
        "exp": datetime.now(timezone.utc) + expires_delta,
        "type": "refresh",
    })
    return jwt.encode(
        claims,
        settings.secret_key,
        algorithm=settings.jwt_algorithm,
    )
|
|
|
|
|
|
def decode_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a JWT and return its claims, or ``None`` if decoding fails.

    The token is decoded with ``settings.secret_key`` and only the algorithm
    named by ``settings.jwt_algorithm`` is accepted. This function does not
    inspect the payload (for example its ``"type"`` claim); that is left to
    the caller.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload, or ``None`` when ``jwt.decode`` raises
        ``JWTError``.
    """
    accepted_algorithms = [settings.jwt_algorithm]
    try:
        claims = jwt.decode(
            token,
            settings.secret_key,
            algorithms=accepted_algorithms,
        )
    except JWTError:
        return None
    return claims
|
|
|
|
|
|
def generate_client_secret(length: int = 32) -> str:
    """Generate a random client secret.

    Characters are drawn one at a time with ``secrets.choice`` from ASCII
    letters and digits.

    Args:
        length: Number of characters to generate. Defaults to 32, the
            length this function has always produced.

    Returns:
        A string of ``length`` ASCII alphanumeric characters.

    Raises:
        ValueError: If ``length`` is less than 1; an empty secret would
            offer no protection.
    """
    if length < 1:
        raise ValueError("length must be a positive integer")
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
|
|
def generate_client_id(length: int = 16) -> str:
    """Generate a random client ID.

    Characters are drawn one at a time with ``secrets.choice`` from
    lowercase ASCII letters and digits. The value is random; this function
    does not check it against existing IDs, so any uniqueness guarantee
    must be enforced by the caller.

    Args:
        length: Number of characters to generate. Defaults to 16, the
            length this function has always produced.

    Returns:
        A string of ``length`` lowercase alphanumeric characters.

    Raises:
        ValueError: If ``length`` is less than 1.
    """
    if length < 1:
        raise ValueError("length must be a positive integer")
    alphabet = string.ascii_lowercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
|
|
def generate_authorization_code(length: int = 32) -> str:
    """Generate a random authorization code.

    Characters are drawn one at a time with ``secrets.choice`` from ASCII
    letters and digits.

    Args:
        length: Number of characters to generate. Defaults to 32, the
            length this function has always produced.

    Returns:
        A string of ``length`` ASCII alphanumeric characters.

    Raises:
        ValueError: If ``length`` is less than 1; an empty code would be
            trivially guessable.
    """
    if length < 1:
        raise ValueError("length must be a positive integer")
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))