feat: catalyst detection service for stock events

- Disclosure-based catalyst identification
- GET /results endpoint for catalyst data by stock
- REST API with health, streams, and trigger endpoints
- Redis Streams for async job processing
- Docker support with a slim Python 3.12 base image

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
yakenator
2026-02-23 13:50:51 +09:00
commit bec61e2b02
8 changed files with 279 additions and 0 deletions

8
.dockerignore Normal file
View File

@@ -0,0 +1,8 @@
# Keep Python build artifacts, VCS data, and local env/secret files out of the image.
__pycache__
*.pyc
.git
.venv
*.egg-info
dist
build
.env

22
.gitignore vendored Normal file
View File

@@ -0,0 +1,22 @@
# Python bytecode and extension artifacts
__pycache__/
*.py[cod]
*$py.class
*.so
# Packaging / build output
*.egg-info/
dist/
build/
.eggs/
# Virtual environments
.venv/
venv/
# Local environment variables / secrets
.env
# Editor and OS files
.vscode/
.idea/
*.swp
*.swo
.DS_Store
# Test, coverage, and type-check caches
.pytest_cache/
htmlcov/
.coverage
coverage.xml
.mypy_cache/
# Claude Code local settings
.claude/

11
Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
# Single-stage image for the catalyst worker (built from the repo root so the
# sibling stock-common/ directory is in the build context).
FROM python:3.12-slim
WORKDIR /app
# Install the shared stock-common library first, then drop its source to keep
# the layer small.
COPY stock-common/ /tmp/stock-common/
RUN pip install --no-cache-dir /tmp/stock-common/ && rm -rf /tmp/stock-common/
# Install this service on top.
COPY stock-catalyst/ .
RUN pip install --no-cache-dir .
# Run the combined stream-consumer + REST API process.
CMD ["python", "-m", "stock_catalyst.worker"]

15
pyproject.toml Normal file
View File

@@ -0,0 +1,15 @@
[project]
name = "stock-catalyst"
version = "0.1.0"
description = "Catalyst detection service - analyzes disclosures and news for positive signals"
requires-python = ">=3.12"
dependencies = [
# Shared project library (queue, database, api_base, logging helpers).
"stock-common",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
# src layout: the importable package lives under src/stock_catalyst.
packages = ["src/stock_catalyst"]

View File

@@ -0,0 +1,3 @@
"""Catalyst detection service."""
__version__ = "0.1.0"

51
src/stock_catalyst/api.py Normal file
View File

@@ -0,0 +1,51 @@
"""REST API for Catalyst detector service.
Endpoints:
POST /detect - trigger catalyst detection for a stock
GET /results/{stock_code} - get catalyst results from MongoDB
GET /health - (from api_base)
GET /streams - (from api_base)
"""
from pydantic import BaseModel
from stock_common.api_base import create_app
from stock_common.database import get_mongo_database
from stock_common.queue import publish
# Redis stream names: /detect publishes jobs to INPUT_STREAM; the worker
# publishes results to OUTPUT_STREAM. Both are exposed via /streams.
INPUT_STREAM = "queue:screened"
OUTPUT_STREAM = "queue:catalysts"


class DetectRequest(BaseModel):
    """Request body for POST /detect."""

    stock_code: str  # stock code to analyze
    run_id: str = ""  # optional pipeline run id, passed through to the job
    composite_score: float = 0  # upstream screening score, passed through to the job


# FastAPI app with shared /health and /streams endpoints from api_base.
app = create_app(
    title="stock-catalyst",
    streams=[INPUT_STREAM, OUTPUT_STREAM],
)
@app.post("/detect")
async def detect_catalyst(req: DetectRequest):
msg_id = await publish(INPUT_STREAM, {
"stock_code": req.stock_code,
"run_id": req.run_id,
"composite_score": req.composite_score,
})
return {"status": "queued", "message_id": msg_id, "stock_code": req.stock_code}
@app.get("/results/{stock_code}")
async def get_catalyst_results(stock_code: str, limit: int = 10):
db = get_mongo_database()
cursor = db.disclosures.find(
{"stock_code": stock_code}
).sort("disclosed_at", -1).limit(limit)
docs = await cursor.to_list(limit)
for d in docs:
d["_id"] = str(d["_id"])
return {"stock_code": stock_code, "disclosures": docs}

View File

@@ -0,0 +1,86 @@
"""Catalyst detection engine.
Analyzes disclosures and news to detect positive catalysts for screened stocks.
Keyword-based pattern matching with configurable keyword lists.
"""
import structlog
from stock_common.models.disclosure import Disclosure
logger = structlog.get_logger(module="catalyst.detector")
# Catalyst keyword patterns (Korean), grouped by catalyst category.
# All entries must be lowercase: CatalystDetector lowercases the text it
# scans, so a keyword containing uppercase letters could never match.
CATALYST_KEYWORDS = {
    "earnings_surprise": ["어닝서프라이즈", "실적 호조", "영업이익 증가", "순이익 증가", "흑자전환", "사상최대"],
    "large_contract": ["대규모 계약", "수주", "공급계약", "납품계약", "신규 수주"],
    "shareholder_return": ["배당", "자사주", "자기주식", "주주환원", "배당금 증가"],
    # "m&a" is lowercased: the original "M&A" never matched the lowercased
    # disclosure/news text.
    "mna": ["인수", "합병", "m&a", "지분 취득", "경영권"],
    "regulatory_positive": ["규제 완화", "인허가", "승인", "보조금", "세제 혜택"],
}
# Value trap warning keywords — negative signals that discount the score.
VALUE_TRAP_KEYWORDS = [
    "매출 감소", "영업손실", "적자 전환", "적자 지속", "구조조정",
    "감자", "상장폐지", "관리종목", "자본잠식",
]
class CatalystDetector:
    """Detects positive catalysts from disclosures and news.

    Pure keyword matching against the module-level CATALYST_KEYWORDS and
    VALUE_TRAP_KEYWORDS lists; no I/O and no stored state.
    """

    def detect_from_disclosures(self, disclosures: list[Disclosure]) -> dict:
        """Analyze disclosures for catalyst signals.

        Args:
            disclosures: Disclosure documents whose title and content_summary
                are scanned for keywords.

        Returns:
            Dict with catalyst_score (0-100), detected_catalysts,
            value_trap_signals, is_value_trap.
        """
        detected: list[dict] = []
        value_trap_signals: list[str] = []
        for disclosure in disclosures:
            text = f"{disclosure.title} {disclosure.content_summary}".lower()
            # Check catalyst patterns. Compare against the lowercased keyword:
            # the scanned text is lowercased, so an uppercase keyword such as
            # "M&A" would otherwise never match (bug fix).
            for category, keywords in CATALYST_KEYWORDS.items():
                for keyword in keywords:
                    if keyword.lower() in text:
                        detected.append({
                            "category": category,
                            "keyword": keyword,
                            "title": disclosure.title,
                            "dart_id": disclosure.dart_id,
                        })
            # Check value trap signals (same case-insensitive comparison).
            for keyword in VALUE_TRAP_KEYWORDS:
                if keyword.lower() in text:
                    value_trap_signals.append(keyword)
        # Score: 0-100, 20 points per detected catalyst, capped at 100.
        score = min(100, len(detected) * 20)
        # Two or more warning keywords mark a likely value trap and apply a
        # flat 40-point penalty (floored at 0).
        is_value_trap = len(value_trap_signals) >= 2
        if is_value_trap:
            score = max(0, score - 40)
        return {
            "catalyst_score": score,
            "detected_catalysts": detected,
            "value_trap_signals": value_trap_signals,
            "is_value_trap": is_value_trap,
        }

    def detect_from_news(self, news_titles: list[str]) -> dict:
        """Analyze news titles for catalyst signals.

        Returns:
            Dict with catalyst_score (0-100, 15 points per hit) and
            detected_catalysts. News hits score lower than disclosure hits
            and carry no value-trap analysis.
        """
        detected: list[dict] = []
        for title in news_titles:
            text = title.lower()
            for category, keywords in CATALYST_KEYWORDS.items():
                for keyword in keywords:
                    # Case-insensitive match, as in detect_from_disclosures.
                    if keyword.lower() in text:
                        detected.append({"category": category, "keyword": keyword, "title": title})
        score = min(100, len(detected) * 15)
        return {"catalyst_score": score, "detected_catalysts": detected}

View File

@@ -0,0 +1,83 @@
"""Catalyst detector worker - runs as a standalone service.
Runs both the Redis stream consumer loop AND the REST API server concurrently.
"""
import asyncio
import structlog
import uvicorn
from stock_common.config import settings
from stock_common.database import get_mongo_database
from stock_common.logging_config import configure_logging
from stock_common.models.disclosure import Disclosure
from stock_common.queue import consume, publish, ack
from stock_catalyst.detector import CatalystDetector
from stock_catalyst.api import app
logger = structlog.get_logger(service="catalyst")

# Stream wiring: screened stocks arrive on INPUT_STREAM, catalyst results go
# to OUTPUT_STREAM. GROUP/CONSUMER identify this worker to Redis Streams.
INPUT_STREAM = "queue:screened"
OUTPUT_STREAM = "queue:catalysts"
GROUP = "catalyst-detectors"
CONSUMER = "catalyst-worker-1"
async def worker_loop() -> None:
    """Redis stream consumer loop with auto-restart on failure.

    Outer loop: rebuild the detector and DB handle and resume consuming after
    any crash, with a 10 s backoff. Inner loop: consume screened-stock
    messages, score their recent disclosures, publish the result downstream,
    then ack.
    """
    while True:
        try:
            detector = CatalystDetector()
            db = get_mongo_database()
            while True:
                # Block up to 5 s waiting for up to 5 messages.
                messages = await consume(INPUT_STREAM, GROUP, CONSUMER, count=5, block=5000)
                for msg in messages:
                    stock_code = msg.data.get("stock_code", "")
                    try:
                        # Newest 20 disclosures for this stock; skip documents
                        # missing dart_id (required by the Disclosure model).
                        raw_disclosures = await db.disclosures.find(
                            {"stock_code": stock_code}
                        ).sort("disclosed_at", -1).limit(20).to_list(20)
                        disclosures = [Disclosure(**d) for d in raw_disclosures if "dart_id" in d]
                        result = detector.detect_from_disclosures(disclosures)
                        # Pass job metadata through so downstream consumers can
                        # correlate the result with the originating run.
                        result["stock_code"] = stock_code
                        result["run_id"] = msg.data.get("run_id", "")
                        result["composite_score"] = msg.data.get("composite_score", 0)
                        await publish(OUTPUT_STREAM, result)
                        # Ack only after a successful publish.
                        await ack(INPUT_STREAM, GROUP, msg.message_id)
                        logger.info("catalyst_detected", stock_code=stock_code, score=result["catalyst_score"])
                    except Exception as exc:
                        # NOTE(review): failed messages are logged but never
                        # acked, so they remain pending in the consumer group —
                        # confirm a reclaim/dead-letter strategy exists elsewhere.
                        logger.error("catalyst_failed", stock_code=stock_code, error=str(exc))
        except Exception as exc:
            logger.error("worker_loop_crashed", error=str(exc))
            await asyncio.sleep(10)
            logger.info("worker_loop_restarting")
async def run() -> None:
    """Configure logging, then run the consumer loop and API server together."""
    configure_logging(level=settings.log_level, log_format=settings.log_format)
    logger.info("catalyst_detector_starting")
    server = uvicorn.Server(
        uvicorn.Config(app, host="0.0.0.0", port=8005, log_level="info")
    )
    # return_exceptions=True keeps one task's crash from cancelling the other.
    await asyncio.gather(worker_loop(), server.serve(), return_exceptions=True)
def main() -> None:
    """Console entry point: run the async service until interrupted."""
    asyncio.run(run())


if __name__ == "__main__":
    main()