feat: CLI tool for stock analysis pipeline management

- Command-line interface for triggering collection and analysis
- Pipeline orchestration commands
- Integration with all microservices via Redis Streams

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
yakenator
2026-02-23 13:50:55 +09:00
commit d972b84fb4
4 changed files with 149 additions and 0 deletions

View File

@ -0,0 +1,3 @@
"""CLI orchestration and reporting for stock analysis pipeline."""
__version__ = "0.1.0"

104
src/stock_cli/main.py Normal file
View File

@ -0,0 +1,104 @@
"""Typer CLI application - orchestrates the pipeline via Redis triggers."""
import asyncio
import json
import typer
from rich.console import Console
from rich.table import Table
from stock_common.config import settings
from stock_common.logging_config import configure_logging
from stock_common.queue.redis_client import get_redis, close_redis
app = typer.Typer(name="stock", help="Stock analysis pipeline CLI")
console = Console()
collect_app = typer.Typer(help="Data collection commands")
app.add_typer(collect_app, name="collect")
async def _trigger(stream: str, data: dict) -> None:
"""Send a trigger message to a service via Redis Stream."""
r = await get_redis()
await r.xadd(stream, {"payload": json.dumps(data, default=str)})
console.print(f"[green]Triggered[/green] {stream} with {data}")
await close_redis()
@collect_app.command("dart")
def collect_dart(
days: int = typer.Option(7, help="Days to look back for disclosures"),
stock_codes: str = typer.Option("", help="Comma-separated stock codes"),
):
"""Trigger DART data collection."""
codes = [c.strip() for c in stock_codes.split(",") if c.strip()] if stock_codes else []
asyncio.run(_trigger("queue:trigger-dart", {
"type": "all", "stock_codes": codes, "days": days,
}))
@collect_app.command("kis")
def collect_kis(
stock_codes: str = typer.Option("", help="Comma-separated stock codes"),
):
"""Trigger KIS price collection."""
codes = [c.strip() for c in stock_codes.split(",") if c.strip()] if stock_codes else []
asyncio.run(_trigger("queue:trigger-kis", {
"type": "all", "stock_codes": codes,
}))
@collect_app.command("news")
def collect_news(
stock_codes: str = typer.Option(..., help="Comma-separated stock codes"),
max_pages: int = typer.Option(3, help="Max pages per stock"),
):
"""Trigger news crawling."""
codes = [c.strip() for c in stock_codes.split(",")]
asyncio.run(_trigger("queue:trigger-news", {
"stock_codes": codes, "max_pages": max_pages,
}))
@app.command("screen")
def screen(
strategy: str = typer.Option("balanced", help="Screening strategy"),
top_n: int = typer.Option(50, help="Top N candidates"),
market: str = typer.Option("all", help="Market filter (all/KOSPI/KOSDAQ)"),
):
"""Trigger quantitative screening pipeline."""
asyncio.run(_trigger("queue:trigger-screen", {
"strategy": strategy, "top_n": top_n, "market": market,
}))
@app.command("status")
def status():
"""Show pipeline status - check Redis stream lengths."""
async def _status():
r = await get_redis()
streams = [
"queue:trigger-dart", "queue:trigger-kis", "queue:trigger-news",
"queue:raw-financials", "queue:raw-prices", "queue:raw-news",
"queue:raw-disclosures", "queue:trigger-screen",
"queue:screened", "queue:catalysts", "queue:results",
]
table = Table(title="Pipeline Status")
table.add_column("Stream", style="cyan")
table.add_column("Length", style="green")
for stream in streams:
length = await r.xlen(stream)
table.add_row(stream, str(length))
console.print(table)
await close_redis()
asyncio.run(_status())
if __name__ == "__main__":
app()