Files
labs/oauth/backend/app/services/auth_service.py
jungwoo choi 6c21809a24 feat: OAuth 2.0 백엔드 시스템 구현 완료
Phase 1 & 2 완료:
- 프로젝트 기본 구조 설정
- Docker Compose 환경 구성 (MongoDB, Redis, Backend, Frontend)
- FastAPI 기반 OAuth 2.0 백엔드 구현

주요 기능:
- JWT 기반 인증 시스템
- 3단계 권한 체계 (System Admin/Group Admin/User)
- 사용자 관리 CRUD API
- 애플리케이션 관리 CRUD API
- OAuth 2.0 Authorization Code Flow
- Refresh Token 관리
- 인증 히스토리 추적

API 엔드포인트:
- /auth/* - 인증 관련 (register, login, logout, refresh)
- /users/* - 사용자 관리
- /applications/* - 애플리케이션 관리
- /oauth/* - OAuth 2.0 플로우

보안 기능:
- bcrypt 비밀번호 해싱
- JWT 토큰 인증
- CORS 설정
- Rate limiting 준비

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-05 14:56:02 +09:00

135 lines
4.1 KiB
Python

"""Authentication service"""
from typing import Optional
from datetime import datetime, timedelta
from bson import ObjectId
from app.utils.database import get_database
from app.utils.security import verify_password, hash_password
from app.models.user import UserCreate, User, UserInDB, UserRole
from app.models.auth_history import AuthHistoryCreate, AuthAction
import logging
logger = logging.getLogger(__name__)
class AuthService:
"""Service for handling authentication logic"""
@staticmethod
async def authenticate_user(username: str, password: str) -> Optional[UserInDB]:
"""Authenticate a user with username/email and password"""
db = get_database()
# Try to find user by username or email
user = await db.users.find_one({
"$or": [
{"username": username},
{"email": username}
]
})
if not user:
return None
if not verify_password(password, user["hashed_password"]):
return None
return UserInDB(**user)
@staticmethod
async def create_user(user_data: UserCreate) -> User:
"""Create a new user"""
db = get_database()
# Check if user already exists
existing_user = await db.users.find_one({
"$or": [
{"email": user_data.email},
{"username": user_data.username}
]
})
if existing_user:
raise ValueError("User with this email or username already exists")
# Hash password
hashed_password = hash_password(user_data.password)
# Prepare user document
user_doc = {
**user_data.model_dump(exclude={"password"}),
"hashed_password": hashed_password,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
# Insert user
result = await db.users.insert_one(user_doc)
user_doc["_id"] = result.inserted_id
return User(**user_doc)
@staticmethod
async def get_user_by_id(user_id: str) -> Optional[User]:
"""Get user by ID"""
db = get_database()
try:
user = await db.users.find_one({"_id": ObjectId(user_id)})
if user:
return User(**user)
except Exception as e:
logger.error(f"Error getting user by ID: {e}")
return None
@staticmethod
async def log_auth_action(
user_id: str,
action: AuthAction,
ip_address: str,
user_agent: Optional[str] = None,
application_id: Optional[str] = None,
result: str = "success",
details: Optional[dict] = None
):
"""Log authentication action"""
db = get_database()
history_doc = {
"user_id": user_id,
"application_id": application_id,
"action": action.value,
"ip_address": ip_address,
"user_agent": user_agent,
"result": result,
"details": details,
"created_at": datetime.utcnow()
}
await db.auth_history.insert_one(history_doc)
@staticmethod
async def create_admin_user():
"""Create default admin user if not exists"""
from app.config import settings
db = get_database()
# Check if admin exists
admin = await db.users.find_one({"email": settings.admin_email})
if not admin:
admin_data = UserCreate(
email=settings.admin_email,
username="admin",
password=settings.admin_password,
full_name="System Administrator",
role=UserRole.SYSTEM_ADMIN
)
try:
await AuthService.create_user(admin_data)
logger.info("Admin user created successfully")
except ValueError:
logger.info("Admin user already exists")