commit d972b84fb4ebad3d298e83251b2b7dfdbf2691a0 Author: yakenator Date: Mon Feb 23 13:50:55 2026 +0900 feat: CLI tool for stock analysis pipeline management - Command-line interface for triggering collection and analysis - Pipeline orchestration commands - Integration with all microservices via Redis Streams Co-Authored-By: Claude Opus 4.6 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f3ebf17 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +__pycache__/ +*.py[cod] +*$py.class +*.so +*.egg-info/ +dist/ +build/ +.eggs/ +.venv/ +venv/ +.env +.vscode/ +.idea/ +*.swp +*.swo +.DS_Store +.pytest_cache/ +htmlcov/ +.coverage +coverage.xml +.mypy_cache/ +.claude/ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..5eefe6b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "stock-cli" +version = "0.1.0" +description = "CLI orchestration and reporting for stock analysis pipeline" +requires-python = ">=3.12" +dependencies = [ + "stock-common", + "typer>=0.12", + "rich>=13.0", +] + +[project.scripts] +stock = "stock_cli.main:app" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/stock_cli"] diff --git a/src/stock_cli/__init__.py b/src/stock_cli/__init__.py new file mode 100644 index 0000000..28e7342 --- /dev/null +++ b/src/stock_cli/__init__.py @@ -0,0 +1,3 @@ +"""CLI orchestration and reporting for stock analysis pipeline.""" + +__version__ = "0.1.0" diff --git a/src/stock_cli/main.py b/src/stock_cli/main.py new file mode 100644 index 0000000..5a60d75 --- /dev/null +++ b/src/stock_cli/main.py @@ -0,0 +1,104 @@ +"""Typer CLI application - orchestrates the pipeline via Redis triggers.""" + +import asyncio +import json + +import typer +from rich.console import Console +from rich.table import Table + +from stock_common.config import settings +from stock_common.logging_config import configure_logging +from stock_common.queue.redis_client import get_redis, close_redis + +app = typer.Typer(name="stock", help="Stock analysis pipeline CLI") +console = Console() + +collect_app = typer.Typer(help="Data collection commands") +app.add_typer(collect_app, name="collect") + + +async def _trigger(stream: str, data: dict) -> None: + """Send a trigger message to a service via Redis Stream.""" + r = await get_redis() + await r.xadd(stream, {"payload": json.dumps(data, default=str)}) + console.print(f"[green]Triggered[/green] {stream} with {data}") + await close_redis() + + +@collect_app.command("dart") +def collect_dart( + days: int = typer.Option(7, help="Days to look back for disclosures"), + stock_codes: str = typer.Option("", help="Comma-separated stock codes"), +): + """Trigger DART data collection.""" + codes = [c.strip() for c in stock_codes.split(",") if c.strip()] if stock_codes else [] + asyncio.run(_trigger("queue:trigger-dart", { + "type": "all", "stock_codes": codes, "days": days, + })) + + +@collect_app.command("kis") +def collect_kis( + stock_codes: str = typer.Option("", help="Comma-separated stock codes"), +): + """Trigger KIS price collection.""" + codes = [c.strip() for c in stock_codes.split(",") if c.strip()] if stock_codes else [] + asyncio.run(_trigger("queue:trigger-kis", { + "type": "all", "stock_codes": codes, + })) + + +@collect_app.command("news") +def collect_news( + stock_codes: str = typer.Option(..., help="Comma-separated stock codes"), + max_pages: int = typer.Option(3, help="Max pages per stock"), +): + """Trigger news crawling.""" + codes = [c.strip() for c in stock_codes.split(",")] + asyncio.run(_trigger("queue:trigger-news", { + "stock_codes": codes, "max_pages": max_pages, + })) + + +@app.command("screen") +def screen( + strategy: str = typer.Option("balanced", help="Screening strategy"), + top_n: int = typer.Option(50, help="Top N candidates"), + market: str = typer.Option("all", help="Market filter (all/KOSPI/KOSDAQ)"), +): + """Trigger quantitative screening pipeline.""" + asyncio.run(_trigger("queue:trigger-screen", { + "strategy": strategy, "top_n": top_n, "market": market, + })) + + +@app.command("status") +def status(): + """Show pipeline status - check Redis stream lengths.""" + + async def _status(): + r = await get_redis() + streams = [ + "queue:trigger-dart", "queue:trigger-kis", "queue:trigger-news", + "queue:raw-financials", "queue:raw-prices", "queue:raw-news", + "queue:raw-disclosures", "queue:trigger-screen", + "queue:screened", "queue:catalysts", "queue:results", + ] + + table = Table(title="Pipeline Status") + table.add_column("Stream", style="cyan") + table.add_column("Length", style="green") + + for stream in streams: + length = await r.xlen(stream) + table.add_row(stream, str(length)) + + console.print(table) + await close_redis() + + asyncio.run(_status()) + + +if __name__ == "__main__": + app()