from typing import Optional, List, Dict, Any

from fastapi import APIRouter, Depends, HTTPException, Query, status

from app.core.auth import get_current_active_user, User
from app.core.database import get_database
from app.services.pipeline_service import PipelineService
from app.schemas.pipeline import (
    PipelineCreate,
    PipelineUpdate,
    PipelineResponse,
    PipelineListResponse,
    PipelineStatsSchema,
    PipelineLog
)

router = APIRouter()


def get_pipeline_service(db=Depends(get_database)) -> PipelineService:
    """Dependency to get pipeline service"""
    return PipelineService(db)


def _to_pipeline_response(pipeline: Any) -> PipelineResponse:
    """Build the API response schema from a pipeline object returned by the service.

    Single definition of the field-by-field mapping shared by every endpoint
    that returns a ``PipelineResponse``.
    """
    return PipelineResponse(
        # NOTE(review): `_id` is presumably the alias of the schema's id
        # field -- confirm against app.schemas.pipeline.
        _id=str(pipeline.id),
        name=pipeline.name,
        type=pipeline.type,
        status=pipeline.status,
        config=pipeline.config,
        schedule=pipeline.schedule,
        stats=PipelineStatsSchema(**pipeline.stats.model_dump()),
        last_run=pipeline.last_run,
        next_run=pipeline.next_run,
        created_at=pipeline.created_at,
        updated_at=pipeline.updated_at,
    )


def _pipeline_not_found(pipeline_id: str) -> HTTPException:
    """Return the 404 error raised when the service yields no result for *pipeline_id*."""
    return HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Pipeline with ID {pipeline_id} not found"
    )


# NOTE: endpoint docstrings are kept short on purpose; FastAPI publishes them
# as the operation description in the generated OpenAPI schema.


@router.get("/", response_model=PipelineListResponse)
async def get_pipelines(
    # NOTE(review): `type` and `status` shadow the builtin and the imported
    # `fastapi.status` module inside this function. They are kept because the
    # parameter names are the public query-parameter names; do not reference
    # `status.HTTP_*` in this function body.
    type: Optional[str] = Query(None, description="Filter by pipeline type"),
    status: Optional[str] = Query(None, description="Filter by status (running/stopped/error)"),
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Get all pipelines with optional filtering"""
    pipelines = await pipeline_service.get_pipelines(type=type, status=status)
    pipeline_responses = [_to_pipeline_response(p) for p in pipelines]
    # `total` is the number of pipelines in this response.
    return PipelineListResponse(
        pipelines=pipeline_responses,
        total=len(pipeline_responses)
    )


@router.get("/{pipeline_id}", response_model=PipelineResponse)
async def get_pipeline(
    pipeline_id: str,
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Get a pipeline by ID"""
    pipeline = await pipeline_service.get_pipeline_by_id(pipeline_id)
    if not pipeline:
        raise _pipeline_not_found(pipeline_id)
    return _to_pipeline_response(pipeline)


@router.post("/", response_model=PipelineResponse, status_code=status.HTTP_201_CREATED)
async def create_pipeline(
    pipeline_data: PipelineCreate,
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Create a new pipeline"""
    pipeline = await pipeline_service.create_pipeline(pipeline_data)
    return _to_pipeline_response(pipeline)


@router.put("/{pipeline_id}", response_model=PipelineResponse)
async def update_pipeline(
    pipeline_id: str,
    pipeline_data: PipelineUpdate,
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Update a pipeline"""
    pipeline = await pipeline_service.update_pipeline(pipeline_id, pipeline_data)
    if not pipeline:
        raise _pipeline_not_found(pipeline_id)
    return _to_pipeline_response(pipeline)


@router.delete("/{pipeline_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_pipeline(
    pipeline_id: str,
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Delete a pipeline"""
    success = await pipeline_service.delete_pipeline(pipeline_id)
    if not success:
        raise _pipeline_not_found(pipeline_id)
    # 204 No Content: nothing to return on success.
    return None


@router.get("/{pipeline_id}/stats", response_model=PipelineStatsSchema)
async def get_pipeline_stats(
    pipeline_id: str,
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Get pipeline statistics"""
    stats = await pipeline_service.get_pipeline_stats(pipeline_id)
    if not stats:
        raise _pipeline_not_found(pipeline_id)
    return PipelineStatsSchema(**stats.model_dump())


@router.post("/{pipeline_id}/start", response_model=PipelineResponse)
async def start_pipeline(
    pipeline_id: str,
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Start a pipeline"""
    pipeline = await pipeline_service.start_pipeline(pipeline_id)
    if not pipeline:
        raise _pipeline_not_found(pipeline_id)
    return _to_pipeline_response(pipeline)


@router.post("/{pipeline_id}/stop", response_model=PipelineResponse)
async def stop_pipeline(
    pipeline_id: str,
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Stop a pipeline"""
    pipeline = await pipeline_service.stop_pipeline(pipeline_id)
    if not pipeline:
        raise _pipeline_not_found(pipeline_id)
    return _to_pipeline_response(pipeline)


@router.post("/{pipeline_id}/restart", response_model=PipelineResponse)
async def restart_pipeline(
    pipeline_id: str,
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Restart a pipeline"""
    pipeline = await pipeline_service.restart_pipeline(pipeline_id)
    if not pipeline:
        raise _pipeline_not_found(pipeline_id)
    return _to_pipeline_response(pipeline)


@router.get("/{pipeline_id}/logs", response_model=List[PipelineLog])
async def get_pipeline_logs(
    pipeline_id: str,
    limit: int = Query(100, ge=1, le=1000, description="Maximum number of logs"),
    level: Optional[str] = Query(None, description="Filter by log level (INFO, WARNING, ERROR)"),
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Get logs for a pipeline"""
    # NOTE(review): unlike the sibling endpoints there is no 404 path here;
    # whatever the service returns is passed through unchanged.
    logs = await pipeline_service.get_pipeline_logs(pipeline_id, limit=limit, level=level)
    return logs


@router.put("/{pipeline_id}/config", response_model=PipelineResponse)
async def update_pipeline_config(
    pipeline_id: str,
    config: Dict[str, Any],
    current_user: User = Depends(get_current_active_user),
    pipeline_service: PipelineService = Depends(get_pipeline_service)
):
    """Update pipeline configuration"""
    pipeline = await pipeline_service.update_pipeline_config(pipeline_id, config)
    if not pipeline:
        raise _pipeline_not_found(pipeline_id)
    return _to_pipeline_response(pipeline)