Files
site11/console/backend/auth.py
jungwoo choi 315eeea2ae Step 5: Authentication System & Environment Variables
- Implemented JWT authentication in Console backend
- Added .env file for environment variable management
- Updated docker-compose to use .env variables
- Created authentication endpoints (login/logout/me)
- Added protected route middleware
- Created ARCHITECTURE.md with Kafka as main messaging platform
- Defined Kafka for both events and task queues
- Redis dedicated for caching and session management

Test credentials:
- admin/admin123
- user/user123

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-10 16:36:47 +09:00

65 lines
2.0 KiB
Python

from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
import os
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class UserLogin(BaseModel):
username: str
password: str
class UserInDB(BaseModel):
username: str
hashed_password: str
email: str
full_name: Optional[str] = None
is_active: bool = True
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
return token_data