← back to Optimal-Living-Systems__deepflow

Function bodies 55 total

All specs Real LLM only Function bodies
open_checkpointer function · python · L81-L84 (4 LOC)
src/deepflow_runtime/sqlite_compat.py
def open_checkpointer(path: str):
    """Open a DeepFlow SQLite checkpointer with async compatibility."""
    with SqliteSaver.from_conn_string(path) as saver:
        yield AsyncCompatibleSqliteSaver(saver)
create_runtime_tools function · python · L16-L21 (6 LOC)
src/deepflow_runtime/tools.py
def create_runtime_tools(settings: DeepFlowSettings) -> list[BaseTool | Callable]:
    """Build the custom toolset for the research runtime."""
    return [
        make_web_search_tool(settings),
        make_fetch_url_tool(settings),
    ]
make_web_search_tool function · python · L24-L66 (43 LOC)
src/deepflow_runtime/tools.py
def make_web_search_tool(settings: DeepFlowSettings) -> BaseTool:
    """Create a web search tool with Tavily and DDGS fallback."""

    @tool
    def web_search(
        query: str,
        max_results: int = 5,
        topic: Literal["general", "news"] = "general",
    ) -> str:
        """Search the web for current information."""
        if settings.provider_status()["tavily"]:
            from tavily import TavilyClient

            client = TavilyClient()
            result = client.search(query=query, max_results=max_results, topic=topic)
            rows = []
            for item in result.get("results", []):
                rows.append(
                    "\n".join(
                        [
                            f"Title: {item.get('title', '')}",
                            f"URL: {item.get('url', '')}",
                            f"Content: {item.get('content', '')}",
                        ]
                    )
                )
            return "\n\n".join(row
make_fetch_url_tool function · python · L69-L84 (16 LOC)
src/deepflow_runtime/tools.py
def make_fetch_url_tool(settings: DeepFlowSettings) -> BaseTool:
    """Create a URL fetcher that returns markdown text."""

    @tool
    def fetch_url(url: str) -> str:
        """Fetch a URL and return the page as markdown-like text."""
        headers = {"User-Agent": settings.user_agent}
        with httpx.Client(timeout=settings.request_timeout_seconds, headers=headers, follow_redirects=True) as client:
            response = client.get(url)
            response.raise_for_status()
        content_type = response.headers.get("content-type", "")
        if "html" in content_type:
            return html_to_markdown(response.text)
        return response.text

    return fetch_url
print_banner function · python · L8-L11 (4 LOC)
src/deepflow_runtime/utils/banner.py
def print_banner() -> None:
    """Print the shared DeepFlow ASCII banner."""
    banner_path = Path(__file__).resolve().parents[3] / "assets" / "ascii" / "deepflow_banner.txt"
    print(banner_path.read_text(encoding="utf-8"))
‹ prevpage 2 / 2