← back to jooliperbush__voiceagent

Function bodies 220 total

All specs Real LLM only Function bodies
create_admin_app function · python · L9-L53 (45 LOC)
admin/api.py
def create_admin_app(config, ws_handler=None):
    app = web.Application()

    app["agent_config"] = config

    # Accept either a WebSocketHandler object or a raw coroutine
    if ws_handler is not None:
        from agent.websocket_handler import WebSocketHandler
        if isinstance(ws_handler, WebSocketHandler):
            app["ws_handler_obj"] = ws_handler
            app.router.add_get("/ws", ws_handler.handle_connection)
        else:
            app.router.add_get("/ws", ws_handler)

    app.router.add_get("/api/tts", handle_get_tts)
    app.router.add_post("/api/tts", handle_update_tts)
    app.router.add_post("/api/tts/preview", handle_tts_preview)
    app.router.add_get("/api/stats", handle_stats)
    app.router.add_get("/api/calls", handle_calls)
    app.router.add_get("/api/calls/{call_id}", handle_call_detail)
    app.router.add_get("/api/bookings", handle_bookings)
    app.router.add_get("/api/tickets", handle_tickets)
    app.router.add_get("/api/config", handle_get_
handle_stats function · python · L56-L78 (23 LOC)
admin/api.py
async def handle_stats(request):
    db = await get_db()
    try:
        stats = {}
        for table, key in [
            ("call_logs", "total_calls"),
            ("bookings", "total_bookings"),
            ("tickets", "total_tickets"),
            ("customers", "total_customers"),
        ]:
            val = await db.fetch_val(f"SELECT COUNT(*) FROM {table}")
            stats[key] = val

        # Today's calls
        today_expr = sql_today()
        val = await db.fetch_val(
            f"SELECT COUNT(*) FROM call_logs WHERE DATE(started_at) = {today_expr}"
        )
        stats["calls_today"] = val

        return web.json_response(stats)
    finally:
        await db.close()
handle_calls function · python · L81-L96 (16 LOC)
admin/api.py
async def handle_calls(request):
    db = await get_db()
    try:
        rows = await db.fetch_all(
            "SELECT * FROM call_logs ORDER BY started_at DESC LIMIT 50"
        )
        calls = []
        for call in rows:
            if call.get("transcript"):
                call["transcript"] = json.loads(call["transcript"])
            if call.get("actions_taken"):
                call["actions_taken"] = json.loads(call["actions_taken"])
            calls.append(call)
        return web.json_response(calls)
    finally:
        await db.close()
handle_call_detail function · python · L99-L114 (16 LOC)
admin/api.py
async def handle_call_detail(request):
    call_id = request.match_info["call_id"]
    db = await get_db()
    try:
        call = await db.fetch_one(
            "SELECT * FROM call_logs WHERE call_id = ?", (call_id,)
        )
        if not call:
            return web.json_response({"error": "Call not found"}, status=404)
        if call.get("transcript"):
            call["transcript"] = json.loads(call["transcript"])
        if call.get("actions_taken"):
            call["actions_taken"] = json.loads(call["actions_taken"])
        return web.json_response(call)
    finally:
        await db.close()
handle_bookings function · python · L117-L129 (13 LOC)
admin/api.py
async def handle_bookings(request):
    db = await get_db()
    try:
        rows = await db.fetch_all(
            """SELECT b.*, c.name as customer_name, s.name as service_name
               FROM bookings b
               LEFT JOIN customers c ON b.customer_id = c.id
               LEFT JOIN services s ON b.service_id = s.id
               ORDER BY b.created_at DESC"""
        )
        return web.json_response(rows)
    finally:
        await db.close()
handle_tickets function · python · L132-L143 (12 LOC)
admin/api.py
async def handle_tickets(request):
    db = await get_db()
    try:
        rows = await db.fetch_all(
            """SELECT t.*, c.name as customer_name
               FROM tickets t
               LEFT JOIN customers c ON t.customer_id = c.id
               ORDER BY t.created_at DESC"""
        )
        return web.json_response(rows)
    finally:
        await db.close()
handle_get_config function · python · L146-L148 (3 LOC)
admin/api.py
async def handle_get_config(request):
    config = request.app["agent_config"]
    return web.json_response(config.to_dict())
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
handle_update_config function · python · L151-L158 (8 LOC)
admin/api.py
async def handle_update_config(request):
    config = request.app["agent_config"]
    try:
        data = await request.json()
        config.update(data)
        return web.json_response({"status": "updated", "config": config.to_dict()})
    except Exception as e:
        return web.json_response({"error": str(e)}, status=400)
handle_get_tts function · python · L166-L193 (28 LOC)
admin/api.py
async def handle_get_tts(request):
    config = request.app["agent_config"]
    ws_handler = request.app.get("ws_handler_obj")

    # Current active engine name
    active = None
    if ws_handler and ws_handler.tts_engine:
        active = ws_handler.tts_engine.name
    elif config.get("tts_engine"):
        active = config.get("tts_engine")

    return web.json_response({
        "engine": active,
        "available": AVAILABLE_ENGINES,
        "settings": {
            "edge_voice": config.get("edge_voice", ""),
            "kokoro_voice": config.get("kokoro_voice", ""),
            "kokoro_speed": config.get("kokoro_speed", ""),
            "openai_voice": config.get("openai_voice", ""),
            "openai_speed": config.get("openai_speed", ""),
            "openai_model": config.get("openai_model", ""),
            "voicebox_url": config.get("voicebox_url", ""),
            "voicebox_profile_id": config.get("voicebox_profile_id", ""),
            "voicebox_model_size": config.get
handle_update_tts function · python · L196-L253 (58 LOC)
admin/api.py
async def handle_update_tts(request):
    config = request.app["agent_config"]
    ws_handler = request.app.get("ws_handler_obj")

    try:
        data = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400)

    engine_name = data.get("engine")
    if not engine_name or engine_name not in AVAILABLE_ENGINES:
        return web.json_response(
            {"error": f"Invalid engine. Choose from: {AVAILABLE_ENGINES}"}, status=400
        )

    # Build overrides dict
    overrides = {"engine": engine_name}
    tts_fields = [
        "edge_voice", "kokoro_voice", "kokoro_speed",
        "openai_voice", "openai_speed", "openai_model",
        "voicebox_url", "voicebox_profile_id", "voicebox_model_size", "voicebox_language",
        "deepgram_tts_model",
    ]
    for field in tts_fields:
        if field in data:
            overrides[field] = data[field]

    # Try to create the new engine
    try:
        from agent.tts_engine 
handle_tts_preview function · python · L259-L303 (45 LOC)
admin/api.py
async def handle_tts_preview(request):
    """Generate a short audio preview for a given Deepgram voice model."""
    try:
        data = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON"}, status=400)

    model = data.get("model", "").strip()
    if not model:
        return web.json_response({"error": "model is required"}, status=400)

    api_key = os.environ.get("DEEPGRAM_API_KEY", "")
    if not api_key:
        return web.json_response({"error": "DEEPGRAM_API_KEY not configured"}, status=500)

    import aiohttp as _aiohttp

    url = "https://api.deepgram.com/v1/speak"
    headers = {
        "Authorization": f"Token {api_key}",
        "Content-Type": "application/json",
    }
    params = {
        "model": model,
        "encoding": "linear16",
        "sample_rate": "16000",
        "container": "none",
    }

    try:
        async with _aiohttp.ClientSession() as session:
            async with session.post(url, headers=h
handle_generate_flow function · python · L308-L344 (37 LOC)
admin/api.py
async def handle_generate_flow(request):
    config = request.app["agent_config"]
    desc = (config.get("company_description") or "").strip()
    if not desc:
        return web.json_response(
            {"error": "Company description is required. Fill it in on the Setup page first."},
            status=400,
        )

    try:
        from agent.flow_generator import generate_flow_from_profile, intermediate_to_drawflow

        intermediate = await generate_flow_from_profile(config)
        drawflow_json, flow_json = intermediate_to_drawflow(intermediate)

        flow_name = intermediate.get("flow_name", f"{config.get('business_name', 'My')} Flow")
        flow_description = intermediate.get("flow_description", "Auto-generated from company profile")

        # Save to DB
        db = await get_db()
        try:
            flow_id = await db.insert_returning_id(
                "INSERT INTO flows (name, description, flow_json, drawflow_json) VALUES (?, ?, ?, ?)",
                (
handle_list_flows function · python · L349-L357 (9 LOC)
admin/api.py
async def handle_list_flows(request):
    db = await get_db()
    try:
        rows = await db.fetch_all(
            "SELECT id, name, description, is_active, created_at, updated_at FROM flows ORDER BY updated_at DESC"
        )
        return web.json_response(rows)
    finally:
        await db.close()
handle_create_flow function · python · L360-L382 (23 LOC)
admin/api.py
async def handle_create_flow(request):
    db = await get_db()
    try:
        data = await request.json()
        name = data.get("name", "").strip()
        if not name:
            return web.json_response({"error": "Name is required"}, status=400)
        flow_json = data.get("flow_json", "{}")
        drawflow_json = data.get("drawflow_json", "{}")
        description = data.get("description", "")
        if isinstance(flow_json, dict):
            flow_json = json.dumps(flow_json)
        if isinstance(drawflow_json, dict):
            drawflow_json = json.dumps(drawflow_json)
        new_id = await db.insert_returning_id(
            "INSERT INTO flows (name, description, flow_json, drawflow_json) VALUES (?, ?, ?, ?)",
            (name, description, flow_json, drawflow_json),
        )
        return web.json_response({"id": new_id, "status": "created"})
    except Exception as e:
        return web.json_response({"error": str(e)}, status=400)
    finally:
        await db.clo
handle_get_flow function · python · L385-L401 (17 LOC)
admin/api.py
async def handle_get_flow(request):
    flow_id = request.match_info["id"]
    db = await get_db()
    try:
        flow = await db.fetch_one("SELECT * FROM flows WHERE id = ?", (flow_id,))
        if not flow:
            return web.json_response({"error": "Flow not found"}, status=404)
        # Parse JSON fields for the response
        for field in ("flow_json", "drawflow_json"):
            if flow.get(field):
                try:
                    flow[field] = json.loads(flow[field])
                except (json.JSONDecodeError, TypeError):
                    pass
        return web.json_response(flow)
    finally:
        await db.close()
Same scanner, your repo: https://repobility.com — Repobility
handle_update_flow function · python · L404-L429 (26 LOC)
admin/api.py
async def handle_update_flow(request):
    flow_id = request.match_info["id"]
    db = await get_db()
    try:
        data = await request.json()
        # Build SET clause from provided fields
        allowed = {"name", "description", "flow_json", "drawflow_json", "is_active"}
        updates = {}
        for key in allowed:
            if key in data:
                val = data[key]
                if key in ("flow_json", "drawflow_json") and isinstance(val, dict):
                    val = json.dumps(val)
                updates[key] = val
        if not updates:
            return web.json_response({"error": "No fields to update"}, status=400)
        updates["updated_at"] = datetime.utcnow().isoformat()
        set_clause = ", ".join(f"{k} = ?" for k in updates)
        values = list(updates.values()) + [flow_id]
        await db.execute(f"UPDATE flows SET {set_clause} WHERE id = ?", values)
        await db.commit()
        return web.json_response({"status": "updated"})
    exc
handle_delete_flow function · python · L432-L444 (13 LOC)
admin/api.py
async def handle_delete_flow(request):
    flow_id = request.match_info["id"]
    db = await get_db()
    try:
        rowcount = await db.execute_with_rowcount(
            "DELETE FROM flows WHERE id = ?", (flow_id,)
        )
        await db.commit()
        if rowcount == 0:
            return web.json_response({"error": "Flow not found"}, status=404)
        return web.json_response({"status": "deleted"})
    finally:
        await db.close()
handle_activate_flow function · python · L447-L458 (12 LOC)
admin/api.py
async def handle_activate_flow(request):
    flow_id = request.match_info["id"]
    db = await get_db()
    try:
        now = datetime.utcnow().isoformat()
        await db.execute(
            "UPDATE flows SET is_active = TRUE, updated_at = ? WHERE id = ?", (now, flow_id)
        )
        await db.commit()
        return web.json_response({"status": "activated"})
    finally:
        await db.close()
handle_deactivate_flow function · python · L461-L472 (12 LOC)
admin/api.py
async def handle_deactivate_flow(request):
    flow_id = request.match_info["id"]
    db = await get_db()
    try:
        now = datetime.utcnow().isoformat()
        await db.execute(
            "UPDATE flows SET is_active = FALSE, updated_at = ? WHERE id = ?", (now, flow_id)
        )
        await db.commit()
        return web.json_response({"status": "deactivated"})
    finally:
        await db.close()
AgentConfig class · python · L63-L147 (85 LOC)
agent/config.py
class AgentConfig:
    def __init__(self):
        self._config = dict(DEFAULT_CONFIG)
        self._load()

    def _load(self):
        # 1. Load from JSON file (local dev)
        if os.path.exists(CONFIG_PATH):
            with open(CONFIG_PATH, "r") as f:
                stored = json.load(f)
                self._config.update(stored)

        # 2. Override with AGENT_* env vars (cloud)
        for env_key, config_key in ENV_VAR_MAP.items():
            val = os.environ.get(env_key)
            if val is not None:
                if config_key in LIST_KEYS:
                    self._config[config_key] = [s.strip() for s in val.split(",") if s.strip()]
                elif config_key in INT_KEYS:
                    self._config[config_key] = int(val)
                else:
                    self._config[config_key] = val

    def _save(self):
        # Only write to JSON file in SQLite mode (local dev)
        if os.environ.get("DB_BACKEND", "sqlite").lower() != "sqlite":
      
__init__ method · python · L64-L66 (3 LOC)
agent/config.py
    def __init__(self):
        self._config = dict(DEFAULT_CONFIG)
        self._load()
_load method · python · L68-L84 (17 LOC)
agent/config.py
    def _load(self):
        # 1. Load from JSON file (local dev)
        if os.path.exists(CONFIG_PATH):
            with open(CONFIG_PATH, "r") as f:
                stored = json.load(f)
                self._config.update(stored)

        # 2. Override with AGENT_* env vars (cloud)
        for env_key, config_key in ENV_VAR_MAP.items():
            val = os.environ.get(env_key)
            if val is not None:
                if config_key in LIST_KEYS:
                    self._config[config_key] = [s.strip() for s in val.split(",") if s.strip()]
                elif config_key in INT_KEYS:
                    self._config[config_key] = int(val)
                else:
                    self._config[config_key] = val
_save method · python · L86-L92 (7 LOC)
agent/config.py
    def _save(self):
        # Only write to JSON file in SQLite mode (local dev)
        if os.environ.get("DB_BACKEND", "sqlite").lower() != "sqlite":
            return
        os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
        with open(CONFIG_PATH, "w") as f:
            json.dump(self._config, f, indent=2)
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
update method · python · L100-L102 (3 LOC)
agent/config.py
    def update(self, updates: dict):
        self._config.update(updates)
        self._save()
get_greeting method · python · L104-L107 (4 LOC)
agent/config.py
    def get_greeting(self) -> str:
        return self._config["greeting"].format(
            business_name=self._config["business_name"]
        )
_build_company_profile_section method · python · L109-L136 (28 LOC)
agent/config.py
    def _build_company_profile_section(self) -> str:
        """Build company profile section for the system prompt."""
        lines = []
        desc = self._config.get("company_description", "").strip()
        not_do = self._config.get("company_not_do", "").strip()
        tone = self._config.get("company_tone", "").strip()
        faqs = self._config.get("company_faqs", "").strip()
        extra = self._config.get("company_extra", "").strip()

        if not any([desc, not_do, tone, faqs, extra]):
            return ""

        lines.append("")
        lines.append("== COMPANY IDENTITY ==")
        if desc:
            lines.append(f"What we do: {desc}")
        if not_do:
            lines.append(f"What we DO NOT do (politely decline these): {not_do}")
            lines.append("If someone asks about something we don't do, be honest and helpful — say something like \"oh sorry, that's not something we handle — we're actually [what you do]. Is there anything else I can help with?\""
get_system_prompt method · python · L138-L147 (10 LOC)
agent/config.py
    def get_system_prompt(self) -> str:
        from agent.llm_config import SYSTEM_PROMPT

        return SYSTEM_PROMPT.format(
            business_name=self._config["business_name"],
            services_list=", ".join(self._config["services"]),
            business_hours=self._config["business_hours"],
            current_date=date.today().strftime("%A, %B %d, %Y"),
            company_profile_section=self._build_company_profile_section(),
        )
ConversationSession class · python · L16-L279 (264 LOC)
agent/conversation.py
class ConversationSession:
    def __init__(self, db, system_prompt, tts_engine=None, flow_tracker=None,
                 agent_id=None, company_id=None):
        self.db = db
        self.system_prompt = system_prompt
        self.tts_engine = tts_engine
        self.flow_tracker = flow_tracker
        self.agent_id = agent_id
        self.company_id = company_id
        self.messages = []
        self.actions_taken = []
        self.call_id = str(uuid.uuid4())[:8]
        self.started_at = datetime.utcnow()
        self.client = AsyncAnthropic()
        self.model = os.environ.get("LLM_MODEL", "claude-sonnet-4-20250514")

    async def process_message(self, user_text, send_callback):
        self.messages.append({"role": "user", "content": user_text})
        # Detect flow match on each user message
        if self.flow_tracker:
            self.flow_tracker.detect_flow(user_text)
        await self._call_claude(send_callback)

    def _build_effective_prompt(self):
        """Build 
__init__ method · python · L17-L30 (14 LOC)
agent/conversation.py
    def __init__(self, db, system_prompt, tts_engine=None, flow_tracker=None,
                 agent_id=None, company_id=None):
        self.db = db
        self.system_prompt = system_prompt
        self.tts_engine = tts_engine
        self.flow_tracker = flow_tracker
        self.agent_id = agent_id
        self.company_id = company_id
        self.messages = []
        self.actions_taken = []
        self.call_id = str(uuid.uuid4())[:8]
        self.started_at = datetime.utcnow()
        self.client = AsyncAnthropic()
        self.model = os.environ.get("LLM_MODEL", "claude-sonnet-4-20250514")
process_message method · python · L32-L37 (6 LOC)
agent/conversation.py
    async def process_message(self, user_text, send_callback):
        self.messages.append({"role": "user", "content": user_text})
        # Detect flow match on each user message
        if self.flow_tracker:
            self.flow_tracker.detect_flow(user_text)
        await self._call_claude(send_callback)
_build_effective_prompt method · python · L39-L49 (11 LOC)
agent/conversation.py
    def _build_effective_prompt(self):
        """Build system prompt with flow context appended if active."""
        prompt = self.system_prompt
        if self.flow_tracker:
            flows_section = self.flow_tracker.compile_flows_prompt()
            if flows_section:
                prompt += flows_section
            hint = self.flow_tracker.get_context_hint()
            if hint:
                prompt += hint
        return prompt
Repobility · severity-and-effort ranking · https://repobility.com
_call_claude method · python · L51-L182 (132 LOC)
agent/conversation.py
    async def _call_claude(self, send_callback):
        while True:
            full_text = ""
            chunker = SentenceChunker()
            effective_prompt = self._build_effective_prompt()

            # Pipeline TTS: fire off generation concurrently, send in order
            tts_task_queue = asyncio.Queue() if self.tts_engine else None

            async def tts_sender():
                """Await TTS tasks in order and send audio as each completes."""
                while True:
                    item = await tts_task_queue.get()
                    if item is None:
                        break
                    task, text = item
                    try:
                        audio_bytes = await task
                        if audio_bytes:
                            b64 = base64.b64encode(audio_bytes).decode("ascii")
                            await send_callback({"type": "audio", "data": b64})
                        else:
                            print(f"  [TTS
_clean_for_tts method · python · L185-L223 (39 LOC)
agent/conversation.py
    def _clean_for_tts(text: str) -> str:
        """Strip emojis, markdown, and special characters before TTS."""
        # Remove emoji unicode ranges
        text = re.sub(
            r'[\U0001F600-\U0001F64F'  # emoticons
            r'\U0001F300-\U0001F5FF'    # symbols & pictographs
            r'\U0001F680-\U0001F6FF'    # transport & map
            r'\U0001F1E0-\U0001F1FF'    # flags
            r'\U00002702-\U000027B0'    # dingbats
            r'\U0000FE00-\U0000FE0F'    # variation selectors
            r'\U0001F900-\U0001F9FF'    # supplemental symbols
            r'\U0001FA00-\U0001FA6F'    # chess symbols
            r'\U0001FA70-\U0001FAFF'    # symbols extended-A
            r'\U00002600-\U000026FF'    # misc symbols
            r'\U0000200D'               # zero width joiner
            r'\U00002B50\U00002B55'     # stars, circles
            r']+', '', text
        )
        # Remove markdown bold/italic markers
        text = re.sub(r'\*+', '', text)
        # Remo
_enqueue_tts method · python · L225-L230 (6 LOC)
agent/conversation.py
    def _enqueue_tts(self, text, queue):
        """Clean text and fire off TTS generation as a concurrent task."""
        cleaned = self._clean_for_tts(text)
        if cleaned:
            task = asyncio.create_task(self.tts_engine.generate(cleaned))
            queue.put_nowait((task, cleaned))
end_session method · python · L232-L279 (48 LOC)
agent/conversation.py
    async def end_session(self):
        ended_at = datetime.utcnow()
        duration = int((ended_at - self.started_at).total_seconds())

        transcript = []
        for msg in self.messages:
            if msg["role"] == "user":
                content = msg["content"]
                if isinstance(content, str):
                    transcript.append({"role": "user", "text": content})
            elif msg["role"] == "assistant":
                content = msg["content"]
                if isinstance(content, list):
                    for block in content:
                        if hasattr(block, "text"):
                            transcript.append({"role": "agent", "text": block.text})
                elif isinstance(content, str):
                    transcript.append({"role": "agent", "text": content})

        outcome = "resolved"
        for action in self.actions_taken:
            if action["tool"] == "create_booking":
                outcome = "booking_made"
          
generate_flow_from_profile function · python · L67-L93 (27 LOC)
agent/flow_generator.py
async def generate_flow_from_profile(config: AgentConfig) -> dict:
    """Generate a call flow from the company profile using Claude.

    Returns validated call script JSON.
    """
    profile = _build_profile_message(config)

    client = AsyncAnthropic()
    model = os.environ.get("LLM_MODEL", "claude-sonnet-4-20250514")

    # First attempt
    result = await _call_claude(client, model, profile)
    errors = validate_intermediate(result)

    # Retry once with error feedback
    if errors:
        retry_msg = (
            f"The previous output had these issues:\n"
            + "\n".join(f"- {e}" for e in errors)
            + "\n\nPlease fix them and return the corrected JSON."
        )
        result = await _call_claude(client, model, profile, retry_msg)
        errors = validate_intermediate(result)
        if errors:
            raise ValueError(f"Flow generation failed after retry: {'; '.join(errors)}")

    return result
_build_profile_message function · python · L96-L134 (39 LOC)
agent/flow_generator.py
def _build_profile_message(config: AgentConfig) -> str:
    """Assemble company profile into a user message for Claude."""
    parts = [f"Company: {config.get('business_name', 'Unknown')}"]

    desc = config.get("company_description", "")
    if desc:
        parts.append(f"Description: {desc}")

    services = config.get("services", [])
    if services:
        parts.append(f"Services: {', '.join(services)}")

    hours = config.get("business_hours", "")
    if hours:
        parts.append(f"Business hours: {hours}")

    not_do = config.get("company_not_do", "")
    if not_do:
        parts.append(f"What we do NOT do: {not_do}")

    faqs = config.get("company_faqs", "")
    if faqs:
        parts.append(f"FAQs and key info:\n{faqs}")

    extra = config.get("company_extra", "")
    if extra:
        parts.append(f"Additional context:\n{extra}")

    tone = config.get("company_tone", "")
    if tone:
        parts.append(f"Tone: {tone}")

    parts.append(
        "\nDesign a call sc
_call_claude function · python · L137-L159 (23 LOC)
agent/flow_generator.py
async def _call_claude(client, model, profile, retry_feedback=None):
    """Call Claude API and parse JSON response."""
    messages = [{"role": "user", "content": profile}]
    if retry_feedback:
        messages.append({"role": "assistant", "content": "I'll fix those issues."})
        messages.append({"role": "user", "content": retry_feedback})

    response = await client.messages.create(
        model=model,
        max_tokens=4096,
        system=GENERATION_PROMPT,
        messages=messages,
    )

    text = response.content[0].text

    # Strip markdown code block if present
    text = text.strip()
    if text.startswith("```"):
        text = re.sub(r"^```(?:json)?\s*\n?", "", text)
        text = re.sub(r"\n?```\s*$", "", text)

    return json.loads(text)
validate_intermediate function · python · L162-L183 (22 LOC)
agent/flow_generator.py
def validate_intermediate(data: dict) -> list[str]:
    """Validate call script JSON. Returns list of error strings (empty = valid)."""
    errors = []

    if not isinstance(data.get("greeting"), str) or not data["greeting"].strip():
        errors.append("'greeting' must be a non-empty string")

    if not isinstance(data.get("knowledge_base"), str):
        errors.append("'knowledge_base' must be a string")

    if not isinstance(data.get("goodbye"), str) or not data["goodbye"].strip():
        errors.append("'goodbye' must be a non-empty string")

    tools = data.get("enabled_tools")
    if not isinstance(tools, list):
        errors.append("'enabled_tools' must be an array")
    else:
        for t in tools:
            if t not in AVAILABLE_TOOLS:
                errors.append(f"Unknown tool: '{t}'")

    return errors
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
intermediate_to_drawflow function · python · L186-L193 (8 LOC)
agent/flow_generator.py
def intermediate_to_drawflow(data: dict) -> tuple[dict, dict]:
    """Convert call script format to (drawflow_json, flow_json) tuple.

    Accepts the simple format: { greeting, knowledge_base, enabled_tools, goodbye }
    Returns ({}, flow_json) — drawflow_json is empty since Drawflow is no longer used.
    """
    flow_json = _simple_to_flow_json(data)
    return {}, flow_json
_simple_to_flow_json function · python · L196-L283 (88 LOC)
agent/flow_generator.py
def _simple_to_flow_json(data: dict) -> dict:
    """Convert simple call script format to flow_json format.

    Produces a linear chain: entry -> welcome -> tool nodes -> end
    """
    nodes = {}
    node_id = 1

    greeting = data.get("greeting", "")
    knowledge_base = data.get("knowledge_base", "")
    tool_instructions = data.get("tool_instructions", "")
    enabled_tools = data.get("enabled_tools", [])
    goodbye = data.get("goodbye", "")

    # Build full knowledge base — company info + tool instructions + transfer note
    kb = knowledge_base
    if tool_instructions:
        kb += ("\n\n" if kb else "") + tool_instructions
    has_transfer = "transfer" in enabled_tools
    if has_transfer:
        kb += ("\n\n" if kb else "") + "You can transfer the caller to a human agent if they request it or the issue requires human attention."

    # Entry node
    nodes[f"node_{node_id}"] = {
        "type": "entry",
        "label": "Call Starts",
        "position": {"x": 50, "y": 
FlowTracker class · python · L10-L312 (303 LOC)
agent/flow_tracker.py
class FlowTracker:
    def __init__(self, flows):
        """Initialize with a list of active flow dicts (from DB)."""
        self.flows = flows  # list of {id, name, flow_json (parsed dict)}
        self.active_flow = None  # matched flow name
        self.current_step = None  # current node id
        self.step_number = 0
        self.collected = {}  # variables collected during flow

    @classmethod
    async def load_active_flows(cls, db=None):
        """Load all active flows from the database and return a FlowTracker."""
        close_db = False
        if db is None:
            db = await get_db()
            close_db = True
        try:
            rows = await db.fetch_all(
                "SELECT id, name, flow_json FROM flows WHERE is_active = TRUE"
            )
            flows = []
            for r in rows:
                flow_json = r["flow_json"]
                if isinstance(flow_json, str):
                    try:
                        flow_json = json.loads(
__init__ method · python · L11-L17 (7 LOC)
agent/flow_tracker.py
    def __init__(self, flows):
        """Initialize with a list of active flow dicts (from DB)."""
        self.flows = flows  # list of {id, name, flow_json (parsed dict)}
        self.active_flow = None  # matched flow name
        self.current_step = None  # current node id
        self.step_number = 0
        self.collected = {}  # variables collected during flow
load_active_flows method · python · L20-L46 (27 LOC)
agent/flow_tracker.py
    async def load_active_flows(cls, db=None):
        """Load all active flows from the database and return a FlowTracker."""
        close_db = False
        if db is None:
            db = await get_db()
            close_db = True
        try:
            rows = await db.fetch_all(
                "SELECT id, name, flow_json FROM flows WHERE is_active = TRUE"
            )
            flows = []
            for r in rows:
                flow_json = r["flow_json"]
                if isinstance(flow_json, str):
                    try:
                        flow_json = json.loads(flow_json)
                    except (json.JSONDecodeError, TypeError):
                        continue
                flows.append({
                    "id": r["id"],
                    "name": r["name"],
                    "flow_json": flow_json,
                })
            return cls(flows)
        finally:
            if close_db:
                await db.close()
compile_flows_prompt method · python · L48-L121 (74 LOC)
agent/flow_tracker.py
    def compile_flows_prompt(self):
        """Convert all active flows into a human-readable prompt section."""
        if not self.flows:
            return ""

        lines = []
        all_knowledge = []
        welcome_prompt = ""

        for flow in self.flows:
            fj = flow["flow_json"]
            nodes = fj.get("nodes", {})
            if not nodes:
                continue

            # Collect knowledge base and welcome info from message nodes
            for node in nodes.values():
                data = node.get("data", {})
                kb = data.get("knowledge_base", "")
                kb_lower = kb.strip().lower() if kb else ""
                is_placeholder = any(kb_lower.startswith(p) for p in [
                    "paste your", "edit this", "add your", "replace this",
                    "example for", "put your",
                ])
                if kb_lower and not is_placeholder:
                    all_knowledge.append(kb.strip())
                #
_walk_flow method · python · L123-L155 (33 LOC)
agent/flow_tracker.py
    def _walk_flow(self, nodes):
        """Walk the flow graph starting from entry nodes, producing step descriptions."""
        # Find entry node
        entry_id = None
        for nid, node in nodes.items():
            if node.get("type") == "entry":
                entry_id = nid
                break
        if not entry_id:
            return []

        steps = []
        visited = set()
        queue = self._get_next_nodes(nodes, entry_id)

        while queue:
            nid = queue.pop(0)
            if nid in visited or nid not in nodes:
                continue
            visited.add(nid)

            node = nodes[nid]
            step_text = self._describe_step(node)
            if step_text:
                steps.append(step_text)

            # Follow primary output (first connection of first output)
            next_nodes = self._get_next_nodes(nodes, nid)
            for nn in next_nodes:
                if nn not in visited:
                    queue.append(nn)

_get_next_nodes method · python · L157-L165 (9 LOC)
agent/flow_tracker.py
    def _get_next_nodes(self, nodes, node_id):
        """Get connected node IDs from a node's outputs."""
        node = nodes.get(node_id, {})
        result = []
        for output in node.get("outputs", {}).values():
            for conn in output.get("connections", []):
                if conn not in result:
                    result.append(conn)
        return result
Same scanner, your repo: https://repobility.com — Repobility
_describe_step method · python · L167-L217 (51 LOC)
agent/flow_tracker.py
    def _describe_step(self, node):
        """Produce a human-readable step description for a node."""
        ntype = node.get("type", "")
        data = node.get("data", {})
        label = node.get("label", "")

        if ntype == "message":
            prompt = data.get("prompt", label)
            kb = data.get("knowledge_base", "")
            kb_note = " (use knowledge base)" if kb and kb.strip() else ""
            return f"[MESSAGE] {prompt}{kb_note}"

        elif ntype == "question":
            prompt = data.get("prompt", label)
            var = data.get("collect_variable", "")
            collect = f" (collect: {var})" if var else ""
            branches = data.get("branches") or data.get("response_branches", [])
            branch_text = ""
            if branches:
                if isinstance(branches[0], str):
                    branch_text = " | ".join(branches)
                elif isinstance(branches[0], dict):
                    branch_text = " | ".join(b.get(
detect_flow method · python · L219-L241 (23 LOC)
agent/flow_tracker.py
    def detect_flow(self, user_text):
        """Check if user text matches any flow's entry trigger keywords."""
        if self.active_flow:
            return self.active_flow

        text_lower = user_text.lower()
        for flow in self.flows:
            fj = flow["flow_json"]
            nodes = fj.get("nodes", {})
            for node in nodes.values():
                if node.get("type") != "entry":
                    continue
                keywords = node.get("data", {}).get("trigger_keywords", "")
                if not keywords:
                    continue
                kw_list = [k.strip().lower() for k in keywords.split(",") if k.strip()]
                for kw in kw_list:
                    if kw in text_lower:
                        self.active_flow = flow["name"]
                        self.current_step = self._find_first_step(nodes)
                        self.step_number = 1
                        return self.active_flow
        return None
_find_first_step method · python · L243-L249 (7 LOC)
agent/flow_tracker.py
    def _find_first_step(self, nodes):
        """Find the first non-entry node in the flow."""
        for nid, node in nodes.items():
            if node.get("type") == "entry":
                next_nodes = self._get_next_nodes(nodes, nid)
                return next_nodes[0] if next_nodes else None
        return None
page 1 / 5next ›