← back to jooliperbush__voiceagent

Function bodies 220 total

All specs Real LLM only Function bodies
advance method · python · L251-L286 (36 LOC)
agent/flow_tracker.py
    def advance(self, tool_name=None, tool_result=None, assistant_text=None):
        """Advance the flow state based on what just happened."""
        if not self.active_flow or not self.current_step:
            return

        flow = next((f for f in self.flows if f["name"] == self.active_flow), None)
        if not flow:
            return

        nodes = flow["flow_json"].get("nodes", {})
        current = nodes.get(self.current_step)
        if not current:
            return

        ntype = current.get("type", "")

        # For tool_call nodes, advance on tool completion
        if ntype == "tool_call" and tool_name:
            next_nodes = self._get_next_nodes(nodes, self.current_step)
            if next_nodes:
                # First output = success, second = failure
                is_success = not (tool_result and isinstance(tool_result, dict) and tool_result.get("error"))
                idx = 0 if is_success else min(1, len(next_nodes) - 1)
                self.curre
get_context_hint method · python · L288-L312 (25 LOC)
agent/flow_tracker.py
    def get_context_hint(self):
        """Return a context hint string for the current flow position."""
        if not self.active_flow:
            return ""

        flow = next((f for f in self.flows if f["name"] == self.active_flow), None)
        if not flow:
            return ""

        nodes = flow["flow_json"].get("nodes", {})
        current = nodes.get(self.current_step)
        if not current:
            return f'\n[Flow Context: Matched "{self.active_flow}". Flow complete or position unknown.]'

        label = current.get("label", current.get("type", ""))
        step_desc = self._describe_step(current) or label

        # Find what comes next
        next_nodes = self._get_next_nodes(nodes, self.current_step)
        next_desc = ""
        if next_nodes and next_nodes[0] in nodes:
            next_label = nodes[next_nodes[0]].get("label", "")
            next_desc = f" Next: {next_label}." if next_label else ""

        return f'\n[Flow Context: Caller matched "{self
main function · python · L15-L93 (79 LOC)
agent/main.py
async def main():
    load_dotenv()

    print("\n  Voice Agent Starting...\n")

    # Database
    await init_db()
    print("  Database initialized")
    await seed_db()

    # Config
    config = AgentConfig()
    print(f"  Business: {config.get('business_name')}")

    # STT engine (optional)
    stt_engine = None
    try:
        from agent.stt_engine import create_stt_engine
        stt_engine = create_stt_engine()
    except Exception as e:
        print(f"  STT: Not available ({e})")

    # TTS engine (optional, depends on TTS packages being installed)
    tts_engine = None
    try:
        from agent.tts_engine import create_tts_engine

        # Build overrides from saved config (so UI preferences persist across restarts)
        tts_overrides = {}
        tts_fields = [
            "tts_engine", "edge_voice", "kokoro_voice", "kokoro_speed",
            "openai_voice", "openai_speed", "openai_model",
            "voicebox_url", "voicebox_profile_id", "voicebox_model_size", "v
DeepgramSTTEngine class · python · L4-L44 (41 LOC)
agent/stt_deepgram.py
class DeepgramSTTEngine:
    """STT engine using Deepgram's pre-recorded (batch) API."""

    def __init__(self):
        from deepgram import AsyncDeepgramClient

        api_key = os.environ.get("DEEPGRAM_API_KEY", "")
        if not api_key:
            raise ValueError("DEEPGRAM_API_KEY environment variable is required")

        self.client = AsyncDeepgramClient(api_key=api_key)
        self.model = os.environ.get("DEEPGRAM_MODEL", "nova-3")
        self.language = os.environ.get("DEEPGRAM_LANGUAGE", "en")
        self.available = True
        print(f"  STT: Deepgram ({self.model}, {self.language})")

    async def transcribe(self, audio_bytes: bytes) -> str:
        if not self.available:
            return ""

        response = await self.client.listen.v1.media.transcribe_file(
            request=audio_bytes,
            model=self.model,
            language=self.language,
            encoding="linear16",
            request_options={
                "additional_query_paramet
__init__ method · python · L7-L18 (12 LOC)
agent/stt_deepgram.py
    def __init__(self):
        from deepgram import AsyncDeepgramClient

        api_key = os.environ.get("DEEPGRAM_API_KEY", "")
        if not api_key:
            raise ValueError("DEEPGRAM_API_KEY environment variable is required")

        self.client = AsyncDeepgramClient(api_key=api_key)
        self.model = os.environ.get("DEEPGRAM_MODEL", "nova-3")
        self.language = os.environ.get("DEEPGRAM_LANGUAGE", "en")
        self.available = True
        print(f"  STT: Deepgram ({self.model}, {self.language})")
transcribe method · python · L20-L44 (25 LOC)
agent/stt_deepgram.py
    async def transcribe(self, audio_bytes: bytes) -> str:
        if not self.available:
            return ""

        response = await self.client.listen.v1.media.transcribe_file(
            request=audio_bytes,
            model=self.model,
            language=self.language,
            encoding="linear16",
            request_options={
                "additional_query_parameters": {
                    "sample_rate": 16000,
                    "channels": 1,
                },
            },
        )

        # Extract transcript from response
        try:
            transcript = (
                response.results.channels[0].alternatives[0].transcript
            )
            return transcript.strip()
        except (AttributeError, IndexError):
            return ""
create_stt_engine function · python · L6-L15 (10 LOC)
agent/stt_engine.py
def create_stt_engine():
    """Factory that returns the appropriate STT engine based on STT_ENGINE env var."""
    engine_name = os.environ.get("STT_ENGINE", "whisper").lower()

    if engine_name == "deepgram":
        from agent.stt_deepgram import DeepgramSTTEngine
        return DeepgramSTTEngine()

    # Default: local Whisper
    return STTEngine()
If a scraper extracted this row, it came from Repobility (https://repobility.com)
STTEngine class · python · L18-L67 (50 LOC)
agent/stt_engine.py
class STTEngine:
    def __init__(self):
        model_size = os.environ.get("WHISPER_MODEL", "base")
        language = os.environ.get("WHISPER_LANGUAGE", "en")
        self.language = language if language != "auto" else None

        try:
            from faster_whisper import WhisperModel
            # Auto-detect device
            device = "cpu"
            compute_type = "int8"
            try:
                import torch
                if torch.cuda.is_available():
                    device = "cuda"
                    compute_type = "float16"
            except ImportError:
                pass

            print(f"  STT: Loading Whisper {model_size} on {device}...")
            self.model = WhisperModel(model_size, device=device, compute_type=compute_type)
            self.available = True
            print(f"  STT: Whisper {model_size} loaded")
        except ImportError:
            print("  STT: faster-whisper not installed, voice input disabled")
            self.model 
__init__ method · python · L19-L44 (26 LOC)
agent/stt_engine.py
    def __init__(self):
        model_size = os.environ.get("WHISPER_MODEL", "base")
        language = os.environ.get("WHISPER_LANGUAGE", "en")
        self.language = language if language != "auto" else None

        try:
            from faster_whisper import WhisperModel
            # Auto-detect device
            device = "cpu"
            compute_type = "int8"
            try:
                import torch
                if torch.cuda.is_available():
                    device = "cuda"
                    compute_type = "float16"
            except ImportError:
                pass

            print(f"  STT: Loading Whisper {model_size} on {device}...")
            self.model = WhisperModel(model_size, device=device, compute_type=compute_type)
            self.available = True
            print(f"  STT: Whisper {model_size} loaded")
        except ImportError:
            print("  STT: faster-whisper not installed, voice input disabled")
            self.model = None
          
transcribe method · python · L46-L57 (12 LOC)
agent/stt_engine.py
    async def transcribe(self, audio_bytes: bytes) -> str:
        if not self.available:
            return ""

        # Convert PCM int16 bytes to float32 numpy
        audio_int16 = np.frombuffer(audio_bytes, dtype=np.int16)
        audio_float32 = audio_int16.astype(np.float32) / 32768.0

        # Run transcription in executor to not block event loop
        loop = asyncio.get_event_loop()
        text = await loop.run_in_executor(None, self._transcribe_sync, audio_float32)
        return text
_transcribe_sync method · python · L59-L67 (9 LOC)
agent/stt_engine.py
    def _transcribe_sync(self, audio: np.ndarray) -> str:
        segments, _ = self.model.transcribe(
            audio,
            language=self.language,
            beam_size=5,
            vad_filter=True,
        )
        text = " ".join(seg.text for seg in segments).strip()
        return text
SentenceChunker class · python · L11-L54 (44 LOC)
agent/text_chunker.py
class SentenceChunker:
    """Buffers streaming text and yields complete sentences for TTS.

    Splits on sentence-ending punctuation (.!?) followed by whitespace.
    Very long text without punctuation is force-split at a word boundary.
    """

    def __init__(self):
        self.buffer = ""

    def add(self, text_delta: str) -> list[str]:
        """Add text delta, return list of complete sentences (may be empty)."""
        self.buffer += text_delta
        chunks = []

        while True:
            match = _SENTENCE_END.search(self.buffer)
            if match:
                end = match.end()
                chunk = self.buffer[:end].strip()
                if chunk:
                    chunks.append(chunk)
                self.buffer = self.buffer[end:]
                continue

            # Force-split very long buffers at last word boundary
            if len(self.buffer) >= MAX_CHUNK_LEN:
                last_space = self.buffer.rfind(' ', 40)
                if last_s
add method · python · L21-L48 (28 LOC)
agent/text_chunker.py
    def add(self, text_delta: str) -> list[str]:
        """Add text delta, return list of complete sentences (may be empty)."""
        self.buffer += text_delta
        chunks = []

        while True:
            match = _SENTENCE_END.search(self.buffer)
            if match:
                end = match.end()
                chunk = self.buffer[:end].strip()
                if chunk:
                    chunks.append(chunk)
                self.buffer = self.buffer[end:]
                continue

            # Force-split very long buffers at last word boundary
            if len(self.buffer) >= MAX_CHUNK_LEN:
                last_space = self.buffer.rfind(' ', 40)
                if last_space > 0:
                    chunk = self.buffer[:last_space].strip()
                    if chunk:
                        chunks.append(chunk)
                    self.buffer = self.buffer[last_space:].lstrip()
                    continue

            break

        return chunks
flush method · python · L50-L54 (5 LOC)
agent/text_chunker.py
    def flush(self) -> str | None:
        """Flush remaining buffer as a final chunk."""
        remaining = self.buffer.strip()
        self.buffer = ""
        return remaining if remaining else None
check_availability function · python · L5-L31 (27 LOC)
agent/tools.py
async def check_availability(db, date, service_type=None):
    if service_type:
        rows = await db.fetch_all(
            """SELECT a.date, a.time, s.name as service_name
               FROM availability a
               LEFT JOIN services s ON a.service_id = s.id
               WHERE a.date = ? AND a.is_booked = FALSE
               AND (s.name LIKE ? OR a.service_id IS NULL)
               ORDER BY a.time""",
            (date, f"%{service_type}%"),
        )
    else:
        rows = await db.fetch_all(
            """SELECT a.date, a.time, s.name as service_name
               FROM availability a
               LEFT JOIN services s ON a.service_id = s.id
               WHERE a.date = ? AND a.is_booked = FALSE
               ORDER BY a.time""",
            (date,),
        )
    slots = [{"time": r["time"], "service": r["service_name"] or "Any"} for r in rows]
    if not slots:
        return {"available_slots": [], "message": f"No available slots on {date}."}
    return {
     
Powered by Repobility — scan your code at https://repobility.com
create_booking function · python · L34-L70 (37 LOC)
agent/tools.py
async def create_booking(db, customer_name, date, time, service_type, **kwargs):
    # Find or create customer
    customer_id = await _find_or_create_customer(
        db, customer_name, kwargs.get("customer_phone"), kwargs.get("customer_email")
    )

    # Find service
    svc_row = await db.fetch_one(
        "SELECT id FROM services WHERE name LIKE ?", (f"%{service_type}%",)
    )
    service_id = svc_row["id"] if svc_row else None

    # Check slot is available
    slot = await db.fetch_one(
        "SELECT id FROM availability WHERE date = ? AND time = ? AND is_booked = FALSE",
        (date, time),
    )
    if not slot:
        return {"status": "failed", "message": f"The {time} slot on {date} is no longer available."}

    # Mark slot as booked
    await db.execute("UPDATE availability SET is_booked = TRUE WHERE id = ?", (slot["id"],))

    # Create booking
    ref = await generate_booking_ref(db)
    await db.execute(
        """INSERT INTO bookings (booking_ref, customer_id
create_support_ticket function · python · L73-L90 (18 LOC)
agent/tools.py
async def create_support_ticket(db, customer_name, issue_summary, priority, category, **kwargs):
    customer_id = await _find_or_create_customer(
        db, customer_name, kwargs.get("customer_phone"), kwargs.get("customer_email")
    )

    ref = await generate_ticket_ref(db)
    await db.execute(
        """INSERT INTO tickets (ticket_ref, customer_id, issue_summary, issue_details, priority, category)
           VALUES (?, ?, ?, ?, ?, ?)""",
        (ref, customer_id, issue_summary, kwargs.get("issue_details"), priority, category),
    )
    await db.commit()

    return {
        "status": "created",
        "ticket_ref": ref,
        "summary": f"Support ticket {ref} created for {customer_name}: {issue_summary} (Priority: {priority}).",
    }
lookup_customer function · python · L93-L125 (33 LOC)
agent/tools.py
async def lookup_customer(db, name=None, phone=None):
    if name:
        rows = await db.fetch_all(
            "SELECT * FROM customers WHERE name LIKE ?", (f"%{name}%",)
        )
    elif phone:
        rows = await db.fetch_all(
            "SELECT * FROM customers WHERE phone LIKE ?", (f"%{phone}%",)
        )
    else:
        return {"found": False, "message": "Please provide a name or phone number to search."}

    if not rows:
        return {"found": False, "message": "No customer found matching that search."}

    customers = []
    for cust in rows:
        # Get recent bookings
        cust["recent_bookings"] = await db.fetch_all(
            """SELECT b.booking_ref, b.date, b.time, s.name as service_name, b.status
               FROM bookings b LEFT JOIN services s ON b.service_id = s.id
               WHERE b.customer_id = ? ORDER BY b.created_at DESC LIMIT 5""",
            (cust["id"],),
        )
        # Get recent tickets
        cust["recent_tickets"] = await db
transfer_to_human function · python · L128-L134 (7 LOC)
agent/tools.py
async def transfer_to_human(db, reason, department, context_summary):
    return {
        "status": "transferring",
        "department": department,
        "message": f"Transferring to {department} department. Reason: {reason}",
        "context_passed": True,
    }
_find_or_create_customer function · python · L137-L161 (25 LOC)
agent/tools.py
async def _find_or_create_customer(db, name, phone=None, email=None):
    row = await db.fetch_one("SELECT id FROM customers WHERE name LIKE ?", (f"%{name}%",))
    if row:
        # Update contact info if provided
        if phone or email:
            updates = []
            params = []
            if phone:
                updates.append("phone = ?")
                params.append(phone)
            if email:
                updates.append("email = ?")
                params.append(email)
            params.append(row["id"])
            await db.execute(
                f"UPDATE customers SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
                params,
            )
        return row["id"]
    else:
        new_id = await db.insert_returning_id(
            "INSERT INTO customers (name, phone, email) VALUES (?, ?, ?)",
            (name, phone, email),
        )
        return new_id
execute_tool function · python · L173-L180 (8 LOC)
agent/tools.py
async def execute_tool(db, tool_name, tool_input):
    func = TOOL_MAP.get(tool_name)
    if not func:
        return {"error": f"Unknown tool: {tool_name}"}
    try:
        return await func(db, **tool_input)
    except Exception as e:
        return {"error": str(e)}
DeepgramTTSEngine class · python · L9-L75 (67 LOC)
agent/tts_deepgram.py
class DeepgramTTSEngine(TTSEngine):
    """TTS engine using Deepgram's Aura text-to-speech API.

    Keeps a persistent HTTP session to avoid connection overhead per sentence.
    Returns PCM 16-bit 16kHz mono directly — no resampling needed.
    """

    API_URL = "https://api.deepgram.com/v1/speak"

    def __init__(self):
        self.api_key = os.environ.get("DEEPGRAM_API_KEY", "")
        if not self.api_key:
            raise ValueError("DEEPGRAM_API_KEY is required for Deepgram TTS")

        self.model = os.environ.get("DEEPGRAM_TTS_MODEL", "aura-2-thalia-en")
        self._session = None
        self._headers = {
            "Authorization": f"Token {self.api_key}",
            "Content-Type": "application/json",
        }
        self._params = {
            "model": self.model,
            "encoding": "linear16",
            "sample_rate": "16000",
            "container": "none",
        }
        print(f"  TTS: Using Deepgram Aura (model: {self.model})")

    @property
   
__init__ method · python · L18-L35 (18 LOC)
agent/tts_deepgram.py
    def __init__(self):
        self.api_key = os.environ.get("DEEPGRAM_API_KEY", "")
        if not self.api_key:
            raise ValueError("DEEPGRAM_API_KEY is required for Deepgram TTS")

        self.model = os.environ.get("DEEPGRAM_TTS_MODEL", "aura-2-thalia-en")
        self._session = None
        self._headers = {
            "Authorization": f"Token {self.api_key}",
            "Content-Type": "application/json",
        }
        self._params = {
            "model": self.model,
            "encoding": "linear16",
            "sample_rate": "16000",
            "container": "none",
        }
        print(f"  TTS: Using Deepgram Aura (model: {self.model})")
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
_get_session method · python · L41-L44 (4 LOC)
agent/tts_deepgram.py
    async def _get_session(self):
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(headers=self._headers)
        return self._session
generate method · python · L46-L75 (30 LOC)
agent/tts_deepgram.py
    async def generate(self, text: str) -> bytes:
        session = await self._get_session()

        try:
            async with session.post(
                self.API_URL, params=self._params, json={"text": text}
            ) as resp:
                if resp.status != 200:
                    body = await resp.text()
                    print(f"  TTS: Deepgram error {resp.status}: {body}")
                    return b""

                pcm_data = await resp.read()
        except aiohttp.ClientError as e:
            print(f"  TTS: Deepgram connection error: {e}")
            # Reset session on error so next call creates a fresh one
            self._session = None
            return b""

        if not pcm_data:
            return b""

        # Short fade-out to prevent clicks between sentence chunks
        samples = np.frombuffer(pcm_data, dtype=np.int16).copy()
        fade_len = min(80, len(samples))  # ~5ms at 16kHz
        if fade_len > 0:
            fade = np.linspace(1.0
TTSEngine class · python · L10-L19 (10 LOC)
agent/tts_engine.py
class TTSEngine(ABC):
    @abstractmethod
    async def generate(self, text: str) -> bytes:
        """Generate PCM 16-bit 16kHz mono audio bytes from text."""
        ...

    @property
    @abstractmethod
    def name(self) -> str:
        ...
generate method · python · L12-L14 (3 LOC)
agent/tts_engine.py
    async def generate(self, text: str) -> bytes:
        """Generate PCM 16-bit 16kHz mono audio bytes from text."""
        ...
ChatterboxTTSEngine class · python · L22-L61 (40 LOC)
agent/tts_engine.py
class ChatterboxTTSEngine(TTSEngine):
    def __init__(self):
        from chatterbox.tts import ChatterboxTTS
        import torch

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"  TTS: Loading Chatterbox on {self.device}...")
        self.model = ChatterboxTTS.from_pretrained(device=self.device)

        # Load voice reference if available
        self.audio_prompt = None
        voice_file = os.environ.get("TTS_VOICE_FILE", "")
        if voice_file and os.path.exists(voice_file):
            print(f"  TTS: Using voice reference: {voice_file}")
            self.audio_prompt = voice_file
        print("  TTS: Chatterbox loaded")

    @property
    def name(self):
        return "chatterbox"

    async def generate(self, text: str) -> bytes:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._generate_sync, text)

    def _generate_sync(self, text: str) -> bytes:
        import torch
        import scipy.s
__init__ method · python · L23-L37 (15 LOC)
agent/tts_engine.py
    def __init__(self):
        from chatterbox.tts import ChatterboxTTS
        import torch

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"  TTS: Loading Chatterbox on {self.device}...")
        self.model = ChatterboxTTS.from_pretrained(device=self.device)

        # Load voice reference if available
        self.audio_prompt = None
        voice_file = os.environ.get("TTS_VOICE_FILE", "")
        if voice_file and os.path.exists(voice_file):
            print(f"  TTS: Using voice reference: {voice_file}")
            self.audio_prompt = voice_file
        print("  TTS: Chatterbox loaded")
generate method · python · L43-L45 (3 LOC)
agent/tts_engine.py
    async def generate(self, text: str) -> bytes:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._generate_sync, text)
_generate_sync method · python · L47-L61 (15 LOC)
agent/tts_engine.py
    def _generate_sync(self, text: str) -> bytes:
        import torch
        import scipy.signal

        wav = self.model.generate(text, audio_prompt_path=self.audio_prompt)
        # Chatterbox outputs at 24kHz, resample to 16kHz
        if isinstance(wav, torch.Tensor):
            wav = wav.cpu().numpy().squeeze()
        samples_16k = scipy.signal.resample(
            wav, int(len(wav) * 16000 / 24000)
        )
        # Convert to int16 PCM
        samples_16k = np.clip(samples_16k, -1.0, 1.0)
        int16_data = (samples_16k * 32767).astype(np.int16)
        return int16_data.tobytes()
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
KokoroTTSEngine class · python · L64-L101 (38 LOC)
agent/tts_engine.py
class KokoroTTSEngine(TTSEngine):
    def __init__(self):
        from kokoro import KPipeline

        self.voice = os.environ.get("KOKORO_VOICE", "af_heart")
        self.speed = float(os.environ.get("KOKORO_SPEED", "1.1"))
        lang = "b" if self.voice.startswith("b") else "a"
        print(f"  TTS: Loading Kokoro (voice: {self.voice}, speed: {self.speed})...")
        self.pipeline = KPipeline(lang_code=lang)
        print("  TTS: Kokoro loaded")

    @property
    def name(self):
        return "kokoro"

    async def generate(self, text: str) -> bytes:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._generate_sync, text)

    def _generate_sync(self, text: str) -> bytes:
        import scipy.signal

        chunks = []
        for _, _, audio in self.pipeline(text, voice=self.voice, speed=self.speed):
            if audio is not None:
                chunks.append(audio)

        if not chunks:
            return b""

        wav = 
__init__ method · python · L65-L73 (9 LOC)
agent/tts_engine.py
    def __init__(self):
        from kokoro import KPipeline

        self.voice = os.environ.get("KOKORO_VOICE", "af_heart")
        self.speed = float(os.environ.get("KOKORO_SPEED", "1.1"))
        lang = "b" if self.voice.startswith("b") else "a"
        print(f"  TTS: Loading Kokoro (voice: {self.voice}, speed: {self.speed})...")
        self.pipeline = KPipeline(lang_code=lang)
        print("  TTS: Kokoro loaded")
generate method · python · L79-L81 (3 LOC)
agent/tts_engine.py
    async def generate(self, text: str) -> bytes:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._generate_sync, text)
_generate_sync method · python · L83-L101 (19 LOC)
agent/tts_engine.py
    def _generate_sync(self, text: str) -> bytes:
        import scipy.signal

        chunks = []
        for _, _, audio in self.pipeline(text, voice=self.voice, speed=self.speed):
            if audio is not None:
                chunks.append(audio)

        if not chunks:
            return b""

        wav = np.concatenate(chunks)
        # Kokoro outputs at 24kHz, resample to 16kHz
        samples_16k = scipy.signal.resample(
            wav, int(len(wav) * 16000 / 24000)
        )
        samples_16k = np.clip(samples_16k, -1.0, 1.0)
        int16_data = (samples_16k * 32767).astype(np.int16)
        return int16_data.tobytes()
EdgeTTSEngine class · python · L104-L144 (41 LOC)
agent/tts_engine.py
class EdgeTTSEngine(TTSEngine):
    def __init__(self):
        import edge_tts  # noqa: F401 — verify import
        self.voice = os.environ.get("EDGE_TTS_VOICE", "en-US-AndrewMultilingualNeural")
        print(f"  TTS: Using Edge TTS (voice: {self.voice})")

    @property
    def name(self):
        return "edge"

    async def generate(self, text: str) -> bytes:
        import edge_tts
        import soundfile as sf

        communicate = edge_tts.Communicate(text, self.voice)
        mp3_data = b""
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                mp3_data += chunk["data"]

        if not mp3_data:
            return b""

        # Decode MP3 to float32
        samples, orig_rate = sf.read(io.BytesIO(mp3_data), dtype="float32")
        if len(samples) == 0:
            return b""

        if samples.ndim == 2:
            samples = samples.mean(axis=1)

        # Resample to 16kHz
        if orig_rate != 16000:
            imp
__init__ method · python · L105-L108 (4 LOC)
agent/tts_engine.py
    def __init__(self):
        import edge_tts  # noqa: F401 — verify import
        self.voice = os.environ.get("EDGE_TTS_VOICE", "en-US-AndrewMultilingualNeural")
        print(f"  TTS: Using Edge TTS (voice: {self.voice})")
generate method · python · L114-L144 (31 LOC)
agent/tts_engine.py
    async def generate(self, text: str) -> bytes:
        import edge_tts
        import soundfile as sf

        communicate = edge_tts.Communicate(text, self.voice)
        mp3_data = b""
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                mp3_data += chunk["data"]

        if not mp3_data:
            return b""

        # Decode MP3 to float32
        samples, orig_rate = sf.read(io.BytesIO(mp3_data), dtype="float32")
        if len(samples) == 0:
            return b""

        if samples.ndim == 2:
            samples = samples.mean(axis=1)

        # Resample to 16kHz
        if orig_rate != 16000:
            import scipy.signal
            samples = scipy.signal.resample(
                samples, int(len(samples) * 16000 / orig_rate)
            )

        samples = np.clip(samples, -1.0, 1.0)
        int16_data = (samples * 32767).astype(np.int16)
        return int16_data.tobytes()
SystemTTSEngine class · python · L147-L244 (98 LOC)
agent/tts_engine.py
class SystemTTSEngine(TTSEngine):
    def __init__(self):
        import platform
        import shutil

        self.use_say = platform.system() == "Darwin" and shutil.which("say")
        if self.use_say:
            print("  TTS: Using macOS 'say' command")
        else:
            import pyttsx3
            self.engine = pyttsx3.init()
            self.engine.setProperty("rate", 175)
            print("  TTS: Using system TTS (pyttsx3)")

    @property
    def name(self):
        return "system"

    async def generate(self, text: str) -> bytes:
        if self.use_say:
            return await self._generate_say(text)
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._generate_pyttsx3, text)

    async def _generate_say(self, text: str) -> bytes:
        """Use macOS 'say' command — runs as async subprocess, doesn't block."""
        import tempfile
        import soundfile as sf

        tmp_path = tempfile.mktemp(suffix=".aiff")
      
If a scraper extracted this row, it came from Repobility (https://repobility.com)
__init__ method · python · L148-L159 (12 LOC)
agent/tts_engine.py
    def __init__(self):
        import platform
        import shutil

        self.use_say = platform.system() == "Darwin" and shutil.which("say")
        if self.use_say:
            print("  TTS: Using macOS 'say' command")
        else:
            import pyttsx3
            self.engine = pyttsx3.init()
            self.engine.setProperty("rate", 175)
            print("  TTS: Using system TTS (pyttsx3)")
generate method · python · L165-L169 (5 LOC)
agent/tts_engine.py
    async def generate(self, text: str) -> bytes:
        if self.use_say:
            return await self._generate_say(text)
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._generate_pyttsx3, text)
_generate_say method · python · L171-L209 (39 LOC)
agent/tts_engine.py
    async def _generate_say(self, text: str) -> bytes:
        """Use macOS 'say' command — runs as async subprocess, doesn't block."""
        import tempfile
        import soundfile as sf

        tmp_path = tempfile.mktemp(suffix=".aiff")
        try:
            proc = await asyncio.create_subprocess_exec(
                "say", "-o", tmp_path, text,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            await proc.wait()

            if not os.path.exists(tmp_path) or os.path.getsize(tmp_path) == 0:
                return b""

            samples, orig_rate = sf.read(tmp_path, dtype="float32")
            if len(samples) == 0:
                return b""

            if samples.ndim == 2:
                samples = samples.mean(axis=1)

            # Already 16kHz from --data-format, but resample if not
            if orig_rate != 16000:
                import scipy.signal
                samples = scipy.signa
_generate_pyttsx3 method · python · L211-L244 (34 LOC)
agent/tts_engine.py
    def _generate_pyttsx3(self, text: str) -> bytes:
        """Fallback for non-macOS. Runs in executor thread."""
        import tempfile
        import soundfile as sf

        tmp_path = tempfile.mktemp(suffix=".aiff")
        try:
            self.engine.save_to_file(text, tmp_path)
            self.engine.runAndWait()

            if not os.path.exists(tmp_path) or os.path.getsize(tmp_path) == 0:
                return b""

            samples, orig_rate = sf.read(tmp_path, dtype="float32")
            if len(samples) == 0:
                return b""

            if samples.ndim == 2:
                samples = samples.mean(axis=1)

            if orig_rate != 16000:
                import scipy.signal
                samples = scipy.signal.resample(
                    samples, int(len(samples) * 16000 / orig_rate)
                )

            samples = np.clip(samples, -1.0, 1.0)
            int16_data = (samples * 32767).astype(np.int16)
            return int16_data.tobytes(
create_tts_engine function · python · L263-L280 (18 LOC)
agent/tts_engine.py
def create_tts_engine(overrides=None) -> TTSEngine:
    # Temporarily set env vars from overrides, then restore after
    saved = {}
    if overrides:
        for key, env_var in OVERRIDE_ENV_MAP.items():
            if key in overrides and overrides[key]:
                saved[env_var] = os.environ.get(env_var)
                os.environ[env_var] = str(overrides[key])

    try:
        return _create_tts_engine()
    finally:
        # Restore original env vars
        for env_var, original in saved.items():
            if original is None:
                os.environ.pop(env_var, None)
            else:
                os.environ[env_var] = original
_create_tts_engine function · python · L283-L322 (40 LOC)
agent/tts_engine.py
def _create_tts_engine() -> TTSEngine:
    engine_name = os.environ.get("TTS_ENGINE", "edge").lower()

    # Deepgram cloud engine — no fallback chain (explicit cloud choice)
    if engine_name == "deepgram":
        from agent.tts_deepgram import DeepgramTTSEngine
        return DeepgramTTSEngine()

    # OpenAI cloud engine — no fallback chain (explicit cloud choice)
    if engine_name == "openai":
        from agent.tts_openai import OpenAITTSEngine
        return OpenAITTSEngine()

    # VoiceBox local engine — no fallback chain (explicit local choice)
    if engine_name == "voicebox":
        from agent.tts_voicebox import VoiceBoxTTSEngine
        return VoiceBoxTTSEngine()

    # Try requested engine first, then fallback chain
    engines_to_try = []
    if engine_name == "chatterbox":
        engines_to_try = [ChatterboxTTSEngine, KokoroTTSEngine, EdgeTTSEngine, SystemTTSEngine]
    elif engine_name == "kokoro":
        engines_to_try = [KokoroTTSEngine, EdgeTTSEngine, SystemTT
OpenAITTSEngine class · python · L8-L51 (44 LOC)
agent/tts_openai.py
class OpenAITTSEngine(TTSEngine):
    """TTS engine using OpenAI's text-to-speech API."""

    def __init__(self):
        from openai import AsyncOpenAI

        self.client = AsyncOpenAI()
        self.tts_model = os.environ.get("OPENAI_TTS_MODEL", "tts-1")
        self.voice = os.environ.get("OPENAI_TTS_VOICE", "nova")
        self.speed = float(os.environ.get("OPENAI_TTS_SPEED", "1.0"))
        print(f"  TTS: OpenAI ({self.tts_model}, voice: {self.voice}, speed: {self.speed})")

    @property
    def name(self):
        return "openai"

    async def generate(self, text: str) -> bytes:
        import scipy.signal

        response = await self.client.audio.speech.create(
            model=self.tts_model,
            voice=self.voice,
            input=text,
            response_format="pcm",
            speed=self.speed,
        )

        # OpenAI PCM is 24kHz int16 mono
        pcm_24k = np.frombuffer(response.content, dtype=np.int16)
        if len(pcm_24k) == 0:
            ret
__init__ method · python · L11-L18 (8 LOC)
agent/tts_openai.py
    def __init__(self):
        from openai import AsyncOpenAI

        self.client = AsyncOpenAI()
        self.tts_model = os.environ.get("OPENAI_TTS_MODEL", "tts-1")
        self.voice = os.environ.get("OPENAI_TTS_VOICE", "nova")
        self.speed = float(os.environ.get("OPENAI_TTS_SPEED", "1.0"))
        print(f"  TTS: OpenAI ({self.tts_model}, voice: {self.voice}, speed: {self.speed})")
Powered by Repobility — scan your code at https://repobility.com
generate method · python · L24-L51 (28 LOC)
agent/tts_openai.py
    async def generate(self, text: str) -> bytes:
        import scipy.signal

        response = await self.client.audio.speech.create(
            model=self.tts_model,
            voice=self.voice,
            input=text,
            response_format="pcm",
            speed=self.speed,
        )

        # OpenAI PCM is 24kHz int16 mono
        pcm_24k = np.frombuffer(response.content, dtype=np.int16)
        if len(pcm_24k) == 0:
            return b""

        # Convert to float for resampling
        float_24k = pcm_24k.astype(np.float32) / 32768.0

        # Resample 24kHz -> 16kHz
        samples_16k = scipy.signal.resample(
            float_24k, int(len(float_24k) * 16000 / 24000)
        )

        # Convert back to int16
        samples_16k = np.clip(samples_16k, -1.0, 1.0)
        int16_data = (samples_16k * 32767).astype(np.int16)
        return int16_data.tobytes()
VoiceBoxTTSEngine class · python · L9-L75 (67 LOC)
agent/tts_voicebox.py
class VoiceBoxTTSEngine(TTSEngine):
    """TTS engine using VoiceBox (Qwen3-TTS) local REST API."""

    def __init__(self):
        import httpx

        self.base_url = os.environ.get("VOICEBOX_URL", "http://localhost:8000").rstrip("/")
        self.profile_id = os.environ.get("VOICEBOX_PROFILE_ID")
        if not self.profile_id:
            raise ValueError("VOICEBOX_PROFILE_ID env var is required")
        self.model_size = os.environ.get("VOICEBOX_MODEL_SIZE", "0.6B")
        self.language = os.environ.get("VOICEBOX_LANGUAGE", "en")

        # Verify VoiceBox is reachable
        print(f"  TTS: Connecting to VoiceBox at {self.base_url}...")
        resp = httpx.get(f"{self.base_url}/health", timeout=5)
        resp.raise_for_status()
        print(f"  TTS: VoiceBox ready (profile: {self.profile_id}, model: {self.model_size})")

    @property
    def name(self):
        return "voicebox"

    async def generate(self, text: str) -> bytes:
        import httpx
        import scipy.s
__init__ method · python · L12-L26 (15 LOC)
agent/tts_voicebox.py
    def __init__(self):
        import httpx

        self.base_url = os.environ.get("VOICEBOX_URL", "http://localhost:8000").rstrip("/")
        self.profile_id = os.environ.get("VOICEBOX_PROFILE_ID")
        if not self.profile_id:
            raise ValueError("VOICEBOX_PROFILE_ID env var is required")
        self.model_size = os.environ.get("VOICEBOX_MODEL_SIZE", "0.6B")
        self.language = os.environ.get("VOICEBOX_LANGUAGE", "en")

        # Verify VoiceBox is reachable
        print(f"  TTS: Connecting to VoiceBox at {self.base_url}...")
        resp = httpx.get(f"{self.base_url}/health", timeout=5)
        resp.raise_for_status()
        print(f"  TTS: VoiceBox ready (profile: {self.profile_id}, model: {self.model_size})")
‹ prevpage 2 / 5next ›