Function bodies 55 total
print_banner function · python · L4-L8 (5 LOC)deepflow/utils/banner.py
def print_banner():
with (Path(__file__).resolve().parents[2] / "assets" / "ascii" / "deepflow_banner.txt").open(
encoding="utf-8"
) as f:
print(f.read())DeepFlowRuntimeComponent class · python · L15-L76 (62 LOC)langflow_components/deepflow_runtime_component.py
class DeepFlowRuntimeComponent(Component):
display_name = "DeepFlow Runtime"
description = "Call the external DeepFlow Deep Agents runtime over HTTP."
icon = "Bot"
inputs = [
StrInput(
name="runtime_url",
display_name="Runtime URL",
value=os.getenv("DEEPFLOW_RUNTIME_URL", "http://127.0.0.1:8011"),
info="Base URL of the DeepFlow runtime.",
),
MessageTextInput(
name="prompt",
display_name="Prompt",
info="Prompt to send to the DeepFlow runtime.",
tool_mode=True,
),
StrInput(
name="thread_id",
display_name="Thread ID",
value="langflow-thread",
info="Conversation thread identifier used by DeepFlow.",
advanced=True,
),
IntInput(
name="timeout_seconds",
display_name="Timeout Seconds",
value=120,
info="HTTP timeout for thrun_deepflow method · python · L53-L76 (24 LOC)langflow_components/deepflow_runtime_component.py
def run_deepflow(self) -> Data:
url = self.runtime_url.rstrip("/") + "/invoke"
payload = {"prompt": self.prompt, "thread_id": self.thread_id}
try:
with httpx.Client(timeout=float(self.timeout_seconds)) as client:
response = client.post(url, json=payload)
response.raise_for_status()
result = response.json()
except httpx.HTTPStatusError as exc:
result = {
"error": f"DeepFlow runtime returned {exc.response.status_code}",
"detail": exc.response.text,
}
except httpx.HTTPError as exc:
result = {
"error": "DeepFlow runtime request failed",
"detail": str(exc),
}
data = Data(
text=result.get("output_text", result.get("detail", result.get("error", ""))),
data=result,
)
self.status = data
return databuild_chat_bridge_flow function · python · L21-L35 (15 LOC)scripts/generate_langflow_examples.py
def build_chat_bridge_flow() -> dict:
"""Build a chat-oriented DeepFlow bridge flow."""
chat_input = ChatInput().set(should_store_message=False)
bridge = DeepFlowRuntimeComponent().set(
runtime_url="http://127.0.0.1:8011",
thread_id="deepflow-chat-bridge",
timeout_seconds=120,
prompt=chat_input.message_response,
)
chat_output = ChatOutput().set(input_value=bridge.run_deepflow)
graph = Graph(start=chat_input, end=chat_output)
return graph.dump(
name="DeepFlow Chat Bridge",
description="ChatInput -> DeepFlow Runtime -> ChatOutput",
)build_text_bridge_flow function · python · L38-L55 (18 LOC)scripts/generate_langflow_examples.py
def build_text_bridge_flow() -> dict:
"""Build a text-in DeepFlow bridge flow with chat output."""
text_input = TextInputComponent().set(input_value="Research the latest LangGraph release notes.")
bridge = DeepFlowRuntimeComponent().set(
runtime_url="http://127.0.0.1:8011",
thread_id="deepflow-text-bridge",
timeout_seconds=120,
prompt=text_input.text_response,
)
chat_output = ChatOutput().set(
input_value=bridge.run_deepflow,
should_store_message=False,
)
graph = Graph(start=text_input, end=chat_output)
return graph.dump(
name="DeepFlow Text Bridge",
description="TextInput -> DeepFlow Runtime -> ChatOutput",
)write_flow function · python · L58-L62 (5 LOC)scripts/generate_langflow_examples.py
def write_flow(filename: str, payload: dict) -> None:
"""Write one flow payload to disk."""
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
path = OUTPUT_DIR / filename
path.write_text(json.dumps(payload, indent=2))main function · python · L65-L68 (4 LOC)scripts/generate_langflow_examples.py
def main() -> None:
"""Generate all checked-in Langflow example assets."""
write_flow("deepflow_chat_bridge.json", build_chat_bridge_flow())
write_flow("deepflow_text_bridge.json", build_text_bridge_flow())Powered by Repobility — scan your code at https://repobility.com
run_acp_server function · python · L16-L48 (33 LOC)src/deepflow_runtime/acp_server.py
async def run_acp_server(settings: DeepFlowSettings) -> None:
"""Run the ACP server with a shared SQLite checkpointer."""
with open_checkpointer(str(settings.sqlite_path)) as checkpointer:
def build_agent(context: AgentSessionContext):
return build_acp_agent(
settings,
project_root=Path(context.cwd),
checkpointer=checkpointer,
mode=context.mode,
)
modes = SessionModeState(
current_mode_id="accept_edits",
available_modes=[
SessionMode(
id="ask_before_edits",
name="Ask before edits",
description="Ask before edits, plans, and shell execution.",
),
SessionMode(
id="accept_edits",
name="Accept edits",
description="Auto-accept edits but ask before plans and shell execution.",
resolve_model function · python · L21-L45 (25 LOC)src/deepflow_runtime/agent.py
def resolve_model(settings: DeepFlowSettings) -> BaseChatModel:
"""Resolve the first available model configuration."""
if settings.model:
return _build_model_from_name(settings.model)
providers = settings.provider_status()
if providers["anthropic"]:
return _build_model_from_name(settings.anthropic_default_model)
if providers["openai"]:
return _build_model_from_name(settings.openai_default_model)
if providers["google"]:
return _build_model_from_name(settings.google_default_model)
if providers["openrouter"]:
return _build_model_from_name(settings.openrouter_default_model)
if providers["deepseek"]:
return _build_model_from_name(settings.deepseek_default_model)
if os.getenv("OLLAMA_HOST") or os.getenv("OLLAMA_BASE_URL"):
return _build_model_from_name(settings.ollama_default_model)
msg = (
"No model provider configured. Set ANTHROPIC_API_KEY, OPENAI_API_KEY, GOOGLE_API_KEY, "
"O_build_model_from_name function · python · L48-L66 (19 LOC)src/deepflow_runtime/agent.py
def _build_model_from_name(model_name: str) -> BaseChatModel:
"""Build a chat model, using provider-specific classes when needed."""
if model_name.startswith("openrouter:"):
from langchain_openrouter import ChatOpenRouter
return ChatOpenRouter(
model=model_name.split(":", 1)[1],
temperature=0.0,
max_retries=2,
)
if model_name.startswith("deepseek:"):
from langchain_deepseek import ChatDeepSeek
return ChatDeepSeek(
model=model_name.split(":", 1)[1],
temperature=0.0,
max_retries=2,
)
return init_chat_model(model_name, temperature=0.0)create_runtime_backend function · python · L69-L77 (9 LOC)src/deepflow_runtime/agent.py
def create_runtime_backend(settings: DeepFlowSettings) -> CompositeBackend:
"""Create the bounded filesystem backend used by the HTTP runtime."""
return CompositeBackend(
default=FilesystemBackend(root_dir=str(settings.workspace_dir), virtual_mode=False),
routes={
"/memories/": FilesystemBackend(root_dir=str(settings.memories_dir), virtual_mode=True),
"/skills/": FilesystemBackend(root_dir=str(settings.skills_dir), virtual_mode=True),
},
)build_runtime_agent function · python · L80-L102 (23 LOC)src/deepflow_runtime/agent.py
def build_runtime_agent(settings: DeepFlowSettings, *, checkpointer: Checkpointer) -> Any:
"""Build the research-oriented runtime agent."""
model = resolve_model(settings)
tools = create_runtime_tools(settings)
subagents = [
{
"name": "researcher",
"description": "Use this subagent for focused web research on a single topic.",
"system_prompt": RESEARCH_SUBAGENT_PROMPT,
"tools": tools,
}
]
return create_deep_agent(
model=model,
tools=tools,
system_prompt=MAIN_SYSTEM_PROMPT,
subagents=subagents,
skills=[str(settings.skills_dir)],
memory=[str(settings.memory_file)],
checkpointer=checkpointer,
backend=create_runtime_backend(settings),
name="deepflow-runtime",
)build_acp_agent function · python · L105-L148 (44 LOC)src/deepflow_runtime/agent.py
def build_acp_agent(
settings: DeepFlowSettings,
*,
project_root: Path,
checkpointer: Checkpointer,
mode: str,
) -> Any:
"""Build the ACP editor-facing agent."""
interrupt_on = {
"edit_file": {"allowed_decisions": ["approve", "reject"]},
"write_file": {"allowed_decisions": ["approve", "reject"]},
"write_todos": {"allowed_decisions": ["approve", "reject"]},
"execute": {"allowed_decisions": ["approve", "reject"]},
}
if mode == "accept_edits":
interrupt_on.pop("edit_file", None)
interrupt_on.pop("write_file", None)
elif mode == "accept_everything":
interrupt_on = {}
def create_backend(runtime: Any | None = None) -> CompositeBackend:
ephemeral_backend = StateBackend(runtime) if runtime is not None else None
shell_backend = LocalShellBackend(
root_dir=str(project_root),
inherit_env=True,
env=os.environ.copy(),
)
routes: dicmain function · python · L19-L22 (4 LOC)src/deepflow_runtime/cli.py
def main() -> None:
"""Run the DeepFlow CLI with startup branding."""
print_banner()
app()doctor function · python · L26-L36 (11 LOC)src/deepflow_runtime/cli.py
def doctor() -> None:
"""Print runtime status and missing prerequisites."""
settings = get_settings()
payload = {
"home_dir": str(settings.home_dir),
"workspace_dir": str(settings.workspace_dir),
"sqlite_path": str(settings.sqlite_path),
"provider_status": settings.provider_status(),
"model": settings.model or "auto",
}
typer.echo(json.dumps(payload, indent=2, sort_keys=True))Want this analysis on your repo? https://repobility.com/scan/
serve function · python · L40-L48 (9 LOC)src/deepflow_runtime/cli.py
def serve() -> None:
"""Run the DeepFlow runtime API."""
settings = get_settings()
uvicorn.run(
create_app(settings),
host=settings.host,
port=settings.port,
log_level="info",
)run function · python · L52-L54 (3 LOC)src/deepflow_runtime/cli.py
def run(prompt: str, thread_id: str = "deepflow-cli") -> None:
"""Run one prompt against the DeepFlow runtime graph."""
asyncio.run(_run_once(prompt=prompt, thread_id=thread_id))chat function · python · L58-L60 (3 LOC)src/deepflow_runtime/cli.py
def chat(thread_id: str = "deepflow-chat") -> None:
"""Run an interactive REPL."""
asyncio.run(_chat_loop(thread_id=thread_id))acp function · python · L64-L66 (3 LOC)src/deepflow_runtime/cli.py
def acp() -> None:
"""Run the ACP server for ACP-capable editors."""
asyncio.run(run_acp_server(get_settings()))mcp_command function · python · L70-L79 (10 LOC)src/deepflow_runtime/cli.py
def mcp_command(
transport: str = typer.Option("stdio", help="MCP transport: stdio or streamable-http."),
host: str | None = typer.Option(None, help="Host for streamable-http transport."),
port: int | None = typer.Option(None, help="Port for streamable-http transport."),
) -> None:
"""Run the DeepFlow MCP server for IDE and Langflow MCP clients."""
settings = get_settings()
resolved_host = host or settings.host
resolved_port = port or 8012
run_mcp_server(transport=transport, host=resolved_host, port=resolved_port)_run_once function · python · L82-L90 (9 LOC)src/deepflow_runtime/cli.py
async def _run_once(*, prompt: str, thread_id: str) -> None:
settings = get_settings()
service = RuntimeService(settings)
await service.start()
try:
response = await service.invoke(InvokeRequest(prompt=prompt, thread_id=thread_id))
finally:
await service.stop()
typer.echo(response.output_text)_chat_loop function · python · L93-L106 (14 LOC)src/deepflow_runtime/cli.py
async def _chat_loop(*, thread_id: str) -> None:
settings = get_settings()
service = RuntimeService(settings)
await service.start()
typer.echo("DeepFlow chat. Type 'exit' or 'quit' to stop.")
try:
while True:
prompt = typer.prompt("you")
if prompt.strip().lower() in {"exit", "quit"}:
break
response = await service.invoke(InvokeRequest(prompt=prompt, thread_id=thread_id))
typer.echo(f"deepflow> {response.output_text}")
finally:
await service.stop()DeepFlowSettings class · python · L18-L86 (69 LOC)src/deepflow_runtime/config.py
class DeepFlowSettings(BaseSettings):
"""Runtime settings loaded from environment variables."""
model_config = SettingsConfigDict(
env_prefix="DEEPFLOW_",
env_file=(str(PROJECT_ENV_PATH), str(HOME_ENV_PATH)),
extra="ignore",
)
home_dir: Path = Field(default=PROJECT_ROOT)
host: str = "127.0.0.1"
port: int = 8011
api_key: str | None = None
model: str | None = None
anthropic_default_model: str = "anthropic:claude-sonnet-4-6"
openai_default_model: str = "openai:gpt-4.1"
google_default_model: str = "google_genai:gemini-2.5-pro"
openrouter_default_model: str = "openrouter:anthropic/claude-sonnet-4.5"
deepseek_default_model: str = "deepseek:deepseek-chat"
ollama_default_model: str = "ollama:qwen3:latest"
request_timeout_seconds: float = 60.0
user_agent: str = "DeepFlow/0.1 (+https://github.com/langchain-ai/deepagents)"
@property
def workspace_dir(self) -> Path:
return self.home_dir / "woIf a scraper extracted this row, it came from Repobility (https://repobility.com)
ensure_directories method · python · L65-L74 (10 LOC)src/deepflow_runtime/config.py
def ensure_directories(self) -> None:
"""Create the runtime directories if they do not exist."""
for path in (
self.home_dir,
self.workspace_dir,
self.data_dir,
self.memories_dir,
self.skills_dir,
):
path.mkdir(parents=True, exist_ok=True)provider_status method · python · L76-L86 (11 LOC)src/deepflow_runtime/config.py
def provider_status(self) -> dict[str, bool]:
"""Return provider availability without exposing secret values."""
return {
"anthropic": bool(os.getenv("ANTHROPIC_API_KEY")),
"openai": bool(os.getenv("OPENAI_API_KEY")),
"google": bool(os.getenv("GOOGLE_API_KEY")),
"openrouter": bool(os.getenv("OPENROUTER_API_KEY")),
"deepseek": bool(os.getenv("DEEPSEEK_API_KEY")),
"tavily": bool(os.getenv("TAVILY_API_KEY")),
"langsmith": bool(os.getenv("LANGSMITH_API_KEY")),
}get_settings function · python · L90-L100 (11 LOC)src/deepflow_runtime/config.py
def get_settings() -> DeepFlowSettings:
"""Return cached DeepFlow settings."""
for key, value in dotenv_values(PROJECT_ENV_PATH).items():
if value is not None:
os.environ.setdefault(key, value)
for key, value in dotenv_values(HOME_ENV_PATH).items():
if value is not None:
os.environ.setdefault(key, value)
settings = DeepFlowSettings()
settings.ensure_directories()
return settings_service_guard function · python · L30-L34 (5 LOC)src/deepflow_runtime/mcp_server.py
def _service_guard() -> asyncio.Lock:
global _service_lock
if _service_lock is None:
_service_lock = asyncio.Lock()
return _service_lockget_runtime_service function · python · L37-L47 (11 LOC)src/deepflow_runtime/mcp_server.py
async def get_runtime_service() -> RuntimeService:
"""Return a started singleton runtime service for MCP tool calls."""
global _service
if _service is not None:
return _service
async with _service_guard():
if _service is None:
service = RuntimeService(get_settings())
await service.start()
_service = service
return _servicedeepflow_research function · python · L51-L64 (14 LOC)src/deepflow_runtime/mcp_server.py
async def deepflow_research(
prompt: str,
thread_id: str = "deepflow-mcp",
ctx: Context | None = None,
) -> dict[str, Any]:
"""Run one prompt through the DeepFlow runtime and return structured output."""
if ctx is not None:
await ctx.info(f"Invoking DeepFlow on thread '{thread_id}'")
service = await get_runtime_service()
response = await service.invoke(InvokeRequest(prompt=prompt, thread_id=thread_id))
payload = response.model_dump()
if ctx is not None:
await ctx.info(f"DeepFlow completed with {payload['message_count']} messages")
return payloaddeepflow_status function · python · L68-L79 (12 LOC)src/deepflow_runtime/mcp_server.py
async def deepflow_status() -> dict[str, Any]:
"""Return runtime configuration and provider status for DeepFlow."""
settings = get_settings()
service = await get_runtime_service()
return {
"status": "ok",
"model": settings.model or "auto",
"provider_status": settings.provider_status(),
"workspace_dir": str(settings.workspace_dir),
"sqlite_path": str(settings.sqlite_path),
"startup_error": service.startup_error,
}deepflow_health_resource function · python · L83-L91 (9 LOC)src/deepflow_runtime/mcp_server.py
async def deepflow_health_resource() -> str:
"""Return a compact health summary for clients that browse resources."""
payload = await deepflow_status()
return (
f"status={payload['status']}\n"
f"model={payload['model']}\n"
f"startup_error={payload['startup_error']}\n"
f"provider_status={payload['provider_status']}"
)About: code-quality intelligence by Repobility · https://repobility.com
research_prompt function · python · L95-L100 (6 LOC)src/deepflow_runtime/mcp_server.py
def research_prompt(topic: str) -> str:
"""Create a DeepFlow-oriented research prompt."""
return (
"Research the following topic and produce a concise, source-aware summary "
f"with clear next steps: {topic}"
)run_mcp_server function · python · L103-L110 (8 LOC)src/deepflow_runtime/mcp_server.py
def run_mcp_server(*, transport: str, host: str, port: int) -> None:
"""Run the FastMCP server on the requested transport."""
if transport not in {"stdio", "streamable-http"}:
msg = "transport must be 'stdio' or 'streamable-http'"
raise ValueError(msg)
mcp.settings.host = host
mcp.settings.port = port
mcp.run(transport=transport)require_api_key function · python · L28-L36 (9 LOC)src/deepflow_runtime/runtime_api.py
def require_api_key(
credentials: HTTPAuthorizationCredentials | None = Security(_bearer),
settings: DeepFlowSettings = Depends(get_settings),
) -> None:
"""Validate Bearer token when DEEPFLOW_API_KEY is set."""
if settings.api_key is None:
return
if credentials is None or credentials.credentials != settings.api_key:
raise HTTPException(status_code=401, detail="Invalid or missing API key.")InvokeRequest class · python · L39-L43 (5 LOC)src/deepflow_runtime/runtime_api.py
class InvokeRequest(BaseModel):
"""Invocation payload for the DeepFlow runtime."""
prompt: str = Field(min_length=1)
thread_id: str = Field(default="deepflow-default")InvokeResponse class · python · L46-L51 (6 LOC)src/deepflow_runtime/runtime_api.py
class InvokeResponse(BaseModel):
"""Response payload for the DeepFlow runtime."""
thread_id: str
output_text: str
message_count: intRuntimeService class · python · L55-L135 (81 LOC)src/deepflow_runtime/runtime_api.py
class RuntimeService:
"""Manages the lifecycle of the DeepFlow runtime graph."""
settings: DeepFlowSettings
checkpointer: AsyncCompatibleSqliteSaver | None = None
checkpointer_cm: Any | None = None
agent: Any | None = None
startup_error: str | None = None
async def start(self) -> None:
"""Start the runtime and compile the graph."""
self.settings.ensure_directories()
self.checkpointer_cm = open_checkpointer(str(self.settings.sqlite_path))
self.checkpointer = self.checkpointer_cm.__enter__()
try:
self.agent = build_runtime_agent(self.settings, checkpointer=self.checkpointer)
self.startup_error = None
logger.info("DeepFlow runtime started")
except RuntimeError as exc:
self.agent = None
self.startup_error = str(exc)
logger.warning("DeepFlow runtime started without a model: %s", exc)
async def stop(self) -> None:
"""Stop the runtistart method · python · L64-L76 (13 LOC)src/deepflow_runtime/runtime_api.py
async def start(self) -> None:
"""Start the runtime and compile the graph."""
self.settings.ensure_directories()
self.checkpointer_cm = open_checkpointer(str(self.settings.sqlite_path))
self.checkpointer = self.checkpointer_cm.__enter__()
try:
self.agent = build_runtime_agent(self.settings, checkpointer=self.checkpointer)
self.startup_error = None
logger.info("DeepFlow runtime started")
except RuntimeError as exc:
self.agent = None
self.startup_error = str(exc)
logger.warning("DeepFlow runtime started without a model: %s", exc)stop method · python · L78-L85 (8 LOC)src/deepflow_runtime/runtime_api.py
async def stop(self) -> None:
"""Stop the runtime and release the database handle."""
if self.checkpointer_cm is not None:
self.checkpointer_cm.__exit__(None, None, None)
self.checkpointer = None
self.checkpointer_cm = None
self.agent = None
self.startup_error = NonePowered by Repobility — scan your code at https://repobility.com
_get_agent method · python · L87-L94 (8 LOC)src/deepflow_runtime/runtime_api.py
def _get_agent(self) -> Any:
"""Return the compiled agent, rebuilding if needed."""
if self.agent is None:
if self.checkpointer is None:
raise RuntimeError("RuntimeService.start() must be called before invoke().")
self.agent = build_runtime_agent(self.settings, checkpointer=self.checkpointer)
self.startup_error = None
return self.agentinvoke method · python · L96-L111 (16 LOC)src/deepflow_runtime/runtime_api.py
async def invoke(self, request: InvokeRequest) -> InvokeResponse:
"""Run the graph for one user request."""
agent = self._get_agent()
logger.info("invoke thread_id=%s prompt_len=%d", request.thread_id, len(request.prompt))
result = await agent.ainvoke(
{"messages": [("user", request.prompt)]},
config={"configurable": {"thread_id": request.thread_id}},
)
messages = result.get("messages", [])
response = InvokeResponse(
thread_id=request.thread_id,
output_text=extract_text(messages),
message_count=len(messages),
)
logger.info("invoke complete thread_id=%s message_count=%d", request.thread_id, response.message_count)
return responsestream method · python · L113-L135 (23 LOC)src/deepflow_runtime/runtime_api.py
async def stream(self, request: InvokeRequest) -> AsyncGenerator[str, None]:
"""Stream token chunks as SSE for one user request."""
agent = self._get_agent()
config = {"configurable": {"thread_id": request.thread_id}}
async for event in agent.astream_events(
{"messages": [("user", request.prompt)]},
config=config,
version="v2",
):
if event["event"] == "on_chat_model_stream":
chunk = event["data"].get("chunk")
if chunk is None:
continue
content = chunk.content
if isinstance(content, str) and content:
yield f"data: {json.dumps({'text': content})}\n\n"
elif isinstance(content, list):
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
text = part.get("text", "")
extract_text function · python · L138-L145 (8 LOC)src/deepflow_runtime/runtime_api.py
def extract_text(messages: list[Any]) -> str:
"""Extract the last assistant text from a LangGraph message list."""
for message in reversed(messages):
if isinstance(message, AIMessage):
return _message_to_text(message)
if isinstance(message, BaseMessage) and getattr(message, "type", "") == "ai":
return _message_to_text(message)
return ""_message_to_text function · python · L148-L161 (14 LOC)src/deepflow_runtime/runtime_api.py
def _message_to_text(message: BaseMessage) -> str:
"""Convert a message content payload into plain text."""
content = message.content
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
parts.append(str(item.get("text", "")))
else:
parts.append(str(item))
return "\n".join(part for part in parts if part)
return str(content)ThreadMessage class · python · L164-L168 (5 LOC)src/deepflow_runtime/runtime_api.py
class ThreadMessage(BaseModel):
"""A single message in a thread's history."""
role: str
content: strThreadHistoryResponse class · python · L171-L175 (5 LOC)src/deepflow_runtime/runtime_api.py
class ThreadHistoryResponse(BaseModel):
"""Conversation history for a thread."""
thread_id: str
messages: list[ThreadMessage]create_app function · python · L178-L251 (74 LOC)src/deepflow_runtime/runtime_api.py
def create_app(settings: DeepFlowSettings | None = None) -> FastAPI:
"""Create the DeepFlow FastAPI app."""
runtime_settings = settings or get_settings()
service = RuntimeService(runtime_settings)
@asynccontextmanager
async def lifespan(_: FastAPI):
await service.start()
try:
yield
finally:
await service.stop()
app = FastAPI(title="DeepFlow Runtime", version="0.1.0", lifespan=lifespan)
app.state.service = service
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST"],
allow_headers=["Authorization", "Content-Type"],
)
@app.get("/health")
async def health() -> dict[str, Any]:
return {
"status": "ok",
"provider_status": runtime_settings.provider_status(),
"workspace_dir": str(runtime_settings.workspace_dir),
"sqlite_path": str(runtime_settings.sqlite_path),
"startWant this analysis on your repo? https://repobility.com/scan/
AsyncCompatibleSqliteSaver class · python · L13-L77 (65 LOC)src/deepflow_runtime/sqlite_compat.py
class AsyncCompatibleSqliteSaver(BaseCheckpointSaver[Any]):
"""Wrap `SqliteSaver` with async methods backed by thread offload."""
def __init__(self, saver: SqliteSaver) -> None:
super().__init__(serde=saver.serde)
self._saver = saver
def __getattr__(self, name: str) -> Any:
return getattr(self._saver, name)
def get_tuple(self, config: Any) -> Any:
return self._saver.get_tuple(config)
def put(self, config: Any, checkpoint: Any, metadata: Any, new_versions: Any) -> Any:
return self._saver.put(config, checkpoint, metadata, new_versions)
def put_writes(self, config: Any, writes: Any, task_id: str, task_path: str = "") -> None:
self._saver.put_writes(config, writes, task_id, task_path)
def list(self, config: Any, *, filter: dict[str, Any] | None = None, before: Any = None, limit: int | None = None):
return self._saver.list(config, filter=filter, before=before, limit=limit)
def delete_thread(self, __init__ method · python · L16-L18 (3 LOC)src/deepflow_runtime/sqlite_compat.py
def __init__(self, saver: SqliteSaver) -> None:
super().__init__(serde=saver.serde)
self._saver = saveralist method · python · L59-L62 (4 LOC)src/deepflow_runtime/sqlite_compat.py
async def alist(self, config: Any, *, filter: dict[str, Any] | None = None, before: Any = None, limit: int | None = None):
items = await asyncio.to_thread(lambda: list(self._saver.list(config, filter=filter, before=before, limit=limit)))
for item in items:
yield itempage 1 / 2next ›