Files
2025-09-28 20:41:57 +09:00

591 lines
20 KiB
Python

from fastapi import FastAPI, HTTPException, Depends, Form, Query, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse, JSONResponse
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from typing import Optional, List, Dict
import uvicorn
import os
import sys
import logging
from database import init_db
from models import (
OAuthApplication, AuthorizationCode, AccessToken,
OAuthScope, UserConsent, GrantType, ResponseType
)
from utils import OAuthUtils, TokenGenerator, ScopeValidator
from pydantic import BaseModel, Field
from beanie import PydanticObjectId
sys.path.append('/app')
from shared.kafka import KafkaProducer, Event, EventType
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Pydantic request/response schemas
class ApplicationCreate(BaseModel):
    """Request body accepted when registering a new OAuth application.

    Only ``name`` and ``redirect_uris`` are required; every other field is
    optional and falls back to the default shown below.
    """

    # Identity of the application and where authorization responses may go.
    name: str
    description: Optional[str] = None
    redirect_uris: List[str]

    # Optional public links shown for the application.
    website_url: Optional[str] = None
    logo_url: Optional[str] = None
    privacy_policy_url: Optional[str] = None
    terms_url: Optional[str] = None

    # Optional single sign-on settings.
    sso_enabled: Optional[bool] = False
    sso_provider: Optional[str] = None
    sso_config: Optional[Dict] = None
    allowed_domains: Optional[List[str]] = None
class ApplicationUpdate(BaseModel):
    """Partial-update payload for an OAuth application.

    Every field is optional and defaults to ``None``.
    NOTE(review): no handler in this part of the file consumes this schema;
    presumably ``None`` means "leave unchanged" — confirm against the caller.
    """

    # Identity of the application and where authorization responses may go.
    name: Optional[str] = None
    description: Optional[str] = None
    redirect_uris: Optional[List[str]] = None

    # Optional public links shown for the application.
    website_url: Optional[str] = None
    logo_url: Optional[str] = None
    privacy_policy_url: Optional[str] = None
    terms_url: Optional[str] = None

    # Activation flag.
    is_active: Optional[bool] = None

    # Optional single sign-on settings.
    sso_enabled: Optional[bool] = None
    sso_provider: Optional[str] = None
    sso_config: Optional[Dict] = None
    allowed_domains: Optional[List[str]] = None
class ApplicationResponse(BaseModel):
    """Public representation of an OAuth application returned by the API.

    The client secret is intentionally not part of this schema.
    """

    # Identifiers: ``id`` is the stringified document id, ``client_id`` the
    # OAuth client identifier.
    id: str
    client_id: str

    # Descriptive data.
    name: str
    description: Optional[str]

    # OAuth configuration.
    redirect_uris: List[str]
    allowed_scopes: List[str]
    grant_types: List[str]

    # Status flags.
    is_active: bool
    is_trusted: bool

    # Single sign-on settings.
    sso_enabled: bool
    sso_provider: Optional[str]
    allowed_domains: List[str]

    # Public links.
    website_url: Optional[str]
    logo_url: Optional[str]

    created_at: datetime
class TokenRequest(BaseModel):
    """Schema with the same fields as the token endpoint's form parameters.

    NOTE(review): the handlers visible in this file read these values from
    form fields directly and do not reference this model — confirm whether it
    is still needed.
    """

    # The only required field; selects which grant is being requested.
    grant_type: str

    # Authorization-code grant inputs.
    code: Optional[str] = None
    redirect_uri: Optional[str] = None

    # Client credentials.
    client_id: Optional[str] = None
    client_secret: Optional[str] = None

    # Refresh-token grant input.
    refresh_token: Optional[str] = None

    # Requested scope string.
    scope: Optional[str] = None

    # PKCE verifier.
    code_verifier: Optional[str] = None
# Global Kafka producer. Remains None when Kafka could not be started, in
# which case handlers skip event publishing.
kafka_producer: Optional[KafkaProducer] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Set up and tear down process-wide resources for the FastAPI app.

    Startup initializes the database and then tries to start the Kafka
    producer. A Kafka failure is logged as a warning and tolerated: the
    module-level ``kafka_producer`` is left as ``None`` and the service keeps
    running without it.

    Shutdown stops the producer if one was started. The stop is placed in a
    ``finally`` clause so it also runs when the lifespan context is left
    through an exception or cancellation rather than a clean shutdown.
    """
    global kafka_producer

    # Startup
    await init_db()

    try:
        producer = KafkaProducer(
            bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'kafka:9092')
        )
        await producer.start()
    except Exception as e:
        # Kafka is optional: keep serving requests without event publishing.
        logger.warning("Failed to initialize Kafka producer: %s", e)
        kafka_producer = None
    else:
        kafka_producer = producer
        logger.info("Kafka producer initialized")

    try:
        yield
    finally:
        # Shutdown
        if kafka_producer:
            await kafka_producer.stop()
# Application object; startup/shutdown work is delegated to ``lifespan``.
app = FastAPI(
    lifespan=lifespan,
    title="OAuth 2.0 Service",
    description="OAuth 2.0 인증 서버 및 애플리케이션 관리",
    version="1.0.0",
)

# CORS middleware: every origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive for an authorization server — confirm this is intended outside
# development.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Health check
@app.get("/health")
async def health_check():
    """Return a static liveness payload stamped with the current local time."""
    checked_at = datetime.now().isoformat()
    return {
        "status": "healthy",
        "service": "oauth",
        "timestamp": checked_at,
    }
# OAuth Application Management
class ApplicationCreatedResponse(ApplicationResponse):
    """Creation response: the application details plus the plaintext secret.

    A dedicated response model is required because FastAPI filters the
    returned data through ``response_model``; with plain
    ``ApplicationResponse`` the ``client_secret`` key would be dropped and the
    caller could never learn the secret.
    """

    client_secret: str


@app.post("/applications", response_model=ApplicationCreatedResponse, status_code=201)
async def create_application(
    app_data: ApplicationCreate,
    current_user_id: str = "test_user"  # TODO: Get from JWT token
):
    """Register a new OAuth application.

    Generates a client id and client secret, stores the application with the
    hashed secret and the currently configured default scopes, publishes a
    creation event when a Kafka producer is available, and returns the
    application together with the plaintext secret.

    Args:
        app_data: Registration data supplied by the caller.
        current_user_id: Owner recorded on the application (placeholder until
            it is taken from the JWT token).

    Returns:
        The created application including ``client_secret``. This is the only
        response that carries the plaintext secret; only its hash is stored.
    """
    client_id = OAuthUtils.generate_client_id()
    client_secret = OAuthUtils.generate_client_secret()
    hashed_secret = OAuthUtils.hash_client_secret(client_secret)

    # Fetch the default scopes. ``== True`` is intentional: it builds the
    # query expression for the document field.
    default_scopes = await OAuthScope.find(OAuthScope.is_default == True).to_list()  # noqa: E712
    allowed_scopes = [scope.name for scope in default_scopes]

    application = OAuthApplication(
        client_id=client_id,
        client_secret=hashed_secret,
        name=app_data.name,
        description=app_data.description,
        owner_id=current_user_id,
        redirect_uris=app_data.redirect_uris,
        allowed_scopes=allowed_scopes,
        grant_types=[GrantType.AUTHORIZATION_CODE, GrantType.REFRESH_TOKEN],
        sso_enabled=app_data.sso_enabled or False,
        sso_provider=app_data.sso_provider,
        sso_config=app_data.sso_config or {},
        allowed_domains=app_data.allowed_domains or [],
        website_url=app_data.website_url,
        logo_url=app_data.logo_url,
        privacy_policy_url=app_data.privacy_policy_url,
        terms_url=app_data.terms_url
    )
    await application.create()

    # Publish the event on a best-effort basis. The application is already
    # stored at this point, and failing the request here would lose the
    # plaintext secret, which cannot be recovered from its hash.
    if kafka_producer:
        try:
            # NOTE(review): this reuses the generic TASK_CREATED event type —
            # confirm whether a dedicated OAuth event type should be used.
            event = Event(
                event_type=EventType.TASK_CREATED,
                service="oauth",
                data={
                    "app_id": str(application.id),
                    "client_id": client_id,
                    "name": application.name,
                    "owner_id": current_user_id
                }
            )
            await kafka_producer.send_event("oauth-events", event)
        except Exception:
            logger.exception("Failed to publish application-created event")

    # The client secret is returned only here, at creation time.
    return ApplicationCreatedResponse(
        id=str(application.id),
        client_id=application.client_id,
        name=application.name,
        description=application.description,
        redirect_uris=application.redirect_uris,
        allowed_scopes=application.allowed_scopes,
        grant_types=[gt.value for gt in application.grant_types],
        is_active=application.is_active,
        is_trusted=application.is_trusted,
        sso_enabled=application.sso_enabled,
        sso_provider=application.sso_provider,
        allowed_domains=application.allowed_domains,
        website_url=application.website_url,
        logo_url=application.logo_url,
        created_at=application.created_at,
        client_secret=client_secret,
    )
@app.get("/applications", response_model=List[ApplicationResponse])
async def list_applications(
    owner_id: Optional[str] = None,
    is_active: Optional[bool] = None
):
    """List OAuth applications, optionally filtered.

    Args:
        owner_id: When non-empty, only applications with this owner are
            returned.
        is_active: When not ``None``, only applications whose active flag
            equals this value are returned.

    Returns:
        One ``ApplicationResponse`` per matching application.
    """
    # Build the filter from the criteria that were actually supplied.
    filters = {}
    if owner_id:
        filters["owner_id"] = owner_id
    if is_active is not None:
        filters["is_active"] = is_active

    matches = await OAuthApplication.find(filters).to_list()

    # ``application`` (not ``app``) avoids shadowing the FastAPI instance.
    return [
        ApplicationResponse(
            id=str(application.id),
            client_id=application.client_id,
            name=application.name,
            description=application.description,
            redirect_uris=application.redirect_uris,
            allowed_scopes=application.allowed_scopes,
            grant_types=[grant.value for grant in application.grant_types],
            is_active=application.is_active,
            is_trusted=application.is_trusted,
            sso_enabled=application.sso_enabled,
            sso_provider=application.sso_provider,
            allowed_domains=application.allowed_domains,
            website_url=application.website_url,
            logo_url=application.logo_url,
            created_at=application.created_at,
        )
        for application in matches
    ]
@app.get("/applications/{client_id}", response_model=ApplicationResponse)
async def get_application(client_id: str):
    """Return the details of one OAuth application.

    Args:
        client_id: OAuth client identifier taken from the URL path.

    Raises:
        HTTPException: 404 when no application has this client id.
    """
    found = await OAuthApplication.find_one(OAuthApplication.client_id == client_id)
    if not found:
        raise HTTPException(status_code=404, detail="Application not found")

    grant_names = [grant.value for grant in found.grant_types]
    return ApplicationResponse(
        id=str(found.id),
        client_id=found.client_id,
        name=found.name,
        description=found.description,
        redirect_uris=found.redirect_uris,
        allowed_scopes=found.allowed_scopes,
        grant_types=grant_names,
        is_active=found.is_active,
        is_trusted=found.is_trusted,
        sso_enabled=found.sso_enabled,
        sso_provider=found.sso_provider,
        allowed_domains=found.allowed_domains,
        website_url=found.website_url,
        logo_url=found.logo_url,
        created_at=found.created_at,
    )
# OAuth 2.0 Authorization Endpoint
def _append_query_params(url: str, params: Dict[str, str]) -> str:
    """Return ``url`` with ``params`` URL-encoded and added to its query.

    An existing query string on ``url`` is kept unchanged and the new
    parameters are appended after it, so a redirect URI that already contains
    ``?key=value`` still yields a well-formed URL.
    """
    from urllib.parse import urlencode, urlsplit, urlunsplit

    parts = urlsplit(url)
    extra = urlencode(params)
    query = f"{parts.query}&{extra}" if parts.query else extra
    return urlunsplit(parts._replace(query=query))


@app.get("/authorize")
async def authorize(
    response_type: str = Query(..., description="응답 타입 (code, token)"),
    client_id: str = Query(..., description="클라이언트 ID"),
    redirect_uri: str = Query(..., description="리다이렉트 URI"),
    scope: str = Query("", description="요청 스코프"),
    state: Optional[str] = Query(None, description="상태 값"),
    code_challenge: Optional[str] = Query(None, description="PKCE challenge"),
    code_challenge_method: Optional[str] = Query("S256", description="PKCE method"),
    current_user_id: str = "test_user"  # TODO: Get from session/JWT
):
    """OAuth 2.0 authorization endpoint.

    Validates the client and redirect URI, passes the requested scopes
    through ``ScopeValidator`` against the application's allowed scopes and,
    for ``response_type=code``, stores a 10-minute authorization code and
    redirects the user agent back to ``redirect_uri`` with ``code`` (and
    ``state`` when supplied) in the query string.

    Raises:
        HTTPException: 400 when the client is unknown or inactive, when the
            redirect URI is not registered for the client, when the implicit
            flow (``token``) is requested, or for any other response type.
    """
    # Verify the application.
    application = await OAuthApplication.find_one(OAuthApplication.client_id == client_id)
    if not application or not application.is_active:
        raise HTTPException(status_code=400, detail="Invalid client")

    # Verify the redirect URI (exact match against the registered list).
    if redirect_uri not in application.redirect_uris:
        raise HTTPException(status_code=400, detail="Invalid redirect URI")

    # Validate the requested scopes.
    requested_scopes = ScopeValidator.parse_scope_string(scope)
    valid_scopes = ScopeValidator.validate_scopes(requested_scopes, application.allowed_scopes)

    # Check user consent (skipped for trusted applications).
    # NOTE: consent is looked up but not enforced yet; a missing or
    # insufficient consent currently falls through to code issuance.
    if not application.is_trusted:
        consent = await UserConsent.find_one(
            UserConsent.user_id == current_user_id,
            UserConsent.client_id == client_id
        )
        if not consent or set(valid_scopes) - set(consent.granted_scopes):
            # TODO: redirect to the consent screen
            pass

    if response_type == "code":
        # Authorization Code Flow
        code = OAuthUtils.generate_authorization_code()
        auth_code = AuthorizationCode(
            code=code,
            client_id=client_id,
            user_id=current_user_id,
            redirect_uri=redirect_uri,
            scopes=valid_scopes,
            code_challenge=code_challenge,
            code_challenge_method=code_challenge_method,
            expires_at=datetime.now() + timedelta(minutes=10)
        )
        await auth_code.create()

        # Build the redirect URL. The parameters are URL-encoded and merged
        # with any query string already present on the redirect URI, so a
        # client-supplied ``state`` cannot corrupt or extend the URL.
        response_params = {"code": code}
        if state:
            response_params["state"] = state
        return RedirectResponse(url=_append_query_params(redirect_uri, response_params))
    elif response_type == "token":
        # Implicit Flow (not recommended)
        raise HTTPException(status_code=400, detail="Implicit flow not supported")
    else:
        raise HTTPException(status_code=400, detail="Unsupported response type")
# OAuth 2.0 Token Endpoint
async def _grant_authorization_code(
    client_id: str,
    code: Optional[str],
    redirect_uri: Optional[str],
    code_verifier: Optional[str],
):
    """Redeem an authorization code for an access/refresh token pair."""
    if not code or not redirect_uri:
        raise HTTPException(status_code=400, detail="Missing required parameters")

    auth_code = await AuthorizationCode.find_one(
        AuthorizationCode.code == code,
        AuthorizationCode.client_id == client_id
    )
    if not auth_code:
        raise HTTPException(status_code=400, detail="Invalid authorization code")
    if auth_code.used:
        raise HTTPException(status_code=400, detail="Authorization code already used")
    if auth_code.expires_at < datetime.now():
        raise HTTPException(status_code=400, detail="Authorization code expired")
    if auth_code.redirect_uri != redirect_uri:
        raise HTTPException(status_code=400, detail="Redirect URI mismatch")

    # PKCE verification, only when the code was issued with a challenge.
    if auth_code.code_challenge:
        if not code_verifier:
            raise HTTPException(status_code=400, detail="Code verifier required")
        if not OAuthUtils.verify_pkce_challenge(
            code_verifier,
            auth_code.code_challenge,
            auth_code.code_challenge_method
        ):
            raise HTTPException(status_code=400, detail="Invalid code verifier")

    # Mark the code as used before issuing tokens so it cannot be redeemed
    # a second time.
    auth_code.used = True
    auth_code.used_at = datetime.now()
    await auth_code.save()

    # Issue the tokens.
    access_token = OAuthUtils.generate_access_token()
    new_refresh_token = OAuthUtils.generate_refresh_token()
    token_doc = AccessToken(
        token=access_token,
        refresh_token=new_refresh_token,
        client_id=client_id,
        user_id=auth_code.user_id,
        scopes=auth_code.scopes,
        expires_at=datetime.now() + timedelta(hours=1),
        refresh_expires_at=datetime.now() + timedelta(days=30)
    )
    await token_doc.create()

    return TokenGenerator.generate_token_response(
        access_token=access_token,
        expires_in=3600,
        refresh_token=new_refresh_token,
        scope=" ".join(auth_code.scopes)
    )


async def _grant_refresh_token(client_id: str, refresh_token: Optional[str]):
    """Rotate a refresh token: revoke the old token record, issue a new pair."""
    if not refresh_token:
        raise HTTPException(status_code=400, detail="Refresh token required")

    token_doc = await AccessToken.find_one(
        AccessToken.refresh_token == refresh_token,
        AccessToken.client_id == client_id
    )
    if not token_doc:
        raise HTTPException(status_code=400, detail="Invalid refresh token")
    if token_doc.revoked:
        raise HTTPException(status_code=400, detail="Token has been revoked")
    if token_doc.refresh_expires_at and token_doc.refresh_expires_at < datetime.now():
        raise HTTPException(status_code=400, detail="Refresh token expired")

    # Revoke the existing token record.
    token_doc.revoked = True
    token_doc.revoked_at = datetime.now()
    await token_doc.save()

    # Issue the replacement tokens with the same user and scopes.
    new_access_token = OAuthUtils.generate_access_token()
    new_refresh_token = OAuthUtils.generate_refresh_token()
    new_token_doc = AccessToken(
        token=new_access_token,
        refresh_token=new_refresh_token,
        client_id=client_id,
        user_id=token_doc.user_id,
        scopes=token_doc.scopes,
        expires_at=datetime.now() + timedelta(hours=1),
        refresh_expires_at=datetime.now() + timedelta(days=30)
    )
    await new_token_doc.create()

    return TokenGenerator.generate_token_response(
        access_token=new_access_token,
        expires_in=3600,
        refresh_token=new_refresh_token,
        scope=" ".join(token_doc.scopes)
    )


async def _grant_client_credentials(application, client_id: str, scope: Optional[str]):
    """Issue an access token (no refresh token) to the client itself."""
    requested_scopes = ScopeValidator.parse_scope_string(scope) if scope else []
    valid_scopes = ScopeValidator.validate_scopes(requested_scopes, application.allowed_scopes)

    access_token = OAuthUtils.generate_access_token()
    token_doc = AccessToken(
        token=access_token,
        client_id=client_id,
        scopes=valid_scopes,
        expires_at=datetime.now() + timedelta(hours=1)
    )
    await token_doc.create()

    return TokenGenerator.generate_token_response(
        access_token=access_token,
        expires_in=3600,
        scope=" ".join(valid_scopes)
    )


@app.post("/token")
async def token(
    grant_type: str = Form(...),
    code: Optional[str] = Form(None),
    redirect_uri: Optional[str] = Form(None),
    client_id: Optional[str] = Form(None),
    client_secret: Optional[str] = Form(None),
    refresh_token: Optional[str] = Form(None),
    scope: Optional[str] = Form(None),
    code_verifier: Optional[str] = Form(None)
):
    """OAuth 2.0 token endpoint.

    Authenticates the client from the ``client_id``/``client_secret`` form
    fields and then dispatches on ``grant_type`` to one of the supported
    grants: ``authorization_code``, ``refresh_token`` or
    ``client_credentials``. Access tokens are issued with a one-hour
    lifetime; refresh tokens with a 30-day lifetime.

    Raises:
        HTTPException: 401 when client credentials are missing or wrong;
            400 when the client is deactivated, the grant type is not
            supported, or the grant-specific validation fails.
    """
    # Client authentication.
    if not client_id or not client_secret:
        raise HTTPException(
            status_code=401,
            detail="Client authentication required",
            headers={"WWW-Authenticate": "Basic"}
        )

    application = await OAuthApplication.find_one(OAuthApplication.client_id == client_id)
    if not application or not OAuthUtils.verify_client_secret(client_secret, application.client_secret):
        raise HTTPException(status_code=401, detail="Invalid client credentials")

    # A deactivated application must not obtain tokens; this mirrors the
    # check and error used by the authorization endpoint.
    if not application.is_active:
        raise HTTPException(status_code=400, detail="Invalid client")

    # NOTE(review): the requested grant type is not checked against
    # application.grant_types, so any authenticated client can use any grant
    # below (including client_credentials) — confirm whether that is intended.
    if grant_type == "authorization_code":
        return await _grant_authorization_code(client_id, code, redirect_uri, code_verifier)
    if grant_type == "refresh_token":
        return await _grant_refresh_token(client_id, refresh_token)
    if grant_type == "client_credentials":
        return await _grant_client_credentials(application, client_id, scope)

    raise HTTPException(status_code=400, detail="Unsupported grant type")
# Token Introspection Endpoint
@app.post("/introspect")
async def introspect(
    token: str = Form(...),
    token_type_hint: Optional[str] = Form(None),
    client_id: str = Form(...),
    client_secret: str = Form(...)
):
    """Token introspection endpoint.

    Authenticates the calling client, then reports whether ``token`` is a
    known, unrevoked and unexpired access token. For an active token the
    ``last_used_at`` timestamp is updated and the scope, owning client, user
    and expiry are returned. ``token_type_hint`` is accepted but not used.

    Raises:
        HTTPException: 401 when the client credentials are invalid.
    """
    # Client authentication.
    caller = await OAuthApplication.find_one(OAuthApplication.client_id == client_id)
    if not caller or not OAuthUtils.verify_client_secret(client_secret, caller.client_secret):
        raise HTTPException(status_code=401, detail="Invalid client credentials")

    # Look the token up; unknown, revoked and expired tokens all get the
    # same inactive answer.
    record = await AccessToken.find_one(AccessToken.token == token)
    if not record:
        return {"active": False}
    if record.revoked:
        return {"active": False}
    if record.expires_at < datetime.now():
        return {"active": False}

    # Record when the token was last used.
    record.last_used_at = datetime.now()
    await record.save()

    scope_string = " ".join(record.scopes)
    expires_epoch = int(record.expires_at.timestamp())
    return {
        "active": True,
        "scope": scope_string,
        "client_id": record.client_id,
        "username": record.user_id,
        "exp": expires_epoch,
    }
# Token Revocation Endpoint
@app.post("/revoke")
async def revoke(
    token: str = Form(...),
    token_type_hint: Optional[str] = Form(None),
    client_id: str = Form(...),
    client_secret: str = Form(...)
):
    """Token revocation endpoint.

    Authenticates the client and revokes the token record owned by that
    client whose access-token or refresh-token value equals ``token``.
    ``token_type_hint`` only decides which of the two lookups is tried
    first. Unknown or already revoked tokens are ignored and the endpoint
    still reports success.

    Access and refresh token share one record with a single ``revoked``
    flag, so revoking through either value invalidates both.

    Raises:
        HTTPException: 401 when the client credentials are invalid.
    """
    # Client authentication.
    application = await OAuthApplication.find_one(OAuthApplication.client_id == client_id)
    if not application or not OAuthUtils.verify_client_secret(client_secret, application.client_secret):
        raise HTTPException(status_code=401, detail="Invalid client credentials")

    # Look the token up by access-token value and by refresh-token value;
    # the hint is an optimization only, so the other lookup is still tried.
    by_access_token = AccessToken.token == token
    by_refresh_token = AccessToken.refresh_token == token
    if token_type_hint == "refresh_token":
        lookups = [by_refresh_token, by_access_token]
    else:
        lookups = [by_access_token, by_refresh_token]

    token_doc = None
    for condition in lookups:
        token_doc = await AccessToken.find_one(
            condition,
            AccessToken.client_id == client_id
        )
        if token_doc:
            break

    if token_doc and not token_doc.revoked:
        token_doc.revoked = True
        token_doc.revoked_at = datetime.now()
        await token_doc.save()

        # Publish the event on a best-effort basis: the revocation is already
        # persisted, so a Kafka failure must not turn it into an error.
        if kafka_producer:
            try:
                event = Event(
                    event_type=EventType.TASK_COMPLETED,
                    service="oauth",
                    data={
                        "action": "token_revoked",
                        "token_id": str(token_doc.id),
                        "client_id": client_id
                    }
                )
                await kafka_producer.send_event("oauth-events", event)
            except Exception:
                logger.exception("Failed to publish token-revoked event")

    return {"status": "success"}
# Scopes Management
@app.get("/scopes")
async def list_scopes():
    """List every stored scope as a plain dictionary.

    Each entry exposes the scope's name, display name, description and its
    ``is_default`` / ``requires_approval`` flags.
    """
    exposed_fields = (
        "name",
        "display_name",
        "description",
        "is_default",
        "requires_approval",
    )
    stored_scopes = await OAuthScope.find_all().to_list()
    return [
        {field: getattr(entry, field) for field in exposed_fields}
        for entry in stored_scopes
    ]
if __name__ == "__main__":
    # Direct-run entry point: serve ``main:app`` on all interfaces, port 8000,
    # with auto-reload enabled.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)