"""Authentication service""" from typing import Optional from datetime import datetime, timedelta from bson import ObjectId from app.utils.database import get_database from app.utils.security import verify_password, hash_password from app.models.user import UserCreate, User, UserInDB, UserRole from app.models.auth_history import AuthHistoryCreate, AuthAction import logging logger = logging.getLogger(__name__) class AuthService: """Service for handling authentication logic""" @staticmethod async def authenticate_user(username: str, password: str) -> Optional[UserInDB]: """Authenticate a user with username/email and password""" db = get_database() # Try to find user by username or email user = await db.users.find_one({ "$or": [ {"username": username}, {"email": username} ] }) if not user: return None if not verify_password(password, user["hashed_password"]): return None return UserInDB(**user) @staticmethod async def create_user(user_data: UserCreate) -> User: """Create a new user""" db = get_database() # Check if user already exists existing_user = await db.users.find_one({ "$or": [ {"email": user_data.email}, {"username": user_data.username} ] }) if existing_user: raise ValueError("User with this email or username already exists") # Hash password hashed_password = hash_password(user_data.password) # Prepare user document user_doc = { **user_data.model_dump(exclude={"password"}), "hashed_password": hashed_password, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow() } # Insert user result = await db.users.insert_one(user_doc) user_doc["_id"] = result.inserted_id return User(**user_doc) @staticmethod async def get_user_by_id(user_id: str) -> Optional[User]: """Get user by ID""" db = get_database() try: user = await db.users.find_one({"_id": ObjectId(user_id)}) if user: return User(**user) except Exception as e: logger.error(f"Error getting user by ID: {e}") return None @staticmethod async def log_auth_action( user_id: str, action: AuthAction, ip_address: str, user_agent: Optional[str] = None, application_id: Optional[str] = None, result: str = "success", details: Optional[dict] = None ): """Log authentication action""" db = get_database() history_doc = { "user_id": user_id, "application_id": application_id, "action": action.value, "ip_address": ip_address, "user_agent": user_agent, "result": result, "details": details, "created_at": datetime.utcnow() } await db.auth_history.insert_one(history_doc) @staticmethod async def create_admin_user(): """Create default admin user if not exists""" from app.config import settings db = get_database() # Check if admin exists admin = await db.users.find_one({"email": settings.admin_email}) if not admin: admin_data = UserCreate( email=settings.admin_email, username="admin", password=settings.admin_password, full_name="System Administrator", role=UserRole.SYSTEM_ADMIN ) try: await AuthService.create_user(admin_data) logger.info("Admin user created successfully") except ValueError: logger.info("Admin user already exists")