From f9350dd7197389da03a727323bfceb8e945b951f Mon Sep 17 00:00:00 2001 From: yakenator Date: Mon, 23 Feb 2026 13:50:45 +0900 Subject: [PATCH] feat: DART disclosure and financial data collector service - DART OpenAPI integration for disclosure crawling - Financial statement extraction (annual/quarterly) - REST API with health, streams, and trigger endpoints - Redis Streams for async job processing - Docker support with multi-stage build Co-Authored-By: Claude Opus 4.6 --- .dockerignore | 8 ++ .gitignore | 22 ++++ Dockerfile | 11 ++ pyproject.toml | 15 +++ src/stock_dart_collector/__init__.py | 3 + src/stock_dart_collector/api.py | 51 ++++++++ src/stock_dart_collector/collector.py | 173 ++++++++++++++++++++++++++ src/stock_dart_collector/worker.py | 83 ++++++++++++ 8 files changed, 366 insertions(+) create mode 100644 .dockerignore create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 pyproject.toml create mode 100644 src/stock_dart_collector/__init__.py create mode 100644 src/stock_dart_collector/api.py create mode 100644 src/stock_dart_collector/collector.py create mode 100644 src/stock_dart_collector/worker.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..daf0201 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +__pycache__ +*.pyc +.git +.venv +*.egg-info +dist +build +.env diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f3ebf17 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +__pycache__/ +*.py[cod] +*$py.class +*.so +*.egg-info/ +dist/ +build/ +.eggs/ +.venv/ +venv/ +.env +.vscode/ +.idea/ +*.swp +*.swo +.DS_Store +.pytest_cache/ +htmlcov/ +.coverage +coverage.xml +.mypy_cache/ +.claude/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..38704f8 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY stock-common/ /tmp/stock-common/ +RUN pip install --no-cache-dir /tmp/stock-common/ && rm -rf /tmp/stock-common/ + +COPY 
stock-dart-collector/ . +RUN pip install --no-cache-dir . + +CMD ["python", "-m", "stock_dart_collector.worker"] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..563b30c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "stock-dart-collector" +version = "0.1.0" +description = "DART OpenAPI collector - disclosures and financial statements" +requires-python = ">=3.12" +dependencies = [ + "stock-common", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/stock_dart_collector"] diff --git a/src/stock_dart_collector/__init__.py b/src/stock_dart_collector/__init__.py new file mode 100644 index 0000000..4ada21f --- /dev/null +++ b/src/stock_dart_collector/__init__.py @@ -0,0 +1,3 @@ +"""DART OpenAPI collector service.""" + +__version__ = "0.1.0" diff --git a/src/stock_dart_collector/api.py b/src/stock_dart_collector/api.py new file mode 100644 index 0000000..8a0c953 --- /dev/null +++ b/src/stock_dart_collector/api.py @@ -0,0 +1,51 @@ +"""REST API for DART collector service. 
# ---- src/stock_dart_collector/api.py ----
"""REST API for DART collector service.

Endpoints:
    POST /collect/financials  - trigger financial statement collection
    POST /collect/disclosures - trigger disclosure collection
    GET  /health              - (from api_base)
    GET  /streams             - (from api_base)
"""

from pydantic import BaseModel, Field

from stock_common.api_base import create_app
from stock_common.queue import publish

# Stream on which collection requests are enqueued.
TRIGGER_STREAM = "queue:trigger-dart"


class FinancialsRequest(BaseModel):
    """Body of ``POST /collect/financials``."""

    # Stock codes whose financial statements should be collected.
    stock_codes: list[str]
    # Fiscal year to request.
    year: int = 2024


class DisclosuresRequest(BaseModel):
    """Body of ``POST /collect/disclosures``."""

    # Optional stock codes; ``None`` is published as an empty list.
    stock_codes: list[str] | None = None
    # Look-back window in days. A negative window would put the start date
    # after the end date, so it is rejected at the API boundary.
    days: int = Field(default=7, ge=0)


app = create_app(
    title="stock-dart-collector",
    streams=[TRIGGER_STREAM, "queue:raw-financials", "queue:raw-disclosures"],
)


@app.post("/collect/financials")
async def collect_financials(req: FinancialsRequest):
    """Enqueue a financial-statement collection task.

    Returns the queued status, the id returned by ``publish`` and the
    requested stock codes.
    """
    task = {
        "type": "financials",
        "stock_codes": req.stock_codes,
        "year": req.year,
    }
    msg_id = await publish(TRIGGER_STREAM, task)
    return {"status": "queued", "message_id": msg_id, "stock_codes": req.stock_codes}


@app.post("/collect/disclosures")
async def collect_disclosures(req: DisclosuresRequest):
    """Enqueue a disclosure collection task.

    Returns the queued status and the id returned by ``publish``.
    """
    task = {
        "type": "disclosures",
        "stock_codes": req.stock_codes or [],
        "days": req.days,
    }
    msg_id = await publish(TRIGGER_STREAM, task)
    return {"status": "queued", "message_id": msg_id}


# ---- src/stock_dart_collector/collector.py ----
"""DART OpenAPI data collector for disclosures and financial statements."""

import io
import zipfile
from datetime import datetime, timedelta
from typing import Any
from xml.etree import ElementTree

from stock_common.collector_base import BaseCollector, CollectorError
from stock_common.config import settings
from stock_common.models.disclosure import Disclosure, Sentiment
from stock_common.models.financial import Financial, PeriodType

DART_BASE_URL = "https://opendart.fss.or.kr/api"
class DARTCollector(BaseCollector):
    """Collector for DART OpenAPI (disclosures and financial statements).

    Rate limit: 10,000 requests/day.

    ``collect_corp_codes()`` must be awaited before any call that passes a
    stock code, because stock codes are translated through the map it builds.
    """

    def __init__(self) -> None:
        """Configure the base collector from settings; the code map starts empty."""
        super().__init__(rate_limit=settings.dart_rate_limit, timeout=30)
        self.api_key = settings.dart_api_key
        # stock_code -> corp_code, filled by collect_corp_codes().
        self._corp_code_map: dict[str, str] = {}

    async def collect(self, **kwargs: Any) -> None:
        """Unsupported generic entry point; use the specific ``collect_*`` methods.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("Use specific methods: collect_corp_codes, etc.")

    async def collect_corp_codes(self) -> dict[str, str]:
        """Download the corp-code archive and build the stock_code -> corp_code map.

        Entries without a stock code are skipped. The map is cached on the
        instance and also returned.

        Raises:
            CollectorError: if the response is not a ZIP archive or is empty.
        """
        url = f"{DART_BASE_URL}/corpCode.xml"
        params = {"crtfc_key": self.api_key}

        self.logger.info("collecting_corp_codes")
        data = await self._download_binary(url, params=params)

        try:
            with zipfile.ZipFile(io.BytesIO(data)) as zf:
                names = zf.namelist()
                if not names:
                    raise CollectorError("DART corpCode archive is empty")
                xml_data = zf.read(names[0])
        except zipfile.BadZipFile as exc:
            # NOTE(review): presumably DART answers with an error document
            # instead of a ZIP when the key is rejected - confirm.
            raise CollectorError("DART corpCode response is not a ZIP archive") from exc

        root = ElementTree.fromstring(xml_data)
        corp_map: dict[str, str] = {}
        for item in root.findall("list"):
            stock_code = item.findtext("stock_code", "").strip()
            if stock_code:
                corp_map[stock_code] = item.findtext("corp_code", "").strip()

        self._corp_code_map = corp_map
        self.logger.info("corp_codes_collected", count=len(corp_map))
        return corp_map

    def _get_corp_code(self, stock_code: str) -> str:
        """Translate a stock code into its corp code.

        Raises:
            CollectorError: if the code is unknown or the map was never loaded.
        """
        corp_code = self._corp_code_map.get(stock_code)
        if not corp_code:
            raise CollectorError(
                f"Corp code not found for stock_code={stock_code}. Call collect_corp_codes() first."
            )
        return corp_code

    async def collect_financials(
        self, stock_code: str, fiscal_year: int, report_code: str = "11011",
        fs_div: str = "OFS",
    ) -> list[Financial]:
        """Fetch one report's financial statement for a single stock.

        Args:
            stock_code: Stock code; must be present in the corp-code map.
            fiscal_year: Business year to request.
            report_code: DART report code (11013=Q1, 11012=Q2, 11014=Q3,
                11011=annual).
            fs_div: Statement kind sent as ``fs_div``. "OFS" keeps the
                previous hard-coded behaviour; "CFS" is assumed to select the
                consolidated statement - TODO confirm against the DART guide.

        Returns:
            A list with one ``Financial``, or an empty list when the API
            reports no data or an error, or when no known account is present.

        Raises:
            CollectorError: if ``stock_code`` has no corp code.
        """
        corp_code = self._get_corp_code(stock_code)
        url = f"{DART_BASE_URL}/fnlttSinglAcntAll.json"
        params = {
            "crtfc_key": self.api_key,
            "corp_code": corp_code,
            "bsns_year": str(fiscal_year),
            "reprt_code": report_code,
            "fs_div": fs_div,
        }

        data = await self._request("GET", url, params=params)
        if not isinstance(data, dict):
            return []

        status = data.get("status", "")
        if status == "013":
            # Treated as a quiet "nothing to report" rather than an error.
            return []
        if status != "000":
            self.logger.warning("dart_api_error", status=status, message=data.get("message", ""))
            return []

        return self._parse_financial_response(data.get("list", []), stock_code, fiscal_year, report_code)

    @staticmethod
    def _parse_amount(raw: Any) -> int | None:
        """Convert a DART amount (e.g. "1,234") to int; None when absent or not numeric."""
        if raw is None:
            return None
        text = str(raw).replace(",", "").strip()
        if not text:
            return None
        try:
            return int(text)
        except ValueError:
            return None

    def _parse_financial_response(
        self, items: list[dict], stock_code: str, fiscal_year: int, report_code: str,
    ) -> list[Financial]:
        """Map DART account rows onto a single ``Financial`` record.

        Only the current-period amount (``thstrm_amount``) of the known IFRS
        account ids is used. Returns an empty list when none of them appears.
        """
        quarter_map = {"11013": 1, "11012": 2, "11014": 3, "11011": 4}
        quarter = quarter_map.get(report_code, 4)
        period_type = PeriodType.ANNUAL if report_code == "11011" else PeriodType.QUARTER

        account_ids = {
            "ifrs-full_Revenue": "revenue",
            "ifrs-full_ProfitLossFromOperatingActivities": "operating_profit",
            "ifrs-full_ProfitLoss": "net_income",
            "ifrs-full_Assets": "total_assets",
            "ifrs-full_Liabilities": "total_liabilities",
            "ifrs-full_Equity": "total_equity",
            "ifrs-full_CashFlowsFromUsedInOperatingActivities": "operating_cashflow",
        }

        account_map: dict[str, int | None] = {}
        for item in items:
            our_key = account_ids.get(item.get("account_id", ""))
            if our_key is None:
                continue
            # Keep the first usable value. NOTE(review): the same account_id
            # can recur in later rows (presumably other statements such as
            # changes in equity); those must not overwrite the headline figure.
            if account_map.get(our_key) is None:
                account_map[our_key] = self._parse_amount(item.get("thstrm_amount"))

        if not account_map:
            return []

        return [Financial(
            stock_code=stock_code,
            fiscal_year=fiscal_year,
            fiscal_quarter=quarter,
            period_type=period_type,
            revenue=account_map.get("revenue"),
            operating_profit=account_map.get("operating_profit"),
            net_income=account_map.get("net_income"),
            total_assets=account_map.get("total_assets"),
            total_liabilities=account_map.get("total_liabilities"),
            total_equity=account_map.get("total_equity"),
            operating_cashflow=account_map.get("operating_cashflow"),
            free_cashflow=None,
            source="DART",
        )]

    @staticmethod
    def _build_disclosure(item: dict, stock_code: str | None) -> Disclosure:
        """Build one ``Disclosure`` from a row of the disclosure list response."""
        receipt_no = item.get("rcept_no", "")
        receipt_date = item.get("rcept_dt")
        return Disclosure(
            dart_id=receipt_no,
            stock_code=stock_code or item.get("stock_code", ""),
            title=item.get("report_nm", ""),
            category=item.get("pblntf_ty", ""),
            content_raw_url=f"https://dart.fss.or.kr/dsaf001/main.do?rcpNo={receipt_no}",
            sentiment=Sentiment.NEUTRAL,
            disclosed_at=datetime.strptime(receipt_date, "%Y%m%d") if receipt_date else None,
            processed_at=datetime.now(),
        )

    async def collect_disclosures(self, stock_code: str | None = None, days: int = 7) -> list[Disclosure]:
        """Collect disclosures filed during the last ``days`` days.

        Args:
            stock_code: Restrict the search to one stock; ``None`` searches
                without a corp-code filter.
            days: Size of the look-back window ending now.

        Returns:
            Every disclosure that could be parsed, across all result pages.
            Rows that fail to parse are logged and skipped.

        Raises:
            CollectorError: if ``stock_code`` is given but has no corp code.
        """
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        url = f"{DART_BASE_URL}/list.json"
        params: dict[str, str] = {
            "crtfc_key": self.api_key,
            "bgn_de": start_date.strftime("%Y%m%d"),
            "end_de": end_date.strftime("%Y%m%d"),
            "page_no": "1",
            "page_count": "100",
        }

        if stock_code:
            params["corp_code"] = self._get_corp_code(stock_code)

        disclosures: list[Disclosure] = []
        page_no = 1
        while True:
            params["page_no"] = str(page_no)
            data = await self._request("GET", url, params=params)
            if not isinstance(data, dict):
                break

            status = data.get("status", "")
            if status == "013":
                # Same quiet "no data" status that collect_financials ignores.
                break
            if status != "000":
                self.logger.warning("dart_api_error", status=status, message=data.get("message", ""))
                break

            for item in data.get("list", []):
                try:
                    disclosures.append(self._build_disclosure(item, stock_code))
                except Exception as exc:
                    # Best effort: one malformed row must not drop the page.
                    self.logger.warning("disclosure_parse_error", error=str(exc))

            # Follow the pagination instead of silently stopping after the
            # first 100 rows; a missing or odd total_page means a single page.
            try:
                total_pages = int(data.get("total_page") or 1)
            except (TypeError, ValueError):
                total_pages = 1
            if page_no >= total_pages:
                break
            page_no += 1

        self.logger.info("disclosures_collected", count=len(disclosures))
        return disclosures
"""DART collector worker - runs as a standalone service.

Runs both the Redis stream consumer loop AND the REST API server concurrently.
"""

import asyncio

import structlog
import uvicorn

from stock_common.config import settings
from stock_common.logging_config import configure_logging
from stock_common.queue import consume, publish, ack

from stock_dart_collector.collector import DARTCollector
from stock_dart_collector.api import app

logger = structlog.get_logger(service="dart-collector")

TRIGGER_STREAM = "queue:trigger-dart"
OUTPUT_FINANCIALS = "queue:raw-financials"
OUTPUT_DISCLOSURES = "queue:raw-disclosures"
GROUP = "dart-collectors"
CONSUMER = "dart-worker-1"


async def _run_task(collector: DARTCollector, data: dict) -> None:
    """Execute one trigger message and publish everything it collects."""
    task_type = data.get("type", "all")
    stock_codes = data.get("stock_codes") or []

    if task_type in ("financials", "all"):
        year = data.get("year", 2024)
        for code in stock_codes:
            for financial in await collector.collect_financials(code, year):
                await publish(OUTPUT_FINANCIALS, financial.model_dump())

    if task_type in ("disclosures", "all"):
        days = data.get("days", 7)
        # Honour the requested stock codes; an empty list keeps the previous
        # behaviour of one unfiltered search.
        for code in stock_codes or [None]:
            for disclosure in await collector.collect_disclosures(code, days=days):
                await publish(OUTPUT_DISCLOSURES, disclosure.model_dump())


async def worker_loop() -> None:
    """Redis stream consumer loop with retry on startup failure.

    Any exception that escapes the inner loop (including a failed corp-code
    download) is logged, followed by a 10 second pause and a fresh collector.
    """
    while True:
        try:
            async with DARTCollector() as collector:
                await collector.collect_corp_codes()

                while True:
                    messages = await consume(TRIGGER_STREAM, GROUP, CONSUMER, count=1, block=5000)

                    for msg in messages:
                        task_type = msg.data.get("type", "all")
                        try:
                            await _run_task(collector, msg.data)
                            await ack(TRIGGER_STREAM, GROUP, msg.message_id)
                            logger.info("task_completed", task_type=task_type)
                        except Exception as exc:
                            # The message is deliberately left un-acked.
                            # NOTE(review): whether it is redelivered depends
                            # on stock_common.queue.consume - confirm.
                            logger.error("task_failed", error=str(exc), task_type=task_type)
        except Exception as exc:
            logger.error("worker_loop_crashed", error=str(exc))
            await asyncio.sleep(10)
            logger.info("worker_loop_restarting")


async def run() -> None:
    """Start the consumer loop in the background and serve the REST API.

    The worker task is cancelled as soon as the server stops, so this
    coroutine can finish instead of awaiting the never-ending loop.
    """
    configure_logging(level=settings.log_level, log_format=settings.log_format)
    logger.info("dart_collector_starting")

    config = uvicorn.Config(app, host="0.0.0.0", port=8001, log_level="info")
    server = uvicorn.Server(config)

    # The API server keeps running even if the worker loop dies.
    worker_task = asyncio.create_task(worker_loop())
    try:
        await server.serve()
    finally:
        worker_task.cancel()
        # Collect the cancellation (or an earlier failure) without re-raising.
        await asyncio.gather(worker_task, return_exceptions=True)


def main() -> None:
    """Synchronous entry point used by ``python -m stock_dart_collector.worker``."""
    asyncio.run(run())


if __name__ == "__main__":
    main()