feat: shared library for stock analysis microservices
- Common models (Stock, DailyPrice, Valuation, ScreeningResult) - Database connectors (PostgreSQL via asyncpg, MongoDB via motor, Redis) - Redis Streams pub/sub utilities - Base collector class with common patterns - Logging configuration with structlog Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
8
.dockerignore
Normal file
8
.dockerignore
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
__pycache__
|
||||||
|
*.pyc
|
||||||
|
.git
|
||||||
|
.venv
|
||||||
|
*.egg-info
|
||||||
|
dist
|
||||||
|
build
|
||||||
|
.env
|
||||||
22
.gitignore
vendored
Normal file
22
.gitignore
vendored
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
*$py.class
|
||||||
|
*.so
|
||||||
|
*.egg-info/
|
||||||
|
dist/
|
||||||
|
build/
|
||||||
|
.eggs/
|
||||||
|
.venv/
|
||||||
|
venv/
|
||||||
|
.env
|
||||||
|
.vscode/
|
||||||
|
.idea/
|
||||||
|
*.swp
|
||||||
|
*.swo
|
||||||
|
.DS_Store
|
||||||
|
.pytest_cache/
|
||||||
|
htmlcov/
|
||||||
|
.coverage
|
||||||
|
coverage.xml
|
||||||
|
.mypy_cache/
|
||||||
|
.claude/
|
||||||
25
pyproject.toml
Normal file
25
pyproject.toml
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
[project]
|
||||||
|
name = "stock-common"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "Shared models, config, DB/Redis utilities for stock analysis microservices"
|
||||||
|
requires-python = ">=3.12"
|
||||||
|
dependencies = [
|
||||||
|
"pydantic>=2.0",
|
||||||
|
"pydantic-settings>=2.0",
|
||||||
|
"asyncpg>=0.29",
|
||||||
|
"motor>=3.3",
|
||||||
|
"redis>=5.0",
|
||||||
|
"structlog>=24.0",
|
||||||
|
"aiohttp>=3.9",
|
||||||
|
"aiolimiter>=1.1",
|
||||||
|
"tenacity>=8.2",
|
||||||
|
"fastapi>=0.115",
|
||||||
|
"uvicorn[standard]>=0.34",
|
||||||
|
]
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["hatchling"]
|
||||||
|
build-backend = "hatchling.build"
|
||||||
|
|
||||||
|
[tool.hatch.build.targets.wheel]
|
||||||
|
packages = ["src/stock_common"]
|
||||||
3
src/stock_common/__init__.py
Normal file
3
src/stock_common/__init__.py
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
"""Shared library for stock analysis microservices."""
|
||||||
|
|
||||||
|
__version__ = "0.1.0"
|
||||||
98
src/stock_common/api_base.py
Normal file
98
src/stock_common/api_base.py
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
"""Shared FastAPI app factory and common endpoints for all microservices.
|
||||||
|
|
||||||
|
Each service calls `create_app()` to get a FastAPI instance with:
|
||||||
|
- /health - liveness/readiness check
|
||||||
|
- /streams - Redis stream queue lengths
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
from typing import AsyncGenerator, Callable, Awaitable
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
import uvicorn
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
|
||||||
|
from stock_common.queue.redis_client import get_redis, close_redis
|
||||||
|
|
||||||
|
logger = structlog.get_logger(module="api_base")
|
||||||
|
|
||||||
|
_start_time: float = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def create_app(
|
||||||
|
title: str,
|
||||||
|
version: str = "0.1.0",
|
||||||
|
streams: list[str] | None = None,
|
||||||
|
on_startup: Callable[[], Awaitable[None]] | None = None,
|
||||||
|
on_shutdown: Callable[[], Awaitable[None]] | None = None,
|
||||||
|
) -> FastAPI:
|
||||||
|
"""Create a FastAPI app with common health/stream endpoints.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
title: Service name (e.g., "stock-dart-collector").
|
||||||
|
version: API version.
|
||||||
|
streams: Redis stream names this service reads/writes.
|
||||||
|
on_startup: Optional async callback run on startup.
|
||||||
|
on_shutdown: Optional async callback run on shutdown.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||||
|
global _start_time
|
||||||
|
_start_time = time.time()
|
||||||
|
if on_startup:
|
||||||
|
await on_startup()
|
||||||
|
logger.info("api_started", service=title)
|
||||||
|
yield
|
||||||
|
if on_shutdown:
|
||||||
|
await on_shutdown()
|
||||||
|
await close_redis()
|
||||||
|
logger.info("api_stopped", service=title)
|
||||||
|
|
||||||
|
app = FastAPI(title=title, version=version, lifespan=lifespan)
|
||||||
|
|
||||||
|
app.add_middleware(
|
||||||
|
CORSMiddleware,
|
||||||
|
allow_origins=["*"],
|
||||||
|
allow_methods=["*"],
|
||||||
|
allow_headers=["*"],
|
||||||
|
)
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
async def health():
|
||||||
|
uptime = time.time() - _start_time
|
||||||
|
try:
|
||||||
|
r = await get_redis()
|
||||||
|
await r.ping()
|
||||||
|
redis_ok = True
|
||||||
|
except Exception:
|
||||||
|
redis_ok = False
|
||||||
|
return {
|
||||||
|
"service": title,
|
||||||
|
"status": "ok" if redis_ok else "degraded",
|
||||||
|
"redis": redis_ok,
|
||||||
|
"uptime_seconds": round(uptime, 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
@app.get("/streams")
|
||||||
|
async def stream_info():
|
||||||
|
if not streams:
|
||||||
|
return {"streams": {}}
|
||||||
|
r = await get_redis()
|
||||||
|
info = {}
|
||||||
|
for stream_name in streams:
|
||||||
|
try:
|
||||||
|
length = await r.xlen(stream_name)
|
||||||
|
info[stream_name] = {"length": length}
|
||||||
|
except Exception:
|
||||||
|
info[stream_name] = {"length": -1, "error": "stream not found"}
|
||||||
|
return {"streams": info}
|
||||||
|
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
def run_api(app: FastAPI, host: str = "0.0.0.0", port: int = 8000) -> None:
|
||||||
|
"""Run the FastAPI app with uvicorn (blocking)."""
|
||||||
|
uvicorn.run(app, host=host, port=port, log_level="info")
|
||||||
105
src/stock_common/collector_base.py
Normal file
105
src/stock_common/collector_base.py
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
"""Abstract base collector with rate limiting, retry, and session management."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import aiohttp
|
||||||
|
import structlog
|
||||||
|
from aiolimiter import AsyncLimiter
|
||||||
|
from tenacity import (
|
||||||
|
before_sleep_log,
|
||||||
|
retry,
|
||||||
|
retry_if_exception_type,
|
||||||
|
stop_after_attempt,
|
||||||
|
wait_exponential,
|
||||||
|
)
|
||||||
|
|
||||||
|
_retry_logger = logging.getLogger("stock_common.collector_base")
|
||||||
|
|
||||||
|
|
||||||
|
class CollectorError(Exception):
|
||||||
|
"""Base exception for collector errors."""
|
||||||
|
|
||||||
|
|
||||||
|
class RateLimitExceededError(CollectorError):
|
||||||
|
"""Raised when API rate limit is exceeded."""
|
||||||
|
|
||||||
|
|
||||||
|
class AuthenticationError(CollectorError):
|
||||||
|
"""Raised when API authentication fails."""
|
||||||
|
|
||||||
|
|
||||||
|
class BaseCollector(ABC):
|
||||||
|
"""Abstract base class for all data collectors."""
|
||||||
|
|
||||||
|
def __init__(self, rate_limit: float = 10.0, timeout: int = 30) -> None:
|
||||||
|
self._local_limiter = AsyncLimiter(rate_limit, 1)
|
||||||
|
self.timeout = aiohttp.ClientTimeout(total=timeout)
|
||||||
|
self._session: aiohttp.ClientSession | None = None
|
||||||
|
self.logger = structlog.get_logger(collector=self.__class__.__name__)
|
||||||
|
|
||||||
|
async def __aenter__(self) -> BaseCollector:
|
||||||
|
self._session = aiohttp.ClientSession(timeout=self.timeout)
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, *args: Any) -> None:
|
||||||
|
if self._session:
|
||||||
|
await self._session.close()
|
||||||
|
self._session = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def session(self) -> aiohttp.ClientSession:
|
||||||
|
if self._session is None:
|
||||||
|
raise RuntimeError(f"{self.__class__.__name__} must be used as async context manager")
|
||||||
|
return self._session
|
||||||
|
|
||||||
|
@property
|
||||||
|
def limiter(self):
|
||||||
|
return self._local_limiter
|
||||||
|
|
||||||
|
@retry(
|
||||||
|
stop=stop_after_attempt(3),
|
||||||
|
wait=wait_exponential(multiplier=1, min=1, max=30),
|
||||||
|
retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError)),
|
||||||
|
before_sleep=before_sleep_log(_retry_logger, logging.WARNING),
|
||||||
|
)
|
||||||
|
async def _request(self, method: str, url: str, **kwargs: Any) -> dict | str:
|
||||||
|
async with self.limiter:
|
||||||
|
self.logger.info("api_request", method=method, url=url)
|
||||||
|
async with self.session.request(method, url, **kwargs) as resp:
|
||||||
|
if resp.status == 429:
|
||||||
|
retry_after = int(resp.headers.get("Retry-After", "5"))
|
||||||
|
self.logger.warning("rate_limit_hit", retry_after=retry_after)
|
||||||
|
await asyncio.sleep(retry_after)
|
||||||
|
raise RateLimitExceededError(f"Rate limited, retry after {retry_after}s")
|
||||||
|
resp.raise_for_status()
|
||||||
|
content_type = resp.content_type or ""
|
||||||
|
if "json" in content_type:
|
||||||
|
return await resp.json()
|
||||||
|
return await resp.text()
|
||||||
|
|
||||||
|
@retry(
|
||||||
|
stop=stop_after_attempt(3),
|
||||||
|
wait=wait_exponential(multiplier=1, min=1, max=30),
|
||||||
|
retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError)),
|
||||||
|
before_sleep=before_sleep_log(_retry_logger, logging.WARNING),
|
||||||
|
)
|
||||||
|
async def _download_binary(self, url: str, **kwargs: Any) -> bytes:
|
||||||
|
async with self.limiter:
|
||||||
|
self.logger.info("api_request_binary", url=url)
|
||||||
|
async with self.session.get(url, **kwargs) as resp:
|
||||||
|
if resp.status == 429:
|
||||||
|
retry_after = int(resp.headers.get("Retry-After", "5"))
|
||||||
|
await asyncio.sleep(retry_after)
|
||||||
|
raise RateLimitExceededError(f"Rate limited, retry after {retry_after}s")
|
||||||
|
resp.raise_for_status()
|
||||||
|
return await resp.read()
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
async def collect(self, **kwargs: Any) -> None:
|
||||||
|
"""Execute data collection. Must be implemented by subclasses."""
|
||||||
|
...
|
||||||
77
src/stock_common/config.py
Normal file
77
src/stock_common/config.py
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
"""Centralized configuration using pydantic-settings."""
|
||||||
|
|
||||||
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
|
|
||||||
|
class Settings(BaseSettings):
|
||||||
|
"""Application settings loaded from environment variables and .env file."""
|
||||||
|
|
||||||
|
model_config = SettingsConfigDict(
|
||||||
|
env_file=".env",
|
||||||
|
env_file_encoding="utf-8",
|
||||||
|
case_sensitive=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# -- Database: PostgreSQL --
|
||||||
|
postgres_host: str = "localhost"
|
||||||
|
postgres_port: int = 5432
|
||||||
|
postgres_db: str = "stock_analysis"
|
||||||
|
postgres_user: str = "stock_user"
|
||||||
|
postgres_password: str = "change_me_in_production"
|
||||||
|
|
||||||
|
# -- Database: MongoDB --
|
||||||
|
mongodb_uri: str = "mongodb://localhost:27017"
|
||||||
|
mongodb_db: str = "stock_analysis"
|
||||||
|
|
||||||
|
# -- Redis --
|
||||||
|
redis_url: str = "redis://localhost:6379/0"
|
||||||
|
|
||||||
|
# -- API Keys --
|
||||||
|
dart_api_key: str = ""
|
||||||
|
kis_app_key: str = ""
|
||||||
|
kis_app_secret: str = ""
|
||||||
|
kis_account_no: str = ""
|
||||||
|
kis_base_url: str = "https://openapi.koreainvestment.com:9443"
|
||||||
|
anthropic_api_key: str = ""
|
||||||
|
|
||||||
|
# -- Rate Limits (requests per second) --
|
||||||
|
dart_rate_limit: float = 10.0
|
||||||
|
kis_rate_limit: float = 20.0
|
||||||
|
news_rate_limit: float = 1.0
|
||||||
|
llm_rate_limit: float = 5.0
|
||||||
|
|
||||||
|
# -- Kakao Talk Notifications --
|
||||||
|
kakao_rest_api_key: str = ""
|
||||||
|
kakao_access_token: str = ""
|
||||||
|
kakao_refresh_token: str = ""
|
||||||
|
|
||||||
|
# -- Trading --
|
||||||
|
trading_mode: str = "paper"
|
||||||
|
kis_paper_base_url: str = "https://openapivts.koreainvestment.com:29443"
|
||||||
|
kis_account_cano: str = ""
|
||||||
|
kis_account_prdt_cd: str = "01"
|
||||||
|
|
||||||
|
# -- Screening Defaults --
|
||||||
|
default_strategy: str = "balanced"
|
||||||
|
default_top_n: int = 50
|
||||||
|
default_market: str = "all"
|
||||||
|
|
||||||
|
# -- Logging --
|
||||||
|
log_level: str = "INFO"
|
||||||
|
log_format: str = "json"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def kis_trading_base_url(self) -> str:
|
||||||
|
if self.trading_mode == "paper":
|
||||||
|
return self.kis_paper_base_url
|
||||||
|
return self.kis_base_url
|
||||||
|
|
||||||
|
@property
|
||||||
|
def postgres_dsn(self) -> str:
|
||||||
|
return (
|
||||||
|
f"postgresql://{self.postgres_user}:{self.postgres_password}"
|
||||||
|
f"@{self.postgres_host}:{self.postgres_port}/{self.postgres_db}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
18
src/stock_common/database/__init__.py
Normal file
18
src/stock_common/database/__init__.py
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
"""Database connection management for PostgreSQL and MongoDB."""
|
||||||
|
|
||||||
|
from stock_common.database.mongodb import (
|
||||||
|
close_mongo_client,
|
||||||
|
ensure_indexes,
|
||||||
|
get_mongo_client,
|
||||||
|
get_mongo_database,
|
||||||
|
)
|
||||||
|
from stock_common.database.postgres import close_pg_pool, get_pg_pool
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"close_mongo_client",
|
||||||
|
"close_pg_pool",
|
||||||
|
"ensure_indexes",
|
||||||
|
"get_mongo_client",
|
||||||
|
"get_mongo_database",
|
||||||
|
"get_pg_pool",
|
||||||
|
]
|
||||||
43
src/stock_common/database/mongodb.py
Normal file
43
src/stock_common/database/mongodb.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
"""Async MongoDB connection management using motor."""
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
|
||||||
|
|
||||||
|
from stock_common.config import settings
|
||||||
|
|
||||||
|
logger = structlog.get_logger(module="database.mongodb")
|
||||||
|
|
||||||
|
_client: AsyncIOMotorClient | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_mongo_client() -> AsyncIOMotorClient:
|
||||||
|
global _client
|
||||||
|
if _client is None:
|
||||||
|
_client = AsyncIOMotorClient(settings.mongodb_uri)
|
||||||
|
logger.info("mongodb_client_created", uri=settings.mongodb_uri)
|
||||||
|
return _client
|
||||||
|
|
||||||
|
|
||||||
|
def get_mongo_database() -> AsyncIOMotorDatabase:
|
||||||
|
client = get_mongo_client()
|
||||||
|
return client[settings.mongodb_db]
|
||||||
|
|
||||||
|
|
||||||
|
async def ensure_indexes() -> None:
|
||||||
|
db = get_mongo_database()
|
||||||
|
await db.disclosures.create_index("dart_id", unique=True)
|
||||||
|
await db.disclosures.create_index([("stock_code", 1), ("disclosed_at", -1)])
|
||||||
|
await db.news.create_index("url", unique=True)
|
||||||
|
await db.news.create_index([("stock_codes", 1), ("published_at", -1)])
|
||||||
|
await db.llm_analysis.create_index("analysis_id", unique=True)
|
||||||
|
await db.llm_analysis.create_index([("stock_code", 1), ("analyzed_at", -1)])
|
||||||
|
await db.llm_analysis.create_index("screening_run_id")
|
||||||
|
logger.info("mongodb_indexes_ensured")
|
||||||
|
|
||||||
|
|
||||||
|
async def close_mongo_client() -> None:
|
||||||
|
global _client
|
||||||
|
if _client is not None:
|
||||||
|
_client.close()
|
||||||
|
_client = None
|
||||||
|
logger.info("mongodb_client_closed")
|
||||||
35
src/stock_common/database/postgres.py
Normal file
35
src/stock_common/database/postgres.py
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
"""Async PostgreSQL connection pool management using asyncpg."""
|
||||||
|
|
||||||
|
import asyncpg
|
||||||
|
import structlog
|
||||||
|
|
||||||
|
from stock_common.config import settings
|
||||||
|
|
||||||
|
logger = structlog.get_logger(module="database.postgres")
|
||||||
|
|
||||||
|
_pool: asyncpg.Pool | None = None
|
||||||
|
|
||||||
|
|
||||||
|
async def get_pg_pool() -> asyncpg.Pool:
|
||||||
|
global _pool
|
||||||
|
if _pool is None:
|
||||||
|
try:
|
||||||
|
_pool = await asyncpg.create_pool(
|
||||||
|
dsn=settings.postgres_dsn,
|
||||||
|
min_size=2,
|
||||||
|
max_size=10,
|
||||||
|
command_timeout=60,
|
||||||
|
)
|
||||||
|
logger.info("postgres_pool_created", host=settings.postgres_host, db=settings.postgres_db)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("postgres_pool_creation_failed", error=str(exc))
|
||||||
|
raise ConnectionError(f"Failed to connect to PostgreSQL: {exc}") from exc
|
||||||
|
return _pool
|
||||||
|
|
||||||
|
|
||||||
|
async def close_pg_pool() -> None:
|
||||||
|
global _pool
|
||||||
|
if _pool is not None:
|
||||||
|
await _pool.close()
|
||||||
|
_pool = None
|
||||||
|
logger.info("postgres_pool_closed")
|
||||||
32
src/stock_common/logging_config.py
Normal file
32
src/stock_common/logging_config.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
"""Structured logging configuration using structlog."""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
|
||||||
|
|
||||||
|
def configure_logging(level: str = "INFO", log_format: str = "json") -> None:
|
||||||
|
"""Configure structlog for the application."""
|
||||||
|
log_level = getattr(logging, level.upper(), logging.INFO)
|
||||||
|
|
||||||
|
shared_processors: list[structlog.types.Processor] = [
|
||||||
|
structlog.contextvars.merge_contextvars,
|
||||||
|
structlog.processors.add_log_level,
|
||||||
|
structlog.processors.TimeStamper(fmt="iso"),
|
||||||
|
structlog.processors.StackInfoRenderer(),
|
||||||
|
structlog.processors.format_exc_info,
|
||||||
|
structlog.processors.UnicodeDecoder(),
|
||||||
|
]
|
||||||
|
|
||||||
|
if log_format == "json":
|
||||||
|
renderer: structlog.types.Processor = structlog.processors.JSONRenderer()
|
||||||
|
else:
|
||||||
|
renderer = structlog.dev.ConsoleRenderer()
|
||||||
|
|
||||||
|
structlog.configure(
|
||||||
|
processors=[*shared_processors, renderer],
|
||||||
|
wrapper_class=structlog.make_filtering_bound_logger(log_level),
|
||||||
|
context_class=dict,
|
||||||
|
logger_factory=structlog.PrintLoggerFactory(),
|
||||||
|
cache_logger_on_first_use=True,
|
||||||
|
)
|
||||||
43
src/stock_common/models/__init__.py
Normal file
43
src/stock_common/models/__init__.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
"""Pydantic data models for all entities."""
|
||||||
|
|
||||||
|
from stock_common.models.analysis import LLMAnalysis, Recommendation
|
||||||
|
from stock_common.models.disclosure import Disclosure, Sentiment
|
||||||
|
from stock_common.models.financial import Financial, PeriodType
|
||||||
|
from stock_common.models.news import (
|
||||||
|
News,
|
||||||
|
OGMeta,
|
||||||
|
OpportunityCategory,
|
||||||
|
OpportunitySignal,
|
||||||
|
OpportunityStrength,
|
||||||
|
RiskCategory,
|
||||||
|
RiskSeverity,
|
||||||
|
RiskSignal,
|
||||||
|
)
|
||||||
|
from stock_common.models.price import DailyPrice
|
||||||
|
from stock_common.models.screening import ScreeningResult, ScreeningStrategy, STRATEGY_WEIGHTS
|
||||||
|
from stock_common.models.stock import Market, Stock
|
||||||
|
from stock_common.models.valuation import Valuation
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"DailyPrice",
|
||||||
|
"Disclosure",
|
||||||
|
"Financial",
|
||||||
|
"LLMAnalysis",
|
||||||
|
"Market",
|
||||||
|
"News",
|
||||||
|
"OGMeta",
|
||||||
|
"OpportunityCategory",
|
||||||
|
"OpportunitySignal",
|
||||||
|
"OpportunityStrength",
|
||||||
|
"PeriodType",
|
||||||
|
"Recommendation",
|
||||||
|
"RiskCategory",
|
||||||
|
"RiskSeverity",
|
||||||
|
"RiskSignal",
|
||||||
|
"ScreeningResult",
|
||||||
|
"ScreeningStrategy",
|
||||||
|
"Sentiment",
|
||||||
|
"Stock",
|
||||||
|
"STRATEGY_WEIGHTS",
|
||||||
|
"Valuation",
|
||||||
|
]
|
||||||
33
src/stock_common/models/analysis.py
Normal file
33
src/stock_common/models/analysis.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
"""LLM analysis result data model (MongoDB)."""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from enum import StrEnum
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
class Recommendation(StrEnum):
|
||||||
|
STRONG_BUY = "STRONG_BUY"
|
||||||
|
BUY = "BUY"
|
||||||
|
HOLD = "HOLD"
|
||||||
|
SELL = "SELL"
|
||||||
|
STRONG_SELL = "STRONG_SELL"
|
||||||
|
|
||||||
|
|
||||||
|
class LLMAnalysis(BaseModel):
|
||||||
|
"""Represents an LLM-generated qualitative analysis of a stock."""
|
||||||
|
|
||||||
|
analysis_id: UUID = Field(..., description="Unique analysis identifier")
|
||||||
|
stock_code: str = Field(..., max_length=10, description="KRX stock code")
|
||||||
|
screening_run_id: UUID | None = Field(None, description="Associated screening run ID")
|
||||||
|
summary: str = Field("", description="Investment thesis summary")
|
||||||
|
valuation_comment: str = Field("", description="Valuation assessment")
|
||||||
|
risk_factors: list[str] = Field(default_factory=list, description="Identified risk factors")
|
||||||
|
catalysts: list[str] = Field(default_factory=list, description="Identified upside catalysts")
|
||||||
|
recommendation: Recommendation = Field(
|
||||||
|
Recommendation.HOLD, description="Investment recommendation"
|
||||||
|
)
|
||||||
|
confidence: float = Field(0.5, ge=0.0, le=1.0, description="Confidence score (0.0 - 1.0)")
|
||||||
|
analyzed_at: datetime | None = Field(None, description="Analysis datetime")
|
||||||
|
model_name: str = Field("", description="LLM model used for analysis")
|
||||||
28
src/stock_common/models/disclosure.py
Normal file
28
src/stock_common/models/disclosure.py
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
"""DART disclosure data model (MongoDB)."""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from enum import StrEnum
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
class Sentiment(StrEnum):
|
||||||
|
"""Sentiment classification for disclosures and news."""
|
||||||
|
|
||||||
|
POSITIVE = "POSITIVE"
|
||||||
|
NEUTRAL = "NEUTRAL"
|
||||||
|
NEGATIVE = "NEGATIVE"
|
||||||
|
|
||||||
|
|
||||||
|
class Disclosure(BaseModel):
|
||||||
|
"""Represents a DART corporate disclosure."""
|
||||||
|
|
||||||
|
dart_id: str = Field(..., description="Unique DART disclosure ID")
|
||||||
|
stock_code: str = Field(..., max_length=10, description="KRX stock code")
|
||||||
|
title: str = Field(..., description="Disclosure title")
|
||||||
|
category: str = Field("", description="Disclosure category")
|
||||||
|
content_summary: str = Field("", description="Summarized content")
|
||||||
|
content_raw_url: str = Field("", description="URL to full disclosure document")
|
||||||
|
sentiment: Sentiment = Field(Sentiment.NEUTRAL, description="Sentiment classification")
|
||||||
|
disclosed_at: datetime | None = Field(None, description="Disclosure datetime")
|
||||||
|
processed_at: datetime | None = Field(None, description="Processing datetime")
|
||||||
33
src/stock_common/models/financial.py
Normal file
33
src/stock_common/models/financial.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
"""Financial statement data model."""
|
||||||
|
|
||||||
|
from datetime import date
|
||||||
|
from enum import StrEnum
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
class PeriodType(StrEnum):
|
||||||
|
"""Financial reporting period type."""
|
||||||
|
|
||||||
|
ANNUAL = "ANNUAL"
|
||||||
|
QUARTER = "QUARTER"
|
||||||
|
|
||||||
|
|
||||||
|
class Financial(BaseModel):
|
||||||
|
"""Represents a financial statement record for a stock."""
|
||||||
|
|
||||||
|
id: int | None = Field(None, description="Auto-increment ID (set by DB)")
|
||||||
|
stock_code: str = Field(..., max_length=10, description="KRX stock code")
|
||||||
|
fiscal_year: int = Field(..., description="Fiscal year (e.g., 2024)")
|
||||||
|
fiscal_quarter: int = Field(..., ge=1, le=4, description="Quarter (1-4)")
|
||||||
|
period_type: PeriodType = Field(..., description="ANNUAL or QUARTER")
|
||||||
|
revenue: int | None = Field(None, description="Total revenue (KRW)")
|
||||||
|
operating_profit: int | None = Field(None, description="Operating profit (KRW)")
|
||||||
|
net_income: int | None = Field(None, description="Net income (KRW)")
|
||||||
|
total_assets: int | None = Field(None, description="Total assets (KRW)")
|
||||||
|
total_liabilities: int | None = Field(None, description="Total liabilities (KRW)")
|
||||||
|
total_equity: int | None = Field(None, description="Total equity (KRW)")
|
||||||
|
operating_cashflow: int | None = Field(None, description="Operating cash flow (KRW)")
|
||||||
|
free_cashflow: int | None = Field(None, description="Free cash flow (KRW)")
|
||||||
|
source: str = Field("DART", max_length=20, description="Data source identifier")
|
||||||
|
reported_at: date | None = Field(None, description="Report filing date")
|
||||||
108
src/stock_common/models/news.py
Normal file
108
src/stock_common/models/news.py
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
"""News article data model (MongoDB)."""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from enum import Enum, StrEnum
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
from stock_common.models.disclosure import Sentiment
|
||||||
|
|
||||||
|
|
||||||
|
class RiskCategory(StrEnum):
|
||||||
|
GOVERNANCE = "governance"
|
||||||
|
FINANCIAL = "financial"
|
||||||
|
LEGAL = "legal"
|
||||||
|
REGULATORY = "regulatory"
|
||||||
|
MARKET = "market"
|
||||||
|
OPERATIONAL = "operational"
|
||||||
|
GEOPOLITICAL = "geopolitical"
|
||||||
|
|
||||||
|
|
||||||
|
class RiskSeverity(int, Enum):
|
||||||
|
LEVEL_10 = 10
|
||||||
|
LEVEL_9 = 9
|
||||||
|
LEVEL_8 = 8
|
||||||
|
LEVEL_7 = 7
|
||||||
|
LEVEL_6 = 6
|
||||||
|
LEVEL_5 = 5
|
||||||
|
LEVEL_4 = 4
|
||||||
|
LEVEL_3 = 3
|
||||||
|
LEVEL_2 = 2
|
||||||
|
LEVEL_1 = 1
|
||||||
|
|
||||||
|
|
||||||
|
class RiskSignal(BaseModel):
|
||||||
|
keyword: str = Field(..., description="Matched keyword")
|
||||||
|
category: RiskCategory = Field(..., description="Risk category")
|
||||||
|
severity: RiskSeverity = Field(..., description="Severity level")
|
||||||
|
description: str = Field("", description="Human-readable description")
|
||||||
|
|
||||||
|
|
||||||
|
class OGMeta(BaseModel):
|
||||||
|
title: str = Field("", description="og:title")
|
||||||
|
description: str = Field("", description="og:description")
|
||||||
|
image: str = Field("", description="og:image URL")
|
||||||
|
site_name: str = Field("", description="og:site_name (publisher)")
|
||||||
|
url: str = Field("", description="og:url (canonical URL)")
|
||||||
|
|
||||||
|
|
||||||
|
class OpportunityCategory(StrEnum):
|
||||||
|
EARNINGS = "earnings"
|
||||||
|
GROWTH = "growth"
|
||||||
|
VALUATION = "valuation"
|
||||||
|
SHAREHOLDER = "shareholder"
|
||||||
|
CONTRACT = "contract"
|
||||||
|
TECHNOLOGY = "technology"
|
||||||
|
POLICY = "policy"
|
||||||
|
|
||||||
|
|
||||||
|
class OpportunityStrength(int, Enum):
|
||||||
|
LEVEL_10 = 10
|
||||||
|
LEVEL_9 = 9
|
||||||
|
LEVEL_8 = 8
|
||||||
|
LEVEL_7 = 7
|
||||||
|
LEVEL_6 = 6
|
||||||
|
LEVEL_5 = 5
|
||||||
|
LEVEL_4 = 4
|
||||||
|
LEVEL_3 = 3
|
||||||
|
LEVEL_2 = 2
|
||||||
|
LEVEL_1 = 1
|
||||||
|
|
||||||
|
|
||||||
|
class OpportunitySignal(BaseModel):
|
||||||
|
keyword: str = Field(..., description="Matched keyword")
|
||||||
|
category: OpportunityCategory = Field(..., description="Opportunity category")
|
||||||
|
strength: OpportunityStrength = Field(..., description="Signal strength")
|
||||||
|
description: str = Field("", description="Human-readable description")
|
||||||
|
|
||||||
|
|
||||||
|
class News(BaseModel):
|
||||||
|
"""Represents a news article related to stocks."""
|
||||||
|
|
||||||
|
url: str = Field(..., description="Unique URL of the news article")
|
||||||
|
title: str = Field(..., description="Article title")
|
||||||
|
content: str = Field("", description="Article content or excerpt")
|
||||||
|
stock_codes: list[str] = Field(default_factory=list, description="Related stock codes")
|
||||||
|
source: str = Field("", description="News source")
|
||||||
|
sentiment: Sentiment = Field(Sentiment.NEUTRAL, description="Sentiment classification")
|
||||||
|
sentiment_score: float = Field(0.0, description="Sentiment score (-1.0 ~ +1.0)")
|
||||||
|
risk_signals: list[RiskSignal] = Field(default_factory=list)
|
||||||
|
opportunity_signals: list[OpportunitySignal] = Field(default_factory=list)
|
||||||
|
risk_keywords: list[str] = Field(default_factory=list)
|
||||||
|
opportunity_keywords: list[str] = Field(default_factory=list)
|
||||||
|
og_meta: OGMeta | None = Field(None, description="Open Graph metadata")
|
||||||
|
source_topic: str = Field("", description="Macro topic query that found this article")
|
||||||
|
published_at: datetime | None = Field(None, description="Publication datetime")
|
||||||
|
collected_at: datetime | None = Field(None, description="Collection datetime")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def max_risk_severity(self) -> RiskSeverity | None:
|
||||||
|
if not self.risk_signals:
|
||||||
|
return None
|
||||||
|
return max(self.risk_signals, key=lambda s: s.severity.value).severity
|
||||||
|
|
||||||
|
@property
|
||||||
|
def max_opportunity_strength(self) -> OpportunityStrength | None:
|
||||||
|
if not self.opportunity_signals:
|
||||||
|
return None
|
||||||
|
return max(self.opportunity_signals, key=lambda s: s.strength.value).strength
|
||||||
22
src/stock_common/models/price.py
Normal file
22
src/stock_common/models/price.py
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
"""Daily price data model."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
from decimal import Decimal
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
class DailyPrice(BaseModel):
|
||||||
|
"""Represents daily OHLCV price data for a stock."""
|
||||||
|
|
||||||
|
stock_code: str = Field(..., max_length=10, description="KRX stock code")
|
||||||
|
date: datetime.date = Field(..., description="Trading date")
|
||||||
|
open: Decimal | None = Field(None, description="Opening price (KRW)")
|
||||||
|
high: Decimal | None = Field(None, description="High price (KRW)")
|
||||||
|
low: Decimal | None = Field(None, description="Low price (KRW)")
|
||||||
|
close: Decimal | None = Field(None, description="Closing price (KRW)")
|
||||||
|
volume: int | None = Field(None, description="Trading volume")
|
||||||
|
market_cap: int | None = Field(None, description="Market capitalization (KRW)")
|
||||||
|
source: str = Field("KIS", max_length=20, description="Data source identifier")
|
||||||
51
src/stock_common/models/screening.py
Normal file
51
src/stock_common/models/screening.py
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
"""Screening result data model."""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from decimal import Decimal
|
||||||
|
from enum import StrEnum
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
class ScreeningStrategy(StrEnum):
|
||||||
|
VALUE = "value"
|
||||||
|
GROWTH = "growth"
|
||||||
|
QUALITY = "quality"
|
||||||
|
BALANCED = "balanced"
|
||||||
|
|
||||||
|
|
||||||
|
STRATEGY_WEIGHTS: dict[ScreeningStrategy, dict[str, float]] = {
|
||||||
|
ScreeningStrategy.VALUE: {
|
||||||
|
"per": 0.25, "pbr": 0.25, "peg": 0.10,
|
||||||
|
"roe": 0.10, "fcf_yield": 0.20, "debt_ratio": 0.10,
|
||||||
|
},
|
||||||
|
ScreeningStrategy.GROWTH: {
|
||||||
|
"per": 0.10, "pbr": 0.05, "peg": 0.30,
|
||||||
|
"roe": 0.15, "fcf_yield": 0.10, "debt_ratio": 0.10, "ev_ebitda": 0.20,
|
||||||
|
},
|
||||||
|
ScreeningStrategy.QUALITY: {
|
||||||
|
"per": 0.10, "pbr": 0.10, "peg": 0.10,
|
||||||
|
"roe": 0.30, "fcf_yield": 0.15, "debt_ratio": 0.25,
|
||||||
|
},
|
||||||
|
ScreeningStrategy.BALANCED: {
|
||||||
|
"per": 0.15, "pbr": 0.15, "peg": 0.15,
|
||||||
|
"roe": 0.15, "fcf_yield": 0.15, "debt_ratio": 0.15, "ev_ebitda": 0.10,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ScreeningResult(BaseModel):
|
||||||
|
id: int | None = Field(None, description="Auto-increment ID (set by DB)")
|
||||||
|
run_id: UUID = Field(..., description="Screening run identifier")
|
||||||
|
stock_code: str = Field(..., max_length=10, description="KRX stock code")
|
||||||
|
strategy: ScreeningStrategy = Field(..., description="Strategy used")
|
||||||
|
composite_score: Decimal = Field(..., description="Overall weighted score")
|
||||||
|
per_score: Decimal | None = Field(None)
|
||||||
|
pbr_score: Decimal | None = Field(None)
|
||||||
|
peg_score: Decimal | None = Field(None)
|
||||||
|
roe_score: Decimal | None = Field(None)
|
||||||
|
fcf_yield_score: Decimal | None = Field(None)
|
||||||
|
debt_ratio_score: Decimal | None = Field(None)
|
||||||
|
rank: int | None = Field(None, description="Rank within this run")
|
||||||
|
screened_at: datetime | None = Field(None, description="Screening timestamp")
|
||||||
26
src/stock_common/models/stock.py
Normal file
26
src/stock_common/models/stock.py
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
"""Stock master data model."""
|
||||||
|
|
||||||
|
from datetime import date, datetime
|
||||||
|
from enum import StrEnum
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
class Market(StrEnum):
|
||||||
|
"""Korean stock market identifiers."""
|
||||||
|
|
||||||
|
KOSPI = "KOSPI"
|
||||||
|
KOSDAQ = "KOSDAQ"
|
||||||
|
|
||||||
|
|
||||||
|
class Stock(BaseModel):
|
||||||
|
"""Represents a listed stock on KOSPI or KOSDAQ."""
|
||||||
|
|
||||||
|
stock_code: str = Field(..., max_length=10, description="KRX stock code (e.g., '005930')")
|
||||||
|
stock_name: str = Field(..., max_length=100, description="Company name (Korean)")
|
||||||
|
market: Market = Field(..., description="Market: KOSPI or KOSDAQ")
|
||||||
|
sector: str | None = Field(None, max_length=100, description="Industry sector")
|
||||||
|
industry: str | None = Field(None, max_length=100, description="Specific industry")
|
||||||
|
listing_date: date | None = Field(None, description="IPO date")
|
||||||
|
is_active: bool = Field(True, description="Whether currently trading")
|
||||||
|
updated_at: datetime | None = Field(None, description="Last update timestamp")
|
||||||
25
src/stock_common/models/valuation.py
Normal file
25
src/stock_common/models/valuation.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
"""Valuation metrics data model."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
from decimal import Decimal
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
class Valuation(BaseModel):
|
||||||
|
"""Represents calculated valuation metrics for a stock on a given date."""
|
||||||
|
|
||||||
|
stock_code: str = Field(..., max_length=10, description="KRX stock code")
|
||||||
|
date: datetime.date = Field(..., description="Calculation date")
|
||||||
|
per: Decimal | None = Field(None, description="Price-to-Earnings Ratio")
|
||||||
|
pbr: Decimal | None = Field(None, description="Price-to-Book Ratio")
|
||||||
|
psr: Decimal | None = Field(None, description="Price-to-Sales Ratio")
|
||||||
|
peg: Decimal | None = Field(None, description="PEG Ratio")
|
||||||
|
ev_ebitda: Decimal | None = Field(None, description="EV/EBITDA")
|
||||||
|
roe: Decimal | None = Field(None, description="Return on Equity (%)")
|
||||||
|
roa: Decimal | None = Field(None, description="Return on Assets (%)")
|
||||||
|
debt_ratio: Decimal | None = Field(None, description="Debt Ratio (%)")
|
||||||
|
fcf_yield: Decimal | None = Field(None, description="Free Cash Flow Yield (%)")
|
||||||
|
calculated_at: datetime.datetime | None = Field(None, description="Calculation timestamp")
|
||||||
13
src/stock_common/queue/__init__.py
Normal file
13
src/stock_common/queue/__init__.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
"""Redis queue utilities for inter-service communication via Redis Streams."""
|
||||||
|
|
||||||
|
from stock_common.queue.redis_client import close_redis, get_redis
|
||||||
|
from stock_common.queue.streams import ack, publish, consume, StreamMessage
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"StreamMessage",
|
||||||
|
"ack",
|
||||||
|
"close_redis",
|
||||||
|
"consume",
|
||||||
|
"get_redis",
|
||||||
|
"publish",
|
||||||
|
]
|
||||||
31
src/stock_common/queue/redis_client.py
Normal file
31
src/stock_common/queue/redis_client.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
"""Redis connection management (singleton pattern)."""
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
from redis.asyncio import Redis
|
||||||
|
|
||||||
|
from stock_common.config import settings
|
||||||
|
|
||||||
|
logger = structlog.get_logger(module="queue.redis")
|
||||||
|
|
||||||
|
_redis: Redis | None = None
|
||||||
|
|
||||||
|
|
||||||
|
async def get_redis() -> Redis:
|
||||||
|
global _redis
|
||||||
|
if _redis is None:
|
||||||
|
_redis = Redis.from_url(
|
||||||
|
settings.redis_url,
|
||||||
|
decode_responses=True,
|
||||||
|
max_connections=20,
|
||||||
|
)
|
||||||
|
await _redis.ping()
|
||||||
|
logger.info("redis_connected", url=settings.redis_url)
|
||||||
|
return _redis
|
||||||
|
|
||||||
|
|
||||||
|
async def close_redis() -> None:
|
||||||
|
global _redis
|
||||||
|
if _redis is not None:
|
||||||
|
await _redis.aclose()
|
||||||
|
_redis = None
|
||||||
|
logger.info("redis_closed")
|
||||||
92
src/stock_common/queue/streams.py
Normal file
92
src/stock_common/queue/streams.py
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
"""Redis Streams helpers for simple publish/consume pipeline.
|
||||||
|
|
||||||
|
Each service does: pull from Redis -> process -> push to Redis.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
from redis.asyncio import Redis
|
||||||
|
|
||||||
|
from stock_common.queue.redis_client import get_redis
|
||||||
|
|
||||||
|
logger = structlog.get_logger(module="queue.streams")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StreamMessage:
|
||||||
|
"""A message read from a Redis Stream."""
|
||||||
|
|
||||||
|
message_id: str
|
||||||
|
stream: str
|
||||||
|
data: dict[str, Any]
|
||||||
|
|
||||||
|
|
||||||
|
async def publish(stream: str, data: dict[str, Any], redis: Redis | None = None) -> str:
|
||||||
|
"""Publish a message to a Redis Stream.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
stream: Stream name (e.g., "queue:raw-prices").
|
||||||
|
data: Message payload (will be JSON-serialized).
|
||||||
|
redis: Optional Redis client (uses singleton if not provided).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The message ID assigned by Redis.
|
||||||
|
"""
|
||||||
|
r = redis or await get_redis()
|
||||||
|
message_id = await r.xadd(stream, {"payload": json.dumps(data, default=str)})
|
||||||
|
logger.debug("stream_published", stream=stream, message_id=message_id)
|
||||||
|
return message_id
|
||||||
|
|
||||||
|
|
||||||
|
async def consume(
|
||||||
|
stream: str,
|
||||||
|
group: str,
|
||||||
|
consumer: str,
|
||||||
|
count: int = 10,
|
||||||
|
block: int = 5000,
|
||||||
|
redis: Redis | None = None,
|
||||||
|
) -> list[StreamMessage]:
|
||||||
|
"""Consume messages from a Redis Stream using consumer groups.
|
||||||
|
|
||||||
|
Automatically creates the consumer group if it doesn't exist.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
stream: Stream name.
|
||||||
|
group: Consumer group name.
|
||||||
|
consumer: Consumer name within the group.
|
||||||
|
count: Max messages to read per call.
|
||||||
|
block: Block timeout in milliseconds (0 = forever).
|
||||||
|
redis: Optional Redis client.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of StreamMessage objects.
|
||||||
|
"""
|
||||||
|
r = redis or await get_redis()
|
||||||
|
|
||||||
|
# Ensure consumer group exists
|
||||||
|
try:
|
||||||
|
await r.xgroup_create(stream, group, id="0", mkstream=True)
|
||||||
|
except Exception:
|
||||||
|
pass # Group already exists
|
||||||
|
|
||||||
|
results = await r.xreadgroup(group, consumer, {stream: ">"}, count=count, block=block)
|
||||||
|
|
||||||
|
messages = []
|
||||||
|
for stream_name, entries in results:
|
||||||
|
for message_id, fields in entries:
|
||||||
|
try:
|
||||||
|
payload = json.loads(fields.get("payload", "{}"))
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
payload = fields
|
||||||
|
messages.append(StreamMessage(message_id=message_id, stream=stream_name, data=payload))
|
||||||
|
|
||||||
|
return messages
|
||||||
|
|
||||||
|
|
||||||
|
async def ack(stream: str, group: str, message_id: str, redis: Redis | None = None) -> None:
|
||||||
|
"""Acknowledge a message (mark as processed)."""
|
||||||
|
r = redis or await get_redis()
|
||||||
|
await r.xack(stream, group, message_id)
|
||||||
Reference in New Issue
Block a user