← back to gradion-ai__freeact

Function bodies 189 total

All specs Real LLM only Function bodies
main function · python · L24-L56 (33 LOC)
examples/basic_agent.py
async def main() -> None:
    # --8<-- [start:config]
    config = await Config.init()
    # --8<-- [end:config]

    # --8<-- [start:apigen]
    # Generate Python APIs for MCP servers in ptc_servers
    for server_name, params in config.ptc_servers.items():
        if not (config.generated_dir / "mcptools" / server_name).exists():
            await generate_mcp_sources({server_name: params}, config.generated_dir)
    # --8<-- [end:apigen]

    # --8<-- [start:agent]
    async with Agent(config=config) as agent:
        prompt = "Who is the F1 world champion 2025?"

        async for event in agent.stream(prompt):
            match event:
                case ApprovalRequest(tool_name="ipybox_execute_ipython_cell", tool_args=args) as request:
                    print(f"Code action:\n{args['code']}")
                    request.approve(True)
                case ApprovalRequest(tool_name=name, tool_args=args) as request:
                    print(f"Tool: {name}")
                    pr
handle_events function · python · L15-L32 (18 LOC)
examples/persistent_agent.py
async def handle_events(agent: Agent, prompt: str) -> None:
    async for event in agent.stream(prompt):
        match event:
            case ApprovalRequest(tool_name="ipybox_execute_ipython_cell", tool_args=args) as request:
                print(f"Code action:\n{args['code']}")
                request.approve(True)
            case ApprovalRequest(tool_name=name, tool_args=args) as request:
                print(f"Tool: {name}")
                print(f"Args: {args}")
                request.approve(True)
            case Thoughts(content=content):
                print(f"Thinking: {content}")
            case CodeExecutionOutput(text=text):
                print(f"Code execution output: {text}")
            case ToolOutput(content=content):
                print(f"Tool call result: {content}")
            case Response(content=content):
                print(content)
main function · python · L35-L65 (31 LOC)
examples/persistent_agent.py
async def main() -> None:
    config = await Config.init()

    for server_name, params in config.ptc_servers.items():
        if not (config.generated_dir / "mcptools" / server_name).exists():
            await generate_mcp_sources({server_name: params}, config.generated_dir)

    # --8<-- [start:session-run-no-id]
    # No session_id: agent creates a new session ID internally.
    async with Agent(config=config) as agent:
        print(f"Generated session ID: {agent.session_id}")
        await handle_events(agent, "What is the capital of France?")
        await handle_events(agent, "What about Germany?")
    # --8<-- [end:session-run-no-id]

    # --8<-- [start:session-create]
    # Choose an explicit session ID.
    session_id = "countries-session"
    # --8<-- [end:session-create]

    # --8<-- [start:session-run-existing]
    # Create-or-resume behavior: resume if present, otherwise start new.
    async with Agent(config=config, session_id=session_id) as agent:
        await handl
Config.model_post_init method · python · L118-L154 (37 LOC)
freeact/agent/config/config.py
    def model_post_init(self, __context: Any) -> None:
        object.__setattr__(self, "working_dir", self.working_dir.resolve())
        resolution_env = self._resolution_env()
        object.__setattr__(
            self,
            "_resolved_model_instance",
            resolve_model_instance(
                model=self.model,
                provider_settings=self.provider_settings,
                resolution_env=resolution_env,
            ),
        )
        object.__setattr__(
            self,
            "_resolved_mcp_servers",
            resolve_mcp_servers(
                tool_search=self.tool_search,
                mcp_servers=self.mcp_servers,
                basic_search_mcp_server_config=BASIC_SEARCH_MCP_SERVER_CONFIG,
                hybrid_search_mcp_server_config=HYBRID_SEARCH_MCP_SERVER_CONFIG,
                filesystem_mcp_server_config=FILESYSTEM_MCP_SERVER_CONFIG,
                resolution_env=resolution_env,
            ),
        )
        object.__set
Config.load method · python · L161-L174 (14 LOC)
freeact/agent/config/config.py
    async def load(cls, working_dir: Path | None = None) -> "Config":
        """Load persisted config if present, otherwise return defaults."""
        config = cls(working_dir=working_dir or Path.cwd())
        config_file = config.freeact_dir / "agent.json"
        if not config_file.exists():
            return config

        data = await arun(lambda: json.loads(config_file.read_text()))
        return cls.model_validate(
            {
                **data,
                "working_dir": config.working_dir,
            }
        )
Config.init method · python · L177-L184 (8 LOC)
freeact/agent/config/config.py
    async def init(cls, working_dir: Path | None = None) -> "Config":
        """Load config from `.freeact/` when present, otherwise save defaults."""
        config = cls(working_dir=working_dir or Path.cwd())
        if config.freeact_dir.exists():
            return await cls.load(working_dir=config.working_dir)

        await config.save()
        return config
Config._save_sync method · python · L264-L282 (19 LOC)
freeact/agent/config/config.py
    def _save_sync(self) -> None:
        """Persist this config and create the directories it refers to.

        Writes `agent.json` (without `working_dir`) into `freeact_dir`,
        creates the generated, plans and sessions directories, and
        materializes the bundled skills.

        Raises:
            ValueError: If `model` is not a string.
        """
        if not isinstance(self.model, str):
            raise ValueError("model must be a string when saving config")

        config_root = self.freeact_dir
        config_root.mkdir(parents=True, exist_ok=True)

        serialized = json.dumps(
            self.model_dump(mode="json", exclude={"working_dir"}),
            indent=2,
        )
        (config_root / "agent.json").write_text(serialized + "\n")

        for directory in (self.generated_dir, self.plans_dir, self.sessions_dir):
            directory.mkdir(parents=True, exist_ok=True)

        materialize_bundled_skills(
            skills_dir=self.skills_dir,
            generated_rel_dir=self.generated_rel_dir,
            plans_rel_dir=self.plans_rel_dir,
        )
Powered by Repobility — scan your code at https://repobility.com
Config._resolution_env method · python · L284-L291 (8 LOC)
freeact/agent/config/config.py
    def _resolution_env(self) -> dict[str, str]:
        """Build the environment mapping used for variable resolution.

        Starts from a copy of `os.environ` and fills in fallbacks for
        `PYTOOLS_DIR` and `PYTOOLS_DB_PATH`, plus the hybrid-search defaults
        when `tool_search` is `"hybrid"`. Values already present in the
        process environment are never overwritten.
        """
        fallbacks: list[tuple[str, str]] = [
            ("PYTOOLS_DIR", str(self.generated_rel_dir)),
            ("PYTOOLS_DB_PATH", str(self.search_db_file)),
        ]
        if self.tool_search == "hybrid":
            fallbacks.extend(HYBRID_SEARCH_ENV_DEFAULTS.items())

        resolved = dict(os.environ)
        for name, fallback in fallbacks:
            if name not in resolved:
                resolved[name] = fallback
        return resolved
load_system_prompt function · python · L8-L29 (22 LOC)
freeact/agent/config/prompts.py
def load_system_prompt(
    *,
    tool_search: Literal["basic", "hybrid"],
    working_dir: Path,
    generated_rel_dir: Path,
    project_instructions_file: Path,
    skills_metadata: list[SkillMetadata],
) -> str:
    """Render the system prompt for the selected tool-search mode.

    The bundled template (`system-hybrid.md` for `"hybrid"`, otherwise
    `system-basic.md`) is filled with the working directory, the generated
    directory and two sections: project instructions and agent skills. A
    section without content is rendered as an empty string.
    """
    template_name = "system-basic.md" if tool_search != "hybrid" else "system-hybrid.md"
    with as_file(files("freeact.agent.config").joinpath("prompts")) as prompts_dir:
        template = (prompts_dir / template_name).read_text()

    instructions_section = _render_section(
        "project-instructions",
        _load_project_instructions_content(project_instructions_file),
    )
    skills_section = _render_section(
        "agent-skills",
        _load_skills_content(skills_metadata, working_dir),
    )

    return template.format(
        working_dir=working_dir,
        generated_rel_dir=generated_rel_dir,
        project_instructions=instructions_section,
        skills=skills_section,
    )
_render_section function · python · L32-L39 (8 LOC)
freeact/agent/config/prompts.py
def _render_section(section_name: str, content: str | None) -> str:
    if content is None:
        return ""

    prompts = files("freeact.agent.config").joinpath("prompts")
    with as_file(prompts) as prompts_dir:
        template = (prompts_dir / f"section-{section_name}.md").read_text()
    return template.format(content=content)
_load_project_instructions_content function · python · L42-L47 (6 LOC)
freeact/agent/config/prompts.py
def _load_project_instructions_content(project_instructions_file: Path) -> str | None:
    if not project_instructions_file.exists():
        return None

    content = project_instructions_file.read_text().strip()
    return content or None
_load_skills_content function · python · L50-L60 (11 LOC)
freeact/agent/config/prompts.py
def _load_skills_content(skills_metadata: list[SkillMetadata], working_dir: Path) -> str | None:
    """Format the skills as a markdown bullet list, or `None` if there are none.

    Each skill contributes two lines: a bullet with its name and description
    and a nested bullet with its location as returned by
    `_relative_to_working_dir`.
    """
    if not skills_metadata:
        return None

    rendered: list[str] = []
    for entry in skills_metadata:
        location = _relative_to_working_dir(entry.path, working_dir)
        rendered.extend(
            (
                f"- **{entry.name}**: {entry.description}",
                f"  - Location: `{location}`",
            )
        )

    return "\n".join(rendered)
resolve_model_instance function · python · L10-L35 (26 LOC)
freeact/agent/config/runtime.py
def resolve_model_instance(
    *,
    model: str | Model,
    provider_settings: dict[str, Any] | None,
    resolution_env: Mapping[str, str],
) -> str | Model:
    """Turn a model name plus provider settings into a model instance.

    A `Model` instance, or a model name without provider settings, is
    returned unchanged. Otherwise the provider settings are passed through
    `replace_variables` with `resolution_env` and used as keyword arguments
    for the provider class inferred from the model's provider name.

    Raises:
        ValueError: If `replace_variables` reports missing variables.
    """
    if isinstance(model, Model) or provider_settings is None:
        return model

    substitution = replace_variables(provider_settings, resolution_env)
    if substitution.missing_variables:
        raise ValueError(f"Missing environment variables for provider_settings: {substitution.missing_variables}")

    settings = substitution.replaced

    def provider_factory(name: str) -> Provider[Any]:
        # Copy so that the shared settings are never mutated per provider.
        provider_kwargs = {**settings}
        if name in ("google-vertex", "google-gla"):
            provider_kwargs.setdefault("vertexai", name == "google-vertex")
        return infer_provider_class(name)(**provider_kwargs)

    return infer_model(model, provider_factory=provider_factory)
resolve_kernel_env function · python · L38-L57 (20 LOC)
freeact/agent/config/runtime.py
def resolve_kernel_env(
    *,
    kernel_env: dict[str, str],
    generated_dir: Path,
    resolution_env: Mapping[str, str],
) -> dict[str, str]:
    """Build the kernel environment and pass it through `replace_variables`.

    `PYTHONPATH` defaults to `generated_dir` and `HOME` is taken from
    `resolution_env` when it is set there to a non-empty value. Entries in
    `kernel_env` override both.

    Raises:
        ValueError: If `replace_variables` reports missing variables.
    """
    defaults: dict[str, str] = {"PYTHONPATH": str(generated_dir)}
    home_dir = resolution_env.get("HOME")
    if home_dir:
        defaults["HOME"] = home_dir

    # User-provided entries win over the defaults.
    combined: dict[str, str] = {**defaults, **kernel_env}

    substitution = replace_variables(combined, resolution_env)
    if substitution.missing_variables:
        raise ValueError(f"Missing environment variables for kernel_env: {substitution.missing_variables}")

    return substitution.replaced
resolve_mcp_servers function · python · L60-L84 (25 LOC)
freeact/agent/config/runtime.py
def resolve_mcp_servers(
    *,
    tool_search: Literal["basic", "hybrid"],
    mcp_servers: dict[str, dict[str, Any]],
    basic_search_mcp_server_config: dict[str, Any],
    hybrid_search_mcp_server_config: dict[str, Any],
    filesystem_mcp_server_config: dict[str, Any],
    resolution_env: Mapping[str, str],
) -> dict[str, dict[str, Any]]:
    """Merge internal and user MCP server configs and resolve their variables.

    The internal `pytools` entry uses the hybrid or the basic search config
    depending on `tool_search`; `filesystem` is always added. User entries
    in `mcp_servers` override internal ones with the same name.

    Raises:
        ValueError: If `replace_variables` reports missing variables.
    """
    if tool_search == "hybrid":
        search_config = hybrid_search_mcp_server_config
    else:
        search_config = basic_search_mcp_server_config

    # Deep copies keep the caller's internal config dicts out of the merged mapping.
    combined: dict[str, dict[str, Any]] = {
        "pytools": copy.deepcopy(search_config),
        "filesystem": copy.deepcopy(filesystem_mcp_server_config),
    }
    combined.update(mcp_servers)

    substitution = replace_variables(combined, resolution_env)
    if substitution.missing_variables:
        raise ValueError(f"Missing environment variables for mcp_servers: {substitution.missing_variables}")

    return substitution.replaced
Repobility · code-quality intelligence · https://repobility.com
validate_ptc_servers function · python · L87-L94 (8 LOC)
freeact/agent/config/runtime.py
def validate_ptc_servers(
    *,
    ptc_servers: dict[str, dict[str, Any]],
    resolution_env: Mapping[str, str],
) -> None:
    """Check that every variable used by `ptc_servers` can be resolved.

    The replaced configuration itself is discarded; only the missing
    variables reported by `replace_variables` matter here.

    Raises:
        ValueError: If `replace_variables` reports missing variables.
    """
    outcome = replace_variables(ptc_servers, resolution_env)
    if not outcome.missing_variables:
        return
    raise ValueError(f"Missing environment variables for ptc_servers: {outcome.missing_variables}")
materialize_bundled_skills function · python · L22-L47 (26 LOC)
freeact/agent/config/skills.py
def materialize_bundled_skills(*, skills_dir: Path, generated_rel_dir: Path, plans_rel_dir: Path) -> None:
    placeholders = {
        "generated_rel_dir": str(generated_rel_dir),
        "plans_rel_dir": str(plans_rel_dir),
    }

    templates_root = files("freeact.agent.config").joinpath("templates", "skills")
    with as_file(templates_root) as skills_template_dir:
        for template_skill_dir in skills_template_dir.iterdir():
            if not template_skill_dir.is_dir():
                continue

            target_skill_dir = skills_dir / template_skill_dir.name
            if target_skill_dir.exists():
                continue

            for template_file in template_skill_dir.rglob("*"):
                relative = template_file.relative_to(template_skill_dir)
                target_file = target_skill_dir / relative
                if template_file.is_dir():
                    target_file.mkdir(parents=True, exist_ok=True)
                    continue

                t
_scan_skills_dir function · python · L50-L67 (18 LOC)
freeact/agent/config/skills.py
def _scan_skills_dir(skills_dir: Path) -> list[SkillMetadata]:
    """Collect metadata for every skill directory that has a parseable `SKILL.md`.

    Returns an empty list when `skills_dir` does not exist. Entries that are
    not directories, lack a `SKILL.md`, or whose file cannot be parsed into
    metadata are skipped.
    """
    discovered: list[SkillMetadata] = []
    if not skills_dir.exists():
        return discovered

    for entry in skills_dir.iterdir():
        if entry.is_dir():
            manifest = entry / "SKILL.md"
            if manifest.exists():
                parsed = _parse_skill_file(manifest)
                if parsed is not None:
                    discovered.append(parsed)

    return discovered
_parse_skill_file function · python · L70-L89 (20 LOC)
freeact/agent/config/skills.py
def _parse_skill_file(skill_file: Path) -> SkillMetadata | None:
    """Parse the YAML frontmatter of a skill file into `SkillMetadata`.

    The file must start with a `---` delimited frontmatter block that is a
    YAML mapping containing `name` and `description`.

    Args:
        skill_file: Path of the `SKILL.md` file to parse.

    Returns:
        The skill metadata, or `None` when the frontmatter is missing,
        unterminated, not valid YAML, not a mapping, or lacks a required key.
    """
    # Skill files are user-authored text: decode explicitly as UTF-8 rather
    # than with the platform's locale encoding.
    content = skill_file.read_text(encoding="utf-8")
    if not content.startswith("---"):
        return None

    # Expected shape: "" / frontmatter / body.
    parts = content.split("---", 2)
    if len(parts) < 3:
        return None

    try:
        frontmatter = yaml.safe_load(parts[1])
    except yaml.YAMLError:
        # Broken YAML is treated like any other malformed skill file, so that
        # one bad file cannot abort the caller.
        return None
    if not isinstance(frontmatter, dict):
        return None

    try:
        name = frontmatter["name"]
        description = frontmatter["description"]
    except KeyError:
        return None

    return SkillMetadata(name=name, description=description, path=skill_file)
Agent.__init__ method · python · L89-L165 (77 LOC)
freeact/agent/core.py
    def __init__(
        self,
        config: Config,
        agent_id: str | None = None,
        session_id: str | None = None,
        sandbox: bool = False,
        sandbox_config: Path | None = None,
    ):
        """Initialize the agent.

        Args:
            config: Agent configuration containing model, system prompt,
                MCP servers, kernel env, timeouts, and subagent settings.
            agent_id: Identifier for this agent instance. Defaults to
                `"main"` when not provided.
            session_id: Optional session identifier for persistence.
                If `None` and persistence is enabled, a new session ID
                is generated. If provided and persistence is enabled, that
                session ID is used. Existing session history is resumed when
                present; otherwise a new session starts with that ID.
            sandbox: Run the kernel in sandbox mode.
            sandbox_config: Path to custom sandbox configurati
Agent.start method · python · L195-L238 (44 LOC)
freeact/agent/core.py
    async def start(self) -> None:
        """Restore persisted history, start the code executor and MCP servers.

        Automatically called when entering the async context manager.
        """
        if self._resource_supervisors:
            return

        if self._session_store is not None and self._history_agent_id == "main" and not self._message_history:
            self._message_history = await arun(self._session_store.load_messages, agent_id="main")

        self._mcp_server_instances = self._create_mcp_servers()

        resource_supervisors = [_ResourceSupervisor(self._code_executor, "code-executor")]
        for name, server in self._mcp_server_instances.items():
            logger.info(f"Starting MCP server: {name}")
            server.tool_prefix = name
            resource_supervisors.append(_ResourceSupervisor(server, f"mcp-server-{name}"))

        try:
            await asyncio.gather(*(supervisor.start() for supervisor in resource_supervisors))
        except Exce
Agent.stop method · python · L240-L262 (23 LOC)
freeact/agent/core.py
    async def stop(self) -> None:
        """Stop the code executor and MCP servers.

        Automatically called when exiting the async context manager.

        All resource supervisors are stopped concurrently, and each one is
        asked to stop even if another one fails. The agent's tool
        definitions, tool mapping, supervisors and MCP server instances are
        cleared before any stop error is re-raised.

        Raises:
            Exception: The error raised by a supervisor when exactly one
                supervisor failed to stop.
            ExceptionGroup: When more than one supervisor failed to stop.
        """
        self._tool_definitions = []
        self._tool_mapping = {}

        resource_supervisors = self._resource_supervisors
        self._resource_supervisors = []
        if not resource_supervisors:
            return

        results = await asyncio.gather(
            *(supervisor.stop() for supervisor in resource_supervisors),
            return_exceptions=True,
        )

        # Reset before raising: the supervisors are already dropped, so
        # keeping the server instances around after a failed stop would leave
        # the agent in a half-stopped state.
        self._mcp_server_instances = {}

        errors = [result for result in results if isinstance(result, Exception)]
        if not errors:
            return
        if len(errors) == 1:
            raise errors[0]
        raise ExceptionGroup("Multiple errors while stopping agent resources", errors)
Agent._create_mcp_servers method · python · L264-L287 (24 LOC)
freeact/agent/core.py
    def _create_mcp_servers(self) -> dict[str, MCPServer]:
        """Instantiate one MCP server object per configured server.

        A config with a `command` key becomes a stdio server (the filtered
        variant when a non-empty `excluded_tools` entry is present); a config
        with a `url` key becomes a streamable HTTP server.

        Raises:
            ValueError: If a config has neither `command` nor `url`.
        """
        configured = self._mcp_servers
        if not configured:
            return {}

        instances: dict[str, MCPServer] = {}
        for name, raw_cfg in configured.items():
            # Work on a copy: `excluded_tools` is removed before the remaining
            # keys are forwarded as constructor arguments.
            cfg = dict(raw_cfg)
            excluded_tools = cfg.pop("excluded_tools", None)

            if "command" in cfg:
                if excluded_tools:
                    instances[name] = _MCPServerStdioFiltered(
                        excluded_tools=frozenset(excluded_tools),
                        **cfg,
                    )
                else:
                    instances[name] = MCPServerStdio(**cfg)
            elif "url" in cfg:
                instances[name] = MCPServerStreamableHTTP(**cfg)
            else:
                raise ValueError(f"Invalid server config for {name}: must have 'command' or 'url'")

        return instances
Want this analysis on your repo? https://repobility.com/scan/
Agent._create_model_request method · python · L289-L296 (8 LOC)
freeact/agent/core.py
    def _create_model_request(self, user_prompt: str | Sequence[UserContent]) -> ModelRequest:
        """Wrap the user prompt in a model request.

        The system prompt part is prepended only while the message history
        is still empty.
        """
        parts: list[SystemPromptPart | UserPromptPart] = (
            [] if self._message_history else [SystemPromptPart(content=self._system_prompt)]
        )
        parts.append(UserPromptPart(content=user_prompt))
        return ModelRequest(parts=parts)
Agent.stream method · python · L298-L391 (94 LOC)
freeact/agent/core.py
    async def stream(
        self,
        prompt: str | Sequence[UserContent],
        max_turns: int | None = None,
    ) -> AsyncIterator[AgentEvent]:
        """Run a single agent turn, yielding events as they occur.

        Loops through model responses and tool executions until the model
        produces a response without tool calls. All code actions and tool
        calls yield an [`ApprovalRequest`][freeact.agent.ApprovalRequest]
        that must be resolved before execution proceeds.

        Args:
            prompt: User message as text or multimodal content sequence.
            max_turns: Maximum number of tool-execution rounds. Each round
                consists of a model response followed by tool execution.
                If `None`, runs until the model stops calling tools.

        Returns:
            An async event iterator.
        """
        request = self._create_model_request(prompt)
        request_params = ModelRequestParameters(function_tools=self._tool
Agent._execute_tool method · python · L393-L473 (81 LOC)
freeact/agent/core.py
    async def _execute_tool(self, call: ToolCallPart) -> AsyncIterator[AgentEvent | ToolReturnPart]:
        tool_name = call.tool_name
        tool_args = call.args_as_dict()
        corr_id = uuid.uuid4().hex[:8]

        if tool_name not in self.tool_names:
            yield ToolReturnPart(
                tool_call_id=call.tool_call_id,
                tool_name=tool_name,
                content=f"Unknown tool name: {tool_name}",
                metadata={"rejected": False},
            )
            return

        approval = ApprovalRequest(
            tool_name=tool_name,
            tool_args=tool_args,
            agent_id=self.agent_id,
            corr_id=corr_id,
        )

        yield approval

        if not await approval.approved():
            yield ToolReturnPart(
                tool_call_id=call.tool_call_id,
                tool_name=tool_name,
                content="Tool call rejected",
                metadata={"rejected": True},
            )
            r
Agent._execute_subagent_task method · python · L475-L499 (25 LOC)
freeact/agent/core.py
    async def _execute_subagent_task(self, prompt: str, max_turns: int, corr_id: str) -> AsyncIterator[AgentEvent]:
        subagent_config = self._config.for_subagent()
        subagent = Agent(
            config=subagent_config,
            agent_id=f"sub-{uuid.uuid4().hex[:4]}",
            sandbox=self._sandbox,
            sandbox_config=self._sandbox_config,
            session_id=self._session_id,
        )
        runner = _SubagentRunner(subagent=subagent, semaphore=self._subagent_semaphore)

        last_response = ""
        try:
            async for item in runner.stream(prompt, max_turns=max_turns):
                yield item
                match item:
                    case Response(content=content):
                        last_response = content
        except Exception as e:
            error_content = f"Subagent error: {e}"
            yield ToolOutput(content=error_content, agent_id=self.agent_id, corr_id=corr_id)
            return

        final_content = awai
Agent._ipybox_execute_ipython_cell method · python · L501-L530 (30 LOC)
freeact/agent/core.py
    async def _ipybox_execute_ipython_cell(
        self, code: str
    ) -> AsyncIterator[ApprovalRequest | CodeExecutionOutputChunk | CodeExecutionOutput]:
        try:
            async with self._code_executor_lock:
                async for item in self._code_executor.stream(code, timeout=self._execution_timeout, chunks=True):
                    match item:
                        case ipybox.ApprovalRequest(
                            server_name=server_name,
                            tool_name=tool_name,
                            tool_args=tool_args,
                        ):
                            ptc_request = ApprovalRequest(
                                tool_name=f"{server_name}_{tool_name}",  # type: ignore[has-type]
                                tool_args=tool_args,  # type: ignore[has-type]
                                ptc=True,
                                agent_id=self.agent_id,
                            )
                            yield ptc_r
Agent._ipybox_reset method · python · L532-L538 (7 LOC)
freeact/agent/core.py
    async def _ipybox_reset(self) -> str:
        try:
            async with self._code_executor_lock:
                await self._code_executor.reset()
                return "Kernel reset successfully."
        except Exception as e:
            return f"Kernel reset failed: {str(e)}"
Agent._call_mcp_tool method · python · L540-L552 (13 LOC)
freeact/agent/core.py
    async def _call_mcp_tool(self, tool_name: str, tool_args: dict[str, object]) -> ToolResult:
        """Call a tool on the MCP server registered for `tool_name`.

        The server's `<tool_prefix>_` prefix is stripped from the name before
        the call. Results of the filesystem read tools are passed through
        `_extract_file_content`. Any failure is returned as an error message
        instead of being raised.
        """
        file_read_tools = {"filesystem_read_text_file", "filesystem_read_multiple_files"}
        try:
            server = self._tool_mapping[tool_name]
            prefix = f"{server.tool_prefix}_"
            outcome = await server.direct_call_tool(
                name=tool_name.removeprefix(prefix),
                args=tool_args,
            )
            if tool_name not in file_read_tools:
                return outcome
            return self._extract_file_content(outcome)
        except Exception as exc:
            return f"MCP tool call failed: {exc!s}"
Agent._extract_file_content method · python · L555-L570 (16 LOC)
freeact/agent/core.py
    def _extract_file_content(result: ToolResult) -> ToolResult:
        match result:
            case {"content": content} as payload if len(payload) == 1:
                return content
            case str() as raw:
                try:
                    decoded = json.loads(raw)
                except json.JSONDecodeError:
                    return result
                match decoded:
                    case {"content": content} as payload if len(payload) == 1:
                        return content
                    case _:
                        return result
            case _:
                return result
About: code-quality intelligence by Repobility · https://repobility.com
Agent._append_message_history method · python · L572-L582 (11 LOC)
freeact/agent/core.py
    async def _append_message_history(self, messages: list[ModelMessage]) -> None:
        """Append messages to the in-memory history and to the session store.

        Does nothing for an empty list. The store is only written when one is
        configured; the write is dispatched through `arun`.
        """
        if messages:
            self._message_history.extend(messages)

            store = self._session_store
            if store is not None:
                await arun(
                    store.append_messages,
                    agent_id=self._history_agent_id,
                    messages=messages,
                )
CodeExecutionOutput.ptc_rejected method · python · L72-L79 (8 LOC)
freeact/agent/events.py
    def ptc_rejected(self) -> bool:
        """Whether the output text reports a rejected programmatic tool call.

        Detection is textual: the output must contain the `ToolRunnerError`
        rejection message. Empty or missing text is never a rejection.
        """
        # TODO: make detection of PTC rejection more robust ...
        output = self.text
        if output:
            rejection = re.search(r"ToolRunnerError: Approval request for \S+ rejected", output)
            return rejection is not None
        return False
CodeExecutionOutput.format method · python · L81-L88 (8 LOC)
freeact/agent/events.py
    def format(self) -> str:
        """Return the text followed by one markdown image link per image.

        The pieces are separated by newlines; the result is an empty string
        when there is neither text nor an image.
        """
        segments: list[str] = [self.text] if self.text else []
        segments.extend(f"![Image]({image_path})" for image_path in self.images)
        return "\n".join(segments)
ApprovalRequest.approve method · python · L105-L112 (8 LOC)
freeact/agent/events.py
    def approve(self, decision: bool) -> None:
        """Record the decision for this approval request.

        The decision becomes the result of the request's underlying future.

        Args:
            decision: `True` to execute, `False` to reject and end
                the current agent turn.
        """
        pending = self._future
        pending.set_result(decision)
SessionStore.__init__ method · python · L18-L26 (9 LOC)
freeact/agent/store.py
    def __init__(
        self,
        sessions_root: Path,
        session_id: str,
        flush_after_append: bool = False,
    ):
        """Remember where the session lives and how it is written.

        Args:
            sessions_root: Directory that holds one sub-directory per session.
            session_id: Name of this session's sub-directory.
            flush_after_append: Stored for use when appending messages.
        """
        self._flush_after_append = flush_after_append
        self._session_id = session_id
        self._sessions_root = sessions_root
SessionStore.append_messages method · python · L28-L53 (26 LOC)
freeact/agent/store.py
    def append_messages(self, agent_id: str, messages: list[ModelMessage]) -> None:
        """Append serialized messages to an agent-specific session log.

        Each message is written as a versioned JSONL envelope with a UTC
        timestamp. The session file is created on demand.

        Args:
            agent_id: Logical agent stream name (for example, `"main"` or
                `"sub-1234"`), used as the JSONL filename stem.
            messages: Messages to append in order.
        """
        session_dir = self._sessions_root / self._session_id
        session_dir.mkdir(parents=True, exist_ok=True)
        session_file = session_dir / f"{agent_id}.jsonl"

        with session_file.open("a", encoding="utf-8") as f:
            for message in messages:
                envelope = {
                    "v": 1,
                    "message": to_jsonable_python(message, bytes_mode="base64"),
                    "meta": {"ts": datetime.now(UTC).isoformat().replace("+00:00", "Z")
SessionStore.load_messages method · python · L55-L86 (32 LOC)
freeact/agent/store.py
    def load_messages(self, agent_id: str) -> list[ModelMessage]:
        """Load and validate all persisted messages for an agent.

        Returns an empty list when no session file exists. If the final line is
        truncated (for example from an interrupted write), that line is ignored.
        Earlier malformed lines raise `ValueError`.

        Args:
            agent_id: Logical agent stream name used to locate the JSONL file.

        Returns:
            Deserialized message history in append order.
        """
        session_file = self._sessions_root / self._session_id / f"{agent_id}.jsonl"
        if not session_file.exists():
            return []

        lines = session_file.read_text(encoding="utf-8").splitlines()
        serialized_messages: list[Any] = []

        for index, line in enumerate(lines):
            try:
                envelope = json.loads(line)
            except json.JSONDecodeError as e:
                if index == len(lines) - 1:
                
SessionStore.save_tool_result method · python · L88-L102 (15 LOC)
freeact/agent/store.py
    def save_tool_result(self, payload: bytes, extension: str) -> Path:
        """Write a tool-result payload into the session's `tool-results/` directory.

        The file gets a random 8-character hex name and the sanitized
        extension. Returns the path of the written file.
        """
        suffix = self._sanitize_extension(extension)
        target_dir = self._sessions_root / self._session_id / "tool-results"
        target_dir.mkdir(parents=True, exist_ok=True)

        # Draw short random names until one is not taken by an existing file.
        destination = target_dir / f"{uuid.uuid4().hex[:8]}.{suffix}"
        while destination.exists():
            destination = target_dir / f"{uuid.uuid4().hex[:8]}.{suffix}"

        destination.write_bytes(payload)
        return destination
Powered by Repobility — scan your code at https://repobility.com
SessionStore._validate_envelope method · python · L105-L126 (22 LOC)
freeact/agent/store.py
    def _validate_envelope(envelope: Any, line_no: int, session_file: Path) -> None:
        """Check the structure of one decoded session-log envelope.

        An envelope must be a dict with `v`, `message` and `meta` keys, use
        version 1, and have a dict `meta` that contains `ts` and does not
        contain `agent_id`.

        Raises:
            ValueError: If any of these conditions is violated.
        """
        where = f"line {line_no} in {session_file}"
        malformed = f"Malformed JSONL {where}"

        if not isinstance(envelope, dict):
            raise ValueError(malformed)
        if any(key not in envelope for key in ("v", "message", "meta")):
            raise ValueError(malformed)
        if envelope["v"] != 1:
            raise ValueError(f"Unsupported session envelope version on {where}")

        meta = envelope["meta"]
        if not isinstance(meta, dict):
            raise ValueError(malformed)
        if "agent_id" in meta:
            raise ValueError(f"Invalid session envelope on {where}: meta.agent_id is forbidden")
        if "ts" not in meta:
            raise ValueError(malformed)
SessionStore._sanitize_extension method · python · L129-L137 (9 LOC)
freeact/agent/store.py
    def _sanitize_extension(extension: str) -> str:
        """Normalize a file extension, falling back to `"bin"`.

        The extension is lower-cased and stripped of leading dots. It is kept
        only when it consists solely of the characters `a-z` and `0-9`.
        """
        candidate = extension.lower().lstrip(".")
        if candidate and re.fullmatch(r"[a-z0-9]+", candidate):
            return candidate
        return "bin"
ToolResultMaterializer.__init__ method · python · L150-L161 (12 LOC)
freeact/agent/store.py
    def __init__(
        self,
        *,
        session_store: SessionStore,
        inline_max_bytes: int,
        preview_lines: int,
        working_dir: Path,
    ) -> None:
        """Store the collaborators and limits used to materialize tool results.

        Args:
            session_store: Store that persists oversized payloads.
            inline_max_bytes: Largest payload size, in bytes, that is still
                returned inline.
            preview_lines: Line count quoted in the preview header.
            working_dir: Base directory against which stored paths are
                reported.
        """
        self._working_dir = working_dir
        self._preview_lines = preview_lines
        self._inline_max_bytes = inline_max_bytes
        self._session_store = session_store
ToolResultMaterializer.materialize method · python · L163-L183 (21 LOC)
freeact/agent/store.py
    def materialize(self, content: ToolResult) -> ToolResult:
        """Return ``content`` inline, or a text notice pointing at a saved copy.

        The result is canonicalized to bytes. If the payload fits within the
        configured inline threshold, ``content`` is returned unchanged.
        Otherwise the payload is handed to the session store and a textual
        summary (threshold, actual size, optional preview, saved location) is
        returned in its place.

        Args:
            content: Tool result to inspect.

        Returns:
            ``content`` itself when it is small enough or when saving fails;
            otherwise a newline-joined notice string.
        """
        canonical = self._canonicalize(content)
        actual_size_bytes = len(canonical.payload)

        if actual_size_bytes <= self._inline_max_bytes:
            return content

        try:
            stored_path = self._session_store.save_tool_result(canonical.payload, canonical.extension)
        except Exception:
            # Deliberate best-effort: if the payload cannot be saved, hand the
            # original result back instead of failing the tool call.
            return content

        try:
            display_path = stored_path.relative_to(self._working_dir)
        except ValueError:
            # relative_to() raises when the saved file is not located under
            # the working directory (or the two paths mix absolute/relative).
            # The payload is already saved, so report the path as returned
            # rather than crash.
            display_path = stored_path

        lines = [
            f"Tool result exceeded configured inline threshold ({self._inline_max_bytes} bytes).",
            f"Actual size: {actual_size_bytes} bytes.",
        ]
        if canonical.preview_lines:
            lines.append(f"Preview (first and last {self._preview_lines} lines):")
            lines.extend(canonical.preview_lines)
        lines.append(f"Full content saved to: {display_path.as_posix()}")
        return "\n".join(lines)
ToolResultMaterializer._canonicalize method · python · L185-L211 (27 LOC)
freeact/agent/store.py
    def _canonicalize(self, content: ToolResult) -> _CanonicalToolResult:
        match content:
            case str() as text:
                return _CanonicalToolResult(
                    payload=text.encode("utf-8"),
                    extension="txt",
                    preview_lines=self._take_preview_lines(text),
                )
            case BinaryContent(data=data, media_type=media_type):
                return _CanonicalToolResult(
                    payload=data,
                    extension=self._media_type_to_ext(media_type),
                    preview_lines=[],
                )
            case _:
                normalized = to_jsonable_python(content, bytes_mode="base64")
                rendered = json.dumps(
                    normalized,
                    ensure_ascii=False,
                    indent=2,
                    sort_keys=True,
                )
                return _CanonicalToolResult(
                    payload=rendered.encode("utf-
ToolResultMaterializer._take_preview_lines method · python · L213-L230 (18 LOC)
freeact/agent/store.py
    def _take_preview_lines(self, text: str) -> list[str]:
        if self._preview_lines <= 0:
            return []

        lines = text.splitlines()
        if not lines:
            return ["<empty>"]

        boundary = self._preview_lines
        if len(lines) <= boundary * 2:
            return lines

        omitted = len(lines) - (boundary * 2)
        return [
            *lines[:boundary],
            f"... ({omitted} lines omitted) ...",
            *lines[-boundary:],
        ]
_SubagentRunner.stream method · python · L20-L47 (28 LOC)
freeact/agent/_subagent.py
    async def stream(self, prompt: str, max_turns: int) -> AsyncIterator[AgentEvent]:
        queue: asyncio.Queue[AgentEvent | Exception | None] = asyncio.Queue()

        async def run_subagent() -> None:
            try:
                async with self._semaphore:
                    async with self._subagent:
                        async for event in self._subagent.stream(prompt, max_turns=max_turns):
                            await queue.put(event)
            except Exception as e:
                queue.put_nowait(e)
            finally:
                queue.put_nowait(None)

        task = asyncio.create_task(run_subagent())
        try:
            while True:
                item = await queue.get()
                if item is None:
                    return
                if isinstance(item, Exception):
                    raise item
                yield item
        finally:
            if not task.done():
                task.cancel()
                with suppress(asy
_ResourceSupervisor.__init__ method · python · L14-L21 (8 LOC)
freeact/agent/_supervisor.py
    def __init__(self, resource: Any, name: str):
        self._resource = resource
        self._name = name
        self._task: asyncio.Task[None] | None = None
        self._ready = asyncio.Event()
        self._stop = asyncio.Event()
        self._entered_resource: Any | None = None
        self._error: Exception | None = None
Repobility · code-quality intelligence · https://repobility.com
_ResourceSupervisor.start method · python · L23-L34 (12 LOC)
freeact/agent/_supervisor.py
    async def start(self) -> Any:
        """Launch the supervising task and wait until it reports readiness.

        Returns:
            The value stored in ``self._entered_resource`` once the task has
            signalled readiness.

        Raises:
            RuntimeError: If a task was already created for this supervisor,
                or (chained from the recorded error) if the task signalled
                readiness after recording a failure.
        """
        if self._task is not None:
            raise RuntimeError(f"Resource supervisor for '{self._name}' already started")

        runner = self._run()
        self._task = asyncio.create_task(runner, name=f"resource-{self._name}")
        await self._ready.wait()

        failure = self._error
        if failure is None:
            return self._entered_resource
        raise RuntimeError(f"Failed to start resource '{self._name}'") from failure
_ResourceSupervisor.stop method · python · L36-L42 (7 LOC)
freeact/agent/_supervisor.py
    async def stop(self) -> None:
        """Ask the supervising task to finish and wait for it.

        Does nothing when no task was ever created. Otherwise sets the stop
        event and awaits the task, so any exception the task ended with
        propagates to the caller.
        """
        task = self._task
        if task is not None:
            self._stop.set()
            await task
_ResourceSupervisor._run method · python · L44-L53 (10 LOC)
freeact/agent/_supervisor.py
    async def _run(self) -> None:
        """Hold the resource's async context open until a stop is requested.

        Enters ``self._resource`` with ``async with``, records the entered
        value, signals ``_ready`` and then blocks on ``_stop``; leaving the
        ``async with`` block exits the resource's context.

        Raises:
            Exception: Any ``Exception`` raised while entering, holding or
                exiting the context is recorded in ``self._error`` and
                re-raised.
        """
        try:
            async with self._resource as entered_resource:
                # Publish the entered value before signalling readiness so a
                # waiter on ``_ready`` observes it.
                self._entered_resource = entered_resource
                self._ready.set()
                await self._stop.wait()
        except Exception as e:
            # Record the failure first, then set ``_ready`` so ``start()`` is
            # not left waiting if entering the context failed.
            self._error = e
            self._ready.set()
            raise
page 1 / 4 next ›