feat: KIS (Korea Investment) market data collector service
- KIS API integration for stock prices and market data
- GET /stocks endpoint with search, filter, pagination
- GET /stocks/{code} endpoint with prices and valuation
- REST API with health, streams, and trigger endpoints
- Redis Streams for async job processing
- Docker support with multi-stage build
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
8
.dockerignore
Normal file
8
.dockerignore
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
__pycache__
|
||||||
|
*.pyc
|
||||||
|
.git
|
||||||
|
.venv
|
||||||
|
*.egg-info
|
||||||
|
dist
|
||||||
|
build
|
||||||
|
.env
|
||||||
22
.gitignore
vendored
Normal file
22
.gitignore
vendored
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
*$py.class
|
||||||
|
*.so
|
||||||
|
*.egg-info/
|
||||||
|
dist/
|
||||||
|
build/
|
||||||
|
.eggs/
|
||||||
|
.venv/
|
||||||
|
venv/
|
||||||
|
.env
|
||||||
|
.vscode/
|
||||||
|
.idea/
|
||||||
|
*.swp
|
||||||
|
*.swo
|
||||||
|
.DS_Store
|
||||||
|
.pytest_cache/
|
||||||
|
htmlcov/
|
||||||
|
.coverage
|
||||||
|
coverage.xml
|
||||||
|
.mypy_cache/
|
||||||
|
.claude/
|
||||||
11
Dockerfile
Normal file
11
Dockerfile
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
FROM python:3.12-slim
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
COPY stock-common/ /tmp/stock-common/
|
||||||
|
RUN pip install --no-cache-dir /tmp/stock-common/ && rm -rf /tmp/stock-common/
|
||||||
|
|
||||||
|
COPY stock-kis-collector/ .
|
||||||
|
RUN pip install --no-cache-dir .
|
||||||
|
|
||||||
|
CMD ["python", "-m", "stock_kis_collector.worker"]
|
||||||
15
pyproject.toml
Normal file
15
pyproject.toml
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
[project]
|
||||||
|
name = "stock-kis-collector"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "KIS Open API collector - stock prices and market data"
|
||||||
|
requires-python = ">=3.12"
|
||||||
|
dependencies = [
|
||||||
|
"stock-common",
|
||||||
|
]
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["hatchling"]
|
||||||
|
build-backend = "hatchling.build"
|
||||||
|
|
||||||
|
[tool.hatch.build.targets.wheel]
|
||||||
|
packages = ["src/stock_kis_collector"]
|
||||||
3
src/stock_kis_collector/__init__.py
Normal file
3
src/stock_kis_collector/__init__.py
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
"""KIS Open API collector service."""
|
||||||
|
|
||||||
|
__version__ = "0.1.0"
|
||||||
142
src/stock_kis_collector/api.py
Normal file
142
src/stock_kis_collector/api.py
Normal file
@ -0,0 +1,142 @@
|
|||||||
|
"""REST API for KIS collector service.
|
||||||
|
|
||||||
|
Endpoints:
|
||||||
|
GET /stocks - list stocks (from PostgreSQL)
|
||||||
|
GET /stocks/{stock_code} - get stock detail with latest price
|
||||||
|
POST /collect/stocks - trigger stock list collection
|
||||||
|
POST /collect/prices - trigger price collection
|
||||||
|
GET /health - (from api_base)
|
||||||
|
GET /streams - (from api_base)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from stock_common.api_base import create_app
|
||||||
|
from stock_common.database import get_pg_pool
|
||||||
|
from stock_common.queue import publish
|
||||||
|
|
||||||
|
logger = structlog.get_logger(service="kis-api")
|
||||||
|
|
||||||
|
TRIGGER_STREAM = "queue:trigger-kis"
|
||||||
|
|
||||||
|
|
||||||
|
class StocksRequest(BaseModel):
|
||||||
|
market: str = "all"
|
||||||
|
|
||||||
|
|
||||||
|
class PricesRequest(BaseModel):
|
||||||
|
stock_codes: list[str]
|
||||||
|
|
||||||
|
|
||||||
|
app = create_app(
|
||||||
|
title="stock-kis-collector",
|
||||||
|
streams=[TRIGGER_STREAM, "queue:raw-prices", "queue:raw-stocks"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/stocks")
|
||||||
|
async def list_stocks(market: str = "all", search: str = "", limit: int = 100, offset: int = 0):
|
||||||
|
try:
|
||||||
|
pool = await get_pg_pool()
|
||||||
|
except Exception:
|
||||||
|
return {"total": 0, "stocks": []}
|
||||||
|
|
||||||
|
conditions = ["is_active = true"]
|
||||||
|
params: list = []
|
||||||
|
idx = 1
|
||||||
|
|
||||||
|
if market != "all":
|
||||||
|
conditions.append(f"market = ${idx}")
|
||||||
|
params.append(market)
|
||||||
|
idx += 1
|
||||||
|
|
||||||
|
if search:
|
||||||
|
conditions.append(f"(stock_name ILIKE ${idx} OR stock_code ILIKE ${idx})")
|
||||||
|
params.append(f"%{search}%")
|
||||||
|
idx += 1
|
||||||
|
|
||||||
|
where = " AND ".join(conditions)
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
count = await conn.fetchval(f"SELECT COUNT(*) FROM stock WHERE {where}", *params)
|
||||||
|
rows = await conn.fetch(
|
||||||
|
f"""SELECT s.stock_code, s.stock_name, s.market, s.sector, s.industry,
|
||||||
|
p.close, p.volume, p.market_cap,
|
||||||
|
ROUND(((p.close - p2.close)::numeric / NULLIF(p2.close, 0)) * 100, 2) AS change_pct
|
||||||
|
FROM stock s
|
||||||
|
LEFT JOIN LATERAL (
|
||||||
|
SELECT close, volume, market_cap FROM daily_price
|
||||||
|
WHERE stock_code = s.stock_code ORDER BY date DESC LIMIT 1
|
||||||
|
) p ON true
|
||||||
|
LEFT JOIN LATERAL (
|
||||||
|
SELECT close FROM daily_price
|
||||||
|
WHERE stock_code = s.stock_code ORDER BY date DESC LIMIT 1 OFFSET 1
|
||||||
|
) p2 ON true
|
||||||
|
WHERE {where}
|
||||||
|
ORDER BY p.market_cap DESC NULLS LAST
|
||||||
|
LIMIT ${idx} OFFSET ${idx + 1}""",
|
||||||
|
*params, limit, offset,
|
||||||
|
)
|
||||||
|
return {"total": count, "stocks": [dict(r) for r in rows]}
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("stocks_query_failed", error=str(exc))
|
||||||
|
return {"total": 0, "stocks": []}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/stocks/{stock_code}")
|
||||||
|
async def get_stock(stock_code: str):
|
||||||
|
try:
|
||||||
|
pool = await get_pg_pool()
|
||||||
|
except Exception:
|
||||||
|
return {"error": "db_unavailable", "stock_code": stock_code}
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
stock = await conn.fetchrow(
|
||||||
|
"SELECT stock_code, stock_name, market, sector, industry FROM stock WHERE stock_code = $1",
|
||||||
|
stock_code,
|
||||||
|
)
|
||||||
|
if not stock:
|
||||||
|
return {"error": "not_found", "stock_code": stock_code}
|
||||||
|
|
||||||
|
prices = await conn.fetch(
|
||||||
|
"""SELECT date, open, high, low, close, volume, market_cap
|
||||||
|
FROM daily_price WHERE stock_code = $1
|
||||||
|
ORDER BY date DESC LIMIT 30""",
|
||||||
|
stock_code,
|
||||||
|
)
|
||||||
|
|
||||||
|
valuation = await conn.fetchrow(
|
||||||
|
"""SELECT per, pbr, peg, roe, debt_ratio, fcf_yield, ev_ebitda
|
||||||
|
FROM valuation WHERE stock_code = $1
|
||||||
|
ORDER BY calculated_at DESC LIMIT 1""",
|
||||||
|
stock_code,
|
||||||
|
)
|
||||||
|
|
||||||
|
result = dict(stock)
|
||||||
|
result["prices"] = [dict(p) for p in prices]
|
||||||
|
result["valuation"] = dict(valuation) if valuation else None
|
||||||
|
return result
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("stock_detail_query_failed", error=str(exc), stock_code=stock_code)
|
||||||
|
return {"error": "query_failed", "stock_code": stock_code}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/collect/stocks")
|
||||||
|
async def collect_stocks(req: StocksRequest):
|
||||||
|
msg_id = await publish(TRIGGER_STREAM, {
|
||||||
|
"type": "stocks",
|
||||||
|
"market": req.market,
|
||||||
|
})
|
||||||
|
return {"status": "queued", "message_id": msg_id}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/collect/prices")
|
||||||
|
async def collect_prices(req: PricesRequest):
|
||||||
|
msg_id = await publish(TRIGGER_STREAM, {
|
||||||
|
"type": "prices",
|
||||||
|
"stock_codes": req.stock_codes,
|
||||||
|
})
|
||||||
|
return {"status": "queued", "message_id": msg_id, "stock_codes": req.stock_codes}
|
||||||
172
src/stock_kis_collector/collector.py
Normal file
172
src/stock_kis_collector/collector.py
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
"""Korea Investment Securities (KIS) Open API collector for prices and stock data."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from datetime import date, datetime, timedelta
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from stock_common.collector_base import AuthenticationError, BaseCollector
|
||||||
|
from stock_common.config import settings
|
||||||
|
from stock_common.models.price import DailyPrice
|
||||||
|
from stock_common.models.stock import Market, Stock
|
||||||
|
|
||||||
|
|
||||||
|
class KISCollector(BaseCollector):
|
||||||
|
"""Collector for KIS Open API (stock prices and market data).
|
||||||
|
|
||||||
|
Authentication: OAuth2 (AppKey + AppSecret -> access token, 24h TTL).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__(rate_limit=settings.kis_rate_limit, timeout=30)
|
||||||
|
self.base_url = settings.kis_base_url
|
||||||
|
self.app_key = settings.kis_app_key
|
||||||
|
self.app_secret = settings.kis_app_secret
|
||||||
|
self._access_token: str | None = None
|
||||||
|
self._token_expires_at: datetime | None = None
|
||||||
|
self._token_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
async def collect(self, **kwargs: Any) -> None:
|
||||||
|
raise NotImplementedError("Use specific methods: collect_stocks, collect_prices, etc.")
|
||||||
|
|
||||||
|
async def _ensure_token(self) -> str:
|
||||||
|
now = datetime.now()
|
||||||
|
if self._access_token and self._token_expires_at and now < self._token_expires_at:
|
||||||
|
return self._access_token
|
||||||
|
|
||||||
|
async with self._token_lock:
|
||||||
|
now = datetime.now()
|
||||||
|
if self._access_token and self._token_expires_at and now < self._token_expires_at:
|
||||||
|
return self._access_token
|
||||||
|
|
||||||
|
url = f"{self.base_url}/oauth2/tokenP"
|
||||||
|
payload = {
|
||||||
|
"grant_type": "client_credentials",
|
||||||
|
"appkey": self.app_key,
|
||||||
|
"appsecret": self.app_secret,
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = await self._request("POST", url, json=payload)
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
raise AuthenticationError("Invalid token response")
|
||||||
|
self._access_token = data.get("access_token", "")
|
||||||
|
expires_in = int(data.get("expires_in", 86400))
|
||||||
|
self._token_expires_at = now + timedelta(seconds=expires_in - 3600)
|
||||||
|
self.logger.info("kis_token_obtained", expires_in=expires_in)
|
||||||
|
return self._access_token
|
||||||
|
except Exception as exc:
|
||||||
|
raise AuthenticationError(f"Failed to obtain KIS token: {exc}") from exc
|
||||||
|
|
||||||
|
def _build_headers(self, token: str, tr_id: str) -> dict[str, str]:
|
||||||
|
return {
|
||||||
|
"Content-Type": "application/json; charset=utf-8",
|
||||||
|
"authorization": f"Bearer {token}",
|
||||||
|
"appkey": self.app_key,
|
||||||
|
"appsecret": self.app_secret,
|
||||||
|
"tr_id": tr_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
async def collect_stocks(self, market: Market = Market.KOSPI) -> list[Stock]:
|
||||||
|
token = await self._ensure_token()
|
||||||
|
tr_id = "FHPST01740000"
|
||||||
|
url = f"{self.base_url}/uapi/domestic-stock/v1/ranking/market-cap"
|
||||||
|
fid_input_iscd = "0001" if market == Market.KOSPI else "1001"
|
||||||
|
|
||||||
|
headers = self._build_headers(token, tr_id)
|
||||||
|
params = {
|
||||||
|
"FID_COND_MRKT_DIV_CODE": "J" if market == Market.KOSPI else "Q",
|
||||||
|
"FID_COND_SCR_DIV_CODE": "20174",
|
||||||
|
"FID_INPUT_ISCD": fid_input_iscd,
|
||||||
|
"FID_DIV_CLS_CODE": "0",
|
||||||
|
"FID_TRGT_CLS_CODE": "", "FID_TRGT_EXLS_CLS_CODE": "",
|
||||||
|
"FID_INPUT_PRICE_1": "", "FID_INPUT_PRICE_2": "",
|
||||||
|
"FID_VOL_CNT": "", "FID_INPUT_DATE_1": "",
|
||||||
|
}
|
||||||
|
|
||||||
|
stocks: list[Stock] = []
|
||||||
|
data = await self._request("GET", url, headers=headers, params=params)
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
return stocks
|
||||||
|
|
||||||
|
for item in data.get("output", []):
|
||||||
|
try:
|
||||||
|
stock = Stock(
|
||||||
|
stock_code=item.get("mksc_shrn_iscd", "").strip(),
|
||||||
|
stock_name=item.get("hts_kor_isnm", "").strip(),
|
||||||
|
market=market, sector=item.get("bstp_larg_div_code_name"),
|
||||||
|
industry=item.get("bstp_medm_div_code_name"),
|
||||||
|
is_active=True, updated_at=datetime.now(),
|
||||||
|
)
|
||||||
|
if stock.stock_code:
|
||||||
|
stocks.append(stock)
|
||||||
|
except Exception as exc:
|
||||||
|
self.logger.warning("stock_parse_error", error=str(exc))
|
||||||
|
|
||||||
|
self.logger.info("stocks_collected", market=market.value, count=len(stocks))
|
||||||
|
return stocks
|
||||||
|
|
||||||
|
async def collect_price(self, stock_code: str) -> DailyPrice | None:
|
||||||
|
token = await self._ensure_token()
|
||||||
|
url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-price"
|
||||||
|
headers = self._build_headers(token, "FHKST01010100")
|
||||||
|
params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code}
|
||||||
|
|
||||||
|
data = await self._request("GET", url, headers=headers, params=params)
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
return None
|
||||||
|
output = data.get("output", {})
|
||||||
|
if not output:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
return DailyPrice(
|
||||||
|
stock_code=stock_code, date=date.today(),
|
||||||
|
open=output.get("stck_oprc"), high=output.get("stck_hgpr"),
|
||||||
|
low=output.get("stck_lwpr"), close=output.get("stck_prpr"),
|
||||||
|
volume=int(output.get("acml_vol", 0)) if output.get("acml_vol") else None,
|
||||||
|
market_cap=int(output.get("hts_avls", 0)) * 100_000_000 if output.get("hts_avls") else None,
|
||||||
|
source="KIS",
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
self.logger.warning("price_parse_error", stock_code=stock_code, error=str(exc))
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def collect_daily_prices(
|
||||||
|
self, stock_code: str, start_date: date | None = None, end_date: date | None = None,
|
||||||
|
) -> list[DailyPrice]:
|
||||||
|
token = await self._ensure_token()
|
||||||
|
url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
|
||||||
|
if end_date is None:
|
||||||
|
end_date = date.today()
|
||||||
|
if start_date is None:
|
||||||
|
start_date = end_date - timedelta(days=30)
|
||||||
|
|
||||||
|
headers = self._build_headers(token, "FHKST03010100")
|
||||||
|
params = {
|
||||||
|
"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code,
|
||||||
|
"FID_INPUT_DATE_1": start_date.strftime("%Y%m%d"),
|
||||||
|
"FID_INPUT_DATE_2": end_date.strftime("%Y%m%d"),
|
||||||
|
"FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "0",
|
||||||
|
}
|
||||||
|
|
||||||
|
data = await self._request("GET", url, headers=headers, params=params)
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
return []
|
||||||
|
|
||||||
|
prices: list[DailyPrice] = []
|
||||||
|
for item in data.get("output2", []):
|
||||||
|
try:
|
||||||
|
dt = item.get("stck_bsop_date", "")
|
||||||
|
if not dt:
|
||||||
|
continue
|
||||||
|
prices.append(DailyPrice(
|
||||||
|
stock_code=stock_code, date=datetime.strptime(dt, "%Y%m%d").date(),
|
||||||
|
open=item.get("stck_oprc"), high=item.get("stck_hgpr"),
|
||||||
|
low=item.get("stck_lwpr"), close=item.get("stck_clpr"),
|
||||||
|
volume=int(item.get("acml_vol", 0)) if item.get("acml_vol") else None,
|
||||||
|
source="KIS",
|
||||||
|
))
|
||||||
|
except Exception as exc:
|
||||||
|
self.logger.warning("daily_price_parse_error", error=str(exc))
|
||||||
|
|
||||||
|
return prices
|
||||||
82
src/stock_kis_collector/worker.py
Normal file
82
src/stock_kis_collector/worker.py
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
"""KIS collector worker - runs as a standalone service.
|
||||||
|
|
||||||
|
Runs both the Redis stream consumer loop AND the REST API server concurrently.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
import uvicorn
|
||||||
|
|
||||||
|
from stock_common.config import settings
|
||||||
|
from stock_common.logging_config import configure_logging
|
||||||
|
from stock_common.models.stock import Market
|
||||||
|
from stock_common.queue import consume, publish, ack
|
||||||
|
|
||||||
|
from stock_kis_collector.collector import KISCollector
|
||||||
|
from stock_kis_collector.api import app
|
||||||
|
|
||||||
|
logger = structlog.get_logger(service="kis-collector")
|
||||||
|
|
||||||
|
TRIGGER_STREAM = "queue:trigger-kis"
|
||||||
|
OUTPUT_PRICES = "queue:raw-prices"
|
||||||
|
OUTPUT_STOCKS = "queue:raw-stocks"
|
||||||
|
GROUP = "kis-collectors"
|
||||||
|
CONSUMER = "kis-worker-1"
|
||||||
|
|
||||||
|
|
||||||
|
async def worker_loop() -> None:
|
||||||
|
"""Redis stream consumer loop with auto-restart on failure."""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
async with KISCollector() as collector:
|
||||||
|
while True:
|
||||||
|
messages = await consume(TRIGGER_STREAM, GROUP, CONSUMER, count=1, block=5000)
|
||||||
|
|
||||||
|
for msg in messages:
|
||||||
|
task_type = msg.data.get("type", "prices")
|
||||||
|
stock_codes = msg.data.get("stock_codes", [])
|
||||||
|
|
||||||
|
try:
|
||||||
|
if task_type in ("stocks", "all"):
|
||||||
|
for market in (Market.KOSPI, Market.KOSDAQ):
|
||||||
|
stocks = await collector.collect_stocks(market)
|
||||||
|
for s in stocks:
|
||||||
|
await publish(OUTPUT_STOCKS, s.model_dump())
|
||||||
|
|
||||||
|
if task_type in ("prices", "all"):
|
||||||
|
for code in stock_codes:
|
||||||
|
price = await collector.collect_price(code)
|
||||||
|
if price:
|
||||||
|
await publish(OUTPUT_PRICES, price.model_dump())
|
||||||
|
|
||||||
|
await ack(TRIGGER_STREAM, GROUP, msg.message_id)
|
||||||
|
logger.info("task_completed", task_type=task_type)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("task_failed", error=str(exc))
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("worker_loop_crashed", error=str(exc))
|
||||||
|
await asyncio.sleep(10)
|
||||||
|
logger.info("worker_loop_restarting")
|
||||||
|
|
||||||
|
|
||||||
|
async def run() -> None:
|
||||||
|
configure_logging(level=settings.log_level, log_format=settings.log_format)
|
||||||
|
logger.info("kis_collector_starting")
|
||||||
|
|
||||||
|
config = uvicorn.Config(app, host="0.0.0.0", port=8002, log_level="info")
|
||||||
|
server = uvicorn.Server(config)
|
||||||
|
|
||||||
|
await asyncio.gather(
|
||||||
|
worker_loop(),
|
||||||
|
server.serve(),
|
||||||
|
return_exceptions=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
asyncio.run(run())
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user