commit bec61e2b024566d8ad859e33fbf2c10c371d67de
Author: yakenator
Date:   Mon Feb 23 13:50:51 2026 +0900

    feat: catalyst detection service for stock events

    - Disclosure-based catalyst identification
    - GET /results endpoint for catalyst data by stock
    - REST API with health, streams, and trigger endpoints
    - Redis Streams for async job processing
    - Docker support with multi-stage build

    Co-Authored-By: Claude Opus 4.6

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..daf0201
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,8 @@
+__pycache__
+*.pyc
+.git
+.venv
+*.egg-info
+dist
+build
+.env
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f3ebf17
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,22 @@
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+*.egg-info/
+dist/
+build/
+.eggs/
+.venv/
+venv/
+.env
+.vscode/
+.idea/
+*.swp
+*.swo
+.DS_Store
+.pytest_cache/
+htmlcov/
+.coverage
+coverage.xml
+.mypy_cache/
+.claude/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..a8779a6
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.12-slim
+
+WORKDIR /app
+
+COPY stock-common/ /tmp/stock-common/
+RUN pip install --no-cache-dir /tmp/stock-common/ && rm -rf /tmp/stock-common/
+
+COPY stock-catalyst/ .
+RUN pip install --no-cache-dir .
+
+CMD ["python", "-m", "stock_catalyst.worker"]
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..f1aab96
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,15 @@
+[project]
+name = "stock-catalyst"
+version = "0.1.0"
+description = "Catalyst detection service - analyzes disclosures and news for positive signals"
+requires-python = ">=3.12"
+dependencies = [
+    "stock-common",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/stock_catalyst"]
diff --git a/src/stock_catalyst/__init__.py b/src/stock_catalyst/__init__.py
new file mode 100644
index 0000000..d99d30d
--- /dev/null
+++ b/src/stock_catalyst/__init__.py
@@ -0,0 +1,3 @@
+"""Catalyst detection service."""
+
+__version__ = "0.1.0"
diff --git a/src/stock_catalyst/api.py b/src/stock_catalyst/api.py
new file mode 100644
index 0000000..9a93e88
--- /dev/null
+++ b/src/stock_catalyst/api.py
@@ -0,0 +1,66 @@
+"""REST API for Catalyst detector service.
+
+Endpoints:
+    POST /detect               - queue a catalyst detection job for a stock
+    GET  /results/{stock_code} - list the stock's most recent disclosures from MongoDB
+    GET  /health               - (from api_base)
+    GET  /streams              - (from api_base)
+"""
+
+from pydantic import BaseModel
+
+from stock_common.api_base import create_app
+from stock_common.database import get_mongo_database
+from stock_common.queue import publish
+
+INPUT_STREAM = "queue:screened"
+OUTPUT_STREAM = "queue:catalysts"
+
+# Upper bound on the number of documents GET /results/{stock_code} returns.
+MAX_RESULTS_LIMIT = 100
+
+
+class DetectRequest(BaseModel):
+    """Request body for POST /detect."""
+
+    stock_code: str
+    run_id: str = ""
+    composite_score: float = 0
+
+
+app = create_app(
+    title="stock-catalyst",
+    streams=[INPUT_STREAM, OUTPUT_STREAM],
+)
+
+
+@app.post("/detect")
+async def detect_catalyst(req: DetectRequest):
+    """Publish a detection job to INPUT_STREAM and return the queued message id."""
+    msg_id = await publish(INPUT_STREAM, {
+        "stock_code": req.stock_code,
+        "run_id": req.run_id,
+        "composite_score": req.composite_score,
+    })
+    return {"status": "queued", "message_id": msg_id, "stock_code": req.stock_code}
+
+
+@app.get("/results/{stock_code}")
+async def get_catalyst_results(stock_code: str, limit: int = 10):
+    """Return the most recently disclosed documents stored for ``stock_code``.
+
+    ``limit`` is clamped to the range 1..MAX_RESULTS_LIMIT before it reaches
+    the database.
+    """
+    # A cursor limit of 0 means "no limit" in MongoDB and negative values are
+    # driver-dependent, so never pass the raw query parameter through.
+    limit = max(1, min(limit, MAX_RESULTS_LIMIT))
+    db = get_mongo_database()
+    cursor = db.disclosures.find(
+        {"stock_code": stock_code}
+    ).sort("disclosed_at", -1).limit(limit)
+    docs = await cursor.to_list(limit)
+    for d in docs:
+        # Stringify _id (presumably a BSON ObjectId) so the response serializes.
+        d["_id"] = str(d["_id"])
+    return {"stock_code": stock_code, "disclosures": docs}
diff --git a/src/stock_catalyst/detector.py b/src/stock_catalyst/detector.py
new file mode 100644
index 0000000..cd15cad
--- /dev/null
+++ b/src/stock_catalyst/detector.py
@@ -0,0 +1,86 @@
+"""Catalyst detection engine.
+
+Analyzes disclosures and news to detect positive catalysts for screened stocks.
+Keyword-based pattern matching with configurable keyword lists.
+"""
+
+import structlog
+
+from stock_common.models.disclosure import Disclosure
+
+logger = structlog.get_logger(module="catalyst.detector")
+
+# Catalyst keyword patterns (mostly Korean; matched case-insensitively)
+CATALYST_KEYWORDS = {
+    "earnings_surprise": ["어닝서프라이즈", "실적 호조", "영업이익 증가", "순이익 증가", "흑자전환", "사상최대"],
+    "large_contract": ["대규모 계약", "수주", "공급계약", "납품계약", "신규 수주"],
+    "shareholder_return": ["배당", "자사주", "자기주식", "주주환원", "배당금 증가"],
+    "mna": ["인수", "합병", "M&A", "지분 취득", "경영권"],
+    "regulatory_positive": ["규제 완화", "인허가", "승인", "보조금", "세제 혜택"],
+}
+
+# Value trap warning keywords
+VALUE_TRAP_KEYWORDS = [
+    "매출 감소", "영업손실", "적자 전환", "적자 지속", "구조조정",
+    "감자", "상장폐지", "관리종목", "자본잠식",
+]
+
+
+class CatalystDetector:
+    """Detects positive catalysts from disclosures and news."""
+
+    def detect_from_disclosures(self, disclosures: list[Disclosure]) -> dict:
+        """Analyze disclosures for catalyst signals.
+
+        Returns:
+            Dict with catalyst_score, detected_catalysts, value_trap_signals, is_value_trap.
+        """
+        detected: list[dict] = []
+        value_trap_signals: list[str] = []
+
+        for disclosure in disclosures:
+            text = f"{disclosure.title} {disclosure.content_summary}".lower()
+
+            # Check catalyst patterns; text is lowercased, so lowercase the keyword too
+            for category, keywords in CATALYST_KEYWORDS.items():
+                for keyword in keywords:
+                    if keyword.lower() in text:
+                        detected.append({
+                            "category": category,
+                            "keyword": keyword,
+                            "title": disclosure.title,
+                            "dart_id": disclosure.dart_id,
+                        })
+
+            # Check value trap signals
+            for keyword in VALUE_TRAP_KEYWORDS:
+                if keyword.lower() in text:
+                    value_trap_signals.append(keyword)
+
+        # Score: 20 points per keyword hit, capped at 100
+        score = min(100, len(detected) * 20)
+        is_value_trap = len(value_trap_signals) >= 2
+        # A flagged value trap costs 40 points, floored at 0
+        if is_value_trap:
+            score = max(0, score - 40)
+
+        return {
+            "catalyst_score": score,
+            "detected_catalysts": detected,
+            "value_trap_signals": value_trap_signals,
+            "is_value_trap": is_value_trap,
+        }
+
+    def detect_from_news(self, news_titles: list[str]) -> dict:
+        """Analyze news titles for catalyst signals (15 points per hit, capped at 100)."""
+        detected: list[dict] = []
+
+        for title in news_titles:
+            text = title.lower()
+            for category, keywords in CATALYST_KEYWORDS.items():
+                for keyword in keywords:
+                    if keyword.lower() in text:
+                        detected.append({"category": category, "keyword": keyword, "title": title})
+
+        score = min(100, len(detected) * 15)
+        return {"catalyst_score": score, "detected_catalysts": detected}
diff --git a/src/stock_catalyst/worker.py b/src/stock_catalyst/worker.py
new file mode 100644
index 0000000..8b3c3b9
--- /dev/null
+++ b/src/stock_catalyst/worker.py
@@ -0,0 +1,106 @@
+"""Catalyst detector worker - runs as a standalone service.
+
+Runs both the Redis stream consumer loop AND the REST API server concurrently.
+"""
+
+import asyncio
+
+import structlog
+import uvicorn
+
+from stock_common.config import settings
+from stock_common.database import get_mongo_database
+from stock_common.logging_config import configure_logging
+from stock_common.models.disclosure import Disclosure
+from stock_common.queue import consume, publish, ack
+
+from stock_catalyst.detector import CatalystDetector
+from stock_catalyst.api import app
+
+logger = structlog.get_logger(service="catalyst")
+
+INPUT_STREAM = "queue:screened"
+OUTPUT_STREAM = "queue:catalysts"
+GROUP = "catalyst-detectors"
+CONSUMER = "catalyst-worker-1"
+
+
+async def worker_loop() -> None:
+    """Consume screened stocks, score their disclosures, publish the results.
+
+    Runs until cancelled. A failure while handling one message is logged and
+    the message is left un-acked; any other failure is logged and the loop
+    restarts after a 10 second pause.
+    """
+    while True:
+        try:
+            detector = CatalystDetector()
+            db = get_mongo_database()
+
+            while True:
+                messages = await consume(INPUT_STREAM, GROUP, CONSUMER, count=5, block=5000)
+
+                for msg in messages:
+                    stock_code = msg.data.get("stock_code", "")
+
+                    try:
+                        if not stock_code:
+                            # Nothing to look up: ack the malformed message instead
+                            # of publishing an empty result for a blank stock code.
+                            logger.warning("catalyst_skipped_missing_stock_code", message_id=msg.message_id)
+                            await ack(INPUT_STREAM, GROUP, msg.message_id)
+                            continue
+
+                        raw_disclosures = await db.disclosures.find(
+                            {"stock_code": stock_code}
+                        ).sort("disclosed_at", -1).limit(20).to_list(20)
+
+                        # Documents without a dart_id are skipped.
+                        disclosures = [Disclosure(**d) for d in raw_disclosures if "dart_id" in d]
+
+                        result = detector.detect_from_disclosures(disclosures)
+                        result["stock_code"] = stock_code
+                        result["run_id"] = msg.data.get("run_id", "")
+                        result["composite_score"] = msg.data.get("composite_score", 0)
+
+                        await publish(OUTPUT_STREAM, result)
+                        await ack(INPUT_STREAM, GROUP, msg.message_id)
+                        logger.info("catalyst_detected", stock_code=stock_code, score=result["catalyst_score"])
+                    except Exception as exc:
+                        # NOTE(review): the message is not acked on failure, so it
+                        # stays pending; confirm something re-claims pending entries.
+                        logger.error("catalyst_failed", stock_code=stock_code, error=str(exc))
+        except Exception as exc:
+            logger.error("worker_loop_crashed", error=str(exc))
+            await asyncio.sleep(10)
+            logger.info("worker_loop_restarting")
+
+
+async def run() -> None:
+    """Start the API server and the consumer loop; stop both when the server exits."""
+    configure_logging(level=settings.log_level, log_format=settings.log_format)
+    logger.info("catalyst_detector_starting")
+
+    config = uvicorn.Config(app, host="0.0.0.0", port=8005, log_level="info")
+    server = uvicorn.Server(config)
+
+    # worker_loop() never returns on its own, so gathering both coroutines would
+    # keep run() waiting after uvicorn has shut down. Tie the consumer's
+    # lifetime to the server instead.
+    worker_task = asyncio.create_task(worker_loop())
+    try:
+        await server.serve()
+    finally:
+        worker_task.cancel()
+        # Collect the cancellation so it is not reported as an unretrieved error.
+        await asyncio.gather(worker_task, return_exceptions=True)
+        logger.info("catalyst_detector_stopped")
+
+
+def main() -> None:
+    """Synchronous entry point: run the service until it stops."""
+    asyncio.run(run())
+
+
+if __name__ == "__main__":
+    main()