← back to invincible-jha__agent-gov

Function bodies 212 total

All specs Real LLM only Function bodies
RoleCheckRule.evaluate method · python · L42-L119 (78 LOC)
src/agent_gov/rules/role_check.py
    def evaluate(
        self,
        action: dict[str, object],
        config: dict[str, object],
    ) -> RuleVerdict:
        """Check that the agent role satisfies at least one required role.

        Parameters
        ----------
        action:
            Must contain the key specified by ``agent_role_field`` (default
            ``"agent_role"``).  The value may be a ``str`` or ``list[str]``.
        config:
            Supported keys:
            - ``required_roles``: ``list[str]`` — roles that satisfy the check.
            - ``agent_role_field``: ``str`` — field name containing agent role(s).

        Returns
        -------
        RuleVerdict
            ``passed=False`` when the agent role does not match any required role.
        """
        required_roles: list[str] = _coerce_string_list(
            config.get("required_roles", [])
        )
        role_field: str = str(config.get("agent_role_field", "agent_role"))

        if not required_roles:
            return
RoleCheckRule.validate_config method · python · L121-L133 (13 LOC)
src/agent_gov/rules/role_check.py
    def validate_config(self, config: dict[str, object]) -> list[str]:
        """Report problems with the ``required_roles`` entry of *config*.

        Parameters
        ----------
        config:
            Rule configuration; only the ``required_roles`` key is inspected.

        Returns
        -------
        list[str]
            At most one error message: the key is missing or ``None``, the
            value is not a ``list``, or the list is empty.  An empty list
            means no problem was found.  The elements of the list are not
            inspected.
        """
        roles = config.get("required_roles")

        # A missing key and an explicit ``None`` are reported identically.
        if roles is None:
            return ["role_check: 'required_roles' is not configured."]

        if not isinstance(roles, list):
            return [
                f"role_check: 'required_roles' must be a list, got {type(roles).__name__}."
            ]

        if not roles:
            return ["role_check: 'required_roles' must not be empty."]

        return []
_coerce_string_list function · python · L136-L142 (7 LOC)
src/agent_gov/rules/role_check.py
def _coerce_string_list(value: object) -> list[str]:
    """Convert a string or list-of-strings to a guaranteed list of strings."""
    if isinstance(value, str):
        return [value]
    if isinstance(value, list):
        return [str(item) for item in value]
    return [str(value)]
TraceSimulationResult.to_dict method · python · L79-L93 (15 LOC)
src/agent_gov/simulation/policy_simulator.py
    def to_dict(self) -> dict[str, object]:
        """Return this trace comparison as a plain dictionary.

        Returns
        -------
        dict[str, object]
            The trace id, blocked/passed counts for the proposed and baseline
            results, and the ``new_blocks`` / ``new_passes`` values.  When
            ``baseline_result`` is falsy, ``baseline_blocked`` is ``0`` while
            ``baseline_passed`` is ``None``.
        """
        proposed = self.proposed_result
        baseline = self.baseline_result

        # Fallbacks used when no baseline replay is available; note that the
        # two keys intentionally keep their differing defaults.
        if baseline:
            baseline_blocked = baseline.blocked_events
            baseline_passed = baseline.passed_events
        else:
            baseline_blocked = 0
            baseline_passed = None

        summary: dict[str, object] = {"trace_id": self.trace_id}
        summary["proposed_blocked"] = proposed.blocked_events
        summary["proposed_passed"] = proposed.passed_events
        summary["baseline_blocked"] = baseline_blocked
        summary["baseline_passed"] = baseline_passed
        summary["new_blocks"] = self.new_blocks
        summary["new_passes"] = self.new_passes
        return summary
SimulationReport.to_dict method · python · L134-L146 (13 LOC)
src/agent_gov/simulation/policy_simulator.py
    def to_dict(self) -> dict[str, object]:
        """Return the report as a plain dictionary.

        Returns
        -------
        dict[str, object]
            The report's scalar attributes under their output keys, followed
            by ``trace_results`` holding ``to_dict()`` of every per-trace
            result.
        """
        # (output key, attribute name) pairs, in output order.
        scalar_fields = (
            ("label", "label"),
            ("proposed_policy", "proposed_policy_name"),
            ("baseline_policy", "baseline_policy_name"),
            ("total_events", "total_events"),
            ("would_block_count", "would_block_count"),
            ("baseline_block_count", "baseline_block_count"),
            ("false_positive_rate", "false_positive_rate"),
            ("impact_score", "impact_score"),
        )
        payload: dict[str, object] = {
            key: getattr(self, attr) for key, attr in scalar_fields
        }
        payload["trace_results"] = [
            trace_result.to_dict() for trace_result in self.trace_results
        ]
        return payload
PolicySimulator.simulate method · python · L186-L276 (91 LOC)
src/agent_gov/simulation/policy_simulator.py
    def simulate(self, config: SimulationConfig) -> SimulationReport:
        """Run the simulation for a given configuration.

        Parameters
        ----------
        config:
            Simulation configuration with proposed policy, optional baseline,
            and list of traces to replay.

        Returns
        -------
        SimulationReport
            Aggregated statistics comparing proposed vs. baseline policy impact.
        """
        trace_results: list[TraceSimulationResult] = []
        total_events = 0
        would_block_count = 0
        baseline_block_count = 0
        new_blocks_total = 0

        for trace in config.traces:
            # Replay under proposed policy
            proposed_result = self._replayer.replay(
                trace, config.proposed_policy, self._evaluator
            )

            # Replay under baseline policy (if provided)
            baseline_result: Optional[TraceReplayResult] = None
            if config.baseline_policy is n
TraceEvent.to_dict method · python · L48-L55 (8 LOC)
src/agent_gov/simulation/trace_replayer.py
    def to_dict(self) -> dict[str, object]:
        """Return the event as a plain dictionary.

        Returns
        -------
        dict[str, object]
            ``event_id``, ``action``, ``timestamp`` and ``metadata``, each
            mapped to the attribute of the same name (values are not copied).
        """
        exported = ("event_id", "action", "timestamp", "metadata")
        return {name: getattr(self, name) for name in exported}
Repobility — same analyzer, your code, free for public repos · /scan/
AgentTrace.to_dict method · python · L84-L92 (9 LOC)
src/agent_gov/simulation/trace_replayer.py
    def to_dict(self) -> dict[str, object]:
        """Return the trace as a plain dictionary.

        Returns
        -------
        dict[str, object]
            ``trace_id``, ``agent_id``, ``source`` and ``event_count`` taken
            from the attributes of the same name, plus ``events`` holding
            ``to_dict()`` of every event in order.
        """
        header_fields = ("trace_id", "agent_id", "source", "event_count")
        payload: dict[str, object] = {
            name: getattr(self, name) for name in header_fields
        }

        serialised_events = []
        for event in self.events:
            serialised_events.append(event.to_dict())
        payload["events"] = serialised_events
        return payload
TraceReplayResult.to_dict method · python · L126-L135 (10 LOC)
src/agent_gov/simulation/trace_replayer.py
    def to_dict(self) -> dict[str, object]:
        """Return the replay result as a plain dictionary.

        Returns
        -------
        dict[str, object]
            ``trace_id``, ``total_events``, ``blocked_events``,
            ``passed_events``, ``block_rate`` and ``event_results``, each
            mapped to the attribute of the same name.  ``event_results`` is
            returned as-is, not copied.
        """
        exported = (
            "trace_id",
            "total_events",
            "blocked_events",
            "passed_events",
            "block_rate",
            "event_results",
        )
        return {name: getattr(self, name) for name in exported}
TraceReplayer.load_dict method · python · L157-L214 (58 LOC)
src/agent_gov/simulation/trace_replayer.py
    def load_dict(self, data: dict[str, object], *, source: str = "") -> AgentTrace:
        """Build an AgentTrace from a plain dictionary.

        Parameters
        ----------
        data:
            Dictionary containing trace data.
        source:
            Optional label identifying where this trace came from.

        Returns
        -------
        AgentTrace
            Parsed trace object.

        Raises
        ------
        ValueError
            If the data is missing required fields or is malformed.
        """
        if not isinstance(data, dict):
            raise ValueError("Trace data must be a dictionary.")

        raw_events = data.get("events", [])
        if not isinstance(raw_events, list):
            raise ValueError("Trace 'events' field must be a list.")

        trace_id = str(data.get("trace_id", f"trace-{datetime.now(timezone.utc).timestamp()}"))
        agent_id = str(data.get("agent_id", "unknown-agent"))

        events: list[TraceEvent] = []
 
TraceReplayer.load_json method · python · L216-L235 (20 LOC)
src/agent_gov/simulation/trace_replayer.py
    def load_json(self, json_text: str, *, source: str = "") -> AgentTrace:
        """Decode *json_text* and build an AgentTrace from the result.

        Parameters
        ----------
        json_text:
            JSON document describing a trace.
        source:
            Optional label forwarded unchanged to ``load_dict``.

        Returns
        -------
        AgentTrace
            Whatever ``load_dict`` returns for the decoded document.

        Raises
        ------
        ValueError
            If *json_text* is not valid JSON.  Errors raised by ``load_dict``
            for the decoded document propagate unchanged.
        """
        try:
            decoded = json.loads(json_text)
        except json.JSONDecodeError as decode_error:
            # Re-raise as ValueError while keeping the decoder error as cause.
            message = f"Invalid JSON trace: {decode_error}"
            raise ValueError(message) from decode_error
        else:
            return self.load_dict(decoded, source=source)
TraceReplayer.replay method · python · L237-L278 (42 LOC)
src/agent_gov/simulation/trace_replayer.py
    def replay(
        self,
        trace: AgentTrace,
        policy: object,
        evaluator: object,
    ) -> TraceReplayResult:
        """Replay all events in a trace through the given policy evaluator.

        Parameters
        ----------
        trace:
            The agent trace to replay.
        policy:
            A PolicyConfig instance to evaluate events against.
        evaluator:
            A PolicyEvaluator instance with an ``evaluate(policy, action)`` method.

        Returns
        -------
        TraceReplayResult
            Counts of blocked and passed events with per-event results.
        """
        blocked = 0
        passed = 0
        event_results: dict[str, bool] = {}

        for event in trace.events:
            report = evaluator.evaluate(policy, event.action)
            event_passed = bool(report.passed)
            event_results[event.event_id] = event_passed
            if event_passed:
                passed += 1
            else:
         
‹ prevpage 5 / 5