diff --git a/services/news-engine-console/TODO.md b/services/news-engine-console/TODO.md index 606c25f..8cb500c 100644 --- a/services/news-engine-console/TODO.md +++ b/services/news-engine-console/TODO.md @@ -444,9 +444,15 @@ metadata: - [x] 프로젝트 구조 - [x] 기본 설정 (config, database, auth) - [x] API 라우터 기본 구조 -- [ ] Pydantic 스키마 +- [x] Pydantic 스키마 (완료 - keyword, pipeline, user, application) +- [x] MongoDB 데이터 모델 (완료 - keyword, pipeline, user, application) +- [x] 서비스 레이어 구현 (완료 - KeywordService, PipelineService) +- [x] Keywords API 완전 구현 (CRUD + stats + toggle) +- [x] Pipelines API 완전 구현 (CRUD + start/stop/restart + logs + config) +- [ ] UserService with authentication +- [ ] ApplicationService +- [ ] MonitoringService - [ ] MongoDB 컬렉션 및 인덱스 -- [ ] 서비스 레이어 구현 - [ ] Redis 통합 - [ ] 로그인/인증 API - [ ] 에러 핸들링 diff --git a/services/news-engine-console/backend/app/api/keywords.py b/services/news-engine-console/backend/app/api/keywords.py index 902d316..02b293e 100644 --- a/services/news-engine-console/backend/app/api/keywords.py +++ b/services/news-engine-console/backend/app/api/keywords.py @@ -1,39 +1,211 @@ -from fastapi import APIRouter, Depends, HTTPException -from typing import List +from fastapi import APIRouter, Depends, HTTPException, Query, status +from typing import Optional + from app.core.auth import get_current_active_user, User +from app.core.database import get_database +from app.services.keyword_service import KeywordService +from app.schemas.keyword import ( + KeywordCreate, + KeywordUpdate, + KeywordResponse, + KeywordListResponse, + KeywordStats +) router = APIRouter() -@router.get("/") -async def get_keywords(current_user: User = Depends(get_current_active_user)): - """Get all keywords""" - # TODO: Implement keyword retrieval from MongoDB - return {"keywords": [], "total": 0} -@router.post("/") +def get_keyword_service(db=Depends(get_database)) -> KeywordService: + """Dependency to get keyword service""" + return KeywordService(db) + + +@router.get("/", 
response_model=KeywordListResponse) +async def get_keywords( + category: Optional[str] = Query(None, description="Filter by category"), + status: Optional[str] = Query(None, description="Filter by status (active/inactive)"), + search: Optional[str] = Query(None, description="Search in keyword text"), + page: int = Query(1, ge=1, description="Page number"), + page_size: int = Query(50, ge=1, le=100, description="Items per page"), + sort_by: str = Query("created_at", description="Field to sort by"), + sort_order: int = Query(-1, ge=-1, le=1, description="1 for ascending, -1 for descending"), + current_user: User = Depends(get_current_active_user), + keyword_service: KeywordService = Depends(get_keyword_service) +): + """Get all keywords with filtering, pagination, and sorting""" + keywords, total = await keyword_service.get_keywords( + category=category, + status=status, + search=search, + page=page, + page_size=page_size, + sort_by=sort_by, + sort_order=sort_order + ) + + # Convert to response models + keyword_responses = [ + KeywordResponse( + _id=str(kw.id), + keyword=kw.keyword, + category=kw.category, + status=kw.status, + pipeline_type=kw.pipeline_type, + priority=kw.priority, + metadata=kw.metadata, + created_at=kw.created_at, + updated_at=kw.updated_at, + created_by=kw.created_by + ) + for kw in keywords + ] + + return KeywordListResponse( + keywords=keyword_responses, + total=total, + page=page, + page_size=page_size + ) + + +@router.get("/{keyword_id}", response_model=KeywordResponse) +async def get_keyword( + keyword_id: str, + current_user: User = Depends(get_current_active_user), + keyword_service: KeywordService = Depends(get_keyword_service) +): + """Get a keyword by ID""" + keyword = await keyword_service.get_keyword_by_id(keyword_id) + if not keyword: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Keyword with ID {keyword_id} not found" + ) + + return KeywordResponse( + _id=str(keyword.id), + keyword=keyword.keyword, + 
category=keyword.category, + status=keyword.status, + pipeline_type=keyword.pipeline_type, + priority=keyword.priority, + metadata=keyword.metadata, + created_at=keyword.created_at, + updated_at=keyword.updated_at, + created_by=keyword.created_by + ) + + +@router.post("/", response_model=KeywordResponse, status_code=status.HTTP_201_CREATED) async def create_keyword( - keyword_data: dict, - current_user: User = Depends(get_current_active_user) + keyword_data: KeywordCreate, + current_user: User = Depends(get_current_active_user), + keyword_service: KeywordService = Depends(get_keyword_service) ): """Create new keyword""" - # TODO: Implement keyword creation - return {"message": "Keyword created", "keyword": keyword_data} + keyword = await keyword_service.create_keyword( + keyword_data=keyword_data, + created_by=current_user.username + ) -@router.put("/{keyword_id}") + return KeywordResponse( + _id=str(keyword.id), + keyword=keyword.keyword, + category=keyword.category, + status=keyword.status, + pipeline_type=keyword.pipeline_type, + priority=keyword.priority, + metadata=keyword.metadata, + created_at=keyword.created_at, + updated_at=keyword.updated_at, + created_by=keyword.created_by + ) + + +@router.put("/{keyword_id}", response_model=KeywordResponse) async def update_keyword( keyword_id: str, - keyword_data: dict, - current_user: User = Depends(get_current_active_user) + keyword_data: KeywordUpdate, + current_user: User = Depends(get_current_active_user), + keyword_service: KeywordService = Depends(get_keyword_service) ): """Update keyword""" - # TODO: Implement keyword update - return {"message": "Keyword updated", "keyword_id": keyword_id} + keyword = await keyword_service.update_keyword(keyword_id, keyword_data) + if not keyword: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Keyword with ID {keyword_id} not found" + ) -@router.delete("/{keyword_id}") + return KeywordResponse( + _id=str(keyword.id), + keyword=keyword.keyword, + 
category=keyword.category, + status=keyword.status, + pipeline_type=keyword.pipeline_type, + priority=keyword.priority, + metadata=keyword.metadata, + created_at=keyword.created_at, + updated_at=keyword.updated_at, + created_by=keyword.created_by + ) + + +@router.delete("/{keyword_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_keyword( keyword_id: str, - current_user: User = Depends(get_current_active_user) + current_user: User = Depends(get_current_active_user), + keyword_service: KeywordService = Depends(get_keyword_service) ): """Delete keyword""" - # TODO: Implement keyword deletion - return {"message": "Keyword deleted", "keyword_id": keyword_id} + success = await keyword_service.delete_keyword(keyword_id) + if not success: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Keyword with ID {keyword_id} not found" + ) + return None + + +@router.post("/{keyword_id}/toggle", response_model=KeywordResponse) +async def toggle_keyword_status( + keyword_id: str, + current_user: User = Depends(get_current_active_user), + keyword_service: KeywordService = Depends(get_keyword_service) +): + """Toggle keyword status between active and inactive""" + keyword = await keyword_service.toggle_keyword_status(keyword_id) + if not keyword: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Keyword with ID {keyword_id} not found" + ) + + return KeywordResponse( + _id=str(keyword.id), + keyword=keyword.keyword, + category=keyword.category, + status=keyword.status, + pipeline_type=keyword.pipeline_type, + priority=keyword.priority, + metadata=keyword.metadata, + created_at=keyword.created_at, + updated_at=keyword.updated_at, + created_by=keyword.created_by + ) + + +@router.get("/{keyword_id}/stats", response_model=KeywordStats) +async def get_keyword_stats( + keyword_id: str, + current_user: User = Depends(get_current_active_user), + keyword_service: KeywordService = Depends(get_keyword_service) +): + """Get statistics 
for a keyword""" + stats = await keyword_service.get_keyword_stats(keyword_id) + if not stats: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Keyword with ID {keyword_id} not found" + ) + return stats diff --git a/services/news-engine-console/backend/app/api/pipelines.py b/services/news-engine-console/backend/app/api/pipelines.py index 66e86c7..02aea6f 100644 --- a/services/news-engine-console/backend/app/api/pipelines.py +++ b/services/news-engine-console/backend/app/api/pipelines.py @@ -1,24 +1,299 @@ -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, HTTPException, Query, status +from typing import Optional, List, Dict, Any + from app.core.auth import get_current_active_user, User +from app.core.database import get_database +from app.services.pipeline_service import PipelineService +from app.schemas.pipeline import ( + PipelineCreate, + PipelineUpdate, + PipelineResponse, + PipelineListResponse, + PipelineStatsSchema, + PipelineLog +) router = APIRouter() -@router.get("/") -async def get_pipelines(current_user: User = Depends(get_current_active_user)): - """Get all pipelines and their status""" - return {"pipelines": [], "total": 0} -@router.get("/{pipeline_id}/stats") -async def get_pipeline_stats(pipeline_id: str, current_user: User = Depends(get_current_active_user)): +def get_pipeline_service(db=Depends(get_database)) -> PipelineService: + """Dependency to get pipeline service""" + return PipelineService(db) + + +@router.get("/", response_model=PipelineListResponse) +async def get_pipelines( + type: Optional[str] = Query(None, description="Filter by pipeline type"), + status: Optional[str] = Query(None, description="Filter by status (running/stopped/error)"), + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): + """Get all pipelines with optional filtering""" + pipelines = await pipeline_service.get_pipelines(type=type, 
status=status) + + pipeline_responses = [ + PipelineResponse( + _id=str(p.id), + name=p.name, + type=p.type, + status=p.status, + config=p.config, + schedule=p.schedule, + stats=PipelineStatsSchema(**p.stats.model_dump()), + last_run=p.last_run, + next_run=p.next_run, + created_at=p.created_at, + updated_at=p.updated_at + ) + for p in pipelines + ] + + return PipelineListResponse( + pipelines=pipeline_responses, + total=len(pipeline_responses) + ) + + +@router.get("/{pipeline_id}", response_model=PipelineResponse) +async def get_pipeline( + pipeline_id: str, + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): + """Get a pipeline by ID""" + pipeline = await pipeline_service.get_pipeline_by_id(pipeline_id) + if not pipeline: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Pipeline with ID {pipeline_id} not found" + ) + + return PipelineResponse( + _id=str(pipeline.id), + name=pipeline.name, + type=pipeline.type, + status=pipeline.status, + config=pipeline.config, + schedule=pipeline.schedule, + stats=PipelineStatsSchema(**pipeline.stats.model_dump()), + last_run=pipeline.last_run, + next_run=pipeline.next_run, + created_at=pipeline.created_at, + updated_at=pipeline.updated_at + ) + + +@router.post("/", response_model=PipelineResponse, status_code=status.HTTP_201_CREATED) +async def create_pipeline( + pipeline_data: PipelineCreate, + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): + """Create a new pipeline""" + pipeline = await pipeline_service.create_pipeline(pipeline_data) + + return PipelineResponse( + _id=str(pipeline.id), + name=pipeline.name, + type=pipeline.type, + status=pipeline.status, + config=pipeline.config, + schedule=pipeline.schedule, + stats=PipelineStatsSchema(**pipeline.stats.model_dump()), + last_run=pipeline.last_run, + next_run=pipeline.next_run, + 
created_at=pipeline.created_at, + updated_at=pipeline.updated_at + ) + + +@router.put("/{pipeline_id}", response_model=PipelineResponse) +async def update_pipeline( + pipeline_id: str, + pipeline_data: PipelineUpdate, + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): + """Update a pipeline""" + pipeline = await pipeline_service.update_pipeline(pipeline_id, pipeline_data) + if not pipeline: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Pipeline with ID {pipeline_id} not found" + ) + + return PipelineResponse( + _id=str(pipeline.id), + name=pipeline.name, + type=pipeline.type, + status=pipeline.status, + config=pipeline.config, + schedule=pipeline.schedule, + stats=PipelineStatsSchema(**pipeline.stats.model_dump()), + last_run=pipeline.last_run, + next_run=pipeline.next_run, + created_at=pipeline.created_at, + updated_at=pipeline.updated_at + ) + + +@router.delete("/{pipeline_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_pipeline( + pipeline_id: str, + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): + """Delete a pipeline""" + success = await pipeline_service.delete_pipeline(pipeline_id) + if not success: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Pipeline with ID {pipeline_id} not found" + ) + return None + + +@router.get("/{pipeline_id}/stats", response_model=PipelineStatsSchema) +async def get_pipeline_stats( + pipeline_id: str, + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): """Get pipeline statistics""" - return {"pipeline_id": pipeline_id, "stats": {}} + stats = await pipeline_service.get_pipeline_stats(pipeline_id) + if not stats: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Pipeline with ID {pipeline_id} not found" + 
) + return PipelineStatsSchema(**stats.model_dump()) -@router.post("/{pipeline_id}/start") -async def start_pipeline(pipeline_id: str, current_user: User = Depends(get_current_active_user)): - """Start pipeline""" - return {"message": "Pipeline started", "pipeline_id": pipeline_id} -@router.post("/{pipeline_id}/stop") -async def stop_pipeline(pipeline_id: str, current_user: User = Depends(get_current_active_user)): - """Stop pipeline""" - return {"message": "Pipeline stopped", "pipeline_id": pipeline_id} +@router.post("/{pipeline_id}/start", response_model=PipelineResponse) +async def start_pipeline( + pipeline_id: str, + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): + """Start a pipeline""" + pipeline = await pipeline_service.start_pipeline(pipeline_id) + if not pipeline: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Pipeline with ID {pipeline_id} not found" + ) + + return PipelineResponse( + _id=str(pipeline.id), + name=pipeline.name, + type=pipeline.type, + status=pipeline.status, + config=pipeline.config, + schedule=pipeline.schedule, + stats=PipelineStatsSchema(**pipeline.stats.model_dump()), + last_run=pipeline.last_run, + next_run=pipeline.next_run, + created_at=pipeline.created_at, + updated_at=pipeline.updated_at + ) + + +@router.post("/{pipeline_id}/stop", response_model=PipelineResponse) +async def stop_pipeline( + pipeline_id: str, + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): + """Stop a pipeline""" + pipeline = await pipeline_service.stop_pipeline(pipeline_id) + if not pipeline: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Pipeline with ID {pipeline_id} not found" + ) + + return PipelineResponse( + _id=str(pipeline.id), + name=pipeline.name, + type=pipeline.type, + status=pipeline.status, + config=pipeline.config, + 
schedule=pipeline.schedule, + stats=PipelineStatsSchema(**pipeline.stats.model_dump()), + last_run=pipeline.last_run, + next_run=pipeline.next_run, + created_at=pipeline.created_at, + updated_at=pipeline.updated_at + ) + + +@router.post("/{pipeline_id}/restart", response_model=PipelineResponse) +async def restart_pipeline( + pipeline_id: str, + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): + """Restart a pipeline""" + pipeline = await pipeline_service.restart_pipeline(pipeline_id) + if not pipeline: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Pipeline with ID {pipeline_id} not found" + ) + + return PipelineResponse( + _id=str(pipeline.id), + name=pipeline.name, + type=pipeline.type, + status=pipeline.status, + config=pipeline.config, + schedule=pipeline.schedule, + stats=PipelineStatsSchema(**pipeline.stats.model_dump()), + last_run=pipeline.last_run, + next_run=pipeline.next_run, + created_at=pipeline.created_at, + updated_at=pipeline.updated_at + ) + + +@router.get("/{pipeline_id}/logs", response_model=List[PipelineLog]) +async def get_pipeline_logs( + pipeline_id: str, + limit: int = Query(100, ge=1, le=1000, description="Maximum number of logs"), + level: Optional[str] = Query(None, description="Filter by log level (INFO, WARNING, ERROR)"), + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): + """Get logs for a pipeline""" + logs = await pipeline_service.get_pipeline_logs(pipeline_id, limit=limit, level=level) + return logs + + +@router.put("/{pipeline_id}/config", response_model=PipelineResponse) +async def update_pipeline_config( + pipeline_id: str, + config: Dict[str, Any], + current_user: User = Depends(get_current_active_user), + pipeline_service: PipelineService = Depends(get_pipeline_service) +): + """Update pipeline configuration""" + pipeline = await 
pipeline_service.update_pipeline_config(pipeline_id, config) + if not pipeline: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Pipeline with ID {pipeline_id} not found" + ) + + return PipelineResponse( + _id=str(pipeline.id), + name=pipeline.name, + type=pipeline.type, + status=pipeline.status, + config=pipeline.config, + schedule=pipeline.schedule, + stats=PipelineStatsSchema(**pipeline.stats.model_dump()), + last_run=pipeline.last_run, + next_run=pipeline.next_run, + created_at=pipeline.created_at, + updated_at=pipeline.updated_at + ) diff --git a/services/news-engine-console/backend/app/models/__init__.py b/services/news-engine-console/backend/app/models/__init__.py new file mode 100644 index 0000000..7e65fcc --- /dev/null +++ b/services/news-engine-console/backend/app/models/__init__.py @@ -0,0 +1,7 @@ +# Data Models +from .keyword import Keyword +from .pipeline import Pipeline +from .user import User +from .application import Application + +__all__ = ["Keyword", "Pipeline", "User", "Application"] diff --git a/services/news-engine-console/backend/app/models/application.py b/services/news-engine-console/backend/app/models/application.py new file mode 100644 index 0000000..6f3927c --- /dev/null +++ b/services/news-engine-console/backend/app/models/application.py @@ -0,0 +1,58 @@ +from datetime import datetime +from typing import Optional, List +from pydantic import BaseModel, Field +from bson import ObjectId + + +class PyObjectId(ObjectId): + """Custom ObjectId type for Pydantic""" + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, v): + if not ObjectId.is_valid(v): + raise ValueError("Invalid ObjectId") + return ObjectId(v) + + @classmethod + def __get_pydantic_json_schema__(cls, field_schema): + field_schema.update(type="string") + + +class Application(BaseModel): + """OAuth2 Application data model""" + + id: Optional[PyObjectId] = Field(default=None, alias="_id") + name: str = 
Field(..., min_length=1, max_length=100) + client_id: str = Field(..., description="OAuth2 Client ID (unique)") + client_secret: str = Field(..., description="Hashed client secret") + redirect_uris: List[str] = Field(default_factory=list) + grant_types: List[str] = Field( + default_factory=lambda: ["authorization_code", "refresh_token"] + ) + scopes: List[str] = Field( + default_factory=lambda: ["read", "write"] + ) + owner_id: str = Field(..., description="User ID who owns this application") + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) + + class Config: + populate_by_name = True + arbitrary_types_allowed = True + json_encoders = {ObjectId: str} + json_schema_extra = { + "example": { + "name": "News Frontend App", + "client_id": "news_app_12345", + "redirect_uris": [ + "http://localhost:3000/auth/callback", + "https://news.example.com/auth/callback" + ], + "grant_types": ["authorization_code", "refresh_token"], + "scopes": ["read", "write"], + "owner_id": "507f1f77bcf86cd799439011" + } + } diff --git a/services/news-engine-console/backend/app/models/keyword.py b/services/news-engine-console/backend/app/models/keyword.py new file mode 100644 index 0000000..e8e8de4 --- /dev/null +++ b/services/news-engine-console/backend/app/models/keyword.py @@ -0,0 +1,55 @@ +from datetime import datetime +from typing import Optional, Dict, Any +from pydantic import BaseModel, Field +from bson import ObjectId + + +class PyObjectId(ObjectId): + """Custom ObjectId type for Pydantic""" + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, v): + if not ObjectId.is_valid(v): + raise ValueError("Invalid ObjectId") + return ObjectId(v) + + @classmethod + def __get_pydantic_json_schema__(cls, field_schema): + field_schema.update(type="string") + + +class Keyword(BaseModel): + """Keyword data model for pipeline management""" + + id: Optional[PyObjectId] = 
Field(default=None, alias="_id") + keyword: str = Field(..., min_length=1, max_length=200) + category: str = Field(..., description="Category: people, topics, companies") + status: str = Field(default="active", description="Status: active, inactive") + pipeline_type: str = Field(default="all", description="Pipeline type: rss, translation, all") + priority: int = Field(default=5, ge=1, le=10, description="Priority level 1-10") + metadata: Dict[str, Any] = Field(default_factory=dict) + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) + created_by: Optional[str] = Field(default=None, description="User ID who created this keyword") + + class Config: + populate_by_name = True + arbitrary_types_allowed = True + json_encoders = {ObjectId: str} + json_schema_extra = { + "example": { + "keyword": "도널드 트럼프", + "category": "people", + "status": "active", + "pipeline_type": "all", + "priority": 8, + "metadata": { + "description": "Former US President", + "aliases": ["Donald Trump", "Trump"] + }, + "created_by": "admin" + } + } diff --git a/services/news-engine-console/backend/app/models/pipeline.py b/services/news-engine-console/backend/app/models/pipeline.py new file mode 100644 index 0000000..aab15b2 --- /dev/null +++ b/services/news-engine-console/backend/app/models/pipeline.py @@ -0,0 +1,70 @@ +from datetime import datetime +from typing import Optional, Dict, Any +from pydantic import BaseModel, Field +from bson import ObjectId + + +class PyObjectId(ObjectId): + """Custom ObjectId type for Pydantic""" + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, v): + if not ObjectId.is_valid(v): + raise ValueError("Invalid ObjectId") + return ObjectId(v) + + @classmethod + def __get_pydantic_json_schema__(cls, field_schema): + field_schema.update(type="string") + + +class PipelineStats(BaseModel): + """Pipeline statistics""" + total_processed: int = 
Field(default=0) + success_count: int = Field(default=0) + error_count: int = Field(default=0) + last_run: Optional[datetime] = None + average_duration_seconds: Optional[float] = None + + +class Pipeline(BaseModel): + """Pipeline data model for process management""" + + id: Optional[PyObjectId] = Field(default=None, alias="_id") + name: str = Field(..., min_length=1, max_length=100) + type: str = Field(..., description="Type: rss_collector, translator, image_generator") + status: str = Field(default="stopped", description="Status: running, stopped, error") + config: Dict[str, Any] = Field(default_factory=dict) + schedule: Optional[str] = Field(default=None, description="Cron expression for scheduling") + stats: PipelineStats = Field(default_factory=PipelineStats) + last_run: Optional[datetime] = None + next_run: Optional[datetime] = None + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) + + class Config: + populate_by_name = True + arbitrary_types_allowed = True + json_encoders = {ObjectId: str} + json_schema_extra = { + "example": { + "name": "RSS Collector - Politics", + "type": "rss_collector", + "status": "running", + "config": { + "interval_minutes": 30, + "max_articles": 100, + "categories": ["politics"] + }, + "schedule": "*/30 * * * *", + "stats": { + "total_processed": 1523, + "success_count": 1500, + "error_count": 23, + "average_duration_seconds": 45.2 + } + } + } diff --git a/services/news-engine-console/backend/app/models/user.py b/services/news-engine-console/backend/app/models/user.py new file mode 100644 index 0000000..cf11b81 --- /dev/null +++ b/services/news-engine-console/backend/app/models/user.py @@ -0,0 +1,49 @@ +from datetime import datetime +from typing import Optional +from pydantic import BaseModel, Field, EmailStr +from bson import ObjectId + + +class PyObjectId(ObjectId): + """Custom ObjectId type for Pydantic""" + @classmethod + def __get_validators__(cls): + 
yield cls.validate + + @classmethod + def validate(cls, v): + if not ObjectId.is_valid(v): + raise ValueError("Invalid ObjectId") + return ObjectId(v) + + @classmethod + def __get_pydantic_json_schema__(cls, field_schema): + field_schema.update(type="string") + + +class User(BaseModel): + """User data model for authentication and authorization""" + + id: Optional[PyObjectId] = Field(default=None, alias="_id") + username: str = Field(..., min_length=3, max_length=50) + email: EmailStr = Field(...) + hashed_password: str = Field(...) + full_name: str = Field(..., min_length=1, max_length=100) + role: str = Field(default="viewer", description="Role: admin, editor, viewer") + disabled: bool = Field(default=False) + created_at: datetime = Field(default_factory=datetime.utcnow) + last_login: Optional[datetime] = None + + class Config: + populate_by_name = True + arbitrary_types_allowed = True + json_encoders = {ObjectId: str} + json_schema_extra = { + "example": { + "username": "johndoe", + "email": "johndoe@example.com", + "full_name": "John Doe", + "role": "editor", + "disabled": False + } + } diff --git a/services/news-engine-console/backend/app/schemas/__init__.py b/services/news-engine-console/backend/app/schemas/__init__.py new file mode 100644 index 0000000..a4c3c8b --- /dev/null +++ b/services/news-engine-console/backend/app/schemas/__init__.py @@ -0,0 +1,44 @@ +# Pydantic Schemas for Request/Response +from .keyword import ( + KeywordCreate, + KeywordUpdate, + KeywordResponse, + KeywordListResponse +) +from .pipeline import ( + PipelineCreate, + PipelineUpdate, + PipelineResponse, + PipelineListResponse +) +from .user import ( + UserCreate, + UserUpdate, + UserResponse, + UserLogin, + Token +) +from .application import ( + ApplicationCreate, + ApplicationUpdate, + ApplicationResponse +) + +__all__ = [ + "KeywordCreate", + "KeywordUpdate", + "KeywordResponse", + "KeywordListResponse", + "PipelineCreate", + "PipelineUpdate", + "PipelineResponse", + 
"PipelineListResponse", + "UserCreate", + "UserUpdate", + "UserResponse", + "UserLogin", + "Token", + "ApplicationCreate", + "ApplicationUpdate", + "ApplicationResponse", +] diff --git a/services/news-engine-console/backend/app/schemas/application.py b/services/news-engine-console/backend/app/schemas/application.py new file mode 100644 index 0000000..f11a85b --- /dev/null +++ b/services/news-engine-console/backend/app/schemas/application.py @@ -0,0 +1,44 @@ +from datetime import datetime +from typing import Optional, List +from pydantic import BaseModel, Field + + +class ApplicationBase(BaseModel): + """Base application schema""" + name: str = Field(..., min_length=1, max_length=100) + redirect_uris: List[str] = Field(default_factory=list) + grant_types: List[str] = Field( + default_factory=lambda: ["authorization_code", "refresh_token"] + ) + scopes: List[str] = Field(default_factory=lambda: ["read", "write"]) + + +class ApplicationCreate(ApplicationBase): + """Schema for creating a new application""" + pass + + +class ApplicationUpdate(BaseModel): + """Schema for updating an application (all fields optional)""" + name: Optional[str] = Field(None, min_length=1, max_length=100) + redirect_uris: Optional[List[str]] = None + grant_types: Optional[List[str]] = None + scopes: Optional[List[str]] = None + + +class ApplicationResponse(ApplicationBase): + """Schema for application response""" + id: str = Field(..., alias="_id") + client_id: str + owner_id: str + created_at: datetime + updated_at: datetime + + class Config: + populate_by_name = True + from_attributes = True + + +class ApplicationWithSecret(ApplicationResponse): + """Schema for application response with client secret (only on creation)""" + client_secret: str = Field(..., description="Plain text client secret (only shown once)") diff --git a/services/news-engine-console/backend/app/schemas/keyword.py b/services/news-engine-console/backend/app/schemas/keyword.py new file mode 100644 index 0000000..982dce8 
--- /dev/null +++ b/services/news-engine-console/backend/app/schemas/keyword.py @@ -0,0 +1,56 @@ +from datetime import datetime +from typing import Optional, Dict, Any, List +from pydantic import BaseModel, Field + + +class KeywordBase(BaseModel): + """Base keyword schema""" + keyword: str = Field(..., min_length=1, max_length=200) + category: str = Field(..., description="Category: people, topics, companies") + pipeline_type: str = Field(default="all", description="Pipeline type: rss, translation, all") + priority: int = Field(default=5, ge=1, le=10) + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class KeywordCreate(KeywordBase): + """Schema for creating a new keyword""" + pass + + +class KeywordUpdate(BaseModel): + """Schema for updating a keyword (all fields optional)""" + keyword: Optional[str] = Field(None, min_length=1, max_length=200) + category: Optional[str] = None + status: Optional[str] = Field(None, description="Status: active, inactive") + pipeline_type: Optional[str] = None + priority: Optional[int] = Field(None, ge=1, le=10) + metadata: Optional[Dict[str, Any]] = None + + +class KeywordResponse(KeywordBase): + """Schema for keyword response""" + id: str = Field(..., alias="_id") + status: str + created_at: datetime + updated_at: datetime + created_by: Optional[str] = None + + class Config: + populate_by_name = True + from_attributes = True + + +class KeywordStats(BaseModel): + """Keyword statistics""" + total_articles: int = 0 + articles_last_24h: int = 0 + articles_last_7d: int = 0 + last_article_date: Optional[datetime] = None + + +class KeywordListResponse(BaseModel): + """Schema for keyword list response""" + keywords: List[KeywordResponse] + total: int + page: int = 1 + page_size: int = 50 diff --git a/services/news-engine-console/backend/app/schemas/pipeline.py b/services/news-engine-console/backend/app/schemas/pipeline.py new file mode 100644 index 0000000..b8400a1 --- /dev/null +++ 
b/services/news-engine-console/backend/app/schemas/pipeline.py @@ -0,0 +1,62 @@ +from datetime import datetime +from typing import Optional, Dict, Any, List +from pydantic import BaseModel, Field + + +class PipelineStatsSchema(BaseModel): + """Pipeline statistics schema""" + total_processed: int = 0 + success_count: int = 0 + error_count: int = 0 + last_run: Optional[datetime] = None + average_duration_seconds: Optional[float] = None + + +class PipelineBase(BaseModel): + """Base pipeline schema""" + name: str = Field(..., min_length=1, max_length=100) + type: str = Field(..., description="Type: rss_collector, translator, image_generator") + config: Dict[str, Any] = Field(default_factory=dict) + schedule: Optional[str] = Field(None, description="Cron expression") + + +class PipelineCreate(PipelineBase): + """Schema for creating a new pipeline""" + pass + + +class PipelineUpdate(BaseModel): + """Schema for updating a pipeline (all fields optional)""" + name: Optional[str] = Field(None, min_length=1, max_length=100) + status: Optional[str] = Field(None, description="Status: running, stopped, error") + config: Optional[Dict[str, Any]] = None + schedule: Optional[str] = None + + +class PipelineResponse(PipelineBase): + """Schema for pipeline response""" + id: str = Field(..., alias="_id") + status: str + stats: PipelineStatsSchema + last_run: Optional[datetime] = None + next_run: Optional[datetime] = None + created_at: datetime + updated_at: datetime + + class Config: + populate_by_name = True + from_attributes = True + + +class PipelineListResponse(BaseModel): + """Schema for pipeline list response""" + pipelines: List[PipelineResponse] + total: int + + +class PipelineLog(BaseModel): + """Schema for pipeline log entry""" + timestamp: datetime + level: str = Field(..., description="Log level: INFO, WARNING, ERROR") + message: str + details: Optional[Dict[str, Any]] = None diff --git a/services/news-engine-console/backend/app/schemas/user.py 
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, ConfigDict, EmailStr, Field


class UserBase(BaseModel):
    """Shared user fields (everything except credentials)."""

    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    full_name: str = Field(..., min_length=1, max_length=100)
    role: str = Field(default="viewer", description="Role: admin, editor, viewer")


class UserCreate(UserBase):
    """Schema for creating a new user (carries the plaintext password)."""

    password: str = Field(..., min_length=8, max_length=100)


class UserUpdate(BaseModel):
    """Schema for updating a user (all fields optional)."""

    email: Optional[EmailStr] = None
    full_name: Optional[str] = Field(None, min_length=1, max_length=100)
    role: Optional[str] = None
    disabled: Optional[bool] = None
    password: Optional[str] = Field(None, min_length=8, max_length=100)


class UserResponse(UserBase):
    """Schema for a user as returned by the API (never includes a password).

    Pydantic v2 ``model_config`` replaces the deprecated v1-style
    ``class Config``.
    """

    model_config = ConfigDict(populate_by_name=True, from_attributes=True)

    id: str = Field(..., alias="_id")  # Mongo ObjectId serialized as str
    disabled: bool
    created_at: datetime
    last_login: Optional[datetime] = None


class UserLogin(BaseModel):
    """Credentials submitted to the login endpoint."""

    username: str
    password: str


class Token(BaseModel):
    """JWT token response."""

    access_token: str
    token_type: str = "bearer"
    expires_in: int  # presumably seconds until expiry -- confirm against auth code


class TokenData(BaseModel):
    """Payload extracted from a decoded JWT."""

    username: Optional[str] = None
import re
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorDatabase

from app.models.keyword import Keyword
from app.schemas.keyword import KeywordCreate, KeywordUpdate, KeywordStats


class KeywordService:
    """Service for managing keyword documents in the ``keywords`` collection.

    Also reads the ``articles`` collection for per-keyword usage statistics.
    Timestamps are stored as naive UTC datetimes (``datetime.utcnow()``),
    matching the rest of this service layer.
    """

    def __init__(self, db: AsyncIOMotorDatabase):
        self.db = db
        self.collection = db.keywords

    async def get_keywords(
        self,
        category: Optional[str] = None,
        status: Optional[str] = None,
        search: Optional[str] = None,
        page: int = 1,
        page_size: int = 50,
        sort_by: str = "created_at",
        sort_order: int = -1
    ) -> tuple[List[Keyword], int]:
        """Get keywords with filtering, pagination, and sorting.

        Args:
            category: Filter by category.
            status: Filter by status (active/inactive).
            search: Case-insensitive substring match on the keyword text.
            page: Page number (starts from 1).
            page_size: Items per page.
            sort_by: Field to sort by.
            sort_order: 1 for ascending, -1 for descending.

        Returns:
            Tuple of (keywords for this page, total count before pagination).
        """
        query: Dict[str, Any] = {}
        if category:
            query["category"] = category
        if status:
            query["status"] = status
        if search:
            # re.escape: treat the user's text literally inside $regex so
            # metacharacters can't raise pattern errors or act as injection.
            query["keyword"] = {"$regex": re.escape(search), "$options": "i"}

        total = await self.collection.count_documents(query)

        skip = (page - 1) * page_size
        cursor = (
            self.collection.find(query)
            .sort(sort_by, sort_order)
            .skip(skip)
            .limit(page_size)
        )
        keywords = [Keyword(**doc) async for doc in cursor]

        return keywords, total

    async def get_keyword_by_id(self, keyword_id: str) -> Optional[Keyword]:
        """Return the keyword with the given ID, or None if absent/invalid."""
        if not ObjectId.is_valid(keyword_id):
            return None

        doc = await self.collection.find_one({"_id": ObjectId(keyword_id)})
        return Keyword(**doc) if doc else None

    async def create_keyword(
        self,
        keyword_data: KeywordCreate,
        created_by: str
    ) -> Keyword:
        """Create a new keyword; status starts as "active"."""
        now = datetime.utcnow()  # one timestamp so created_at == updated_at
        keyword_dict = keyword_data.model_dump()
        keyword_dict.update({
            "status": "active",
            "created_at": now,
            "updated_at": now,
            "created_by": created_by,
        })

        result = await self.collection.insert_one(keyword_dict)
        keyword_dict["_id"] = result.inserted_id

        return Keyword(**keyword_dict)

    async def update_keyword(
        self,
        keyword_id: str,
        update_data: KeywordUpdate
    ) -> Optional[Keyword]:
        """Apply a partial update; returns the updated keyword or None."""
        if not ObjectId.is_valid(keyword_id):
            return None

        # Only fields the caller actually set, and that are not None, are
        # written -- None never overwrites an existing value.
        update_dict = update_data.model_dump(exclude_unset=True, exclude_none=True)

        if not update_dict:
            # Nothing to change -- return the current document unchanged.
            return await self.get_keyword_by_id(keyword_id)

        update_dict["updated_at"] = datetime.utcnow()

        result = await self.collection.find_one_and_update(
            {"_id": ObjectId(keyword_id)},
            {"$set": update_dict},
            return_document=True  # return the post-update document
        )

        return Keyword(**result) if result else None

    async def delete_keyword(self, keyword_id: str) -> bool:
        """Delete a keyword; returns True if a document was removed."""
        if not ObjectId.is_valid(keyword_id):
            return False

        result = await self.collection.delete_one({"_id": ObjectId(keyword_id)})
        return result.deleted_count > 0

    async def toggle_keyword_status(self, keyword_id: str) -> Optional[Keyword]:
        """Toggle keyword status between active and inactive."""
        if not ObjectId.is_valid(keyword_id):
            return None

        keyword = await self.get_keyword_by_id(keyword_id)
        if not keyword:
            return None

        new_status = "inactive" if keyword.status == "active" else "active"

        result = await self.collection.find_one_and_update(
            {"_id": ObjectId(keyword_id)},
            {
                "$set": {
                    "status": new_status,
                    "updated_at": datetime.utcnow()
                }
            },
            return_document=True
        )

        return Keyword(**result) if result else None

    async def get_keyword_stats(self, keyword_id: str) -> Optional[KeywordStats]:
        """Compute usage statistics for a keyword.

        Articles are matched on their ``source_keyword`` field in the
        ``articles`` collection; counts cover all time, the last 24 hours,
        and the last 7 days.
        """
        if not ObjectId.is_valid(keyword_id):
            return None

        keyword = await self.get_keyword_by_id(keyword_id)
        if not keyword:
            return None

        articles_collection = self.db.articles
        match = {"source_keyword": keyword.keyword}

        now = datetime.utcnow()
        day_ago = now - timedelta(days=1)
        week_ago = now - timedelta(days=7)

        total_articles = await articles_collection.count_documents(match)
        articles_last_24h = await articles_collection.count_documents(
            {**match, "created_at": {"$gte": day_ago}}
        )
        articles_last_7d = await articles_collection.count_documents(
            {**match, "created_at": {"$gte": week_ago}}
        )

        # Most recent matching article, if any, supplies last_article_date.
        last_article = await articles_collection.find_one(
            match,
            sort=[("created_at", -1)]
        )

        return KeywordStats(
            total_articles=total_articles,
            articles_last_24h=articles_last_24h,
            articles_last_7d=articles_last_7d,
            last_article_date=last_article.get("created_at") if last_article else None
        )

    async def bulk_create_keywords(
        self,
        keywords_data: List[KeywordCreate],
        created_by: str
    ) -> List[Keyword]:
        """Create multiple keywords at once; returns the created documents."""
        now = datetime.utcnow()  # shared timestamp for the whole batch
        keywords_dicts = []
        for keyword_data in keywords_data:
            keyword_dict = keyword_data.model_dump()
            keyword_dict.update({
                "status": "active",
                "created_at": now,
                "updated_at": now,
                "created_by": created_by,
            })
            keywords_dicts.append(keyword_dict)

        if not keywords_dicts:
            return []

        result = await self.collection.insert_many(keywords_dicts)

        # Attach the generated ObjectIds to the in-memory documents.
        for kw_dict, inserted_id in zip(keywords_dicts, result.inserted_ids):
            kw_dict["_id"] = inserted_id

        return [Keyword(**kw) for kw in keywords_dicts]
"""Create multiple keywords at once""" + keywords_dicts = [] + for keyword_data in keywords_data: + keyword_dict = keyword_data.model_dump() + keyword_dict.update({ + "status": "active", + "created_at": datetime.utcnow(), + "updated_at": datetime.utcnow(), + "created_by": created_by + }) + keywords_dicts.append(keyword_dict) + + if not keywords_dicts: + return [] + + result = await self.collection.insert_many(keywords_dicts) + + # Update with inserted IDs + for i, inserted_id in enumerate(result.inserted_ids): + keywords_dicts[i]["_id"] = inserted_id + + return [Keyword(**kw) for kw in keywords_dicts] diff --git a/services/news-engine-console/backend/app/services/pipeline_service.py b/services/news-engine-console/backend/app/services/pipeline_service.py new file mode 100644 index 0000000..0ed2e9a --- /dev/null +++ b/services/news-engine-console/backend/app/services/pipeline_service.py @@ -0,0 +1,332 @@ +from datetime import datetime +from typing import List, Optional, Dict, Any +from bson import ObjectId +from motor.motor_asyncio import AsyncIOMotorDatabase + +from app.models.pipeline import Pipeline, PipelineStats +from app.schemas.pipeline import PipelineCreate, PipelineUpdate, PipelineLog + + +class PipelineService: + """Service for managing pipelines""" + + def __init__(self, db: AsyncIOMotorDatabase): + self.db = db + self.collection = db.pipelines + self.logs_collection = db.pipeline_logs + + async def get_pipelines( + self, + type: Optional[str] = None, + status: Optional[str] = None + ) -> List[Pipeline]: + """ + Get all pipelines with optional filtering + + Args: + type: Filter by pipeline type + status: Filter by status (running/stopped/error) + + Returns: + List of pipelines + """ + query = {} + if type: + query["type"] = type + if status: + query["status"] = status + + cursor = self.collection.find(query).sort("created_at", -1) + + pipelines = [] + async for doc in cursor: + pipelines.append(Pipeline(**doc)) + + return pipelines + + async def 
get_pipeline_by_id(self, pipeline_id: str) -> Optional[Pipeline]: + """Get a pipeline by ID""" + if not ObjectId.is_valid(pipeline_id): + return None + + doc = await self.collection.find_one({"_id": ObjectId(pipeline_id)}) + if doc: + return Pipeline(**doc) + return None + + async def create_pipeline(self, pipeline_data: PipelineCreate) -> Pipeline: + """Create a new pipeline""" + pipeline_dict = pipeline_data.model_dump() + pipeline_dict.update({ + "status": "stopped", + "stats": PipelineStats().model_dump(), + "created_at": datetime.utcnow(), + "updated_at": datetime.utcnow() + }) + + result = await self.collection.insert_one(pipeline_dict) + pipeline_dict["_id"] = result.inserted_id + + return Pipeline(**pipeline_dict) + + async def update_pipeline( + self, + pipeline_id: str, + update_data: PipelineUpdate + ) -> Optional[Pipeline]: + """Update a pipeline""" + if not ObjectId.is_valid(pipeline_id): + return None + + update_dict = { + k: v for k, v in update_data.model_dump().items() + if v is not None + } + + if not update_dict: + return await self.get_pipeline_by_id(pipeline_id) + + update_dict["updated_at"] = datetime.utcnow() + + result = await self.collection.find_one_and_update( + {"_id": ObjectId(pipeline_id)}, + {"$set": update_dict}, + return_document=True + ) + + if result: + return Pipeline(**result) + return None + + async def delete_pipeline(self, pipeline_id: str) -> bool: + """Delete a pipeline""" + if not ObjectId.is_valid(pipeline_id): + return False + + result = await self.collection.delete_one({"_id": ObjectId(pipeline_id)}) + return result.deleted_count > 0 + + async def start_pipeline(self, pipeline_id: str) -> Optional[Pipeline]: + """Start a pipeline""" + if not ObjectId.is_valid(pipeline_id): + return None + + pipeline = await self.get_pipeline_by_id(pipeline_id) + if not pipeline: + return None + + if pipeline.status == "running": + return pipeline # Already running + + # TODO: Actual pipeline start logic would go here + # For now, just 
update the status + + result = await self.collection.find_one_and_update( + {"_id": ObjectId(pipeline_id)}, + { + "$set": { + "status": "running", + "last_run": datetime.utcnow(), + "updated_at": datetime.utcnow() + } + }, + return_document=True + ) + + if result: + # Log the start event + await self._add_log( + pipeline_id, + "INFO", + f"Pipeline {pipeline.name} started" + ) + return Pipeline(**result) + return None + + async def stop_pipeline(self, pipeline_id: str) -> Optional[Pipeline]: + """Stop a pipeline""" + if not ObjectId.is_valid(pipeline_id): + return None + + pipeline = await self.get_pipeline_by_id(pipeline_id) + if not pipeline: + return None + + if pipeline.status == "stopped": + return pipeline # Already stopped + + # TODO: Actual pipeline stop logic would go here + + result = await self.collection.find_one_and_update( + {"_id": ObjectId(pipeline_id)}, + { + "$set": { + "status": "stopped", + "updated_at": datetime.utcnow() + } + }, + return_document=True + ) + + if result: + # Log the stop event + await self._add_log( + pipeline_id, + "INFO", + f"Pipeline {pipeline.name} stopped" + ) + return Pipeline(**result) + return None + + async def restart_pipeline(self, pipeline_id: str) -> Optional[Pipeline]: + """Restart a pipeline""" + # Stop first + await self.stop_pipeline(pipeline_id) + # Then start + return await self.start_pipeline(pipeline_id) + + async def get_pipeline_stats(self, pipeline_id: str) -> Optional[PipelineStats]: + """Get statistics for a pipeline""" + pipeline = await self.get_pipeline_by_id(pipeline_id) + if not pipeline: + return None + + return pipeline.stats + + async def update_pipeline_stats( + self, + pipeline_id: str, + success: bool = True, + duration_seconds: Optional[float] = None + ) -> Optional[Pipeline]: + """ + Update pipeline statistics after a run + + Args: + pipeline_id: Pipeline ID + success: Whether the run was successful + duration_seconds: Duration of the run + """ + if not ObjectId.is_valid(pipeline_id): + 
return None + + pipeline = await self.get_pipeline_by_id(pipeline_id) + if not pipeline: + return None + + # Update stats + new_total = pipeline.stats.total_processed + 1 + new_success = pipeline.stats.success_count + (1 if success else 0) + new_error = pipeline.stats.error_count + (0 if success else 1) + + # Calculate average duration + if duration_seconds: + current_avg = pipeline.stats.average_duration_seconds or 0 + current_count = pipeline.stats.total_processed + new_avg = ((current_avg * current_count) + duration_seconds) / new_total + else: + new_avg = pipeline.stats.average_duration_seconds + + result = await self.collection.find_one_and_update( + {"_id": ObjectId(pipeline_id)}, + { + "$set": { + "stats.total_processed": new_total, + "stats.success_count": new_success, + "stats.error_count": new_error, + "stats.last_run": datetime.utcnow(), + "stats.average_duration_seconds": new_avg, + "last_run": datetime.utcnow(), + "updated_at": datetime.utcnow() + } + }, + return_document=True + ) + + if result: + return Pipeline(**result) + return None + + async def get_pipeline_logs( + self, + pipeline_id: str, + limit: int = 100, + level: Optional[str] = None + ) -> List[PipelineLog]: + """ + Get logs for a pipeline + + Args: + pipeline_id: Pipeline ID + limit: Maximum number of logs to return + level: Filter by log level (INFO, WARNING, ERROR) + + Returns: + List of pipeline logs + """ + if not ObjectId.is_valid(pipeline_id): + return [] + + query = {"pipeline_id": pipeline_id} + if level: + query["level"] = level + + cursor = self.logs_collection.find(query).sort("timestamp", -1).limit(limit) + + logs = [] + async for doc in cursor: + logs.append(PipelineLog( + timestamp=doc["timestamp"], + level=doc["level"], + message=doc["message"], + details=doc.get("details") + )) + + return logs + + async def _add_log( + self, + pipeline_id: str, + level: str, + message: str, + details: Optional[Dict[str, Any]] = None + ): + """Add a log entry for a pipeline""" + log_entry = 
{ + "pipeline_id": pipeline_id, + "timestamp": datetime.utcnow(), + "level": level, + "message": message, + "details": details or {} + } + + await self.logs_collection.insert_one(log_entry) + + async def update_pipeline_config( + self, + pipeline_id: str, + config: Dict[str, Any] + ) -> Optional[Pipeline]: + """Update pipeline configuration""" + if not ObjectId.is_valid(pipeline_id): + return None + + result = await self.collection.find_one_and_update( + {"_id": ObjectId(pipeline_id)}, + { + "$set": { + "config": config, + "updated_at": datetime.utcnow() + } + }, + return_document=True + ) + + if result: + await self._add_log( + pipeline_id, + "INFO", + "Pipeline configuration updated" + ) + return Pipeline(**result) + return None