feat: stock screening service with value/growth strategies

- Multi-factor screening (PER, PBR, ROE, composite scoring)
- GET /results/latest endpoint with stock name joins
- REST API with health, streams, and trigger endpoints
- Redis Streams for async job processing
- Docker support (python:3.12-slim single-stage build)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
yakenator
2026-02-23 13:50:50 +09:00
commit 6f2897a973
8 changed files with 349 additions and 0 deletions

8
.dockerignore Normal file
View File

@ -0,0 +1,8 @@
__pycache__
*.pyc
.git
.venv
*.egg-info
dist
build
.env

22
.gitignore vendored Normal file
View File

@ -0,0 +1,22 @@
__pycache__/
*.py[cod]
*$py.class
*.so
*.egg-info/
dist/
build/
.eggs/
.venv/
venv/
.env
.vscode/
.idea/
*.swp
*.swo
.DS_Store
.pytest_cache/
htmlcov/
.coverage
coverage.xml
.mypy_cache/
.claude/

11
Dockerfile Normal file
View File

@ -0,0 +1,11 @@
# Screener worker image: installs the shared library, then this service.
FROM python:3.12-slim
WORKDIR /app
# Install the sibling stock-common package first, then drop its sources
# in the same layer so they don't bloat the image.
COPY stock-common/ /tmp/stock-common/
RUN pip install --no-cache-dir /tmp/stock-common/ && rm -rf /tmp/stock-common/
# Install the screener service itself.
COPY stock-screener/ .
RUN pip install --no-cache-dir .
# worker.py runs both the stream consumer and the REST API (port 8004).
CMD ["python", "-m", "stock_screener.worker"]

15
pyproject.toml Normal file
View File

@ -0,0 +1,15 @@
[project]
name = "stock-screener"
version = "0.1.0"
description = "Quantitative stock screening and valuation service"
requires-python = ">=3.12"
dependencies = [
    # Shared models/queue/database helpers; installed from the sibling
    # stock-common directory (see Dockerfile).
    "stock-common",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
# src layout: the importable package lives under src/stock_screener.
packages = ["src/stock_screener"]

src/stock_screener/__init__.py Normal file
View File

@ -0,0 +1,3 @@
"""Quantitative stock screening and valuation service."""
__version__ = "0.1.0"

97
src/stock_screener/api.py Normal file
View File

@ -0,0 +1,97 @@
"""REST API for Screener service.
Endpoints:
POST /screen - trigger screening run
GET /results/latest - get latest screening results (with stock names)
GET /results/{run_id} - get screening results by run_id
GET /health - (from api_base)
GET /streams - (from api_base)
"""
import structlog
from pydantic import BaseModel
from stock_common.api_base import create_app
from stock_common.database import get_pg_pool
from stock_common.queue import publish
logger = structlog.get_logger(service="screener-api")
TRIGGER_STREAM = "queue:trigger-screen"
class ScreenRequest(BaseModel):
    """Request body for POST /screen."""
    # Scoring strategy name; the worker parses this into a
    # stock_common ScreeningStrategy, so it must match one of its values.
    strategy: str = "balanced"
    # Market filter; "all" disables filtering.
    market: str = "all"
    # Number of top-ranked stocks the screening run should keep.
    top_n: int = 50

# Shared app factory wires /health and /streams; we declare both the
# trigger and output streams so /streams can report them.
app = create_app(
    title="stock-screener",
    streams=[TRIGGER_STREAM, "queue:screened"],
)
@app.post("/screen")
async def run_screening(req: ScreenRequest):
msg_id = await publish(TRIGGER_STREAM, {
"strategy": req.strategy,
"market": req.market,
"top_n": req.top_n,
})
return {"status": "queued", "message_id": msg_id, "strategy": req.strategy}
@app.get("/results/latest")
async def get_latest_results(limit: int = 50):
try:
pool = await get_pg_pool()
async with pool.acquire() as conn:
latest_run = await conn.fetchrow(
"SELECT run_id, strategy, screened_at FROM screening_result ORDER BY screened_at DESC LIMIT 1",
)
if not latest_run:
return {"run_id": None, "results": []}
run_id = latest_run["run_id"]
rows = await conn.fetch(
"""SELECT sr.stock_code, s.stock_name, s.market, s.sector,
sr.composite_score, sr.per_score, sr.pbr_score,
sr.roe_score, sr.rank, sr.strategy, sr.screened_at
FROM screening_result sr
LEFT JOIN stock s ON sr.stock_code = s.stock_code
WHERE sr.run_id = $1
ORDER BY sr.rank
LIMIT $2""",
run_id, limit,
)
return {
"run_id": str(run_id),
"strategy": latest_run["strategy"],
"screened_at": str(latest_run["screened_at"]) if latest_run["screened_at"] else None,
"results": [dict(r) for r in rows],
}
except Exception as exc:
logger.warning("latest_results_failed", error=str(exc))
return {"run_id": None, "results": []}
@app.get("/results/{run_id}")
async def get_results(run_id: str):
try:
pool = await get_pg_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT sr.stock_code, s.stock_name, s.market, s.sector,
sr.composite_score, sr.per_score, sr.pbr_score,
sr.roe_score, sr.rank, sr.strategy, sr.screened_at
FROM screening_result sr
LEFT JOIN stock s ON sr.stock_code = s.stock_code
WHERE sr.run_id = $1
ORDER BY sr.rank""",
run_id,
)
return {"run_id": run_id, "results": [dict(r) for r in rows]}
except Exception as exc:
logger.warning("results_query_failed", error=str(exc), run_id=run_id)
return {"run_id": run_id, "results": []}

src/stock_screener/screener.py Normal file
View File

@ -0,0 +1,112 @@
"""Quantitative stock screening engine.
Scores and ranks stocks based on valuation metrics relative to sector averages.
"""
from uuid import UUID, uuid4
import structlog
from stock_common.models.screening import ScreeningResult, ScreeningStrategy, STRATEGY_WEIGHTS
logger = structlog.get_logger(module="screener")
class Screener:
    """Quantitative stock screening engine.

    Reads valuation data from PostgreSQL, applies strategy-weighted scoring,
    and produces ranked screening results.
    """

    def __init__(self, pg_pool) -> None:
        # asyncpg-style pool: exposes .acquire() yielding connections whose
        # rows support mapping access (row["k"], row.get("k")).
        self.pool = pg_pool

    async def screen(
        self,
        strategy: ScreeningStrategy = ScreeningStrategy.BALANCED,
        market: str = "all",
        top_n: int = 50,
    ) -> tuple[UUID, list[ScreeningResult]]:
        """Execute quantitative screening.

        1. Fetch the latest valuations (rows at the max valuation date)
        2. Score each stock from the strategy's metric weights
        3. Rank by composite score and return the top N

        Args:
            strategy: Weighting scheme; keys into STRATEGY_WEIGHTS.
            market: Market filter; "all" disables filtering.
            top_n: Maximum number of ranked results to return.

        Returns:
            (run_id, results) with results rank-annotated, best first.
        """
        run_id = uuid4()
        weights = STRATEGY_WEIGHTS[strategy]
        # Fetch all latest valuations for active stocks.
        query = """
        SELECT v.stock_code, v.per, v.pbr, v.psr, v.peg, v.ev_ebitda,
               v.roe, v.roa, v.debt_ratio, v.fcf_yield,
               s.sector, s.market
        FROM valuation v
        JOIN stock s ON v.stock_code = s.stock_code
        WHERE s.is_active = TRUE
          AND v.date = (SELECT MAX(date) FROM valuation)
        """
        args: list[str] = []
        if market != "all":
            # Parameterized filter: never interpolate caller-supplied values
            # into SQL text (the previous f-string was injectable).
            query += " AND s.market = $1"
            args.append(market)
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *args)
        if not rows:
            return run_id, []
        # Calculate composite scores, then rank best-first.
        results = self._calculate_scores(rows, weights, run_id, strategy)
        results.sort(key=lambda r: r.composite_score, reverse=True)
        # Slicing shares the ScreeningResult objects, so assigning ranks on
        # the slice annotates the objects we return.
        top = results[:top_n]
        for i, result in enumerate(top, 1):
            result.rank = i
        logger.info("screening_complete", run_id=str(run_id), total=len(results), top_n=top_n)
        return run_id, top

    def _calculate_scores(self, rows, weights, run_id, strategy):
        """Build ScreeningResult objects with weighted composite scores.

        NOTE(review): scores are currently raw metric values, not sector
        percentiles -- full percentile logic is still to be added.
        """
        from datetime import datetime
        from decimal import Decimal

        results = []
        for row in rows:
            # Collect the strategy's metrics that are present on this row.
            scores = {}
            for metric in weights:
                val = row.get(metric)
                if val is not None:
                    scores[metric] = float(val)
            if not scores:
                # No usable metrics for this stock; skip it entirely.
                continue
            per_score = Decimal(str(scores.get("per", 0)))
            pbr_score = Decimal(str(scores.get("pbr", 0)))
            roe_score = Decimal(str(scores.get("roe", 0)))
            # Composite = weighted sum over the metrics available (simplified).
            composite = Decimal("0")
            for metric, weight in weights.items():
                if metric in scores:
                    composite += Decimal(str(weight * scores[metric]))
            results.append(ScreeningResult(
                run_id=run_id,
                stock_code=row["stock_code"],
                strategy=strategy,
                composite_score=composite,
                per_score=per_score,
                pbr_score=pbr_score,
                roe_score=roe_score,
                screened_at=datetime.now(),
            ))
        return results

src/stock_screener/worker.py Normal file
View File

@ -0,0 +1,81 @@
"""Screener worker - runs as a standalone service.
Runs both the Redis stream consumer loop AND the REST API server concurrently.
"""
import asyncio
import structlog
import uvicorn
from stock_common.config import settings
from stock_common.database import get_pg_pool, close_pg_pool
from stock_common.logging_config import configure_logging
from stock_common.models.screening import ScreeningStrategy
from stock_common.queue import consume, publish, ack
from stock_screener.screener import Screener
from stock_screener.api import app
logger = structlog.get_logger(service="screener")
TRIGGER_STREAM = "queue:trigger-screen"
OUTPUT_STREAM = "queue:screened"
GROUP = "screeners"
CONSUMER = "screener-worker-1"
async def worker_loop() -> None:
    """Redis stream consumer loop with auto-restart on failure.

    Consumes trigger messages, runs the screener, publishes each result to
    the output stream, and acks the trigger. On a crash of the outer loop
    the pool is closed and the loop restarts after a back-off.
    """
    while True:
        try:
            pool = await get_pg_pool()
            screener = Screener(pool)
            while True:
                messages = await consume(TRIGGER_STREAM, GROUP, CONSUMER, count=1, block=5000)
                for msg in messages:
                    strategy = ScreeningStrategy(msg.data.get("strategy", "balanced"))
                    # Coerce defensively: Redis stream field values may be
                    # delivered as strings depending on the queue layer's
                    # serialization, and slicing with a str top_n would raise.
                    top_n = int(msg.data.get("top_n", 50))
                    market = msg.data.get("market", "all")
                    try:
                        run_id, results = await screener.screen(strategy=strategy, top_n=top_n, market=market)
                        for result in results:
                            await publish(OUTPUT_STREAM, {
                                "run_id": str(run_id),
                                **result.model_dump(),
                            })
                        await ack(TRIGGER_STREAM, GROUP, msg.message_id)
                        logger.info("screening_completed", run_id=str(run_id), results=len(results))
                    except Exception as exc:
                        # Deliberately not acked: the message stays pending
                        # in the group for redelivery/inspection.
                        logger.error("screening_failed", error=str(exc))
        except Exception as exc:
            logger.error("worker_loop_crashed", error=str(exc))
            await close_pg_pool()
            await asyncio.sleep(10)
            logger.info("worker_loop_restarting")
async def run() -> None:
    """Configure logging, then run the consumer loop and API concurrently."""
    configure_logging(level=settings.log_level, log_format=settings.log_format)
    logger.info("screener_starting")
    # Uvicorn serves the FastAPI app in the same event loop as the consumer.
    server = uvicorn.Server(
        uvicorn.Config(app, host="0.0.0.0", port=8004, log_level="info")
    )
    await asyncio.gather(worker_loop(), server.serve(), return_exceptions=True)
def main() -> None:
    """Console entry point: drive the async runtime until shutdown."""
    asyncio.run(run())


if __name__ == "__main__":
    main()