Function bodies 55 total
open_checkpointer function · python · L81-L84 (4 LOC)src/deepflow_runtime/sqlite_compat.py
def open_checkpointer(path: str):
    """Yield an async-compatible wrapper around a SQLite checkpointer.

    Opens ``SqliteSaver.from_conn_string(path)`` as a context manager and
    yields the resulting saver wrapped in ``AsyncCompatibleSqliteSaver``.
    The ``with`` block stays open while the generator is suspended at the
    ``yield``.

    NOTE(review): this is a generator function; presumably a
    ``contextlib.contextmanager`` decorator sits just above the lines shown
    here -- confirm against the full file.

    Args:
        path: Connection string handed to ``SqliteSaver.from_conn_string``.
    """
    saver_context = SqliteSaver.from_conn_string(path)
    with saver_context as sync_saver:
        async_compatible = AsyncCompatibleSqliteSaver(sync_saver)
        yield async_compatible
# create_runtime_tools function · python · L16-L21 (6 LOC)src/deepflow_runtime/tools.py
def create_runtime_tools(settings: DeepFlowSettings) -> list[BaseTool | Callable]:
    """Assemble the custom tools used by the research runtime.

    Args:
        settings: Passed unchanged to each tool factory.

    Returns:
        A two-element list: the web search tool followed by the URL fetch
        tool.
    """
    # Factories run in the same order as the tools appear in the result.
    web_search_tool = make_web_search_tool(settings)
    fetch_url_tool = make_fetch_url_tool(settings)
    return [web_search_tool, fetch_url_tool]
# make_web_search_tool function · python · L24-L66 (43 LOC)src/deepflow_runtime/tools.py
def make_web_search_tool(settings: DeepFlowSettings) -> BaseTool:
"""Create a web search tool with Tavily and DDGS fallback."""
@tool
def web_search(
query: str,
max_results: int = 5,
topic: Literal["general", "news"] = "general",
) -> str:
"""Search the web for current information."""
if settings.provider_status()["tavily"]:
from tavily import TavilyClient
client = TavilyClient()
result = client.search(query=query, max_results=max_results, topic=topic)
rows = []
for item in result.get("results", []):
rows.append(
"\n".join(
[
f"Title: {item.get('title', '')}",
f"URL: {item.get('url', '')}",
f"Content: {item.get('content', '')}",
]
)
)
return "\n\n".join(rowmake_fetch_url_tool function · python · L69-L84 (16 LOC)src/deepflow_runtime/tools.py
def make_fetch_url_tool(settings: DeepFlowSettings) -> BaseTool:
    """Create a tool that downloads a URL and returns its text.

    Args:
        settings: Supplies ``user_agent`` (sent as the ``User-Agent`` header)
            and ``request_timeout_seconds`` (the HTTP client timeout).

    Returns:
        The ``fetch_url`` function as wrapped by the ``@tool`` decorator.
    """

    @tool
    def fetch_url(url: str) -> str:
        """Fetch a URL and return the page as markdown-like text."""
        # NOTE(review): the docstring above is presumably surfaced as the
        # tool description by ``@tool``; keep its wording stable -- confirm.
        request_headers = {"User-Agent": settings.user_agent}
        with httpx.Client(
            timeout=settings.request_timeout_seconds,
            headers=request_headers,
            follow_redirects=True,
        ) as client:
            response = client.get(url)
            # Error statuses raise here instead of being returned as text.
            response.raise_for_status()
            # Media types are case-insensitive, so normalise before matching;
            # otherwise e.g. "text/HTML; charset=UTF-8" would skip conversion
            # and be returned as raw markup.
            content_type = response.headers.get("content-type", "").lower()
            if "html" in content_type:
                return html_to_markdown(response.text)
            # Non-HTML responses are passed through as-is.
            return response.text

    return fetch_url
# print_banner function · python · L8-L11 (4 LOC)src/deepflow_runtime/utils/banner.py
def print_banner() -> None:
    """Write the shared DeepFlow ASCII banner to standard output.

    The banner text is read as UTF-8 from ``assets/ascii/deepflow_banner.txt``
    beneath ``parents[3]`` of this module's resolved path.
    """
    module_path = Path(__file__).resolve()
    base_dir = module_path.parents[3]
    banner_file = base_dir.joinpath("assets", "ascii", "deepflow_banner.txt")
    banner_text = banner_file.read_text(encoding="utf-8")
    print(banner_text)
# ‹ prevpage 2 / 2