from fastapi import FastAPI, HTTPException, Depends, Form, Query, Request, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import RedirectResponse, JSONResponse from contextlib import asynccontextmanager from datetime import datetime, timedelta from typing import Optional, List, Dict import uvicorn import os import sys import logging from database import init_db from models import ( OAuthApplication, AuthorizationCode, AccessToken, OAuthScope, UserConsent, GrantType, ResponseType ) from utils import OAuthUtils, TokenGenerator, ScopeValidator from pydantic import BaseModel, Field from beanie import PydanticObjectId sys.path.append('/app') from shared.kafka import KafkaProducer, Event, EventType logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Pydantic models class ApplicationCreate(BaseModel): name: str description: Optional[str] = None redirect_uris: List[str] website_url: Optional[str] = None logo_url: Optional[str] = None privacy_policy_url: Optional[str] = None terms_url: Optional[str] = None sso_enabled: Optional[bool] = False sso_provider: Optional[str] = None sso_config: Optional[Dict] = None allowed_domains: Optional[List[str]] = None class ApplicationUpdate(BaseModel): name: Optional[str] = None description: Optional[str] = None redirect_uris: Optional[List[str]] = None website_url: Optional[str] = None logo_url: Optional[str] = None privacy_policy_url: Optional[str] = None terms_url: Optional[str] = None is_active: Optional[bool] = None sso_enabled: Optional[bool] = None sso_provider: Optional[str] = None sso_config: Optional[Dict] = None allowed_domains: Optional[List[str]] = None class ApplicationResponse(BaseModel): id: str client_id: str name: str description: Optional[str] redirect_uris: List[str] allowed_scopes: List[str] grant_types: List[str] is_active: bool is_trusted: bool sso_enabled: bool sso_provider: Optional[str] allowed_domains: List[str] website_url: Optional[str] logo_url: Optional[str] created_at: datetime class TokenRequest(BaseModel): grant_type: str code: Optional[str] = None redirect_uri: Optional[str] = None client_id: Optional[str] = None client_secret: Optional[str] = None refresh_token: Optional[str] = None scope: Optional[str] = None code_verifier: Optional[str] = None # Global Kafka producer kafka_producer: Optional[KafkaProducer] = None @asynccontextmanager async def lifespan(app: FastAPI): # Startup global kafka_producer await init_db() # Initialize Kafka producer try: kafka_producer = KafkaProducer( bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'kafka:9092') ) await kafka_producer.start() logger.info("Kafka producer initialized") except Exception as e: logger.warning(f"Failed to initialize Kafka producer: {e}") kafka_producer = None yield # Shutdown if kafka_producer: await kafka_producer.stop() app = FastAPI( title="OAuth 2.0 Service", description="OAuth 2.0 인증 서버 및 애플리케이션 관리", version="1.0.0", lifespan=lifespan ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Health check @app.get("/health") async def health_check(): return { "status": "healthy", "service": "oauth", "timestamp": datetime.now().isoformat() } # OAuth Application Management @app.post("/applications", response_model=ApplicationResponse, status_code=201) async def create_application( app_data: ApplicationCreate, current_user_id: str = "test_user" # TODO: Get from JWT token ): """새로운 OAuth 애플리케이션 등록""" client_id = OAuthUtils.generate_client_id() client_secret = OAuthUtils.generate_client_secret() hashed_secret = OAuthUtils.hash_client_secret(client_secret) # 기본 스코프 가져오기 default_scopes = await OAuthScope.find(OAuthScope.is_default == True).to_list() allowed_scopes = [scope.name for scope in default_scopes] application = OAuthApplication( client_id=client_id, client_secret=hashed_secret, name=app_data.name, description=app_data.description, owner_id=current_user_id, redirect_uris=app_data.redirect_uris, allowed_scopes=allowed_scopes, grant_types=[GrantType.AUTHORIZATION_CODE, GrantType.REFRESH_TOKEN], sso_enabled=app_data.sso_enabled or False, sso_provider=app_data.sso_provider, sso_config=app_data.sso_config or {}, allowed_domains=app_data.allowed_domains or [], website_url=app_data.website_url, logo_url=app_data.logo_url, privacy_policy_url=app_data.privacy_policy_url, terms_url=app_data.terms_url ) await application.create() # 이벤트 발행 if kafka_producer: event = Event( event_type=EventType.TASK_CREATED, service="oauth", data={ "app_id": str(application.id), "client_id": client_id, "name": application.name, "owner_id": current_user_id } ) await kafka_producer.send_event("oauth-events", event) # 클라이언트 시크릿은 생성 시에만 반환 return { **ApplicationResponse( id=str(application.id), client_id=application.client_id, name=application.name, description=application.description, redirect_uris=application.redirect_uris, allowed_scopes=application.allowed_scopes, grant_types=[gt.value for gt in application.grant_types], is_active=application.is_active, is_trusted=application.is_trusted, sso_enabled=application.sso_enabled, sso_provider=application.sso_provider, allowed_domains=application.allowed_domains, website_url=application.website_url, logo_url=application.logo_url, created_at=application.created_at ).dict(), "client_secret": client_secret # 최초 생성 시에만 반환 } @app.get("/applications", response_model=List[ApplicationResponse]) async def list_applications( owner_id: Optional[str] = None, is_active: Optional[bool] = None ): """OAuth 애플리케이션 목록 조회""" query = {} if owner_id: query["owner_id"] = owner_id if is_active is not None: query["is_active"] = is_active applications = await OAuthApplication.find(query).to_list() return [ ApplicationResponse( id=str(app.id), client_id=app.client_id, name=app.name, description=app.description, redirect_uris=app.redirect_uris, allowed_scopes=app.allowed_scopes, grant_types=[gt.value for gt in app.grant_types], is_active=app.is_active, is_trusted=app.is_trusted, sso_enabled=app.sso_enabled, sso_provider=app.sso_provider, allowed_domains=app.allowed_domains, website_url=app.website_url, logo_url=app.logo_url, created_at=app.created_at ) for app in applications ] @app.get("/applications/{client_id}", response_model=ApplicationResponse) async def get_application(client_id: str): """OAuth 애플리케이션 상세 조회""" application = await OAuthApplication.find_one(OAuthApplication.client_id == client_id) if not application: raise HTTPException(status_code=404, detail="Application not found") return ApplicationResponse( id=str(application.id), client_id=application.client_id, name=application.name, description=application.description, redirect_uris=application.redirect_uris, allowed_scopes=application.allowed_scopes, grant_types=[gt.value for gt in application.grant_types], is_active=application.is_active, is_trusted=application.is_trusted, sso_enabled=application.sso_enabled, sso_provider=application.sso_provider, allowed_domains=application.allowed_domains, website_url=application.website_url, logo_url=application.logo_url, created_at=application.created_at ) # OAuth 2.0 Authorization Endpoint @app.get("/authorize") async def authorize( response_type: str = Query(..., description="응답 타입 (code, token)"), client_id: str = Query(..., description="클라이언트 ID"), redirect_uri: str = Query(..., description="리다이렉트 URI"), scope: str = Query("", description="요청 스코프"), state: Optional[str] = Query(None, description="상태 값"), code_challenge: Optional[str] = Query(None, description="PKCE challenge"), code_challenge_method: Optional[str] = Query("S256", description="PKCE method"), current_user_id: str = "test_user" # TODO: Get from session/JWT ): """OAuth 2.0 인증 엔드포인트""" # 애플리케이션 확인 application = await OAuthApplication.find_one(OAuthApplication.client_id == client_id) if not application or not application.is_active: raise HTTPException(status_code=400, detail="Invalid client") # 리다이렉트 URI 확인 if redirect_uri not in application.redirect_uris: raise HTTPException(status_code=400, detail="Invalid redirect URI") # 스코프 검증 requested_scopes = ScopeValidator.parse_scope_string(scope) valid_scopes = ScopeValidator.validate_scopes(requested_scopes, application.allowed_scopes) # 사용자 동의 확인 (신뢰할 수 있는 앱이거나 이미 동의한 경우 건너뛰기) if not application.is_trusted: consent = await UserConsent.find_one( UserConsent.user_id == current_user_id, UserConsent.client_id == client_id ) if not consent or set(valid_scopes) - set(consent.granted_scopes): # TODO: 동의 화면으로 리다이렉트 pass if response_type == "code": # Authorization Code Flow code = OAuthUtils.generate_authorization_code() auth_code = AuthorizationCode( code=code, client_id=client_id, user_id=current_user_id, redirect_uri=redirect_uri, scopes=valid_scopes, code_challenge=code_challenge, code_challenge_method=code_challenge_method, expires_at=datetime.now() + timedelta(minutes=10) ) await auth_code.create() # 리다이렉트 URL 생성 redirect_url = f"{redirect_uri}?code={code}" if state: redirect_url += f"&state={state}" return RedirectResponse(url=redirect_url) elif response_type == "token": # Implicit Flow (권장하지 않음) raise HTTPException(status_code=400, detail="Implicit flow not supported") else: raise HTTPException(status_code=400, detail="Unsupported response type") # OAuth 2.0 Token Endpoint @app.post("/token") async def token( grant_type: str = Form(...), code: Optional[str] = Form(None), redirect_uri: Optional[str] = Form(None), client_id: Optional[str] = Form(None), client_secret: Optional[str] = Form(None), refresh_token: Optional[str] = Form(None), scope: Optional[str] = Form(None), code_verifier: Optional[str] = Form(None) ): """OAuth 2.0 토큰 엔드포인트""" # 클라이언트 인증 if not client_id or not client_secret: raise HTTPException( status_code=401, detail="Client authentication required", headers={"WWW-Authenticate": "Basic"} ) application = await OAuthApplication.find_one(OAuthApplication.client_id == client_id) if not application or not OAuthUtils.verify_client_secret(client_secret, application.client_secret): raise HTTPException(status_code=401, detail="Invalid client credentials") if grant_type == "authorization_code": # Authorization Code Grant if not code or not redirect_uri: raise HTTPException(status_code=400, detail="Missing required parameters") auth_code = await AuthorizationCode.find_one( AuthorizationCode.code == code, AuthorizationCode.client_id == client_id ) if not auth_code: raise HTTPException(status_code=400, detail="Invalid authorization code") if auth_code.used: raise HTTPException(status_code=400, detail="Authorization code already used") if auth_code.expires_at < datetime.now(): raise HTTPException(status_code=400, detail="Authorization code expired") if auth_code.redirect_uri != redirect_uri: raise HTTPException(status_code=400, detail="Redirect URI mismatch") # PKCE 검증 if auth_code.code_challenge: if not code_verifier: raise HTTPException(status_code=400, detail="Code verifier required") if not OAuthUtils.verify_pkce_challenge( code_verifier, auth_code.code_challenge, auth_code.code_challenge_method ): raise HTTPException(status_code=400, detail="Invalid code verifier") # 코드를 사용됨으로 표시 auth_code.used = True auth_code.used_at = datetime.now() await auth_code.save() # 토큰 생성 access_token = OAuthUtils.generate_access_token() refresh_token = OAuthUtils.generate_refresh_token() token_doc = AccessToken( token=access_token, refresh_token=refresh_token, client_id=client_id, user_id=auth_code.user_id, scopes=auth_code.scopes, expires_at=datetime.now() + timedelta(hours=1), refresh_expires_at=datetime.now() + timedelta(days=30) ) await token_doc.create() return TokenGenerator.generate_token_response( access_token=access_token, expires_in=3600, refresh_token=refresh_token, scope=" ".join(auth_code.scopes) ) elif grant_type == "refresh_token": # Refresh Token Grant if not refresh_token: raise HTTPException(status_code=400, detail="Refresh token required") token_doc = await AccessToken.find_one( AccessToken.refresh_token == refresh_token, AccessToken.client_id == client_id ) if not token_doc: raise HTTPException(status_code=400, detail="Invalid refresh token") if token_doc.revoked: raise HTTPException(status_code=400, detail="Token has been revoked") if token_doc.refresh_expires_at and token_doc.refresh_expires_at < datetime.now(): raise HTTPException(status_code=400, detail="Refresh token expired") # 기존 토큰 폐기 token_doc.revoked = True token_doc.revoked_at = datetime.now() await token_doc.save() # 새 토큰 생성 new_access_token = OAuthUtils.generate_access_token() new_refresh_token = OAuthUtils.generate_refresh_token() new_token_doc = AccessToken( token=new_access_token, refresh_token=new_refresh_token, client_id=client_id, user_id=token_doc.user_id, scopes=token_doc.scopes, expires_at=datetime.now() + timedelta(hours=1), refresh_expires_at=datetime.now() + timedelta(days=30) ) await new_token_doc.create() return TokenGenerator.generate_token_response( access_token=new_access_token, expires_in=3600, refresh_token=new_refresh_token, scope=" ".join(token_doc.scopes) ) elif grant_type == "client_credentials": # Client Credentials Grant requested_scopes = ScopeValidator.parse_scope_string(scope) if scope else [] valid_scopes = ScopeValidator.validate_scopes(requested_scopes, application.allowed_scopes) access_token = OAuthUtils.generate_access_token() token_doc = AccessToken( token=access_token, client_id=client_id, scopes=valid_scopes, expires_at=datetime.now() + timedelta(hours=1) ) await token_doc.create() return TokenGenerator.generate_token_response( access_token=access_token, expires_in=3600, scope=" ".join(valid_scopes) ) else: raise HTTPException(status_code=400, detail="Unsupported grant type") # Token Introspection Endpoint @app.post("/introspect") async def introspect( token: str = Form(...), token_type_hint: Optional[str] = Form(None), client_id: str = Form(...), client_secret: str = Form(...) ): """토큰 검증 엔드포인트""" # 클라이언트 인증 application = await OAuthApplication.find_one(OAuthApplication.client_id == client_id) if not application or not OAuthUtils.verify_client_secret(client_secret, application.client_secret): raise HTTPException(status_code=401, detail="Invalid client credentials") # 토큰 조회 token_doc = await AccessToken.find_one(AccessToken.token == token) if not token_doc or token_doc.revoked or token_doc.expires_at < datetime.now(): return {"active": False} # 토큰 사용 시간 업데이트 token_doc.last_used_at = datetime.now() await token_doc.save() return { "active": True, "scope": " ".join(token_doc.scopes), "client_id": token_doc.client_id, "username": token_doc.user_id, "exp": int(token_doc.expires_at.timestamp()) } # Token Revocation Endpoint @app.post("/revoke") async def revoke( token: str = Form(...), token_type_hint: Optional[str] = Form(None), client_id: str = Form(...), client_secret: str = Form(...) ): """토큰 폐기 엔드포인트""" # 클라이언트 인증 application = await OAuthApplication.find_one(OAuthApplication.client_id == client_id) if not application or not OAuthUtils.verify_client_secret(client_secret, application.client_secret): raise HTTPException(status_code=401, detail="Invalid client credentials") # 토큰 조회 및 폐기 token_doc = await AccessToken.find_one( AccessToken.token == token, AccessToken.client_id == client_id ) if token_doc and not token_doc.revoked: token_doc.revoked = True token_doc.revoked_at = datetime.now() await token_doc.save() # 이벤트 발행 if kafka_producer: event = Event( event_type=EventType.TASK_COMPLETED, service="oauth", data={ "action": "token_revoked", "token_id": str(token_doc.id), "client_id": client_id } ) await kafka_producer.send_event("oauth-events", event) return {"status": "success"} # Scopes Management @app.get("/scopes") async def list_scopes(): """사용 가능한 스코프 목록 조회""" scopes = await OAuthScope.find_all().to_list() return [ { "name": scope.name, "display_name": scope.display_name, "description": scope.description, "is_default": scope.is_default, "requires_approval": scope.requires_approval } for scope in scopes ] if __name__ == "__main__": uvicorn.run( "main:app", host="0.0.0.0", port=8000, reload=True )