Function bodies 220 total
advance method · python · L251-L286 (36 LOC)agent/flow_tracker.py
def advance(self, tool_name=None, tool_result=None, assistant_text=None):
"""Advance the flow state based on what just happened."""
if not self.active_flow or not self.current_step:
return
flow = next((f for f in self.flows if f["name"] == self.active_flow), None)
if not flow:
return
nodes = flow["flow_json"].get("nodes", {})
current = nodes.get(self.current_step)
if not current:
return
ntype = current.get("type", "")
# For tool_call nodes, advance on tool completion
if ntype == "tool_call" and tool_name:
next_nodes = self._get_next_nodes(nodes, self.current_step)
if next_nodes:
# First output = success, second = failure
is_success = not (tool_result and isinstance(tool_result, dict) and tool_result.get("error"))
idx = 0 if is_success else min(1, len(next_nodes) - 1)
self.curreget_context_hint method · python · L288-L312 (25 LOC)agent/flow_tracker.py
def get_context_hint(self):
"""Return a context hint string for the current flow position."""
if not self.active_flow:
return ""
flow = next((f for f in self.flows if f["name"] == self.active_flow), None)
if not flow:
return ""
nodes = flow["flow_json"].get("nodes", {})
current = nodes.get(self.current_step)
if not current:
return f'\n[Flow Context: Matched "{self.active_flow}". Flow complete or position unknown.]'
label = current.get("label", current.get("type", ""))
step_desc = self._describe_step(current) or label
# Find what comes next
next_nodes = self._get_next_nodes(nodes, self.current_step)
next_desc = ""
if next_nodes and next_nodes[0] in nodes:
next_label = nodes[next_nodes[0]].get("label", "")
next_desc = f" Next: {next_label}." if next_label else ""
return f'\n[Flow Context: Caller matched "{selfmain function · python · L15-L93 (79 LOC)agent/main.py
async def main():
load_dotenv()
print("\n Voice Agent Starting...\n")
# Database
await init_db()
print(" Database initialized")
await seed_db()
# Config
config = AgentConfig()
print(f" Business: {config.get('business_name')}")
# STT engine (optional)
stt_engine = None
try:
from agent.stt_engine import create_stt_engine
stt_engine = create_stt_engine()
except Exception as e:
print(f" STT: Not available ({e})")
# TTS engine (optional, depends on TTS packages being installed)
tts_engine = None
try:
from agent.tts_engine import create_tts_engine
# Build overrides from saved config (so UI preferences persist across restarts)
tts_overrides = {}
tts_fields = [
"tts_engine", "edge_voice", "kokoro_voice", "kokoro_speed",
"openai_voice", "openai_speed", "openai_model",
"voicebox_url", "voicebox_profile_id", "voicebox_model_size", "vDeepgramSTTEngine class · python · L4-L44 (41 LOC)agent/stt_deepgram.py
class DeepgramSTTEngine:
"""STT engine using Deepgram's pre-recorded (batch) API."""
def __init__(self):
from deepgram import AsyncDeepgramClient
api_key = os.environ.get("DEEPGRAM_API_KEY", "")
if not api_key:
raise ValueError("DEEPGRAM_API_KEY environment variable is required")
self.client = AsyncDeepgramClient(api_key=api_key)
self.model = os.environ.get("DEEPGRAM_MODEL", "nova-3")
self.language = os.environ.get("DEEPGRAM_LANGUAGE", "en")
self.available = True
print(f" STT: Deepgram ({self.model}, {self.language})")
async def transcribe(self, audio_bytes: bytes) -> str:
if not self.available:
return ""
response = await self.client.listen.v1.media.transcribe_file(
request=audio_bytes,
model=self.model,
language=self.language,
encoding="linear16",
request_options={
"additional_query_paramet__init__ method · python · L7-L18 (12 LOC)agent/stt_deepgram.py
def __init__(self):
    """Configure a Deepgram pre-recorded (batch) transcription client.

    Settings come from the environment:
    ``DEEPGRAM_API_KEY`` (required), ``DEEPGRAM_MODEL`` (default
    ``"nova-3"``) and ``DEEPGRAM_LANGUAGE`` (default ``"en"``).

    Raises:
        ValueError: if ``DEEPGRAM_API_KEY`` is unset or empty.
    """
    # Deferred import: the Deepgram SDK is only needed when this engine is used.
    from deepgram import AsyncDeepgramClient

    env = os.environ
    key = env.get("DEEPGRAM_API_KEY", "")
    if not key:
        raise ValueError("DEEPGRAM_API_KEY environment variable is required")
    self.client = AsyncDeepgramClient(api_key=key)
    self.model = env.get("DEEPGRAM_MODEL", "nova-3")
    self.language = env.get("DEEPGRAM_LANGUAGE", "en")
    self.available = True
    print(f" STT: Deepgram ({self.model}, {self.language})")
# transcribe method · python · L20-L44 (25 LOC)agent/stt_deepgram.py
async def transcribe(self, audio_bytes: bytes) -> str:
    """Transcribe raw PCM audio with Deepgram's pre-recorded API.

    Args:
        audio_bytes: Audio declared to Deepgram as ``linear16`` at 16 kHz,
            mono. The bytes themselves are not inspected here.

    Returns:
        The stripped transcript of the first alternative of the first
        channel. Returns ``""`` when the engine is unavailable or the
        response carries no usable transcript.
    """
    if not self.available:
        return ""
    response = await self.client.listen.v1.media.transcribe_file(
        request=audio_bytes,
        model=self.model,
        language=self.language,
        encoding="linear16",
        request_options={
            # Headerless linear16 audio: sample rate and channel count must
            # be supplied explicitly as query parameters.
            "additional_query_parameters": {
                "sample_rate": 16000,
                "channels": 1,
            },
        },
    )
    # Extract transcript from response. A missing level yields "" rather
    # than an exception. Missing means: attribute absent (AttributeError),
    # empty list (IndexError), None container (TypeError), or None
    # transcript (AttributeError on .strip()).
    try:
        transcript = response.results.channels[0].alternatives[0].transcript
        return transcript.strip()
    except (AttributeError, IndexError, TypeError):
        return ""
# create_stt_engine function · python · L6-L15 (10 LOC)agent/stt_engine.py
def create_stt_engine():
    """Build the speech-to-text engine selected by ``STT_ENGINE``.

    The variable is compared case-insensitively. ``"deepgram"`` selects the
    Deepgram cloud engine. Any other value (default ``"whisper"``) selects
    the local Whisper ``STTEngine``.
    """
    choice = os.environ.get("STT_ENGINE", "whisper").lower()
    if choice != "deepgram":
        # Default: local Whisper
        return STTEngine()
    # Deferred import: the Deepgram SDK is only required when it is selected.
    from agent.stt_deepgram import DeepgramSTTEngine
    return DeepgramSTTEngine()
# If a scraper extracted this row, it came from Repobility (https://repobility.com)
STTEngine class · python · L18-L67 (50 LOC)agent/stt_engine.py
class STTEngine:
def __init__(self):
model_size = os.environ.get("WHISPER_MODEL", "base")
language = os.environ.get("WHISPER_LANGUAGE", "en")
self.language = language if language != "auto" else None
try:
from faster_whisper import WhisperModel
# Auto-detect device
device = "cpu"
compute_type = "int8"
try:
import torch
if torch.cuda.is_available():
device = "cuda"
compute_type = "float16"
except ImportError:
pass
print(f" STT: Loading Whisper {model_size} on {device}...")
self.model = WhisperModel(model_size, device=device, compute_type=compute_type)
self.available = True
print(f" STT: Whisper {model_size} loaded")
except ImportError:
print(" STT: faster-whisper not installed, voice input disabled")
self.model __init__ method · python · L19-L44 (26 LOC)agent/stt_engine.py
def __init__(self):
model_size = os.environ.get("WHISPER_MODEL", "base")
language = os.environ.get("WHISPER_LANGUAGE", "en")
self.language = language if language != "auto" else None
try:
from faster_whisper import WhisperModel
# Auto-detect device
device = "cpu"
compute_type = "int8"
try:
import torch
if torch.cuda.is_available():
device = "cuda"
compute_type = "float16"
except ImportError:
pass
print(f" STT: Loading Whisper {model_size} on {device}...")
self.model = WhisperModel(model_size, device=device, compute_type=compute_type)
self.available = True
print(f" STT: Whisper {model_size} loaded")
except ImportError:
print(" STT: faster-whisper not installed, voice input disabled")
self.model = None
transcribe method · python · L46-L57 (12 LOC)agent/stt_engine.py
async def transcribe(self, audio_bytes: bytes) -> str:
    """Transcribe 16-bit PCM audio with the local Whisper model.

    Args:
        audio_bytes: Signed 16-bit PCM samples in native byte order.

    Returns:
        The recognised text, or ``""`` when the engine is unavailable.
    """
    if not self.available:
        return ""
    # Convert PCM int16 bytes to float32 samples scaled into [-1.0, 1.0).
    audio_int16 = np.frombuffer(audio_bytes, dtype=np.int16)
    audio_float32 = audio_int16.astype(np.float32) / 32768.0
    # Inference is synchronous, so it runs in the default executor to keep
    # the event loop responsive. Inside a coroutine, get_running_loop() is
    # the documented way to reach the current loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._transcribe_sync, audio_float32)
# _transcribe_sync method · python · L59-L67 (9 LOC)agent/stt_engine.py
def _transcribe_sync(self, audio: np.ndarray) -> str:
    """Run blocking Whisper inference and join the segment texts.

    Decoding uses beam size 5 with ``vad_filter`` enabled. The segment texts
    are joined with single spaces and the result is stripped.
    """
    segments, _info = self.model.transcribe(
        audio,
        language=self.language,
        beam_size=5,
        vad_filter=True,
    )
    pieces = [segment.text for segment in segments]
    return " ".join(pieces).strip()
# SentenceChunker class · python · L11-L54 (44 LOC)agent/text_chunker.py
class SentenceChunker:
"""Buffers streaming text and yields complete sentences for TTS.
Splits on sentence-ending punctuation (.!?) followed by whitespace.
Very long text without punctuation is force-split at a word boundary.
"""
def __init__(self):
self.buffer = ""
def add(self, text_delta: str) -> list[str]:
"""Add text delta, return list of complete sentences (may be empty)."""
self.buffer += text_delta
chunks = []
while True:
match = _SENTENCE_END.search(self.buffer)
if match:
end = match.end()
chunk = self.buffer[:end].strip()
if chunk:
chunks.append(chunk)
self.buffer = self.buffer[end:]
continue
# Force-split very long buffers at last word boundary
if len(self.buffer) >= MAX_CHUNK_LEN:
last_space = self.buffer.rfind(' ', 40)
if last_sadd method · python · L21-L48 (28 LOC)agent/text_chunker.py
def add(self, text_delta: str) -> list[str]:
    """Append streamed text and drain every chunk that is ready for TTS.

    A chunk is ready when ``_SENTENCE_END`` matches the buffer. Everything up
    to the end of that match is emitted.

    With no match, if the buffer has reached ``MAX_CHUNK_LEN``, it is cut at
    its last space found at index 40 or later. Emitted chunks are
    whitespace-stripped and empty ones are dropped. Unconsumed text stays in
    ``self.buffer``.

    Returns:
        The ready chunks in order (possibly an empty list).
    """
    self.buffer += text_delta
    ready = []

    def _emit(piece):
        # Strip and keep only non-empty chunks.
        piece = piece.strip()
        if piece:
            ready.append(piece)

    while True:
        boundary = _SENTENCE_END.search(self.buffer)
        if boundary is not None:
            cut = boundary.end()
            _emit(self.buffer[:cut])
            self.buffer = self.buffer[cut:]
            continue
        if len(self.buffer) < MAX_CHUNK_LEN:
            break
        # Force-split very long buffers at last word boundary. The search
        # starts at index 40, so the forced piece is at least 40 characters
        # before stripping.
        split_at = self.buffer.rfind(' ', 40)
        if split_at <= 0:
            break
        _emit(self.buffer[:split_at])
        self.buffer = self.buffer[split_at:].lstrip()
    return ready
# flush method · python · L50-L54 (5 LOC)agent/text_chunker.py
def flush(self) -> str | None:
"""Flush remaining buffer as a final chunk."""
remaining = self.buffer.strip()
self.buffer = ""
return remaining if remaining else Nonecheck_availability function · python · L5-L31 (27 LOC)agent/tools.py
async def check_availability(db, date, service_type=None):
if service_type:
rows = await db.fetch_all(
"""SELECT a.date, a.time, s.name as service_name
FROM availability a
LEFT JOIN services s ON a.service_id = s.id
WHERE a.date = ? AND a.is_booked = FALSE
AND (s.name LIKE ? OR a.service_id IS NULL)
ORDER BY a.time""",
(date, f"%{service_type}%"),
)
else:
rows = await db.fetch_all(
"""SELECT a.date, a.time, s.name as service_name
FROM availability a
LEFT JOIN services s ON a.service_id = s.id
WHERE a.date = ? AND a.is_booked = FALSE
ORDER BY a.time""",
(date,),
)
slots = [{"time": r["time"], "service": r["service_name"] or "Any"} for r in rows]
if not slots:
return {"available_slots": [], "message": f"No available slots on {date}."}
return {
Powered by Repobility — scan your code at https://repobility.com
create_booking function · python · L34-L70 (37 LOC)agent/tools.py
async def create_booking(db, customer_name, date, time, service_type, **kwargs):
# Find or create customer
customer_id = await _find_or_create_customer(
db, customer_name, kwargs.get("customer_phone"), kwargs.get("customer_email")
)
# Find service
svc_row = await db.fetch_one(
"SELECT id FROM services WHERE name LIKE ?", (f"%{service_type}%",)
)
service_id = svc_row["id"] if svc_row else None
# Check slot is available
slot = await db.fetch_one(
"SELECT id FROM availability WHERE date = ? AND time = ? AND is_booked = FALSE",
(date, time),
)
if not slot:
return {"status": "failed", "message": f"The {time} slot on {date} is no longer available."}
# Mark slot as booked
await db.execute("UPDATE availability SET is_booked = TRUE WHERE id = ?", (slot["id"],))
# Create booking
ref = await generate_booking_ref(db)
await db.execute(
"""INSERT INTO bookings (booking_ref, customer_idcreate_support_ticket function · python · L73-L90 (18 LOC)agent/tools.py
async def create_support_ticket(db, customer_name, issue_summary, priority, category, **kwargs):
    """Open a support ticket for a customer, creating the customer if needed.

    Optional kwargs:
        customer_phone: Passed to the customer find-or-create step.
        customer_email: Passed to the customer find-or-create step.
        issue_details: Stored with the ticket.

    Returns:
        A dict with ``status`` ``"created"``, the generated ``ticket_ref``
        and a human-readable ``summary``.
    """
    phone = kwargs.get("customer_phone")
    email = kwargs.get("customer_email")
    customer_id = await _find_or_create_customer(db, customer_name, phone, email)

    ticket_ref = await generate_ticket_ref(db)
    values = (ticket_ref, customer_id, issue_summary, kwargs.get("issue_details"), priority, category)
    await db.execute(
        """INSERT INTO tickets (ticket_ref, customer_id, issue_summary, issue_details, priority, category)
VALUES (?, ?, ?, ?, ?, ?)""",
        values,
    )
    # Commit covers the ticket insert and any customer insert/update above.
    await db.commit()

    summary = f"Support ticket {ticket_ref} created for {customer_name}: {issue_summary} (Priority: {priority})."
    return {"status": "created", "ticket_ref": ticket_ref, "summary": summary}
# lookup_customer function · python · L93-L125 (33 LOC)agent/tools.py
async def lookup_customer(db, name=None, phone=None):
if name:
rows = await db.fetch_all(
"SELECT * FROM customers WHERE name LIKE ?", (f"%{name}%",)
)
elif phone:
rows = await db.fetch_all(
"SELECT * FROM customers WHERE phone LIKE ?", (f"%{phone}%",)
)
else:
return {"found": False, "message": "Please provide a name or phone number to search."}
if not rows:
return {"found": False, "message": "No customer found matching that search."}
customers = []
for cust in rows:
# Get recent bookings
cust["recent_bookings"] = await db.fetch_all(
"""SELECT b.booking_ref, b.date, b.time, s.name as service_name, b.status
FROM bookings b LEFT JOIN services s ON b.service_id = s.id
WHERE b.customer_id = ? ORDER BY b.created_at DESC LIMIT 5""",
(cust["id"],),
)
# Get recent tickets
cust["recent_tickets"] = await dbtransfer_to_human function · python · L128-L134 (7 LOC)agent/tools.py
async def transfer_to_human(db, reason, department, context_summary):
    """Describe a hand-off to a human agent in *department*.

    No I/O happens here: ``db`` and ``context_summary`` are accepted so the
    tool matches the ``func(db, **tool_input)`` dispatch signature, but they
    are not used. The returned dict only reports the transfer.
    """
    message = f"Transferring to {department} department. Reason: {reason}"
    return dict(
        status="transferring",
        department=department,
        message=message,
        context_passed=True,
    )
# _find_or_create_customer function · python · L137-L161 (25 LOC)agent/tools.py
async def _find_or_create_customer(db, name, phone=None, email=None):
    """Return the id of the customer named *name*, creating one if absent.

    Matching is exact and case-insensitive. A substring ``LIKE '%name%'``
    match is deliberately not used: it could attach a caller to a different
    customer (e.g. "Ann" matching "Joanna") and then overwrite that other
    customer's phone/email below.

    Args:
        db: Async database wrapper exposing ``fetch_one``, ``execute`` and
            ``insert_returning_id`` with ``?`` placeholders.
        name: Customer name as given by the caller.
        phone: Optional contact number. If truthy, it replaces the stored
            value of an existing customer; otherwise it is stored on a newly
            created row.
        email: Optional email address, handled the same way as *phone*.

    Returns:
        The matched or newly inserted customer id.

    Does not commit; callers (e.g. ``create_support_ticket``) commit afterwards.
    """
    # ORDER BY id keeps the pick deterministic (oldest record) if duplicate
    # names already exist.
    row = await db.fetch_one(
        "SELECT id FROM customers WHERE LOWER(name) = LOWER(?) ORDER BY id",
        (name,),
    )
    if not row:
        return await db.insert_returning_id(
            "INSERT INTO customers (name, phone, email) VALUES (?, ?, ?)",
            (name, phone, email),
        )
    # Update contact info if provided; empty values never overwrite stored ones.
    updates = []
    params = []
    if phone:
        updates.append("phone = ?")
        params.append(phone)
    if email:
        updates.append("email = ?")
        params.append(email)
    if updates:
        params.append(row["id"])
        # Column fragments are fixed literals above; user data goes only
        # through placeholders.
        await db.execute(
            f"UPDATE customers SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
            params,
        )
    return row["id"]
# execute_tool function · python · L173-L180 (8 LOC)agent/tools.py
async def execute_tool(db, tool_name, tool_input):
    """Dispatch a tool call by name and return its result.

    Unknown tool names and any exception raised by the tool come back as
    ``{"error": <message>}``, so the caller receives an error result instead
    of an exception.
    """
    handler = TOOL_MAP.get(tool_name)
    if not handler:
        return {"error": f"Unknown tool: {tool_name}"}
    try:
        result = await handler(db, **tool_input)
    except Exception as exc:  # deliberate catch-all: failures become tool results
        return {"error": str(exc)}
    return result
# DeepgramTTSEngine class · python · L9-L75 (67 LOC)agent/tts_deepgram.py
class DeepgramTTSEngine(TTSEngine):
"""TTS engine using Deepgram's Aura text-to-speech API.
Keeps a persistent HTTP session to avoid connection overhead per sentence.
Returns PCM 16-bit 16kHz mono directly — no resampling needed.
"""
API_URL = "https://api.deepgram.com/v1/speak"
def __init__(self):
self.api_key = os.environ.get("DEEPGRAM_API_KEY", "")
if not self.api_key:
raise ValueError("DEEPGRAM_API_KEY is required for Deepgram TTS")
self.model = os.environ.get("DEEPGRAM_TTS_MODEL", "aura-2-thalia-en")
self._session = None
self._headers = {
"Authorization": f"Token {self.api_key}",
"Content-Type": "application/json",
}
self._params = {
"model": self.model,
"encoding": "linear16",
"sample_rate": "16000",
"container": "none",
}
print(f" TTS: Using Deepgram Aura (model: {self.model})")
@property
__init__ method · python · L18-L35 (18 LOC)agent/tts_deepgram.py
def __init__(self):
    """Read Deepgram Aura settings from the environment and prepare request data.

    Reads ``DEEPGRAM_API_KEY`` (required) and ``DEEPGRAM_TTS_MODEL`` (default
    ``"aura-2-thalia-en"``). No HTTP session is opened here; ``_session``
    starts as ``None``.

    Raises:
        ValueError: if ``DEEPGRAM_API_KEY`` is unset or empty.
    """
    self.api_key = os.environ.get("DEEPGRAM_API_KEY", "")
    if not self.api_key:
        raise ValueError("DEEPGRAM_API_KEY is required for Deepgram TTS")
    self.model = os.environ.get("DEEPGRAM_TTS_MODEL", "aura-2-thalia-en")
    self._session = None
    # Static request pieces reused for every synthesis call.
    auth = f"Token {self.api_key}"
    self._headers = {"Authorization": auth, "Content-Type": "application/json"}
    # Request headerless 16-bit PCM at 16 kHz.
    self._params = dict(
        model=self.model,
        encoding="linear16",
        sample_rate="16000",
        container="none",
    )
    print(f" TTS: Using Deepgram Aura (model: {self.model})")
# Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
_get_session method · python · L41-L44 (4 LOC)agent/tts_deepgram.py
async def _get_session(self):
    """Return the shared aiohttp session, recreating it if missing or closed.

    New sessions are created with ``self._headers`` (auth and content type),
    so individual requests do not repeat them.
    """
    current = self._session
    if current is not None and not current.closed:
        return current
    self._session = aiohttp.ClientSession(headers=self._headers)
    return self._session
# generate method · python · L46-L75 (30 LOC)agent/tts_deepgram.py
async def generate(self, text: str) -> bytes:
session = await self._get_session()
try:
async with session.post(
self.API_URL, params=self._params, json={"text": text}
) as resp:
if resp.status != 200:
body = await resp.text()
print(f" TTS: Deepgram error {resp.status}: {body}")
return b""
pcm_data = await resp.read()
except aiohttp.ClientError as e:
print(f" TTS: Deepgram connection error: {e}")
# Reset session on error so next call creates a fresh one
self._session = None
return b""
if not pcm_data:
return b""
# Short fade-out to prevent clicks between sentence chunks
samples = np.frombuffer(pcm_data, dtype=np.int16).copy()
fade_len = min(80, len(samples)) # ~5ms at 16kHz
if fade_len > 0:
fade = np.linspace(1.0TTSEngine class · python · L10-L19 (10 LOC)agent/tts_engine.py
class TTSEngine(ABC):
    """Interface implemented by every text-to-speech backend.

    Subclasses must provide ``generate`` and the ``name`` property. ``ABC``
    refuses to instantiate a subclass that omits either.
    """

    @abstractmethod
    async def generate(self, text: str) -> bytes:
        """Generate PCM 16-bit 16kHz mono audio bytes from text."""
        pass

    @property
    @abstractmethod
    def name(self) -> str:
        """Short backend identifier, such as ``"edge"`` or ``"kokoro"``."""
# generate method · python · L12-L14 (3 LOC)agent/tts_engine.py
async def generate(self, text: str) -> bytes:
    """Abstract hook: turn *text* into PCM 16-bit, 16 kHz, mono audio bytes.

    The base body does nothing; concrete engines override it.
    """
# ChatterboxTTSEngine class · python · L22-L61 (40 LOC)agent/tts_engine.py
class ChatterboxTTSEngine(TTSEngine):
def __init__(self):
from chatterbox.tts import ChatterboxTTS
import torch
self.device = "cuda" if torch.cuda.is_available() else "cpu"
print(f" TTS: Loading Chatterbox on {self.device}...")
self.model = ChatterboxTTS.from_pretrained(device=self.device)
# Load voice reference if available
self.audio_prompt = None
voice_file = os.environ.get("TTS_VOICE_FILE", "")
if voice_file and os.path.exists(voice_file):
print(f" TTS: Using voice reference: {voice_file}")
self.audio_prompt = voice_file
print(" TTS: Chatterbox loaded")
@property
def name(self):
return "chatterbox"
async def generate(self, text: str) -> bytes:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._generate_sync, text)
def _generate_sync(self, text: str) -> bytes:
import torch
import scipy.s__init__ method · python · L23-L37 (15 LOC)agent/tts_engine.py
def __init__(self):
    """Load the Chatterbox model and an optional voice reference clip.

    The model is placed on ``"cuda"`` when ``torch.cuda.is_available()``,
    otherwise on ``"cpu"``. ``TTS_VOICE_FILE`` may name an audio file to use
    as the generation prompt. It is kept only if the path exists; otherwise
    ``audio_prompt`` stays ``None``.
    """
    from chatterbox.tts import ChatterboxTTS
    import torch

    use_gpu = torch.cuda.is_available()
    self.device = "cuda" if use_gpu else "cpu"
    print(f" TTS: Loading Chatterbox on {self.device}...")
    self.model = ChatterboxTTS.from_pretrained(device=self.device)

    # Optional reference clip, later passed as audio_prompt_path.
    self.audio_prompt = None
    prompt_path = os.environ.get("TTS_VOICE_FILE", "")
    if prompt_path and os.path.exists(prompt_path):
        print(f" TTS: Using voice reference: {prompt_path}")
        self.audio_prompt = prompt_path
    print(" TTS: Chatterbox loaded")
# generate method · python · L43-L45 (3 LOC)agent/tts_engine.py
async def generate(self, text: str) -> bytes:
    """Synthesise *text* without blocking the event loop.

    The blocking ``self._generate_sync`` call runs in the loop's default
    executor, and its PCM bytes are returned unchanged.
    """
    # get_running_loop() is the documented call inside a coroutine;
    # get_event_loop() is discouraged/deprecated in that role.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._generate_sync, text)
# _generate_sync method · python · L47-L61 (15 LOC)agent/tts_engine.py
def _generate_sync(self, text: str) -> bytes:
    """Blocking synthesis: Chatterbox output to 16 kHz int16 PCM bytes.

    A torch tensor result is moved to CPU, converted to NumPy and squeezed.
    The audio is then resampled from 24 kHz to 16 kHz, clipped to [-1, 1]
    and scaled to int16.
    """
    import torch
    import scipy.signal

    wav = self.model.generate(text, audio_prompt_path=self.audio_prompt)
    if isinstance(wav, torch.Tensor):
        wav = wav.cpu().numpy().squeeze()
    # Chatterbox outputs at 24kHz, resample to 16kHz
    target_len = int(len(wav) * 16000 / 24000)
    resampled = scipy.signal.resample(wav, target_len)
    pcm = (np.clip(resampled, -1.0, 1.0) * 32767).astype(np.int16)
    return pcm.tobytes()
# Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
KokoroTTSEngine class · python · L64-L101 (38 LOC)agent/tts_engine.py
class KokoroTTSEngine(TTSEngine):
def __init__(self):
from kokoro import KPipeline
self.voice = os.environ.get("KOKORO_VOICE", "af_heart")
self.speed = float(os.environ.get("KOKORO_SPEED", "1.1"))
lang = "b" if self.voice.startswith("b") else "a"
print(f" TTS: Loading Kokoro (voice: {self.voice}, speed: {self.speed})...")
self.pipeline = KPipeline(lang_code=lang)
print(" TTS: Kokoro loaded")
@property
def name(self):
return "kokoro"
async def generate(self, text: str) -> bytes:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._generate_sync, text)
def _generate_sync(self, text: str) -> bytes:
import scipy.signal
chunks = []
for _, _, audio in self.pipeline(text, voice=self.voice, speed=self.speed):
if audio is not None:
chunks.append(audio)
if not chunks:
return b""
wav = __init__ method · python · L65-L73 (9 LOC)agent/tts_engine.py
def __init__(self):
    """Create a Kokoro pipeline for the configured voice and speed.

    ``KOKORO_VOICE`` (default ``"af_heart"``) and ``KOKORO_SPEED`` (default
    ``1.1``, parsed as float) come from the environment. Voices whose name
    starts with ``"b"`` get pipeline language code ``"b"``; all others get
    ``"a"``.
    """
    from kokoro import KPipeline

    env = os.environ
    self.voice = env.get("KOKORO_VOICE", "af_heart")
    self.speed = float(env.get("KOKORO_SPEED", "1.1"))
    # NOTE(review): the voice-name prefix appears to encode the language
    # ("a…" American, "b…" British); only these two are distinguished here.
    lang = "b" if self.voice[:1] == "b" else "a"
    print(f" TTS: Loading Kokoro (voice: {self.voice}, speed: {self.speed})...")
    self.pipeline = KPipeline(lang_code=lang)
    print(" TTS: Kokoro loaded")
# generate method · python · L79-L81 (3 LOC)agent/tts_engine.py
async def generate(self, text: str) -> bytes:
    """Synthesise *text* without blocking the event loop.

    The blocking Kokoro pipeline (``self._generate_sync``) runs in the
    loop's default executor, and its PCM bytes are returned unchanged.
    """
    # get_running_loop() is the documented call inside a coroutine.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._generate_sync, text)
# _generate_sync method · python · L83-L101 (19 LOC)agent/tts_engine.py
def _generate_sync(self, text: str) -> bytes:
    """Blocking synthesis: run the Kokoro pipeline and return 16 kHz int16 PCM.

    Audio pieces yielded by the pipeline are concatenated, skipping ``None``
    pieces. The result is resampled from 24 kHz to 16 kHz, clipped to
    [-1, 1] and scaled to int16. Returns ``b""`` when no audio is produced.
    """
    import scipy.signal

    pieces = [
        audio
        for _, _, audio in self.pipeline(text, voice=self.voice, speed=self.speed)
        if audio is not None
    ]
    if not pieces:
        return b""
    wav = np.concatenate(pieces)
    # Kokoro outputs at 24kHz, resample to 16kHz
    n_out = int(len(wav) * 16000 / 24000)
    samples_16k = np.clip(scipy.signal.resample(wav, n_out), -1.0, 1.0)
    return (samples_16k * 32767).astype(np.int16).tobytes()
# EdgeTTSEngine class · python · L104-L144 (41 LOC)agent/tts_engine.py
class EdgeTTSEngine(TTSEngine):
def __init__(self):
import edge_tts # noqa: F401 — verify import
self.voice = os.environ.get("EDGE_TTS_VOICE", "en-US-AndrewMultilingualNeural")
print(f" TTS: Using Edge TTS (voice: {self.voice})")
@property
def name(self):
return "edge"
async def generate(self, text: str) -> bytes:
import edge_tts
import soundfile as sf
communicate = edge_tts.Communicate(text, self.voice)
mp3_data = b""
async for chunk in communicate.stream():
if chunk["type"] == "audio":
mp3_data += chunk["data"]
if not mp3_data:
return b""
# Decode MP3 to float32
samples, orig_rate = sf.read(io.BytesIO(mp3_data), dtype="float32")
if len(samples) == 0:
return b""
if samples.ndim == 2:
samples = samples.mean(axis=1)
# Resample to 16kHz
if orig_rate != 16000:
imp__init__ method · python · L105-L108 (4 LOC)agent/tts_engine.py
def __init__(self):
    """Select the Edge TTS voice after checking that ``edge_tts`` imports.

    ``EDGE_TTS_VOICE`` overrides the default voice
    ``en-US-AndrewMultilingualNeural``.
    """
    # Imported only to fail fast (ImportError) if the package is missing.
    import edge_tts  # noqa: F401 — verify import
    default_voice = "en-US-AndrewMultilingualNeural"
    self.voice = os.environ.get("EDGE_TTS_VOICE", default_voice)
    print(f" TTS: Using Edge TTS (voice: {self.voice})")
# generate method · python · L114-L144 (31 LOC)agent/tts_engine.py
async def generate(self, text: str) -> bytes:
    """Synthesise *text* with Edge TTS and return 16 kHz mono int16 PCM.

    Steps applied to the streamed audio:
    - decode with soundfile (as MP3);
    - average stereo to mono;
    - resample to 16 kHz if the rate differs;
    - clip to [-1, 1] and scale to int16.

    Returns ``b""`` if no audio arrives or decoding yields no samples.
    """
    import edge_tts
    import soundfile as sf

    communicate = edge_tts.Communicate(text, self.voice)
    # Gather chunks and join once; repeated ``bytes +=`` copies the whole
    # accumulated buffer on every chunk (quadratic in stream length).
    audio_parts = []
    async for chunk in communicate.stream():
        if chunk["type"] == "audio":
            audio_parts.append(chunk["data"])
    mp3_data = b"".join(audio_parts)
    if not mp3_data:
        return b""
    # Decode MP3 to float32
    samples, orig_rate = sf.read(io.BytesIO(mp3_data), dtype="float32")
    if len(samples) == 0:
        return b""
    if samples.ndim == 2:
        samples = samples.mean(axis=1)
    # Resample to 16kHz
    if orig_rate != 16000:
        import scipy.signal
        samples = scipy.signal.resample(
            samples, int(len(samples) * 16000 / orig_rate)
        )
    samples = np.clip(samples, -1.0, 1.0)
    int16_data = (samples * 32767).astype(np.int16)
    return int16_data.tobytes()
# SystemTTSEngine class · python · L147-L244 (98 LOC)agent/tts_engine.py
class SystemTTSEngine(TTSEngine):
def __init__(self):
import platform
import shutil
self.use_say = platform.system() == "Darwin" and shutil.which("say")
if self.use_say:
print(" TTS: Using macOS 'say' command")
else:
import pyttsx3
self.engine = pyttsx3.init()
self.engine.setProperty("rate", 175)
print(" TTS: Using system TTS (pyttsx3)")
@property
def name(self):
return "system"
async def generate(self, text: str) -> bytes:
if self.use_say:
return await self._generate_say(text)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._generate_pyttsx3, text)
async def _generate_say(self, text: str) -> bytes:
"""Use macOS 'say' command — runs as async subprocess, doesn't block."""
import tempfile
import soundfile as sf
tmp_path = tempfile.mktemp(suffix=".aiff")
If a scraper extracted this row, it came from Repobility (https://repobility.com)
__init__ method · python · L148-L159 (12 LOC)agent/tts_engine.py
def __init__(self):
    """Pick the system TTS backend: macOS ``say`` if usable, else pyttsx3.

    ``self.use_say`` holds the raw result of the platform/``which`` test:
    the path to ``say`` when usable, otherwise a falsy value. The pyttsx3
    engine is only created (with ``rate`` 175) when ``say`` is not used.
    """
    import platform
    import shutil

    on_macos = platform.system() == "Darwin"
    self.use_say = on_macos and shutil.which("say")
    if not self.use_say:
        import pyttsx3
        self.engine = pyttsx3.init()
        self.engine.setProperty("rate", 175)
        print(" TTS: Using system TTS (pyttsx3)")
    else:
        print(" TTS: Using macOS 'say' command")
# generate method · python · L165-L169 (5 LOC)agent/tts_engine.py
async def generate(self, text: str) -> bytes:
    """Synthesise *text* with the backend chosen at construction.

    The macOS ``say`` path is awaited directly. The pyttsx3 path is
    synchronous and runs in the loop's default executor.
    """
    if self.use_say:
        return await self._generate_say(text)
    # pyttsx3 blocks; offload it. get_running_loop() is the documented call
    # inside a coroutine.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._generate_pyttsx3, text)
# _generate_say method · python · L171-L209 (39 LOC)agent/tts_engine.py
async def _generate_say(self, text: str) -> bytes:
"""Use macOS 'say' command — runs as async subprocess, doesn't block."""
import tempfile
import soundfile as sf
tmp_path = tempfile.mktemp(suffix=".aiff")
try:
proc = await asyncio.create_subprocess_exec(
"say", "-o", tmp_path, text,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
await proc.wait()
if not os.path.exists(tmp_path) or os.path.getsize(tmp_path) == 0:
return b""
samples, orig_rate = sf.read(tmp_path, dtype="float32")
if len(samples) == 0:
return b""
if samples.ndim == 2:
samples = samples.mean(axis=1)
# Already 16kHz from --data-format, but resample if not
if orig_rate != 16000:
import scipy.signal
samples = scipy.signa_generate_pyttsx3 method · python · L211-L244 (34 LOC)agent/tts_engine.py
def _generate_pyttsx3(self, text: str) -> bytes:
"""Fallback for non-macOS. Runs in executor thread."""
import tempfile
import soundfile as sf
tmp_path = tempfile.mktemp(suffix=".aiff")
try:
self.engine.save_to_file(text, tmp_path)
self.engine.runAndWait()
if not os.path.exists(tmp_path) or os.path.getsize(tmp_path) == 0:
return b""
samples, orig_rate = sf.read(tmp_path, dtype="float32")
if len(samples) == 0:
return b""
if samples.ndim == 2:
samples = samples.mean(axis=1)
if orig_rate != 16000:
import scipy.signal
samples = scipy.signal.resample(
samples, int(len(samples) * 16000 / orig_rate)
)
samples = np.clip(samples, -1.0, 1.0)
int16_data = (samples * 32767).astype(np.int16)
return int16_data.tobytes(create_tts_engine function · python · L263-L280 (18 LOC)agent/tts_engine.py
def create_tts_engine(overrides=None) -> TTSEngine:
    """Build the configured TTS engine, optionally applying saved overrides.

    For each key in ``OVERRIDE_ENV_MAP`` that has a truthy value in
    *overrides*, the mapped environment variable is set to ``str(value)``
    while ``_create_tts_engine()`` runs. Previous values are then restored,
    or removed if they were unset, even if construction raises.

    NOTE(review): this temporarily mutates the process-wide ``os.environ``,
    so concurrent calls from several threads could interfere.
    """
    previous = {}
    if overrides:
        for key, env_var in OVERRIDE_ENV_MAP.items():
            if key not in overrides or not overrides[key]:
                continue
            previous[env_var] = os.environ.get(env_var)
            os.environ[env_var] = str(overrides[key])
    try:
        return _create_tts_engine()
    finally:
        # Restore original env vars
        for env_var, old_value in previous.items():
            if old_value is not None:
                os.environ[env_var] = old_value
            else:
                os.environ.pop(env_var, None)
# _create_tts_engine function · python · L283-L322 (40 LOC)agent/tts_engine.py
def _create_tts_engine() -> TTSEngine:
engine_name = os.environ.get("TTS_ENGINE", "edge").lower()
# Deepgram cloud engine — no fallback chain (explicit cloud choice)
if engine_name == "deepgram":
from agent.tts_deepgram import DeepgramTTSEngine
return DeepgramTTSEngine()
# OpenAI cloud engine — no fallback chain (explicit cloud choice)
if engine_name == "openai":
from agent.tts_openai import OpenAITTSEngine
return OpenAITTSEngine()
# VoiceBox local engine — no fallback chain (explicit local choice)
if engine_name == "voicebox":
from agent.tts_voicebox import VoiceBoxTTSEngine
return VoiceBoxTTSEngine()
# Try requested engine first, then fallback chain
engines_to_try = []
if engine_name == "chatterbox":
engines_to_try = [ChatterboxTTSEngine, KokoroTTSEngine, EdgeTTSEngine, SystemTTSEngine]
elif engine_name == "kokoro":
engines_to_try = [KokoroTTSEngine, EdgeTTSEngine, SystemTTOpenAITTSEngine class · python · L8-L51 (44 LOC)agent/tts_openai.py
class OpenAITTSEngine(TTSEngine):
"""TTS engine using OpenAI's text-to-speech API."""
def __init__(self):
from openai import AsyncOpenAI
self.client = AsyncOpenAI()
self.tts_model = os.environ.get("OPENAI_TTS_MODEL", "tts-1")
self.voice = os.environ.get("OPENAI_TTS_VOICE", "nova")
self.speed = float(os.environ.get("OPENAI_TTS_SPEED", "1.0"))
print(f" TTS: OpenAI ({self.tts_model}, voice: {self.voice}, speed: {self.speed})")
@property
def name(self):
return "openai"
async def generate(self, text: str) -> bytes:
import scipy.signal
response = await self.client.audio.speech.create(
model=self.tts_model,
voice=self.voice,
input=text,
response_format="pcm",
speed=self.speed,
)
# OpenAI PCM is 24kHz int16 mono
pcm_24k = np.frombuffer(response.content, dtype=np.int16)
if len(pcm_24k) == 0:
ret__init__ method · python · L11-L18 (8 LOC)agent/tts_openai.py
def __init__(self):
    """Create an async OpenAI client and read speech settings from the environment.

    Settings:
    - ``OPENAI_TTS_MODEL`` (default ``"tts-1"``);
    - ``OPENAI_TTS_VOICE`` (default ``"nova"``);
    - ``OPENAI_TTS_SPEED`` (default ``1.0``, parsed as float).

    The client is constructed with no arguments, so it uses the SDK's
    default credential lookup.
    """
    from openai import AsyncOpenAI

    self.client = AsyncOpenAI()
    env = os.environ
    self.tts_model = env.get("OPENAI_TTS_MODEL", "tts-1")
    self.voice = env.get("OPENAI_TTS_VOICE", "nova")
    self.speed = float(env.get("OPENAI_TTS_SPEED", "1.0"))
    print(f" TTS: OpenAI ({self.tts_model}, voice: {self.voice}, speed: {self.speed})")
# Powered by Repobility — scan your code at https://repobility.com
generate method · python · L24-L51 (28 LOC)agent/tts_openai.py
async def generate(self, text: str) -> bytes:
    """Synthesise *text* with OpenAI speech and return 16 kHz int16 PCM bytes.

    Raw ``pcm`` output is requested and treated as 24 kHz int16 mono. It is
    converted to float, resampled to 16 kHz, clipped and scaled back to
    int16. Returns ``b""`` for an empty response body.
    """
    import scipy.signal

    response = await self.client.audio.speech.create(
        model=self.tts_model,
        voice=self.voice,
        input=text,
        response_format="pcm",
        speed=self.speed,
    )
    # OpenAI PCM is 24kHz int16 mono
    raw = np.frombuffer(response.content, dtype=np.int16)
    if raw.size == 0:
        return b""
    as_float = raw.astype(np.float32) / 32768.0
    # Resample 24kHz -> 16kHz
    resampled = scipy.signal.resample(as_float, int(len(as_float) * 16000 / 24000))
    clipped = np.clip(resampled, -1.0, 1.0)
    return (clipped * 32767).astype(np.int16).tobytes()
# VoiceBoxTTSEngine class · python · L9-L75 (67 LOC)agent/tts_voicebox.py
class VoiceBoxTTSEngine(TTSEngine):
"""TTS engine using VoiceBox (Qwen3-TTS) local REST API."""
def __init__(self):
import httpx
self.base_url = os.environ.get("VOICEBOX_URL", "http://localhost:8000").rstrip("/")
self.profile_id = os.environ.get("VOICEBOX_PROFILE_ID")
if not self.profile_id:
raise ValueError("VOICEBOX_PROFILE_ID env var is required")
self.model_size = os.environ.get("VOICEBOX_MODEL_SIZE", "0.6B")
self.language = os.environ.get("VOICEBOX_LANGUAGE", "en")
# Verify VoiceBox is reachable
print(f" TTS: Connecting to VoiceBox at {self.base_url}...")
resp = httpx.get(f"{self.base_url}/health", timeout=5)
resp.raise_for_status()
print(f" TTS: VoiceBox ready (profile: {self.profile_id}, model: {self.model_size})")
@property
def name(self):
return "voicebox"
async def generate(self, text: str) -> bytes:
import httpx
import scipy.s__init__ method · python · L12-L26 (15 LOC)agent/tts_voicebox.py
def __init__(self):
    """Configure the VoiceBox client and check that the server is reachable.

    Environment:
    - ``VOICEBOX_URL`` (default ``http://localhost:8000``; trailing slashes
      stripped);
    - ``VOICEBOX_PROFILE_ID`` (required);
    - ``VOICEBOX_MODEL_SIZE`` (default ``"0.6B"``);
    - ``VOICEBOX_LANGUAGE`` (default ``"en"``).

    Raises:
        ValueError: if ``VOICEBOX_PROFILE_ID`` is unset or empty.
        httpx.HTTPError: (subclasses) if the ``/health`` probe fails or
            returns an error status.
    """
    import httpx

    env = os.environ
    self.base_url = env.get("VOICEBOX_URL", "http://localhost:8000").rstrip("/")
    self.profile_id = env.get("VOICEBOX_PROFILE_ID")
    if not self.profile_id:
        raise ValueError("VOICEBOX_PROFILE_ID env var is required")
    self.model_size = env.get("VOICEBOX_MODEL_SIZE", "0.6B")
    self.language = env.get("VOICEBOX_LANGUAGE", "en")

    # Verify VoiceBox is reachable: fail at construction rather than on
    # first synthesis (5 s timeout).
    print(f" TTS: Connecting to VoiceBox at {self.base_url}...")
    health = httpx.get(f"{self.base_url}/health", timeout=5)
    health.raise_for_status()
    print(f" TTS: VoiceBox ready (profile: {self.profile_id}, model: {self.model_size})")