← back to Optimal-Living-Systems__deepflow

Function bodies 55 total

All specs Real LLM only Function bodies
print_banner function · python · L4-L8 (5 LOC)
deepflow/utils/banner.py
def print_banner():
    with (Path(__file__).resolve().parents[2] / "assets" / "ascii" / "deepflow_banner.txt").open(
        encoding="utf-8"
    ) as f:
        print(f.read())
DeepFlowRuntimeComponent class · python · L15-L76 (62 LOC)
langflow_components/deepflow_runtime_component.py
class DeepFlowRuntimeComponent(Component):
    display_name = "DeepFlow Runtime"
    description = "Call the external DeepFlow Deep Agents runtime over HTTP."
    icon = "Bot"

    inputs = [
        StrInput(
            name="runtime_url",
            display_name="Runtime URL",
            value=os.getenv("DEEPFLOW_RUNTIME_URL", "http://127.0.0.1:8011"),
            info="Base URL of the DeepFlow runtime.",
        ),
        MessageTextInput(
            name="prompt",
            display_name="Prompt",
            info="Prompt to send to the DeepFlow runtime.",
            tool_mode=True,
        ),
        StrInput(
            name="thread_id",
            display_name="Thread ID",
            value="langflow-thread",
            info="Conversation thread identifier used by DeepFlow.",
            advanced=True,
        ),
        IntInput(
            name="timeout_seconds",
            display_name="Timeout Seconds",
            value=120,
            info="HTTP timeout for th
run_deepflow method · python · L53-L76 (24 LOC)
langflow_components/deepflow_runtime_component.py
    def run_deepflow(self) -> Data:
        url = self.runtime_url.rstrip("/") + "/invoke"
        payload = {"prompt": self.prompt, "thread_id": self.thread_id}
        try:
            with httpx.Client(timeout=float(self.timeout_seconds)) as client:
                response = client.post(url, json=payload)
                response.raise_for_status()
            result = response.json()
        except httpx.HTTPStatusError as exc:
            result = {
                "error": f"DeepFlow runtime returned {exc.response.status_code}",
                "detail": exc.response.text,
            }
        except httpx.HTTPError as exc:
            result = {
                "error": "DeepFlow runtime request failed",
                "detail": str(exc),
            }
        data = Data(
            text=result.get("output_text", result.get("detail", result.get("error", ""))),
            data=result,
        )
        self.status = data
        return data
build_chat_bridge_flow function · python · L21-L35 (15 LOC)
scripts/generate_langflow_examples.py
def build_chat_bridge_flow() -> dict:
    """Build a chat-oriented DeepFlow bridge flow."""
    chat_input = ChatInput().set(should_store_message=False)
    bridge = DeepFlowRuntimeComponent().set(
        runtime_url="http://127.0.0.1:8011",
        thread_id="deepflow-chat-bridge",
        timeout_seconds=120,
        prompt=chat_input.message_response,
    )
    chat_output = ChatOutput().set(input_value=bridge.run_deepflow)
    graph = Graph(start=chat_input, end=chat_output)
    return graph.dump(
        name="DeepFlow Chat Bridge",
        description="ChatInput -> DeepFlow Runtime -> ChatOutput",
    )
build_text_bridge_flow function · python · L38-L55 (18 LOC)
scripts/generate_langflow_examples.py
def build_text_bridge_flow() -> dict:
    """Build a text-in DeepFlow bridge flow with chat output."""
    text_input = TextInputComponent().set(input_value="Research the latest LangGraph release notes.")
    bridge = DeepFlowRuntimeComponent().set(
        runtime_url="http://127.0.0.1:8011",
        thread_id="deepflow-text-bridge",
        timeout_seconds=120,
        prompt=text_input.text_response,
    )
    chat_output = ChatOutput().set(
        input_value=bridge.run_deepflow,
        should_store_message=False,
    )
    graph = Graph(start=text_input, end=chat_output)
    return graph.dump(
        name="DeepFlow Text Bridge",
        description="TextInput -> DeepFlow Runtime -> ChatOutput",
    )
write_flow function · python · L58-L62 (5 LOC)
scripts/generate_langflow_examples.py
def write_flow(filename: str, payload: dict) -> None:
    """Write one flow payload to disk."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    path = OUTPUT_DIR / filename
    path.write_text(json.dumps(payload, indent=2))
main function · python · L65-L68 (4 LOC)
scripts/generate_langflow_examples.py
def main() -> None:
    """Generate all checked-in Langflow example assets."""
    write_flow("deepflow_chat_bridge.json", build_chat_bridge_flow())
    write_flow("deepflow_text_bridge.json", build_text_bridge_flow())
Powered by Repobility — scan your code at https://repobility.com
run_acp_server function · python · L16-L48 (33 LOC)
src/deepflow_runtime/acp_server.py
async def run_acp_server(settings: DeepFlowSettings) -> None:
    """Run the ACP server with a shared SQLite checkpointer."""
    with open_checkpointer(str(settings.sqlite_path)) as checkpointer:

        def build_agent(context: AgentSessionContext):
            return build_acp_agent(
                settings,
                project_root=Path(context.cwd),
                checkpointer=checkpointer,
                mode=context.mode,
            )

        modes = SessionModeState(
            current_mode_id="accept_edits",
            available_modes=[
                SessionMode(
                    id="ask_before_edits",
                    name="Ask before edits",
                    description="Ask before edits, plans, and shell execution.",
                ),
                SessionMode(
                    id="accept_edits",
                    name="Accept edits",
                    description="Auto-accept edits but ask before plans and shell execution.",
               
resolve_model function · python · L21-L45 (25 LOC)
src/deepflow_runtime/agent.py
def resolve_model(settings: DeepFlowSettings) -> BaseChatModel:
    """Resolve the first available model configuration."""
    if settings.model:
        return _build_model_from_name(settings.model)

    providers = settings.provider_status()
    if providers["anthropic"]:
        return _build_model_from_name(settings.anthropic_default_model)
    if providers["openai"]:
        return _build_model_from_name(settings.openai_default_model)
    if providers["google"]:
        return _build_model_from_name(settings.google_default_model)
    if providers["openrouter"]:
        return _build_model_from_name(settings.openrouter_default_model)
    if providers["deepseek"]:
        return _build_model_from_name(settings.deepseek_default_model)
    if os.getenv("OLLAMA_HOST") or os.getenv("OLLAMA_BASE_URL"):
        return _build_model_from_name(settings.ollama_default_model)

    msg = (
        "No model provider configured. Set ANTHROPIC_API_KEY, OPENAI_API_KEY, GOOGLE_API_KEY, "
        "O
_build_model_from_name function · python · L48-L66 (19 LOC)
src/deepflow_runtime/agent.py
def _build_model_from_name(model_name: str) -> BaseChatModel:
    """Build a chat model, using provider-specific classes when needed."""
    if model_name.startswith("openrouter:"):
        from langchain_openrouter import ChatOpenRouter

        return ChatOpenRouter(
            model=model_name.split(":", 1)[1],
            temperature=0.0,
            max_retries=2,
        )
    if model_name.startswith("deepseek:"):
        from langchain_deepseek import ChatDeepSeek

        return ChatDeepSeek(
            model=model_name.split(":", 1)[1],
            temperature=0.0,
            max_retries=2,
        )
    return init_chat_model(model_name, temperature=0.0)
create_runtime_backend function · python · L69-L77 (9 LOC)
src/deepflow_runtime/agent.py
def create_runtime_backend(settings: DeepFlowSettings) -> CompositeBackend:
    """Create the bounded filesystem backend used by the HTTP runtime."""
    return CompositeBackend(
        default=FilesystemBackend(root_dir=str(settings.workspace_dir), virtual_mode=False),
        routes={
            "/memories/": FilesystemBackend(root_dir=str(settings.memories_dir), virtual_mode=True),
            "/skills/": FilesystemBackend(root_dir=str(settings.skills_dir), virtual_mode=True),
        },
    )
build_runtime_agent function · python · L80-L102 (23 LOC)
src/deepflow_runtime/agent.py
def build_runtime_agent(settings: DeepFlowSettings, *, checkpointer: Checkpointer) -> Any:
    """Build the research-oriented runtime agent."""
    model = resolve_model(settings)
    tools = create_runtime_tools(settings)
    subagents = [
        {
            "name": "researcher",
            "description": "Use this subagent for focused web research on a single topic.",
            "system_prompt": RESEARCH_SUBAGENT_PROMPT,
            "tools": tools,
        }
    ]
    return create_deep_agent(
        model=model,
        tools=tools,
        system_prompt=MAIN_SYSTEM_PROMPT,
        subagents=subagents,
        skills=[str(settings.skills_dir)],
        memory=[str(settings.memory_file)],
        checkpointer=checkpointer,
        backend=create_runtime_backend(settings),
        name="deepflow-runtime",
    )
build_acp_agent function · python · L105-L148 (44 LOC)
src/deepflow_runtime/agent.py
def build_acp_agent(
    settings: DeepFlowSettings,
    *,
    project_root: Path,
    checkpointer: Checkpointer,
    mode: str,
) -> Any:
    """Build the ACP editor-facing agent."""

    interrupt_on = {
        "edit_file": {"allowed_decisions": ["approve", "reject"]},
        "write_file": {"allowed_decisions": ["approve", "reject"]},
        "write_todos": {"allowed_decisions": ["approve", "reject"]},
        "execute": {"allowed_decisions": ["approve", "reject"]},
    }
    if mode == "accept_edits":
        interrupt_on.pop("edit_file", None)
        interrupt_on.pop("write_file", None)
    elif mode == "accept_everything":
        interrupt_on = {}

    def create_backend(runtime: Any | None = None) -> CompositeBackend:
        ephemeral_backend = StateBackend(runtime) if runtime is not None else None
        shell_backend = LocalShellBackend(
            root_dir=str(project_root),
            inherit_env=True,
            env=os.environ.copy(),
        )
        routes: dic
main function · python · L19-L22 (4 LOC)
src/deepflow_runtime/cli.py
def main() -> None:
    """Run the DeepFlow CLI with startup branding."""
    print_banner()
    app()
doctor function · python · L26-L36 (11 LOC)
src/deepflow_runtime/cli.py
def doctor() -> None:
    """Print runtime status and missing prerequisites."""
    settings = get_settings()
    payload = {
        "home_dir": str(settings.home_dir),
        "workspace_dir": str(settings.workspace_dir),
        "sqlite_path": str(settings.sqlite_path),
        "provider_status": settings.provider_status(),
        "model": settings.model or "auto",
    }
    typer.echo(json.dumps(payload, indent=2, sort_keys=True))
Want this analysis on your repo? https://repobility.com/scan/
serve function · python · L40-L48 (9 LOC)
src/deepflow_runtime/cli.py
def serve() -> None:
    """Run the DeepFlow runtime API."""
    settings = get_settings()
    uvicorn.run(
        create_app(settings),
        host=settings.host,
        port=settings.port,
        log_level="info",
    )
run function · python · L52-L54 (3 LOC)
src/deepflow_runtime/cli.py
def run(prompt: str, thread_id: str = "deepflow-cli") -> None:
    """Run one prompt against the DeepFlow runtime graph."""
    asyncio.run(_run_once(prompt=prompt, thread_id=thread_id))
chat function · python · L58-L60 (3 LOC)
src/deepflow_runtime/cli.py
def chat(thread_id: str = "deepflow-chat") -> None:
    """Run an interactive REPL."""
    asyncio.run(_chat_loop(thread_id=thread_id))
acp function · python · L64-L66 (3 LOC)
src/deepflow_runtime/cli.py
def acp() -> None:
    """Run the ACP server for ACP-capable editors."""
    asyncio.run(run_acp_server(get_settings()))
mcp_command function · python · L70-L79 (10 LOC)
src/deepflow_runtime/cli.py
def mcp_command(
    transport: str = typer.Option("stdio", help="MCP transport: stdio or streamable-http."),
    host: str | None = typer.Option(None, help="Host for streamable-http transport."),
    port: int | None = typer.Option(None, help="Port for streamable-http transport."),
) -> None:
    """Run the DeepFlow MCP server for IDE and Langflow MCP clients."""
    settings = get_settings()
    resolved_host = host or settings.host
    resolved_port = port or 8012
    run_mcp_server(transport=transport, host=resolved_host, port=resolved_port)
_run_once function · python · L82-L90 (9 LOC)
src/deepflow_runtime/cli.py
async def _run_once(*, prompt: str, thread_id: str) -> None:
    settings = get_settings()
    service = RuntimeService(settings)
    await service.start()
    try:
        response = await service.invoke(InvokeRequest(prompt=prompt, thread_id=thread_id))
    finally:
        await service.stop()
    typer.echo(response.output_text)
_chat_loop function · python · L93-L106 (14 LOC)
src/deepflow_runtime/cli.py
async def _chat_loop(*, thread_id: str) -> None:
    settings = get_settings()
    service = RuntimeService(settings)
    await service.start()
    typer.echo("DeepFlow chat. Type 'exit' or 'quit' to stop.")
    try:
        while True:
            prompt = typer.prompt("you")
            if prompt.strip().lower() in {"exit", "quit"}:
                break
            response = await service.invoke(InvokeRequest(prompt=prompt, thread_id=thread_id))
            typer.echo(f"deepflow> {response.output_text}")
    finally:
        await service.stop()
DeepFlowSettings class · python · L18-L86 (69 LOC)
src/deepflow_runtime/config.py
class DeepFlowSettings(BaseSettings):
    """Runtime settings loaded from environment variables."""

    model_config = SettingsConfigDict(
        env_prefix="DEEPFLOW_",
        env_file=(str(PROJECT_ENV_PATH), str(HOME_ENV_PATH)),
        extra="ignore",
    )

    home_dir: Path = Field(default=PROJECT_ROOT)
    host: str = "127.0.0.1"
    port: int = 8011
    api_key: str | None = None
    model: str | None = None
    anthropic_default_model: str = "anthropic:claude-sonnet-4-6"
    openai_default_model: str = "openai:gpt-4.1"
    google_default_model: str = "google_genai:gemini-2.5-pro"
    openrouter_default_model: str = "openrouter:anthropic/claude-sonnet-4.5"
    deepseek_default_model: str = "deepseek:deepseek-chat"
    ollama_default_model: str = "ollama:qwen3:latest"
    request_timeout_seconds: float = 60.0
    user_agent: str = "DeepFlow/0.1 (+https://github.com/langchain-ai/deepagents)"

    @property
    def workspace_dir(self) -> Path:
        return self.home_dir / "wo
If a scraper extracted this row, it came from Repobility (https://repobility.com)
ensure_directories method · python · L65-L74 (10 LOC)
src/deepflow_runtime/config.py
    def ensure_directories(self) -> None:
        """Create the runtime directories if they do not exist."""
        for path in (
            self.home_dir,
            self.workspace_dir,
            self.data_dir,
            self.memories_dir,
            self.skills_dir,
        ):
            path.mkdir(parents=True, exist_ok=True)
provider_status method · python · L76-L86 (11 LOC)
src/deepflow_runtime/config.py
    def provider_status(self) -> dict[str, bool]:
        """Return provider availability without exposing secret values."""
        return {
            "anthropic": bool(os.getenv("ANTHROPIC_API_KEY")),
            "openai": bool(os.getenv("OPENAI_API_KEY")),
            "google": bool(os.getenv("GOOGLE_API_KEY")),
            "openrouter": bool(os.getenv("OPENROUTER_API_KEY")),
            "deepseek": bool(os.getenv("DEEPSEEK_API_KEY")),
            "tavily": bool(os.getenv("TAVILY_API_KEY")),
            "langsmith": bool(os.getenv("LANGSMITH_API_KEY")),
        }
get_settings function · python · L90-L100 (11 LOC)
src/deepflow_runtime/config.py
def get_settings() -> DeepFlowSettings:
    """Return cached DeepFlow settings."""
    for key, value in dotenv_values(PROJECT_ENV_PATH).items():
        if value is not None:
            os.environ.setdefault(key, value)
    for key, value in dotenv_values(HOME_ENV_PATH).items():
        if value is not None:
            os.environ.setdefault(key, value)
    settings = DeepFlowSettings()
    settings.ensure_directories()
    return settings
_service_guard function · python · L30-L34 (5 LOC)
src/deepflow_runtime/mcp_server.py
def _service_guard() -> asyncio.Lock:
    global _service_lock
    if _service_lock is None:
        _service_lock = asyncio.Lock()
    return _service_lock
get_runtime_service function · python · L37-L47 (11 LOC)
src/deepflow_runtime/mcp_server.py
async def get_runtime_service() -> RuntimeService:
    """Return a started singleton runtime service for MCP tool calls."""
    global _service
    if _service is not None:
        return _service
    async with _service_guard():
        if _service is None:
            service = RuntimeService(get_settings())
            await service.start()
            _service = service
    return _service
deepflow_research function · python · L51-L64 (14 LOC)
src/deepflow_runtime/mcp_server.py
async def deepflow_research(
    prompt: str,
    thread_id: str = "deepflow-mcp",
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Run one prompt through the DeepFlow runtime and return structured output."""
    if ctx is not None:
        await ctx.info(f"Invoking DeepFlow on thread '{thread_id}'")
    service = await get_runtime_service()
    response = await service.invoke(InvokeRequest(prompt=prompt, thread_id=thread_id))
    payload = response.model_dump()
    if ctx is not None:
        await ctx.info(f"DeepFlow completed with {payload['message_count']} messages")
    return payload
deepflow_status function · python · L68-L79 (12 LOC)
src/deepflow_runtime/mcp_server.py
async def deepflow_status() -> dict[str, Any]:
    """Return runtime configuration and provider status for DeepFlow."""
    settings = get_settings()
    service = await get_runtime_service()
    return {
        "status": "ok",
        "model": settings.model or "auto",
        "provider_status": settings.provider_status(),
        "workspace_dir": str(settings.workspace_dir),
        "sqlite_path": str(settings.sqlite_path),
        "startup_error": service.startup_error,
    }
deepflow_health_resource function · python · L83-L91 (9 LOC)
src/deepflow_runtime/mcp_server.py
async def deepflow_health_resource() -> str:
    """Return a compact health summary for clients that browse resources."""
    payload = await deepflow_status()
    return (
        f"status={payload['status']}\n"
        f"model={payload['model']}\n"
        f"startup_error={payload['startup_error']}\n"
        f"provider_status={payload['provider_status']}"
    )
About: code-quality intelligence by Repobility · https://repobility.com
research_prompt function · python · L95-L100 (6 LOC)
src/deepflow_runtime/mcp_server.py
def research_prompt(topic: str) -> str:
    """Create a DeepFlow-oriented research prompt."""
    return (
        "Research the following topic and produce a concise, source-aware summary "
        f"with clear next steps: {topic}"
    )
run_mcp_server function · python · L103-L110 (8 LOC)
src/deepflow_runtime/mcp_server.py
def run_mcp_server(*, transport: str, host: str, port: int) -> None:
    """Run the FastMCP server on the requested transport."""
    if transport not in {"stdio", "streamable-http"}:
        msg = "transport must be 'stdio' or 'streamable-http'"
        raise ValueError(msg)
    mcp.settings.host = host
    mcp.settings.port = port
    mcp.run(transport=transport)
require_api_key function · python · L28-L36 (9 LOC)
src/deepflow_runtime/runtime_api.py
def require_api_key(
    credentials: HTTPAuthorizationCredentials | None = Security(_bearer),
    settings: DeepFlowSettings = Depends(get_settings),
) -> None:
    """Validate Bearer token when DEEPFLOW_API_KEY is set."""
    if settings.api_key is None:
        return
    if credentials is None or credentials.credentials != settings.api_key:
        raise HTTPException(status_code=401, detail="Invalid or missing API key.")
InvokeRequest class · python · L39-L43 (5 LOC)
src/deepflow_runtime/runtime_api.py
class InvokeRequest(BaseModel):
    """Invocation payload for the DeepFlow runtime."""

    prompt: str = Field(min_length=1)
    thread_id: str = Field(default="deepflow-default")
InvokeResponse class · python · L46-L51 (6 LOC)
src/deepflow_runtime/runtime_api.py
class InvokeResponse(BaseModel):
    """Response payload for the DeepFlow runtime."""

    thread_id: str
    output_text: str
    message_count: int
RuntimeService class · python · L55-L135 (81 LOC)
src/deepflow_runtime/runtime_api.py
class RuntimeService:
    """Manages the lifecycle of the DeepFlow runtime graph."""

    settings: DeepFlowSettings
    checkpointer: AsyncCompatibleSqliteSaver | None = None
    checkpointer_cm: Any | None = None
    agent: Any | None = None
    startup_error: str | None = None

    async def start(self) -> None:
        """Start the runtime and compile the graph."""
        self.settings.ensure_directories()
        self.checkpointer_cm = open_checkpointer(str(self.settings.sqlite_path))
        self.checkpointer = self.checkpointer_cm.__enter__()
        try:
            self.agent = build_runtime_agent(self.settings, checkpointer=self.checkpointer)
            self.startup_error = None
            logger.info("DeepFlow runtime started")
        except RuntimeError as exc:
            self.agent = None
            self.startup_error = str(exc)
            logger.warning("DeepFlow runtime started without a model: %s", exc)

    async def stop(self) -> None:
        """Stop the runti
start method · python · L64-L76 (13 LOC)
src/deepflow_runtime/runtime_api.py
    async def start(self) -> None:
        """Start the runtime and compile the graph."""
        self.settings.ensure_directories()
        self.checkpointer_cm = open_checkpointer(str(self.settings.sqlite_path))
        self.checkpointer = self.checkpointer_cm.__enter__()
        try:
            self.agent = build_runtime_agent(self.settings, checkpointer=self.checkpointer)
            self.startup_error = None
            logger.info("DeepFlow runtime started")
        except RuntimeError as exc:
            self.agent = None
            self.startup_error = str(exc)
            logger.warning("DeepFlow runtime started without a model: %s", exc)
stop method · python · L78-L85 (8 LOC)
src/deepflow_runtime/runtime_api.py
    async def stop(self) -> None:
        """Stop the runtime and release the database handle."""
        if self.checkpointer_cm is not None:
            self.checkpointer_cm.__exit__(None, None, None)
        self.checkpointer = None
        self.checkpointer_cm = None
        self.agent = None
        self.startup_error = None
Powered by Repobility — scan your code at https://repobility.com
_get_agent method · python · L87-L94 (8 LOC)
src/deepflow_runtime/runtime_api.py
    def _get_agent(self) -> Any:
        """Return the compiled agent, rebuilding if needed."""
        if self.agent is None:
            if self.checkpointer is None:
                raise RuntimeError("RuntimeService.start() must be called before invoke().")
            self.agent = build_runtime_agent(self.settings, checkpointer=self.checkpointer)
            self.startup_error = None
        return self.agent
invoke method · python · L96-L111 (16 LOC)
src/deepflow_runtime/runtime_api.py
    async def invoke(self, request: InvokeRequest) -> InvokeResponse:
        """Run the graph for one user request."""
        agent = self._get_agent()
        logger.info("invoke thread_id=%s prompt_len=%d", request.thread_id, len(request.prompt))
        result = await agent.ainvoke(
            {"messages": [("user", request.prompt)]},
            config={"configurable": {"thread_id": request.thread_id}},
        )
        messages = result.get("messages", [])
        response = InvokeResponse(
            thread_id=request.thread_id,
            output_text=extract_text(messages),
            message_count=len(messages),
        )
        logger.info("invoke complete thread_id=%s message_count=%d", request.thread_id, response.message_count)
        return response
stream method · python · L113-L135 (23 LOC)
src/deepflow_runtime/runtime_api.py
    async def stream(self, request: InvokeRequest) -> AsyncGenerator[str, None]:
        """Stream token chunks as SSE for one user request."""
        agent = self._get_agent()
        config = {"configurable": {"thread_id": request.thread_id}}
        async for event in agent.astream_events(
            {"messages": [("user", request.prompt)]},
            config=config,
            version="v2",
        ):
            if event["event"] == "on_chat_model_stream":
                chunk = event["data"].get("chunk")
                if chunk is None:
                    continue
                content = chunk.content
                if isinstance(content, str) and content:
                    yield f"data: {json.dumps({'text': content})}\n\n"
                elif isinstance(content, list):
                    for part in content:
                        if isinstance(part, dict) and part.get("type") == "text":
                            text = part.get("text", "")
                     
extract_text function · python · L138-L145 (8 LOC)
src/deepflow_runtime/runtime_api.py
def extract_text(messages: list[Any]) -> str:
    """Extract the last assistant text from a LangGraph message list."""
    for message in reversed(messages):
        if isinstance(message, AIMessage):
            return _message_to_text(message)
        if isinstance(message, BaseMessage) and getattr(message, "type", "") == "ai":
            return _message_to_text(message)
    return ""
_message_to_text function · python · L148-L161 (14 LOC)
src/deepflow_runtime/runtime_api.py
def _message_to_text(message: BaseMessage) -> str:
    """Convert a message content payload into plain text."""
    content = message.content
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for item in content:
            if isinstance(item, dict) and item.get("type") == "text":
                parts.append(str(item.get("text", "")))
            else:
                parts.append(str(item))
        return "\n".join(part for part in parts if part)
    return str(content)
ThreadMessage class · python · L164-L168 (5 LOC)
src/deepflow_runtime/runtime_api.py
class ThreadMessage(BaseModel):
    """A single message in a thread's history."""

    role: str
    content: str
ThreadHistoryResponse class · python · L171-L175 (5 LOC)
src/deepflow_runtime/runtime_api.py
class ThreadHistoryResponse(BaseModel):
    """Conversation history for a thread."""

    thread_id: str
    messages: list[ThreadMessage]
create_app function · python · L178-L251 (74 LOC)
src/deepflow_runtime/runtime_api.py
def create_app(settings: DeepFlowSettings | None = None) -> FastAPI:
    """Create the DeepFlow FastAPI app."""
    runtime_settings = settings or get_settings()
    service = RuntimeService(runtime_settings)

    @asynccontextmanager
    async def lifespan(_: FastAPI):
        await service.start()
        try:
            yield
        finally:
            await service.stop()

    app = FastAPI(title="DeepFlow Runtime", version="0.1.0", lifespan=lifespan)
    app.state.service = service

    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["GET", "POST"],
        allow_headers=["Authorization", "Content-Type"],
    )

    @app.get("/health")
    async def health() -> dict[str, Any]:
        return {
            "status": "ok",
            "provider_status": runtime_settings.provider_status(),
            "workspace_dir": str(runtime_settings.workspace_dir),
            "sqlite_path": str(runtime_settings.sqlite_path),
            "start
Want this analysis on your repo? https://repobility.com/scan/
AsyncCompatibleSqliteSaver class · python · L13-L77 (65 LOC)
src/deepflow_runtime/sqlite_compat.py
class AsyncCompatibleSqliteSaver(BaseCheckpointSaver[Any]):
    """Wrap `SqliteSaver` with async methods backed by thread offload."""

    def __init__(self, saver: SqliteSaver) -> None:
        super().__init__(serde=saver.serde)
        self._saver = saver

    def __getattr__(self, name: str) -> Any:
        return getattr(self._saver, name)

    def get_tuple(self, config: Any) -> Any:
        return self._saver.get_tuple(config)

    def put(self, config: Any, checkpoint: Any, metadata: Any, new_versions: Any) -> Any:
        return self._saver.put(config, checkpoint, metadata, new_versions)

    def put_writes(self, config: Any, writes: Any, task_id: str, task_path: str = "") -> None:
        self._saver.put_writes(config, writes, task_id, task_path)

    def list(self, config: Any, *, filter: dict[str, Any] | None = None, before: Any = None, limit: int | None = None):
        return self._saver.list(config, filter=filter, before=before, limit=limit)

    def delete_thread(self, 
__init__ method · python · L16-L18 (3 LOC)
src/deepflow_runtime/sqlite_compat.py
    def __init__(self, saver: SqliteSaver) -> None:
        super().__init__(serde=saver.serde)
        self._saver = saver
alist method · python · L59-L62 (4 LOC)
src/deepflow_runtime/sqlite_compat.py
    async def alist(self, config: Any, *, filter: dict[str, Any] | None = None, before: Any = None, limit: int | None = None):
        items = await asyncio.to_thread(lambda: list(self._saver.list(config, filter=filter, before=before, limit=limit)))
        for item in items:
            yield item
page 1 / 2next ›