Function bodies 212 total
demo_policy_evaluation function · python · L33-L76 (44 LOC)examples/01_quickstart.py
def demo_policy_evaluation() -> None:
"""Demonstrate loading a policy and evaluating actions."""
print("\n=== Policy Evaluation ===\n")
# Load the standard policy pack bundled with agent-gov
packs_dir = Path(agent_gov.__file__).parent / "packs"
loader = PolicyLoader()
policy = loader.load_file(packs_dir / "standard.yaml")
print(f"Loaded policy: {policy.name!r} (v{policy.version})")
print(f"Rules: {[r.name for r in policy.rules]}")
evaluator = PolicyEvaluator()
# Action 1: a plain search — should pass
safe_action: dict[str, object] = {
"type": "search",
"query": "quarterly revenue trends",
"agent_role": "operator",
"cost": 0.02,
}
report1 = evaluator.evaluate(policy, safe_action)
print(f"\nSafe action: {report1.summary()}")
# Action 2: action with a blocked keyword — should fail
risky_action: dict[str, object] = {
"type": "database_query",
"query": "drop table users",
demo_compliance_frameworks function · python · L79-L111 (33 LOC)examples/01_quickstart.py
def demo_compliance_frameworks() -> None:
"""Demonstrate running compliance framework checks."""
print("\n=== Compliance Frameworks ===\n")
# EU AI Act check with partial evidence
eu_framework = EuAiActFramework()
eu_evidence: dict[str, object] = {
"A6": {"status": "pass", "evidence": "System classified as limited risk."},
"A9": {"status": "pass", "evidence": "Risk management plan documented in confluence."},
"A10": {"status": "fail", "evidence": "Training data audit not yet completed."},
"A13": {"status": "pass", "evidence": "Model card published; outputs include confidence scores."},
"A14": {"status": "pass", "evidence": "Human review required for all critical decisions."},
"A15": {"status": "unknown"},
"A52": {"status": "pass", "evidence": "UI displays 'AI-generated' badge on all responses."},
"A60": {"status": "unknown", "evidence": "Awaiting EU database portal access."},
}
eu_report = eu_fdemo_audit_logging function · python · L114-L150 (37 LOC)examples/01_quickstart.py
def demo_audit_logging() -> None:
"""Demonstrate audit log write and read."""
print("\n=== Audit Logging ===\n")
with tempfile.TemporaryDirectory() as tmpdir:
log_path = Path(tmpdir) / "audit.jsonl"
audit_logger = AuditLogger(log_path)
# Load and evaluate some actions
packs_dir = Path(agent_gov.__file__).parent / "packs"
loader = PolicyLoader()
policy = loader.load_file(packs_dir / "minimal.yaml")
evaluator = PolicyEvaluator()
actions: list[dict[str, object]] = [
{"type": "search", "query": "product roadmap", "agent_id": "agent-alpha"},
{"type": "file_read", "path": "/data/config.json", "agent_id": "agent-beta"},
{"type": "api_call", "endpoint": "/api/users", "agent_id": "agent-alpha"},
]
for action in actions:
report = evaluator.evaluate(policy, action)
audit_logger.log_from_report(
report,
agent_id=main function · python · L153-L161 (9 LOC)examples/01_quickstart.py
def main() -> None:
    """Run the quickstart: print a version banner, then each demo in turn."""
    print(f"agent-gov v{agent_gov.__version__}")
    print("=" * 50)
    demos = (demo_policy_evaluation, demo_compliance_frameworks, demo_audit_logging)
    for run_demo in demos:
        run_demo()
    print("\nQuickstart complete.")


# AnthropicGovernance._record method · python · L28-L36 (9 LOC)src/agent_gov/adapters/anthropic_sdk.py
def _record(self, event_type: str, result: dict[str, Any], **context: Any) -> None:
    """Store one audit record for a governance decision.

    The record holds the current UTC time as an ISO 8601 string, the event
    type and the decision dict. Extra keyword arguments are merged in last,
    so a context key that collides with a fixed field replaces its value.
    """
    stamp = datetime.now(tz=timezone.utc).isoformat()
    record: dict[str, Any] = dict(timestamp=stamp, event_type=event_type, result=result)
    record.update(context)
    self._audit_log.append(record)


# AnthropicGovernance.check_message method · python · L38-L45 (8 LOC)src/agent_gov/adapters/anthropic_sdk.py
def check_message(self, role: str, content: str) -> dict[str, Any]:
    """Return a governance decision for an Anthropic message and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. The role and the length of ``content`` (not the
    content itself) are written to the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Message passed policy checks.")
    audit_context = {"role": role, "content_length": len(content)}
    self._record("check_message", decision, **audit_context)
    return decision


# AnthropicGovernance.check_tool_use method · python · L47-L54 (8 LOC)src/agent_gov/adapters/anthropic_sdk.py
def check_tool_use(self, tool_name: str, input: Any) -> dict[str, Any]:
    """Return a governance decision for an Anthropic ``tool_use`` block and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. Only ``tool_name`` is written to the audit log;
    ``input`` is accepted but not inspected.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Tool use passed policy checks.")
    audit_context = {"tool_name": tool_name}
    self._record("check_tool_use", decision, **audit_context)
    return decision
AnthropicGovernance.check_content method · python · L56-L64 (9 LOC)src/agent_gov/adapters/anthropic_sdk.py
def check_content(self, content_type: str, content: Any) -> dict[str, Any]:
    """Return a governance decision for a typed content block and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. The audit log receives the content type and the length
    of ``str(content)``; a ``None`` content counts as length 0.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    content_length = 0 if content is None else len(str(content))
    decision: dict[str, Any] = dict(allowed=True, reason="Content passed policy checks.")
    audit_context = {"content_type": content_type, "content_length": content_length}
    self._record("check_content", decision, **audit_context)
    return decision


# CrewAIGovernance._record method · python · L28-L36 (9 LOC)src/agent_gov/adapters/crewai.py
def _record(self, event_type: str, result: dict[str, Any], **context: Any) -> None:
    """Store one audit record for a governance decision.

    The record holds the current UTC time as an ISO 8601 string, the event
    type and the decision dict. Extra keyword arguments are merged in last,
    so a context key that collides with a fixed field replaces its value.
    """
    stamp = datetime.now(tz=timezone.utc).isoformat()
    record: dict[str, Any] = dict(timestamp=stamp, event_type=event_type, result=result)
    record.update(context)
    self._audit_log.append(record)


# CrewAIGovernance.check_task method · python · L38-L45 (8 LOC)src/agent_gov/adapters/crewai.py
def check_task(self, task_name: str, task_input: Any) -> dict[str, Any]:
    """Return a governance decision for a CrewAI task and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. Only ``task_name`` is written to the audit log;
    ``task_input`` is accepted but not inspected.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Task passed policy checks.")
    audit_context = {"task_name": task_name}
    self._record("check_task", decision, **audit_context)
    return decision


# CrewAIGovernance.check_agent_action method · python · L47-L54 (8 LOC)src/agent_gov/adapters/crewai.py
def check_agent_action(self, agent_name: str, action: str) -> dict[str, Any]:
    """Return a governance decision for an agent action and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. The agent name and the action string are written to
    the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Agent action passed policy checks.")
    audit_context = {"agent_name": agent_name, "action": action}
    self._record("check_agent_action", decision, **audit_context)
    return decision


# CrewAIGovernance.check_delegation method · python · L56-L63 (8 LOC)src/agent_gov/adapters/crewai.py
def check_delegation(self, from_agent: str, to_agent: str) -> dict[str, Any]:
    """Return a governance decision for an agent-to-agent delegation and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. Both agent names are written to the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Delegation passed policy checks.")
    audit_context = {"from_agent": from_agent, "to_agent": to_agent}
    self._record("check_delegation", decision, **audit_context)
    return decision


# LangChainGovernance._record method · python · L28-L36 (9 LOC)src/agent_gov/adapters/langchain.py
def _record(self, event_type: str, result: dict[str, Any], **context: Any) -> None:
    """Store one audit record for a governance decision.

    The record holds the current UTC time as an ISO 8601 string, the event
    type and the decision dict. Extra keyword arguments are merged in last,
    so a context key that collides with a fixed field replaces its value.
    """
    stamp = datetime.now(tz=timezone.utc).isoformat()
    record: dict[str, Any] = dict(timestamp=stamp, event_type=event_type, result=result)
    record.update(context)
    self._audit_log.append(record)


# LangChainGovernance.check_prompt method · python · L38-L45 (8 LOC)src/agent_gov/adapters/langchain.py
def check_prompt(self, prompt: str) -> dict[str, Any]:
    """Return a governance decision for a user or system prompt and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. Only the prompt's length is written to the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Prompt passed policy checks.")
    audit_context = {"prompt_length": len(prompt)}
    self._record("check_prompt", decision, **audit_context)
    return decision


# LangChainGovernance.check_output method · python · L47-L54 (8 LOC)src/agent_gov/adapters/langchain.py
def check_output(self, output: str) -> dict[str, Any]:
    """Return a governance decision for an LLM output and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. Only the output's length is written to the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Output passed policy checks.")
    audit_context = {"output_length": len(output)}
    self._record("check_output", decision, **audit_context)
    return decision
LangChainGovernance.check_tool_call method · python · L56-L63 (8 LOC)src/agent_gov/adapters/langchain.py
def check_tool_call(self, tool_name: str, args: dict[str, Any]) -> dict[str, Any]:
    """Return a governance decision for a tool call and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. The tool name and the number of arguments (not their
    values) are written to the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Tool call passed policy checks.")
    audit_context = {"tool_name": tool_name, "arg_count": len(args)}
    self._record("check_tool_call", decision, **audit_context)
    return decision


# MicrosoftGovernance._record method · python · L28-L36 (9 LOC)src/agent_gov/adapters/microsoft_agents.py
def _record(self, event_type: str, result: dict[str, Any], **context: Any) -> None:
    """Store one audit record for a governance decision.

    The record holds the current UTC time as an ISO 8601 string, the event
    type and the decision dict. Extra keyword arguments are merged in last,
    so a context key that collides with a fixed field replaces its value.
    """
    stamp = datetime.now(tz=timezone.utc).isoformat()
    record: dict[str, Any] = dict(timestamp=stamp, event_type=event_type, result=result)
    record.update(context)
    self._audit_log.append(record)


# MicrosoftGovernance.check_activity method · python · L38-L45 (8 LOC)src/agent_gov/adapters/microsoft_agents.py
def check_activity(self, activity_type: str, data: Any) -> dict[str, Any]:
    """Return a governance decision for a Bot Framework activity and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. Only ``activity_type`` is written to the audit log;
    ``data`` is accepted but not inspected.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Activity passed policy checks.")
    audit_context = {"activity_type": activity_type}
    self._record("check_activity", decision, **audit_context)
    return decision


# MicrosoftGovernance.check_dialog method · python · L47-L54 (8 LOC)src/agent_gov/adapters/microsoft_agents.py
def check_dialog(self, dialog_id: str, step: str) -> dict[str, Any]:
    """Return a governance decision for a dialog waterfall step and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. The dialog id and step are written to the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Dialog step passed policy checks.")
    audit_context = {"dialog_id": dialog_id, "step": step}
    self._record("check_dialog", decision, **audit_context)
    return decision


# MicrosoftGovernance.check_turn method · python · L56-L63 (8 LOC)src/agent_gov/adapters/microsoft_agents.py
def check_turn(self, turn_id: str, content: str) -> dict[str, Any]:
    """Return a governance decision for a conversation turn and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. The turn id and the length of ``content`` (not the
    content itself) are written to the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Turn passed policy checks.")
    audit_context = {"turn_id": turn_id, "content_length": len(content)}
    self._record("check_turn", decision, **audit_context)
    return decision


# OpenAIGovernance._record method · python · L28-L36 (9 LOC)src/agent_gov/adapters/openai_agents.py
def _record(self, event_type: str, result: dict[str, Any], **context: Any) -> None:
    """Store one audit record for a governance decision.

    The record holds the current UTC time as an ISO 8601 string, the event
    type and the decision dict. Extra keyword arguments are merged in last,
    so a context key that collides with a fixed field replaces its value.
    """
    stamp = datetime.now(tz=timezone.utc).isoformat()
    record: dict[str, Any] = dict(timestamp=stamp, event_type=event_type, result=result)
    record.update(context)
    self._audit_log.append(record)


# OpenAIGovernance.check_message method · python · L38-L45 (8 LOC)src/agent_gov/adapters/openai_agents.py
def check_message(self, role: str, content: str) -> dict[str, Any]:
    """Return a governance decision for a message and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. The role and the length of ``content`` (not the
    content itself) are written to the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Message passed policy checks.")
    audit_context = {"role": role, "content_length": len(content)}
    self._record("check_message", decision, **audit_context)
    return decision


# OpenAIGovernance.check_tool_use method · python · L47-L54 (8 LOC)src/agent_gov/adapters/openai_agents.py
def check_tool_use(self, tool_name: str, args: dict[str, Any]) -> dict[str, Any]:
    """Return a governance decision for a tool use request and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. The tool name and the number of arguments (not their
    values) are written to the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Tool use passed policy checks.")
    audit_context = {"tool_name": tool_name, "arg_count": len(args)}
    self._record("check_tool_use", decision, **audit_context)
    return decision
OpenAIGovernance.check_handoff method · python · L56-L63 (8 LOC)src/agent_gov/adapters/openai_agents.py
def check_handoff(self, from_agent: str, to_agent: str) -> dict[str, Any]:
    """Return a governance decision for an agent handoff and audit it.

    No policy rule is consulted here: the decision is always
    ``allowed=True``. Both agent names are written to the audit log.

    Returns the decision dict with ``allowed`` and ``reason`` keys.
    """
    decision: dict[str, Any] = dict(allowed=True, reason="Handoff passed policy checks.")
    audit_context = {"from_agent": from_agent, "to_agent": to_agent}
    self._record("check_handoff", decision, **audit_context)
    return decision


# AuditEntry.to_json method · python · L48-L67 (20 LOC)src/agent_gov/audit/entry.py
def to_json(self) -> str:
    """Render this entry as compact, single-line JSON.

    The ``timestamp`` is emitted via ``isoformat()``; every other field is
    passed to the JSON encoder unchanged. Non-ASCII characters are written
    as-is rather than escaped.

    Returns
    -------
    str
        JSON text with no whitespace between tokens, suitable as one JSONL
        line.
    """
    plain_fields = ("agent_id", "action_type", "action_data", "verdict", "policy_name")
    payload: dict[str, object] = {name: getattr(self, name) for name in plain_fields}
    payload["timestamp"] = self.timestamp.isoformat()
    payload["metadata"] = self.metadata
    return json.dumps(payload, separators=(",", ":"), ensure_ascii=False)


# AuditEntry.from_json method · python · L70-L125 (56 LOC)src/agent_gov/audit/entry.py
def from_json(cls, json_string: str) -> "AuditEntry":
"""Deserialise an entry from a JSON string.
Parameters
----------
json_string:
A single JSONL line produced by :meth:`to_json`.
Returns
-------
AuditEntry
Reconstructed entry.
Raises
------
ValueError
If the JSON is malformed or missing required fields.
"""
try:
data = json.loads(json_string)
except json.JSONDecodeError as exc:
raise ValueError(f"Malformed JSON audit entry: {exc}") from exc
if not isinstance(data, dict):
raise ValueError("Audit entry JSON must be a JSON object (dict).")
required_fields = {"agent_id", "action_type", "action_data", "verdict", "policy_name"}
missing = required_fields - data.keys()
if missing:
raise ValueError(f"Audit entry missing required fields: {missing!r}")
raw_ts AuditEntry.__repr__ method · python · L127-L133 (7 LOC)src/agent_gov/audit/entry.py
def __repr__(self) -> str:
    """Return a debug string showing agent, action type, verdict and timestamp."""
    shown = (
        ("agent_id", self.agent_id),
        ("action_type", self.action_type),
        ("verdict", self.verdict),
        ("timestamp", self.timestamp.isoformat()),
    )
    inner = ", ".join(f"{label}={value!r}" for label, value in shown)
    return f"AuditEntry({inner})"


# AuditLogger.log method · python · L56-L68 (13 LOC)src/agent_gov/audit/logger.py
def log(self, entry: AuditEntry) -> None:
    """Append one audit entry to the log file as a single JSONL line.

    The entry is serialised before the lock is taken. While holding the
    lock, the parent directory is created if missing and the line is
    appended as UTF-8.

    Parameters
    ----------
    entry:
        The entry to persist.
    """
    serialized = "".join((entry.to_json(), "\n"))
    with self._lock:
        log_file = self._path
        log_file.parent.mkdir(parents=True, exist_ok=True)
        with log_file.open("a", encoding="utf-8") as handle:
            handle.write(serialized)


# AuditLogger.log_from_report method · python · L70-L111 (42 LOC)src/agent_gov/audit/logger.py
def log_from_report(
self,
report: object,
agent_id: str,
*,
metadata: Optional[dict[str, str]] = None,
) -> AuditEntry:
"""Create and log an entry from an :class:`~agent_gov.policy.result.EvaluationReport`.
Parameters
----------
report:
An :class:`~agent_gov.policy.result.EvaluationReport` instance.
agent_id:
The identifier of the agent that performed the action.
metadata:
Optional extra metadata to attach to the entry.
Returns
-------
AuditEntry
The entry that was logged.
"""
# Import here to avoid circular imports at module level
from agent_gov.policy.result import EvaluationReport
if not isinstance(report, EvaluationReport):
raise TypeError(
f"Expected EvaluationReport, got {type(report).__name__}."
)
entry = AuditEntry(
aAuditLogger.read method · python · L113-L137 (25 LOC)src/agent_gov/audit/logger.py
def read(self) -> list[AuditEntry]:
    """Read all audit entries from the log file.

    Blank lines are ignored. A line that fails to parse is skipped so that
    one corrupted record cannot make the rest of the log unreadable; each
    skip is reported as a warning naming the file and line number instead
    of being dropped silently.

    Returns
    -------
    list[AuditEntry]
        Entries in file order (oldest first). An empty list if the log
        file does not exist.
    """
    import logging

    entries: list[AuditEntry] = []
    try:
        # Open directly instead of checking exists() first: the file may
        # disappear between the check and the open.
        fh = self._path.open("r", encoding="utf-8")
    except FileNotFoundError:
        return entries

    log = logging.getLogger(__name__)
    with fh:
        for line_number, raw_line in enumerate(fh, start=1):
            stripped = raw_line.strip()
            if not stripped:
                continue
            try:
                entries.append(AuditEntry.from_json(stripped))
            except ValueError as exc:
                # Stay resilient, but leave a trace so corruption is visible.
                log.warning(
                    "Skipping corrupted audit entry at %s:%d: %s",
                    self._path,
                    line_number,
                    exc,
                )
    return entries


# AuditLogger.count method · python · L139-L147 (9 LOC)src/agent_gov/audit/logger.py
def count(self) -> int:
    """Return how many entries ``read()`` yields for the log file.

    Returns
    -------
    int
        Number of entries returned by ``read()``; ``0`` when it returns an
        empty list.
    """
    entries = self.read()
    return len(entries)
AuditLogger.query method · python · L149-L181 (33 LOC)src/agent_gov/audit/logger.py
def query(self, filters: dict[str, object]) -> list[AuditEntry]:
    """Read the whole log and keep the entries that satisfy every filter.

    Supported filter keys
    ----------------------
    agent_id : str
        Exact match on agent ID.
    action_type : str
        Exact match on action type.
    verdict : str
        Exact match on verdict (``"pass"`` or ``"fail"``).
    policy_name : str
        Exact match on policy name.
    since : datetime
        Keep only entries at or after this datetime (UTC).
    until : datetime
        Keep only entries at or before this datetime (UTC).

    Parameters
    ----------
    filters:
        Mapping of filter criteria. Unknown keys are ignored.

    Returns
    -------
    list[AuditEntry]
        Matching entries in chronological order.
    """
    return _apply_filters(self.read(), filters)


# _apply_filters function · python · L184-L215 (32 LOC)src/agent_gov/audit/logger.py
def _apply_filters(
entries: list[AuditEntry],
filters: dict[str, object],
) -> list[AuditEntry]:
"""Apply filter criteria to a list of entries."""
result = entries
agent_id = filters.get("agent_id")
if agent_id is not None:
result = [e for e in result if e.agent_id == str(agent_id)]
action_type = filters.get("action_type")
if action_type is not None:
result = [e for e in result if e.action_type == str(action_type)]
verdict = filters.get("verdict")
if verdict is not None:
result = [e for e in result if e.verdict == str(verdict)]
policy_name = filters.get("policy_name")
if policy_name is not None:
result = [e for e in result if e.policy_name == str(policy_name)]
since = filters.get("since")
if since is not None and isinstance(since, datetime):
result = [e for e in result if e.timestamp >= since]
until = filters.get("until")
if until is not None and isinstance(until, datetime):
AuditReader.all method · python · L52-L60 (9 LOC)src/agent_gov/audit/reader.py
def all(self) -> list[AuditEntry]:
    """Return every entry the underlying logger reads from its file.

    Returns
    -------
    list[AuditEntry]
        The list produced by the logger's ``read()``, unchanged.
    """
    every_entry = self._logger.read()
    return every_entry


# AuditReader.last method · python · L62-L76 (15 LOC)src/agent_gov/audit/reader.py
def last(self, count: int) -> list[AuditEntry]:
    """Return the trailing ``count`` entries of the log.

    Parameters
    ----------
    count:
        Maximum number of entries to return. Zero or a negative number
        yields an empty list.

    Returns
    -------
    list[AuditEntry]
        Up to ``count`` entries, most recent last.
    """
    # The log is read even when ``count`` is not positive.
    entries = self._logger.read()
    if not count > 0:
        return []
    return entries[-count:]


# AuditReader.query method · python · L78-L127 (50 LOC)src/agent_gov/audit/reader.py
def query(
self,
*,
agent_id: Optional[str] = None,
action_type: Optional[str] = None,
verdict: Optional[str] = None,
policy_name: Optional[str] = None,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
) -> list[AuditEntry]:
"""Return entries matching the supplied filter criteria.
All provided filters are combined with AND logic.
Parameters
----------
agent_id:
Filter by exact agent ID.
action_type:
Filter by exact action type string.
verdict:
Filter by verdict (``"pass"`` or ``"fail"``).
policy_name:
Filter by policy name.
since:
Return only entries at or after this UTC datetime.
until:
Return only entries at or before this UTC datetime.
Returns
-------
list[AuditEntry]
Matching entries in chronological ordeAuditReader.stats method · python · L129-L174 (46 LOC)src/agent_gov/audit/reader.py
def stats(self) -> dict[str, object]:
"""Return aggregate statistics about the audit log.
Returns
-------
dict[str, object]
Dictionary with keys:
- ``total`` — total entry count
- ``pass_count`` — entries with verdict ``"pass"``
- ``fail_count`` — entries with verdict ``"fail"``
- ``agents`` — sorted list of distinct agent IDs
- ``action_types`` — sorted list of distinct action types
- ``policies`` — sorted list of distinct policy names
- ``earliest`` — ISO timestamp of earliest entry (or ``None``)
- ``latest`` — ISO timestamp of latest entry (or ``None``)
"""
entries = self._logger.read()
if not entries:
return {
"total": 0,
"pass_count": 0,
"fail_count": 0,
"agents": [],
"action_types": [],
"policies": [],
build_filter function · python · L35-L89 (55 LOC)src/agent_gov/audit/search.py
def build_filter(
agent_id: Optional[str] = None,
action_type: Optional[str] = None,
verdict: Optional[str] = None,
policy_name: Optional[str] = None,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
) -> FilterFn:
"""Build a composite filter function from optional criteria.
All supplied criteria are combined with AND logic — an entry must
satisfy every non-``None`` criterion to pass the filter.
Parameters
----------
agent_id:
Exact match on :attr:`AuditEntry.agent_id`.
action_type:
Exact match on :attr:`AuditEntry.action_type`.
verdict:
Exact match on :attr:`AuditEntry.verdict` (``"pass"`` or ``"fail"``).
policy_name:
Exact match on :attr:`AuditEntry.policy_name`.
since:
Include only entries whose :attr:`AuditEntry.timestamp` is at or
after this datetime.
until:
Include only entries whose :attr:`AuditEntry.timestamp` is at or
before tsearch_entries function · python · L92-L123 (32 LOC)src/agent_gov/audit/search.py
def search_entries(
    entries: list[AuditEntry],
    filter_fn: FilterFn,
    limit: int = 100,
) -> list[AuditEntry]:
    """Apply a filter function to a list of entries, returning up to ``limit`` matches.

    Matches are returned in the same order they appear in ``entries``, and
    scanning stops as soon as ``limit`` matches have been collected.

    Parameters
    ----------
    entries:
        Source list of audit entries to search.
    filter_fn:
        A callable produced by :func:`build_filter` or any function with
        the same signature ``(AuditEntry) -> bool``.
    limit:
        Maximum number of matching entries to return. Zero or a negative
        value returns an empty list. Use a large value (e.g. ``999_999``)
        to effectively return all matches.

    Returns
    -------
    list[AuditEntry]
        Matching entries in original order, capped at ``limit``.
    """
    from itertools import islice

    # Guard first: checking the cap only after appending let a non-positive
    # limit still return one match.
    if limit <= 0:
        return []
    matches = (entry for entry in entries if filter_fn(entry))
    return list(islice(matches, limit))
aggregate_verdicts function · python · L126-L144 (19 LOC)src/agent_gov/audit/search.py
def aggregate_verdicts(entries: list[AuditEntry]) -> dict[str, int]:
    """Tally how many entries carry each verdict.

    Parameters
    ----------
    entries:
        Source list of audit entries.

    Returns
    -------
    dict[str, int]
        Verdict string mapped to its number of occurrences, e.g.
        ``{"pass": 40, "fail": 10}``. Verdicts absent from ``entries`` do
        not appear; keys follow first-seen order.
    """
    tally: dict[str, int] = {}
    for item in entries:
        verdict = item.verdict
        if verdict in tally:
            tally[verdict] += 1
        else:
            tally[verdict] = 1
    return tally


# aggregate_by_agent function · python · L147-L164 (18 LOC)src/agent_gov/audit/search.py
def aggregate_by_agent(entries: list[AuditEntry]) -> dict[str, list[AuditEntry]]:
    """Bucket entries by their ``agent_id``.

    Parameters
    ----------
    entries:
        Source list of audit entries.

    Returns
    -------
    dict[str, list[AuditEntry]]
        Agent ID mapped to that agent's entries, in the order they appear
        in ``entries``.
    """
    buckets: dict[str, list[AuditEntry]] = {}
    for item in entries:
        key = item.agent_id
        if key not in buckets:
            buckets[key] = []
        buckets[key].append(item)
    return buckets


# aggregate_by_action_type function · python · L167-L183 (17 LOC)src/agent_gov/audit/search.py
def aggregate_by_action_type(entries: list[AuditEntry]) -> dict[str, list[AuditEntry]]:
    """Bucket entries by their ``action_type``.

    Parameters
    ----------
    entries:
        Source list of audit entries.

    Returns
    -------
    dict[str, list[AuditEntry]]
        Action type mapped to the entries of that type, in input order.
    """
    buckets: dict[str, list[AuditEntry]] = {}
    for item in entries:
        bucket = buckets.get(item.action_type)
        if bucket is None:
            bucket = buckets[item.action_type] = []
        bucket.append(item)
    return buckets


# aggregate_by_policy function · python · L186-L203 (18 LOC)src/agent_gov/audit/search.py
def aggregate_by_policy(entries: list[AuditEntry]) -> dict[str, list[AuditEntry]]:
    """Bucket entries by their ``policy_name``.

    Parameters
    ----------
    entries:
        Source list of audit entries.

    Returns
    -------
    dict[str, list[AuditEntry]]
        Policy name mapped to the entries recorded under that policy, in
        input order.
    """
    buckets: dict[str, list[AuditEntry]] = {}
    for item in entries:
        policy = item.policy_name
        try:
            buckets[policy].append(item)
        except KeyError:
            buckets[policy] = [item]
    return buckets


# CompiledRule.to_dict method · python · L90-L99 (10 LOC)src/agent_gov/authoring/nl_compiler.py
def to_dict(self) -> dict[str, object]:
    """Return the rule's fields as a plain dict.

    ``rule_type`` is exposed under the key ``"type"``. ``params`` is placed
    in the result by reference, not copied.
    """
    serialized: dict[str, object] = dict(
        name=self.name,
        type=self.rule_type,
        action=self.action,
        target=self.target,
        severity=self.severity,
        params=self.params,
    )
    return serialized


# CompiledRule.to_yaml_fragment method · python · L101-L122 (22 LOC)src/agent_gov/authoring/nl_compiler.py
def to_yaml_fragment(self, indent: int = 2) -> str:
    """Render this rule as a YAML list-item fragment.

    The first line is ``- name: ...``; every following line is prefixed
    with ``indent`` spaces when the lines are joined. List-valued params
    become one ``- item`` line per element, booleans are written as
    ``true``/``false``, and any other value is formatted as-is. Values are
    not quoted or escaped.
    """
    # NOTE(review): the spacing inside these f-strings is reproduced exactly
    # as found — verify the nesting widths against the intended YAML layout.
    prefix = " " * indent
    out = [f"- name: {self.name}"]
    scalar_fields = (
        ("type", self.rule_type),
        ("action", self.action),
        ("target", self.target),
        ("severity", self.severity),
    )
    out.extend(f"{prefix} {label}: {text}" for label, text in scalar_fields)
    if self.params:
        out.append(f"{prefix} params:")
        for key, value in self.params.items():
            if isinstance(value, list):
                out.append(f"{prefix} {key}:")
                out.extend(f"{prefix} - {item}" for item in value)
                continue
            # bool is tested before the generic case so True/False are
            # written in YAML's lowercase spelling.
            if isinstance(value, bool):
                value = "true" if value else "false"
            out.append(f"{prefix} {key}: {value}")
    separator = "\n" + prefix
    return separator.join(out)


# CompiledPolicy.to_dict method · python · L149-L155 (7 LOC)src/agent_gov/authoring/nl_compiler.py
def to_dict(self) -> dict[str, object]:
    """Return the policy as a plain dict, with each rule converted via its ``to_dict()``."""
    serialized: dict[str, object] = dict(name=self.name, description=self.description)
    rule_dicts = []
    for rule in self.rules:
        rule_dicts.append(rule.to_dict())
    serialized["rules"] = rule_dicts
    return serialized


# CompiledPolicy.to_yaml method · python · L157-L166 (10 LOC)src/agent_gov/authoring/nl_compiler.py
def to_yaml(self) -> str:
    """Render the full policy as a YAML string.

    The output has a ``name`` line, a double-quoted ``description`` line,
    a ``rules:`` line, and then one fragment per rule from
    ``to_yaml_fragment(indent=2)``.

    The description is escaped with JSON string rules (a JSON string is a
    valid YAML double-quoted scalar), so quotes, backslashes and newlines
    in it no longer produce invalid YAML. Descriptions without such
    characters render exactly as before. ``name`` is written unquoted.
    """
    import json

    quoted_description = json.dumps(str(self.description), ensure_ascii=False)
    lines = [
        f"name: {self.name}",
        f"description: {quoted_description}",
        "rules:",
    ]
    lines.extend(" " + rule.to_yaml_fragment(indent=2) for rule in self.rules)
    return "\n".join(lines)
_extract_action function · python · L247-L253 (7 LOC)src/agent_gov/authoring/nl_compiler.py
def _extract_action(text_lower: str) -> str:
    """Return the action for the longest action keyword found in the text.

    Keywords are tried longest first so a multi-word keyword wins over a
    shorter keyword it contains. Falls back to ``"block"`` when no keyword
    occurs in ``text_lower``.
    """
    longest_first = sorted(
        _ACTION_KEYWORDS.items(), key=lambda pair: len(pair[0]), reverse=True
    )
    matches = (action for keyword, action in longest_first if keyword in text_lower)
    return next(matches, "block")  # default


# _extract_subject function · python · L256-L265 (10 LOC)src/agent_gov/authoring/nl_compiler.py
def _extract_subject(
    text_lower: str,
) -> tuple[str, str, dict[str, object]]:
    """Return ``(subject, rule_type, default_params)`` for the longest subject keyword found.

    Keywords are tried longest first. The params dict is a shallow copy of
    the table entry, so callers can modify it without touching the table.
    Returns ``("", "", {})`` when no keyword occurs in ``text_lower``.
    """
    longest_first = sorted(
        _SUBJECT_KEYWORDS.items(), key=lambda pair: len(pair[0]), reverse=True
    )
    for keyword, spec in longest_first:
        subject, rule_type, params = spec
        if keyword not in text_lower:
            continue
        return subject, rule_type, dict(params)
    return "", "", {}


# _extract_target function · python · L268-L273 (6 LOC)src/agent_gov/authoring/nl_compiler.py
def _extract_target(text_lower: str) -> str:
    """Return the target for the longest target keyword found in the text.

    Keywords are tried longest first. Falls back to ``"any"`` when no
    keyword occurs in ``text_lower``.
    """
    longest_first = sorted(
        _TARGET_KEYWORDS.items(), key=lambda pair: len(pair[0]), reverse=True
    )
    matches = (target for keyword, target in longest_first if keyword in text_lower)
    return next(matches, "any")