diff --git a/services/news-engine-console/backend/app/api/monitoring.py b/services/news-engine-console/backend/app/api/monitoring.py index 3fa29df..3d1bcf3 100644 --- a/services/news-engine-console/backend/app/api/monitoring.py +++ b/services/news-engine-console/backend/app/api/monitoring.py @@ -4,6 +4,7 @@ from datetime import datetime from app.core.auth import get_current_active_user, User from app.core.database import get_database +from app.core.pipeline_client import get_pipeline_client, PipelineClient from app.services.monitoring_service import MonitoringService router = APIRouter() @@ -135,3 +136,58 @@ async def get_error_summary( """ summary = await monitoring_service.get_error_summary(hours=hours) return summary + + +# ============================================================================= +# Pipeline Monitor Proxy Endpoints +# ============================================================================= + +@router.get("/pipeline/stats") +async def get_pipeline_stats( + current_user: User = Depends(get_current_active_user), + pipeline_client: PipelineClient = Depends(get_pipeline_client) +): + """ + Get pipeline statistics from Pipeline Monitor service + + Returns queue status, article counts, and worker info + """ + return await pipeline_client.get_stats() + + +@router.get("/pipeline/health") +async def get_pipeline_health( + current_user: User = Depends(get_current_active_user), + pipeline_client: PipelineClient = Depends(get_pipeline_client) +): + """ + Get Pipeline Monitor service health status + """ + return await pipeline_client.get_health() + + +@router.get("/pipeline/queues/{queue_name}") +async def get_queue_details( + queue_name: str, + current_user: User = Depends(get_current_active_user), + pipeline_client: PipelineClient = Depends(get_pipeline_client) +): + """ + Get details for a specific pipeline queue + + Returns queue length, processing count, failed count, and preview of items + """ + return await pipeline_client.get_queue_details(queue_name) + + +@router.get("/pipeline/workers") +async def get_pipeline_workers( + current_user: User = Depends(get_current_active_user), + pipeline_client: PipelineClient = Depends(get_pipeline_client) +): + """ + Get status of all pipeline workers + + Returns active worker counts for each pipeline type + """ + return await pipeline_client.get_workers() diff --git a/services/news-engine-console/backend/app/core/pipeline_client.py b/services/news-engine-console/backend/app/core/pipeline_client.py new file mode 100644 index 0000000..ba4db7f --- /dev/null +++ b/services/news-engine-console/backend/app/core/pipeline_client.py @@ -0,0 +1,127 @@ +""" +Pipeline Monitor API Client +파이프라인 모니터 서비스와 통신하는 HTTP 클라이언트 +""" +import os +import httpx +from typing import Dict, Any, Optional +from fastapi import HTTPException + +# Pipeline Monitor 서비스 URL +PIPELINE_MONITOR_URL = os.getenv("PIPELINE_MONITOR_URL", "http://localhost:8100") + + +class PipelineClient: + """Pipeline Monitor API와 통신하는 클라이언트""" + + def __init__(self): + self.base_url = PIPELINE_MONITOR_URL + self.client = httpx.AsyncClient(base_url=self.base_url, timeout=30.0) + + async def close(self): + """클라이언트 연결 종료""" + await self.client.aclose() + + async def _request( + self, + method: str, + path: str, + params: Optional[Dict[str, Any]] = None, + json: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Pipeline Monitor API에 HTTP 요청 전송 + + Args: + method: HTTP 메소드 (GET, POST, DELETE 등) + path: API 경로 + params: 쿼리 파라미터 + json: 요청 바디 (JSON) + + Returns: + API 응답 데이터 + + Raises: + HTTPException: API 요청 실패 시 + """ + try: + response = await self.client.request( + method=method, + url=path, + params=params, + json=json + ) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + raise HTTPException( + status_code=e.response.status_code, + detail=f"Pipeline Monitor API error: {e.response.text}" + ) + except httpx.RequestError as e: + raise HTTPException( + status_code=503, + detail=f"Failed to connect to Pipeline Monitor: {str(e)}" + ) + + # Stats & Health + async def get_stats(self) -> Dict[str, Any]: + """전체 파이프라인 통계 조회""" + return await self._request("GET", "/api/stats") + + async def get_health(self) -> Dict[str, Any]: + """헬스 체크""" + return await self._request("GET", "/api/health") + + # Queue Management + async def get_queue_details(self, queue_name: str) -> Dict[str, Any]: + """특정 큐의 상세 정보 조회""" + return await self._request("GET", f"/api/queues/{queue_name}") + + # Worker Management + async def get_workers(self) -> Dict[str, Any]: + """워커 상태 조회""" + return await self._request("GET", "/api/workers") + + # Keyword Management + async def get_keywords(self) -> list: + """등록된 키워드 목록 조회""" + return await self._request("GET", "/api/keywords") + + async def add_keyword(self, keyword: str, schedule: str = "30min") -> Dict[str, Any]: + """새 키워드 등록""" + return await self._request( + "POST", + "/api/keywords", + params={"keyword": keyword, "schedule": schedule} + ) + + async def delete_keyword(self, keyword_id: str) -> Dict[str, Any]: + """키워드 삭제""" + return await self._request("DELETE", f"/api/keywords/{keyword_id}") + + async def trigger_keyword(self, keyword: str) -> Dict[str, Any]: + """수동으로 키워드 처리 트리거""" + return await self._request("POST", f"/api/trigger/{keyword}") + + # Article Management + async def get_articles(self, limit: int = 10, skip: int = 0) -> Dict[str, Any]: + """최근 생성된 기사 목록 조회""" + return await self._request( + "GET", + "/api/articles", + params={"limit": limit, "skip": skip} + ) + + async def get_article(self, article_id: str) -> Dict[str, Any]: + """특정 기사 상세 정보 조회""" + return await self._request("GET", f"/api/articles/{article_id}") + + +# 전역 클라이언트 인스턴스 +pipeline_client = PipelineClient() + + +async def get_pipeline_client() -> PipelineClient: + """의존성 주입용 Pipeline 클라이언트 가져오기""" + return pipeline_client diff --git a/services/news-engine-console/frontend/src/api/monitoring.ts b/services/news-engine-console/frontend/src/api/monitoring.ts index 4838d11..0cc356f 100644 --- a/services/news-engine-console/frontend/src/api/monitoring.ts +++ b/services/news-engine-console/frontend/src/api/monitoring.ts @@ -65,3 +65,47 @@ export const getPipelineActivity = async (params?: { ) return response.data } + +// ============================================================================= +// Pipeline Monitor Proxy Endpoints +// ============================================================================= + +export const getPipelineStats = async (): Promise<{ + queues: Record + articles_today: number + active_keywords: number + total_articles: number + timestamp: string +}> => { + const response = await apiClient.get('/api/v1/monitoring/pipeline/stats') + return response.data +} + +export const getPipelineHealth = async (): Promise<{ + status: string + redis: string + mongodb: string + timestamp: string +}> => { + const response = await apiClient.get('/api/v1/monitoring/pipeline/health') + return response.data +} + +export const getQueueDetails = async (queueName: string): Promise<{ + queue: string + length: number + processing_count: number + failed_count: number + preview: any[] + timestamp: string +}> => { + const response = await apiClient.get(`/api/v1/monitoring/pipeline/queues/${queueName}`) + return response.data +} + +export const getPipelineWorkers = async (): Promise< + Record +> => { + const response = await apiClient.get('/api/v1/monitoring/pipeline/workers') + return response.data +} diff --git a/services/news-engine-console/frontend/src/pages/Monitoring.tsx b/services/news-engine-console/frontend/src/pages/Monitoring.tsx index 456b814..0fbb262 100644 --- a/services/news-engine-console/frontend/src/pages/Monitoring.tsx +++ b/services/news-engine-console/frontend/src/pages/Monitoring.tsx @@ -29,6 +29,9 @@ import { getHealthCheck, getDatabaseStats, getRecentLogs, + getPipelineStats, + getPipelineHealth, + getPipelineWorkers, } from '@/api/monitoring' import type { MonitoringOverview, SystemStatus, DatabaseStats, LogEntry } from '@/types' @@ -41,6 +44,11 @@ const Monitoring = () => { const [error, setError] = useState(null) const [autoRefresh, setAutoRefresh] = useState(true) + // Pipeline Monitor stats + const [pipelineStats, setPipelineStats] = useState(null) + const [pipelineHealth, setPipelineHealth] = useState(null) + const [pipelineWorkers, setPipelineWorkers] = useState(null) + useEffect(() => { fetchData() const interval = setInterval(() => { @@ -56,16 +64,22 @@ const Monitoring = () => { setLoading(true) setError(null) try { - const [overviewData, healthData, dbData, logsData] = await Promise.all([ + const [overviewData, healthData, dbData, logsData, pipelineStatsData, pipelineHealthData, pipelineWorkersData] = await Promise.all([ getMonitoringOverview().catch(() => null), getHealthCheck().catch(() => null), getDatabaseStats().catch(() => null), getRecentLogs({ limit: 50 }).catch(() => []), + getPipelineStats().catch(() => null), + getPipelineHealth().catch(() => null), + getPipelineWorkers().catch(() => null), ]) setOverview(overviewData) setHealth(healthData) setDbStats(dbData) setLogs(logsData) + setPipelineStats(pipelineStatsData) + setPipelineHealth(pipelineHealthData) + setPipelineWorkers(pipelineWorkersData) } catch (err: any) { setError(err.message || 'Failed to fetch monitoring data') } finally { @@ -350,6 +364,95 @@ const Monitoring = () => { )} + {/* Pipeline Monitor Stats */} + {(pipelineStats || pipelineWorkers) && ( + + + Pipeline Monitor + + + {pipelineStats && ( + + + Queue Status + + + {Object.entries(pipelineStats.queues || {}).map(([queueName, count]) => ( + + + + {queueName.replace('queue:', '')} + + {count as number} + + + ))} + + + + + + Articles Today + + {pipelineStats.articles_today || 0} + + + + Active Keywords + + {pipelineStats.active_keywords || 0} + + + + Total Articles + + {pipelineStats.total_articles || 0} + + + + Pipeline Health + + + + + + )} + + {pipelineWorkers && ( + + + Pipeline Workers + + + {Object.entries(pipelineWorkers).map(([workerType, workerInfo]: [string, any]) => ( + + + {workerType.replace('_', ' ')} + 0 ? 'success.main' : 'text.secondary'}> + {workerInfo.active} active + + + + ))} + + + )} + + )} + {/* Recent Logs */}