From 76bb6da1167abb0c0983e79d7798723453faa1c1 Mon Sep 17 00:00:00 2001
From: yakenator
Date: Mon, 23 Feb 2026 13:50:53 +0900
Subject: [PATCH] feat: LLM-powered stock analysis service

- AI-driven stock analysis with recommendation generation
- GET /results endpoint for analysis data by stock
- REST API with health, streams, and trigger endpoints
- Redis Streams for async job processing
- Docker support with a slim Python base image

Co-Authored-By: Claude Opus 4.6
---
 .dockerignore                      |  8 +++
 .gitignore                         | 22 +++++++
 Dockerfile                         | 11 ++++
 pyproject.toml                     | 16 +++++
 src/stock_llm_analyzer/__init__.py |  3 +
 src/stock_llm_analyzer/analyzer.py | 98 ++++++++++++++++++++++++++++++
 src/stock_llm_analyzer/api.py      | 50 +++++++++++++++
 src/stock_llm_analyzer/worker.py   | 85 ++++++++++++++++++++++++++
 8 files changed, 293 insertions(+)
 create mode 100644 .dockerignore
 create mode 100644 .gitignore
 create mode 100644 Dockerfile
 create mode 100644 pyproject.toml
 create mode 100644 src/stock_llm_analyzer/__init__.py
 create mode 100644 src/stock_llm_analyzer/analyzer.py
 create mode 100644 src/stock_llm_analyzer/api.py
 create mode 100644 src/stock_llm_analyzer/worker.py

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..daf0201
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,8 @@
+__pycache__
+*.pyc
+.git
+.venv
+*.egg-info
+dist
+build
+.env
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f3ebf17
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,22 @@
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+*.egg-info/
+dist/
+build/
+.eggs/
+.venv/
+venv/
+.env
+.vscode/
+.idea/
+*.swp
+*.swo
+.DS_Store
+.pytest_cache/
+htmlcov/
+.coverage
+coverage.xml
+.mypy_cache/
+.claude/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..b1a3d8e
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.12-slim
+
+WORKDIR /app
+
+COPY stock-common/ /tmp/stock-common/
+RUN pip install --no-cache-dir /tmp/stock-common/ && rm -rf /tmp/stock-common/
+
+COPY stock-llm-analyzer/ .
+RUN pip install --no-cache-dir .
+ +CMD ["python", "-m", "stock_llm_analyzer.worker"] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..59c94bd --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "stock-llm-analyzer" +version = "0.1.0" +description = "LLM qualitative analysis service using Anthropic Claude API" +requires-python = ">=3.12" +dependencies = [ + "stock-common", + "anthropic>=0.40", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/stock_llm_analyzer"] diff --git a/src/stock_llm_analyzer/__init__.py b/src/stock_llm_analyzer/__init__.py new file mode 100644 index 0000000..c66a0bb --- /dev/null +++ b/src/stock_llm_analyzer/__init__.py @@ -0,0 +1,3 @@ +"""LLM qualitative analysis service using Anthropic Claude API.""" + +__version__ = "0.1.0" diff --git a/src/stock_llm_analyzer/analyzer.py b/src/stock_llm_analyzer/analyzer.py new file mode 100644 index 0000000..ffbcb75 --- /dev/null +++ b/src/stock_llm_analyzer/analyzer.py @@ -0,0 +1,98 @@ +"""Anthropic Claude-based qualitative stock analysis.""" + +import json +from uuid import uuid4 + +import anthropic +import structlog +from aiolimiter import AsyncLimiter + +from stock_common.config import settings +from stock_common.models.analysis import LLMAnalysis, Recommendation + +logger = structlog.get_logger(module="llm_analyzer") + + +class LLMAnalyzer: + """Anthropic Claude-based qualitative analysis.""" + + def __init__(self) -> None: + self.client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key) + self.limiter = AsyncLimiter(settings.llm_rate_limit, 1) + + async def analyze(self, stock_data: dict) -> LLMAnalysis: + """Perform qualitative analysis on a stock. + + Args: + stock_data: Dict containing stock_code, financials, valuation, + disclosures, news, catalyst info. + + Returns: + LLMAnalysis model with recommendation. + """ + prompt = self._build_prompt(stock_data) + + async with self.limiter: + response = await self.client.messages.create( + model="claude-sonnet-4-20250514", + max_tokens=2000, + messages=[{"role": "user", "content": prompt}], + ) + + return self._parse_response(response, stock_data) + + def _build_prompt(self, data: dict) -> str: + stock_code = data.get("stock_code", "") + catalysts = data.get("detected_catalysts", []) + catalyst_text = "\n".join( + f"- [{c.get('category', '')}] {c.get('keyword', '')}: {c.get('title', '')}" + for c in catalysts + ) + + return f"""Analyze the following Korean stock as an investment opportunity. + +Stock Code: {stock_code} +Composite Screening Score: {data.get('composite_score', 'N/A')} +Catalyst Score: {data.get('catalyst_score', 'N/A')} +Value Trap Warning: {data.get('is_value_trap', False)} + +Detected Catalysts: +{catalyst_text or 'None detected'} + +Provide your analysis in the following JSON format: +{{ + "summary": "2-3 sentence investment thesis", + "valuation_comment": "Is this stock undervalued? 
Why?", + "risk_factors": ["risk1", "risk2"], + "catalysts": ["catalyst1", "catalyst2"], + "recommendation": "STRONG_BUY|BUY|HOLD|SELL|STRONG_SELL", + "confidence": 0.0-1.0 +}}""" + + def _parse_response(self, response, stock_data: dict) -> LLMAnalysis: + from datetime import datetime + + text = response.content[0].text + try: + parsed = json.loads(text) + except json.JSONDecodeError: + # Try to extract JSON from response + start = text.find("{") + end = text.rfind("}") + 1 + if start >= 0 and end > start: + parsed = json.loads(text[start:end]) + else: + parsed = {} + + return LLMAnalysis( + analysis_id=uuid4(), + stock_code=stock_data.get("stock_code", ""), + summary=parsed.get("summary", ""), + valuation_comment=parsed.get("valuation_comment", ""), + risk_factors=parsed.get("risk_factors", []), + catalysts=parsed.get("catalysts", []), + recommendation=Recommendation(parsed.get("recommendation", "HOLD")), + confidence=float(parsed.get("confidence", 0.5)), + analyzed_at=datetime.now(), + model_name="claude-sonnet-4-20250514", + ) diff --git a/src/stock_llm_analyzer/api.py b/src/stock_llm_analyzer/api.py new file mode 100644 index 0000000..96d3b7c --- /dev/null +++ b/src/stock_llm_analyzer/api.py @@ -0,0 +1,50 @@ +"""REST API for LLM Analyzer service. + +Endpoints: + POST /analyze - trigger LLM analysis for a stock + GET /results/{stock_code} - get analysis results from MongoDB + GET /health - (from api_base) + GET /streams - (from api_base) +""" + +from pydantic import BaseModel + +from stock_common.api_base import create_app +from stock_common.database import get_mongo_database +from stock_common.queue import publish + +INPUT_STREAM = "queue:catalysts" +OUTPUT_STREAM = "queue:results" + + +class AnalyzeRequest(BaseModel): + stock_code: str + run_id: str = "" + catalyst_score: float = 50 + composite_score: float = 0 + detected_catalysts: list[dict] = [] + is_value_trap: bool = False + + +app = create_app( + title="stock-llm-analyzer", + streams=[INPUT_STREAM, OUTPUT_STREAM], +) + + +@app.post("/analyze") +async def analyze_stock(req: AnalyzeRequest): + msg_id = await publish(INPUT_STREAM, req.model_dump()) + return {"status": "queued", "message_id": msg_id, "stock_code": req.stock_code} + + +@app.get("/results/{stock_code}") +async def get_analysis_results(stock_code: str, limit: int = 5): + db = get_mongo_database() + cursor = db.llm_analysis.find( + {"stock_code": stock_code} + ).sort("analyzed_at", -1).limit(limit) + docs = await cursor.to_list(limit) + for d in docs: + d["_id"] = str(d["_id"]) + return {"stock_code": stock_code, "analyses": docs} diff --git a/src/stock_llm_analyzer/worker.py b/src/stock_llm_analyzer/worker.py new file mode 100644 index 0000000..4f1f1d3 --- /dev/null +++ b/src/stock_llm_analyzer/worker.py @@ -0,0 +1,85 @@ +"""LLM analyzer worker - runs as a standalone service. + +Runs both the Redis stream consumer loop AND the REST API server concurrently. 
+""" + +import asyncio + +import structlog +import uvicorn + +from stock_common.config import settings +from stock_common.logging_config import configure_logging +from stock_common.queue import consume, publish, ack + +from stock_llm_analyzer.analyzer import LLMAnalyzer +from stock_llm_analyzer.api import app + +logger = structlog.get_logger(service="llm-analyzer") + +INPUT_STREAM = "queue:catalysts" +OUTPUT_STREAM = "queue:results" +GROUP = "llm-analyzers" +CONSUMER = "llm-worker-1" + + +async def worker_loop() -> None: + """Redis stream consumer loop with auto-restart on failure.""" + while True: + try: + analyzer = LLMAnalyzer() + + while True: + messages = await consume(INPUT_STREAM, GROUP, CONSUMER, count=1, block=5000) + + for msg in messages: + stock_code = msg.data.get("stock_code", "") + + try: + catalyst_score = msg.data.get("catalyst_score", 0) + if catalyst_score < 20: + logger.info("skipping_low_catalyst", stock_code=stock_code, score=catalyst_score) + await ack(INPUT_STREAM, GROUP, msg.message_id) + continue + + analysis = await analyzer.analyze(msg.data) + + await publish(OUTPUT_STREAM, { + "stock_code": stock_code, + "run_id": msg.data.get("run_id", ""), + **analysis.model_dump(), + }) + await ack(INPUT_STREAM, GROUP, msg.message_id) + logger.info( + "analysis_completed", stock_code=stock_code, + recommendation=analysis.recommendation.value, + confidence=analysis.confidence, + ) + except Exception as exc: + logger.error("analysis_failed", stock_code=stock_code, error=str(exc)) + except Exception as exc: + logger.error("worker_loop_crashed", error=str(exc)) + await asyncio.sleep(10) + logger.info("worker_loop_restarting") + + +async def run() -> None: + configure_logging(level=settings.log_level, log_format=settings.log_format) + logger.info("llm_analyzer_starting") + + config = uvicorn.Config(app, host="0.0.0.0", port=8006, log_level="info") + server = uvicorn.Server(config) + + await asyncio.gather( + worker_loop(), + server.serve(), + return_exceptions=True, + ) + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main()