- FastAPI 백엔드 (audio-studio-api) - Next.js 프론트엔드 (audio-studio-ui) - Qwen3-TTS 엔진 (audio-studio-tts) - MusicGen 서비스 (audio-studio-musicgen) - Docker Compose 개발/운영 환경 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
366 lines
9.0 KiB
Markdown
366 lines
9.0 KiB
Markdown
# Flume - K8s 기반 범용 파이프라인 시스템
|
|
|
|
## 개요
|
|
|
|
Flume은 K8s 네이티브 범용 파이프라인/워크플로우 시스템입니다.
|
|
|
|
- **비주얼 에디터** + **YAML/JSON 정의** 지원
|
|
- **하이브리드 실행**: 경량 작업은 중앙 엔진, 무거운 작업은 K8s Job
|
|
- **로컬 LLM 통합**: vLLM 기반 GPU 노드 지원
|
|
|
|
## 아키텍처
|
|
|
|
```mermaid
|
|
flowchart TB
|
|
subgraph Frontend["Frontend (Next.js)"]
|
|
VE[Visual Node Editor<br/>ReactFlow]
|
|
PD[Pipeline Dashboard]
|
|
MC[Monitoring Console]
|
|
end
|
|
|
|
subgraph API["API Server (FastAPI)"]
|
|
CRUD[Pipeline CRUD]
|
|
TM[Trigger Manager]
|
|
NR[Node Registry]
|
|
end
|
|
|
|
subgraph Engine["Pipeline Engine"]
|
|
SCH[Scheduler<br/>APScheduler]
|
|
EXE[Executor<br/>Async]
|
|
SM[State Manager]
|
|
end
|
|
|
|
subgraph K8s["Kubernetes Cluster"]
|
|
OP[Flume Operator<br/>kopf]
|
|
subgraph Workers["Worker Pods"]
|
|
CPU[CPU Workers]
|
|
GPU[GPU Workers<br/>vLLM]
|
|
end
|
|
end
|
|
|
|
subgraph Storage["Storage"]
|
|
Redis[(Redis<br/>Queue/Cache)]
|
|
MongoDB[(MongoDB<br/>Definitions/History)]
|
|
end
|
|
|
|
Frontend --> API
|
|
API --> Engine
|
|
Engine --> Redis
|
|
Engine --> MongoDB
|
|
Engine --> OP
|
|
OP --> Workers
|
|
GPU -.->|OpenAI API| EXE
|
|
```
|
|
|
|
## 실행 모델 (하이브리드)
|
|
|
|
```mermaid
|
|
flowchart LR
|
|
subgraph Light["경량 작업 (중앙 실행)"]
|
|
L1[HTTP Request]
|
|
L2[JSON Transform]
|
|
L3[Condition/Switch]
|
|
L4[Variable Set]
|
|
end
|
|
|
|
subgraph Heavy["무거운 작업 (K8s Job)"]
|
|
H1[LLM Generate]
|
|
H2[Image Process]
|
|
H3[Data ETL]
|
|
H4[Custom Script]
|
|
end
|
|
|
|
ENG[Engine] --> Light
|
|
ENG --> |K8s Job 생성| Heavy
|
|
```
|
|
|
|
### 실행 기준
|
|
|
|
| 구분 | 실행 위치 | 예시 |
|
|
|------|----------|------|
|
|
| 경량 | 중앙 엔진 | HTTP 요청, 데이터 변환, 조건 분기 |
|
|
| GPU | GPU Pod | LLM 생성, 이미지 생성 |
|
|
| 무거운 CPU | K8s Job | 대용량 ETL, 장시간 스크립트 |
|
|
|
|
## 핵심 컴포넌트
|
|
|
|
### 1. Frontend (flume-ui)
|
|
|
|
```
|
|
flume-ui/
|
|
├── src/
|
|
│ ├── app/
|
|
│ │ ├── page.tsx # 대시보드
|
|
│ │ ├── pipelines/
|
|
│ │ │ ├── page.tsx # 파이프라인 목록
|
|
│ │ │ ├── [id]/
|
|
│ │ │ │ ├── page.tsx # 파이프라인 상세
|
|
│ │ │ │ └── edit/page.tsx # 비주얼 에디터
|
|
│ │ │ └── new/page.tsx # 새 파이프라인
|
|
│ │ └── runs/page.tsx # 실행 히스토리
|
|
│ ├── components/
|
|
│ │ ├── editor/ # ReactFlow 노드 에디터
|
|
│ │ ├── nodes/ # 커스텀 노드 컴포넌트
|
|
│ │ └── ui/ # shadcn/ui
|
|
│ └── lib/
|
|
│ └── api.ts # API 클라이언트
|
|
```
|
|
|
|
### 2. API Server (flume-api)
|
|
|
|
```
|
|
flume-api/
|
|
├── app/
|
|
│ ├── main.py
|
|
│ ├── database.py
|
|
│ ├── models/
|
|
│ │ ├── pipeline.py # 파이프라인 정의
|
|
│ │ ├── run.py # 실행 기록
|
|
│ │ └── node.py # 노드 정의
|
|
│ ├── routers/
|
|
│ │ ├── pipelines.py # CRUD
|
|
│ │ ├── runs.py # 실행 관리
|
|
│ │ ├── triggers.py # 트리거 관리
|
|
│ │ └── nodes.py # 노드 레지스트리
|
|
│ └── services/
|
|
│ └── engine_client.py # 엔진 통신
|
|
```
|
|
|
|
### 3. Pipeline Engine (flume-engine)
|
|
|
|
```
|
|
flume-engine/
|
|
├── engine/
|
|
│ ├── executor.py # DAG 실행기
|
|
│ ├── scheduler.py # 스케줄러 (cron, interval)
|
|
│ ├── state.py # 상태 관리
|
|
│ └── k8s_client.py # K8s Job 생성
|
|
├── nodes/
|
|
│ ├── base.py # 노드 베이스 클래스
|
|
│ ├── builtin/
|
|
│ │ ├── http.py # HTTP 요청
|
|
│ │ ├── transform.py # 데이터 변환
|
|
│ │ ├── condition.py # 조건 분기
|
|
│ │ ├── llm.py # LLM 호출
|
|
│ │ └── database.py # DB 작업
|
|
│ └── registry.py # 노드 레지스트리
|
|
└── worker.py # 메인 워커
|
|
```
|
|
|
|
### 4. K8s Operator (flume-operator)
|
|
|
|
```
|
|
flume-operator/
|
|
├── operator.py # kopf 기반 오퍼레이터
|
|
├── crds/
|
|
│ └── flumejob.yaml # CRD 정의
|
|
└── templates/
|
|
├── cpu_job.yaml
|
|
└── gpu_job.yaml
|
|
```
|
|
|
|
## 파이프라인 정의
|
|
|
|
### YAML 형식
|
|
|
|
```yaml
|
|
apiVersion: flume/v1
|
|
kind: Pipeline
|
|
metadata:
|
|
name: article-generator
|
|
description: 기사 생성 파이프라인
|
|
|
|
trigger:
|
|
type: webhook
|
|
# type: cron
|
|
# schedule: "0 9 * * *"
|
|
|
|
variables:
|
|
model: llama3.1-70b
|
|
|
|
nodes:
|
|
- id: fetch-topic
|
|
type: http-request
|
|
config:
|
|
method: GET
|
|
url: "https://api.example.com/topics"
|
|
|
|
- id: generate-article
|
|
type: llm-generate
|
|
runOn: gpu # GPU Pod에서 실행
|
|
config:
|
|
model: "{{variables.model}}"
|
|
maxTokens: 4096
|
|
inputs:
|
|
prompt: |
|
|
주제: {{fetch-topic.output.topic}}
|
|
위 주제로 뉴스 기사를 작성하세요.
|
|
|
|
- id: save-article
|
|
type: mongodb-insert
|
|
config:
|
|
database: flume
|
|
collection: articles
|
|
inputs:
|
|
document:
|
|
title: "{{fetch-topic.output.topic}}"
|
|
content: "{{generate-article.output}}"
|
|
createdAt: "{{$now}}"
|
|
|
|
edges:
|
|
- from: fetch-topic
|
|
to: generate-article
|
|
- from: generate-article
|
|
to: save-article
|
|
```
|
|
|
|
## 노드 SDK
|
|
|
|
### 커스텀 노드 작성
|
|
|
|
```python
|
|
from flume.nodes import Node, NodeInput, NodeOutput
|
|
|
|
class MyCustomNode(Node):
|
|
"""커스텀 노드 예시"""
|
|
|
|
name = "my-custom-node"
|
|
category = "custom"
|
|
run_on = "engine" # engine | cpu | gpu
|
|
|
|
class Input(NodeInput):
|
|
data: str
|
|
count: int = 1
|
|
|
|
class Output(NodeOutput):
|
|
result: list[str]
|
|
|
|
async def execute(self, input: Input) -> Output:
|
|
"""노드 실행 로직"""
|
|
results = [input.data] * input.count
|
|
return self.Output(result=results)
|
|
```
|
|
|
|
### 노드 등록
|
|
|
|
```python
|
|
from flume.nodes import registry
|
|
|
|
registry.register(MyCustomNode)
|
|
```
|
|
|
|
## 빌트인 노드
|
|
|
|
| 카테고리 | 노드 | 설명 | 실행 위치 |
|
|
|---------|------|------|----------|
|
|
| **Trigger** | webhook | 웹훅 수신 | - |
|
|
| | cron | 스케줄 실행 | - |
|
|
| | manual | 수동 실행 | - |
|
|
| **HTTP** | http-request | HTTP 요청 | engine |
|
|
| | http-response | 응답 반환 | engine |
|
|
| **Transform** | json-transform | JSON 변환 | engine |
|
|
| | template | 템플릿 렌더링 | engine |
|
|
| **Logic** | condition | 조건 분기 | engine |
|
|
| | switch | 다중 분기 | engine |
|
|
| | loop | 반복 실행 | engine |
|
|
| **AI** | llm-generate | LLM 텍스트 생성 | gpu |
|
|
| | llm-chat | LLM 대화 | gpu |
|
|
| | embedding | 임베딩 생성 | gpu |
|
|
| **Database** | mongodb-query | MongoDB 조회 | engine |
|
|
| | mongodb-insert | MongoDB 삽입 | engine |
|
|
| | redis-get/set | Redis 작업 | engine |
|
|
| **Utility** | delay | 지연 | engine |
|
|
| | log | 로깅 | engine |
|
|
| | error | 에러 발생 | engine |
|
|
|
|
## 데이터 모델 (MongoDB)
|
|
|
|
### pipelines 컬렉션
|
|
|
|
```javascript
|
|
{
|
|
_id: ObjectId,
|
|
name: "article-generator",
|
|
description: "기사 생성 파이프라인",
|
|
definition: { /* YAML을 파싱한 객체 */ },
|
|
trigger: { type: "webhook", config: {} },
|
|
status: "active", // active | paused | draft
|
|
createdAt: ISODate,
|
|
updatedAt: ISODate
|
|
}
|
|
```
|
|
|
|
### runs 컬렉션
|
|
|
|
```javascript
|
|
{
|
|
_id: ObjectId,
|
|
pipelineId: ObjectId,
|
|
status: "running", // pending | running | completed | failed
|
|
trigger: { type: "webhook", data: {} },
|
|
nodes: {
|
|
"fetch-topic": {
|
|
status: "completed",
|
|
output: { topic: "..." },
|
|
startedAt: ISODate,
|
|
completedAt: ISODate
|
|
},
|
|
"generate-article": {
|
|
status: "running",
|
|
startedAt: ISODate
|
|
}
|
|
},
|
|
startedAt: ISODate,
|
|
completedAt: ISODate,
|
|
error: null
|
|
}
|
|
```
|
|
|
|
## 환경 변수
|
|
|
|
```bash
|
|
# flume-api
|
|
MONGODB_URL=mongodb://admin:password@mongodb:27017/
|
|
DB_NAME=flume
|
|
REDIS_URL=redis://redis:6379
|
|
ENGINE_URL=http://flume-engine:8001
|
|
|
|
# flume-engine
|
|
MONGODB_URL=mongodb://admin:password@mongodb:27017/
|
|
DB_NAME=flume
|
|
REDIS_URL=redis://redis:6379
|
|
VLLM_URL=http://vllm:8000/v1
|
|
K8S_NAMESPACE=flume
|
|
|
|
# flume-ui
|
|
NEXT_PUBLIC_API_URL=http://localhost:8000
|
|
```
|
|
|
|
## 개발 로드맵
|
|
|
|
### Phase 1: 코어
|
|
- [ ] 파이프라인 정의 스키마
|
|
- [ ] API Server 기본 CRUD
|
|
- [ ] Engine 코어 (DAG 실행)
|
|
- [ ] 빌트인 노드 (HTTP, Transform, Logic)
|
|
|
|
### Phase 2: 실행
|
|
- [ ] 스케줄러 (cron, interval)
|
|
- [ ] K8s Job 실행
|
|
- [ ] 상태 관리 및 재시도
|
|
|
|
### Phase 3: AI
|
|
- [ ] LLM 노드 (vLLM 연동)
|
|
- [ ] 임베딩 노드
|
|
- [ ] GPU 스케줄링
|
|
|
|
### Phase 4: UI
|
|
- [ ] 비주얼 노드 에디터
|
|
- [ ] 실행 모니터링
|
|
- [ ] 로그 뷰어
|
|
|
|
### Phase 5: 확장
|
|
- [ ] 커스텀 노드 SDK
|
|
- [ ] 플러그인 시스템
|
|
- [ ] 멀티 테넌시
|