commit df70cf6373fcbef09ca6a5fa6116c02b10c6191e Author: yakenator Date: Mon Feb 23 13:50:46 2026 +0900 feat: KIS (Korea Investment) market data collector service - KIS API integration for stock prices and market data - GET /stocks endpoint with search, filter, pagination - GET /stocks/{code} endpoint with prices and valuation - REST API with health, streams, and trigger endpoints - Redis Streams for async job processing - Docker support with multi-stage build Co-Authored-By: Claude Opus 4.6 diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..daf0201 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +__pycache__ +*.pyc +.git +.venv +*.egg-info +dist +build +.env diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f3ebf17 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +__pycache__/ +*.py[cod] +*$py.class +*.so +*.egg-info/ +dist/ +build/ +.eggs/ +.venv/ +venv/ +.env +.vscode/ +.idea/ +*.swp +*.swo +.DS_Store +.pytest_cache/ +htmlcov/ +.coverage +coverage.xml +.mypy_cache/ +.claude/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b3bdfab --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY stock-common/ /tmp/stock-common/ +RUN pip install --no-cache-dir /tmp/stock-common/ && rm -rf /tmp/stock-common/ + +COPY stock-kis-collector/ . +RUN pip install --no-cache-dir . + +CMD ["python", "-m", "stock_kis_collector.worker"] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..53dd979 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "stock-kis-collector" +version = "0.1.0" +description = "KIS Open API collector - stock prices and market data" +requires-python = ">=3.12" +dependencies = [ + "stock-common", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/stock_kis_collector"] diff --git a/src/stock_kis_collector/__init__.py b/src/stock_kis_collector/__init__.py new file mode 100644 index 0000000..7eeff87 --- /dev/null +++ b/src/stock_kis_collector/__init__.py @@ -0,0 +1,3 @@ +"""KIS Open API collector service.""" + +__version__ = "0.1.0" diff --git a/src/stock_kis_collector/api.py b/src/stock_kis_collector/api.py new file mode 100644 index 0000000..6339c9f --- /dev/null +++ b/src/stock_kis_collector/api.py @@ -0,0 +1,142 @@ +"""REST API for KIS collector service. + +Endpoints: + GET /stocks - list stocks (from PostgreSQL) + GET /stocks/{stock_code} - get stock detail with latest price + POST /collect/stocks - trigger stock list collection + POST /collect/prices - trigger price collection + GET /health - (from api_base) + GET /streams - (from api_base) +""" + +import structlog +from pydantic import BaseModel + +from stock_common.api_base import create_app +from stock_common.database import get_pg_pool +from stock_common.queue import publish + +logger = structlog.get_logger(service="kis-api") + +TRIGGER_STREAM = "queue:trigger-kis" + + +class StocksRequest(BaseModel): + market: str = "all" + + +class PricesRequest(BaseModel): + stock_codes: list[str] + + +app = create_app( + title="stock-kis-collector", + streams=[TRIGGER_STREAM, "queue:raw-prices", "queue:raw-stocks"], +) + + +@app.get("/stocks") +async def list_stocks(market: str = "all", search: str = "", limit: int = 100, offset: int = 0): + try: + pool = await get_pg_pool() + except Exception: + return {"total": 0, "stocks": []} + + conditions = ["is_active = true"] + params: list = [] + idx = 1 + + if market != "all": + conditions.append(f"market = ${idx}") + params.append(market) + idx += 1 + + if search: + conditions.append(f"(stock_name ILIKE ${idx} OR stock_code ILIKE ${idx})") + params.append(f"%{search}%") + idx += 1 + + where = " AND ".join(conditions) + + try: + async with pool.acquire() as conn: + count = await conn.fetchval(f"SELECT COUNT(*) FROM stock WHERE {where}", *params) + rows = await conn.fetch( + f"""SELECT s.stock_code, s.stock_name, s.market, s.sector, s.industry, + p.close, p.volume, p.market_cap, + ROUND(((p.close - p2.close)::numeric / NULLIF(p2.close, 0)) * 100, 2) AS change_pct + FROM stock s + LEFT JOIN LATERAL ( + SELECT close, volume, market_cap FROM daily_price + WHERE stock_code = s.stock_code ORDER BY date DESC LIMIT 1 + ) p ON true + LEFT JOIN LATERAL ( + SELECT close FROM daily_price + WHERE stock_code = s.stock_code ORDER BY date DESC LIMIT 1 OFFSET 1 + ) p2 ON true + WHERE {where} + ORDER BY p.market_cap DESC NULLS LAST + LIMIT ${idx} OFFSET ${idx + 1}""", + *params, limit, offset, + ) + return {"total": count, "stocks": [dict(r) for r in rows]} + except Exception as exc: + logger.warning("stocks_query_failed", error=str(exc)) + return {"total": 0, "stocks": []} + + +@app.get("/stocks/{stock_code}") +async def get_stock(stock_code: str): + try: + pool = await get_pg_pool() + except Exception: + return {"error": "db_unavailable", "stock_code": stock_code} + + try: + async with pool.acquire() as conn: + stock = await conn.fetchrow( + "SELECT stock_code, stock_name, market, sector, industry FROM stock WHERE stock_code = $1", + stock_code, + ) + if not stock: + return {"error": "not_found", "stock_code": stock_code} + + prices = await conn.fetch( + """SELECT date, open, high, low, close, volume, market_cap + FROM daily_price WHERE stock_code = $1 + ORDER BY date DESC LIMIT 30""", + stock_code, + ) + + valuation = await conn.fetchrow( + """SELECT per, pbr, peg, roe, debt_ratio, fcf_yield, ev_ebitda + FROM valuation WHERE stock_code = $1 + ORDER BY calculated_at DESC LIMIT 1""", + stock_code, + ) + + result = dict(stock) + result["prices"] = [dict(p) for p in prices] + result["valuation"] = dict(valuation) if valuation else None + return result + except Exception as exc: + logger.warning("stock_detail_query_failed", error=str(exc), stock_code=stock_code) + return {"error": "query_failed", "stock_code": stock_code} + + +@app.post("/collect/stocks") +async def collect_stocks(req: StocksRequest): + msg_id = await publish(TRIGGER_STREAM, { + "type": "stocks", + "market": req.market, + }) + return {"status": "queued", "message_id": msg_id} + + +@app.post("/collect/prices") +async def collect_prices(req: PricesRequest): + msg_id = await publish(TRIGGER_STREAM, { + "type": "prices", + "stock_codes": req.stock_codes, + }) + return {"status": "queued", "message_id": msg_id, "stock_codes": req.stock_codes} diff --git a/src/stock_kis_collector/collector.py b/src/stock_kis_collector/collector.py new file mode 100644 index 0000000..9691af8 --- /dev/null +++ b/src/stock_kis_collector/collector.py @@ -0,0 +1,172 @@ +"""Korea Investment Securities (KIS) Open API collector for prices and stock data.""" + +import asyncio +from datetime import date, datetime, timedelta +from typing import Any + +from stock_common.collector_base import AuthenticationError, BaseCollector +from stock_common.config import settings +from stock_common.models.price import DailyPrice +from stock_common.models.stock import Market, Stock + + +class KISCollector(BaseCollector): + """Collector for KIS Open API (stock prices and market data). + + Authentication: OAuth2 (AppKey + AppSecret -> access token, 24h TTL). + """ + + def __init__(self) -> None: + super().__init__(rate_limit=settings.kis_rate_limit, timeout=30) + self.base_url = settings.kis_base_url + self.app_key = settings.kis_app_key + self.app_secret = settings.kis_app_secret + self._access_token: str | None = None + self._token_expires_at: datetime | None = None + self._token_lock = asyncio.Lock() + + async def collect(self, **kwargs: Any) -> None: + raise NotImplementedError("Use specific methods: collect_stocks, collect_prices, etc.") + + async def _ensure_token(self) -> str: + now = datetime.now() + if self._access_token and self._token_expires_at and now < self._token_expires_at: + return self._access_token + + async with self._token_lock: + now = datetime.now() + if self._access_token and self._token_expires_at and now < self._token_expires_at: + return self._access_token + + url = f"{self.base_url}/oauth2/tokenP" + payload = { + "grant_type": "client_credentials", + "appkey": self.app_key, + "appsecret": self.app_secret, + } + + try: + data = await self._request("POST", url, json=payload) + if not isinstance(data, dict): + raise AuthenticationError("Invalid token response") + self._access_token = data.get("access_token", "") + expires_in = int(data.get("expires_in", 86400)) + self._token_expires_at = now + timedelta(seconds=expires_in - 3600) + self.logger.info("kis_token_obtained", expires_in=expires_in) + return self._access_token + except Exception as exc: + raise AuthenticationError(f"Failed to obtain KIS token: {exc}") from exc + + def _build_headers(self, token: str, tr_id: str) -> dict[str, str]: + return { + "Content-Type": "application/json; charset=utf-8", + "authorization": f"Bearer {token}", + "appkey": self.app_key, + "appsecret": self.app_secret, + "tr_id": tr_id, + } + + async def collect_stocks(self, market: Market = Market.KOSPI) -> list[Stock]: + token = await self._ensure_token() + tr_id = "FHPST01740000" + url = f"{self.base_url}/uapi/domestic-stock/v1/ranking/market-cap" + fid_input_iscd = "0001" if market == Market.KOSPI else "1001" + + headers = self._build_headers(token, tr_id) + params = { + "FID_COND_MRKT_DIV_CODE": "J" if market == Market.KOSPI else "Q", + "FID_COND_SCR_DIV_CODE": "20174", + "FID_INPUT_ISCD": fid_input_iscd, + "FID_DIV_CLS_CODE": "0", + "FID_TRGT_CLS_CODE": "", "FID_TRGT_EXLS_CLS_CODE": "", + "FID_INPUT_PRICE_1": "", "FID_INPUT_PRICE_2": "", + "FID_VOL_CNT": "", "FID_INPUT_DATE_1": "", + } + + stocks: list[Stock] = [] + data = await self._request("GET", url, headers=headers, params=params) + if not isinstance(data, dict): + return stocks + + for item in data.get("output", []): + try: + stock = Stock( + stock_code=item.get("mksc_shrn_iscd", "").strip(), + stock_name=item.get("hts_kor_isnm", "").strip(), + market=market, sector=item.get("bstp_larg_div_code_name"), + industry=item.get("bstp_medm_div_code_name"), + is_active=True, updated_at=datetime.now(), + ) + if stock.stock_code: + stocks.append(stock) + except Exception as exc: + self.logger.warning("stock_parse_error", error=str(exc)) + + self.logger.info("stocks_collected", market=market.value, count=len(stocks)) + return stocks + + async def collect_price(self, stock_code: str) -> DailyPrice | None: + token = await self._ensure_token() + url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-price" + headers = self._build_headers(token, "FHKST01010100") + params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code} + + data = await self._request("GET", url, headers=headers, params=params) + if not isinstance(data, dict): + return None + output = data.get("output", {}) + if not output: + return None + + try: + return DailyPrice( + stock_code=stock_code, date=date.today(), + open=output.get("stck_oprc"), high=output.get("stck_hgpr"), + low=output.get("stck_lwpr"), close=output.get("stck_prpr"), + volume=int(output.get("acml_vol", 0)) if output.get("acml_vol") else None, + market_cap=int(output.get("hts_avls", 0)) * 100_000_000 if output.get("hts_avls") else None, + source="KIS", + ) + except Exception as exc: + self.logger.warning("price_parse_error", stock_code=stock_code, error=str(exc)) + return None + + async def collect_daily_prices( + self, stock_code: str, start_date: date | None = None, end_date: date | None = None, + ) -> list[DailyPrice]: + token = await self._ensure_token() + url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" + if end_date is None: + end_date = date.today() + if start_date is None: + start_date = end_date - timedelta(days=30) + + headers = self._build_headers(token, "FHKST03010100") + params = { + "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code, + "FID_INPUT_DATE_1": start_date.strftime("%Y%m%d"), + "FID_INPUT_DATE_2": end_date.strftime("%Y%m%d"), + "FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "0", + } + + data = await self._request("GET", url, headers=headers, params=params) + if not isinstance(data, dict): + return [] + + prices: list[DailyPrice] = [] + for item in data.get("output2", []): + try: + dt = item.get("stck_bsop_date", "") + if not dt: + continue + prices.append(DailyPrice( + stock_code=stock_code, date=datetime.strptime(dt, "%Y%m%d").date(), + open=item.get("stck_oprc"), high=item.get("stck_hgpr"), + low=item.get("stck_lwpr"), close=item.get("stck_clpr"), + volume=int(item.get("acml_vol", 0)) if item.get("acml_vol") else None, + source="KIS", + )) + except Exception as exc: + self.logger.warning("daily_price_parse_error", error=str(exc)) + + return prices diff --git a/src/stock_kis_collector/worker.py b/src/stock_kis_collector/worker.py new file mode 100644 index 0000000..69d94ff --- /dev/null +++ b/src/stock_kis_collector/worker.py @@ -0,0 +1,82 @@ +"""KIS collector worker - runs as a standalone service. + +Runs both the Redis stream consumer loop AND the REST API server concurrently. +""" + +import asyncio + +import structlog +import uvicorn + +from stock_common.config import settings +from stock_common.logging_config import configure_logging +from stock_common.models.stock import Market +from stock_common.queue import consume, publish, ack + +from stock_kis_collector.collector import KISCollector +from stock_kis_collector.api import app + +logger = structlog.get_logger(service="kis-collector") + +TRIGGER_STREAM = "queue:trigger-kis" +OUTPUT_PRICES = "queue:raw-prices" +OUTPUT_STOCKS = "queue:raw-stocks" +GROUP = "kis-collectors" +CONSUMER = "kis-worker-1" + + +async def worker_loop() -> None: + """Redis stream consumer loop with auto-restart on failure.""" + while True: + try: + async with KISCollector() as collector: + while True: + messages = await consume(TRIGGER_STREAM, GROUP, CONSUMER, count=1, block=5000) + + for msg in messages: + task_type = msg.data.get("type", "prices") + stock_codes = msg.data.get("stock_codes", []) + + try: + if task_type in ("stocks", "all"): + for market in (Market.KOSPI, Market.KOSDAQ): + stocks = await collector.collect_stocks(market) + for s in stocks: + await publish(OUTPUT_STOCKS, s.model_dump()) + + if task_type in ("prices", "all"): + for code in stock_codes: + price = await collector.collect_price(code) + if price: + await publish(OUTPUT_PRICES, price.model_dump()) + + await ack(TRIGGER_STREAM, GROUP, msg.message_id) + logger.info("task_completed", task_type=task_type) + except Exception as exc: + logger.error("task_failed", error=str(exc)) + except Exception as exc: + logger.error("worker_loop_crashed", error=str(exc)) + await asyncio.sleep(10) + logger.info("worker_loop_restarting") + + +async def run() -> None: + configure_logging(level=settings.log_level, log_format=settings.log_format) + logger.info("kis_collector_starting") + + config = uvicorn.Config(app, host="0.0.0.0", port=8002, log_level="info") + server = uvicorn.Server(config) + + await asyncio.gather( + worker_loop(), + server.serve(), + return_exceptions=True, + ) + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main()