"""Main FastAPI application""" from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from contextlib import asynccontextmanager import logging from app.config import settings from app.utils.database import connect_database, disconnect_database from app.routers import auth_router, users_router, applications_router from app.services.auth_service import AuthService # Configure logging logging.basicConfig( level=getattr(logging, settings.log_level), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): """Manage application lifecycle""" # Startup logger.info("Starting OAuth 2.0 Authentication System") await connect_database() # Create admin user if not exists if settings.environment == "dev": await AuthService.create_admin_user() yield # Shutdown logger.info("Shutting down OAuth 2.0 Authentication System") await disconnect_database() # Create FastAPI app app = FastAPI( title="OAuth 2.0 Authentication System", description="Enterprise-grade OAuth 2.0 based central authentication system", version="1.0.0", docs_url=f"{settings.api_prefix}/docs", redoc_url=f"{settings.api_prefix}/redoc", openapi_url=f"{settings.api_prefix}/openapi.json", lifespan=lifespan ) # Configure CORS app.add_middleware( CORSMiddleware, allow_origins=settings.cors_origins, allow_credentials=settings.cors_allow_credentials, allow_methods=["*"], allow_headers=["*"], ) # Health check endpoint @app.get("/health") async def health_check(): """Health check endpoint""" return { "status": "healthy", "environment": settings.environment, "version": "1.0.0" } # Root endpoint @app.get("/") async def root(): """Root endpoint""" return { "message": "OAuth 2.0 Authentication System API", "docs": f"{settings.api_prefix}/docs", "health": "/health" } # Include routers app.include_router(auth_router, prefix=settings.api_prefix) app.include_router(users_router, prefix=settings.api_prefix) app.include_router(applications_router, prefix=settings.api_prefix) # Global exception handler @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): """Global exception handler""" logger.error(f"Unhandled exception: {exc}") return JSONResponse( status_code=500, content={"detail": "Internal server error"} ) # OAuth 2.0 authorization endpoint @app.get("/oauth/authorize") async def oauth_authorize( response_type: str, client_id: str, redirect_uri: str, scope: str, state: str = None ): """OAuth 2.0 authorization endpoint""" # This would typically render a login page # For now, return the parameters for the frontend to handle return { "response_type": response_type, "client_id": client_id, "redirect_uri": redirect_uri, "scope": scope, "state": state } # OAuth 2.0 token endpoint @app.post("/oauth/token") async def oauth_token( grant_type: str, code: str = None, client_id: str = None, client_secret: str = None, redirect_uri: str = None, refresh_token: str = None ): """OAuth 2.0 token endpoint""" from app.services.token_service import TokenService if grant_type == "authorization_code": if not all([code, client_id, client_secret, redirect_uri]): return JSONResponse( status_code=400, content={"error": "invalid_request", "error_description": "Missing required parameters"} ) token = await TokenService.exchange_authorization_code( code=code, client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri ) if not token: return JSONResponse( status_code=400, content={"error": "invalid_grant", "error_description": "Invalid authorization code"} ) return token elif grant_type == "refresh_token": if not refresh_token: return JSONResponse( status_code=400, content={"error": "invalid_request", "error_description": "Missing refresh token"} ) user_id = await TokenService.verify_refresh_token(refresh_token) if not user_id: return JSONResponse( status_code=400, content={"error": "invalid_grant", "error_description": "Invalid refresh token"} ) # Get user and create new tokens from app.services.auth_service import AuthService user = await AuthService.get_user_by_id(user_id) if not user: return JSONResponse( status_code=400, content={"error": "invalid_grant", "error_description": "User not found"} ) # Revoke old refresh token await TokenService.revoke_refresh_token(refresh_token) # Create new tokens tokens = await TokenService.create_tokens( str(user.id), { "username": user.username, "role": user.role, "email": user.email } ) return tokens else: return JSONResponse( status_code=400, content={"error": "unsupported_grant_type", "error_description": f"Grant type '{grant_type}' is not supported"} )