feat: Drama Studio 프로젝트 초기 구조 설정

- FastAPI 백엔드 (audio-studio-api)
- Next.js 프론트엔드 (audio-studio-ui)
- Qwen3-TTS 엔진 (audio-studio-tts)
- MusicGen 서비스 (audio-studio-musicgen)
- Docker Compose 개발/운영 환경

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
jungwoo choi
2026-01-26 11:39:38 +09:00
commit cc547372c0
70 changed files with 18399 additions and 0 deletions

View File

@ -0,0 +1,260 @@
# 오디오 믹서 서비스
# pydub를 사용한 오디오 합성/믹싱
import math
import os
import tempfile
from typing import Optional

from pydub import AudioSegment
from pydub.effects import normalize

from app.models.drama import TimelineItem
class AudioMixer:
    """Audio mixer/compositor built on pydub.

    Features:
      - layering multiple tracks (voice / music / sfx)
      - linear-gain volume adjustment
      - fade in / fade out
      - timeline-based mixing, auto-ducking and export
    """

    def __init__(self, sample_rate: int = 44100):
        # Target sample rate. NOTE(review): currently informational only --
        # pydub keeps each segment's own rate until export. Confirm callers
        # do not rely on resampling happening here.
        self.sample_rate = sample_rate

    def load_audio(self, file_path: str) -> AudioSegment:
        """Load an audio file (any format ffmpeg can decode)."""
        return AudioSegment.from_file(file_path)

    def adjust_volume(self, audio: AudioSegment, volume: float) -> AudioSegment:
        """Apply a linear gain factor (0.0 ~ 2.0, 1.0 = original).

        The factor is converted to decibels so that 0.5 -> ~-6 dB and
        2.0 -> ~+6 dB; non-positive factors are treated as silence.
        """
        if volume == 1.0:
            return audio
        if volume <= 0:
            # Effectively mute.
            return audio - 120
        # BUG FIX: the previous ad-hoc formula 20 * (volume ** 0.5 - 1)
        # produced e.g. +8.3 dB for a factor of 2.0, contradicting the
        # documented -6 dB / +6 dB behaviour; the correct linear -> dB
        # conversion is 20 * log10(gain).
        return audio + 20 * math.log10(volume)

    def apply_fade(
        self,
        audio: AudioSegment,
        fade_in_ms: int = 0,
        fade_out_ms: int = 0
    ) -> AudioSegment:
        """Apply fade-in and/or fade-out (durations in milliseconds)."""
        if fade_in_ms > 0:
            audio = audio.fade_in(fade_in_ms)
        if fade_out_ms > 0:
            audio = audio.fade_out(fade_out_ms)
        return audio

    def concatenate(self, segments: list[AudioSegment]) -> AudioSegment:
        """Join segments back to back; empty input yields empty audio."""
        if not segments:
            return AudioSegment.silent(duration=0)
        result = segments[0]
        for segment in segments[1:]:
            result += segment
        return result

    def overlay(
        self,
        base: AudioSegment,
        overlay_audio: AudioSegment,
        position_ms: int = 0
    ) -> AudioSegment:
        """Overlay one segment on top of another (e.g. voice over music)."""
        return base.overlay(overlay_audio, position=position_ms)

    def create_silence(self, duration_ms: int) -> AudioSegment:
        """Create a silent segment of the given length."""
        return AudioSegment.silent(duration=duration_ms)

    def mix_timeline(
        self,
        timeline: list[TimelineItem],
        audio_files: dict[str, str]  # item.audio_path -> actual file path
    ) -> AudioSegment:
        """Mix a timeline of items into a single segment.

        Args:
            timeline: timeline items (types "voice", "music", "sfx")
            audio_files: mapping from item.audio_path to a real file path

        Returns:
            The mixed audio; 1 s of silence for an empty timeline.
        """
        if not timeline:
            return AudioSegment.silent(duration=1000)
        # Total length = end of the latest-ending item.
        total_duration_ms = max(
            int((item.start_time + item.duration) * 1000)
            for item in timeline
        )
        # One silent bed per track type.
        voice_track = AudioSegment.silent(duration=total_duration_ms)
        music_track = AudioSegment.silent(duration=total_duration_ms)
        sfx_track = AudioSegment.silent(duration=total_duration_ms)
        for item in timeline:
            # Skip items whose audio is unknown or missing on disk.
            if not item.audio_path or item.audio_path not in audio_files:
                continue
            file_path = audio_files[item.audio_path]
            if not os.path.exists(file_path):
                continue
            # Load, apply per-item volume and fades.
            audio = self.load_audio(file_path)
            audio = self.adjust_volume(audio, item.volume)
            audio = self.apply_fade(
                audio,
                int(item.fade_in * 1000),
                int(item.fade_out * 1000),
            )
            position_ms = int(item.start_time * 1000)
            if item.type == "voice":
                voice_track = voice_track.overlay(audio, position=position_ms)
            elif item.type == "music":
                music_track = music_track.overlay(audio, position=position_ms)
            elif item.type == "sfx":
                sfx_track = sfx_track.overlay(audio, position=position_ms)
        # Layer music -> sfx -> voice.
        return music_track.overlay(sfx_track).overlay(voice_track)

    def auto_duck(
        self,
        music: AudioSegment,
        voice: AudioSegment,
        duck_amount_db: float = -10,
        attack_ms: int = 100,
        release_ms: int = 300
    ) -> AudioSegment:
        """Auto-ducking: lower the music while the voice is audible.

        Simple RMS-gated implementation working on 50 ms chunks.
        NOTE(review): attack_ms / release_ms are currently unused -- the
        gain switches instantly per chunk. TODO: smooth the transitions.
        """
        # Pad the music to at least the voice length.
        if len(music) < len(voice):
            music = music + AudioSegment.silent(duration=len(voice) - len(music))
        chunk_ms = 50
        # PERF FIX: collect chunks and join once instead of repeated
        # AudioSegment += (which re-copies the accumulated audio each time).
        chunks: list[AudioSegment] = []
        for i in range(0, len(voice), chunk_ms):
            voice_chunk = voice[i:i + chunk_ms]
            music_chunk = music[i:i + chunk_ms]
            # Duck only when the voice chunk is above the RMS threshold.
            if voice_chunk.rms > 100:  # threshold is tunable
                music_chunk = music_chunk + duck_amount_db
            chunks.append(music_chunk)
        return self.concatenate(chunks)

    def export(
        self,
        audio: AudioSegment,
        output_path: str,
        format: str = "wav",
        normalize_audio: bool = True
    ) -> str:
        """Export audio to disk.

        Args:
            audio: segment to write
            output_path: destination file path
            format: output format ("wav" or "mp3")
            normalize_audio: apply peak normalization before export

        Returns:
            The path the file was written to.
        """
        if normalize_audio:
            audio = normalize(audio)
        # Per-format export options.
        if format == "mp3":
            export_params = {"format": "mp3", "bitrate": "192k"}
        else:
            export_params = {"format": "wav"}
        audio.export(output_path, **export_params)
        return output_path

    def create_with_background(
        self,
        voice_segments: list[tuple[AudioSegment, float]],  # (audio, start_time)
        background_music: Optional[AudioSegment] = None,
        music_volume: float = 0.3,
        gap_between_lines_ms: int = 500
    ) -> AudioSegment:
        """Simple voice + background-music composition.

        Args:
            voice_segments: (audio, start time in seconds) tuples
            background_music: optional music bed (omitted -> voice only)
            music_volume: linear gain applied to the music
            gap_between_lines_ms: gap inserted after each line

        Returns:
            The mixed audio; 1 s of silence when there are no segments.
        """
        if not voice_segments:
            return AudioSegment.silent(duration=1000)
        # Build the voice track, padding with silence up to each segment's
        # requested start time.
        voice_track = AudioSegment.silent(duration=0)
        for audio, start_time in voice_segments:
            current_pos = len(voice_track)
            target_pos = int(start_time * 1000)
            if target_pos > current_pos:
                voice_track += AudioSegment.silent(duration=target_pos - current_pos)
            voice_track += audio
            voice_track += AudioSegment.silent(duration=gap_between_lines_ms)
        total_duration = len(voice_track)
        if background_music:
            # Loop the music to cover the voice, then trim to length.
            if len(background_music) < total_duration:
                loops_needed = (total_duration // len(background_music)) + 1
                background_music = background_music * loops_needed
            background_music = background_music[:total_duration]
            background_music = self.adjust_volume(background_music, music_volume)
            # Duck the music under the voice.
            background_music = self.auto_duck(background_music, voice_track)
            return background_music.overlay(voice_track)
        else:
            return voice_track
# Module-level singleton shared across the app.
audio_mixer = AudioMixer()

View File

@ -0,0 +1,362 @@
# 드라마 오케스트레이터
# 스크립트 파싱 → 에셋 생성 → 타임라인 구성 → 믹싱 조율
import os
import uuid
import asyncio
import tempfile
from datetime import datetime
from typing import Optional
from pydub import AudioSegment
from app.models.drama import (
ParsedScript, ScriptElement, ElementType, Character,
TimelineItem, DramaProject, DramaCreateRequest
)
from app.services.script_parser import script_parser
from app.services.audio_mixer import audio_mixer
from app.services.tts_client import tts_client
from app.services.freesound_client import freesound_client
from app.database import db
class DramaOrchestrator:
    """Drama generation orchestrator.

    Workflow:
    1. Parse the script
    2. Map characters to voices
    3. Generate assets (TTS, music, SFX)
    4. Build the timeline
    5. Mix the audio
    6. Export the final file
    """

    # Default gap between dialogue lines (seconds)
    DEFAULT_DIALOGUE_GAP = 0.5
    # Default sound-effect length used for estimates/fallbacks (seconds)
    DEFAULT_SFX_DURATION = 2.0
    # Rough TTS speed used for duration estimates (characters / second)
    TTS_CHARS_PER_SECOND = 5

    async def create_project(
        self,
        request: DramaCreateRequest
    ) -> DramaProject:
        """Create a new drama project from a raw script."""
        project_id = str(uuid.uuid4())
        # Parse the script into structured elements/characters.
        parsed = script_parser.parse(request.script)
        # Apply the requested character -> voice mapping.
        voice_mapping = request.voice_mapping or {}
        for char in parsed.characters:
            if char.name in voice_mapping:
                char.voice_id = voice_mapping[char.name]
        project = DramaProject(
            project_id=project_id,
            title=request.title or parsed.title or "Untitled Drama",
            script_raw=request.script,
            script_parsed=parsed,
            voice_mapping=voice_mapping,
            status="draft"
        )
        # Persist to the database.
        await db.dramas.insert_one(project.model_dump())
        return project

    async def get_project(self, project_id: str) -> Optional[DramaProject]:
        """Look up a project by id; returns None when missing."""
        doc = await db.dramas.find_one({"project_id": project_id})
        if doc:
            return DramaProject(**doc)
        return None

    async def update_project_status(
        self,
        project_id: str,
        status: str,
        error_message: Optional[str] = None
    ):
        """Persist a status change (and optional error message)."""
        update = {
            "status": status,
            # NOTE(review): naive UTC timestamp (datetime.utcnow is
            # deprecated in 3.12); kept for consistency with stored docs.
            "updated_at": datetime.utcnow()
        }
        if error_message:
            update["error_message"] = error_message
        await db.dramas.update_one(
            {"project_id": project_id},
            {"$set": update}
        )

    def estimate_duration(self, parsed: ParsedScript) -> float:
        """Estimate total playback time in seconds (rough heuristic)."""
        total = 0.0
        for element in parsed.elements:
            if element.type == ElementType.DIALOGUE:
                # Spoken-text length at the assumed TTS speed plus gap.
                text_len = len(element.text or "")
                total += text_len / self.TTS_CHARS_PER_SECOND
                total += self.DEFAULT_DIALOGUE_GAP
            elif element.type == ElementType.PAUSE:
                total += element.duration or 1.0
            elif element.type == ElementType.SFX:
                total += self.DEFAULT_SFX_DURATION
        return total

    async def generate_assets(
        self,
        project: DramaProject,
        temp_dir: str
    ) -> dict[str, str]:
        """Generate audio assets (TTS dialogue, SFX) into temp_dir.

        Returns:
            Mapping of audio_id -> file path on disk.
        """
        assets: dict[str, str] = {}
        parsed = project.script_parsed
        if not parsed:
            return assets
        dialogue_index = 0
        for element in parsed.elements:
            if element.type == ElementType.DIALOGUE:
                audio_id = f"dialogue_{dialogue_index}"
                # Pick the mapped voice for the character, falling back to a
                # default preset name. NOTE(review): confirm "default" is a
                # valid preset on the TTS engine.
                voice_id = project.voice_mapping.get(element.character)
                if not voice_id:
                    voice_id = "default"
                try:
                    # BUG FIX: TTSClient.synthesize takes `speaker=` (there
                    # is no `voice_id` parameter) and returns a
                    # (audio_bytes, sample_rate) tuple -- the previous code
                    # passed a nonexistent kwarg and wrote the tuple to disk.
                    audio_data, _sample_rate = await tts_client.synthesize(
                        text=element.text or "",
                        speaker=voice_id,
                        instruct=element.emotion
                    )
                    file_path = os.path.join(temp_dir, f"{audio_id}.wav")
                    with open(file_path, "wb") as f:
                        f.write(audio_data)
                    assets[audio_id] = file_path
                except Exception as e:
                    print(f"TTS 생성 실패 ({element.character}): {e}")
                    # Fall back to silence sized by the estimated duration
                    # so the timeline still lines up.
                    silence_duration = len(element.text or "") / self.TTS_CHARS_PER_SECOND
                    silence = AudioSegment.silent(duration=int(silence_duration * 1000))
                    file_path = os.path.join(temp_dir, f"{audio_id}.wav")
                    silence.export(file_path, format="wav")
                    assets[audio_id] = file_path
                dialogue_index += 1
            elif element.type == ElementType.SFX:
                # Look the effect up on Freesound.
                audio_id = f"sfx_{element.description}"
                try:
                    # BUG FIX: FreesoundClient.search returns a dict whose
                    # "results" list holds normalized entries keyed
                    # "freesound_id" / "preview_url" -- not a bare list with
                    # raw "id" keys as the previous code assumed.
                    search_result = await freesound_client.search(
                        query=element.description,
                        page_size=1
                    )
                    sounds = search_result.get("results", [])
                    if sounds:
                        sound = sounds[0]
                        if sound.get("preview_url"):
                            audio_data = await freesound_client.download_preview(
                                sound["preview_url"]
                            )
                            file_path = os.path.join(
                                temp_dir, f"sfx_{sound['freesound_id']}.mp3"
                            )
                            with open(file_path, "wb") as f:
                                f.write(audio_data)
                            assets[audio_id] = file_path
                except Exception as e:
                    print(f"SFX 검색 실패 ({element.description}): {e}")
            elif element.type == ElementType.MUSIC:
                # MusicGen requires the GPU service; nothing generated yet.
                # TODO: wire up a music_client here.
                pass
        return assets

    def build_timeline(
        self,
        parsed: ParsedScript,
        assets: dict[str, str]
    ) -> list[TimelineItem]:
        """Lay the parsed elements out on a time axis.

        Dialogue advances the cursor; pauses only advance it; SFX and music
        are overlaid at the cursor position.
        """
        timeline: list[TimelineItem] = []
        current_time = 0.0
        dialogue_index = 0
        current_music: Optional[dict] = None
        for element in parsed.elements:
            if element.type == ElementType.DIALOGUE:
                audio_id = f"dialogue_{dialogue_index}"
                if audio_id in assets:
                    # Use the real clip length when the file is readable.
                    try:
                        audio = AudioSegment.from_file(assets[audio_id])
                        duration = len(audio) / 1000.0
                    except Exception:
                        # BUG FIX: was a bare `except:`, which also swallowed
                        # KeyboardInterrupt/SystemExit.
                        duration = len(element.text or "") / self.TTS_CHARS_PER_SECOND
                    timeline.append(TimelineItem(
                        start_time=current_time,
                        duration=duration,
                        type="voice",
                        audio_path=audio_id,
                        volume=1.0
                    ))
                    current_time += duration + self.DEFAULT_DIALOGUE_GAP
                dialogue_index += 1
            elif element.type == ElementType.PAUSE:
                current_time += element.duration or 1.0
            elif element.type == ElementType.SFX:
                audio_id = f"sfx_{element.description}"
                if audio_id in assets:
                    try:
                        audio = AudioSegment.from_file(assets[audio_id])
                        duration = len(audio) / 1000.0
                    except Exception:
                        duration = self.DEFAULT_SFX_DURATION
                    # NOTE(review): SFX does not advance current_time, so it
                    # overlaps the following line -- confirm this is intended.
                    timeline.append(TimelineItem(
                        start_time=current_time,
                        duration=duration,
                        type="sfx",
                        audio_path=audio_id,
                        volume=element.volume or 1.0
                    ))
            elif element.type == ElementType.MUSIC:
                audio_id = f"music_{element.description}"
                if element.action == "stop":
                    current_music = None
                elif element.action in ("play", "change", "fade_in"):
                    if audio_id in assets:
                        # Music runs from here until stopped (or script end);
                        # its duration is resolved after the loop.
                        current_music = {
                            "audio_id": audio_id,
                            "start_time": current_time,
                            "volume": element.volume or 0.3,
                            "fade_in": element.fade_duration if element.action == "fade_in" else 0
                        }
        # Append the still-running background music spanning to the end.
        if current_music:
            timeline.append(TimelineItem(
                start_time=current_music["start_time"],
                duration=current_time - current_music["start_time"],
                type="music",
                audio_path=current_music["audio_id"],
                volume=current_music["volume"],
                fade_in=current_music.get("fade_in", 0)
            ))
        return timeline

    async def render(
        self,
        project_id: str,
        output_format: str = "wav"
    ) -> Optional[str]:
        """Render the drama end-to-end.

        Returns:
            Path of the rendered file, or None when the project is missing.

        Raises:
            Re-raises any rendering error after marking the project "error".
        """
        project = await self.get_project(project_id)
        if not project or not project.script_parsed:
            return None
        await self.update_project_status(project_id, "processing")
        try:
            with tempfile.TemporaryDirectory() as temp_dir:
                # 1. Generate assets
                assets = await self.generate_assets(project, temp_dir)
                # 2. Build timeline
                timeline = self.build_timeline(project.script_parsed, assets)
                # 3. Mix
                mixed_audio = audio_mixer.mix_timeline(timeline, assets)
                # 4. Export
                output_path = os.path.join(temp_dir, f"drama_{project_id}.{output_format}")
                audio_mixer.export(mixed_audio, output_path, format=output_format)
                # 5. TODO: store in GridFS. For now, copy the file out of
                # the temp dir before it is deleted.
                import shutil
                final_path = f"/tmp/drama_{project_id}.{output_format}"
                shutil.copy(output_path, final_path)
                # Mark completed and persist the timeline.
                await db.dramas.update_one(
                    {"project_id": project_id},
                    {
                        "$set": {
                            "status": "completed",
                            "timeline": [t.model_dump() for t in timeline],
                            "output_file_id": final_path,
                            "updated_at": datetime.utcnow()
                        }
                    }
                )
                return final_path
        except Exception as e:
            await self.update_project_status(project_id, "error", str(e))
            raise

    async def list_projects(
        self,
        skip: int = 0,
        limit: int = 20
    ) -> list[DramaProject]:
        """List projects, newest first, with skip/limit paging."""
        cursor = db.dramas.find().sort("created_at", -1).skip(skip).limit(limit)
        projects = []
        async for doc in cursor:
            projects.append(DramaProject(**doc))
        return projects
# Module-level singleton shared across the app.
drama_orchestrator = DramaOrchestrator()

View File

@ -0,0 +1,165 @@
"""Freesound API 클라이언트
효과음 검색 및 다운로드
https://freesound.org/docs/api/
"""
import os
import logging
from typing import Optional, List, Dict
import httpx
logger = logging.getLogger(__name__)
class FreesoundClient:
    """Freesound API client (search, details, previews, similar sounds).

    API docs: https://freesound.org/docs/api/
    """

    BASE_URL = "https://freesound.org/apiv2"

    def __init__(self):
        # API key is read once from the environment; empty means searches
        # are disabled and return empty results.
        self.api_key = os.getenv("FREESOUND_API_KEY", "")
        self.timeout = httpx.Timeout(30.0, connect=10.0)

    def _get_headers(self) -> dict:
        """Token-auth header for authenticated endpoints."""
        return {"Authorization": f"Token {self.api_key}"}

    @staticmethod
    def _normalize_sound(sound: Dict, include_username: bool = True) -> Dict:
        """Map a raw Freesound result entry to the shape the app uses.

        Extracted so search() and get_similar_sounds() cannot drift apart
        (they previously duplicated this mapping inline).
        """
        normalized = {
            "freesound_id": sound["id"],
            "name": sound.get("name", ""),
            "description": sound.get("description", ""),
            "duration": sound.get("duration", 0),
            "tags": sound.get("tags", []),
            "preview_url": sound.get("previews", {}).get("preview-hq-mp3", ""),
            "license": sound.get("license", ""),
        }
        # The similar-sounds endpoint does not request the username field.
        if include_username:
            normalized["username"] = sound.get("username", "")
        return normalized

    async def search(
        self,
        query: str,
        page: int = 1,
        page_size: int = 20,
        filter_fields: Optional[str] = None,
        sort: str = "score",
        min_duration: Optional[float] = None,
        max_duration: Optional[float] = None,
    ) -> Dict:
        """Search for sounds.

        Args:
            query: search terms
            page: page number
            page_size: results per page (capped at Freesound's max of 150)
            filter_fields: extra filter string (e.g. "duration:[1 TO 5]")
            sort: sort order (score, duration_asc, duration_desc,
                created_desc, ...)
            min_duration: minimum length in seconds
            max_duration: maximum length in seconds

        Returns:
            Dict with "count", "page", "page_size" and normalized "results".
        """
        if not self.api_key:
            logger.warning("Freesound API 키가 설정되지 않음")
            return {"count": 0, "results": []}
        # Assemble the filter expression.
        filters = []
        if min_duration is not None or max_duration is not None:
            min_d = min_duration if min_duration is not None else 0
            max_d = max_duration if max_duration is not None else "*"
            filters.append(f"duration:[{min_d} TO {max_d}]")
        if filter_fields:
            filters.append(filter_fields)
        params = {
            "query": query,
            "page": page,
            "page_size": min(page_size, 150),  # Freesound caps at 150
            "sort": sort,
            "fields": "id,name,description,duration,tags,previews,license,username",
        }
        if filters:
            params["filter"] = " ".join(filters)
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.BASE_URL}/search/text/",
                params=params,
                headers=self._get_headers(),
            )
            response.raise_for_status()
            data = response.json()
        return {
            "count": data.get("count", 0),
            "page": page,
            "page_size": page_size,
            "results": [self._normalize_sound(s) for s in data.get("results", [])],
        }

    async def get_sound(self, sound_id: int) -> Dict:
        """Fetch full details for one sound (raw API response)."""
        if not self.api_key:
            raise ValueError("Freesound API 키 필요")
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.BASE_URL}/sounds/{sound_id}/",
                headers=self._get_headers(),
            )
            response.raise_for_status()
            return response.json()

    async def download_preview(self, preview_url: str) -> bytes:
        """Download a preview file (preview URLs need no auth)."""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(preview_url)
            response.raise_for_status()
            return response.content

    async def get_similar_sounds(
        self,
        sound_id: int,
        page_size: int = 10,
    ) -> List[Dict]:
        """Find sounds acoustically similar to the given one."""
        if not self.api_key:
            return []
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.BASE_URL}/sounds/{sound_id}/similar/",
                params={
                    "page_size": page_size,
                    "fields": "id,name,description,duration,tags,previews,license",
                },
                headers=self._get_headers(),
            )
            response.raise_for_status()
            data = response.json()
        # This endpoint's fields list omits username, so the normalized
        # entries omit it too (matching the previous behaviour).
        return [
            self._normalize_sound(s, include_username=False)
            for s in data.get("results", [])
        ]
# Module-level singleton shared across the app.
freesound_client = FreesoundClient()

View File

@ -0,0 +1,174 @@
# 드라마 스크립트 파서
# 마크다운 형식의 대본을 구조화된 데이터로 변환
import re
from typing import Optional
from app.models.drama import (
ParsedScript, ScriptElement, Character, ElementType
)
class ScriptParser:
    """
    Drama script parser.

    Supported line formats (Korean markup):
    - "# Title"
    - "[장소: desc]" / "[지문: desc]" / "[장면: desc]"  (scene directions)
    - "[효과음: desc]"                                   (sound effect)
    - "[음악: desc]" or "[음악 시작/중지/변경: desc]"     (music cue)
    - "[쉼: 2초]"                                        (pause, seconds)
    - "Name(description, emotion): line"                 (dialogue)
    - "Name: line"                                       (dialogue)
    """
    # Regex patterns for each line kind; tried in this order inside parse().
    TITLE_PATTERN = re.compile(r'^#\s+(.+)$')
    # Scene direction: "[장소|지문|장면: ...]"
    DIRECTION_PATTERN = re.compile(r'^\[(?:장소|지문|장면):\s*(.+)\]$')
    # Sound effect: "[효과음: ...]"
    SFX_PATTERN = re.compile(r'^\[효과음:\s*(.+)\]$')
    # Music cue with optional action word: "[음악 시작: ...]" etc.
    MUSIC_PATTERN = re.compile(r'^\[음악(?:\s+(시작|중지|변경|페이드인|페이드아웃))?:\s*(.+)\]$')
    # Pause: "[쉼: 1.5초]" — decimal seconds, trailing "초" optional.
    PAUSE_PATTERN = re.compile(r'^\[쉼:\s*(\d+(?:\.\d+)?)\s*초?\]$')
    # Dialogue: "Name(optional info): text" — name may not contain
    # '(', '[' or ':'.
    DIALOGUE_PATTERN = re.compile(r'^([^(\[:]+?)(?:\(([^)]*)\))?:\s*(.+)$')
    # Korean music action word -> internal action name.
    MUSIC_ACTIONS = {
        None: "play",
        "시작": "play",
        "중지": "stop",
        "변경": "change",
        "페이드인": "fade_in",
        "페이드아웃": "fade_out",
    }
    def parse(self, script: str) -> ParsedScript:
        """Parse raw script text into a ParsedScript of elements/characters.

        Lines are matched against the patterns above in order; plain text
        without brackets or a colon becomes a stage direction, and any
        other unmatched line is silently dropped.
        """
        lines = script.strip().split('\n')
        title: Optional[str] = None
        characters: dict[str, Character] = {}
        elements: list[ScriptElement] = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            # Title line
            if match := self.TITLE_PATTERN.match(line):
                title = match.group(1)
                continue
            # Scene direction
            if match := self.DIRECTION_PATTERN.match(line):
                elements.append(ScriptElement(
                    type=ElementType.DIRECTION,
                    text=match.group(1)
                ))
                continue
            # Sound effect
            if match := self.SFX_PATTERN.match(line):
                elements.append(ScriptElement(
                    type=ElementType.SFX,
                    description=match.group(1),
                    volume=1.0
                ))
                continue
            # Music cue
            if match := self.MUSIC_PATTERN.match(line):
                action_kr = match.group(1)
                action = self.MUSIC_ACTIONS.get(action_kr, "play")
                elements.append(ScriptElement(
                    type=ElementType.MUSIC,
                    description=match.group(2),
                    action=action,
                    volume=0.3,
                    fade_duration=2.0
                ))
                continue
            # Pause
            if match := self.PAUSE_PATTERN.match(line):
                elements.append(ScriptElement(
                    type=ElementType.PAUSE,
                    duration=float(match.group(1))
                ))
                continue
            # Dialogue
            if match := self.DIALOGUE_PATTERN.match(line):
                char_name = match.group(1).strip()
                char_info = match.group(2)  # parenthesized info (description, emotion)
                dialogue_text = match.group(3).strip()
                # Parse the character info: "desc, emotion" or a single
                # value treated as the emotion.
                emotion = None
                description = None
                if char_info:
                    parts = [p.strip() for p in char_info.split(',')]
                    if len(parts) >= 2:
                        description = parts[0]
                        emotion = parts[1]
                    else:
                        # Single value is treated as the emotion.
                        emotion = parts[0]
                # Register the character; fill in a description later if a
                # first mention lacked one.
                if char_name not in characters:
                    characters[char_name] = Character(
                        name=char_name,
                        description=description
                    )
                elif description and not characters[char_name].description:
                    characters[char_name].description = description
                elements.append(ScriptElement(
                    type=ElementType.DIALOGUE,
                    character=char_name,
                    text=dialogue_text,
                    emotion=emotion
                ))
                continue
            # Unmatched lines: plain text (no brackets, no heading marker)
            # becomes a stage direction.
            if not line.startswith('[') and not line.startswith('#'):
                # Without a colon it cannot be dialogue -> direction.
                if ':' not in line:
                    elements.append(ScriptElement(
                        type=ElementType.DIRECTION,
                        text=line
                    ))
        return ParsedScript(
            title=title,
            characters=list(characters.values()),
            elements=elements
        )
    def validate_script(self, script: str) -> tuple[bool, list[str]]:
        """
        Validate a script: non-empty, parseable, and containing dialogue.

        Returns: (is_valid, error_messages)
        """
        errors: list[str] = []
        if not script or not script.strip():
            errors.append("스크립트가 비어있습니다")
            return False, errors
        parsed = self.parse(script)
        if not parsed.elements:
            errors.append("파싱된 요소가 없습니다")
        # Require at least one dialogue line.
        dialogue_count = sum(1 for e in parsed.elements if e.type == ElementType.DIALOGUE)
        if dialogue_count == 0:
            errors.append("대사가 없습니다")
        return len(errors) == 0, errors
# Module-level singleton shared across the app.
script_parser = ScriptParser()

View File

@ -0,0 +1,135 @@
"""TTS 엔진 클라이언트
audio-studio-tts 서비스와 통신
"""
import os
import logging
from typing import Optional, Tuple, List
import httpx
logger = logging.getLogger(__name__)
class TTSClient:
    """HTTP client for the audio-studio-tts engine.

    All methods open a short-lived AsyncClient per request; synthesis
    endpoints return raw audio bytes plus the sample rate reported by the
    engine in the X-Sample-Rate response header.
    """

    def __init__(self):
        self.base_url = os.getenv("TTS_ENGINE_URL", "http://localhost:8001")
        # Synthesis can be slow, so allow a generous read timeout.
        self.timeout = httpx.Timeout(120.0, connect=10.0)

    @staticmethod
    def _sample_rate_of(resp) -> int:
        """Sample rate the engine reports via X-Sample-Rate (default 24000)."""
        return int(resp.headers.get("X-Sample-Rate", "24000"))

    async def health_check(self) -> dict:
        """Ping the engine's health endpoint and return its JSON body."""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.get(f"{self.base_url}/health")
            resp.raise_for_status()
            return resp.json()

    async def get_speakers(self) -> List[str]:
        """Fetch the list of preset speaker names."""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.get(f"{self.base_url}/speakers")
            resp.raise_for_status()
            return resp.json()["speakers"]

    async def get_languages(self) -> dict:
        """Fetch the mapping of supported languages."""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.get(f"{self.base_url}/languages")
            resp.raise_for_status()
            return resp.json()["languages"]

    async def synthesize(
        self,
        text: str,
        speaker: str = "Chelsie",
        language: str = "ko",
        instruct: Optional[str] = None,
    ) -> Tuple[bytes, int]:
        """Synthesize speech with a preset voice.

        Returns:
            (audio_bytes, sample_rate)
        """
        payload = {
            "text": text,
            "speaker": speaker,
            "language": language,
        }
        # Only include the instruction when one was given.
        if instruct:
            payload["instruct"] = instruct
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.post(
                f"{self.base_url}/synthesize",
                json=payload,
            )
            resp.raise_for_status()
            return resp.content, self._sample_rate_of(resp)

    async def voice_clone(
        self,
        text: str,
        ref_audio: bytes,
        ref_text: str,
        language: str = "ko",
    ) -> Tuple[bytes, int]:
        """Synthesize speech by cloning a reference voice sample.

        Returns:
            (audio_bytes, sample_rate)
        """
        # The clone endpoint expects multipart/form-data.
        upload = {"ref_audio": ("reference.wav", ref_audio, "audio/wav")}
        form = {
            "text": text,
            "ref_text": ref_text,
            "language": language,
        }
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.post(
                f"{self.base_url}/voice-clone",
                files=upload,
                data=form,
            )
            resp.raise_for_status()
            return resp.content, self._sample_rate_of(resp)

    async def voice_design(
        self,
        text: str,
        instruct: str,
        language: str = "ko",
    ) -> Tuple[bytes, int]:
        """Synthesize speech from a textual voice description.

        Returns:
            (audio_bytes, sample_rate)
        """
        payload = {
            "text": text,
            "instruct": instruct,
            "language": language,
        }
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.post(
                f"{self.base_url}/voice-design",
                json=payload,
            )
            resp.raise_for_status()
            return resp.content, self._sample_rate_of(resp)
# Module-level singleton shared across the app.
tts_client = TTSClient()