← back to invincible-jha__agent-gov

Function bodies 212 total

All specs Real LLM only Function bodies
demo_policy_evaluation function · python · L33-L76 (44 LOC)
examples/01_quickstart.py
def demo_policy_evaluation() -> None:
    """Demonstrate loading a policy and evaluating actions."""
    print("\n=== Policy Evaluation ===\n")

    # Load the standard policy pack bundled with agent-gov
    packs_dir = Path(agent_gov.__file__).parent / "packs"
    loader = PolicyLoader()
    policy = loader.load_file(packs_dir / "standard.yaml")
    print(f"Loaded policy: {policy.name!r} (v{policy.version})")
    print(f"Rules: {[r.name for r in policy.rules]}")

    evaluator = PolicyEvaluator()

    # Action 1: a plain search — should pass
    safe_action: dict[str, object] = {
        "type": "search",
        "query": "quarterly revenue trends",
        "agent_role": "operator",
        "cost": 0.02,
    }
    report1 = evaluator.evaluate(policy, safe_action)
    print(f"\nSafe action: {report1.summary()}")

    # Action 2: action with a blocked keyword — should fail
    risky_action: dict[str, object] = {
        "type": "database_query",
        "query": "drop table users",
    
demo_compliance_frameworks function · python · L79-L111 (33 LOC)
examples/01_quickstart.py
def demo_compliance_frameworks() -> None:
    """Demonstrate running compliance framework checks."""
    print("\n=== Compliance Frameworks ===\n")

    # EU AI Act check with partial evidence
    eu_framework = EuAiActFramework()
    eu_evidence: dict[str, object] = {
        "A6": {"status": "pass", "evidence": "System classified as limited risk."},
        "A9": {"status": "pass", "evidence": "Risk management plan documented in confluence."},
        "A10": {"status": "fail", "evidence": "Training data audit not yet completed."},
        "A13": {"status": "pass", "evidence": "Model card published; outputs include confidence scores."},
        "A14": {"status": "pass", "evidence": "Human review required for all critical decisions."},
        "A15": {"status": "unknown"},
        "A52": {"status": "pass", "evidence": "UI displays 'AI-generated' badge on all responses."},
        "A60": {"status": "unknown", "evidence": "Awaiting EU database portal access."},
    }
    eu_report = eu_f
demo_audit_logging function · python · L114-L150 (37 LOC)
examples/01_quickstart.py
def demo_audit_logging() -> None:
    """Demonstrate audit log write and read."""
    print("\n=== Audit Logging ===\n")

    with tempfile.TemporaryDirectory() as tmpdir:
        log_path = Path(tmpdir) / "audit.jsonl"
        audit_logger = AuditLogger(log_path)

        # Load and evaluate some actions
        packs_dir = Path(agent_gov.__file__).parent / "packs"
        loader = PolicyLoader()
        policy = loader.load_file(packs_dir / "minimal.yaml")
        evaluator = PolicyEvaluator()

        actions: list[dict[str, object]] = [
            {"type": "search", "query": "product roadmap", "agent_id": "agent-alpha"},
            {"type": "file_read", "path": "/data/config.json", "agent_id": "agent-beta"},
            {"type": "api_call", "endpoint": "/api/users", "agent_id": "agent-alpha"},
        ]

        for action in actions:
            report = evaluator.evaluate(policy, action)
            audit_logger.log_from_report(
                report,
                agent_id=
main function · python · L153-L161 (9 LOC)
examples/01_quickstart.py
def main() -> None:
    """Run the quickstart: print a version banner, run each demo, then sign off."""
    banner = f"agent-gov v{agent_gov.__version__}"
    print(banner)
    print("=" * 50)

    # The demos run in a fixed order.
    for demo in (demo_policy_evaluation, demo_compliance_frameworks, demo_audit_logging):
        demo()

    print("\nQuickstart complete.")
AnthropicGovernance._record method · python · L28-L36 (9 LOC)
src/agent_gov/adapters/anthropic_sdk.py
    def _record(self, event_type: str, result: dict[str, Any], **context: Any) -> None:
        """Append an entry to the audit log with a UTC timestamp."""
        entry: dict[str, Any] = {
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
            "event_type": event_type,
            "result": result,
            **context,
        }
        self._audit_log.append(entry)
AnthropicGovernance.check_message method · python · L38-L45 (8 LOC)
src/agent_gov/adapters/anthropic_sdk.py
    def check_message(self, role: str, content: str) -> dict[str, Any]:
        """Evaluate an Anthropic message (by role) against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Message passed policy checks."}
        self._record("check_message", result, role=role, content_length=len(content))
        return result
AnthropicGovernance.check_tool_use method · python · L47-L54 (8 LOC)
src/agent_gov/adapters/anthropic_sdk.py
    def check_tool_use(self, tool_name: str, input: Any) -> dict[str, Any]:
        """Evaluate an Anthropic tool_use content block against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Tool use passed policy checks."}
        self._record("check_tool_use", result, tool_name=tool_name)
        return result
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
AnthropicGovernance.check_content method · python · L56-L64 (9 LOC)
src/agent_gov/adapters/anthropic_sdk.py
    def check_content(self, content_type: str, content: Any) -> dict[str, Any]:
        """Evaluate a content block by type against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        content_str = str(content) if content is not None else ""
        result: dict[str, Any] = {"allowed": True, "reason": "Content passed policy checks."}
        self._record("check_content", result, content_type=content_type, content_length=len(content_str))
        return result
CrewAIGovernance._record method · python · L28-L36 (9 LOC)
src/agent_gov/adapters/crewai.py
    def _record(self, event_type: str, result: dict[str, Any], **context: Any) -> None:
        """Append an entry to the audit log with a UTC timestamp."""
        entry: dict[str, Any] = {
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
            "event_type": event_type,
            "result": result,
            **context,
        }
        self._audit_log.append(entry)
CrewAIGovernance.check_task method · python · L38-L45 (8 LOC)
src/agent_gov/adapters/crewai.py
    def check_task(self, task_name: str, task_input: Any) -> dict[str, Any]:
        """Evaluate a CrewAI task against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Task passed policy checks."}
        self._record("check_task", result, task_name=task_name)
        return result
CrewAIGovernance.check_agent_action method · python · L47-L54 (8 LOC)
src/agent_gov/adapters/crewai.py
    def check_agent_action(self, agent_name: str, action: str) -> dict[str, Any]:
        """Evaluate an agent action against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Agent action passed policy checks."}
        self._record("check_agent_action", result, agent_name=agent_name, action=action)
        return result
CrewAIGovernance.check_delegation method · python · L56-L63 (8 LOC)
src/agent_gov/adapters/crewai.py
    def check_delegation(self, from_agent: str, to_agent: str) -> dict[str, Any]:
        """Evaluate an agent-to-agent delegation against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Delegation passed policy checks."}
        self._record("check_delegation", result, from_agent=from_agent, to_agent=to_agent)
        return result
LangChainGovernance._record method · python · L28-L36 (9 LOC)
src/agent_gov/adapters/langchain.py
    def _record(self, event_type: str, result: dict[str, Any], **context: Any) -> None:
        """Append an entry to the audit log with a UTC timestamp."""
        entry: dict[str, Any] = {
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
            "event_type": event_type,
            "result": result,
            **context,
        }
        self._audit_log.append(entry)
LangChainGovernance.check_prompt method · python · L38-L45 (8 LOC)
src/agent_gov/adapters/langchain.py
    def check_prompt(self, prompt: str) -> dict[str, Any]:
        """Evaluate a user or system prompt against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Prompt passed policy checks."}
        self._record("check_prompt", result, prompt_length=len(prompt))
        return result
LangChainGovernance.check_output method · python · L47-L54 (8 LOC)
src/agent_gov/adapters/langchain.py
    def check_output(self, output: str) -> dict[str, Any]:
        """Evaluate an LLM output against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Output passed policy checks."}
        self._record("check_output", result, output_length=len(output))
        return result
Repobility — same analyzer, your code, free for public repos · /scan/
LangChainGovernance.check_tool_call method · python · L56-L63 (8 LOC)
src/agent_gov/adapters/langchain.py
    def check_tool_call(self, tool_name: str, args: dict[str, Any]) -> dict[str, Any]:
        """Evaluate a tool call against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Tool call passed policy checks."}
        self._record("check_tool_call", result, tool_name=tool_name, arg_count=len(args))
        return result
MicrosoftGovernance._record method · python · L28-L36 (9 LOC)
src/agent_gov/adapters/microsoft_agents.py
    def _record(self, event_type: str, result: dict[str, Any], **context: Any) -> None:
        """Append an entry to the audit log with a UTC timestamp."""
        entry: dict[str, Any] = {
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
            "event_type": event_type,
            "result": result,
            **context,
        }
        self._audit_log.append(entry)
MicrosoftGovernance.check_activity method · python · L38-L45 (8 LOC)
src/agent_gov/adapters/microsoft_agents.py
    def check_activity(self, activity_type: str, data: Any) -> dict[str, Any]:
        """Evaluate a Bot Framework activity against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Activity passed policy checks."}
        self._record("check_activity", result, activity_type=activity_type)
        return result
MicrosoftGovernance.check_dialog method · python · L47-L54 (8 LOC)
src/agent_gov/adapters/microsoft_agents.py
    def check_dialog(self, dialog_id: str, step: str) -> dict[str, Any]:
        """Evaluate a dialog waterfall step against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Dialog step passed policy checks."}
        self._record("check_dialog", result, dialog_id=dialog_id, step=step)
        return result
MicrosoftGovernance.check_turn method · python · L56-L63 (8 LOC)
src/agent_gov/adapters/microsoft_agents.py
    def check_turn(self, turn_id: str, content: str) -> dict[str, Any]:
        """Evaluate a conversation turn's content against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Turn passed policy checks."}
        self._record("check_turn", result, turn_id=turn_id, content_length=len(content))
        return result
OpenAIGovernance._record method · python · L28-L36 (9 LOC)
src/agent_gov/adapters/openai_agents.py
    def _record(self, event_type: str, result: dict[str, Any], **context: Any) -> None:
        """Append an entry to the audit log with a UTC timestamp."""
        entry: dict[str, Any] = {
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
            "event_type": event_type,
            "result": result,
            **context,
        }
        self._audit_log.append(entry)
OpenAIGovernance.check_message method · python · L38-L45 (8 LOC)
src/agent_gov/adapters/openai_agents.py
    def check_message(self, role: str, content: str) -> dict[str, Any]:
        """Evaluate a message (by role) against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Message passed policy checks."}
        self._record("check_message", result, role=role, content_length=len(content))
        return result
OpenAIGovernance.check_tool_use method · python · L47-L54 (8 LOC)
src/agent_gov/adapters/openai_agents.py
    def check_tool_use(self, tool_name: str, args: dict[str, Any]) -> dict[str, Any]:
        """Evaluate a tool use request against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Tool use passed policy checks."}
        self._record("check_tool_use", result, tool_name=tool_name, arg_count=len(args))
        return result
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
OpenAIGovernance.check_handoff method · python · L56-L63 (8 LOC)
src/agent_gov/adapters/openai_agents.py
    def check_handoff(self, from_agent: str, to_agent: str) -> dict[str, Any]:
        """Evaluate an agent handoff against the policy engine.

        Returns a governance decision with allowed status and reason.
        """
        result: dict[str, Any] = {"allowed": True, "reason": "Handoff passed policy checks."}
        self._record("check_handoff", result, from_agent=from_agent, to_agent=to_agent)
        return result
AuditEntry.to_json method · python · L48-L67 (20 LOC)
src/agent_gov/audit/entry.py
    def to_json(self) -> str:
        """Serialise the entry to a JSON string (single line, no newlines).

        The ``timestamp`` field is rendered as an ISO 8601 string.

        Returns
        -------
        str
            JSON representation suitable for JSONL storage.
        """
        data: dict[str, object] = {
            "agent_id": self.agent_id,
            "action_type": self.action_type,
            "action_data": self.action_data,
            "verdict": self.verdict,
            "policy_name": self.policy_name,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
        }
        return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
AuditEntry.from_json method · python · L70-L125 (56 LOC)
src/agent_gov/audit/entry.py
    def from_json(cls, json_string: str) -> "AuditEntry":
        """Deserialise an entry from a JSON string.

        Parameters
        ----------
        json_string:
            A single JSONL line produced by :meth:`to_json`.

        Returns
        -------
        AuditEntry
            Reconstructed entry.

        Raises
        ------
        ValueError
            If the JSON is malformed or missing required fields.
        """
        try:
            data = json.loads(json_string)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Malformed JSON audit entry: {exc}") from exc

        if not isinstance(data, dict):
            raise ValueError("Audit entry JSON must be a JSON object (dict).")

        required_fields = {"agent_id", "action_type", "action_data", "verdict", "policy_name"}
        missing = required_fields - data.keys()
        if missing:
            raise ValueError(f"Audit entry missing required fields: {missing!r}")

        raw_ts 
AuditEntry.__repr__ method · python · L127-L133 (7 LOC)
src/agent_gov/audit/entry.py
    def __repr__(self) -> str:
        return (
            f"AuditEntry(agent_id={self.agent_id!r}, "
            f"action_type={self.action_type!r}, "
            f"verdict={self.verdict!r}, "
            f"timestamp={self.timestamp.isoformat()!r})"
        )
AuditLogger.log method · python · L56-L68 (13 LOC)
src/agent_gov/audit/logger.py
    def log(self, entry: AuditEntry) -> None:
        """Append a single audit entry to the log file.

        Parameters
        ----------
        entry:
            The entry to persist.
        """
        line = entry.to_json() + "\n"
        with self._lock:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            with self._path.open("a", encoding="utf-8") as fh:
                fh.write(line)
AuditLogger.log_from_report method · python · L70-L111 (42 LOC)
src/agent_gov/audit/logger.py
    def log_from_report(
        self,
        report: object,
        agent_id: str,
        *,
        metadata: Optional[dict[str, str]] = None,
    ) -> AuditEntry:
        """Create and log an entry from an :class:`~agent_gov.policy.result.EvaluationReport`.

        Parameters
        ----------
        report:
            An :class:`~agent_gov.policy.result.EvaluationReport` instance.
        agent_id:
            The identifier of the agent that performed the action.
        metadata:
            Optional extra metadata to attach to the entry.

        Returns
        -------
        AuditEntry
            The entry that was logged.
        """
        # Import here to avoid circular imports at module level
        from agent_gov.policy.result import EvaluationReport

        if not isinstance(report, EvaluationReport):
            raise TypeError(
                f"Expected EvaluationReport, got {type(report).__name__}."
            )

        entry = AuditEntry(
            a
AuditLogger.read method · python · L113-L137 (25 LOC)
src/agent_gov/audit/logger.py
    def read(self) -> list[AuditEntry]:
        """Read all audit entries from the log file.

        Returns
        -------
        list[AuditEntry]
            All entries in chronological order (oldest first).  Returns an
            empty list if the file does not exist.
        """
        if not self._path.exists():
            return []

        entries: list[AuditEntry] = []
        with self._path.open("r", encoding="utf-8") as fh:
            for line_number, raw_line in enumerate(fh, start=1):
                stripped = raw_line.strip()
                if not stripped:
                    continue
                try:
                    entries.append(AuditEntry.from_json(stripped))
                except ValueError:
                    # Corrupted lines are skipped to keep the reader resilient
                    pass

        return entries
AuditLogger.count method · python · L139-L147 (9 LOC)
src/agent_gov/audit/logger.py
    def count(self) -> int:
        """Return the total number of valid entries in the log file.

        Returns
        -------
        int
            Entry count, or ``0`` if the file does not exist.
        """
        return len(self.read())
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
AuditLogger.query method · python · L149-L181 (33 LOC)
src/agent_gov/audit/logger.py
    def query(
        self,
        filters: dict[str, object],
    ) -> list[AuditEntry]:
        """Read the log and keep only entries that satisfy every filter.

        Filtering is delegated to ``_apply_filters`` on the result of
        :meth:`read`.

        Supported filter keys
        ----------------------
        agent_id : str
            Exact match on agent ID.
        action_type : str
            Exact match on action type.
        verdict : str
            Exact match on verdict (``"pass"`` or ``"fail"``).
        policy_name : str
            Exact match on policy name.
        since : datetime
            Keep entries at or after this datetime (UTC).
        until : datetime
            Keep entries at or before this datetime (UTC).

        Parameters
        ----------
        filters:
            Dict of filter criteria.  Unknown keys are ignored.

        Returns
        -------
        list[AuditEntry]
            Matching entries in chronological order.
        """
        return _apply_filters(self.read(), filters)
_apply_filters function · python · L184-L215 (32 LOC)
src/agent_gov/audit/logger.py
def _apply_filters(
    entries: list[AuditEntry],
    filters: dict[str, object],
) -> list[AuditEntry]:
    """Apply filter criteria to a list of entries."""
    result = entries

    agent_id = filters.get("agent_id")
    if agent_id is not None:
        result = [e for e in result if e.agent_id == str(agent_id)]

    action_type = filters.get("action_type")
    if action_type is not None:
        result = [e for e in result if e.action_type == str(action_type)]

    verdict = filters.get("verdict")
    if verdict is not None:
        result = [e for e in result if e.verdict == str(verdict)]

    policy_name = filters.get("policy_name")
    if policy_name is not None:
        result = [e for e in result if e.policy_name == str(policy_name)]

    since = filters.get("since")
    if since is not None and isinstance(since, datetime):
        result = [e for e in result if e.timestamp >= since]

    until = filters.get("until")
    if until is not None and isinstance(until, datetime):
 
AuditReader.all method · python · L52-L60 (9 LOC)
src/agent_gov/audit/reader.py
    def all(self) -> list[AuditEntry]:
        """Return all audit entries in chronological order.

        Returns
        -------
        list[AuditEntry]
            All entries, oldest first.
        """
        return self._logger.read()
AuditReader.last method · python · L62-L76 (15 LOC)
src/agent_gov/audit/reader.py
    def last(self, count: int) -> list[AuditEntry]:
        """Return the most recent ``count`` entries.

        Parameters
        ----------
        count:
            Maximum number of entries to return.

        Returns
        -------
        list[AuditEntry]
            Up to ``count`` entries, most recent last.
        """
        entries = self._logger.read()
        return entries[-count:] if count > 0 else []
AuditReader.query method · python · L78-L127 (50 LOC)
src/agent_gov/audit/reader.py
    def query(
        self,
        *,
        agent_id: Optional[str] = None,
        action_type: Optional[str] = None,
        verdict: Optional[str] = None,
        policy_name: Optional[str] = None,
        since: Optional[datetime] = None,
        until: Optional[datetime] = None,
    ) -> list[AuditEntry]:
        """Return entries matching the supplied filter criteria.

        All provided filters are combined with AND logic.

        Parameters
        ----------
        agent_id:
            Filter by exact agent ID.
        action_type:
            Filter by exact action type string.
        verdict:
            Filter by verdict (``"pass"`` or ``"fail"``).
        policy_name:
            Filter by policy name.
        since:
            Return only entries at or after this UTC datetime.
        until:
            Return only entries at or before this UTC datetime.

        Returns
        -------
        list[AuditEntry]
            Matching entries in chronological orde
AuditReader.stats method · python · L129-L174 (46 LOC)
src/agent_gov/audit/reader.py
    def stats(self) -> dict[str, object]:
        """Return aggregate statistics about the audit log.

        Returns
        -------
        dict[str, object]
            Dictionary with keys:
            - ``total`` — total entry count
            - ``pass_count`` — entries with verdict ``"pass"``
            - ``fail_count`` — entries with verdict ``"fail"``
            - ``agents`` — sorted list of distinct agent IDs
            - ``action_types`` — sorted list of distinct action types
            - ``policies`` — sorted list of distinct policy names
            - ``earliest`` — ISO timestamp of earliest entry (or ``None``)
            - ``latest`` — ISO timestamp of latest entry (or ``None``)
        """
        entries = self._logger.read()
        if not entries:
            return {
                "total": 0,
                "pass_count": 0,
                "fail_count": 0,
                "agents": [],
                "action_types": [],
                "policies": [],
     
build_filter function · python · L35-L89 (55 LOC)
src/agent_gov/audit/search.py
def build_filter(
    agent_id: Optional[str] = None,
    action_type: Optional[str] = None,
    verdict: Optional[str] = None,
    policy_name: Optional[str] = None,
    since: Optional[datetime] = None,
    until: Optional[datetime] = None,
) -> FilterFn:
    """Build a composite filter function from optional criteria.

    All supplied criteria are combined with AND logic — an entry must
    satisfy every non-``None`` criterion to pass the filter.

    Parameters
    ----------
    agent_id:
        Exact match on :attr:`AuditEntry.agent_id`.
    action_type:
        Exact match on :attr:`AuditEntry.action_type`.
    verdict:
        Exact match on :attr:`AuditEntry.verdict` (``"pass"`` or ``"fail"``).
    policy_name:
        Exact match on :attr:`AuditEntry.policy_name`.
    since:
        Include only entries whose :attr:`AuditEntry.timestamp` is at or
        after this datetime.
    until:
        Include only entries whose :attr:`AuditEntry.timestamp` is at or
        before t
search_entries function · python · L92-L123 (32 LOC)
src/agent_gov/audit/search.py
def search_entries(
    entries: list[AuditEntry],
    filter_fn: FilterFn,
    limit: int = 100,
) -> list[AuditEntry]:
    """Apply a filter function to a list of entries, returning up to ``limit`` matches.

    Entries are returned in the same order they appear in ``entries``, and
    scanning stops as soon as ``limit`` matches have been collected.

    Parameters
    ----------
    entries:
        Source list of audit entries to search.
    filter_fn:
        A callable produced by :func:`build_filter` or any function with
        the same signature ``(AuditEntry) -> bool``.
    limit:
        Maximum number of matching entries to return.  Use a large value
        (e.g. ``999_999``) to effectively return all matches.  A limit of
        zero or less returns an empty list.

    Returns
    -------
    list[AuditEntry]
        Matching entries in original order, capped at ``limit``.
    """
    # The cap is only checked after an append, so without this guard a
    # non-positive limit would still let the first match through.
    if limit <= 0:
        return []

    results: list[AuditEntry] = []
    for entry in entries:
        if filter_fn(entry):
            results.append(entry)
            if len(results) >= limit:
                break
    return results
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
aggregate_verdicts function · python · L126-L144 (19 LOC)
src/agent_gov/audit/search.py
def aggregate_verdicts(entries: list[AuditEntry]) -> dict[str, int]:
    """Tally how many entries carry each verdict.

    Parameters
    ----------
    entries:
        Source list of audit entries.

    Returns
    -------
    dict[str, int]
        Verdict string mapped to its number of occurrences, e.g.
        ``{"pass": 40, "fail": 10}``.  A verdict that never occurs in
        ``entries`` has no key.
    """
    tally: dict[str, int] = {}
    for item in entries:
        verdict = item.verdict
        if verdict in tally:
            tally[verdict] += 1
        else:
            tally[verdict] = 1
    return tally
aggregate_by_agent function · python · L147-L164 (18 LOC)
src/agent_gov/audit/search.py
def aggregate_by_agent(entries: list[AuditEntry]) -> dict[str, list[AuditEntry]]:
    """Bucket entries under the ``agent_id`` they belong to.

    Parameters
    ----------
    entries:
        Source list of audit entries.

    Returns
    -------
    dict[str, list[AuditEntry]]
        Agent ID mapped to that agent's entries, each bucket keeping the
        order in which its entries appeared in ``entries``.
    """
    buckets: dict[str, list[AuditEntry]] = {}
    for item in entries:
        agent = item.agent_id
        if agent not in buckets:
            buckets[agent] = []
        buckets[agent].append(item)
    return buckets
aggregate_by_action_type function · python · L167-L183 (17 LOC)
src/agent_gov/audit/search.py
def aggregate_by_action_type(entries: list[AuditEntry]) -> dict[str, list[AuditEntry]]:
    """Bucket entries under their ``action_type``.

    Parameters
    ----------
    entries:
        Source list of audit entries.

    Returns
    -------
    dict[str, list[AuditEntry]]
        Action type mapped to the entries recorded with that type.
    """
    by_type: dict[str, list[AuditEntry]] = {}
    for item in entries:
        try:
            bucket = by_type[item.action_type]
        except KeyError:
            # First entry of this type: start its bucket.
            bucket = by_type[item.action_type] = []
        bucket.append(item)
    return by_type
aggregate_by_policy function · python · L186-L203 (18 LOC)
src/agent_gov/audit/search.py
def aggregate_by_policy(entries: list[AuditEntry]) -> dict[str, list[AuditEntry]]:
    """Bucket entries under their ``policy_name``.

    Parameters
    ----------
    entries:
        Source list of audit entries.

    Returns
    -------
    dict[str, list[AuditEntry]]
        Policy name mapped to the entries whose ``policy_name`` equals it.
    """
    by_policy: dict[str, list[AuditEntry]] = {}
    for item in entries:
        policy = item.policy_name
        bucket = by_policy.get(policy)
        if bucket is None:
            bucket = []
            by_policy[policy] = bucket
        bucket.append(item)
    return by_policy
CompiledRule.to_dict method · python · L90-L99 (10 LOC)
src/agent_gov/authoring/nl_compiler.py
    def to_dict(self) -> dict[str, object]:
        """Serialise to a plain dictionary."""
        return {
            "name": self.name,
            "type": self.rule_type,
            "action": self.action,
            "target": self.target,
            "severity": self.severity,
            "params": self.params,
        }
CompiledRule.to_yaml_fragment method · python · L101-L122 (22 LOC)
src/agent_gov/authoring/nl_compiler.py
    def to_yaml_fragment(self, indent: int = 2) -> str:
        """Render this rule as a YAML fragment string."""
        pad = " " * indent
        lines = [
            f"- name: {self.name}",
            f"{pad}  type: {self.rule_type}",
            f"{pad}  action: {self.action}",
            f"{pad}  target: {self.target}",
            f"{pad}  severity: {self.severity}",
        ]
        if self.params:
            lines.append(f"{pad}  params:")
            for key, value in self.params.items():
                if isinstance(value, list):
                    lines.append(f"{pad}    {key}:")
                    for item in value:
                        lines.append(f"{pad}      - {item}")
                elif isinstance(value, bool):
                    lines.append(f"{pad}    {key}: {'true' if value else 'false'}")
                else:
                    lines.append(f"{pad}    {key}: {value}")
        return ("\n" + pad).join(lines)
CompiledPolicy.to_dict method · python · L149-L155 (7 LOC)
src/agent_gov/authoring/nl_compiler.py
    def to_dict(self) -> dict[str, object]:
        """Serialise to a plain dictionary."""
        return {
            "name": self.name,
            "description": self.description,
            "rules": [r.to_dict() for r in self.rules],
        }
CompiledPolicy.to_yaml method · python · L157-L166 (10 LOC)
src/agent_gov/authoring/nl_compiler.py
    def to_yaml(self) -> str:
        """Render the full policy as a YAML string."""
        lines = [
            f"name: {self.name}",
            f'description: "{self.description}"',
            "rules:",
        ]
        for rule in self.rules:
            lines.append("  " + rule.to_yaml_fragment(indent=2))
        return "\n".join(lines)
Repobility — same analyzer, your code, free for public repos · /scan/
_extract_action function · python · L247-L253 (7 LOC)
src/agent_gov/authoring/nl_compiler.py
def _extract_action(text_lower: str) -> str:
    """Return the action mapped to the longest keyword found in ``text_lower``.

    Keywords from ``_ACTION_KEYWORDS`` are tried longest first, so a
    multi-word phrase wins over a shorter keyword it contains. Falls back to
    ``"block"`` when no keyword occurs in the text.
    """
    # A reversed sort is still stable, so equal-length keywords keep their
    # definition order.
    longest_first = sorted(_ACTION_KEYWORDS, key=len, reverse=True)
    for phrase in longest_first:
        if phrase in text_lower:
            return _ACTION_KEYWORDS[phrase]
    return "block"  # default
_extract_subject function · python · L256-L265 (10 LOC)
src/agent_gov/authoring/nl_compiler.py
def _extract_subject(
    text_lower: str,
) -> tuple[str, str, dict[str, object]]:
    """Look up ``(subject, rule_type, params)`` for the longest matching keyword.

    Keywords from ``_SUBJECT_KEYWORDS`` are tried longest first. The params
    are returned as a fresh ``dict`` copy. When no keyword occurs in the text
    the result is ``("", "", {})``.
    """
    ranked = sorted(_SUBJECT_KEYWORDS, key=len, reverse=True)
    hit = next((phrase for phrase in ranked if phrase in text_lower), None)
    if hit is None:
        return "", "", {}
    subject, rule_type, params = _SUBJECT_KEYWORDS[hit]
    return subject, rule_type, dict(params)
_extract_target function · python · L268-L273 (6 LOC)
src/agent_gov/authoring/nl_compiler.py
def _extract_target(text_lower: str) -> str:
    """Return the target mapped to the longest keyword found in ``text_lower``.

    Keywords from ``_TARGET_KEYWORDS`` are tried longest first; ``"any"`` is
    returned when none of them occurs in the text.
    """
    candidates = sorted(_TARGET_KEYWORDS, key=len, reverse=True)
    matched = [phrase for phrase in candidates if phrase in text_lower]
    if not matched:
        return "any"
    return _TARGET_KEYWORDS[matched[0]]
page 1 / 5next ›