commit f0c4060aed6925ca49470a7663e8534272a79fa1 Author: yakenator Date: Mon Feb 23 13:50:38 2026 +0900 feat: shared library for stock analysis microservices - Common models (Stock, DailyPrice, Valuation, ScreeningResult) - Database connectors (PostgreSQL via asyncpg, MongoDB via motor, Redis) - Redis Streams pub/sub utilities - Base collector class with common patterns - Logging configuration with structlog Co-Authored-By: Claude Opus 4.6 diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..daf0201 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +__pycache__ +*.pyc +.git +.venv +*.egg-info +dist +build +.env diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f3ebf17 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +__pycache__/ +*.py[cod] +*$py.class +*.so +*.egg-info/ +dist/ +build/ +.eggs/ +.venv/ +venv/ +.env +.vscode/ +.idea/ +*.swp +*.swo +.DS_Store +.pytest_cache/ +htmlcov/ +.coverage +coverage.xml +.mypy_cache/ +.claude/ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..176bbe5 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "stock-common" +version = "0.1.0" +description = "Shared models, config, DB/Redis utilities for stock analysis microservices" +requires-python = ">=3.12" +dependencies = [ + "pydantic>=2.0", + "pydantic-settings>=2.0", + "asyncpg>=0.29", + "motor>=3.3", + "redis>=5.0", + "structlog>=24.0", + "aiohttp>=3.9", + "aiolimiter>=1.1", + "tenacity>=8.2", + "fastapi>=0.115", + "uvicorn[standard]>=0.34", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/stock_common"] diff --git a/src/stock_common/__init__.py b/src/stock_common/__init__.py new file mode 100644 index 0000000..82e8daf --- /dev/null +++ b/src/stock_common/__init__.py @@ -0,0 +1,3 @@ +"""Shared library for stock analysis microservices.""" + +__version__ = "0.1.0" diff --git a/src/stock_common/api_base.py b/src/stock_common/api_base.py new file mode 100644 index 0000000..c296118 --- /dev/null +++ b/src/stock_common/api_base.py @@ -0,0 +1,98 @@ +"""Shared FastAPI app factory and common endpoints for all microservices. + +Each service calls `create_app()` to get a FastAPI instance with: +- /health - liveness/readiness check +- /streams - Redis stream queue lengths +""" + +import time +from contextlib import asynccontextmanager +from typing import AsyncGenerator, Callable, Awaitable + +import structlog +import uvicorn +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from stock_common.queue.redis_client import get_redis, close_redis + +logger = structlog.get_logger(module="api_base") + +_start_time: float = 0.0 + + +def create_app( + title: str, + version: str = "0.1.0", + streams: list[str] | None = None, + on_startup: Callable[[], Awaitable[None]] | None = None, + on_shutdown: Callable[[], Awaitable[None]] | None = None, +) -> FastAPI: + """Create a FastAPI app with common health/stream endpoints. + + Args: + title: Service name (e.g., "stock-dart-collector"). + version: API version. + streams: Redis stream names this service reads/writes. + on_startup: Optional async callback run on startup. + on_shutdown: Optional async callback run on shutdown. + """ + + @asynccontextmanager + async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: + global _start_time + _start_time = time.time() + if on_startup: + await on_startup() + logger.info("api_started", service=title) + yield + if on_shutdown: + await on_shutdown() + await close_redis() + logger.info("api_stopped", service=title) + + app = FastAPI(title=title, version=version, lifespan=lifespan) + + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], + ) + + @app.get("/health") + async def health(): + uptime = time.time() - _start_time + try: + r = await get_redis() + await r.ping() + redis_ok = True + except Exception: + redis_ok = False + return { + "service": title, + "status": "ok" if redis_ok else "degraded", + "redis": redis_ok, + "uptime_seconds": round(uptime, 1), + } + + @app.get("/streams") + async def stream_info(): + if not streams: + return {"streams": {}} + r = await get_redis() + info = {} + for stream_name in streams: + try: + length = await r.xlen(stream_name) + info[stream_name] = {"length": length} + except Exception: + info[stream_name] = {"length": -1, "error": "stream not found"} + return {"streams": info} + + return app + + +def run_api(app: FastAPI, host: str = "0.0.0.0", port: int = 8000) -> None: + """Run the FastAPI app with uvicorn (blocking).""" + uvicorn.run(app, host=host, port=port, log_level="info") diff --git a/src/stock_common/collector_base.py b/src/stock_common/collector_base.py new file mode 100644 index 0000000..3f5fe12 --- /dev/null +++ b/src/stock_common/collector_base.py @@ -0,0 +1,105 @@ +"""Abstract base collector with rate limiting, retry, and session management.""" + +from __future__ import annotations + +import asyncio +import logging +from abc import ABC, abstractmethod +from typing import Any + +import aiohttp +import structlog +from aiolimiter import AsyncLimiter +from tenacity import ( + before_sleep_log, + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) + +_retry_logger = logging.getLogger("stock_common.collector_base") + + +class CollectorError(Exception): + """Base exception for collector errors.""" + + +class RateLimitExceededError(CollectorError): + """Raised when API rate limit is exceeded.""" + + +class AuthenticationError(CollectorError): + """Raised when API authentication fails.""" + + +class BaseCollector(ABC): + """Abstract base class for all data collectors.""" + + def __init__(self, rate_limit: float = 10.0, timeout: int = 30) -> None: + self._local_limiter = AsyncLimiter(rate_limit, 1) + self.timeout = aiohttp.ClientTimeout(total=timeout) + self._session: aiohttp.ClientSession | None = None + self.logger = structlog.get_logger(collector=self.__class__.__name__) + + async def __aenter__(self) -> BaseCollector: + self._session = aiohttp.ClientSession(timeout=self.timeout) + return self + + async def __aexit__(self, *args: Any) -> None: + if self._session: + await self._session.close() + self._session = None + + @property + def session(self) -> aiohttp.ClientSession: + if self._session is None: + raise RuntimeError(f"{self.__class__.__name__} must be used as async context manager") + return self._session + + @property + def limiter(self): + return self._local_limiter + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError)), + before_sleep=before_sleep_log(_retry_logger, logging.WARNING), + ) + async def _request(self, method: str, url: str, **kwargs: Any) -> dict | str: + async with self.limiter: + self.logger.info("api_request", method=method, url=url) + async with self.session.request(method, url, **kwargs) as resp: + if resp.status == 429: + retry_after = int(resp.headers.get("Retry-After", "5")) + self.logger.warning("rate_limit_hit", retry_after=retry_after) + await asyncio.sleep(retry_after) + raise RateLimitExceededError(f"Rate limited, retry after {retry_after}s") + resp.raise_for_status() + content_type = resp.content_type or "" + if "json" in content_type: + return await resp.json() + return await resp.text() + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError)), + before_sleep=before_sleep_log(_retry_logger, logging.WARNING), + ) + async def _download_binary(self, url: str, **kwargs: Any) -> bytes: + async with self.limiter: + self.logger.info("api_request_binary", url=url) + async with self.session.get(url, **kwargs) as resp: + if resp.status == 429: + retry_after = int(resp.headers.get("Retry-After", "5")) + await asyncio.sleep(retry_after) + raise RateLimitExceededError(f"Rate limited, retry after {retry_after}s") + resp.raise_for_status() + return await resp.read() + + @abstractmethod + async def collect(self, **kwargs: Any) -> None: + """Execute data collection. Must be implemented by subclasses.""" + ... diff --git a/src/stock_common/config.py b/src/stock_common/config.py new file mode 100644 index 0000000..160b0e4 --- /dev/null +++ b/src/stock_common/config.py @@ -0,0 +1,77 @@ +"""Centralized configuration using pydantic-settings.""" + +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + """Application settings loaded from environment variables and .env file.""" + + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + case_sensitive=False, + ) + + # -- Database: PostgreSQL -- + postgres_host: str = "localhost" + postgres_port: int = 5432 + postgres_db: str = "stock_analysis" + postgres_user: str = "stock_user" + postgres_password: str = "change_me_in_production" + + # -- Database: MongoDB -- + mongodb_uri: str = "mongodb://localhost:27017" + mongodb_db: str = "stock_analysis" + + # -- Redis -- + redis_url: str = "redis://localhost:6379/0" + + # -- API Keys -- + dart_api_key: str = "" + kis_app_key: str = "" + kis_app_secret: str = "" + kis_account_no: str = "" + kis_base_url: str = "https://openapi.koreainvestment.com:9443" + anthropic_api_key: str = "" + + # -- Rate Limits (requests per second) -- + dart_rate_limit: float = 10.0 + kis_rate_limit: float = 20.0 + news_rate_limit: float = 1.0 + llm_rate_limit: float = 5.0 + + # -- Kakao Talk Notifications -- + kakao_rest_api_key: str = "" + kakao_access_token: str = "" + kakao_refresh_token: str = "" + + # -- Trading -- + trading_mode: str = "paper" + kis_paper_base_url: str = "https://openapivts.koreainvestment.com:29443" + kis_account_cano: str = "" + kis_account_prdt_cd: str = "01" + + # -- Screening Defaults -- + default_strategy: str = "balanced" + default_top_n: int = 50 + default_market: str = "all" + + # -- Logging -- + log_level: str = "INFO" + log_format: str = "json" + + @property + def kis_trading_base_url(self) -> str: + if self.trading_mode == "paper": + return self.kis_paper_base_url + return self.kis_base_url + + @property + def postgres_dsn(self) -> str: + return ( + f"postgresql://{self.postgres_user}:{self.postgres_password}" + f"@{self.postgres_host}:{self.postgres_port}/{self.postgres_db}" + ) + + +settings = Settings() diff --git a/src/stock_common/database/__init__.py b/src/stock_common/database/__init__.py new file mode 100644 index 0000000..ab24c7e --- /dev/null +++ b/src/stock_common/database/__init__.py @@ -0,0 +1,18 @@ +"""Database connection management for PostgreSQL and MongoDB.""" + +from stock_common.database.mongodb import ( + close_mongo_client, + ensure_indexes, + get_mongo_client, + get_mongo_database, +) +from stock_common.database.postgres import close_pg_pool, get_pg_pool + +__all__ = [ + "close_mongo_client", + "close_pg_pool", + "ensure_indexes", + "get_mongo_client", + "get_mongo_database", + "get_pg_pool", +] diff --git a/src/stock_common/database/mongodb.py b/src/stock_common/database/mongodb.py new file mode 100644 index 0000000..73c41b4 --- /dev/null +++ b/src/stock_common/database/mongodb.py @@ -0,0 +1,43 @@ +"""Async MongoDB connection management using motor.""" + +import structlog +from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase + +from stock_common.config import settings + +logger = structlog.get_logger(module="database.mongodb") + +_client: AsyncIOMotorClient | None = None + + +def get_mongo_client() -> AsyncIOMotorClient: + global _client + if _client is None: + _client = AsyncIOMotorClient(settings.mongodb_uri) + logger.info("mongodb_client_created", uri=settings.mongodb_uri) + return _client + + +def get_mongo_database() -> AsyncIOMotorDatabase: + client = get_mongo_client() + return client[settings.mongodb_db] + + +async def ensure_indexes() -> None: + db = get_mongo_database() + await db.disclosures.create_index("dart_id", unique=True) + await db.disclosures.create_index([("stock_code", 1), ("disclosed_at", -1)]) + await db.news.create_index("url", unique=True) + await db.news.create_index([("stock_codes", 1), ("published_at", -1)]) + await db.llm_analysis.create_index("analysis_id", unique=True) + await db.llm_analysis.create_index([("stock_code", 1), ("analyzed_at", -1)]) + await db.llm_analysis.create_index("screening_run_id") + logger.info("mongodb_indexes_ensured") + + +async def close_mongo_client() -> None: + global _client + if _client is not None: + _client.close() + _client = None + logger.info("mongodb_client_closed") diff --git a/src/stock_common/database/postgres.py b/src/stock_common/database/postgres.py new file mode 100644 index 0000000..4cfbc29 --- /dev/null +++ b/src/stock_common/database/postgres.py @@ -0,0 +1,35 @@ +"""Async PostgreSQL connection pool management using asyncpg.""" + +import asyncpg +import structlog + +from stock_common.config import settings + +logger = structlog.get_logger(module="database.postgres") + +_pool: asyncpg.Pool | None = None + + +async def get_pg_pool() -> asyncpg.Pool: + global _pool + if _pool is None: + try: + _pool = await asyncpg.create_pool( + dsn=settings.postgres_dsn, + min_size=2, + max_size=10, + command_timeout=60, + ) + logger.info("postgres_pool_created", host=settings.postgres_host, db=settings.postgres_db) + except Exception as exc: + logger.error("postgres_pool_creation_failed", error=str(exc)) + raise ConnectionError(f"Failed to connect to PostgreSQL: {exc}") from exc + return _pool + + +async def close_pg_pool() -> None: + global _pool + if _pool is not None: + await _pool.close() + _pool = None + logger.info("postgres_pool_closed") diff --git a/src/stock_common/logging_config.py b/src/stock_common/logging_config.py new file mode 100644 index 0000000..a3f62fa --- /dev/null +++ b/src/stock_common/logging_config.py @@ -0,0 +1,32 @@ +"""Structured logging configuration using structlog.""" + +import logging + +import structlog + + +def configure_logging(level: str = "INFO", log_format: str = "json") -> None: + """Configure structlog for the application.""" + log_level = getattr(logging, level.upper(), logging.INFO) + + shared_processors: list[structlog.types.Processor] = [ + structlog.contextvars.merge_contextvars, + structlog.processors.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.UnicodeDecoder(), + ] + + if log_format == "json": + renderer: structlog.types.Processor = structlog.processors.JSONRenderer() + else: + renderer = structlog.dev.ConsoleRenderer() + + structlog.configure( + processors=[*shared_processors, renderer], + wrapper_class=structlog.make_filtering_bound_logger(log_level), + context_class=dict, + logger_factory=structlog.PrintLoggerFactory(), + cache_logger_on_first_use=True, + ) diff --git a/src/stock_common/models/__init__.py b/src/stock_common/models/__init__.py new file mode 100644 index 0000000..a793cde --- /dev/null +++ b/src/stock_common/models/__init__.py @@ -0,0 +1,43 @@ +"""Pydantic data models for all entities.""" + +from stock_common.models.analysis import LLMAnalysis, Recommendation +from stock_common.models.disclosure import Disclosure, Sentiment +from stock_common.models.financial import Financial, PeriodType +from stock_common.models.news import ( + News, + OGMeta, + OpportunityCategory, + OpportunitySignal, + OpportunityStrength, + RiskCategory, + RiskSeverity, + RiskSignal, +) +from stock_common.models.price import DailyPrice +from stock_common.models.screening import ScreeningResult, ScreeningStrategy, STRATEGY_WEIGHTS +from stock_common.models.stock import Market, Stock +from stock_common.models.valuation import Valuation + +__all__ = [ + "DailyPrice", + "Disclosure", + "Financial", + "LLMAnalysis", + "Market", + "News", + "OGMeta", + "OpportunityCategory", + "OpportunitySignal", + "OpportunityStrength", + "PeriodType", + "Recommendation", + "RiskCategory", + "RiskSeverity", + "RiskSignal", + "ScreeningResult", + "ScreeningStrategy", + "Sentiment", + "Stock", + "STRATEGY_WEIGHTS", + "Valuation", +] diff --git a/src/stock_common/models/analysis.py b/src/stock_common/models/analysis.py new file mode 100644 index 0000000..6e91f0f --- /dev/null +++ b/src/stock_common/models/analysis.py @@ -0,0 +1,33 @@ +"""LLM analysis result data model (MongoDB).""" + +from datetime import datetime +from enum import StrEnum +from uuid import UUID + +from pydantic import BaseModel, Field + + +class Recommendation(StrEnum): + STRONG_BUY = "STRONG_BUY" + BUY = "BUY" + HOLD = "HOLD" + SELL = "SELL" + STRONG_SELL = "STRONG_SELL" + + +class LLMAnalysis(BaseModel): + """Represents an LLM-generated qualitative analysis of a stock.""" + + analysis_id: UUID = Field(..., description="Unique analysis identifier") + stock_code: str = Field(..., max_length=10, description="KRX stock code") + screening_run_id: UUID | None = Field(None, description="Associated screening run ID") + summary: str = Field("", description="Investment thesis summary") + valuation_comment: str = Field("", description="Valuation assessment") + risk_factors: list[str] = Field(default_factory=list, description="Identified risk factors") + catalysts: list[str] = Field(default_factory=list, description="Identified upside catalysts") + recommendation: Recommendation = Field( + Recommendation.HOLD, description="Investment recommendation" + ) + confidence: float = Field(0.5, ge=0.0, le=1.0, description="Confidence score (0.0 - 1.0)") + analyzed_at: datetime | None = Field(None, description="Analysis datetime") + model_name: str = Field("", description="LLM model used for analysis") diff --git a/src/stock_common/models/disclosure.py b/src/stock_common/models/disclosure.py new file mode 100644 index 0000000..b4739ee --- /dev/null +++ b/src/stock_common/models/disclosure.py @@ -0,0 +1,28 @@ +"""DART disclosure data model (MongoDB).""" + +from datetime import datetime +from enum import StrEnum + +from pydantic import BaseModel, Field + + +class Sentiment(StrEnum): + """Sentiment classification for disclosures and news.""" + + POSITIVE = "POSITIVE" + NEUTRAL = "NEUTRAL" + NEGATIVE = "NEGATIVE" + + +class Disclosure(BaseModel): + """Represents a DART corporate disclosure.""" + + dart_id: str = Field(..., description="Unique DART disclosure ID") + stock_code: str = Field(..., max_length=10, description="KRX stock code") + title: str = Field(..., description="Disclosure title") + category: str = Field("", description="Disclosure category") + content_summary: str = Field("", description="Summarized content") + content_raw_url: str = Field("", description="URL to full disclosure document") + sentiment: Sentiment = Field(Sentiment.NEUTRAL, description="Sentiment classification") + disclosed_at: datetime | None = Field(None, description="Disclosure datetime") + processed_at: datetime | None = Field(None, description="Processing datetime") diff --git a/src/stock_common/models/financial.py b/src/stock_common/models/financial.py new file mode 100644 index 0000000..159691b --- /dev/null +++ b/src/stock_common/models/financial.py @@ -0,0 +1,33 @@ +"""Financial statement data model.""" + +from datetime import date +from enum import StrEnum + +from pydantic import BaseModel, Field + + +class PeriodType(StrEnum): + """Financial reporting period type.""" + + ANNUAL = "ANNUAL" + QUARTER = "QUARTER" + + +class Financial(BaseModel): + """Represents a financial statement record for a stock.""" + + id: int | None = Field(None, description="Auto-increment ID (set by DB)") + stock_code: str = Field(..., max_length=10, description="KRX stock code") + fiscal_year: int = Field(..., description="Fiscal year (e.g., 2024)") + fiscal_quarter: int = Field(..., ge=1, le=4, description="Quarter (1-4)") + period_type: PeriodType = Field(..., description="ANNUAL or QUARTER") + revenue: int | None = Field(None, description="Total revenue (KRW)") + operating_profit: int | None = Field(None, description="Operating profit (KRW)") + net_income: int | None = Field(None, description="Net income (KRW)") + total_assets: int | None = Field(None, description="Total assets (KRW)") + total_liabilities: int | None = Field(None, description="Total liabilities (KRW)") + total_equity: int | None = Field(None, description="Total equity (KRW)") + operating_cashflow: int | None = Field(None, description="Operating cash flow (KRW)") + free_cashflow: int | None = Field(None, description="Free cash flow (KRW)") + source: str = Field("DART", max_length=20, description="Data source identifier") + reported_at: date | None = Field(None, description="Report filing date") diff --git a/src/stock_common/models/news.py b/src/stock_common/models/news.py new file mode 100644 index 0000000..66420a7 --- /dev/null +++ b/src/stock_common/models/news.py @@ -0,0 +1,108 @@ +"""News article data model (MongoDB).""" + +from datetime import datetime +from enum import Enum, StrEnum + +from pydantic import BaseModel, Field + +from stock_common.models.disclosure import Sentiment + + +class RiskCategory(StrEnum): + GOVERNANCE = "governance" + FINANCIAL = "financial" + LEGAL = "legal" + REGULATORY = "regulatory" + MARKET = "market" + OPERATIONAL = "operational" + GEOPOLITICAL = "geopolitical" + + +class RiskSeverity(int, Enum): + LEVEL_10 = 10 + LEVEL_9 = 9 + LEVEL_8 = 8 + LEVEL_7 = 7 + LEVEL_6 = 6 + LEVEL_5 = 5 + LEVEL_4 = 4 + LEVEL_3 = 3 + LEVEL_2 = 2 + LEVEL_1 = 1 + + +class RiskSignal(BaseModel): + keyword: str = Field(..., description="Matched keyword") + category: RiskCategory = Field(..., description="Risk category") + severity: RiskSeverity = Field(..., description="Severity level") + description: str = Field("", description="Human-readable description") + + +class OGMeta(BaseModel): + title: str = Field("", description="og:title") + description: str = Field("", description="og:description") + image: str = Field("", description="og:image URL") + site_name: str = Field("", description="og:site_name (publisher)") + url: str = Field("", description="og:url (canonical URL)") + + +class OpportunityCategory(StrEnum): + EARNINGS = "earnings" + GROWTH = "growth" + VALUATION = "valuation" + SHAREHOLDER = "shareholder" + CONTRACT = "contract" + TECHNOLOGY = "technology" + POLICY = "policy" + + +class OpportunityStrength(int, Enum): + LEVEL_10 = 10 + LEVEL_9 = 9 + LEVEL_8 = 8 + LEVEL_7 = 7 + LEVEL_6 = 6 + LEVEL_5 = 5 + LEVEL_4 = 4 + LEVEL_3 = 3 + LEVEL_2 = 2 + LEVEL_1 = 1 + + +class OpportunitySignal(BaseModel): + keyword: str = Field(..., description="Matched keyword") + category: OpportunityCategory = Field(..., description="Opportunity category") + strength: OpportunityStrength = Field(..., description="Signal strength") + description: str = Field("", description="Human-readable description") + + +class News(BaseModel): + """Represents a news article related to stocks.""" + + url: str = Field(..., description="Unique URL of the news article") + title: str = Field(..., description="Article title") + content: str = Field("", description="Article content or excerpt") + stock_codes: list[str] = Field(default_factory=list, description="Related stock codes") + source: str = Field("", description="News source") + sentiment: Sentiment = Field(Sentiment.NEUTRAL, description="Sentiment classification") + sentiment_score: float = Field(0.0, description="Sentiment score (-1.0 ~ +1.0)") + risk_signals: list[RiskSignal] = Field(default_factory=list) + opportunity_signals: list[OpportunitySignal] = Field(default_factory=list) + risk_keywords: list[str] = Field(default_factory=list) + opportunity_keywords: list[str] = Field(default_factory=list) + og_meta: OGMeta | None = Field(None, description="Open Graph metadata") + source_topic: str = Field("", description="Macro topic query that found this article") + published_at: datetime | None = Field(None, description="Publication datetime") + collected_at: datetime | None = Field(None, description="Collection datetime") + + @property + def max_risk_severity(self) -> RiskSeverity | None: + if not self.risk_signals: + return None + return max(self.risk_signals, key=lambda s: s.severity.value).severity + + @property + def max_opportunity_strength(self) -> OpportunityStrength | None: + if not self.opportunity_signals: + return None + return max(self.opportunity_signals, key=lambda s: s.strength.value).strength diff --git a/src/stock_common/models/price.py b/src/stock_common/models/price.py new file mode 100644 index 0000000..56c0319 --- /dev/null +++ b/src/stock_common/models/price.py @@ -0,0 +1,22 @@ +"""Daily price data model.""" + +from __future__ import annotations + +import datetime +from decimal import Decimal + +from pydantic import BaseModel, Field + + +class DailyPrice(BaseModel): + """Represents daily OHLCV price data for a stock.""" + + stock_code: str = Field(..., max_length=10, description="KRX stock code") + date: datetime.date = Field(..., description="Trading date") + open: Decimal | None = Field(None, description="Opening price (KRW)") + high: Decimal | None = Field(None, description="High price (KRW)") + low: Decimal | None = Field(None, description="Low price (KRW)") + close: Decimal | None = Field(None, description="Closing price (KRW)") + volume: int | None = Field(None, description="Trading volume") + market_cap: int | None = Field(None, description="Market capitalization (KRW)") + source: str = Field("KIS", max_length=20, description="Data source identifier") diff --git a/src/stock_common/models/screening.py b/src/stock_common/models/screening.py new file mode 100644 index 0000000..faa5d9c --- /dev/null +++ b/src/stock_common/models/screening.py @@ -0,0 +1,51 @@ +"""Screening result data model.""" + +from datetime import datetime +from decimal import Decimal +from enum import StrEnum +from uuid import UUID + +from pydantic import BaseModel, Field + + +class ScreeningStrategy(StrEnum): + VALUE = "value" + GROWTH = "growth" + QUALITY = "quality" + BALANCED = "balanced" + + +STRATEGY_WEIGHTS: dict[ScreeningStrategy, dict[str, float]] = { + ScreeningStrategy.VALUE: { + "per": 0.25, "pbr": 0.25, "peg": 0.10, + "roe": 0.10, "fcf_yield": 0.20, "debt_ratio": 0.10, + }, + ScreeningStrategy.GROWTH: { + "per": 0.10, "pbr": 0.05, "peg": 0.30, + "roe": 0.15, "fcf_yield": 0.10, "debt_ratio": 0.10, "ev_ebitda": 0.20, + }, + ScreeningStrategy.QUALITY: { + "per": 0.10, "pbr": 0.10, "peg": 0.10, + "roe": 0.30, "fcf_yield": 0.15, "debt_ratio": 0.25, + }, + ScreeningStrategy.BALANCED: { + "per": 0.15, "pbr": 0.15, "peg": 0.15, + "roe": 0.15, "fcf_yield": 0.15, "debt_ratio": 0.15, "ev_ebitda": 0.10, + }, +} + + +class ScreeningResult(BaseModel): + id: int | None = Field(None, description="Auto-increment ID (set by DB)") + run_id: UUID = Field(..., description="Screening run identifier") + stock_code: str = Field(..., max_length=10, description="KRX stock code") + strategy: ScreeningStrategy = Field(..., description="Strategy used") + composite_score: Decimal = Field(..., description="Overall weighted score") + per_score: Decimal | None = Field(None) + pbr_score: Decimal | None = Field(None) + peg_score: Decimal | None = Field(None) + roe_score: Decimal | None = Field(None) + fcf_yield_score: Decimal | None = Field(None) + debt_ratio_score: Decimal | None = Field(None) + rank: int | None = Field(None, description="Rank within this run") + screened_at: datetime | None = Field(None, description="Screening timestamp") diff --git a/src/stock_common/models/stock.py b/src/stock_common/models/stock.py new file mode 100644 index 0000000..513d46d --- /dev/null +++ b/src/stock_common/models/stock.py @@ -0,0 +1,26 @@ +"""Stock master data model.""" + +from datetime import date, datetime +from enum import StrEnum + +from pydantic import BaseModel, Field + + +class Market(StrEnum): + """Korean stock market identifiers.""" + + KOSPI = "KOSPI" + KOSDAQ = "KOSDAQ" + + +class Stock(BaseModel): + """Represents a listed stock on KOSPI or KOSDAQ.""" + + stock_code: str = Field(..., max_length=10, description="KRX stock code (e.g., '005930')") + stock_name: str = Field(..., max_length=100, description="Company name (Korean)") + market: Market = Field(..., description="Market: KOSPI or KOSDAQ") + sector: str | None = Field(None, max_length=100, description="Industry sector") + industry: str | None = Field(None, max_length=100, description="Specific industry") + listing_date: date | None = Field(None, description="IPO date") + is_active: bool = Field(True, description="Whether currently trading") + updated_at: datetime | None = Field(None, description="Last update timestamp") diff --git a/src/stock_common/models/valuation.py b/src/stock_common/models/valuation.py new file mode 100644 index 0000000..e005123 --- /dev/null +++ b/src/stock_common/models/valuation.py @@ -0,0 +1,25 @@ +"""Valuation metrics data model.""" + +from __future__ import annotations + +import datetime +from decimal import Decimal + +from pydantic import BaseModel, Field + + +class Valuation(BaseModel): + """Represents calculated valuation metrics for a stock on a given date.""" + + stock_code: str = Field(..., max_length=10, description="KRX stock code") + date: datetime.date = Field(..., description="Calculation date") + per: Decimal | None = Field(None, description="Price-to-Earnings Ratio") + pbr: Decimal | None = Field(None, description="Price-to-Book Ratio") + psr: Decimal | None = Field(None, description="Price-to-Sales Ratio") + peg: Decimal | None = Field(None, description="PEG Ratio") + ev_ebitda: Decimal | None = Field(None, description="EV/EBITDA") + roe: Decimal | None = Field(None, description="Return on Equity (%)") + roa: Decimal | None = Field(None, description="Return on Assets (%)") + debt_ratio: Decimal | None = Field(None, description="Debt Ratio (%)") + fcf_yield: Decimal | None = Field(None, description="Free Cash Flow Yield (%)") + calculated_at: datetime.datetime | None = Field(None, description="Calculation timestamp") diff --git a/src/stock_common/queue/__init__.py b/src/stock_common/queue/__init__.py new file mode 100644 index 0000000..8e5290a --- /dev/null +++ b/src/stock_common/queue/__init__.py @@ -0,0 +1,13 @@ +"""Redis queue utilities for inter-service communication via Redis Streams.""" + +from stock_common.queue.redis_client import close_redis, get_redis +from stock_common.queue.streams import ack, publish, consume, StreamMessage + +__all__ = [ + "StreamMessage", + "ack", + "close_redis", + "consume", + "get_redis", + "publish", +] diff --git a/src/stock_common/queue/redis_client.py b/src/stock_common/queue/redis_client.py new file mode 100644 index 0000000..9ea11a7 --- /dev/null +++ b/src/stock_common/queue/redis_client.py @@ -0,0 +1,31 @@ +"""Redis connection management (singleton pattern).""" + +import structlog +from redis.asyncio import Redis + +from stock_common.config import settings + +logger = structlog.get_logger(module="queue.redis") + +_redis: Redis | None = None + + +async def get_redis() -> Redis: + global _redis + if _redis is None: + _redis = Redis.from_url( + settings.redis_url, + decode_responses=True, + max_connections=20, + ) + await _redis.ping() + logger.info("redis_connected", url=settings.redis_url) + return _redis + + +async def close_redis() -> None: + global _redis + if _redis is not None: + await _redis.aclose() + _redis = None + logger.info("redis_closed") diff --git a/src/stock_common/queue/streams.py b/src/stock_common/queue/streams.py new file mode 100644 index 0000000..5bb84a0 --- /dev/null +++ b/src/stock_common/queue/streams.py @@ -0,0 +1,92 @@ +"""Redis Streams helpers for simple publish/consume pipeline. + +Each service does: pull from Redis -> process -> push to Redis. +""" + +import json +from dataclasses import dataclass +from typing import Any + +import structlog +from redis.asyncio import Redis + +from stock_common.queue.redis_client import get_redis + +logger = structlog.get_logger(module="queue.streams") + + +@dataclass +class StreamMessage: + """A message read from a Redis Stream.""" + + message_id: str + stream: str + data: dict[str, Any] + + +async def publish(stream: str, data: dict[str, Any], redis: Redis | None = None) -> str: + """Publish a message to a Redis Stream. + + Args: + stream: Stream name (e.g., "queue:raw-prices"). + data: Message payload (will be JSON-serialized). + redis: Optional Redis client (uses singleton if not provided). + + Returns: + The message ID assigned by Redis. + """ + r = redis or await get_redis() + message_id = await r.xadd(stream, {"payload": json.dumps(data, default=str)}) + logger.debug("stream_published", stream=stream, message_id=message_id) + return message_id + + +async def consume( + stream: str, + group: str, + consumer: str, + count: int = 10, + block: int = 5000, + redis: Redis | None = None, +) -> list[StreamMessage]: + """Consume messages from a Redis Stream using consumer groups. + + Automatically creates the consumer group if it doesn't exist. + + Args: + stream: Stream name. + group: Consumer group name. + consumer: Consumer name within the group. + count: Max messages to read per call. + block: Block timeout in milliseconds (0 = forever). + redis: Optional Redis client. + + Returns: + List of StreamMessage objects. + """ + r = redis or await get_redis() + + # Ensure consumer group exists + try: + await r.xgroup_create(stream, group, id="0", mkstream=True) + except Exception: + pass # Group already exists + + results = await r.xreadgroup(group, consumer, {stream: ">"}, count=count, block=block) + + messages = [] + for stream_name, entries in results: + for message_id, fields in entries: + try: + payload = json.loads(fields.get("payload", "{}")) + except json.JSONDecodeError: + payload = fields + messages.append(StreamMessage(message_id=message_id, stream=stream_name, data=payload)) + + return messages + + +async def ack(stream: str, group: str, message_id: str, redis: Redis | None = None) -> None: + """Acknowledge a message (mark as processed).""" + r = redis or await get_redis() + await r.xack(stream, group, message_id)