Function bodies 212 total
RoleCheckRule.evaluate method · python · L42-L119 (78 LOC) · src/agent_gov/rules/role_check.py
def evaluate(
self,
action: dict[str, object],
config: dict[str, object],
) -> RuleVerdict:
"""Check that the agent role satisfies at least one required role.
Parameters
----------
action:
Must contain the key specified by ``agent_role_field`` (default
``"agent_role"``). The value may be a ``str`` or ``list[str]``.
config:
Supported keys:
- ``required_roles``: ``list[str]`` — roles that satisfy the check.
- ``agent_role_field``: ``str`` — field name containing agent role(s).
Returns
-------
RuleVerdict
``passed=False`` when the agent role does not match any required role.
"""
required_roles: list[str] = _coerce_string_list(
config.get("required_roles", [])
)
role_field: str = str(config.get("agent_role_field", "agent_role"))
if not required_roles:
returnRoleCheckRule.validate_config method · python · L121-L133 (13 LOC)src/agent_gov/rules/role_check.py
def validate_config(self, config: dict[str, object]) -> list[str]:
    """Validate that ``required_roles`` is a non-empty list of strings.

    Parameters
    ----------
    config:
        Rule configuration; only the ``required_roles`` key is inspected.

    Returns
    -------
    list[str]
        Human-readable error messages; empty when the configuration is valid.
    """
    required_roles = config.get("required_roles")
    if required_roles is None:
        return ["role_check: 'required_roles' is not configured."]
    if not isinstance(required_roles, list):
        return [
            f"role_check: 'required_roles' must be a list, got {type(required_roles).__name__}."
        ]
    if not required_roles:
        return ["role_check: 'required_roles' must not be empty."]
    # The contract is a list of *strings*: report every offending entry by
    # index so a misconfigured policy can be fixed in one pass.
    return [
        f"role_check: 'required_roles[{index}]' must be a string, got {type(item).__name__}."
        for index, item in enumerate(required_roles)
        if not isinstance(item, str)
    ]
# _coerce_string_list function · python · L136-L142 (7 LOC) · src/agent_gov/rules/role_check.py
def _coerce_string_list(value: object) -> list[str]:
    """Convert a string or collection of values to a guaranteed list of strings.

    Parameters
    ----------
    value:
        ``None``, a single string, a list/tuple/set of items, or any other
        scalar.

    Returns
    -------
    list[str]
        ``[]`` for ``None``; ``[value]`` for a string; one ``str(item)`` per
        element for a list, tuple, set or frozenset; ``[str(value)]`` for
        anything else.
    """
    # A missing value must not turn into the literal role name "None".
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    # Treat every common collection element-wise instead of stringifying the
    # whole container (e.g. "('a', 'b')").
    if isinstance(value, (list, tuple, set, frozenset)):
        return [str(item) for item in value]
    return [str(value)]
# TraceSimulationResult.to_dict method · python · L79-L93 (15 LOC) · src/agent_gov/simulation/policy_simulator.py
def to_dict(self) -> dict[str, object]:
    """Return this trace's simulation outcome as a plain dictionary.

    When no baseline result is present, ``baseline_blocked`` is reported as
    ``0`` and ``baseline_passed`` as ``None``.
    """
    proposed = self.proposed_result
    baseline = self.baseline_result
    # NOTE(review): the two fallbacks differ (0 vs. None) — confirm the
    # asymmetry is intentional before consumers rely on either value.
    if baseline:
        baseline_blocked = baseline.blocked_events
        baseline_passed = baseline.passed_events
    else:
        baseline_blocked = 0
        baseline_passed = None
    payload: dict[str, object] = {"trace_id": self.trace_id}
    payload["proposed_blocked"] = proposed.blocked_events
    payload["proposed_passed"] = proposed.passed_events
    payload["baseline_blocked"] = baseline_blocked
    payload["baseline_passed"] = baseline_passed
    payload["new_blocks"] = self.new_blocks
    payload["new_passes"] = self.new_passes
    return payload
# SimulationReport.to_dict method · python · L134-L146 (13 LOC) · src/agent_gov/simulation/policy_simulator.py
def to_dict(self) -> dict[str, object]:
    """Return the aggregated simulation report as a plain dictionary.

    Scalar statistics are copied from the report's attributes; each entry
    of ``trace_results`` is serialised through its own ``to_dict()``.
    """
    # (output key, attribute name) pairs, in the order they are emitted.
    scalar_fields = (
        ("label", "label"),
        ("proposed_policy", "proposed_policy_name"),
        ("baseline_policy", "baseline_policy_name"),
        ("total_events", "total_events"),
        ("would_block_count", "would_block_count"),
        ("baseline_block_count", "baseline_block_count"),
        ("false_positive_rate", "false_positive_rate"),
        ("impact_score", "impact_score"),
    )
    payload: dict[str, object] = {
        key: getattr(self, attribute) for key, attribute in scalar_fields
    }
    payload["trace_results"] = [result.to_dict() for result in self.trace_results]
    return payload
# PolicySimulator.simulate method · python · L186-L276 (91 LOC) · src/agent_gov/simulation/policy_simulator.py
def simulate(self, config: SimulationConfig) -> SimulationReport:
"""Run the simulation for a given configuration.
Parameters
----------
config:
Simulation configuration with proposed policy, optional baseline,
and list of traces to replay.
Returns
-------
SimulationReport
Aggregated statistics comparing proposed vs. baseline policy impact.
"""
trace_results: list[TraceSimulationResult] = []
total_events = 0
would_block_count = 0
baseline_block_count = 0
new_blocks_total = 0
for trace in config.traces:
# Replay under proposed policy
proposed_result = self._replayer.replay(
trace, config.proposed_policy, self._evaluator
)
# Replay under baseline policy (if provided)
baseline_result: Optional[TraceReplayResult] = None
if config.baseline_policy is nTraceEvent.to_dict method · python · L48-L55 (8 LOC)src/agent_gov/simulation/trace_replayer.py
def to_dict(self) -> dict[str, object]:
    """Return the event's fields as a plain dictionary.

    The ``action`` and ``metadata`` values are passed through as-is (not
    copied).
    """
    field_names = ("event_id", "action", "timestamp", "metadata")
    return {name: getattr(self, name) for name in field_names}
# Repobility — same analyzer, your code, free for public repos · /scan/
AgentTrace.to_dict method · python · L84-L92 (9 LOC) · src/agent_gov/simulation/trace_replayer.py
def to_dict(self) -> dict[str, object]:
    """Return the trace and all of its events as a plain dictionary.

    Each event is serialised through its own ``to_dict()``.
    """
    payload: dict[str, object] = {}
    payload["trace_id"] = self.trace_id
    payload["agent_id"] = self.agent_id
    payload["source"] = self.source
    payload["event_count"] = self.event_count
    payload["events"] = [event.to_dict() for event in self.events]
    return payload
# TraceReplayResult.to_dict method · python · L126-L135 (10 LOC) · src/agent_gov/simulation/trace_replayer.py
def to_dict(self) -> dict[str, object]:
    """Return the replay counters and per-event outcomes as a plain dictionary.

    ``event_results`` is passed through as-is (not copied).
    """
    field_names = (
        "trace_id",
        "total_events",
        "blocked_events",
        "passed_events",
        "block_rate",
        "event_results",
    )
    return dict(zip(field_names, (getattr(self, name) for name in field_names)))
# TraceReplayer.load_dict method · python · L157-L214 (58 LOC) · src/agent_gov/simulation/trace_replayer.py
def load_dict(self, data: dict[str, object], *, source: str = "") -> AgentTrace:
"""Build an AgentTrace from a plain dictionary.
Parameters
----------
data:
Dictionary containing trace data.
source:
Optional label identifying where this trace came from.
Returns
-------
AgentTrace
Parsed trace object.
Raises
------
ValueError
If the data is missing required fields or is malformed.
"""
if not isinstance(data, dict):
raise ValueError("Trace data must be a dictionary.")
raw_events = data.get("events", [])
if not isinstance(raw_events, list):
raise ValueError("Trace 'events' field must be a list.")
trace_id = str(data.get("trace_id", f"trace-{datetime.now(timezone.utc).timestamp()}"))
agent_id = str(data.get("agent_id", "unknown-agent"))
events: list[TraceEvent] = []
TraceReplayer.load_json method · python · L216-L235 (20 LOC)src/agent_gov/simulation/trace_replayer.py
def load_json(self, json_text: str, *, source: str = "") -> AgentTrace:
    """Decode a JSON document and build an AgentTrace from it.

    Parameters
    ----------
    json_text:
        JSON string conforming to the trace format.
    source:
        Optional label identifying where this trace came from; forwarded
        unchanged to ``load_dict``.

    Returns
    -------
    AgentTrace
        Whatever ``load_dict`` builds from the decoded document.

    Raises
    ------
    ValueError
        If the text is not valid JSON, or if ``load_dict`` rejects the
        decoded structure.
    """
    try:
        decoded = json.loads(json_text)
    except json.JSONDecodeError as decode_error:
        # Re-raise as ValueError, keeping the decoder error as the cause.
        raise ValueError(f"Invalid JSON trace: {decode_error}") from decode_error
    else:
        return self.load_dict(decoded, source=source)
# TraceReplayer.replay method · python · L237-L278 (42 LOC) · src/agent_gov/simulation/trace_replayer.py
def replay(
self,
trace: AgentTrace,
policy: object,
evaluator: object,
) -> TraceReplayResult:
"""Replay all events in a trace through the given policy evaluator.
Parameters
----------
trace:
The agent trace to replay.
policy:
A PolicyConfig instance to evaluate events against.
evaluator:
A PolicyEvaluator instance with an ``evaluate(policy, action)`` method.
Returns
-------
TraceReplayResult
Counts of blocked and passed events with per-event results.
"""
blocked = 0
passed = 0
event_results: dict[str, bool] = {}
for event in trace.events:
report = evaluator.evaluate(policy, event.action)
event_passed = bool(report.passed)
event_results[event.event_id] = event_passed
if event_passed:
passed += 1
else:
‹ prev · page 5 / 5