"""DART OpenAPI data collector for disclosures and financial statements.""" import io import zipfile from datetime import datetime, timedelta from typing import Any from xml.etree import ElementTree from stock_common.collector_base import BaseCollector, CollectorError from stock_common.config import settings from stock_common.models.disclosure import Disclosure, Sentiment from stock_common.models.financial import Financial, PeriodType DART_BASE_URL = "https://opendart.fss.or.kr/api" class DARTCollector(BaseCollector): """Collector for DART OpenAPI (disclosures and financial statements). Rate limit: 10,000 requests/day. """ def __init__(self) -> None: super().__init__(rate_limit=settings.dart_rate_limit, timeout=30) self.api_key = settings.dart_api_key self._corp_code_map: dict[str, str] = {} async def collect(self, **kwargs: Any) -> None: raise NotImplementedError("Use specific methods: collect_corp_codes, etc.") async def collect_corp_codes(self) -> dict[str, str]: url = f"{DART_BASE_URL}/corpCode.xml" params = {"crtfc_key": self.api_key} self.logger.info("collecting_corp_codes") data = await self._download_binary(url, params=params) with zipfile.ZipFile(io.BytesIO(data)) as zf: xml_filename = zf.namelist()[0] xml_data = zf.read(xml_filename) root = ElementTree.fromstring(xml_data) corp_map: dict[str, str] = {} for item in root.findall("list"): corp_code = item.findtext("corp_code", "") stock_code = item.findtext("stock_code", "") if stock_code and stock_code.strip(): corp_map[stock_code.strip()] = corp_code.strip() self._corp_code_map = corp_map self.logger.info("corp_codes_collected", count=len(corp_map)) return corp_map def _get_corp_code(self, stock_code: str) -> str: corp_code = self._corp_code_map.get(stock_code) if not corp_code: raise CollectorError( f"Corp code not found for stock_code={stock_code}. Call collect_corp_codes() first." ) return corp_code async def collect_financials( self, stock_code: str, fiscal_year: int, report_code: str = "11011", ) -> list[Financial]: corp_code = self._get_corp_code(stock_code) url = f"{DART_BASE_URL}/fnlttSinglAcntAll.json" params = { "crtfc_key": self.api_key, "corp_code": corp_code, "bsns_year": str(fiscal_year), "reprt_code": report_code, "fs_div": "OFS", } data = await self._request("GET", url, params=params) if not isinstance(data, dict): return [] status = data.get("status", "") if status == "013": return [] if status != "000": self.logger.warning("dart_api_error", status=status, message=data.get("message", "")) return [] return self._parse_financial_response(data.get("list", []), stock_code, fiscal_year, report_code) def _parse_financial_response( self, items: list[dict], stock_code: str, fiscal_year: int, report_code: str, ) -> list[Financial]: quarter_map = {"11013": 1, "11012": 2, "11014": 3, "11011": 4} quarter = quarter_map.get(report_code, 4) period_type = PeriodType.ANNUAL if report_code == "11011" else PeriodType.QUARTER account_ids = { "ifrs-full_Revenue": "revenue", "ifrs-full_ProfitLossFromOperatingActivities": "operating_profit", "ifrs-full_ProfitLoss": "net_income", "ifrs-full_Assets": "total_assets", "ifrs-full_Liabilities": "total_liabilities", "ifrs-full_Equity": "total_equity", "ifrs-full_CashFlowsFromUsedInOperatingActivities": "operating_cashflow", } account_map: dict[str, int | None] = {} for item in items: account_id = item.get("account_id", "") for dart_key, our_key in account_ids.items(): if account_id == dart_key: amount_str = item.get("thstrm_amount", "") try: amount = int(amount_str.replace(",", "")) if amount_str else None except ValueError: amount = None account_map[our_key] = amount if not account_map: return [] return [Financial( stock_code=stock_code, fiscal_year=fiscal_year, fiscal_quarter=quarter, period_type=period_type, revenue=account_map.get("revenue"), operating_profit=account_map.get("operating_profit"), net_income=account_map.get("net_income"), total_assets=account_map.get("total_assets"), total_liabilities=account_map.get("total_liabilities"), total_equity=account_map.get("total_equity"), operating_cashflow=account_map.get("operating_cashflow"), free_cashflow=None, source="DART", )] async def collect_disclosures(self, stock_code: str | None = None, days: int = 7) -> list[Disclosure]: end_date = datetime.now() start_date = end_date - timedelta(days=days) url = f"{DART_BASE_URL}/list.json" params: dict[str, str] = { "crtfc_key": self.api_key, "bgn_de": start_date.strftime("%Y%m%d"), "end_de": end_date.strftime("%Y%m%d"), "page_no": "1", "page_count": "100", } if stock_code: params["corp_code"] = self._get_corp_code(stock_code) data = await self._request("GET", url, params=params) if not isinstance(data, dict) or data.get("status") != "000": return [] disclosures: list[Disclosure] = [] for item in data.get("list", []): try: disclosed_at = ( datetime.strptime(item.get("rcept_dt", ""), "%Y%m%d") if item.get("rcept_dt") else None ) disclosures.append(Disclosure( dart_id=item.get("rcept_no", ""), stock_code=stock_code or item.get("stock_code", ""), title=item.get("report_nm", ""), category=item.get("pblntf_ty", ""), content_raw_url=f"https://dart.fss.or.kr/dsaf001/main.do?rcpNo={item.get('rcept_no', '')}", sentiment=Sentiment.NEUTRAL, disclosed_at=disclosed_at, processed_at=datetime.now(), )) except Exception as exc: self.logger.warning("disclosure_parse_error", error=str(exc)) self.logger.info("disclosures_collected", count=len(disclosures)) return disclosures