commit 6f2897a9733f33f884a9db8473d589edc40a933f Author: yakenator Date: Mon Feb 23 13:50:50 2026 +0900 feat: stock screening service with value/growth strategies - Multi-factor screening (PER, PBR, ROE, composite scoring) - GET /results/latest endpoint with stock name joins - REST API with health, streams, and trigger endpoints - Redis Streams for async job processing - Docker support with multi-stage build Co-Authored-By: Claude Opus 4.6 diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..daf0201 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +__pycache__ +*.pyc +.git +.venv +*.egg-info +dist +build +.env diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f3ebf17 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +__pycache__/ +*.py[cod] +*$py.class +*.so +*.egg-info/ +dist/ +build/ +.eggs/ +.venv/ +venv/ +.env +.vscode/ +.idea/ +*.swp +*.swo +.DS_Store +.pytest_cache/ +htmlcov/ +.coverage +coverage.xml +.mypy_cache/ +.claude/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b6fea58 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY stock-common/ /tmp/stock-common/ +RUN pip install --no-cache-dir /tmp/stock-common/ && rm -rf /tmp/stock-common/ + +COPY stock-screener/ . +RUN pip install --no-cache-dir . 
+ +CMD ["python", "-m", "stock_screener.worker"] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..8a96e6e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "stock-screener" +version = "0.1.0" +description = "Quantitative stock screening and valuation service" +requires-python = ">=3.12" +dependencies = [ + "stock-common", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/stock_screener"] diff --git a/src/stock_screener/__init__.py b/src/stock_screener/__init__.py new file mode 100644 index 0000000..7e8b103 --- /dev/null +++ b/src/stock_screener/__init__.py @@ -0,0 +1,3 @@ +"""Quantitative stock screening and valuation service.""" + +__version__ = "0.1.0" diff --git a/src/stock_screener/api.py b/src/stock_screener/api.py new file mode 100644 index 0000000..3fda4a0 --- /dev/null +++ b/src/stock_screener/api.py @@ -0,0 +1,97 @@ +"""REST API for Screener service. 
+ +Endpoints: + POST /screen - trigger screening run + GET /results/latest - get latest screening results (with stock names) + GET /results/{run_id} - get screening results by run_id + GET /health - (from api_base) + GET /streams - (from api_base) +""" + +import structlog +from pydantic import BaseModel + +from stock_common.api_base import create_app +from stock_common.database import get_pg_pool +from stock_common.queue import publish + +logger = structlog.get_logger(service="screener-api") + +TRIGGER_STREAM = "queue:trigger-screen" + + +class ScreenRequest(BaseModel): + strategy: str = "balanced" + market: str = "all" + top_n: int = 50 + + +app = create_app( + title="stock-screener", + streams=[TRIGGER_STREAM, "queue:screened"], +) + + +@app.post("/screen") +async def run_screening(req: ScreenRequest): + msg_id = await publish(TRIGGER_STREAM, { + "strategy": req.strategy, + "market": req.market, + "top_n": req.top_n, + }) + return {"status": "queued", "message_id": msg_id, "strategy": req.strategy} + + +@app.get("/results/latest") +async def get_latest_results(limit: int = 50): + try: + pool = await get_pg_pool() + async with pool.acquire() as conn: + latest_run = await conn.fetchrow( + "SELECT run_id, strategy, screened_at FROM screening_result ORDER BY screened_at DESC LIMIT 1", + ) + if not latest_run: + return {"run_id": None, "results": []} + + run_id = latest_run["run_id"] + rows = await conn.fetch( + """SELECT sr.stock_code, s.stock_name, s.market, s.sector, + sr.composite_score, sr.per_score, sr.pbr_score, + sr.roe_score, sr.rank, sr.strategy, sr.screened_at + FROM screening_result sr + LEFT JOIN stock s ON sr.stock_code = s.stock_code + WHERE sr.run_id = $1 + ORDER BY sr.rank + LIMIT $2""", + run_id, limit, + ) + return { + "run_id": str(run_id), + "strategy": latest_run["strategy"], + "screened_at": str(latest_run["screened_at"]) if latest_run["screened_at"] else None, + "results": [dict(r) for r in rows], + } + except Exception as exc: + 
logger.warning("latest_results_failed", error=str(exc)) + return {"run_id": None, "results": []} + + +@app.get("/results/{run_id}") +async def get_results(run_id: str): + try: + pool = await get_pg_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + """SELECT sr.stock_code, s.stock_name, s.market, s.sector, + sr.composite_score, sr.per_score, sr.pbr_score, + sr.roe_score, sr.rank, sr.strategy, sr.screened_at + FROM screening_result sr + LEFT JOIN stock s ON sr.stock_code = s.stock_code + WHERE sr.run_id = $1 + ORDER BY sr.rank""", + run_id, + ) + return {"run_id": run_id, "results": [dict(r) for r in rows]} + except Exception as exc: + logger.warning("results_query_failed", error=str(exc), run_id=run_id) + return {"run_id": run_id, "results": []} diff --git a/src/stock_screener/screener.py b/src/stock_screener/screener.py new file mode 100644 index 0000000..30174b4 --- /dev/null +++ b/src/stock_screener/screener.py @@ -0,0 +1,112 @@ +"""Quantitative stock screening engine. + +Scores and ranks stocks based on valuation metrics relative to sector averages. +""" + +from uuid import UUID, uuid4 + +import structlog + +from stock_common.models.screening import ScreeningResult, ScreeningStrategy, STRATEGY_WEIGHTS + +logger = structlog.get_logger(module="screener") + + +class Screener: + """Quantitative stock screening engine. + + Reads valuation data from PostgreSQL, applies percentile-based scoring + within sectors, and produces ranked screening results. + """ + + def __init__(self, pg_pool) -> None: + self.pool = pg_pool + + async def screen( + self, + strategy: ScreeningStrategy = ScreeningStrategy.BALANCED, + market: str = "all", + top_n: int = 50, + ) -> tuple[UUID, list[ScreeningResult]]: + """Execute quantitative screening. + + 1. Get all latest valuations + 2. Group by sector + 3. Calculate percentile ranks within sector + 4. Apply strategy weights + 5. Calculate composite score + 6. 
Rank and return top N +        """ +        run_id = uuid4() +        weights = STRATEGY_WEIGHTS[strategy] + +        # Fetch all latest valuations +        query = """ +            SELECT v.stock_code, v.per, v.pbr, v.psr, v.peg, v.ev_ebitda, +                   v.roe, v.roa, v.debt_ratio, v.fcf_yield, +                   s.sector, s.market +            FROM valuation v +            JOIN stock s ON v.stock_code = s.stock_code +            WHERE s.is_active = TRUE +              AND v.date = (SELECT MAX(date) FROM valuation) +        """ +        market_args = [] if market == "all" else [market] +        if market_args: +            query += " AND s.market = $1" +        async with self.pool.acquire() as conn: +            rows = await conn.fetch(query, *market_args) + +        if not rows: +            return run_id, [] + +        # Calculate percentile scores and composite +        results = self._calculate_scores(rows, weights, run_id, strategy) +        results.sort(key=lambda r: r.composite_score, reverse=True) + +        # Assign ranks and trim to top_n +        for i, result in enumerate(results[:top_n], 1): +            result.rank = i + +        logger.info("screening_complete", run_id=str(run_id), total=len(results), top_n=top_n) +        return run_id, results[:top_n] + +    def _calculate_scores(self, rows, weights, run_id, strategy): +        """Calculate percentile-based composite scores.""" +        from datetime import datetime +        from decimal import Decimal + +        results = [] +        for row in rows: +            # Simple scoring: lower PER/PBR = better, higher ROE = better +            scores = {} +            for metric, weight in weights.items(): +                val = row.get(metric) +                if val is not None: +                    scores[metric] = float(val) + +            if not scores: +                continue + +            # Normalize: for now use raw values, full percentile logic to be added +            composite = Decimal("0") +            per_score = Decimal(str(scores.get("per", 0))) +            pbr_score = Decimal(str(scores.get("pbr", 0))) +            roe_score = Decimal(str(scores.get("roe", 0))) + +            # Composite = weighted sum (simplified) +            for metric, weight in weights.items(): +                if metric in scores: +                    composite += Decimal(str(weight * scores[metric])) + +            results.append(ScreeningResult( +                run_id=run_id, +                stock_code=row["stock_code"], +                strategy=strategy, +                composite_score=composite, + 
per_score=per_score, +                pbr_score=pbr_score, +                roe_score=roe_score, +                screened_at=datetime.now(), +            )) + +        return results diff --git a/src/stock_screener/worker.py b/src/stock_screener/worker.py new file mode 100644 index 0000000..9f9203d --- /dev/null +++ b/src/stock_screener/worker.py @@ -0,0 +1,81 @@ +"""Screener worker - runs as a standalone service. + +Runs both the Redis stream consumer loop AND the REST API server concurrently. +""" + +import asyncio + +import structlog +import uvicorn + +from stock_common.config import settings +from stock_common.database import get_pg_pool, close_pg_pool +from stock_common.logging_config import configure_logging +from stock_common.models.screening import ScreeningStrategy +from stock_common.queue import consume, publish, ack + +from stock_screener.screener import Screener +from stock_screener.api import app + +logger = structlog.get_logger(service="screener") + +TRIGGER_STREAM = "queue:trigger-screen" +OUTPUT_STREAM = "queue:screened" +GROUP = "screeners" +CONSUMER = "screener-worker-1" + + +async def worker_loop() -> None: +    """Redis stream consumer loop with auto-restart on failure.""" +    while True: +        try: +            pool = await get_pg_pool() +            screener = Screener(pool) + +            while True: +                messages = await consume(TRIGGER_STREAM, GROUP, CONSUMER, count=1, block=5000) + +                for msg in messages: +                    strategy = ScreeningStrategy(msg.data.get("strategy", "balanced")) +                    top_n = int(msg.data.get("top_n", 50)) +                    market = msg.data.get("market", "all") + +                    try: +                        run_id, results = await screener.screen(strategy=strategy, top_n=top_n, market=market) +                        for result in results: +                            await publish(OUTPUT_STREAM, { +                                **result.model_dump(mode="json"), +                                "run_id": str(run_id), +                            }) + +                        await ack(TRIGGER_STREAM, GROUP, msg.message_id) +                        logger.info("screening_completed", run_id=str(run_id), results=len(results)) +                    except Exception as exc: +                        logger.error("screening_failed", error=str(exc)) +        except Exception as exc: +            logger.error("worker_loop_crashed", error=str(exc)) + 
await close_pg_pool() + await asyncio.sleep(10) + logger.info("worker_loop_restarting") + + +async def run() -> None: + configure_logging(level=settings.log_level, log_format=settings.log_format) + logger.info("screener_starting") + + config = uvicorn.Config(app, host="0.0.0.0", port=8004, log_level="info") + server = uvicorn.Server(config) + + await asyncio.gather( + worker_loop(), + server.serve(), + return_exceptions=True, + ) + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main()