Files
jungwoo choi f4b75b96a5 feat: Phase 1 - Complete authentication system with JWT
Backend Implementation (FastAPI + MongoDB):
- JWT authentication with access/refresh tokens
- User registration and login endpoints
- Password hashing with bcrypt (fixed 72-byte limit)
- Protected endpoints with JWT middleware
- Token refresh mechanism
- Role-Based Access Control (RBAC) structure
- Pydantic v2 models and async MongoDB with Motor
- API endpoints: /api/auth/register, /api/auth/login, /api/auth/me, /api/auth/refresh

Frontend Implementation (React + TypeScript + Material-UI):
- Login and Register pages with validation
- AuthContext for global authentication state
- API client with Axios interceptors for token refresh
- Protected routes with automatic redirect
- User profile display in navigation
- Logout functionality

Technical Achievements:
- Resolved bcrypt 72-byte limit (replaced passlib with native bcrypt)
- Fixed Pydantic v2 compatibility (PyObjectId, ConfigDict)
- Implemented automatic token refresh on 401 errors
- Created comprehensive test suite for all auth endpoints

Docker & Kubernetes:
- Backend image: yakenator/site11-console-backend:latest
- Frontend image: yakenator/site11-console-frontend:latest
- Deployed to site11-pipeline namespace
- Nginx reverse proxy configuration

Documentation:
- CONSOLE_ARCHITECTURE.md - Complete system architecture
- PHASE1_COMPLETION.md - Detailed completion report
- PROGRESS.md - Updated with Phase 1 status

All authentication endpoints tested and verified working.

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-28 16:23:07 +09:00

347 lines
11 KiB
Python

from fastapi import FastAPI, HTTPException, Request, Response, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordRequestForm
import uvicorn
from datetime import datetime, timedelta
import httpx
import os
import asyncio
import logging
from typing import Any
from contextlib import asynccontextmanager
from auth import (
Token, UserLogin, UserInDB,
verify_password, get_password_hash,
create_access_token, get_current_user,
ACCESS_TOKEN_EXPIRE_MINUTES
)
# Import event consumer
from event_consumer import AdvancedEventConsumer
# Configure logging at import time so INFO-level records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Global event consumer instance.
# Starts as None; lifespan() assigns it on successful startup and resets it to
# None when startup fails. The event endpoints treat a falsy value as
# "consumer not available".
event_consumer = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the event consumer on application startup and stop it on shutdown.

    The consumer subscribes to the "user-events" and "oauth-events" topics.
    Redis and bootstrap-server addresses come from the REDIS_URL and
    KAFKA_BOOTSTRAP_SERVERS environment variables, with in-cluster defaults.

    A startup failure is logged and the application continues in degraded
    mode with the module-level ``event_consumer`` left as ``None``.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control returns to the framework while the application serves.
    """
    global event_consumer

    # Startup
    try:
        # Initialize and start event consumer
        event_consumer = AdvancedEventConsumer(
            topics=["user-events", "oauth-events"],
            group_id="console-consumer-group",
            redis_url=os.getenv("REDIS_URL", "redis://redis:6379"),
            bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"),
            enable_dlq=True,
            dlq_topic="dead-letter-queue",
        )
        await event_consumer.start()
        logger.info("Event consumer started successfully")
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error() call drops.
        logger.exception("Failed to start event consumer: %s", e)
        # Continue without event consumer (degraded mode)
        event_consumer = None

    try:
        yield
    finally:
        # Shutdown: runs even when an exception is thrown into the generator at
        # the yield point, so a started consumer is always stopped.
        if event_consumer:
            await event_consumer.stop()
            logger.info("Event consumer stopped")
# The ASGI application object; every route below is registered on it.
# lifespan() is wired in here so it runs around application startup/shutdown.
app = FastAPI(
    title="Console API Gateway",
    description="Central orchestrator for microservices",
    version="0.1.0",
    lifespan=lifespan
)
# Service URLs from environment
# Base URLs of the downstream services used by the status check and the proxy
# routes; each falls back to a default host name when the variable is unset.
USERS_SERVICE_URL = os.getenv("USERS_SERVICE_URL", "http://users-backend:8000")
IMAGES_SERVICE_URL = os.getenv("IMAGES_SERVICE_URL", "http://images-backend:8000")
# CORS middleware
# NOTE(review): every origin, method and header is allowed while credentials
# are also enabled. That is very permissive - confirm this is intended outside
# of development before deploying.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def root():
    """Return a short banner identifying the gateway plus the server's local time."""
    now_iso = datetime.now().isoformat()
    return dict(
        message="Console API Gateway",
        status="running",
        timestamp=now_iso,
    )
@app.get("/health")
async def health_check():
    """Report gateway health and whether the event consumer is set.

    The "event_consumer" field is "running" when the module-level consumer is
    truthy and "not running" otherwise; the overall status is always "healthy".
    """
    consumer_state = "not running"
    if event_consumer:
        consumer_state = "running"

    payload = {
        "status": "healthy",
        "service": "console",
        "timestamp": datetime.now().isoformat(),
    }
    payload["event_consumer"] = consumer_state
    return payload
@app.get("/api/health")
async def api_health_check():
    """Health probe served under /api for the frontend; always reports healthy."""
    checked_at = datetime.now().isoformat()
    return dict(
        status="healthy",
        service="console-backend",
        timestamp=checked_at,
    )
@app.get("/api/users/health")
async def users_health_check():
    """Placeholder health endpoint for the users service.

    Returns a static "healthy" payload; no request is made to the users
    service itself.
    """
    # TODO: Replace with actual users service health check when implemented
    checked_at = datetime.now().isoformat()
    return dict(
        status="healthy",
        service="users-service",
        timestamp=checked_at,
    )
# Event Management Endpoints
@app.get("/api/events/stats")
async def get_event_stats(current_user = Depends(get_current_user)):
    """Return the event consumer's ``stats`` attribute with a timestamp.

    Requires an authenticated user.

    Raises:
        HTTPException: 503 when no event consumer is available.
    """
    if event_consumer:
        return dict(
            stats=event_consumer.stats,
            timestamp=datetime.now().isoformat(),
        )
    raise HTTPException(status_code=503, detail="Event consumer not available")
@app.get("/api/events/dlq")
async def get_dlq_messages(
    limit: int = 10,
    current_user = Depends(get_current_user)
):
    """List messages currently held in the Dead Letter Queue.

    Requires an authenticated user.

    Args:
        limit: Passed through to the consumer as the ``limit`` argument.

    Returns:
        The messages, how many were returned, and a timestamp.

    Raises:
        HTTPException: 503 when no event consumer is available.
    """
    if not event_consumer:
        raise HTTPException(status_code=503, detail="Event consumer not available")

    dlq_batch = await event_consumer.get_dlq_messages(limit=limit)
    return dict(
        messages=dlq_batch,
        count=len(dlq_batch),
        timestamp=datetime.now().isoformat(),
    )
@app.post("/api/events/dlq/{event_id}/retry")
async def retry_dlq_message(
    event_id: str,
    current_user = Depends(get_current_user)
):
    """Ask the event consumer to retry one Dead Letter Queue message.

    Requires an authenticated user.

    Args:
        event_id: Identifier of the DLQ entry, taken from the URL path.

    Raises:
        HTTPException: 503 when no event consumer is available, 404 when the
            consumer reports that the retry did not succeed.
    """
    if not event_consumer:
        raise HTTPException(status_code=503, detail="Event consumer not available")

    retried = await event_consumer.retry_dlq_message(event_id)
    if retried:
        return dict(
            status="retry_initiated",
            event_id=event_id,
            timestamp=datetime.now().isoformat(),
        )
    raise HTTPException(status_code=404, detail="Event not found in DLQ")
@app.get("/api/events/schemas")
async def get_event_schemas():
    """Expose every schema known to the schema registry, for documentation.

    This route has no authentication dependency. The registry is imported
    inside the function body rather than at module import time.
    """
    from shared.kafka.schema_registry import SchemaRegistry

    return dict(
        schemas=SchemaRegistry.get_all_schemas(),
        version="1.0.0",
        timestamp=datetime.now().isoformat(),
    )
# Authentication endpoints
# Lazily built demo credential table and decoy hash (see _load_demo_users).
_DEMO_USERS = None
_DEMO_DECOY_HASH = None


def _load_demo_users():
    """Build the demo user table once and return ``(users, decoy_hash)``.

    Hashing is deliberately slow, so the demo passwords are hashed on first
    use and cached instead of being re-hashed on every login request.
    """
    global _DEMO_USERS, _DEMO_DECOY_HASH
    if _DEMO_USERS is None:
        # Hash for a password nobody is given; used only to burn equivalent
        # time for unknown usernames.
        _DEMO_DECOY_HASH = get_password_hash("decoy-password-not-a-real-account")
        _DEMO_USERS = {
            "admin": {
                "username": "admin",
                "hashed_password": get_password_hash("admin123"),
                "email": "admin@site11.com",
                "full_name": "Administrator",
                "is_active": True,
            },
            "user": {
                "username": "user",
                "hashed_password": get_password_hash("user123"),
                "email": "user@site11.com",
                "full_name": "Test User",
                "is_active": True,
            },
        }
    return _DEMO_USERS, _DEMO_DECOY_HASH


@app.post("/api/auth/login", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate a username/password form and issue a bearer access token.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.

    Returns:
        A dict with ``access_token`` and ``token_type`` ("bearer").

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not verify.
    """
    # For demo purposes - in production, check against database
    # This is temporary until we integrate with Users service
    # NOTE(review): hard-coded demo credentials must not ship to production.
    demo_users, decoy_hash = _load_demo_users()
    user = demo_users.get(form_data.username)

    # Always run exactly one verification so an unknown username costs about
    # the same as a wrong password and is not distinguishable by timing.
    stored_hash = user["hashed_password"] if user else decoy_hash
    password_ok = verify_password(form_data.password, stored_hash)

    if not user or not password_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["username"]}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
@app.get("/api/auth/me")
async def get_me(current_user = Depends(get_current_user)):
    """Describe the authenticated user.

    The email is synthesized as ``<username>@site11.com`` and ``is_active`` is
    always reported as True; neither value is looked up anywhere.
    """
    profile = {"username": current_user.username}
    profile["email"] = f"{current_user.username}@site11.com"
    profile["is_active"] = True
    return profile
@app.post("/api/auth/logout")
async def logout(current_user = Depends(get_current_user)):
    """Acknowledge a logout request from an authenticated user.

    No server-side state is changed: the token is not revoked here.
    """
    # In a real application, you might want to blacklist the token
    acknowledgement = dict(message="Successfully logged out")
    return acknowledgement
async def _probe_service(client, base_url):
    """Return "online", "error" or "offline" for one service's /health endpoint."""
    try:
        response = await client.get(f"{base_url}/health", timeout=2.0)
    except Exception:
        # Best-effort probe: any failure to get a response counts as offline.
        # Unlike a bare except, this lets asyncio.CancelledError and
        # KeyboardInterrupt propagate (both derive from BaseException).
        logger.debug("Health probe failed for %s", base_url, exc_info=True)
        return "offline"
    return "online" if response.status_code == 200 else "error"


@app.get("/api/status")
async def system_status():
    """Summarize the availability of the console and its downstream services.

    The users and images services are probed concurrently through their
    ``/health`` endpoints with a 2 second timeout each. A 200 response maps to
    "online", any other response to "error", and a failed request to
    "offline". The remaining services are reported as "pending".

    Returns:
        A dict with the console state, a per-service state mapping and a
        timestamp.
    """
    async with httpx.AsyncClient() as client:
        users_state, images_state = await asyncio.gather(
            _probe_service(client, USERS_SERVICE_URL),
            _probe_service(client, IMAGES_SERVICE_URL),
        )

    services_status = {
        "users": users_state,
        "images": images_state,
        # Other services (not yet implemented)
        "oauth": "pending",
        "applications": "pending",
        "data": "pending",
        "statistics": "pending",
    }
    return {
        "console": "online",
        "services": services_status,
        "timestamp": datetime.now().isoformat(),
    }
# Protected endpoint example
@app.get("/api/protected")
async def protected_route(current_user = Depends(get_current_user)):
    """Sample endpoint that is only reachable with valid authentication.

    Echoes the authenticated user's username back to the caller.
    """
    return dict(
        message="This is a protected route",
        user=current_user.username,
    )
# API Gateway - Route to Images service
# Request headers that are not copied to the upstream call. "accept-encoding"
# is dropped so httpx advertises only the encodings it can decode itself.
_PROXY_SKIPPED_REQUEST_HEADERS = frozenset({"host", "content-length", "accept-encoding"})
# Upstream response headers that no longer describe the body we send back:
# httpx exposes the decoded payload via ``response.content``, so the original
# encoding and length headers would be wrong for the client.
_PROXY_SKIPPED_RESPONSE_HEADERS = frozenset(
    {"content-encoding", "content-length", "transfer-encoding", "connection"}
)


async def _forward_request(request: Request, url: str, service_name: str) -> Response:
    """Forward ``request`` to ``url`` and convert the upstream reply to a Response.

    Args:
        request: Incoming request whose method, headers, query string and
            (for POST/PUT/PATCH) body are forwarded.
        url: Fully built target URL on the downstream service.
        service_name: Human-readable service name used in the 503 detail.

    Raises:
        HTTPException: 503 when the connection to the service fails, 500 for
            any other error while proxying.
    """
    try:
        # Get request body if exists
        body = None
        if request.method in ("POST", "PUT", "PATCH"):
            body = await request.body()

        forwarded_headers = {
            key: value
            for key, value in request.headers.items()
            if key.lower() not in _PROXY_SKIPPED_REQUEST_HEADERS
        }

        # Forward the request
        async with httpx.AsyncClient() as client:
            upstream = await client.request(
                method=request.method,
                url=url,
                headers=forwarded_headers,
                content=body,
                params=request.query_params,
            )

        # Return the response; the framework recomputes content-length from
        # the body that is actually sent.
        reply_headers = {
            key: value
            for key, value in upstream.headers.items()
            if key.lower() not in _PROXY_SKIPPED_RESPONSE_HEADERS
        }
        return Response(
            content=upstream.content,
            status_code=upstream.status_code,
            headers=reply_headers,
        )
    except httpx.ConnectError as exc:
        raise HTTPException(
            status_code=503, detail=f"{service_name} service unavailable"
        ) from exc
    except Exception as exc:
        logger.exception("Proxy request to %s failed", url)
        raise HTTPException(status_code=500, detail=str(exc)) from exc


# API Gateway - Route to Images service
@app.api_route("/api/images/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_images(path: str, request: Request):
    """Proxy requests to Images service (public for image proxy).

    Args:
        path: Remainder of the URL after ``/api/images/``; it is appended to
            the images service's ``/api/v1/`` prefix.
        request: The incoming request to forward.

    Raises:
        HTTPException: 503 when the images service cannot be reached, 500 on
            any other proxying error.
    """
    return await _forward_request(
        request, f"{IMAGES_SERVICE_URL}/api/v1/{path}", "Images"
    )


# API Gateway - Route to Users service
@app.api_route("/api/users/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_users(path: str, request: Request, current_user = Depends(get_current_user)):
    """Proxy requests to Users service (protected).

    Args:
        path: Remainder of the URL after ``/api/users/``; it is appended
            directly to the users service base URL.
        request: The incoming request to forward.
        current_user: Authenticated user; present only to enforce auth.

    Raises:
        HTTPException: 503 when the users service cannot be reached, 500 on
            any other proxying error.
    """
    return await _forward_request(request, f"{USERS_SERVICE_URL}/{path}", "Users")
if __name__ == "__main__":
    # Direct-execution entry point: serve "main:app" on all interfaces,
    # port 8000, with uvicorn's auto-reload switched on.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)