"""
Keyword Scheduler Service

Scheduler that periodically executes the keywords registered in the database.
"""
import asyncio
import logging
import os
import sys
from datetime import datetime, timedelta
from motor.motor_asyncio import AsyncIOMotorClient
from typing import List, Optional
import uuid

# Import from shared module
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from shared.models import Keyword, PipelineJob
from shared.queue_manager import QueueManager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class KeywordScheduler:
    """Poll MongoDB for due keywords and enqueue an RSS-collection job for each.

    Configuration is read from environment variables in ``__init__``:
    ``REDIS_URL``, ``MONGODB_URL``, ``DB_NAME``, ``SCHEDULER_CHECK_INTERVAL``
    (seconds between polls) and ``DEFAULT_KEYWORD_INTERVAL`` (minutes between
    runs of a keyword when no interval is given).
    """

    def __init__(self):
        self.queue_manager = QueueManager(
            redis_url=os.getenv("REDIS_URL", "redis://redis:6379")
        )
        self.mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongodb:27017")
        self.db_name = os.getenv("DB_NAME", "ai_writer_db")
        # Kept on the instance so stop() can close the connection.
        self.client: Optional[AsyncIOMotorClient] = None
        self.db = None
        # Seconds between polls of the keywords collection (default: 1 minute).
        self.check_interval = int(os.getenv("SCHEDULER_CHECK_INTERVAL", "60"))
        # Minutes between runs of a keyword with no interval (default: 1 hour).
        self.default_interval = int(os.getenv("DEFAULT_KEYWORD_INTERVAL", "60"))

    async def start(self):
        """Connect to Redis and MongoDB, seed default keywords, then poll forever.

        Each iteration runs ``check_and_execute_keywords`` and then sleeps
        ``check_interval`` seconds; an unexpected error is logged and the loop
        retries after 10 seconds. This coroutine only exits by raising
        (for example, when the task is cancelled).
        """
        logger.info("Starting Keyword Scheduler")

        # Redis connection
        await self.queue_manager.connect()

        # MongoDB connection
        self.client = AsyncIOMotorClient(self.mongodb_url)
        self.db = self.client[self.db_name]

        # Seed default keywords if the collection is empty
        await self.initialize_keywords()

        # Main loop
        while True:
            try:
                await self.check_and_execute_keywords()
                await asyncio.sleep(self.check_interval)
            except Exception as e:
                logger.exception(f"Error in scheduler loop: {e}")
                await asyncio.sleep(10)

    async def initialize_keywords(self):
        """Seed a default set of keywords when the collection is empty.

        Seeded keywords get their first run 5 minutes after seeding. Errors are
        logged, not raised, so a seeding failure does not stop the scheduler.
        """
        try:
            count = await self.db.keywords.count_documents({})

            if count == 0:
                logger.info("No keywords found. Creating default keywords...")

                default_keywords = [
                    {
                        "keyword": "AI",
                        "interval_minutes": 60,
                        "is_active": True,
                        "priority": 1,
                        "rss_feeds": []
                    },
                    {
                        "keyword": "경제",
                        "interval_minutes": 120,
                        "is_active": True,
                        "priority": 0,
                        "rss_feeds": []
                    },
                    {
                        "keyword": "테크놀로지",
                        "interval_minutes": 60,
                        "is_active": True,
                        "priority": 1,
                        "rss_feeds": []
                    }
                ]

                first_run = datetime.now() + timedelta(minutes=5)
                for kw_data in default_keywords:
                    keyword = Keyword(**kw_data)
                    # First run 5 minutes after seeding.
                    keyword.next_run = first_run
                    await self.db.keywords.insert_one(keyword.dict())
                    logger.info(f"Created keyword: {keyword.keyword}")

                # Re-count so the summary below reports the seeded documents
                # instead of the stale zero.
                count = await self.db.keywords.count_documents({})

            logger.info(f"Found {count} keywords in database")

        except Exception as e:
            logger.exception(f"Error initializing keywords: {e}")

    async def check_and_execute_keywords(self):
        """Execute every active keyword whose ``next_run`` is due.

        A keyword is due when ``next_run`` is at or before now, or is null or
        missing. Due keywords are processed in descending ``priority`` order.
        A document that cannot be parsed into a ``Keyword`` is logged and
        skipped, so it does not block the remaining keywords.
        """
        try:
            now = datetime.now()

            # Active keywords whose next_run has passed (or was never set).
            query = {
                "is_active": True,
                "$or": [
                    {"next_run": {"$lte": now}},
                    {"next_run": None}  # matches a null or missing next_run
                ]
            }

            # Highest priority first.
            cursor = self.db.keywords.find(query).sort("priority", -1)
            keywords = await cursor.to_list(None)

            for keyword_data in keywords:
                try:
                    keyword = Keyword(**keyword_data)
                except Exception as e:
                    logger.error(
                        f"Skipping invalid keyword document {keyword_data.get('_id')}: {e}"
                    )
                    continue
                await self.execute_keyword(keyword)

        except Exception as e:
            logger.exception(f"Error checking keywords: {e}")

    async def execute_keyword(self, keyword: Keyword):
        """Enqueue an ``rss_collection`` job for *keyword* and reschedule it.

        After enqueueing, ``last_run``/``updated_at`` are set to now and
        ``next_run`` to now plus the keyword's interval. When the interval is
        missing or zero, ``default_interval`` is used instead. Errors are
        logged, not raised.
        """
        try:
            logger.info(f"Executing keyword: {keyword.keyword}")

            job = PipelineJob(
                keyword_id=keyword.keyword_id,
                keyword=keyword.keyword,
                stage='rss_collection',
                data={
                    'rss_feeds': keyword.rss_feeds if keyword.rss_feeds else [],
                    'max_articles': keyword.max_articles_per_run,
                    'scheduled': True
                },
                priority=keyword.priority
            )

            await self.queue_manager.enqueue('rss_collection', job)
            logger.info(f"Enqueued job for keyword '{keyword.keyword}' with job_id: {job.job_id}")

            # One timestamp so last_run, next_run and updated_at agree.
            now = datetime.now()
            # Same fallback as add_keyword. It also keeps a zero/None interval
            # from re-enqueueing the keyword on every poll.
            interval = keyword.interval_minutes or self.default_interval
            update_data = {
                "last_run": now,
                "next_run": now + timedelta(minutes=interval),
                "updated_at": now
            }
            result = await self.db.keywords.update_one(
                {"keyword_id": keyword.keyword_id},
                {"$set": update_data}
            )
            if result.matched_count == 0:
                # Without a matching document next_run stays due, so the
                # keyword would be enqueued again on the next poll.
                logger.warning(
                    f"No keyword document matched keyword_id {keyword.keyword_id}; next_run not updated"
                )

            logger.info(f"Updated keyword '{keyword.keyword}' - next run at {update_data['next_run']}")

        except Exception as e:
            logger.exception(f"Error executing keyword {keyword.keyword}: {e}")

    async def add_keyword(self, keyword_text: str, interval_minutes: Optional[int] = None,
                          rss_feeds: Optional[List[str]] = None, priority: int = 0):
        """Insert a new keyword scheduled to first run one minute from now.

        Args:
            keyword_text: The keyword string; rejected if it already exists.
            interval_minutes: Minutes between runs; falls back to
                ``default_interval`` when None or 0.
            rss_feeds: RSS feed URLs for the keyword (default: none).
            priority: Scheduling priority; higher runs first.

        Returns:
            The created ``Keyword``, or None if it already exists or on error.
        """
        try:
            # NOTE(review): find-then-insert is not atomic; concurrent callers
            # can insert duplicates unless a unique index on "keyword" exists.
            existing = await self.db.keywords.find_one({"keyword": keyword_text})
            if existing:
                logger.warning(f"Keyword '{keyword_text}' already exists")
                return None

            keyword = Keyword(
                keyword=keyword_text,
                interval_minutes=interval_minutes or self.default_interval,
                rss_feeds=rss_feeds or [],
                priority=priority,
                next_run=datetime.now() + timedelta(minutes=1)  # first run in 1 minute
            )

            await self.db.keywords.insert_one(keyword.dict())
            logger.info(f"Added new keyword: {keyword_text}")
            return keyword

        except Exception as e:
            logger.exception(f"Error adding keyword: {e}")
            return None

    async def update_keyword(self, keyword_id: str, **kwargs):
        """Set the given non-None fields on a keyword and bump ``updated_at``.

        Returns:
            True if a document was modified, False otherwise or on error.
        """
        try:
            # Only fields explicitly provided (non-None) are updated.
            update_data = {k: v for k, v in kwargs.items() if v is not None}
            update_data["updated_at"] = datetime.now()

            result = await self.db.keywords.update_one(
                {"keyword_id": keyword_id},
                {"$set": update_data}
            )

            if result.modified_count > 0:
                logger.info(f"Updated keyword {keyword_id}")
                return True
            return False

        except Exception as e:
            logger.exception(f"Error updating keyword: {e}")
            return False

    async def delete_keyword(self, keyword_id: str):
        """Delete a keyword by ``keyword_id``.

        Returns:
            True if a document was deleted, False otherwise or on error.
        """
        try:
            result = await self.db.keywords.delete_one({"keyword_id": keyword_id})
            if result.deleted_count > 0:
                logger.info(f"Deleted keyword {keyword_id}")
                return True
            return False

        except Exception as e:
            logger.exception(f"Error deleting keyword: {e}")
            return False

    async def stop(self):
        """Disconnect from Redis and close the MongoDB client."""
        try:
            await self.queue_manager.disconnect()
        finally:
            # Close Mongo even if the Redis disconnect raised.
            if self.client is not None:
                self.client.close()
                self.client = None
        logger.info("Keyword Scheduler stopped")


async def main():
    """Run a KeywordScheduler until it is interrupted, then shut it down."""
    scheduler = KeywordScheduler()
    try:
        await scheduler.start()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
    finally:
        await scheduler.stop()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On Ctrl+C, asyncio.run() typically re-raises KeyboardInterrupt here
        # after cancelling main(), whose finally block still calls stop().
        logger.info("Received interrupt signal")