Files
drama-studio/.claude/skills/flume-architecture.md
jungwoo choi cc547372c0 feat: Drama Studio 프로젝트 초기 구조 설정
- FastAPI 백엔드 (audio-studio-api)
- Next.js 프론트엔드 (audio-studio-ui)
- Qwen3-TTS 엔진 (audio-studio-tts)
- MusicGen 서비스 (audio-studio-musicgen)
- Docker Compose 개발/운영 환경

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 11:39:38 +09:00

9.0 KiB

Flume - K8s 기반 범용 파이프라인 시스템

개요

Flume은 K8s 네이티브 범용 파이프라인/워크플로우 시스템입니다.

  • 비주얼 에디터 + YAML/JSON 정의 지원
  • 하이브리드 실행: 경량 작업은 중앙 엔진, 무거운 작업은 K8s Job
  • 로컬 LLM 통합: vLLM 기반 GPU 노드 지원

아키텍처

flowchart TB
    subgraph Frontend["Frontend (Next.js)"]
        VE[Visual Node Editor<br/>ReactFlow]
        PD[Pipeline Dashboard]
        MC[Monitoring Console]
    end

    subgraph API["API Server (FastAPI)"]
        CRUD[Pipeline CRUD]
        TM[Trigger Manager]
        NR[Node Registry]
    end

    subgraph Engine["Pipeline Engine"]
        SCH[Scheduler<br/>APScheduler]
        EXE[Executor<br/>Async]
        SM[State Manager]
    end

    subgraph K8s["Kubernetes Cluster"]
        OP[Flume Operator<br/>kopf]
        subgraph Workers["Worker Pods"]
            CPU[CPU Workers]
            GPU[GPU Workers<br/>vLLM]
        end
    end

    subgraph Storage["Storage"]
        Redis[(Redis<br/>Queue/Cache)]
        MongoDB[(MongoDB<br/>Definitions/History)]
    end

    Frontend --> API
    API --> Engine
    Engine --> Redis
    Engine --> MongoDB
    Engine --> OP
    OP --> Workers
    GPU -.->|OpenAI API| EXE

실행 모델 (하이브리드)

flowchart LR
    subgraph Light["경량 작업 (중앙 실행)"]
        L1[HTTP Request]
        L2[JSON Transform]
        L3[Condition/Switch]
        L4[Variable Set]
    end

    subgraph Heavy["무거운 작업 (K8s Job)"]
        H1[LLM Generate]
        H2[Image Process]
        H3[Data ETL]
        H4[Custom Script]
    end

    ENG[Engine] --> Light
    ENG --> |K8s Job 생성| Heavy

실행 기준

구분 실행 위치 예시
경량 중앙 엔진 HTTP 요청, 데이터 변환, 조건 분기
GPU GPU Pod LLM 생성, 이미지 생성
무거운 CPU K8s Job 대용량 ETL, 장시간 스크립트

핵심 컴포넌트

1. Frontend (flume-ui)

flume-ui/
├── src/
│   ├── app/
│   │   ├── page.tsx              # 대시보드
│   │   ├── pipelines/
│   │   │   ├── page.tsx          # 파이프라인 목록
│   │   │   ├── [id]/
│   │   │   │   ├── page.tsx      # 파이프라인 상세
│   │   │   │   └── edit/page.tsx # 비주얼 에디터
│   │   │   └── new/page.tsx      # 새 파이프라인
│   │   └── runs/page.tsx         # 실행 히스토리
│   ├── components/
│   │   ├── editor/               # ReactFlow 노드 에디터
│   │   ├── nodes/                # 커스텀 노드 컴포넌트
│   │   └── ui/                   # shadcn/ui
│   └── lib/
│       └── api.ts                # API 클라이언트

2. API Server (flume-api)

flume-api/
├── app/
│   ├── main.py
│   ├── database.py
│   ├── models/
│   │   ├── pipeline.py           # 파이프라인 정의
│   │   ├── run.py                # 실행 기록
│   │   └── node.py               # 노드 정의
│   ├── routers/
│   │   ├── pipelines.py          # CRUD
│   │   ├── runs.py               # 실행 관리
│   │   ├── triggers.py           # 트리거 관리
│   │   └── nodes.py              # 노드 레지스트리
│   └── services/
│       └── engine_client.py      # 엔진 통신

3. Pipeline Engine (flume-engine)

flume-engine/
├── engine/
│   ├── executor.py               # DAG 실행기
│   ├── scheduler.py              # 스케줄러 (cron, interval)
│   ├── state.py                  # 상태 관리
│   └── k8s_client.py             # K8s Job 생성
├── nodes/
│   ├── base.py                   # 노드 베이스 클래스
│   ├── builtin/
│   │   ├── http.py               # HTTP 요청
│   │   ├── transform.py          # 데이터 변환
│   │   ├── condition.py          # 조건 분기
│   │   ├── llm.py                # LLM 호출
│   │   └── database.py           # DB 작업
│   └── registry.py               # 노드 레지스트리
└── worker.py                     # 메인 워커

4. K8s Operator (flume-operator)

flume-operator/
├── operator.py                   # kopf 기반 오퍼레이터
├── crds/
│   └── flumejob.yaml             # CRD 정의
└── templates/
    ├── cpu_job.yaml
    └── gpu_job.yaml

파이프라인 정의

YAML 형식

apiVersion: flume/v1
kind: Pipeline
metadata:
  name: article-generator
  description: 기사 생성 파이프라인

trigger:
  type: webhook
  # type: cron
  # schedule: "0 9 * * *"

variables:
  model: llama3.1-70b

nodes:
  - id: fetch-topic
    type: http-request
    config:
      method: GET
      url: "https://api.example.com/topics"

  - id: generate-article
    type: llm-generate
    runOn: gpu  # GPU Pod에서 실행
    config:
      model: "{{variables.model}}"
      maxTokens: 4096
    inputs:
      prompt: |
        주제: {{fetch-topic.output.topic}}
        위 주제로 뉴스 기사를 작성하세요.

  - id: save-article
    type: mongodb-insert
    config:
      database: flume
      collection: articles
    inputs:
      document:
        title: "{{fetch-topic.output.topic}}"
        content: "{{generate-article.output}}"
        createdAt: "{{$now}}"

edges:
  - from: fetch-topic
    to: generate-article
  - from: generate-article
    to: save-article

노드 SDK

커스텀 노드 작성

from flume.nodes import Node, NodeInput, NodeOutput

class MyCustomNode(Node):
    """커스텀 노드 예시"""

    name = "my-custom-node"
    category = "custom"
    run_on = "engine"  # engine | cpu | gpu

    class Input(NodeInput):
        data: str
        count: int = 1

    class Output(NodeOutput):
        result: list[str]

    async def execute(self, input: Input) -> Output:
        """노드 실행 로직"""
        results = [input.data] * input.count
        return self.Output(result=results)

노드 등록

from flume.nodes import registry

registry.register(MyCustomNode)

빌트인 노드

카테고리 노드 설명 실행 위치
Trigger webhook 웹훅 수신 -
cron 스케줄 실행 -
manual 수동 실행 -
HTTP http-request HTTP 요청 engine
http-response 응답 반환 engine
Transform json-transform JSON 변환 engine
template 템플릿 렌더링 engine
Logic condition 조건 분기 engine
switch 다중 분기 engine
loop 반복 실행 engine
AI llm-generate LLM 텍스트 생성 gpu
llm-chat LLM 대화 gpu
embedding 임베딩 생성 gpu
Database mongodb-query MongoDB 조회 engine
mongodb-insert MongoDB 삽입 engine
redis-get/set Redis 작업 engine
Utility delay 지연 engine
log 로깅 engine
error 에러 발생 engine

데이터 모델 (MongoDB)

pipelines 컬렉션

{
  _id: ObjectId,
  name: "article-generator",
  description: "기사 생성 파이프라인",
  definition: { /* YAML을 파싱한 객체 */ },
  trigger: { type: "webhook", config: {} },
  status: "active",  // active | paused | draft
  createdAt: ISODate,
  updatedAt: ISODate
}

runs 컬렉션

{
  _id: ObjectId,
  pipelineId: ObjectId,
  status: "running",  // pending | running | completed | failed
  trigger: { type: "webhook", data: {} },
  nodes: {
    "fetch-topic": {
      status: "completed",
      output: { topic: "..." },
      startedAt: ISODate,
      completedAt: ISODate
    },
    "generate-article": {
      status: "running",
      startedAt: ISODate
    }
  },
  startedAt: ISODate,
  completedAt: ISODate,
  error: null
}

환경 변수

# flume-api
MONGODB_URL=mongodb://admin:password@mongodb:27017/
DB_NAME=flume
REDIS_URL=redis://redis:6379
ENGINE_URL=http://flume-engine:8001

# flume-engine
MONGODB_URL=mongodb://admin:password@mongodb:27017/
DB_NAME=flume
REDIS_URL=redis://redis:6379
VLLM_URL=http://vllm:8000/v1
K8S_NAMESPACE=flume

# flume-ui
NEXT_PUBLIC_API_URL=http://localhost:8000

개발 로드맵

Phase 1: 코어

  • 파이프라인 정의 스키마
  • API Server 기본 CRUD
  • Engine 코어 (DAG 실행)
  • 빌트인 노드 (HTTP, Transform, Logic)

Phase 2: 실행

  • 스케줄러 (cron, interval)
  • K8s Job 실행
  • 상태 관리 및 재시도

Phase 3: AI

  • LLM 노드 (vLLM 연동)
  • 임베딩 노드
  • GPU 스케줄링

Phase 4: UI

  • 비주얼 노드 에디터
  • 실행 모니터링
  • 로그 뷰어

Phase 5: 확장

  • 커스텀 노드 SDK
  • 플러그인 시스템
  • 멀티 테넌시