Files
labs/oauth/backend/app/main.py
jungwoo choi 6c21809a24 feat: OAuth 2.0 백엔드 시스템 구현 완료
Phase 1 & 2 완료:
- 프로젝트 기본 구조 설정
- Docker Compose 환경 구성 (MongoDB, Redis, Backend, Frontend)
- FastAPI 기반 OAuth 2.0 백엔드 구현

주요 기능:
- JWT 기반 인증 시스템
- 3단계 권한 체계 (System Admin/Group Admin/User)
- 사용자 관리 CRUD API
- 애플리케이션 관리 CRUD API
- OAuth 2.0 Authorization Code Flow
- Refresh Token 관리
- 인증 히스토리 추적

API 엔드포인트:
- /auth/* - 인증 관련 (register, login, logout, refresh)
- /users/* - 사용자 관리
- /applications/* - 애플리케이션 관리
- /oauth/* - OAuth 2.0 플로우

보안 기능:
- bcrypt 비밀번호 해싱
- JWT 토큰 인증
- CORS 설정
- Rate limiting 준비

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-05 14:56:02 +09:00

198 lines
5.6 KiB
Python

"""Main FastAPI application"""
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
import logging
from app.config import settings
from app.utils.database import connect_database, disconnect_database
from app.routers import auth_router, users_router, applications_router
from app.services.auth_service import AuthService
# Configure logging
logging.basicConfig(
level=getattr(logging, settings.log_level),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifecycle"""
# Startup
logger.info("Starting OAuth 2.0 Authentication System")
await connect_database()
# Create admin user if not exists
if settings.environment == "dev":
await AuthService.create_admin_user()
yield
# Shutdown
logger.info("Shutting down OAuth 2.0 Authentication System")
await disconnect_database()
# Create FastAPI app
app = FastAPI(
title="OAuth 2.0 Authentication System",
description="Enterprise-grade OAuth 2.0 based central authentication system",
version="1.0.0",
docs_url=f"{settings.api_prefix}/docs",
redoc_url=f"{settings.api_prefix}/redoc",
openapi_url=f"{settings.api_prefix}/openapi.json",
lifespan=lifespan
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=settings.cors_allow_credentials,
allow_methods=["*"],
allow_headers=["*"],
)
# Health check endpoint
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"environment": settings.environment,
"version": "1.0.0"
}
# Root endpoint
@app.get("/")
async def root():
"""Root endpoint"""
return {
"message": "OAuth 2.0 Authentication System API",
"docs": f"{settings.api_prefix}/docs",
"health": "/health"
}
# Include routers
app.include_router(auth_router, prefix=settings.api_prefix)
app.include_router(users_router, prefix=settings.api_prefix)
app.include_router(applications_router, prefix=settings.api_prefix)
# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
"""Global exception handler"""
logger.error(f"Unhandled exception: {exc}")
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"}
)
# OAuth 2.0 authorization endpoint
@app.get("/oauth/authorize")
async def oauth_authorize(
response_type: str,
client_id: str,
redirect_uri: str,
scope: str,
state: str = None
):
"""OAuth 2.0 authorization endpoint"""
# This would typically render a login page
# For now, return the parameters for the frontend to handle
return {
"response_type": response_type,
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": scope,
"state": state
}
# OAuth 2.0 token endpoint
@app.post("/oauth/token")
async def oauth_token(
grant_type: str,
code: str = None,
client_id: str = None,
client_secret: str = None,
redirect_uri: str = None,
refresh_token: str = None
):
"""OAuth 2.0 token endpoint"""
from app.services.token_service import TokenService
if grant_type == "authorization_code":
if not all([code, client_id, client_secret, redirect_uri]):
return JSONResponse(
status_code=400,
content={"error": "invalid_request", "error_description": "Missing required parameters"}
)
token = await TokenService.exchange_authorization_code(
code=code,
client_id=client_id,
client_secret=client_secret,
redirect_uri=redirect_uri
)
if not token:
return JSONResponse(
status_code=400,
content={"error": "invalid_grant", "error_description": "Invalid authorization code"}
)
return token
elif grant_type == "refresh_token":
if not refresh_token:
return JSONResponse(
status_code=400,
content={"error": "invalid_request", "error_description": "Missing refresh token"}
)
user_id = await TokenService.verify_refresh_token(refresh_token)
if not user_id:
return JSONResponse(
status_code=400,
content={"error": "invalid_grant", "error_description": "Invalid refresh token"}
)
# Get user and create new tokens
from app.services.auth_service import AuthService
user = await AuthService.get_user_by_id(user_id)
if not user:
return JSONResponse(
status_code=400,
content={"error": "invalid_grant", "error_description": "User not found"}
)
# Revoke old refresh token
await TokenService.revoke_refresh_token(refresh_token)
# Create new tokens
tokens = await TokenService.create_tokens(
str(user.id),
{
"username": user.username,
"role": user.role,
"email": user.email
}
)
return tokens
else:
return JSONResponse(
status_code=400,
content={"error": "unsupported_grant_type", "error_description": f"Grant type '{grant_type}' is not supported"}
)