from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import List, Optional from datetime import datetime import uvicorn from contextlib import asynccontextmanager from database import init_db from models import User from beanie import PydanticObjectId # Pydantic models for requests class UserCreate(BaseModel): username: str email: str full_name: Optional[str] = None class UserUpdate(BaseModel): username: Optional[str] = None email: Optional[str] = None full_name: Optional[str] = None class UserResponse(BaseModel): id: str username: str email: str full_name: Optional[str] = None created_at: datetime updated_at: datetime @asynccontextmanager async def lifespan(app: FastAPI): # Startup await init_db() yield # Shutdown pass app = FastAPI( title="Users Service", description="User management microservice with MongoDB", version="0.2.0", lifespan=lifespan ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Health check @app.get("/health") async def health_check(): return { "status": "healthy", "service": "users", "timestamp": datetime.now().isoformat() } # CRUD Operations @app.get("/users", response_model=List[UserResponse]) async def get_users(): users = await User.find_all().to_list() return [UserResponse( id=str(user.id), username=user.username, email=user.email, full_name=user.full_name, created_at=user.created_at, updated_at=user.updated_at ) for user in users] @app.get("/users/{user_id}", response_model=UserResponse) async def get_user(user_id: str): try: user = await User.get(PydanticObjectId(user_id)) if not user: raise HTTPException(status_code=404, detail="User not found") return UserResponse( id=str(user.id), username=user.username, email=user.email, full_name=user.full_name, created_at=user.created_at, updated_at=user.updated_at ) except Exception: raise HTTPException(status_code=404, detail="User not found") @app.post("/users", response_model=UserResponse, status_code=201) async def create_user(user_data: UserCreate): # Check if username already exists existing_user = await User.find_one(User.username == user_data.username) if existing_user: raise HTTPException(status_code=400, detail="Username already exists") # Create new user user = User( username=user_data.username, email=user_data.email, full_name=user_data.full_name ) await user.create() return UserResponse( id=str(user.id), username=user.username, email=user.email, full_name=user.full_name, created_at=user.created_at, updated_at=user.updated_at ) @app.put("/users/{user_id}", response_model=UserResponse) async def update_user(user_id: str, user_update: UserUpdate): try: user = await User.get(PydanticObjectId(user_id)) if not user: raise HTTPException(status_code=404, detail="User not found") except Exception: raise HTTPException(status_code=404, detail="User not found") if user_update.username is not None: # Check if new username already exists existing_user = await User.find_one( User.username == user_update.username, User.id != user.id ) if existing_user: raise HTTPException(status_code=400, detail="Username already exists") user.username = user_update.username if user_update.email is not None: user.email = user_update.email if user_update.full_name is not None: user.full_name = user_update.full_name user.updated_at = datetime.now() await user.save() return UserResponse( id=str(user.id), username=user.username, email=user.email, full_name=user.full_name, created_at=user.created_at, updated_at=user.updated_at ) @app.delete("/users/{user_id}") async def delete_user(user_id: str): try: user = await User.get(PydanticObjectId(user_id)) if not user: raise HTTPException(status_code=404, detail="User not found") await user.delete() return {"message": "User deleted successfully"} except Exception: raise HTTPException(status_code=404, detail="User not found") if __name__ == "__main__": uvicorn.run( "main:app", host="0.0.0.0", port=8000, reload=True )