Function bodies 220 total
create_admin_app function · python · L9-L53 (45 LOC)admin/api.py
def create_admin_app(config, ws_handler=None):
app = web.Application()
app["agent_config"] = config
# Accept either a WebSocketHandler object or a raw coroutine
if ws_handler is not None:
from agent.websocket_handler import WebSocketHandler
if isinstance(ws_handler, WebSocketHandler):
app["ws_handler_obj"] = ws_handler
app.router.add_get("/ws", ws_handler.handle_connection)
else:
app.router.add_get("/ws", ws_handler)
app.router.add_get("/api/tts", handle_get_tts)
app.router.add_post("/api/tts", handle_update_tts)
app.router.add_post("/api/tts/preview", handle_tts_preview)
app.router.add_get("/api/stats", handle_stats)
app.router.add_get("/api/calls", handle_calls)
app.router.add_get("/api/calls/{call_id}", handle_call_detail)
app.router.add_get("/api/bookings", handle_bookings)
app.router.add_get("/api/tickets", handle_tickets)
app.router.add_get("/api/config", handle_get_handle_stats function · python · L56-L78 (23 LOC)admin/api.py
async def handle_stats(request):
db = await get_db()
try:
stats = {}
for table, key in [
("call_logs", "total_calls"),
("bookings", "total_bookings"),
("tickets", "total_tickets"),
("customers", "total_customers"),
]:
val = await db.fetch_val(f"SELECT COUNT(*) FROM {table}")
stats[key] = val
# Today's calls
today_expr = sql_today()
val = await db.fetch_val(
f"SELECT COUNT(*) FROM call_logs WHERE DATE(started_at) = {today_expr}"
)
stats["calls_today"] = val
return web.json_response(stats)
finally:
await db.close()handle_calls function · python · L81-L96 (16 LOC)admin/api.py
async def handle_calls(request):
db = await get_db()
try:
rows = await db.fetch_all(
"SELECT * FROM call_logs ORDER BY started_at DESC LIMIT 50"
)
calls = []
for call in rows:
if call.get("transcript"):
call["transcript"] = json.loads(call["transcript"])
if call.get("actions_taken"):
call["actions_taken"] = json.loads(call["actions_taken"])
calls.append(call)
return web.json_response(calls)
finally:
await db.close()handle_call_detail function · python · L99-L114 (16 LOC)admin/api.py
async def handle_call_detail(request):
call_id = request.match_info["call_id"]
db = await get_db()
try:
call = await db.fetch_one(
"SELECT * FROM call_logs WHERE call_id = ?", (call_id,)
)
if not call:
return web.json_response({"error": "Call not found"}, status=404)
if call.get("transcript"):
call["transcript"] = json.loads(call["transcript"])
if call.get("actions_taken"):
call["actions_taken"] = json.loads(call["actions_taken"])
return web.json_response(call)
finally:
await db.close()handle_bookings function · python · L117-L129 (13 LOC)admin/api.py
async def handle_bookings(request):
db = await get_db()
try:
rows = await db.fetch_all(
"""SELECT b.*, c.name as customer_name, s.name as service_name
FROM bookings b
LEFT JOIN customers c ON b.customer_id = c.id
LEFT JOIN services s ON b.service_id = s.id
ORDER BY b.created_at DESC"""
)
return web.json_response(rows)
finally:
await db.close()handle_tickets function · python · L132-L143 (12 LOC)admin/api.py
async def handle_tickets(request):
db = await get_db()
try:
rows = await db.fetch_all(
"""SELECT t.*, c.name as customer_name
FROM tickets t
LEFT JOIN customers c ON t.customer_id = c.id
ORDER BY t.created_at DESC"""
)
return web.json_response(rows)
finally:
await db.close()handle_get_config function · python · L146-L148 (3 LOC)admin/api.py
async def handle_get_config(request):
config = request.app["agent_config"]
return web.json_response(config.to_dict())Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
handle_update_config function · python · L151-L158 (8 LOC)admin/api.py
async def handle_update_config(request):
config = request.app["agent_config"]
try:
data = await request.json()
config.update(data)
return web.json_response({"status": "updated", "config": config.to_dict()})
except Exception as e:
return web.json_response({"error": str(e)}, status=400)handle_get_tts function · python · L166-L193 (28 LOC)admin/api.py
async def handle_get_tts(request):
config = request.app["agent_config"]
ws_handler = request.app.get("ws_handler_obj")
# Current active engine name
active = None
if ws_handler and ws_handler.tts_engine:
active = ws_handler.tts_engine.name
elif config.get("tts_engine"):
active = config.get("tts_engine")
return web.json_response({
"engine": active,
"available": AVAILABLE_ENGINES,
"settings": {
"edge_voice": config.get("edge_voice", ""),
"kokoro_voice": config.get("kokoro_voice", ""),
"kokoro_speed": config.get("kokoro_speed", ""),
"openai_voice": config.get("openai_voice", ""),
"openai_speed": config.get("openai_speed", ""),
"openai_model": config.get("openai_model", ""),
"voicebox_url": config.get("voicebox_url", ""),
"voicebox_profile_id": config.get("voicebox_profile_id", ""),
"voicebox_model_size": config.gethandle_update_tts function · python · L196-L253 (58 LOC)admin/api.py
async def handle_update_tts(request):
config = request.app["agent_config"]
ws_handler = request.app.get("ws_handler_obj")
try:
data = await request.json()
except Exception:
return web.json_response({"error": "Invalid JSON"}, status=400)
engine_name = data.get("engine")
if not engine_name or engine_name not in AVAILABLE_ENGINES:
return web.json_response(
{"error": f"Invalid engine. Choose from: {AVAILABLE_ENGINES}"}, status=400
)
# Build overrides dict
overrides = {"engine": engine_name}
tts_fields = [
"edge_voice", "kokoro_voice", "kokoro_speed",
"openai_voice", "openai_speed", "openai_model",
"voicebox_url", "voicebox_profile_id", "voicebox_model_size", "voicebox_language",
"deepgram_tts_model",
]
for field in tts_fields:
if field in data:
overrides[field] = data[field]
# Try to create the new engine
try:
from agent.tts_engine handle_tts_preview function · python · L259-L303 (45 LOC)admin/api.py
async def handle_tts_preview(request):
"""Generate a short audio preview for a given Deepgram voice model."""
try:
data = await request.json()
except Exception:
return web.json_response({"error": "Invalid JSON"}, status=400)
model = data.get("model", "").strip()
if not model:
return web.json_response({"error": "model is required"}, status=400)
api_key = os.environ.get("DEEPGRAM_API_KEY", "")
if not api_key:
return web.json_response({"error": "DEEPGRAM_API_KEY not configured"}, status=500)
import aiohttp as _aiohttp
url = "https://api.deepgram.com/v1/speak"
headers = {
"Authorization": f"Token {api_key}",
"Content-Type": "application/json",
}
params = {
"model": model,
"encoding": "linear16",
"sample_rate": "16000",
"container": "none",
}
try:
async with _aiohttp.ClientSession() as session:
async with session.post(url, headers=hhandle_generate_flow function · python · L308-L344 (37 LOC)admin/api.py
async def handle_generate_flow(request):
config = request.app["agent_config"]
desc = (config.get("company_description") or "").strip()
if not desc:
return web.json_response(
{"error": "Company description is required. Fill it in on the Setup page first."},
status=400,
)
try:
from agent.flow_generator import generate_flow_from_profile, intermediate_to_drawflow
intermediate = await generate_flow_from_profile(config)
drawflow_json, flow_json = intermediate_to_drawflow(intermediate)
flow_name = intermediate.get("flow_name", f"{config.get('business_name', 'My')} Flow")
flow_description = intermediate.get("flow_description", "Auto-generated from company profile")
# Save to DB
db = await get_db()
try:
flow_id = await db.insert_returning_id(
"INSERT INTO flows (name, description, flow_json, drawflow_json) VALUES (?, ?, ?, ?)",
(handle_list_flows function · python · L349-L357 (9 LOC)admin/api.py
async def handle_list_flows(request):
db = await get_db()
try:
rows = await db.fetch_all(
"SELECT id, name, description, is_active, created_at, updated_at FROM flows ORDER BY updated_at DESC"
)
return web.json_response(rows)
finally:
await db.close()handle_create_flow function · python · L360-L382 (23 LOC)admin/api.py
async def handle_create_flow(request):
db = await get_db()
try:
data = await request.json()
name = data.get("name", "").strip()
if not name:
return web.json_response({"error": "Name is required"}, status=400)
flow_json = data.get("flow_json", "{}")
drawflow_json = data.get("drawflow_json", "{}")
description = data.get("description", "")
if isinstance(flow_json, dict):
flow_json = json.dumps(flow_json)
if isinstance(drawflow_json, dict):
drawflow_json = json.dumps(drawflow_json)
new_id = await db.insert_returning_id(
"INSERT INTO flows (name, description, flow_json, drawflow_json) VALUES (?, ?, ?, ?)",
(name, description, flow_json, drawflow_json),
)
return web.json_response({"id": new_id, "status": "created"})
except Exception as e:
return web.json_response({"error": str(e)}, status=400)
finally:
await db.clohandle_get_flow function · python · L385-L401 (17 LOC)admin/api.py
async def handle_get_flow(request):
flow_id = request.match_info["id"]
db = await get_db()
try:
flow = await db.fetch_one("SELECT * FROM flows WHERE id = ?", (flow_id,))
if not flow:
return web.json_response({"error": "Flow not found"}, status=404)
# Parse JSON fields for the response
for field in ("flow_json", "drawflow_json"):
if flow.get(field):
try:
flow[field] = json.loads(flow[field])
except (json.JSONDecodeError, TypeError):
pass
return web.json_response(flow)
finally:
await db.close()Same scanner, your repo: https://repobility.com — Repobility
handle_update_flow function · python · L404-L429 (26 LOC)admin/api.py
async def handle_update_flow(request):
flow_id = request.match_info["id"]
db = await get_db()
try:
data = await request.json()
# Build SET clause from provided fields
allowed = {"name", "description", "flow_json", "drawflow_json", "is_active"}
updates = {}
for key in allowed:
if key in data:
val = data[key]
if key in ("flow_json", "drawflow_json") and isinstance(val, dict):
val = json.dumps(val)
updates[key] = val
if not updates:
return web.json_response({"error": "No fields to update"}, status=400)
updates["updated_at"] = datetime.utcnow().isoformat()
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [flow_id]
await db.execute(f"UPDATE flows SET {set_clause} WHERE id = ?", values)
await db.commit()
return web.json_response({"status": "updated"})
exchandle_delete_flow function · python · L432-L444 (13 LOC)admin/api.py
async def handle_delete_flow(request):
flow_id = request.match_info["id"]
db = await get_db()
try:
rowcount = await db.execute_with_rowcount(
"DELETE FROM flows WHERE id = ?", (flow_id,)
)
await db.commit()
if rowcount == 0:
return web.json_response({"error": "Flow not found"}, status=404)
return web.json_response({"status": "deleted"})
finally:
await db.close()handle_activate_flow function · python · L447-L458 (12 LOC)admin/api.py
async def handle_activate_flow(request):
flow_id = request.match_info["id"]
db = await get_db()
try:
now = datetime.utcnow().isoformat()
await db.execute(
"UPDATE flows SET is_active = TRUE, updated_at = ? WHERE id = ?", (now, flow_id)
)
await db.commit()
return web.json_response({"status": "activated"})
finally:
await db.close()handle_deactivate_flow function · python · L461-L472 (12 LOC)admin/api.py
async def handle_deactivate_flow(request):
flow_id = request.match_info["id"]
db = await get_db()
try:
now = datetime.utcnow().isoformat()
await db.execute(
"UPDATE flows SET is_active = FALSE, updated_at = ? WHERE id = ?", (now, flow_id)
)
await db.commit()
return web.json_response({"status": "deactivated"})
finally:
await db.close()AgentConfig class · python · L63-L147 (85 LOC)agent/config.py
class AgentConfig:
def __init__(self):
self._config = dict(DEFAULT_CONFIG)
self._load()
def _load(self):
# 1. Load from JSON file (local dev)
if os.path.exists(CONFIG_PATH):
with open(CONFIG_PATH, "r") as f:
stored = json.load(f)
self._config.update(stored)
# 2. Override with AGENT_* env vars (cloud)
for env_key, config_key in ENV_VAR_MAP.items():
val = os.environ.get(env_key)
if val is not None:
if config_key in LIST_KEYS:
self._config[config_key] = [s.strip() for s in val.split(",") if s.strip()]
elif config_key in INT_KEYS:
self._config[config_key] = int(val)
else:
self._config[config_key] = val
def _save(self):
# Only write to JSON file in SQLite mode (local dev)
if os.environ.get("DB_BACKEND", "sqlite").lower() != "sqlite":
__init__ method · python · L64-L66 (3 LOC)agent/config.py
def __init__(self):
self._config = dict(DEFAULT_CONFIG)
self._load()_load method · python · L68-L84 (17 LOC)agent/config.py
def _load(self):
# 1. Load from JSON file (local dev)
if os.path.exists(CONFIG_PATH):
with open(CONFIG_PATH, "r") as f:
stored = json.load(f)
self._config.update(stored)
# 2. Override with AGENT_* env vars (cloud)
for env_key, config_key in ENV_VAR_MAP.items():
val = os.environ.get(env_key)
if val is not None:
if config_key in LIST_KEYS:
self._config[config_key] = [s.strip() for s in val.split(",") if s.strip()]
elif config_key in INT_KEYS:
self._config[config_key] = int(val)
else:
self._config[config_key] = val_save method · python · L86-L92 (7 LOC)agent/config.py
def _save(self):
# Only write to JSON file in SQLite mode (local dev)
if os.environ.get("DB_BACKEND", "sqlite").lower() != "sqlite":
return
os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
with open(CONFIG_PATH, "w") as f:
json.dump(self._config, f, indent=2)Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
update method · python · L100-L102 (3 LOC)agent/config.py
def update(self, updates: dict):
self._config.update(updates)
self._save()get_greeting method · python · L104-L107 (4 LOC)agent/config.py
def get_greeting(self) -> str:
return self._config["greeting"].format(
business_name=self._config["business_name"]
)_build_company_profile_section method · python · L109-L136 (28 LOC)agent/config.py
def _build_company_profile_section(self) -> str:
"""Build company profile section for the system prompt."""
lines = []
desc = self._config.get("company_description", "").strip()
not_do = self._config.get("company_not_do", "").strip()
tone = self._config.get("company_tone", "").strip()
faqs = self._config.get("company_faqs", "").strip()
extra = self._config.get("company_extra", "").strip()
if not any([desc, not_do, tone, faqs, extra]):
return ""
lines.append("")
lines.append("== COMPANY IDENTITY ==")
if desc:
lines.append(f"What we do: {desc}")
if not_do:
lines.append(f"What we DO NOT do (politely decline these): {not_do}")
lines.append("If someone asks about something we don't do, be honest and helpful — say something like \"oh sorry, that's not something we handle — we're actually [what you do]. Is there anything else I can help with?\""get_system_prompt method · python · L138-L147 (10 LOC)agent/config.py
def get_system_prompt(self) -> str:
from agent.llm_config import SYSTEM_PROMPT
return SYSTEM_PROMPT.format(
business_name=self._config["business_name"],
services_list=", ".join(self._config["services"]),
business_hours=self._config["business_hours"],
current_date=date.today().strftime("%A, %B %d, %Y"),
company_profile_section=self._build_company_profile_section(),
)ConversationSession class · python · L16-L279 (264 LOC)agent/conversation.py
class ConversationSession:
def __init__(self, db, system_prompt, tts_engine=None, flow_tracker=None,
agent_id=None, company_id=None):
self.db = db
self.system_prompt = system_prompt
self.tts_engine = tts_engine
self.flow_tracker = flow_tracker
self.agent_id = agent_id
self.company_id = company_id
self.messages = []
self.actions_taken = []
self.call_id = str(uuid.uuid4())[:8]
self.started_at = datetime.utcnow()
self.client = AsyncAnthropic()
self.model = os.environ.get("LLM_MODEL", "claude-sonnet-4-20250514")
async def process_message(self, user_text, send_callback):
self.messages.append({"role": "user", "content": user_text})
# Detect flow match on each user message
if self.flow_tracker:
self.flow_tracker.detect_flow(user_text)
await self._call_claude(send_callback)
def _build_effective_prompt(self):
"""Build __init__ method · python · L17-L30 (14 LOC)agent/conversation.py
def __init__(self, db, system_prompt, tts_engine=None, flow_tracker=None,
agent_id=None, company_id=None):
self.db = db
self.system_prompt = system_prompt
self.tts_engine = tts_engine
self.flow_tracker = flow_tracker
self.agent_id = agent_id
self.company_id = company_id
self.messages = []
self.actions_taken = []
self.call_id = str(uuid.uuid4())[:8]
self.started_at = datetime.utcnow()
self.client = AsyncAnthropic()
self.model = os.environ.get("LLM_MODEL", "claude-sonnet-4-20250514")process_message method · python · L32-L37 (6 LOC)agent/conversation.py
async def process_message(self, user_text, send_callback):
self.messages.append({"role": "user", "content": user_text})
# Detect flow match on each user message
if self.flow_tracker:
self.flow_tracker.detect_flow(user_text)
await self._call_claude(send_callback)_build_effective_prompt method · python · L39-L49 (11 LOC)agent/conversation.py
def _build_effective_prompt(self):
"""Build system prompt with flow context appended if active."""
prompt = self.system_prompt
if self.flow_tracker:
flows_section = self.flow_tracker.compile_flows_prompt()
if flows_section:
prompt += flows_section
hint = self.flow_tracker.get_context_hint()
if hint:
prompt += hint
return promptRepobility · severity-and-effort ranking · https://repobility.com
_call_claude method · python · L51-L182 (132 LOC)agent/conversation.py
async def _call_claude(self, send_callback):
while True:
full_text = ""
chunker = SentenceChunker()
effective_prompt = self._build_effective_prompt()
# Pipeline TTS: fire off generation concurrently, send in order
tts_task_queue = asyncio.Queue() if self.tts_engine else None
async def tts_sender():
"""Await TTS tasks in order and send audio as each completes."""
while True:
item = await tts_task_queue.get()
if item is None:
break
task, text = item
try:
audio_bytes = await task
if audio_bytes:
b64 = base64.b64encode(audio_bytes).decode("ascii")
await send_callback({"type": "audio", "data": b64})
else:
print(f" [TTS_clean_for_tts method · python · L185-L223 (39 LOC)agent/conversation.py
def _clean_for_tts(text: str) -> str:
"""Strip emojis, markdown, and special characters before TTS."""
# Remove emoji unicode ranges
text = re.sub(
r'[\U0001F600-\U0001F64F' # emoticons
r'\U0001F300-\U0001F5FF' # symbols & pictographs
r'\U0001F680-\U0001F6FF' # transport & map
r'\U0001F1E0-\U0001F1FF' # flags
r'\U00002702-\U000027B0' # dingbats
r'\U0000FE00-\U0000FE0F' # variation selectors
r'\U0001F900-\U0001F9FF' # supplemental symbols
r'\U0001FA00-\U0001FA6F' # chess symbols
r'\U0001FA70-\U0001FAFF' # symbols extended-A
r'\U00002600-\U000026FF' # misc symbols
r'\U0000200D' # zero width joiner
r'\U00002B50\U00002B55' # stars, circles
r']+', '', text
)
# Remove markdown bold/italic markers
text = re.sub(r'\*+', '', text)
# Remo_enqueue_tts method · python · L225-L230 (6 LOC)agent/conversation.py
def _enqueue_tts(self, text, queue):
"""Clean text and fire off TTS generation as a concurrent task."""
cleaned = self._clean_for_tts(text)
if cleaned:
task = asyncio.create_task(self.tts_engine.generate(cleaned))
queue.put_nowait((task, cleaned))end_session method · python · L232-L279 (48 LOC)agent/conversation.py
async def end_session(self):
ended_at = datetime.utcnow()
duration = int((ended_at - self.started_at).total_seconds())
transcript = []
for msg in self.messages:
if msg["role"] == "user":
content = msg["content"]
if isinstance(content, str):
transcript.append({"role": "user", "text": content})
elif msg["role"] == "assistant":
content = msg["content"]
if isinstance(content, list):
for block in content:
if hasattr(block, "text"):
transcript.append({"role": "agent", "text": block.text})
elif isinstance(content, str):
transcript.append({"role": "agent", "text": content})
outcome = "resolved"
for action in self.actions_taken:
if action["tool"] == "create_booking":
outcome = "booking_made"
generate_flow_from_profile function · python · L67-L93 (27 LOC)agent/flow_generator.py
async def generate_flow_from_profile(config: AgentConfig) -> dict:
"""Generate a call flow from the company profile using Claude.
Returns validated call script JSON.
"""
profile = _build_profile_message(config)
client = AsyncAnthropic()
model = os.environ.get("LLM_MODEL", "claude-sonnet-4-20250514")
# First attempt
result = await _call_claude(client, model, profile)
errors = validate_intermediate(result)
# Retry once with error feedback
if errors:
retry_msg = (
f"The previous output had these issues:\n"
+ "\n".join(f"- {e}" for e in errors)
+ "\n\nPlease fix them and return the corrected JSON."
)
result = await _call_claude(client, model, profile, retry_msg)
errors = validate_intermediate(result)
if errors:
raise ValueError(f"Flow generation failed after retry: {'; '.join(errors)}")
return result_build_profile_message function · python · L96-L134 (39 LOC)agent/flow_generator.py
def _build_profile_message(config: AgentConfig) -> str:
"""Assemble company profile into a user message for Claude."""
parts = [f"Company: {config.get('business_name', 'Unknown')}"]
desc = config.get("company_description", "")
if desc:
parts.append(f"Description: {desc}")
services = config.get("services", [])
if services:
parts.append(f"Services: {', '.join(services)}")
hours = config.get("business_hours", "")
if hours:
parts.append(f"Business hours: {hours}")
not_do = config.get("company_not_do", "")
if not_do:
parts.append(f"What we do NOT do: {not_do}")
faqs = config.get("company_faqs", "")
if faqs:
parts.append(f"FAQs and key info:\n{faqs}")
extra = config.get("company_extra", "")
if extra:
parts.append(f"Additional context:\n{extra}")
tone = config.get("company_tone", "")
if tone:
parts.append(f"Tone: {tone}")
parts.append(
"\nDesign a call sc_call_claude function · python · L137-L159 (23 LOC)agent/flow_generator.py
async def _call_claude(client, model, profile, retry_feedback=None):
"""Call Claude API and parse JSON response."""
messages = [{"role": "user", "content": profile}]
if retry_feedback:
messages.append({"role": "assistant", "content": "I'll fix those issues."})
messages.append({"role": "user", "content": retry_feedback})
response = await client.messages.create(
model=model,
max_tokens=4096,
system=GENERATION_PROMPT,
messages=messages,
)
text = response.content[0].text
# Strip markdown code block if present
text = text.strip()
if text.startswith("```"):
text = re.sub(r"^```(?:json)?\s*\n?", "", text)
text = re.sub(r"\n?```\s*$", "", text)
return json.loads(text)validate_intermediate function · python · L162-L183 (22 LOC)agent/flow_generator.py
def validate_intermediate(data: dict) -> list[str]:
"""Validate call script JSON. Returns list of error strings (empty = valid)."""
errors = []
if not isinstance(data.get("greeting"), str) or not data["greeting"].strip():
errors.append("'greeting' must be a non-empty string")
if not isinstance(data.get("knowledge_base"), str):
errors.append("'knowledge_base' must be a string")
if not isinstance(data.get("goodbye"), str) or not data["goodbye"].strip():
errors.append("'goodbye' must be a non-empty string")
tools = data.get("enabled_tools")
if not isinstance(tools, list):
errors.append("'enabled_tools' must be an array")
else:
for t in tools:
if t not in AVAILABLE_TOOLS:
errors.append(f"Unknown tool: '{t}'")
return errorsCitation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
intermediate_to_drawflow function · python · L186-L193 (8 LOC)agent/flow_generator.py
def intermediate_to_drawflow(data: dict) -> tuple[dict, dict]:
"""Convert call script format to (drawflow_json, flow_json) tuple.
Accepts the simple format: { greeting, knowledge_base, enabled_tools, goodbye }
Returns ({}, flow_json) — drawflow_json is empty since Drawflow is no longer used.
"""
flow_json = _simple_to_flow_json(data)
return {}, flow_json_simple_to_flow_json function · python · L196-L283 (88 LOC)agent/flow_generator.py
def _simple_to_flow_json(data: dict) -> dict:
"""Convert simple call script format to flow_json format.
Produces a linear chain: entry -> welcome -> tool nodes -> end
"""
nodes = {}
node_id = 1
greeting = data.get("greeting", "")
knowledge_base = data.get("knowledge_base", "")
tool_instructions = data.get("tool_instructions", "")
enabled_tools = data.get("enabled_tools", [])
goodbye = data.get("goodbye", "")
# Build full knowledge base — company info + tool instructions + transfer note
kb = knowledge_base
if tool_instructions:
kb += ("\n\n" if kb else "") + tool_instructions
has_transfer = "transfer" in enabled_tools
if has_transfer:
kb += ("\n\n" if kb else "") + "You can transfer the caller to a human agent if they request it or the issue requires human attention."
# Entry node
nodes[f"node_{node_id}"] = {
"type": "entry",
"label": "Call Starts",
"position": {"x": 50, "y": FlowTracker class · python · L10-L312 (303 LOC)agent/flow_tracker.py
class FlowTracker:
def __init__(self, flows):
"""Initialize with a list of active flow dicts (from DB)."""
self.flows = flows # list of {id, name, flow_json (parsed dict)}
self.active_flow = None # matched flow name
self.current_step = None # current node id
self.step_number = 0
self.collected = {} # variables collected during flow
@classmethod
async def load_active_flows(cls, db=None):
"""Load all active flows from the database and return a FlowTracker."""
close_db = False
if db is None:
db = await get_db()
close_db = True
try:
rows = await db.fetch_all(
"SELECT id, name, flow_json FROM flows WHERE is_active = TRUE"
)
flows = []
for r in rows:
flow_json = r["flow_json"]
if isinstance(flow_json, str):
try:
flow_json = json.loads(__init__ method · python · L11-L17 (7 LOC)agent/flow_tracker.py
def __init__(self, flows):
"""Initialize with a list of active flow dicts (from DB)."""
self.flows = flows # list of {id, name, flow_json (parsed dict)}
self.active_flow = None # matched flow name
self.current_step = None # current node id
self.step_number = 0
self.collected = {} # variables collected during flowload_active_flows method · python · L20-L46 (27 LOC)agent/flow_tracker.py
async def load_active_flows(cls, db=None):
"""Load all active flows from the database and return a FlowTracker."""
close_db = False
if db is None:
db = await get_db()
close_db = True
try:
rows = await db.fetch_all(
"SELECT id, name, flow_json FROM flows WHERE is_active = TRUE"
)
flows = []
for r in rows:
flow_json = r["flow_json"]
if isinstance(flow_json, str):
try:
flow_json = json.loads(flow_json)
except (json.JSONDecodeError, TypeError):
continue
flows.append({
"id": r["id"],
"name": r["name"],
"flow_json": flow_json,
})
return cls(flows)
finally:
if close_db:
await db.close()compile_flows_prompt method · python · L48-L121 (74 LOC)agent/flow_tracker.py
def compile_flows_prompt(self):
"""Convert all active flows into a human-readable prompt section."""
if not self.flows:
return ""
lines = []
all_knowledge = []
welcome_prompt = ""
for flow in self.flows:
fj = flow["flow_json"]
nodes = fj.get("nodes", {})
if not nodes:
continue
# Collect knowledge base and welcome info from message nodes
for node in nodes.values():
data = node.get("data", {})
kb = data.get("knowledge_base", "")
kb_lower = kb.strip().lower() if kb else ""
is_placeholder = any(kb_lower.startswith(p) for p in [
"paste your", "edit this", "add your", "replace this",
"example for", "put your",
])
if kb_lower and not is_placeholder:
all_knowledge.append(kb.strip())
#_walk_flow method · python · L123-L155 (33 LOC)agent/flow_tracker.py
def _walk_flow(self, nodes):
"""Walk the flow graph starting from entry nodes, producing step descriptions."""
# Find entry node
entry_id = None
for nid, node in nodes.items():
if node.get("type") == "entry":
entry_id = nid
break
if not entry_id:
return []
steps = []
visited = set()
queue = self._get_next_nodes(nodes, entry_id)
while queue:
nid = queue.pop(0)
if nid in visited or nid not in nodes:
continue
visited.add(nid)
node = nodes[nid]
step_text = self._describe_step(node)
if step_text:
steps.append(step_text)
# Follow primary output (first connection of first output)
next_nodes = self._get_next_nodes(nodes, nid)
for nn in next_nodes:
if nn not in visited:
queue.append(nn)
_get_next_nodes method · python · L157-L165 (9 LOC)agent/flow_tracker.py
def _get_next_nodes(self, nodes, node_id):
"""Get connected node IDs from a node's outputs."""
node = nodes.get(node_id, {})
result = []
for output in node.get("outputs", {}).values():
for conn in output.get("connections", []):
if conn not in result:
result.append(conn)
return resultSame scanner, your repo: https://repobility.com — Repobility
_describe_step method · python · L167-L217 (51 LOC)agent/flow_tracker.py
def _describe_step(self, node):
"""Produce a human-readable step description for a node."""
ntype = node.get("type", "")
data = node.get("data", {})
label = node.get("label", "")
if ntype == "message":
prompt = data.get("prompt", label)
kb = data.get("knowledge_base", "")
kb_note = " (use knowledge base)" if kb and kb.strip() else ""
return f"[MESSAGE] {prompt}{kb_note}"
elif ntype == "question":
prompt = data.get("prompt", label)
var = data.get("collect_variable", "")
collect = f" (collect: {var})" if var else ""
branches = data.get("branches") or data.get("response_branches", [])
branch_text = ""
if branches:
if isinstance(branches[0], str):
branch_text = " | ".join(branches)
elif isinstance(branches[0], dict):
branch_text = " | ".join(b.get(detect_flow method · python · L219-L241 (23 LOC)agent/flow_tracker.py
def detect_flow(self, user_text):
"""Check if user text matches any flow's entry trigger keywords."""
if self.active_flow:
return self.active_flow
text_lower = user_text.lower()
for flow in self.flows:
fj = flow["flow_json"]
nodes = fj.get("nodes", {})
for node in nodes.values():
if node.get("type") != "entry":
continue
keywords = node.get("data", {}).get("trigger_keywords", "")
if not keywords:
continue
kw_list = [k.strip().lower() for k in keywords.split(",") if k.strip()]
for kw in kw_list:
if kw in text_lower:
self.active_flow = flow["name"]
self.current_step = self._find_first_step(nodes)
self.step_number = 1
return self.active_flow
return None_find_first_step method · python · L243-L249 (7 LOC)agent/flow_tracker.py
def _find_first_step(self, nodes):
"""Find the first non-entry node in the flow."""
for nid, node in nodes.items():
if node.get("type") == "entry":
next_nodes = self._get_next_nodes(nodes, nid)
return next_nodes[0] if next_nodes else None
return Nonepage 1 / 5next ›