From f5fab9b18605463aea2145b0b73eef34342ca4c3 Mon Sep 17 00:00:00 2001 From: sanj <67624670+iodrift@users.noreply.github.com> Date: Thu, 8 Aug 2024 21:20:33 -0700 Subject: [PATCH] Auto-update: Thu Aug 8 21:20:33 PDT 2024 --- sijapi/routers/tts.py | 162 ++++++++++++++++++------------------------ 1 file changed, 71 insertions(+), 91 deletions(-) diff --git a/sijapi/routers/tts.py b/sijapi/routers/tts.py index f127715..e465446 100644 --- a/sijapi/routers/tts.py +++ b/sijapi/routers/tts.py @@ -123,6 +123,76 @@ async def generate_speech_endpoint( err(traceback.format_exc()) raise HTTPException(status_code=666, detail="error in TTS") + + +async def determine_voice_id(voice_name: str) -> str: + debug(f"Searching for voice id for {voice_name}") + debug(f"Tts.elevenlabs.voices: {Tts.elevenlabs.voices}") + + # Check if the voice is in the configured voices + if voice_name and Tts.has_key(f'elevenlabs.voices.{voice_name}'): + voice_id = Tts.get_value(f'elevenlabs.voices.{voice_name}') + debug(f"Found voice ID in config - {voice_id}") + return voice_id + + debug(f"Requested voice not among the voices specified in config/tts.yaml. Checking with ElevenLabs API using api_key: {Tts.elevenlabs.key}.") + url = "https://api.elevenlabs.io/v1/voices" + headers = {"xi-api-key": Tts.elevenlabs.key} + async with httpx.AsyncClient() as client: + try: + response = await client.get(url, headers=headers) + debug(f"Response status: {response.status_code}") + if response.status_code == 200: + voices_data = response.json().get("voices", []) + for voice in voices_data: + if voice_name == voice["voice_id"] or (voice_name and voice_name.lower() == voice["name"].lower()): + debug(f"Found voice ID from API - {voice['voice_id']}") + return voice["voice_id"] + else: + err(f"Failed to get voices from ElevenLabs API. Status code: {response.status_code}") + err(f"Response content: {response.text}") + except Exception as e: + err(f"Error determining voice ID: {e}") + + warn(f"Voice '{voice_name}' not found; using the default specified in config/tts.yaml: {Tts.elevenlabs.default}") + if Tts.has_key(f'elevenlabs.voices.{Tts.elevenlabs.default}'): + return Tts.get_value(f'elevenlabs.voices.{Tts.elevenlabs.default}') + else: + err(f"Default voice '{Tts.elevenlabs.default}' not found in configuration. Using first available voice.") + first_voice = next(iter(vars(Tts.elevenlabs.voices))) + return Tts.get_value(f'elevenlabs.voices.{first_voice}') + +async def elevenlabs_tts(model: str, input_text: str, voice: Optional[str], title: str = None, output_dir: str = None): + if getattr(API.EXTENSIONS, 'elevenlabs', False): + voice_id = await determine_voice_id(voice) + + url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}" + payload = { + "text": input_text, + "model_id": model + } + headers = {"Content-Type": "application/json", "xi-api-key": Tts.elevenlabs.key} + try: + async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client: + response = await client.post(url, json=payload, headers=headers) + output_dir = output_dir if output_dir else TTS_OUTPUT_DIR + title = title if title else dt_datetime.now().strftime("%Y%m%d%H%M%S") + filename = f"{sanitize_filename(title)}.mp3" + file_path = Path(output_dir) / filename + if response.status_code == 200: + with open(file_path, "wb") as audio_file: + audio_file.write(response.content) + return file_path + else: + raise HTTPException(status_code=response.status_code, detail="Error from ElevenLabs API") + + except Exception as e: + err(f"Error from Elevenlabs API: {e}") + raise HTTPException(status_code=500, detail=f"Error from ElevenLabs API: {e}") + + else: + warn(f"elevenlabs_tts called but ElevenLabs module is not enabled in config.") + raise HTTPException(status_code=400, detail="ElevenLabs TTS is not enabled") async def generate_speech( bg_tasks: BackgroundTasks, @@ -155,7 +225,7 @@ async def generate_speech( debug(f"Model: {model}") debug(f"Voice: {voice}") debug(f"Tts.elevenlabs: {Tts.elevenlabs}") - + if model == "eleven_turbo_v2" and getattr(API.EXTENSIONS, 'elevenlabs', False): info("Using ElevenLabs.") audio_file_path = await elevenlabs_tts(model, text, voice, title, output_dir) @@ -196,7 +266,6 @@ async def generate_speech( err(f"Failed to generate speech: {e}") err(f"Traceback: {traceback.format_exc()}") raise HTTPException(status_code=500, detail=f"Failed to generate speech: {e}") - async def get_model(voice: str = None, voice_file: UploadFile = None): @@ -211,94 +280,6 @@ async def get_model(voice: str = None, voice_file: UploadFile = None): raise HTTPException(status_code=400, detail="No model or voice specified, or no TTS module loaded") -async def determine_voice_id(voice_name: str) -> str: - debug(f"Searching for voice id for {voice_name}") - debug(f"Tts.elevenlabs: {Tts.elevenlabs}") - - # Check if the voice is specified in the configuration - if voice_name is None: - voice_name = Tts.elevenlabs.voice - - # Use the API key from the configuration - api_key = Tts.elevenlabs.api_key or Tts.SECRETS.ELEVENLABS - - if not api_key: - err("No ElevenLabs API key found in configuration") - raise ValueError("ElevenLabs API key is missing") - - # If we don't have a voice ID system in the config, we'll need to fetch it from the API - url = "https://api.elevenlabs.io/v1/voices" - headers = {"xi-api-key": api_key} - async with httpx.AsyncClient() as client: - try: - response = await client.get(url, headers=headers) - debug(f"Response status: {response.status_code}") - if response.status_code == 200: - voices_data = response.json().get("voices", []) - for voice in voices_data: - if voice_name.lower() == voice["name"].lower(): - debug(f"Found voice ID from API - {voice['voice_id']}") - return voice["voice_id"] - else: - err(f"Failed to get voices from ElevenLabs API. Status code: {response.status_code}") - err(f"Response content: {response.text}") - except Exception as e: - err(f"Error determining voice ID: {e}") - - warn(f"Voice '{voice_name}' not found. Using the first available voice.") - if voices_data: - return voices_data[0]["voice_id"] - else: - raise ValueError("No voices available from ElevenLabs API") - - - - -async def elevenlabs_tts(model: str, input_text: str, voice: Optional[str], title: str = None, output_dir: str = None): - # Debug logging - debug(f"API.EXTENSIONS: {API.EXTENSIONS}") - debug(f"API.EXTENSIONS.elevenlabs: {getattr(API.EXTENSIONS, 'elevenlabs', None)}") - debug(f"Tts config: {Tts}") - - - if API.EXTENSIONS.elevenlabs: - voice_id = await determine_voice_id(voice) - api_key = Tts.elevenlabs.api_key or Tts.SECRETS.ELEVENLABS - - if not api_key: - err("No ElevenLabs API key found in configuration") - raise ValueError("ElevenLabs API key is missing") - - url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}" - payload = { - "text": input_text, - "model_id": model - } - headers = {"Content-Type": "application/json", "xi-api-key": api_key} - try: - async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client: - response = await client.post(url, json=payload, headers=headers) - output_dir = output_dir if output_dir else TTS_OUTPUT_DIR - title = title if title else dt_datetime.now().strftime("%Y%m%d%H%M%S") - filename = f"{sanitize_filename(title)}.mp3" - file_path = Path(output_dir) / filename - if response.status_code == 200: - with open(file_path, "wb") as audio_file: - audio_file.write(response.content) - return file_path - else: - raise HTTPException(status_code=response.status_code, detail="Error from ElevenLabs API") - - except Exception as e: - err(f"Error from Elevenlabs API: {e}") - raise HTTPException(status_code=500, detail=f"Error from ElevenLabs API: {e}") - - else: - warn(f"elevenlabs_tts called but ElevenLabs module is not enabled in config.") - raise HTTPException(status_code=400, detail="ElevenLabs TTS is not enabled") - - - async def get_text_content(text: Optional[str], file: Optional[UploadFile]) -> str: if file: return (await file.read()).decode("utf-8").strip() @@ -308,7 +289,6 @@ async def get_text_content(text: Optional[str], file: Optional[UploadFile]) -> s raise HTTPException(status_code=400, detail="No text provided") - async def get_voice_file_path(voice: str = None, voice_file: UploadFile = None) -> str: if voice: debug(f"Looking for voice: {voice}")