Function bodies 189 total
main function · python · L24-L56 (33 LOC)examples/basic_agent.py
async def main() -> None:
# --8<-- [start:config]
config = await Config.init()
# --8<-- [end:config]
# --8<-- [start:apigen]
# Generate Python APIs for MCP servers in ptc_servers
for server_name, params in config.ptc_servers.items():
if not (config.generated_dir / "mcptools" / server_name).exists():
await generate_mcp_sources({server_name: params}, config.generated_dir)
# --8<-- [end:apigen]
# --8<-- [start:agent]
async with Agent(config=config) as agent:
prompt = "Who is the F1 world champion 2025?"
async for event in agent.stream(prompt):
match event:
case ApprovalRequest(tool_name="ipybox_execute_ipython_cell", tool_args=args) as request:
print(f"Code action:\n{args['code']}")
request.approve(True)
case ApprovalRequest(tool_name=name, tool_args=args) as request:
print(f"Tool: {name}")
prhandle_events function · python · L15-L32 (18 LOC)examples/persistent_agent.py
async def handle_events(agent: Agent, prompt: str) -> None:
async for event in agent.stream(prompt):
match event:
case ApprovalRequest(tool_name="ipybox_execute_ipython_cell", tool_args=args) as request:
print(f"Code action:\n{args['code']}")
request.approve(True)
case ApprovalRequest(tool_name=name, tool_args=args) as request:
print(f"Tool: {name}")
print(f"Args: {args}")
request.approve(True)
case Thoughts(content=content):
print(f"Thinking: {content}")
case CodeExecutionOutput(text=text):
print(f"Code execution output: {text}")
case ToolOutput(content=content):
print(f"Tool call result: {content}")
case Response(content=content):
print(content)main function · python · L35-L65 (31 LOC)examples/persistent_agent.py
async def main() -> None:
config = await Config.init()
for server_name, params in config.ptc_servers.items():
if not (config.generated_dir / "mcptools" / server_name).exists():
await generate_mcp_sources({server_name: params}, config.generated_dir)
# --8<-- [start:session-run-no-id]
# No session_id: agent creates a new session ID internally.
async with Agent(config=config) as agent:
print(f"Generated session ID: {agent.session_id}")
await handle_events(agent, "What is the capital of France?")
await handle_events(agent, "What about Germany?")
# --8<-- [end:session-run-no-id]
# --8<-- [start:session-create]
# Choose an explicit session ID.
session_id = "countries-session"
# --8<-- [end:session-create]
# --8<-- [start:session-run-existing]
# Create-or-resume behavior: resume if present, otherwise start new.
async with Agent(config=config, session_id=session_id) as agent:
await handlConfig.model_post_init method · python · L118-L154 (37 LOC)freeact/agent/config/config.py
def model_post_init(self, __context: Any) -> None:
object.__setattr__(self, "working_dir", self.working_dir.resolve())
resolution_env = self._resolution_env()
object.__setattr__(
self,
"_resolved_model_instance",
resolve_model_instance(
model=self.model,
provider_settings=self.provider_settings,
resolution_env=resolution_env,
),
)
object.__setattr__(
self,
"_resolved_mcp_servers",
resolve_mcp_servers(
tool_search=self.tool_search,
mcp_servers=self.mcp_servers,
basic_search_mcp_server_config=BASIC_SEARCH_MCP_SERVER_CONFIG,
hybrid_search_mcp_server_config=HYBRID_SEARCH_MCP_SERVER_CONFIG,
filesystem_mcp_server_config=FILESYSTEM_MCP_SERVER_CONFIG,
resolution_env=resolution_env,
),
)
object.__setConfig.load method · python · L161-L174 (14 LOC)freeact/agent/config/config.py
async def load(cls, working_dir: Path | None = None) -> "Config":
"""Load persisted config if present, otherwise return defaults."""
config = cls(working_dir=working_dir or Path.cwd())
config_file = config.freeact_dir / "agent.json"
if not config_file.exists():
return config
data = await arun(lambda: json.loads(config_file.read_text()))
return cls.model_validate(
{
**data,
"working_dir": config.working_dir,
}
)Config.init method · python · L177-L184 (8 LOC)freeact/agent/config/config.py
async def init(cls, working_dir: Path | None = None) -> "Config":
"""Load config from `.freeact/` when present, otherwise save defaults."""
config = cls(working_dir=working_dir or Path.cwd())
if config.freeact_dir.exists():
return await cls.load(working_dir=config.working_dir)
await config.save()
return configConfig._save_sync method · python · L264-L282 (19 LOC)freeact/agent/config/config.py
def _save_sync(self) -> None:
if not isinstance(self.model, str):
raise ValueError("model must be a string when saving config")
self.freeact_dir.mkdir(parents=True, exist_ok=True)
payload = self.model_dump(mode="json", exclude={"working_dir"})
config_file = self.freeact_dir / "agent.json"
config_file.write_text(json.dumps(payload, indent=2) + "\n")
self.generated_dir.mkdir(parents=True, exist_ok=True)
self.plans_dir.mkdir(parents=True, exist_ok=True)
self.sessions_dir.mkdir(parents=True, exist_ok=True)
materialize_bundled_skills(
skills_dir=self.skills_dir,
generated_rel_dir=self.generated_rel_dir,
plans_rel_dir=self.plans_rel_dir,
)Powered by Repobility — scan your code at https://repobility.com
Config._resolution_env method · python · L284-L291 (8 LOC)freeact/agent/config/config.py
def _resolution_env(self) -> dict[str, str]:
env = dict(os.environ)
env.setdefault("PYTOOLS_DIR", str(self.generated_rel_dir))
env.setdefault("PYTOOLS_DB_PATH", str(self.search_db_file))
if self.tool_search == "hybrid":
for key, default in HYBRID_SEARCH_ENV_DEFAULTS.items():
env.setdefault(key, default)
return envload_system_prompt function · python · L8-L29 (22 LOC)freeact/agent/config/prompts.py
def load_system_prompt(
*,
tool_search: Literal["basic", "hybrid"],
working_dir: Path,
generated_rel_dir: Path,
project_instructions_file: Path,
skills_metadata: list[SkillMetadata],
) -> str:
prompts = files("freeact.agent.config").joinpath("prompts")
with as_file(prompts) as prompts_dir:
template_name = "system-hybrid.md" if tool_search == "hybrid" else "system-basic.md"
template = (prompts_dir / template_name).read_text()
return template.format(
working_dir=working_dir,
generated_rel_dir=generated_rel_dir,
project_instructions=_render_section(
"project-instructions",
_load_project_instructions_content(project_instructions_file),
),
skills=_render_section("agent-skills", _load_skills_content(skills_metadata, working_dir)),
)_render_section function · python · L32-L39 (8 LOC)freeact/agent/config/prompts.py
def _render_section(section_name: str, content: str | None) -> str:
if content is None:
return ""
prompts = files("freeact.agent.config").joinpath("prompts")
with as_file(prompts) as prompts_dir:
template = (prompts_dir / f"section-{section_name}.md").read_text()
return template.format(content=content)_load_project_instructions_content function · python · L42-L47 (6 LOC)freeact/agent/config/prompts.py
def _load_project_instructions_content(project_instructions_file: Path) -> str | None:
if not project_instructions_file.exists():
return None
content = project_instructions_file.read_text().strip()
return content or None_load_skills_content function · python · L50-L60 (11 LOC)freeact/agent/config/prompts.py
def _load_skills_content(skills_metadata: list[SkillMetadata], working_dir: Path) -> str | None:
if not skills_metadata:
return None
lines: list[str] = []
for skill in skills_metadata:
relative_path = _relative_to_working_dir(skill.path, working_dir)
lines.append(f"- **{skill.name}**: {skill.description}")
lines.append(f" - Location: `{relative_path}`")
return "\n".join(lines)resolve_model_instance function · python · L10-L35 (26 LOC)freeact/agent/config/runtime.py
def resolve_model_instance(
*,
model: str | Model,
provider_settings: dict[str, Any] | None,
resolution_env: Mapping[str, str],
) -> str | Model:
if isinstance(model, Model):
return model
if provider_settings is None:
return model
result = replace_variables(provider_settings, resolution_env)
if result.missing_variables:
raise ValueError(f"Missing environment variables for provider_settings: {result.missing_variables}")
resolved = result.replaced
def provider_factory(name: str) -> Provider[Any]:
kwargs = dict(resolved)
if name in ("google-vertex", "google-gla"):
kwargs.setdefault("vertexai", name == "google-vertex")
provider_class = infer_provider_class(name)
return provider_class(**kwargs)
return infer_model(model, provider_factory=provider_factory)resolve_kernel_env function · python · L38-L57 (20 LOC)freeact/agent/config/runtime.py
def resolve_kernel_env(
*,
kernel_env: dict[str, str],
generated_dir: Path,
resolution_env: Mapping[str, str],
) -> dict[str, str]:
env: dict[str, str] = {
"PYTHONPATH": str(generated_dir),
}
if home := resolution_env.get("HOME"):
env["HOME"] = home
env.update(kernel_env)
result = replace_variables(env, resolution_env)
if result.missing_variables:
raise ValueError(f"Missing environment variables for kernel_env: {result.missing_variables}")
return result.replacedresolve_mcp_servers function · python · L60-L84 (25 LOC)freeact/agent/config/runtime.py
def resolve_mcp_servers(
*,
tool_search: Literal["basic", "hybrid"],
mcp_servers: dict[str, dict[str, Any]],
basic_search_mcp_server_config: dict[str, Any],
hybrid_search_mcp_server_config: dict[str, Any],
filesystem_mcp_server_config: dict[str, Any],
resolution_env: Mapping[str, str],
) -> dict[str, dict[str, Any]]:
pytools = hybrid_search_mcp_server_config if tool_search == "hybrid" else basic_search_mcp_server_config
internal = {
"pytools": copy.deepcopy(pytools),
"filesystem": copy.deepcopy(filesystem_mcp_server_config),
}
merged = {
**internal,
**mcp_servers,
}
result = replace_variables(merged, resolution_env)
if result.missing_variables:
raise ValueError(f"Missing environment variables for mcp_servers: {result.missing_variables}")
return result.replacedRepobility · code-quality intelligence · https://repobility.com
validate_ptc_servers function · python · L87-L94 (8 LOC)freeact/agent/config/runtime.py
def validate_ptc_servers(
*,
ptc_servers: dict[str, dict[str, Any]],
resolution_env: Mapping[str, str],
) -> None:
result = replace_variables(ptc_servers, resolution_env)
if result.missing_variables:
raise ValueError(f"Missing environment variables for ptc_servers: {result.missing_variables}")materialize_bundled_skills function · python · L22-L47 (26 LOC)freeact/agent/config/skills.py
def materialize_bundled_skills(*, skills_dir: Path, generated_rel_dir: Path, plans_rel_dir: Path) -> None:
placeholders = {
"generated_rel_dir": str(generated_rel_dir),
"plans_rel_dir": str(plans_rel_dir),
}
templates_root = files("freeact.agent.config").joinpath("templates", "skills")
with as_file(templates_root) as skills_template_dir:
for template_skill_dir in skills_template_dir.iterdir():
if not template_skill_dir.is_dir():
continue
target_skill_dir = skills_dir / template_skill_dir.name
if target_skill_dir.exists():
continue
for template_file in template_skill_dir.rglob("*"):
relative = template_file.relative_to(template_skill_dir)
target_file = target_skill_dir / relative
if template_file.is_dir():
target_file.mkdir(parents=True, exist_ok=True)
continue
t_scan_skills_dir function · python · L50-L67 (18 LOC)freeact/agent/config/skills.py
def _scan_skills_dir(skills_dir: Path) -> list[SkillMetadata]:
if not skills_dir.exists():
return []
skills: list[SkillMetadata] = []
for skill_dir in skills_dir.iterdir():
if not skill_dir.is_dir():
continue
skill_file = skill_dir / "SKILL.md"
if not skill_file.exists():
continue
metadata = _parse_skill_file(skill_file)
if metadata is not None:
skills.append(metadata)
return skills_parse_skill_file function · python · L70-L89 (20 LOC)freeact/agent/config/skills.py
def _parse_skill_file(skill_file: Path) -> SkillMetadata | None:
content = skill_file.read_text()
if not content.startswith("---"):
return None
parts = content.split("---", 2)
if len(parts) < 3:
return None
frontmatter = yaml.safe_load(parts[1])
if not isinstance(frontmatter, dict):
return None
try:
name = frontmatter["name"]
description = frontmatter["description"]
except KeyError:
return None
return SkillMetadata(name=name, description=description, path=skill_file)Agent.__init__ method · python · L89-L165 (77 LOC)freeact/agent/core.py
def __init__(
self,
config: Config,
agent_id: str | None = None,
session_id: str | None = None,
sandbox: bool = False,
sandbox_config: Path | None = None,
):
"""Initialize the agent.
Args:
config: Agent configuration containing model, system prompt,
MCP servers, kernel env, timeouts, and subagent settings.
agent_id: Identifier for this agent instance. Defaults to
`"main"` when not provided.
session_id: Optional session identifier for persistence.
If `None` and persistence is enabled, a new session ID
is generated. If provided and persistence is enabled, that
session ID is used. Existing session history is resumed when
present; otherwise a new session starts with that ID.
sandbox: Run the kernel in sandbox mode.
sandbox_config: Path to custom sandbox configuratiAgent.start method · python · L195-L238 (44 LOC)freeact/agent/core.py
async def start(self) -> None:
"""Restore persisted history, start the code executor and MCP servers.
Automatically called when entering the async context manager.
"""
if self._resource_supervisors:
return
if self._session_store is not None and self._history_agent_id == "main" and not self._message_history:
self._message_history = await arun(self._session_store.load_messages, agent_id="main")
self._mcp_server_instances = self._create_mcp_servers()
resource_supervisors = [_ResourceSupervisor(self._code_executor, "code-executor")]
for name, server in self._mcp_server_instances.items():
logger.info(f"Starting MCP server: {name}")
server.tool_prefix = name
resource_supervisors.append(_ResourceSupervisor(server, f"mcp-server-{name}"))
try:
await asyncio.gather(*(supervisor.start() for supervisor in resource_supervisors))
except ExceAgent.stop method · python · L240-L262 (23 LOC)freeact/agent/core.py
async def stop(self) -> None:
"""Stop the code executor and MCP servers.
Automatically called when exiting the async context manager.
"""
self._tool_definitions = []
self._tool_mapping = {}
resource_supervisors = self._resource_supervisors
self._resource_supervisors = []
if not resource_supervisors:
return
results = await asyncio.gather(
*(supervisor.stop() for supervisor in resource_supervisors),
return_exceptions=True,
)
errors = [result for result in results if isinstance(result, Exception)]
if errors:
if len(errors) == 1:
raise errors[0]
raise ExceptionGroup("Multiple errors while stopping agent resources", errors)
self._mcp_server_instances = {}Agent._create_mcp_servers method · python · L264-L287 (24 LOC)freeact/agent/core.py
def _create_mcp_servers(self) -> dict[str, MCPServer]:
if not self._mcp_servers:
return {}
servers: dict[str, MCPServer] = {}
for name, raw_cfg in self._mcp_servers.items():
cfg = dict(raw_cfg)
excluded_tools = cfg.pop("excluded_tools", None)
match cfg:
case {"command": _}:
if excluded_tools:
servers[name] = _MCPServerStdioFiltered(
excluded_tools=frozenset(excluded_tools),
**cfg,
)
else:
servers[name] = MCPServerStdio(**cfg)
case {"url": _}:
servers[name] = MCPServerStreamableHTTP(**cfg)
case _:
raise ValueError(f"Invalid server config for {name}: must have 'command' or 'url'")
return serversWant this analysis on your repo? https://repobility.com/scan/
Agent._create_model_request method · python · L289-L296 (8 LOC)freeact/agent/core.py
def _create_model_request(self, user_prompt: str | Sequence[UserContent]) -> ModelRequest:
parts: list[SystemPromptPart | UserPromptPart] = []
if not self._message_history:
parts.append(SystemPromptPart(content=self._system_prompt))
parts.append(UserPromptPart(content=user_prompt))
return ModelRequest(parts=parts)Agent.stream method · python · L298-L391 (94 LOC)freeact/agent/core.py
async def stream(
self,
prompt: str | Sequence[UserContent],
max_turns: int | None = None,
) -> AsyncIterator[AgentEvent]:
"""Run a single agent turn, yielding events as they occur.
Loops through model responses and tool executions until the model
produces a response without tool calls. All code actions and tool
calls yield an [`ApprovalRequest`][freeact.agent.ApprovalRequest]
that must be resolved before execution proceeds.
Args:
prompt: User message as text or multimodal content sequence.
max_turns: Maximum number of tool-execution rounds. Each round
consists of a model response followed by tool execution.
If `None`, runs until the model stops calling tools.
Returns:
An async event iterator.
"""
request = self._create_model_request(prompt)
request_params = ModelRequestParameters(function_tools=self._toolAgent._execute_tool method · python · L393-L473 (81 LOC)freeact/agent/core.py
async def _execute_tool(self, call: ToolCallPart) -> AsyncIterator[AgentEvent | ToolReturnPart]:
tool_name = call.tool_name
tool_args = call.args_as_dict()
corr_id = uuid.uuid4().hex[:8]
if tool_name not in self.tool_names:
yield ToolReturnPart(
tool_call_id=call.tool_call_id,
tool_name=tool_name,
content=f"Unknown tool name: {tool_name}",
metadata={"rejected": False},
)
return
approval = ApprovalRequest(
tool_name=tool_name,
tool_args=tool_args,
agent_id=self.agent_id,
corr_id=corr_id,
)
yield approval
if not await approval.approved():
yield ToolReturnPart(
tool_call_id=call.tool_call_id,
tool_name=tool_name,
content="Tool call rejected",
metadata={"rejected": True},
)
rAgent._execute_subagent_task method · python · L475-L499 (25 LOC)freeact/agent/core.py
async def _execute_subagent_task(self, prompt: str, max_turns: int, corr_id: str) -> AsyncIterator[AgentEvent]:
subagent_config = self._config.for_subagent()
subagent = Agent(
config=subagent_config,
agent_id=f"sub-{uuid.uuid4().hex[:4]}",
sandbox=self._sandbox,
sandbox_config=self._sandbox_config,
session_id=self._session_id,
)
runner = _SubagentRunner(subagent=subagent, semaphore=self._subagent_semaphore)
last_response = ""
try:
async for item in runner.stream(prompt, max_turns=max_turns):
yield item
match item:
case Response(content=content):
last_response = content
except Exception as e:
error_content = f"Subagent error: {e}"
yield ToolOutput(content=error_content, agent_id=self.agent_id, corr_id=corr_id)
return
final_content = awaiAgent._ipybox_execute_ipython_cell method · python · L501-L530 (30 LOC)freeact/agent/core.py
async def _ipybox_execute_ipython_cell(
self, code: str
) -> AsyncIterator[ApprovalRequest | CodeExecutionOutputChunk | CodeExecutionOutput]:
try:
async with self._code_executor_lock:
async for item in self._code_executor.stream(code, timeout=self._execution_timeout, chunks=True):
match item:
case ipybox.ApprovalRequest(
server_name=server_name,
tool_name=tool_name,
tool_args=tool_args,
):
ptc_request = ApprovalRequest(
tool_name=f"{server_name}_{tool_name}", # type: ignore[has-type]
tool_args=tool_args, # type: ignore[has-type]
ptc=True,
agent_id=self.agent_id,
)
yield ptc_rAgent._ipybox_reset method · python · L532-L538 (7 LOC)freeact/agent/core.py
async def _ipybox_reset(self) -> str:
try:
async with self._code_executor_lock:
await self._code_executor.reset()
return "Kernel reset successfully."
except Exception as e:
return f"Kernel reset failed: {str(e)}"Agent._call_mcp_tool method · python · L540-L552 (13 LOC)freeact/agent/core.py
async def _call_mcp_tool(self, tool_name: str, tool_args: dict[str, object]) -> ToolResult:
try:
mcp_server = self._tool_mapping[tool_name]
resolved_name = tool_name.removeprefix(f"{mcp_server.tool_prefix}_")
result = await mcp_server.direct_call_tool(
name=resolved_name,
args=tool_args,
)
if tool_name in {"filesystem_read_text_file", "filesystem_read_multiple_files"}:
return self._extract_file_content(result)
return result
except Exception as e:
return f"MCP tool call failed: {str(e)}"Agent._extract_file_content method · python · L555-L570 (16 LOC)freeact/agent/core.py
def _extract_file_content(result: ToolResult) -> ToolResult:
match result:
case {"content": content} as payload if len(payload) == 1:
return content
case str() as raw:
try:
decoded = json.loads(raw)
except json.JSONDecodeError:
return result
match decoded:
case {"content": content} as payload if len(payload) == 1:
return content
case _:
return result
case _:
return resultAbout: code-quality intelligence by Repobility · https://repobility.com
Agent._append_message_history method · python · L572-L582 (11 LOC)freeact/agent/core.py
async def _append_message_history(self, messages: list[ModelMessage]) -> None:
if not messages:
return
self._message_history.extend(messages)
if self._session_store is not None:
await arun(
self._session_store.append_messages,
agent_id=self._history_agent_id,
messages=messages,
)CodeExecutionOutput.ptc_rejected method · python · L72-L79 (8 LOC)freeact/agent/events.py
def ptc_rejected(self) -> bool:
"""Whether the output indicates a rejected programmatic tool call."""
if not self.text:
return False
# TODO: make detection of PTC rejection more robust ...
pattern = r"ToolRunnerError: Approval request for \S+ rejected"
return bool(re.search(pattern, self.text))CodeExecutionOutput.format method · python · L81-L88 (8 LOC)freeact/agent/events.py
def format(self) -> str:
"""Format output with image markdown links."""
parts: list[str] = []
if self.text:
parts.append(self.text)
for image_path in self.images:
parts.append(f"")
return "\n".join(parts) if parts else ""ApprovalRequest.approve method · python · L105-L112 (8 LOC)freeact/agent/events.py
def approve(self, decision: bool) -> None:
"""Resolve this approval request.
Args:
decision: `True` to execute, `False` to reject and end
the current agent turn.
"""
self._future.set_result(decision)SessionStore.__init__ method · python · L18-L26 (9 LOC)freeact/agent/store.py
def __init__(
self,
sessions_root: Path,
session_id: str,
flush_after_append: bool = False,
):
self._sessions_root = sessions_root
self._session_id = session_id
self._flush_after_append = flush_after_appendSessionStore.append_messages method · python · L28-L53 (26 LOC)freeact/agent/store.py
def append_messages(self, agent_id: str, messages: list[ModelMessage]) -> None:
"""Append serialized messages to an agent-specific session log.
Each message is written as a versioned JSONL envelope with a UTC
timestamp. The session file is created on demand.
Args:
agent_id: Logical agent stream name (for example, `"main"` or
`"sub-1234"`), used as the JSONL filename stem.
messages: Messages to append in order.
"""
session_dir = self._sessions_root / self._session_id
session_dir.mkdir(parents=True, exist_ok=True)
session_file = session_dir / f"{agent_id}.jsonl"
with session_file.open("a", encoding="utf-8") as f:
for message in messages:
envelope = {
"v": 1,
"message": to_jsonable_python(message, bytes_mode="base64"),
"meta": {"ts": datetime.now(UTC).isoformat().replace("+00:00", "Z")SessionStore.load_messages method · python · L55-L86 (32 LOC)freeact/agent/store.py
def load_messages(self, agent_id: str) -> list[ModelMessage]:
"""Load and validate all persisted messages for an agent.
Returns an empty list when no session file exists. If the final line is
truncated (for example from an interrupted write), that line is ignored.
Earlier malformed lines raise `ValueError`.
Args:
agent_id: Logical agent stream name used to locate the JSONL file.
Returns:
Deserialized message history in append order.
"""
session_file = self._sessions_root / self._session_id / f"{agent_id}.jsonl"
if not session_file.exists():
return []
lines = session_file.read_text(encoding="utf-8").splitlines()
serialized_messages: list[Any] = []
for index, line in enumerate(lines):
try:
envelope = json.loads(line)
except json.JSONDecodeError as e:
if index == len(lines) - 1:
SessionStore.save_tool_result method · python · L88-L102 (15 LOC)freeact/agent/store.py
def save_tool_result(self, payload: bytes, extension: str) -> Path:
"""Persist a tool-result payload under the session's `tool-results/` directory."""
safe_extension = self._sanitize_extension(extension)
tool_results_dir = self._sessions_root / self._session_id / "tool-results"
tool_results_dir.mkdir(parents=True, exist_ok=True)
while True:
file_id = uuid.uuid4().hex[:8]
filename = f"{file_id}.{safe_extension}"
path = tool_results_dir / filename
if not path.exists():
break
path.write_bytes(payload)
return pathPowered by Repobility — scan your code at https://repobility.com
SessionStore._validate_envelope method · python · L105-L126 (22 LOC)freeact/agent/store.py
def _validate_envelope(envelope: Any, line_no: int, session_file: Path) -> None:
if not isinstance(envelope, dict):
raise ValueError(f"Malformed JSONL line {line_no} in {session_file}")
required_keys = {"v", "message", "meta"}
if not required_keys.issubset(envelope):
raise ValueError(f"Malformed JSONL line {line_no} in {session_file}")
if envelope["v"] != 1:
raise ValueError(f"Unsupported session envelope version on line {line_no} in {session_file}")
meta = envelope["meta"]
if not isinstance(meta, dict):
raise ValueError(f"Malformed JSONL line {line_no} in {session_file}")
if "agent_id" in meta:
raise ValueError(
f"Invalid session envelope on line {line_no} in {session_file}: meta.agent_id is forbidden"
)
if "ts" not in meta:
raise ValueError(f"Malformed JSONL line {line_no} in {session_file}")SessionStore._sanitize_extension method · python · L129-L137 (9 LOC)freeact/agent/store.py
def _sanitize_extension(extension: str) -> str:
raw = extension.lower().lstrip(".")
if not raw:
return "bin"
if re.fullmatch(r"[a-z0-9]+", raw):
return raw
return "bin"ToolResultMaterializer.__init__ method · python · L150-L161 (12 LOC)freeact/agent/store.py
def __init__(
self,
*,
session_store: SessionStore,
inline_max_bytes: int,
preview_lines: int,
working_dir: Path,
) -> None:
self._session_store = session_store
self._inline_max_bytes = inline_max_bytes
self._preview_lines = preview_lines
self._working_dir = working_dirToolResultMaterializer.materialize method · python · L163-L183 (21 LOC)freeact/agent/store.py
def materialize(self, content: ToolResult) -> ToolResult:
canonical = self._canonicalize(content)
actual_size_bytes = len(canonical.payload)
if actual_size_bytes <= self._inline_max_bytes:
return content
try:
stored_path = self._session_store.save_tool_result(canonical.payload, canonical.extension)
except Exception:
return content
lines = [
f"Tool result exceeded configured inline threshold ({self._inline_max_bytes} bytes).",
f"Actual size: {actual_size_bytes} bytes.",
]
if canonical.preview_lines:
lines.append(f"Preview (first and last {self._preview_lines} lines):")
lines.extend(canonical.preview_lines)
lines.append(f"Full content saved to: {stored_path.relative_to(self._working_dir).as_posix()}")
return "\n".join(lines)ToolResultMaterializer._canonicalize method · python · L185-L211 (27 LOC)freeact/agent/store.py
def _canonicalize(self, content: ToolResult) -> _CanonicalToolResult:
match content:
case str() as text:
return _CanonicalToolResult(
payload=text.encode("utf-8"),
extension="txt",
preview_lines=self._take_preview_lines(text),
)
case BinaryContent(data=data, media_type=media_type):
return _CanonicalToolResult(
payload=data,
extension=self._media_type_to_ext(media_type),
preview_lines=[],
)
case _:
normalized = to_jsonable_python(content, bytes_mode="base64")
rendered = json.dumps(
normalized,
ensure_ascii=False,
indent=2,
sort_keys=True,
)
return _CanonicalToolResult(
payload=rendered.encode("utf-ToolResultMaterializer._take_preview_lines method · python · L213-L230 (18 LOC)freeact/agent/store.py
def _take_preview_lines(self, text: str) -> list[str]:
if self._preview_lines <= 0:
return []
lines = text.splitlines()
if not lines:
return ["<empty>"]
boundary = self._preview_lines
if len(lines) <= boundary * 2:
return lines
omitted = len(lines) - (boundary * 2)
return [
*lines[:boundary],
f"... ({omitted} lines omitted) ...",
*lines[-boundary:],
]_SubagentRunner.stream method · python · L20-L47 (28 LOC)freeact/agent/_subagent.py
async def stream(self, prompt: str, max_turns: int) -> AsyncIterator[AgentEvent]:
queue: asyncio.Queue[AgentEvent | Exception | None] = asyncio.Queue()
async def run_subagent() -> None:
try:
async with self._semaphore:
async with self._subagent:
async for event in self._subagent.stream(prompt, max_turns=max_turns):
await queue.put(event)
except Exception as e:
queue.put_nowait(e)
finally:
queue.put_nowait(None)
task = asyncio.create_task(run_subagent())
try:
while True:
item = await queue.get()
if item is None:
return
if isinstance(item, Exception):
raise item
yield item
finally:
if not task.done():
task.cancel()
with suppress(asy_ResourceSupervisor.__init__ method · python · L14-L21 (8 LOC)freeact/agent/_supervisor.py
def __init__(self, resource: Any, name: str):
self._resource = resource
self._name = name
self._task: asyncio.Task[None] | None = None
self._ready = asyncio.Event()
self._stop = asyncio.Event()
self._entered_resource: Any | None = None
self._error: Exception | None = NoneRepobility · code-quality intelligence · https://repobility.com
_ResourceSupervisor.start method · python · L23-L34 (12 LOC)freeact/agent/_supervisor.py
async def start(self) -> Any:
"""Start resource task and wait until context is entered."""
if self._task is not None:
raise RuntimeError(f"Resource supervisor for '{self._name}' already started")
self._task = asyncio.create_task(self._run(), name=f"resource-{self._name}")
await self._ready.wait()
if self._error is not None:
raise RuntimeError(f"Failed to start resource '{self._name}'") from self._error
return self._entered_resource_ResourceSupervisor.stop method · python · L36-L42 (7 LOC)freeact/agent/_supervisor.py
async def stop(self) -> None:
"""Signal resource task to exit context and wait for completion."""
if self._task is None:
return
self._stop.set()
await self._task_ResourceSupervisor._run method · python · L44-L53 (10 LOC)freeact/agent/_supervisor.py
async def _run(self) -> None:
try:
async with self._resource as entered_resource:
self._entered_resource = entered_resource
self._ready.set()
await self._stop.wait()
except Exception as e:
self._error = e
self._ready.set()
raisepage 1 / 4next ›