Files
labs/oauth/backend/app/routers/applications.py
jungwoo choi 6c21809a24 feat: OAuth 2.0 백엔드 시스템 구현 완료
Phase 1 & 2 완료:
- 프로젝트 기본 구조 설정
- Docker Compose 환경 구성 (MongoDB, Redis, Backend, Frontend)
- FastAPI 기반 OAuth 2.0 백엔드 구현

주요 기능:
- JWT 기반 인증 시스템
- 3단계 권한 체계 (System Admin/Group Admin/User)
- 사용자 관리 CRUD API
- 애플리케이션 관리 CRUD API
- OAuth 2.0 Authorization Code Flow
- Refresh Token 관리
- 인증 히스토리 추적

API 엔드포인트:
- /auth/* - 인증 관련 (register, login, logout, refresh)
- /users/* - 사용자 관리
- /applications/* - 애플리케이션 관리
- /oauth/* - OAuth 2.0 플로우

보안 기능:
- bcrypt 비밀번호 해싱
- JWT 토큰 인증
- CORS 설정
- Rate limiting 준비

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-05 14:56:02 +09:00

256 lines
8.2 KiB
Python

"""Applications management router"""
from fastapi import APIRouter, Depends, HTTPException, status, Query
from typing import List
from bson import ObjectId
from datetime import datetime
from app.models.application import Application, ApplicationCreate, ApplicationUpdate, ApplicationPublic
from app.models.user import User, UserRole
from app.routers.auth import get_current_user
from app.utils.database import get_database
from app.utils.security import generate_client_id, generate_client_secret
# Router collecting the application-management endpoints; every route
# registered on it is served under the "/applications" prefix.
router = APIRouter(prefix="/applications", tags=["Applications"])
def require_admin(current_user: User = Depends(get_current_user)) -> User:
    """Let the request through only when the caller is an administrator.

    Args:
        current_user: The authenticated user resolved by ``get_current_user``.

    Returns:
        The same user object, when its role is system admin or group admin.

    Raises:
        HTTPException: 403 when the user holds neither admin role.
    """
    admin_roles = (UserRole.SYSTEM_ADMIN, UserRole.GROUP_ADMIN)
    if current_user.role in admin_roles:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions",
    )
@router.get("/", response_model=List[Application])
async def get_applications(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
    current_user: User = Depends(get_current_user)
):
    """List applications, one page at a time.

    System admins and group admins see every application; any other user
    only sees the applications whose ``created_by`` matches their own ID.

    Args:
        skip: Number of matching documents to skip (>= 0).
        limit: Maximum number of documents to return (1-100).
        current_user: The authenticated caller.

    Returns:
        A list of ``Application`` models for the requested page.
    """
    db = get_database()

    is_admin = current_user.role in (UserRole.SYSTEM_ADMIN, UserRole.GROUP_ADMIN)
    query = {} if is_admin else {"created_by": str(current_user.id)}

    # skip/limit only paginate reliably over a defined order; without an
    # explicit sort the server may return documents in any order, so pages
    # could overlap or miss entries between requests.
    cursor = db.applications.find(query).sort("_id", 1).skip(skip).limit(limit)
    docs = await cursor.to_list(limit)
    return [Application(**doc) for doc in docs]
@router.post("/", response_model=Application)
async def create_application(
    app_data: ApplicationCreate,
    current_user: User = Depends(get_current_user)
):
    """Create a new application owned by the calling user.

    Generates a client ID and client secret, stores the application
    document, and returns it.

    Args:
        app_data: The client-supplied application fields.
        current_user: The authenticated caller; recorded as ``created_by``.

    Returns:
        The stored ``Application``, including the generated credentials.

    Raises:
        HTTPException: 500 when no unused client ID could be generated
            within the retry budget.
    """
    db = get_database()

    # Keep generating until the ID is confirmed unused. A single
    # regenerate-without-recheck could still store a duplicate, so every
    # candidate is verified. The loop is bounded so a misbehaving generator
    # cannot hang the request.
    max_attempts = 5
    for _ in range(max_attempts):
        client_id = generate_client_id()
        existing_app = await db.applications.find_one(
            {"client_id": client_id}, {"_id": 1}
        )
        if not existing_app:
            break
    else:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Could not generate a unique client ID"
        )

    client_secret = generate_client_secret()

    # One timestamp for both fields so a freshly created document always has
    # created_at == updated_at. Naive UTC is kept to match the timestamps
    # written by the other handlers in this module.
    now = datetime.utcnow()
    app_doc = {
        **app_data.model_dump(),
        "client_id": client_id,
        "client_secret": client_secret,
        "created_by": str(current_user.id),
        "created_at": now,
        "updated_at": now
    }

    result = await db.applications.insert_one(app_doc)
    app_doc["_id"] = result.inserted_id

    return Application(**app_doc)
@router.get("/public/{client_id}", response_model=ApplicationPublic)
async def get_application_public(client_id: str):
    """Return the public view of an active application (used by the OAuth flow).

    Args:
        client_id: The OAuth client ID to look up.

    Returns:
        An ``ApplicationPublic`` built from the matching document.

    Raises:
        HTTPException: 404 when no document has this client ID with
            ``is_active`` set to true.
    """
    db = get_database()
    lookup = {"client_id": client_id, "is_active": True}
    record = await db.applications.find_one(lookup)

    if record:
        return ApplicationPublic(**record)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Application not found",
    )
@router.get("/{app_id}", response_model=Application)
async def get_application(app_id: str, current_user: User = Depends(get_current_user)):
    """Fetch a single application by its document ID.

    Args:
        app_id: String form of the application's ObjectId.
        current_user: The authenticated caller.

    Returns:
        The matching ``Application``.

    Raises:
        HTTPException: 400 when ``app_id`` is not a valid ObjectId, 404 when
            no such application exists, 403 when the caller is neither the
            creator nor a system/group admin.
    """
    db = get_database()

    # Validate the ID explicitly instead of wrapping the whole handler in a
    # blanket ``except Exception``: that reported database or model errors as
    # "Invalid application ID" and hid real failures behind a 400.
    if not ObjectId.is_valid(app_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid application ID"
        )

    app = await db.applications.find_one({"_id": ObjectId(app_id)})
    if not app:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Application not found"
        )

    # Only the creator or an admin may read the full record.
    is_owner = str(app["created_by"]) == str(current_user.id)
    is_admin = current_user.role in (UserRole.SYSTEM_ADMIN, UserRole.GROUP_ADMIN)
    if not (is_owner or is_admin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions"
        )

    return Application(**app)
@router.put("/{app_id}", response_model=Application)
async def update_application(
    app_id: str,
    app_update: ApplicationUpdate,
    current_user: User = Depends(get_current_user)
):
    """Apply a partial update to an application.

    Only fields explicitly present in the request body are written;
    ``updated_at`` is always refreshed.

    Args:
        app_id: String form of the application's ObjectId.
        app_update: The fields to change.
        current_user: The authenticated caller.

    Returns:
        The ``Application`` as stored after the update.

    Raises:
        HTTPException: 400 when ``app_id`` is not a valid ObjectId, 404 when
            the application does not exist (or disappears before it can be
            re-read), 403 when the caller is neither the creator nor a
            system/group admin.
    """
    db = get_database()

    # Reject malformed IDs up front. The previous blanket ``except Exception``
    # turned every failure into a 400 and echoed ``str(e)`` to the client,
    # exposing internal error text.
    if not ObjectId.is_valid(app_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid application ID"
        )
    object_id = ObjectId(app_id)

    app = await db.applications.find_one({"_id": object_id})
    if not app:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Application not found"
        )

    # Only the creator or an admin may modify the record.
    is_owner = str(app["created_by"]) == str(current_user.id)
    is_admin = current_user.role in (UserRole.SYSTEM_ADMIN, UserRole.GROUP_ADMIN)
    if not (is_owner or is_admin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions"
        )

    # exclude_unset keeps omitted fields untouched rather than writing defaults.
    update_data = app_update.model_dump(exclude_unset=True)
    update_data["updated_at"] = datetime.utcnow()

    await db.applications.update_one(
        {"_id": object_id},
        {"$set": update_data}
    )

    # Re-read so the response reflects what is actually stored.
    app = await db.applications.find_one({"_id": object_id})
    if not app:
        # The document was removed between the update and the re-read.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Application not found"
        )
    return Application(**app)
@router.post("/{app_id}/regenerate-secret", response_model=Application)
async def regenerate_client_secret(
    app_id: str,
    current_user: User = Depends(get_current_user)
):
    """Replace an application's client secret with a newly generated one.

    Args:
        app_id: String form of the application's ObjectId.
        current_user: The authenticated caller.

    Returns:
        The ``Application`` as stored after the secret was replaced.

    Raises:
        HTTPException: 400 when ``app_id`` is not a valid ObjectId, 404 when
            the application does not exist (or disappears before it can be
            re-read), 403 when the caller is neither the creator nor a
            system/group admin.
    """
    db = get_database()

    # Reject malformed IDs up front. The previous blanket ``except Exception``
    # turned every failure into a 400 and echoed ``str(e)`` to the client,
    # exposing internal error text.
    if not ObjectId.is_valid(app_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid application ID"
        )
    object_id = ObjectId(app_id)

    app = await db.applications.find_one({"_id": object_id})
    if not app:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Application not found"
        )

    # Only the creator or an admin may rotate the secret.
    is_owner = str(app["created_by"]) == str(current_user.id)
    is_admin = current_user.role in (UserRole.SYSTEM_ADMIN, UserRole.GROUP_ADMIN)
    if not (is_owner or is_admin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions"
        )

    new_secret = generate_client_secret()
    await db.applications.update_one(
        {"_id": object_id},
        {"$set": {
            "client_secret": new_secret,
            "updated_at": datetime.utcnow()
        }}
    )

    # Re-read so the response reflects what is actually stored.
    app = await db.applications.find_one({"_id": object_id})
    if not app:
        # The document was removed between the update and the re-read.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Application not found"
        )
    return Application(**app)
@router.delete("/{app_id}")
async def delete_application(
    app_id: str,
    current_user: User = Depends(get_current_user)
):
    """Delete an application.

    Args:
        app_id: String form of the application's ObjectId.
        current_user: The authenticated caller.

    Returns:
        A dict with a ``message`` key confirming the deletion.

    Raises:
        HTTPException: 400 when ``app_id`` is not a valid ObjectId, 404 when
            no such application exists, 403 when the caller is neither the
            creator nor a system admin.
    """
    db = get_database()

    # Validate the ID explicitly instead of wrapping the whole handler in a
    # blanket ``except Exception``: that reported database errors as
    # "Invalid application ID" and hid real failures behind a 400.
    if not ObjectId.is_valid(app_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid application ID"
        )
    object_id = ObjectId(app_id)

    app = await db.applications.find_one({"_id": object_id})
    if not app:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Application not found"
        )

    # Deletion is stricter than read/update: besides the creator, only a
    # system admin may delete; the group-admin role grants no override here.
    is_owner = str(app["created_by"]) == str(current_user.id)
    if not is_owner and current_user.role != UserRole.SYSTEM_ADMIN:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions"
        )

    await db.applications.delete_one({"_id": object_id})

    return {"message": "Application deleted successfully"}