Files
drama-studio/audio-studio-api/app/services/drama_orchestrator.py
jungwoo choi cc547372c0 feat: Drama Studio 프로젝트 초기 구조 설정
- FastAPI 백엔드 (audio-studio-api)
- Next.js 프론트엔드 (audio-studio-ui)
- Qwen3-TTS 엔진 (audio-studio-tts)
- MusicGen 서비스 (audio-studio-musicgen)
- Docker Compose 개발/운영 환경

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 11:39:38 +09:00

363 lines
12 KiB
Python

# 드라마 오케스트레이터
# 스크립트 파싱 → 에셋 생성 → 타임라인 구성 → 믹싱 조율
import os
import uuid
import asyncio
import tempfile
from datetime import datetime
from typing import Optional
from pydub import AudioSegment
from app.models.drama import (
ParsedScript, ScriptElement, ElementType, Character,
TimelineItem, DramaProject, DramaCreateRequest
)
from app.services.script_parser import script_parser
from app.services.audio_mixer import audio_mixer
from app.services.tts_client import tts_client
from app.services.freesound_client import freesound_client
from app.database import db
class DramaOrchestrator:
    """Coordinate the production of an audio drama from a raw script.

    Workflow:
        1. Parse the script.
        2. Apply the character -> voice mapping.
        3. Generate assets (TTS dialogue and sound effects; music generation
           is still a TODO in ``generate_assets``).
        4. Build the timeline.
        5. Mix the audio.
        6. Write the final output file.
    """

    # Gap inserted after every dialogue line (seconds).
    DEFAULT_DIALOGUE_GAP = 0.5
    # Length assumed for a sound effect whose real length is unknown (seconds).
    DEFAULT_SFX_DURATION = 2.0
    # Estimated TTS speaking rate (characters per second).
    TTS_CHARS_PER_SECOND = 5

    async def create_project(
        self,
        request: DramaCreateRequest
    ) -> DramaProject:
        """Create a new drama project from a request and persist it.

        The script is parsed, the requested voice mapping is applied to the
        parsed characters, and the resulting project is inserted into the
        ``dramas`` collection with status ``"draft"``.

        Args:
            request: Creation request carrying the raw script, an optional
                title and an optional character-name -> voice-id mapping.

        Returns:
            The newly created project.
        """
        project_id = str(uuid.uuid4())

        parsed = script_parser.parse(request.script)

        # Only characters named in the mapping get a voice here; the others
        # keep whatever voice_id the parser gave them.
        voice_mapping = request.voice_mapping or {}
        for char in parsed.characters:
            if char.name in voice_mapping:
                char.voice_id = voice_mapping[char.name]

        project = DramaProject(
            project_id=project_id,
            title=request.title or parsed.title or "Untitled Drama",
            script_raw=request.script,
            script_parsed=parsed,
            voice_mapping=voice_mapping,
            status="draft"
        )

        await db.dramas.insert_one(project.model_dump())
        return project

    async def get_project(self, project_id: str) -> Optional[DramaProject]:
        """Load a project by id, or return ``None`` when it does not exist."""
        doc = await db.dramas.find_one({"project_id": project_id})
        if doc:
            return DramaProject(**doc)
        return None

    async def update_project_status(
        self,
        project_id: str,
        status: str,
        error_message: Optional[str] = None
    ) -> None:
        """Set a project's status and refresh its ``updated_at`` timestamp.

        Args:
            project_id: Id of the project to update.
            status: New status value.
            error_message: Stored as ``error_message`` only when non-empty;
                an existing stored message is not cleared otherwise.
        """
        update = {
            "status": status,
            "updated_at": datetime.utcnow()
        }
        if error_message:
            update["error_message"] = error_message

        await db.dramas.update_one(
            {"project_id": project_id},
            {"$set": update}
        )

    def estimate_duration(self, parsed: ParsedScript) -> float:
        """Estimate the playback length of a parsed script in seconds.

        Dialogue contributes ``len(text) / TTS_CHARS_PER_SECOND`` plus the
        dialogue gap, pauses contribute their duration (1.0 when unset), and
        each sound effect contributes ``DEFAULT_SFX_DURATION``. Other element
        types are ignored.
        """
        total = 0.0
        for element in parsed.elements:
            if element.type == ElementType.DIALOGUE:
                text_len = len(element.text or "")
                total += text_len / self.TTS_CHARS_PER_SECOND
                total += self.DEFAULT_DIALOGUE_GAP
            elif element.type == ElementType.PAUSE:
                total += element.duration or 1.0
            elif element.type == ElementType.SFX:
                total += self.DEFAULT_SFX_DURATION
        return total

    async def generate_assets(
        self,
        project: DramaProject,
        temp_dir: str
    ) -> dict[str, str]:
        """Generate the audio assets (TTS dialogue and SFX) for a project.

        Dialogue assets are keyed ``dialogue_<n>`` in script order; sound
        effects are keyed ``sfx_<description>``. Music elements are skipped
        because MusicGen integration is not implemented yet.

        Args:
            project: Project whose parsed script should be rendered to assets.
            temp_dir: Directory the generated files are written to.

        Returns:
            Mapping of audio id -> file path. Empty when the project has no
            parsed script.
        """
        assets: dict[str, str] = {}
        parsed = project.script_parsed
        if not parsed:
            return assets

        dialogue_index = 0
        for element in parsed.elements:
            if element.type == ElementType.DIALOGUE:
                audio_id = f"dialogue_{dialogue_index}"
                assets[audio_id] = await self._synthesize_dialogue(
                    project, element, audio_id, temp_dir
                )
                dialogue_index += 1
            elif element.type == ElementType.SFX:
                audio_id = f"sfx_{element.description}"
                # Repeated effects share one key, so a description that was
                # already resolved does not need another search/download.
                if audio_id in assets:
                    continue
                file_path = await self._fetch_sfx(element, temp_dir)
                if file_path:
                    assets[audio_id] = file_path
            # ElementType.MUSIC: MusicGen needs a GPU, so nothing is generated
            # here yet. TODO: add a music client and integrate MusicGen.
        return assets

    async def _synthesize_dialogue(
        self,
        project: DramaProject,
        element: ScriptElement,
        audio_id: str,
        temp_dir: str
    ) -> str:
        """Synthesize one dialogue line to ``<audio_id>.wav`` and return its path.

        On any TTS or write failure the line is replaced by silence of the
        estimated spoken length, so the path is always returned.
        """
        # Fall back to the "default" voice when the character has no mapping.
        voice_id = project.voice_mapping.get(element.character)
        if not voice_id:
            voice_id = "default"

        file_path = os.path.join(temp_dir, f"{audio_id}.wav")
        try:
            audio_data = await tts_client.synthesize(
                text=element.text or "",
                voice_id=voice_id,
                instruct=element.emotion
            )
            with open(file_path, "wb") as f:
                f.write(audio_data)
        except Exception as e:
            # Deliberate best effort: one failed line must not abort the render.
            print(f"TTS 생성 실패 ({element.character}): {e}")
            silence_duration = len(element.text or "") / self.TTS_CHARS_PER_SECOND
            silence = AudioSegment.silent(duration=int(silence_duration * 1000))
            silence.export(file_path, format="wav")
        return file_path

    async def _fetch_sfx(
        self,
        element: ScriptElement,
        temp_dir: str
    ) -> Optional[str]:
        """Download the first Freesound preview matching the effect description.

        Returns the saved file path, or ``None`` when nothing usable was found
        or the lookup failed (failures are reported and otherwise ignored).
        """
        try:
            results = await freesound_client.search(
                query=element.description,
                page_size=1
            )
            if not results:
                return None
            sound = results[0]
            preview_url = sound.get("preview_url")
            if not preview_url:
                return None
            audio_data = await freesound_client.download_preview(preview_url)
            file_path = os.path.join(temp_dir, f"sfx_{sound['id']}.mp3")
            with open(file_path, "wb") as f:
                f.write(audio_data)
            return file_path
        except Exception as e:
            print(f"SFX 검색 실패 ({element.description}): {e}")
            return None

    @staticmethod
    def _audio_duration(path: str, fallback: float) -> float:
        """Return the length of the audio file in seconds, or ``fallback`` if unreadable."""
        try:
            # len() of an AudioSegment is its length in milliseconds.
            return len(AudioSegment.from_file(path)) / 1000.0
        except Exception:
            # Not a bare ``except``: KeyboardInterrupt/SystemExit must propagate.
            return fallback

    @staticmethod
    def _music_item(music: dict, end_time: float) -> TimelineItem:
        """Build the timeline item for a music cue that plays until ``end_time``."""
        return TimelineItem(
            start_time=music["start_time"],
            duration=end_time - music["start_time"],
            type="music",
            audio_path=music["audio_id"],
            volume=music["volume"],
            fade_in=music.get("fade_in", 0)
        )

    def build_timeline(
        self,
        parsed: ParsedScript,
        assets: dict[str, str]
    ) -> list[TimelineItem]:
        """Lay the script elements out on a timeline.

        Dialogue lines are placed one after another, each followed by
        ``DEFAULT_DIALOGUE_GAP``. Pauses advance the clock. Sound effects are
        placed at the current time without advancing it. A music cue runs from
        its start until it is stopped, replaced by another cue, or the script
        ends. Elements without an entry in ``assets`` produce no item.

        Args:
            parsed: The parsed script.
            assets: Mapping of audio id -> file path from ``generate_assets``.

        Returns:
            Timeline items; ``audio_path`` carries the audio id, not the path.
        """
        timeline: list[TimelineItem] = []
        current_time = 0.0
        dialogue_index = 0
        current_music: Optional[dict] = None

        for element in parsed.elements:
            if element.type == ElementType.DIALOGUE:
                audio_id = f"dialogue_{dialogue_index}"
                if audio_id in assets:
                    duration = self._audio_duration(
                        assets[audio_id],
                        len(element.text or "") / self.TTS_CHARS_PER_SECOND
                    )
                    timeline.append(TimelineItem(
                        start_time=current_time,
                        duration=duration,
                        type="voice",
                        audio_path=audio_id,
                        volume=1.0
                    ))
                    current_time += duration + self.DEFAULT_DIALOGUE_GAP
                # Always advance so ids stay aligned with generate_assets.
                dialogue_index += 1
            elif element.type == ElementType.PAUSE:
                current_time += element.duration or 1.0
            elif element.type == ElementType.SFX:
                audio_id = f"sfx_{element.description}"
                if audio_id in assets:
                    duration = self._audio_duration(
                        assets[audio_id], self.DEFAULT_SFX_DURATION
                    )
                    timeline.append(TimelineItem(
                        start_time=current_time,
                        duration=duration,
                        type="sfx",
                        audio_path=audio_id,
                        volume=element.volume or 1.0
                    ))
            elif element.type == ElementType.MUSIC:
                audio_id = f"music_{element.description}"
                starts_new = (
                    element.action in ("play", "change", "fade_in")
                    and audio_id in assets
                )
                if element.action == "stop" or starts_new:
                    # Close the running cue at this point; previously it was
                    # discarded and never reached the timeline. A cue that was
                    # replaced before any time elapsed never sounded, so it is
                    # skipped.
                    if current_music and current_time > current_music["start_time"]:
                        timeline.append(
                            self._music_item(current_music, current_time)
                        )
                    current_music = None
                if starts_new:
                    current_music = {
                        "audio_id": audio_id,
                        "start_time": current_time,
                        "volume": element.volume or 0.3,
                        "fade_in": element.fade_duration if element.action == "fade_in" else 0
                    }

        # Music still playing at the end of the script runs to the final time.
        if current_music:
            timeline.append(self._music_item(current_music, current_time))
        return timeline

    async def render(
        self,
        project_id: str,
        output_format: str = "wav"
    ) -> Optional[str]:
        """Render a project to a single audio file.

        The project status is set to ``"processing"`` while rendering,
        ``"completed"`` (together with the timeline and output path) on
        success, and ``"error"`` with the exception text on failure.

        Args:
            project_id: Id of the project to render.
            output_format: Audio format / file extension of the output.

        Returns:
            Path of the rendered file, or ``None`` when the project does not
            exist or has no parsed script.

        Raises:
            Exception: Any error raised while rendering is re-raised after
                the project has been marked as failed.
        """
        import shutil

        project = await self.get_project(project_id)
        if not project or not project.script_parsed:
            return None

        await self.update_project_status(project_id, "processing")
        try:
            with tempfile.TemporaryDirectory() as temp_dir:
                # 1. Generate assets
                assets = await self.generate_assets(project, temp_dir)
                # 2. Build the timeline
                timeline = self.build_timeline(project.script_parsed, assets)
                # 3. Mix
                mixed_audio = audio_mixer.mix_timeline(timeline, assets)
                # 4. Export
                output_path = os.path.join(temp_dir, f"drama_{project_id}.{output_format}")
                audio_mixer.export(mixed_audio, output_path, format=output_format)

                # 5. Store in GridFS (TODO: real implementation)
                # file_id = await save_to_gridfs(output_path)
                # Temporary: copy the result out of temp_dir, which is deleted
                # when the ``with`` block exits.
                final_path = f"/tmp/drama_{project_id}.{output_format}"
                shutil.copy(output_path, final_path)

                await db.dramas.update_one(
                    {"project_id": project_id},
                    {
                        "$set": {
                            "status": "completed",
                            "timeline": [t.model_dump() for t in timeline],
                            "output_file_id": final_path,
                            "updated_at": datetime.utcnow()
                        }
                    }
                )
                return final_path
        except Exception as e:
            await self.update_project_status(project_id, "error", str(e))
            raise

    async def list_projects(
        self,
        skip: int = 0,
        limit: int = 20
    ) -> list[DramaProject]:
        """Return one page of projects ordered by ``created_at``, newest first.

        Args:
            skip: Number of projects to skip from the start of the ordering.
            limit: Maximum number of projects to return.
        """
        cursor = db.dramas.find().sort("created_at", -1).skip(skip).limit(limit)
        return [DramaProject(**doc) async for doc in cursor]
# Singleton instance, created once when this module is imported
drama_orchestrator = DramaOrchestrator()