"""Users service: a small FastAPI app exposing CRUD endpoints for users.

Records are kept in a module-level dict, so all data is lost when the
process exits.
"""

from datetime import datetime
from typing import List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

app = FastAPI(
    title="Users Service",
    description="User management microservice",
    version="0.1.0",
)

# CORS middleware
# NOTE(review): wildcard origins/methods/headers are combined with
# allow_credentials=True. The FastAPI CORS documentation advises listing
# origins explicitly when credentials are allowed -- confirm and restrict
# before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# In-memory storage for now (will be replaced with MongoDB later).
# Maps user id -> stored user record (a plain dict shaped like ``User``).
users_db = {}
# Next id to hand out; incremented after every successful creation.
user_id_counter = 1


# Pydantic models
class UserCreate(BaseModel):
    """Request body for creating a user."""

    username: str
    email: str
    full_name: Optional[str] = None


class UserUpdate(BaseModel):
    """Request body for a partial update; fields left as None are not changed."""

    username: Optional[str] = None
    email: Optional[str] = None
    full_name: Optional[str] = None


class User(BaseModel):
    """User record as returned by the API."""

    id: int
    username: str
    email: str
    full_name: Optional[str] = None
    created_at: datetime
    updated_at: datetime


def _get_user_or_404(user_id: int) -> dict:
    """Return the stored record for *user_id* or raise a 404 HTTPException."""
    user = users_db.get(user_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user


def _ensure_username_available(
    username: str, exclude_id: Optional[int] = None
) -> None:
    """Raise a 400 HTTPException if another user already has *username*.

    *exclude_id* names a record to skip, so that a user being updated does
    not conflict with their own current username.
    """
    for uid, existing_user in users_db.items():
        if uid != exclude_id and existing_user["username"] == username:
            raise HTTPException(
                status_code=400, detail="Username already exists"
            )


# Health check
@app.get("/health")
async def health_check():
    """Report service status together with the current server time."""
    return {
        "status": "healthy",
        "service": "users",
        "timestamp": datetime.now().isoformat(),
    }


# CRUD Operations
@app.get("/users", response_model=List[User])
async def get_users():
    """Return every stored user."""
    return list(users_db.values())


@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: int):
    """Return a single user; responds with 404 if the id is unknown."""
    return _get_user_or_404(user_id)


@app.post("/users", response_model=User, status_code=201)
async def create_user(user: UserCreate):
    """Create a user; responds with 400 if the username is already taken."""
    global user_id_counter

    _ensure_username_available(user.username)

    # Take one timestamp so created_at and updated_at are exactly equal on a
    # freshly created record.
    now = datetime.now()
    new_user = {
        "id": user_id_counter,
        "username": user.username,
        "email": user.email,
        "full_name": user.full_name,
        "created_at": now,
        "updated_at": now,
    }

    users_db[user_id_counter] = new_user
    user_id_counter += 1

    return new_user


@app.put("/users/{user_id}", response_model=User)
async def update_user(user_id: int, user_update: UserUpdate):
    """Apply the provided fields to a user.

    Responds with 404 if the id is unknown and 400 if the new username
    belongs to a different user.
    """
    user = _get_user_or_404(user_id)

    if user_update.username is not None:
        # Validate before mutating so a rejected request changes nothing.
        _ensure_username_available(user_update.username, exclude_id=user_id)
        user["username"] = user_update.username

    if user_update.email is not None:
        user["email"] = user_update.email

    if user_update.full_name is not None:
        user["full_name"] = user_update.full_name

    # The timestamp is refreshed on every successful call, even when no
    # field was supplied.
    user["updated_at"] = datetime.now()

    return user


@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
    """Delete a user; responds with 404 if the id is unknown."""
    _get_user_or_404(user_id)
    del users_db[user_id]
    return {"message": "User deleted successfully"}


if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )