Files
site11/services/pipeline/monitor/monitor.py
2025-09-28 20:41:57 +09:00

349 lines
11 KiB
Python

"""
Pipeline Monitor Service
Pipeline status monitoring and dashboard API (파이프라인 상태 모니터링 및 대시보드 API)
"""
import os
import sys
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from motor.motor_asyncio import AsyncIOMotorClient
import redis.asyncio as redis
# Import from shared module
from shared.models import KeywordSubscription, PipelineJob, FinalArticle
# Process-wide logging at INFO level, plus this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Pipeline Monitor", version="1.0.0")

# CORS settings: every origin, method and header is accepted, with credentials.
# NOTE(review): a wildcard origin combined with credentials is very permissive
# for an admin/dashboard API — confirm this is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

# Shared connections: assigned in startup_event(), released in shutdown_event().
redis_client = mongodb_client = db = None
@app.on_event("startup")
async def startup_event():
    """Open the Redis and MongoDB connections when the server starts.

    Connection targets come from the REDIS_URL, MONGODB_URL and DB_NAME
    environment variables, with container-network defaults.
    """
    global redis_client, mongodb_client, db

    # Redis client with decode_responses=True, so replies come back as str.
    redis_client = await redis.from_url(
        os.getenv("REDIS_URL", "redis://redis:6379"),
        decode_responses=True,
    )

    # MongoDB client and the database handle every endpoint uses.
    mongodb_client = AsyncIOMotorClient(
        os.getenv("MONGODB_URL", "mongodb://mongodb:27017")
    )
    database_name = os.getenv("DB_NAME", "ai_writer_db")
    db = mongodb_client[database_name]

    logger.info("Pipeline Monitor started successfully")
@app.on_event("shutdown")
async def shutdown_event():
    """Release the Redis and MongoDB connections when the server stops.

    A client is closed only if it was created (is truthy); this covers a
    startup that failed part-way.
    """
    if redis_client:
        await redis_client.close()
    if not mongodb_client:
        return
    mongodb_client.close()
@app.get("/")
async def root():
    """Basic liveness check: report that the service process is up."""
    payload = {"status": "Pipeline Monitor is running"}
    return payload
@app.get("/api/stats")
async def get_stats():
    """Return pipeline-wide statistics for the dashboard.

    Reads the pending length of each pipeline queue from Redis, and the
    article/keyword counts from MongoDB.

    Returns:
        dict with ``queues`` (queue key -> list length), ``articles_today``
        (articles whose ``created_at`` is at or after today's local midnight),
        ``active_keywords``, ``total_articles`` and an ISO ``timestamp``.

    Raises:
        HTTPException: 500 if any Redis or MongoDB call fails.
    """
    queues = (
        "queue:keyword",
        "queue:rss",
        "queue:search",
        "queue:summarize",
        "queue:assembly",
    )
    try:
        # Batch all LLEN calls into one round trip instead of one per queue.
        # transaction=False: plain reads need no MULTI/EXEC atomicity.
        async with redis_client.pipeline(transaction=False) as pipe:
            for queue in queues:
                pipe.llen(queue)
            lengths = await pipe.execute()
        queue_stats = dict(zip(queues, lengths))

        # Local, naive midnight.
        # NOTE(review): assumes articles store created_at as naive local time
        # — confirm with the service that writes articles.
        today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        articles_today = await db.articles.count_documents({
            "created_at": {"$gte": today}
        })
        active_keywords = await db.keywords.count_documents({"is_active": True})
        total_articles = await db.articles.count_documents({})

        return {
            "queues": queue_stats,
            "articles_today": articles_today,
            "active_keywords": active_keywords,
            "total_articles": total_articles,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("Error getting stats: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/queues/{queue_name}")
async def get_queue_details(queue_name: str):
    """Return details for a single pipeline queue.

    Args:
        queue_name: Queue suffix. The Redis keys ``queue:<name>`` (pending
            list), ``processing:<name>`` (set) and ``failed:<name>`` (list)
            are read.

    Returns:
        dict with the pending ``length``, ``processing_count`` (members of the
        processing set), ``failed_count``, a ``preview`` of the first 10
        pending items, and an ISO ``timestamp``.

    Raises:
        HTTPException: 500 if any Redis call fails (e.g. a key holds the
            wrong type).
    """
    try:
        queue_key = f"queue:{queue_name}"
        length = await redis_client.llen(queue_key)
        # LRANGE 0 9 already returns at most 10 items; no extra slicing needed.
        items = await redis_client.lrange(queue_key, 0, 9)
        # SCARD counts on the server instead of transferring the whole set
        # just to take len() of it.
        processing_count = await redis_client.scard(f"processing:{queue_name}")
        failed_count = await redis_client.llen(f"failed:{queue_name}")

        return {
            "queue": queue_name,
            "length": length,
            "processing_count": processing_count,
            "failed_count": failed_count,
            "preview": items,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        logger.exception("Error getting queue details: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/keywords")
async def get_keywords():
    """List active keywords together with per-keyword article statistics.

    Returns:
        A list with one dict per active keyword, in cursor order. Each dict
        holds the keyword's stringified ``id``, ``keyword`` text,
        ``schedule``, ``created_at``, ``last_article`` (the newest article's
        ``created_at``, or None) and ``article_count``.

    Raises:
        HTTPException: 500 if any database call fails.
    """
    try:
        keyword_docs = [doc async for doc in db.keywords.find({"is_active": True})]
        keyword_ids = [str(doc["_id"]) for doc in keyword_docs]

        # A single aggregation replaces the previous find_one +
        # count_documents per keyword (2 extra queries per keyword).
        # $max ignores articles without created_at, like the old descending sort.
        article_stats = {}
        if keyword_ids:
            pipeline = [
                {"$match": {"keyword_id": {"$in": keyword_ids}}},
                {"$group": {
                    "_id": "$keyword_id",
                    "count": {"$sum": 1},
                    "last_created_at": {"$max": "$created_at"},
                }},
            ]
            async for row in db.articles.aggregate(pipeline):
                article_stats[row["_id"]] = row

        keywords = []
        for doc, keyword_id in zip(keyword_docs, keyword_ids):
            stats = article_stats.get(keyword_id, {})
            keywords.append({
                "id": keyword_id,
                "keyword": doc["keyword"],
                # NOTE(review): add_keyword stores "30min" by default, which
                # differs from this fallback — confirm which the UI expects.
                "schedule": doc.get("schedule", "30분마다"),
                "created_at": doc.get("created_at"),
                "last_article": stats.get("last_created_at"),
                "article_count": stats.get("count", 0),
            })
        return keywords
    except Exception as e:
        logger.exception("Error getting keywords: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/keywords")
async def add_keyword(keyword: str, schedule: str = "30min"):
    """Register a new active keyword.

    Args:
        keyword: Keyword text; must contain at least one non-whitespace
            character.
        schedule: Schedule label stored with the keyword.

    Returns:
        dict with the new document's stringified ``id``, the ``keyword`` and
        a confirmation ``message``.

    Raises:
        HTTPException: 400 for a blank keyword, 500 if the insert fails.
    """
    # Validate before the try so this 400 is not swallowed into a 500.
    if not keyword.strip():
        raise HTTPException(status_code=400, detail="Keyword must not be empty")

    # One timestamp, so created_at and updated_at are identical at creation.
    now = datetime.now()
    new_keyword = {
        "keyword": keyword,
        "schedule": schedule,
        "is_active": True,
        "created_at": now,
        "updated_at": now,
    }
    try:
        result = await db.keywords.insert_one(new_keyword)
    except Exception as e:
        logger.exception("Error adding keyword: %s", e)
        raise HTTPException(status_code=500, detail=str(e))

    return {
        "id": str(result.inserted_id),
        "keyword": keyword,
        "message": "Keyword registered successfully",
    }
@app.delete("/api/keywords/{keyword_id}")
async def delete_keyword(keyword_id: str):
    """Deactivate (soft-delete) a keyword by id.

    Args:
        keyword_id: The keyword's ``_id`` in string form, as returned in the
            ``id`` field by ``GET /api/keywords`` and ``POST /api/keywords``.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 404 if no keyword has this id, 500 on database errors.
    """
    try:
        result = await db.keywords.update_one(
            # Ids are handed out as str(_id), but documents inserted by
            # add_keyword carry a driver-generated ObjectId _id, so the old
            # {"_id": keyword_id} filter never matched them. Comparing the
            # stringified _id matches both ObjectId and string ids (MongoDB
            # >= 4.0). $literal stops a "$..." id from being read as a field path.
            # NOTE(review): this cannot use the _id index; fine for a small
            # collection, otherwise consider bson.ObjectId(keyword_id).
            {"$expr": {"$eq": [{"$toString": "$_id"}, {"$literal": keyword_id}]}},
            {"$set": {"is_active": False, "updated_at": datetime.now()}},
        )
    except Exception as e:
        logger.exception("Error deleting keyword: %s", e)
        raise HTTPException(status_code=500, detail=str(e))

    # Checked outside the try: previously this 404 was caught by the generic
    # handler and returned as a 500.
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Keyword not found")
    return {"message": "Keyword deactivated successfully"}
@app.get("/api/articles")
async def get_articles(limit: int = 10, skip: int = 0):
    """Return a page of articles, newest first.

    Args:
        limit: Maximum number of articles to return; must be >= 1.
        skip: Number of newest articles to skip; must be >= 0.

    Returns:
        dict with ``articles`` (summaries sorted by ``created_at`` descending),
        ``total`` (all articles in the collection), and the ``limit`` and
        ``skip`` used.

    Raises:
        HTTPException: 400 for an invalid ``limit``/``skip``, 500 on database
            errors.
    """
    # MongoDB treats limit 0 as "no limit", which would return the whole
    # collection, and the driver raises ValueError for a negative skip.
    if limit < 1:
        raise HTTPException(status_code=400, detail="limit must be >= 1")
    if skip < 0:
        raise HTTPException(status_code=400, detail="skip must be >= 0")

    try:
        cursor = db.articles.find().sort("created_at", -1).skip(skip).limit(limit)
        articles = [
            {
                "id": str(article["_id"]),
                "title": article["title"],
                "keyword": article["keyword"],
                "summary": article.get("summary", ""),
                "created_at": article["created_at"],
                "processing_time": article.get("processing_time", 0),
                "pipeline_stages": article.get("pipeline_stages", []),
            }
            async for article in cursor
        ]
        total = await db.articles.count_documents({})
    except Exception as e:
        logger.exception("Error getting articles: %s", e)
        raise HTTPException(status_code=500, detail=str(e))

    return {
        "articles": articles,
        "total": total,
        "limit": limit,
        "skip": skip,
    }
@app.get("/api/articles/{article_id}")
async def get_article(article_id: str):
    """Return one stored article document by id.

    Args:
        article_id: The article's ``_id`` in string form, as returned in the
            ``id`` field of ``GET /api/articles``.

    Returns:
        The article document, with ``_id`` converted to a string.

    Raises:
        HTTPException: 404 if no article has this id, 500 on database errors.
    """
    try:
        article = await db.articles.find_one(
            # Compare the stringified _id so both ObjectId and string ids
            # match (the old {"_id": article_id} filter missed ObjectIds).
            # $literal stops a "$..." id from being read as a field path.
            # NOTE(review): this cannot use the _id index; for a large
            # collection consider bson.ObjectId(article_id) instead.
            {"$expr": {"$eq": [{"$toString": "$_id"}, {"$literal": article_id}]}}
        )
    except Exception as e:
        logger.exception("Error getting article: %s", e)
        raise HTTPException(status_code=500, detail=str(e))

    # Outside the try so the 404 is not converted into a 500.
    if not article:
        raise HTTPException(status_code=404, detail="Article not found")

    # FastAPI's JSON encoder cannot serialize a bson ObjectId, so expose the
    # id as a string, as GET /api/articles does.
    # NOTE(review): other ObjectId-valued fields, if any, would still fail.
    article["_id"] = str(article["_id"])
    return article
@app.get("/api/workers")
async def get_workers():
    """Report the active worker ids for each known worker type.

    Each type's workers are the members of the Redis set
    ``workers:<type>:active``.

    Returns:
        dict mapping worker type to ``{"active": count, "worker_ids": [...]}``.

    Raises:
        HTTPException: 500 if any Redis call fails.
    """
    try:
        status_by_type = {}
        for kind in (
            "scheduler",
            "rss_collector",
            "google_search",
            "ai_summarizer",
            "article_assembly",
        ):
            members = await redis_client.smembers(f"workers:{kind}:active")
            status_by_type[kind] = {
                "active": len(members),
                "worker_ids": list(members),
            }
        return status_by_type
    except Exception as e:
        logger.error(f"Error getting workers: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/trigger/{keyword}")
async def trigger_keyword_processing(keyword: str):
    """Manually enqueue a processing job for an active keyword.

    Looks up the active keyword document by its text, builds a
    ``PipelineJob`` for it, and RPUSHes the job's JSON onto ``queue:keyword``.

    Args:
        keyword: Keyword text, matched exactly.

    Returns:
        dict with a confirmation ``message`` and the new job's ``job_id``.

    Raises:
        HTTPException: 404 if the keyword is missing or inactive, 500 on
            database/queue errors.
    """
    try:
        keyword_doc = await db.keywords.find_one({
            "keyword": keyword,
            "is_active": True,
        })
        if not keyword_doc:
            raise HTTPException(status_code=404, detail="Keyword not found or inactive")

        job = PipelineJob(
            keyword_id=str(keyword_doc["_id"]),
            keyword=keyword,
            stage="keyword_processing",
            created_at=datetime.now(),
        )
        # NOTE(review): .json() is the pydantic v1 API (deprecated in v2 in
        # favour of model_dump_json) — confirm against shared.models.
        await redis_client.rpush("queue:keyword", job.json())

        return {
            "message": f"Processing triggered for keyword: {keyword}",
            "job_id": job.job_id,
        }
    except HTTPException:
        # Let the 404 above through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        logger.exception("Error triggering keyword: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/health")
async def health_check():
    """Report Redis and MongoDB connectivity.

    Always returns a body (never raises). If either ping raises, the body
    has ``status: "unhealthy"`` and the error text.

    Returns:
        dict with an overall ``status``, per-service ``redis``/``mongodb``
        state (or ``error``), and an ISO ``timestamp``.
    """
    try:
        redis_ok = bool(await redis_client.ping())
        mongodb_ok = bool(await db.command("ping"))
    except Exception as e:
        # Previously not logged at all; record why the check failed.
        logger.warning("Health check failed: %s", e)
        return {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": datetime.now().isoformat(),
        }

    return {
        # Derived from both checks, so "healthy" can never be reported next
        # to a "disconnected" service.
        "status": "healthy" if redis_ok and mongodb_ok else "unhealthy",
        "redis": "connected" if redis_ok else "disconnected",
        "mongodb": "connected" if mongodb_ok else "disconnected",
        "timestamp": datetime.now().isoformat(),
    }
if __name__ == "__main__":
    import uvicorn

    # Direct-run entry point. The listen port can be overridden with the PORT
    # environment variable; without it the original port 8000 is used.
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "8000")))