"""
Qwen3-TTS Voice Clone API Server.

Runs as a separate process (for GPU memory management).
"""
import base64
import io
import json
import os
import pickle
import subprocess
import tempfile

import numpy as np
import soundfile as sf
import torch
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

# Lazily-initialised model singleton; populated by get_model().
model = None

# Directory (next to this file) holding "<id>.pkl" prompts and "<id>.json" metadata.
PROFILES_DIR = os.path.join(os.path.dirname(__file__), "voice-profiles")
os.makedirs(PROFILES_DIR, exist_ok=True)


def get_model():
    """Return the shared Qwen3-TTS model, loading it on first use.

    The import of ``qwen_tts`` is deferred to the first call so that merely
    importing this module does not load the model.
    """
    global model
    if model is None:
        from qwen_tts import Qwen3TTSModel

        print("Loading Qwen3-TTS model...")
        model = Qwen3TTSModel.from_pretrained(
            "Qwen/Qwen3-TTS-12Hz-1.7B-Base",
            device_map="cuda:0",
            dtype=torch.bfloat16,
        )
        print("Model loaded!")
    return model


@app.get("/health")
@app.get("/api/tts/health")
def health():
    """Report liveness and whether the model has been loaded yet."""
    return {"status": "ok", "model_loaded": model is not None}


async def _save_reference_wav(ref_audio: UploadFile) -> str:
    """Persist an uploaded reference clip to a temp file and return a WAV path.

    Uploads whose filename does not end in ``.wav`` are converted with ffmpeg
    to 16 kHz mono WAV. The caller owns the returned file and must delete it.

    Raises:
        HTTPException: 400 if ffmpeg fails to convert the upload, 500 if
            ffmpeg cannot be executed at all.
    """
    # Read first so that a failed upload read leaves no temp file behind.
    content = await ref_audio.read()
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        tmp.write(content)
        tmp_path = tmp.name

    # filename may be None for some clients; treat that as "needs conversion".
    if (ref_audio.filename or "").lower().endswith(".wav"):
        return tmp_path

    wav_path = tmp_path + "_converted.wav"
    try:
        # Argument list (no shell) so paths are never interpreted by a shell.
        result = subprocess.run(
            ["ffmpeg", "-i", tmp_path, "-ar", "16000", "-ac", "1", "-y", wav_path],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except OSError as exc:
        raise HTTPException(status_code=500, detail="ffmpeg could not be executed") from exc
    finally:
        # The raw upload is no longer needed whether or not conversion worked.
        os.unlink(tmp_path)

    if result.returncode != 0 or not os.path.exists(wav_path):
        if os.path.exists(wav_path):
            os.unlink(wav_path)
        raise HTTPException(status_code=400, detail="Could not convert reference audio to WAV")
    return wav_path


def _wav_response(audio, sr) -> StreamingResponse:
    """Encode ``audio`` at sample rate ``sr`` as an in-memory WAV download."""
    buf = io.BytesIO()
    sf.write(buf, audio, sr, format="WAV")
    buf.seek(0)
    return StreamingResponse(
        buf,
        media_type="audio/wav",
        headers={"Content-Disposition": "attachment; filename=tts_output.wav"},
    )


def _profile_id_from_name(name: str) -> str:
    """Derive the on-disk profile id from a display name.

    Spaces become underscores and the result is lower-cased. Names that would
    escape PROFILES_DIR (path separators, ``.``/``..``) or are empty are
    rejected.

    Raises:
        HTTPException: 400 if the resulting id is not a safe file name.
    """
    profile_id = name.replace(" ", "_").lower()
    unsafe = (
        not profile_id
        or profile_id in (".", "..")
        or any(ch in profile_id for ch in ("/", "\\", "\0"))
    )
    if unsafe:
        raise HTTPException(status_code=400, detail="Invalid profile name")
    return profile_id


@app.post("/api/tts/clone")
async def voice_clone(
    text: str = Form(...),
    language: str = Form("korean"),
    ref_audio: UploadFile = File(...),
    ref_text: str = Form(""),
):
    """Clone a voice from an uploaded reference clip and synthesise ``text``.

    When ``ref_text`` is blank, generation is requested with
    ``x_vector_only_mode=True`` instead of passing a reference transcript.
    Returns the generated audio as a WAV attachment.

    Raises:
        HTTPException: 400/500 from reference-audio conversion, 500 if the
            model returns no audio.
    """
    m = get_model()

    # Store the reference audio (converted to WAV if necessary).
    tmp_path = await _save_reference_wav(ref_audio)
    try:
        kwargs = {
            "text": text,
            "language": language,
            "ref_audio": tmp_path,
        }
        if ref_text and ref_text.strip():
            kwargs["ref_text"] = ref_text
        else:
            kwargs["x_vector_only_mode"] = True

        wavs, sr = m.generate_voice_clone(**kwargs)
        print(f"Clone generated: wavs={len(wavs)}, samples={len(wavs[0]) if len(wavs) > 0 else 0}, sr={sr}")

        # Fail explicitly instead of raising IndexError or returning an empty WAV.
        if len(wavs) == 0 or len(wavs[0]) == 0:
            raise HTTPException(status_code=500, detail="Empty audio generated")

        audio_data = np.array(wavs[0], dtype=np.float32)
        return _wav_response(audio_data, sr)
    finally:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)


@app.post("/api/tts/design")
async def voice_design(
    text: str = Form(...),
    language: str = Form("korean"),
    instruct: str = Form("A calm, professional Korean male voice"),
):
    """Generate speech from a textual voice description (no reference audio).

    Returns the first generated waveform as a WAV attachment.
    """
    m = get_model()
    wavs, sr = m.generate_voice_design(text=text, instruct=instruct, language=language)
    return _wav_response(wavs[0], sr)


@app.post("/api/tts/profiles")
async def create_profile(
    name: str = Form(...),
    ref_audio: UploadFile = File(...),
    ref_text: str = Form(""),
):
    """Register a voice profile: build a clone prompt from reference audio and save it.

    The prompt is pickled to ``<id>.pkl`` and its metadata written to
    ``<id>.json`` inside PROFILES_DIR, where ``<id>`` is derived from ``name``.

    Raises:
        HTTPException: 400 if ``name`` is not usable as a file name, 400/500
            from reference-audio conversion.
    """
    # Validate before doing any expensive work or touching the filesystem.
    profile_id = _profile_id_from_name(name)

    m = get_model()
    tmp_path = await _save_reference_wav(ref_audio)
    try:
        # Build the voice-clone prompt.
        kwargs = {"ref_audio": tmp_path}
        if ref_text and ref_text.strip():
            kwargs["ref_text"] = ref_text
        else:
            kwargs["x_vector_only_mode"] = True
        prompt = m.create_voice_clone_prompt(**kwargs)

        # Persist prompt and metadata.
        profile_path = os.path.join(PROFILES_DIR, f"{profile_id}.pkl")
        meta_path = os.path.join(PROFILES_DIR, f"{profile_id}.json")
        # NOTE(security): profiles are stored with pickle. Unpickling executes
        # arbitrary code, so PROFILES_DIR must only ever contain files written
        # by this server.
        with open(profile_path, "wb") as f:
            pickle.dump(prompt, f)
        with open(meta_path, "w") as f:
            json.dump({"id": profile_id, "name": name, "ref_text": ref_text}, f, ensure_ascii=False)

        return {"id": profile_id, "name": name, "status": "created"}
    finally:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)

def _is_safe_profile_id(profile_id: str) -> bool:
    """Return True if ``profile_id`` can be used as a bare file name in PROFILES_DIR."""
    if not profile_id or profile_id in (".", ".."):
        return False
    return not any(ch in profile_id for ch in ("/", "\\", "\0"))


def _error_response(status_code: int, message: str):
    """Build a JSON ``{"error": message}`` response with the given HTTP status."""
    # Imported locally so this block does not rely on the file's import header.
    from fastapi.responses import JSONResponse

    return JSONResponse(status_code=status_code, content={"error": message})


@app.get("/api/tts/profiles")
def list_profiles():
    """List the metadata of all stored voice profiles.

    Reads every ``*.json`` file in PROFILES_DIR. Files that cannot be read or
    parsed are skipped (with a log line) so one bad file does not break the
    whole listing.
    """
    profiles = []
    for entry in sorted(os.listdir(PROFILES_DIR)):
        if not entry.endswith(".json"):
            continue
        try:
            with open(os.path.join(PROFILES_DIR, entry)) as fh:
                profiles.append(json.load(fh))
        except (OSError, json.JSONDecodeError) as exc:
            print(f"Skipping unreadable profile metadata '{entry}': {exc}")
    return profiles


@app.delete("/api/tts/profiles/{profile_id}")
def delete_profile(profile_id: str):
    """Delete a voice profile's prompt and metadata files.

    Deleting a profile that does not exist still reports ``deleted``.
    Returns HTTP 400 if ``profile_id`` is not a plain file name.
    """
    if not _is_safe_profile_id(profile_id):
        return _error_response(400, "Invalid profile id")

    for extension in (".pkl", ".json"):
        try:
            os.unlink(os.path.join(PROFILES_DIR, f"{profile_id}{extension}"))
        except FileNotFoundError:
            # Already gone (or never existed): deletion is idempotent.
            pass
    return {"status": "deleted"}


@app.post("/api/tts/generate")
async def generate_from_profile(
    text: str = Form(...),
    profile_id: str = Form(...),
    language: str = Form("korean"),
):
    """Synthesise ``text`` using a previously stored voice profile.

    Returns the generated audio as a WAV attachment, or a JSON error body with
    status 400 (invalid id), 404 (unknown profile) or 500 (model produced no
    audio).
    """
    # Validate before touching the filesystem: the id is joined into a path
    # that is subsequently unpickled.
    if not _is_safe_profile_id(profile_id):
        return _error_response(400, "Invalid profile id")

    profile_path = os.path.join(PROFILES_DIR, f"{profile_id}.pkl")
    if not os.path.exists(profile_path):
        return _error_response(404, f"Profile '{profile_id}' not found")

    m = get_model()

    # NOTE(security): pickle.load executes arbitrary code; PROFILES_DIR must
    # only contain files written by this server.
    with open(profile_path, "rb") as f:
        prompt = pickle.load(f)

    print(f"Generating with profile '{profile_id}', text='{text[:50]}...', language={language}")

    wavs, sr = m.generate_voice_clone(
        text=text,
        language=language,
        voice_clone_prompt=prompt,
    )

    print(f"Generated: wavs={len(wavs)}, samples={len(wavs[0]) if len(wavs) > 0 else 0}, sr={sr}")

    if len(wavs) == 0 or len(wavs[0]) == 0:
        return _error_response(500, "Empty audio generated")

    audio_data = np.array(wavs[0], dtype=np.float32)
    buf = io.BytesIO()
    sf.write(buf, audio_data, sr, format="WAV")
    buf.seek(0)
    return StreamingResponse(
        buf,
        media_type="audio/wav",
        headers={"Content-Disposition": "attachment; filename=tts_output.wav"},
    )


if __name__ == "__main__":
    import uvicorn

    # Load the model eagerly so the first request does not pay the load cost.
    get_model()
    uvicorn.run(app, host="0.0.0.0", port=8090)