← back to invincible-jha__agent-gov

Function bodies 212 total

All specs Real LLM only Function bodies
_populate_builtins function · python · L166-L182 (17 LOC)
src/agent_gov/plugins/registry.py
def _populate_builtins() -> None:
    """Load the bundled rule and framework classes into the global registries.

    The rule and framework modules are imported at call time, inside this
    function.  Every class is registered under its own ``name`` attribute:
    the rules go into ``rule_registry`` first, then the frameworks go into
    ``framework_registry``.
    """
    from agent_gov.rules.cost_limit import CostLimitRule
    from agent_gov.rules.keyword_block import KeywordBlockRule
    from agent_gov.rules.pii_check import PiiCheckRule
    from agent_gov.rules.role_check import RoleCheckRule

    builtin_rules = (PiiCheckRule, RoleCheckRule, CostLimitRule, KeywordBlockRule)
    for builtin_rule in builtin_rules:
        rule_registry.register_class(builtin_rule.name, builtin_rule)

    from agent_gov.frameworks.eu_ai_act import EuAiActFramework
    from agent_gov.frameworks.gdpr import GdprFramework
    from agent_gov.frameworks.hipaa import HipaaFramework
    from agent_gov.frameworks.soc2 import Soc2Framework

    builtin_frameworks = (
        EuAiActFramework,
        GdprFramework,
        HipaaFramework,
        Soc2Framework,
    )
    for builtin_framework in builtin_frameworks:
        framework_registry.register_class(builtin_framework.name, builtin_framework)
LibraryPolicyInstaller.install method · python · L87-L203 (117 LOC)
src/agent_gov/policies/installer.py
    def install(
        self,
        source: str | Path,
        target: str | Path,
        *,
        domain: Optional[str] = None,
        overwrite: bool = True,
        dry_run: bool = False,
    ) -> list[InstallResult]:
        """Install policies from *source* into *target*.

        Parameters
        ----------
        source:
            Directory containing source ``.yaml`` policy files.  Scanned
            recursively.
        target:
            Destination directory.  Created if it does not exist (unless
            ``dry_run`` is ``True``).
        domain:
            When provided, only install policies whose ``domain`` matches.
        overwrite:
            When ``False``, skip files that already exist in *target*.
        dry_run:
            When ``True``, report what would be installed without writing any
            files or creating directories.

        Returns
        -------
        list[InstallResult]
            One entry per discovered YAML file.
      
LibraryPolicyInstaller.list_available method · python · L205-L231 (27 LOC)
src/agent_gov/policies/installer.py
    def list_available(
        self,
        source: str | Path,
        *,
        domain: Optional[str] = None,
    ) -> list[str]:
        """List policy IDs available in the source directory.

        Parameters
        ----------
        source:
            Directory to scan for policy YAML files.
        domain:
            When provided, only return IDs for the given domain.

        Returns
        -------
        list[str]
            Sorted list of policy ``id`` values found.
        """
        try:
            policies = self._loader.load_directory(
                source, recursive=True, domain_filter=domain
            )
        except LibraryPolicyLoadError:
            return []
        return sorted(policy.id for policy in policies)
LibraryPolicyLoader.load_file method · python · L62-L108 (47 LOC)
src/agent_gov/policies/loader.py
    def load_file(self, path: str | Path) -> LibraryPolicyConfig:
        """Load a single library policy YAML file.

        Parameters
        ----------
        path:
            Filesystem path to the ``.yaml`` or ``.yml`` file.

        Returns
        -------
        LibraryPolicyConfig
            Validated library policy configuration.

        Raises
        ------
        LibraryPolicyLoadError
            If the file does not exist, is not valid YAML, or fails
            Pydantic schema validation.
        """
        resolved = Path(path).resolve()
        if not resolved.exists():
            raise LibraryPolicyLoadError(resolved, "file does not exist")
        if not resolved.is_file():
            raise LibraryPolicyLoadError(resolved, "path is not a file")

        try:
            raw_text = resolved.read_text(encoding="utf-8")
        except OSError as exc:
            raise LibraryPolicyLoadError(resolved, f"cannot read file: {exc}") from exc

        try:
         
LibraryPolicyLoader.load_directory method · python · L110-L174 (65 LOC)
src/agent_gov/policies/loader.py
    def load_directory(
        self,
        directory: str | Path,
        *,
        recursive: bool = True,
        domain_filter: str | None = None,
    ) -> list[LibraryPolicyConfig]:
        """Load all library YAML policy files from a directory.

        Parameters
        ----------
        directory:
            Path to a directory containing ``.yaml`` / ``.yml`` policy files.
        recursive:
            When ``True`` (default), scan subdirectories as well.  Set to
            ``False`` to scan only the top-level directory.
        domain_filter:
            When provided, only policies whose ``domain`` value matches this
            string are returned.

        Returns
        -------
        list[LibraryPolicyConfig]
            All successfully validated policies, sorted by file path.
            Individual file failures are logged as warnings and skipped.

        Raises
        ------
        LibraryPolicyLoadError
            If ``directory`` does not exist or is no
LibraryPolicyValidator.validate_dict method · python · L100-L166 (67 LOC)
src/agent_gov/policies/validator.py
    def validate_dict(self, data: object) -> ValidationResult:
        """Validate a raw Python object (from YAML parsing) against the schema.

        Parameters
        ----------
        data:
            The parsed YAML document, expected to be a ``dict``.

        Returns
        -------
        ValidationResult
            Contains ``valid=True`` with no errors, or ``valid=False``
            with a populated ``errors`` list.
        """
        errors: list[str] = []

        if not isinstance(data, dict):
            return ValidationResult(valid=False, errors=["Policy document must be a YAML mapping (dict)."])

        # Required top-level fields
        for required_field in _REQUIRED_TOP_LEVEL_FIELDS:
            if required_field not in data:
                errors.append(f"Missing required field: '{required_field}'.")

        # Domain validation
        domain_value = data.get("domain")
        if domain_value is not None and domain_value not in _VALID_DOMAINS:
          
LibraryPolicyValidator.validate_file method · python · L168-L207 (40 LOC)
src/agent_gov/policies/validator.py
    def validate_file(self, path: str | Path) -> ValidationResult:
        """Validate a YAML policy file on disk.

        Parameters
        ----------
        path:
            Filesystem path to the ``.yaml`` or ``.yml`` policy file.

        Returns
        -------
        ValidationResult
            Contains ``valid=True`` with no errors, or ``valid=False``
            with a populated ``errors`` list describing each problem.
        """
        resolved = Path(path).resolve()

        if not resolved.exists():
            return ValidationResult(
                valid=False, errors=[f"File does not exist: {resolved}"]
            )
        if not resolved.is_file():
            return ValidationResult(
                valid=False, errors=[f"Path is not a file: {resolved}"]
            )

        try:
            raw_text = resolved.read_text(encoding="utf-8")
        except OSError as exc:
            return ValidationResult(
                valid=False, errors=[f"Cannot read f
Repobility · code-quality intelligence platform · https://repobility.com
LibraryPolicyValidator.assert_valid method · python · L209-L230 (22 LOC)
src/agent_gov/policies/validator.py
    def assert_valid(self, data: object) -> LibraryPolicyConfig:
        """Return *data* as a policy model, or raise if it does not validate.

        Parameters
        ----------
        data:
            The parsed YAML document as a Python ``dict``.

        Returns
        -------
        LibraryPolicyConfig
            The model built from *data* via
            ``LibraryPolicyConfig.model_validate``.

        Raises
        ------
        LibraryPolicyValidationError
            Carries the error list from :meth:`validate_dict` when that
            check reports the document as invalid.
        """
        outcome = self.validate_dict(data)
        if outcome.valid:
            return LibraryPolicyConfig.model_validate(data)
        raise LibraryPolicyValidationError(outcome.errors)
RuleResolutionError.__init__ method · python · L39-L44 (6 LOC)
src/agent_gov/policy/evaluator.py
    def __init__(self, rule_type: str) -> None:
        """Record the unresolved rule type and build the error message.

        Parameters
        ----------
        rule_type:
            The rule type string for which no implementation was found.
            Stored on the instance as ``rule_type``.
        """
        self.rule_type = rule_type
        message = (
            f"No rule implementation registered for type {rule_type!r}. "
            "Register the rule with the rule registry before evaluating."
        )
        super().__init__(message)
PolicyEvaluator._register_builtins method · python · L68-L77 (10 LOC)
src/agent_gov/policy/evaluator.py
    def _register_builtins(self) -> None:
        """Instantiate the four bundled rules and index each by its ``name``.

        Each rule class is constructed with no arguments and stored in
        ``self._rules`` under the instance's ``name`` attribute.
        """
        from agent_gov.rules.cost_limit import CostLimitRule
        from agent_gov.rules.keyword_block import KeywordBlockRule
        from agent_gov.rules.pii_check import PiiCheckRule
        from agent_gov.rules.role_check import RoleCheckRule

        builtin_classes = (
            PiiCheckRule,
            RoleCheckRule,
            CostLimitRule,
            KeywordBlockRule,
        )
        for builtin_cls in builtin_classes:
            builtin_rule = builtin_cls()
            self._rules[builtin_rule.name] = builtin_rule
PolicyEvaluator.register_rule method · python · L79-L89 (11 LOC)
src/agent_gov/policy/evaluator.py
    def register_rule(self, rule: PolicyRule) -> None:
        """Add a custom rule implementation to this evaluator.

        Parameters
        ----------
        rule:
            An instantiated :class:`~agent_gov.policy.rule.PolicyRule`.
            It is stored under ``rule.name``; an entry already stored under
            that name is overwritten.
        """
        rule_name = rule.name
        logger.debug("Registering rule %r", rule_name)
        self._rules[rule_name] = rule
PolicyEvaluator.evaluate method · python · L95-L175 (81 LOC)
src/agent_gov/policy/evaluator.py
    def evaluate(
        self,
        policy: PolicyConfig,
        action: dict[str, object],
    ) -> EvaluationReport:
        """Evaluate an agent action against all enabled rules in a policy.

        Parameters
        ----------
        policy:
            Validated :class:`~agent_gov.policy.schema.PolicyConfig` to
            evaluate the action against.
        action:
            Arbitrary dictionary describing the agent action.

        Returns
        -------
        EvaluationReport
            Aggregated result; ``report.passed`` is ``True`` only when every
            enabled rule passes.

        Raises
        ------
        RuleResolutionError
            If :attr:`strict` is ``True`` and a rule type cannot be resolved.
        """
        verdicts: list[RuleVerdict] = []
        overall_passed = True
        timestamp = datetime.now(timezone.utc)

        for rule_config in policy.enabled_rules:
            rule = self._rules.get(rule_config.type)
            if ru
PolicyLoader.load_file method · python · L48-L92 (45 LOC)
src/agent_gov/policy/loader.py
    def load_file(self, path: str | Path) -> PolicyConfig:
        """Load a single YAML policy file.

        Parameters
        ----------
        path:
            Filesystem path to the ``.yaml`` or ``.yml`` file.

        Returns
        -------
        PolicyConfig
            Validated policy configuration.

        Raises
        ------
        PolicyLoadError
            If the file does not exist, is not valid YAML, or fails
            Pydantic validation.
        """
        resolved = Path(path).resolve()
        if not resolved.exists():
            raise PolicyLoadError(resolved, "file does not exist")
        if not resolved.is_file():
            raise PolicyLoadError(resolved, "path is not a file")

        try:
            raw_text = resolved.read_text(encoding="utf-8")
        except OSError as exc:
            raise PolicyLoadError(resolved, f"cannot read file: {exc}") from exc

        try:
            data = yaml.safe_load(raw_text)
        except yaml.YAMLError 
PolicyLoader.load_directory method · python · L94-L144 (51 LOC)
src/agent_gov/policy/loader.py
    def load_directory(
        self,
        directory: str | Path,
        *,
        recursive: bool = False,
    ) -> list[PolicyConfig]:
        """Load all YAML policy files from a directory.

        Parameters
        ----------
        directory:
            Path to a directory containing ``.yaml`` / ``.yml`` files.
        recursive:
            When ``True``, scan subdirectories as well.

        Returns
        -------
        list[PolicyConfig]
            All successfully validated policies, sorted by filename.

        Raises
        ------
        PolicyLoadError
            If ``directory`` does not exist or is not a directory.
            Individual file failures are logged as warnings and skipped.
        """
        resolved = Path(directory).resolve()
        if not resolved.exists():
            raise PolicyLoadError(resolved, "directory does not exist")
        if not resolved.is_dir():
            raise PolicyLoadError(resolved, "path is not a directory")

     
PolicyLoader.load_string method · python · L146-L182 (37 LOC)
src/agent_gov/policy/loader.py
    def load_string(self, content: str, *, source_name: str = "<string>") -> PolicyConfig:
        """Load a policy from a YAML string.

        Useful for testing or loading policies from configuration stores.

        Parameters
        ----------
        content:
            Raw YAML text.
        source_name:
            Logical name used in error messages.

        Returns
        -------
        PolicyConfig
            Validated policy configuration.

        Raises
        ------
        PolicyLoadError
            If the content is not valid YAML or fails schema validation.
        """
        fake_path = Path(source_name)
        try:
            data = yaml.safe_load(content)
        except yaml.YAMLError as exc:
            raise PolicyLoadError(fake_path, f"YAML parse error: {exc}") from exc

        if not isinstance(data, dict):
            raise PolicyLoadError(fake_path, "YAML root must be a mapping (dict)")

        try:
            policy = PolicyConfig.model_validat
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
EvaluationReport.to_dict method · python · L65-L75 (11 LOC)
src/agent_gov/policy/result.py
    def to_dict(self) -> dict[str, object]:
        """Convert the report into a plain dictionary.

        The timestamp is emitted via ``isoformat()`` and every verdict is
        converted with its own ``to_dict()``; the remaining attributes are
        copied through unchanged.
        """
        report: dict[str, object] = {}
        report["policy_name"] = self.policy_name
        report["action"] = self.action
        report["passed"] = self.passed
        report["timestamp"] = self.timestamp.isoformat()
        report["violation_count"] = self.violation_count
        report["highest_severity"] = self.highest_severity
        report["verdicts"] = [verdict.to_dict() for verdict in self.verdicts]
        return report
EvaluationReport.summary method · python · L77-L84 (8 LOC)
src/agent_gov/policy/result.py
    def summary(self) -> str:
        """Build a single-line, human-readable digest of the report.

        The line carries a ``PASS``/``FAIL`` tag followed by the policy name
        (as a ``repr``), the violation count and the highest severity.
        """
        if self.passed:
            status = "PASS"
        else:
            status = "FAIL"
        segments = [
            f"[{status}]",
            f"policy={self.policy_name!r}",
            f"violations={self.violation_count}",
            f"highest_severity={self.highest_severity}",
        ]
        return " ".join(segments)
RuleVerdict.to_dict method · python · L59-L67 (9 LOC)
src/agent_gov/policy/rule.py
    def to_dict(self) -> dict[str, object]:
        """Convert the verdict into a plain dictionary.

        Each exported attribute is copied through unchanged under a key of
        the same name.
        """
        exported_fields = ("rule_name", "passed", "severity", "message", "details")
        return {
            field_name: getattr(self, field_name) for field_name in exported_fields
        }
PolicyRule.evaluate method · python · L84-L104 (21 LOC)
src/agent_gov/policy/rule.py
    def evaluate(
        self,
        action: dict[str, object],
        config: dict[str, object],
    ) -> RuleVerdict:
        """Evaluate an agent action against this rule.

        This definition carries only a docstring and no implementation
        statements.  NOTE(review): it is presumably declared abstract by a
        decorator above this line and overridden by each concrete rule —
        confirm against the full class.

        Parameters
        ----------
        action:
            Arbitrary key/value dictionary describing the agent action being
            evaluated.  The keys available depend on the calling system.
        config:
            The ``params`` dict from the matching
            :class:`~agent_gov.policy.schema.RuleConfig` instance.

        Returns
        -------
        RuleVerdict
            The result of this rule's evaluation.
        """
PolicyRule.validate_config method · python · L106-L124 (19 LOC)
src/agent_gov/policy/rule.py
    def validate_config(self, config: dict[str, object]) -> list[str]:
        """Check the rule's configuration parameters.

        This default implementation accepts every configuration.  Override
        it to validate configs at policy load time, returning one
        human-readable message per problem.

        Parameters
        ----------
        config:
            The ``params`` dict from the matching
            :class:`~agent_gov.policy.schema.RuleConfig` instance.

        Returns
        -------
        list[str]
            Validation error messages; an empty list means the config is
            valid.  A fresh empty list is returned on every call.
        """
        no_errors: list[str] = []
        return no_errors
ReportGenerator.governance_report method · python · L47-L124 (78 LOC)
src/agent_gov/reporting/generator.py
    def governance_report(
        self,
        *,
        policy_name: Optional[str] = None,
        evaluation_reports: Optional[list[EvaluationReport]] = None,
        title: str = "Agent Governance Report",
    ) -> dict[str, object]:
        """Build a governance report payload.

        Parameters
        ----------
        policy_name:
            Filter audit entries to a specific policy name.
        evaluation_reports:
            Recent :class:`~agent_gov.policy.result.EvaluationReport` objects
            to include in the report.
        title:
            Report title string.

        Returns
        -------
        dict[str, object]
            Structured report data suitable for rendering.
        """
        generated_at = datetime.now(timezone.utc).isoformat()

        # Build audit summary
        audit_summary: dict[str, object] = {}
        if self._reader is not None:
            all_stats = self._reader.stats()
            if policy_name:
                policy_
ReportGenerator.compliance_report method · python · L126-L162 (37 LOC)
src/agent_gov/reporting/generator.py
    def compliance_report(
        self,
        *,
        framework_reports: list[FrameworkReport],
        title: str = "Compliance Framework Report",
    ) -> dict[str, object]:
        """Build a compliance framework report payload.

        Parameters
        ----------
        framework_reports:
            List of :class:`~agent_gov.frameworks.base.FrameworkReport` objects.
        title:
            Report title string.

        Returns
        -------
        dict[str, object]
            Structured report data suitable for rendering.
        """
        generated_at = datetime.now(timezone.utc).isoformat()

        frameworks_data: list[dict[str, object]] = [
            report.to_dict() for report in framework_reports
        ]

        overall_score: float = 0.0
        if framework_reports:
            overall_score = sum(r.score for r in framework_reports) / len(framework_reports)

        return {
            "title": title,
            "generated_at": generated_at,
   
ReportGenerator.full_report method · python · L164-L207 (44 LOC)
src/agent_gov/reporting/generator.py
    def full_report(
        self,
        *,
        policy_name: Optional[str] = None,
        evaluation_reports: Optional[list[EvaluationReport]] = None,
        framework_reports: Optional[list[FrameworkReport]] = None,
        title: str = "Full Agent Governance and Compliance Report",
    ) -> dict[str, object]:
        """Build a combined governance and compliance report.

        Parameters
        ----------
        policy_name:
            Optional policy name filter for audit data.
        evaluation_reports:
            Optional list of evaluation reports.
        framework_reports:
            Optional list of framework check reports.
        title:
            Report title string.

        Returns
        -------
        dict[str, object]
            Combined report payload.
        """
        gov_report = self.governance_report(
            policy_name=policy_name,
            evaluation_reports=evaluation_reports,
            title=title,
        )
        compliance:
Repobility analyzer · published findings · https://repobility.com
JsonReporter.render method · python · L38-L57 (20 LOC)
src/agent_gov/reporting/json_report.py
    def render(self, payload: dict[str, object]) -> str:
        """Serialise a report payload as JSON text.

        Parameters
        ----------
        payload:
            Structured report data (produced by
            :class:`~agent_gov.reporting.generator.ReportGenerator`).

        Returns
        -------
        str
            JSON string formatted with this reporter's ``_indent`` and
            ``_ensure_ascii`` settings.
        """
        dump_options = {
            "indent": self._indent,
            "ensure_ascii": self._ensure_ascii,
            # Values json cannot encode natively (e.g. datetime) fall back to str().
            "default": str,
        }
        return json.dumps(payload, **dump_options)
JsonReporter.write method · python · L59-L78 (20 LOC)
src/agent_gov/reporting/json_report.py
    def write(self, payload: dict[str, object], output_path: str | Path) -> Path:
        """Render and write a report to a JSON file.

        Parameters
        ----------
        payload:
            Structured report data.
        output_path:
            Destination file path.  Parent directories are created if needed.

        Returns
        -------
        Path
            The resolved path of the written file.
        """
        resolved = Path(output_path).resolve()
        resolved.parent.mkdir(parents=True, exist_ok=True)
        json_content = self.render(payload)
        resolved.write_text(json_content, encoding="utf-8")
        return resolved
MarkdownReporter.__init__ method · python · L45-L53 (9 LOC)
src/agent_gov/reporting/markdown.py
    def __init__(self) -> None:
        """Create the Jinja2 environment when Jinja2 is available.

        ``self._env`` stays ``None`` when ``_JINJA2_AVAILABLE`` is false;
        otherwise it holds an ``Environment`` that loads templates from
        ``_TEMPLATES_DIR``.
        """
        self._env: Optional[object] = None
        if not _JINJA2_AVAILABLE:
            return

        template_loader = FileSystemLoader(str(_TEMPLATES_DIR))
        self._env = Environment(  # type: ignore[assignment]
            loader=template_loader,
            autoescape=select_autoescape([]),
            trim_blocks=True,
            lstrip_blocks=True,
        )
MarkdownReporter.render_governance method · python · L55-L75 (21 LOC)
src/agent_gov/reporting/markdown.py
    def render_governance(self, payload: dict[str, object]) -> str:
        """Render a governance report payload to Markdown.

        When a template environment is present on ``self._env``, the
        ``governance_report.md.j2`` template is rendered with the payload's
        keys as template variables.  Otherwise the plain-text fallback
        renderer is used.

        Parameters
        ----------
        payload:
            Structured governance report data produced by
            :class:`~agent_gov.reporting.generator.ReportGenerator`.

        Returns
        -------
        str
            Markdown-formatted report string.
        """
        env = self._env
        if env is None:
            return _fallback_governance(payload)

        # The environment object is used directly, so nothing needs to be
        # imported from jinja2 at render time.
        template = env.get_template("governance_report.md.j2")  # type: ignore[attr-defined]
        return template.render(**payload)
MarkdownReporter.render_compliance method · python · L77-L96 (20 LOC)
src/agent_gov/reporting/markdown.py
    def render_compliance(self, payload: dict[str, object]) -> str:
        """Render a compliance framework report payload to Markdown.

        When a template environment is present on ``self._env``, the
        ``compliance_report.md.j2`` template is rendered with the payload's
        keys as template variables.  Otherwise the plain-text fallback
        renderer is used.

        Parameters
        ----------
        payload:
            Structured compliance report data.

        Returns
        -------
        str
            Markdown-formatted report string.
        """
        env = self._env
        if env is None:
            return _fallback_compliance(payload)

        # The environment object is used directly, so nothing needs to be
        # imported from jinja2 at render time.
        template = env.get_template("compliance_report.md.j2")  # type: ignore[attr-defined]
        return template.render(**payload)
MarkdownReporter.write_governance method · python · L98-L120 (23 LOC)
src/agent_gov/reporting/markdown.py
    def write_governance(
        self,
        payload: dict[str, object],
        output_path: str | Path,
    ) -> Path:
        """Render and write a governance report to a Markdown file.

        Parameters
        ----------
        payload:
            Structured governance report data.
        output_path:
            Destination file path.

        Returns
        -------
        Path
            Resolved path of the written file.
        """
        resolved = Path(output_path).resolve()
        resolved.parent.mkdir(parents=True, exist_ok=True)
        resolved.write_text(self.render_governance(payload), encoding="utf-8")
        return resolved
MarkdownReporter.write_compliance method · python · L122-L144 (23 LOC)
src/agent_gov/reporting/markdown.py
    def write_compliance(
        self,
        payload: dict[str, object],
        output_path: str | Path,
    ) -> Path:
        """Render and write a compliance report to a Markdown file.

        Parameters
        ----------
        payload:
            Structured compliance report data.
        output_path:
            Destination file path.

        Returns
        -------
        Path
            Resolved path of the written file.
        """
        resolved = Path(output_path).resolve()
        resolved.parent.mkdir(parents=True, exist_ok=True)
        resolved.write_text(self.render_compliance(payload), encoding="utf-8")
        return resolved
_fallback_governance function · python · L147-L183 (37 LOC)
src/agent_gov/reporting/markdown.py
def _fallback_governance(payload: dict[str, object]) -> str:
    """Plain-text Markdown fallback when Jinja2 is not installed."""
    lines: list[str] = [
        f"# {payload.get('title', 'Agent Governance Report')}",
        "",
        f"**Generated:** {payload.get('generated_at', 'N/A')}  ",
        f"**Policy:** {payload.get('policy_name', 'All')}  ",
        f"**Pass Rate:** {payload.get('pass_rate_percent', 'N/A')}%",
        "",
        "## Audit Summary",
        "",
    ]
    audit = payload.get("audit_summary", {})
    if isinstance(audit, dict):
        lines.append(f"- Total entries: {audit.get('total_entries', 0)}")
        lines.append(f"- Passed: {audit.get('pass_count', 0)}")
        lines.append(f"- Failed: {audit.get('fail_count', 0)}")
        agents = audit.get("agents", [])
        if isinstance(agents, list):
            lines.append(f"- Distinct agents: {len(agents)}")

    lines += ["", "## Evaluation Results", ""]
    evals = payload.get("evaluation_results", 
If a scraper extracted this row, it came from Repobility (https://repobility.com)
_fallback_compliance function · python · L186-L222 (37 LOC)
src/agent_gov/reporting/markdown.py
def _fallback_compliance(payload: dict[str, object]) -> str:
    """Plain-text Markdown fallback when Jinja2 is not installed."""
    lines: list[str] = [
        f"# {payload.get('title', 'Compliance Report')}",
        "",
        f"**Generated:** {payload.get('generated_at', 'N/A')}  ",
        f"**Overall Score:** {payload.get('overall_score', 0.0):.1f}%",
        "",
    ]
    frameworks = payload.get("frameworks", [])
    if isinstance(frameworks, list):
        for framework_data in frameworks:
            if not isinstance(framework_data, dict):
                continue
            lines.append(f"## {framework_data.get('framework', 'Framework')}")
            lines.append("")
            lines.append(f"**Score:** {framework_data.get('score_percent', 0.0):.1f}%  ")
            lines.append(
                f"**Passed:** {framework_data.get('passed', 0)} / "
                f"{framework_data.get('total', 0)}"
            )
            lines.append("")
            results = framew
list_templates function · python · L23-L35 (13 LOC)
src/agent_gov/reporting/templates/__init__.py
def list_templates() -> list[str]:
    """List the file names of the templates stored in ``_TEMPLATES_DIR``.

    Only regular files whose suffix is ``.j2`` or ``.jinja2`` are included.

    Returns
    -------
    list[str]
        Template file names in ascending order.
    """
    template_suffixes = (".j2", ".jinja2")
    names: list[str] = []
    for entry in _TEMPLATES_DIR.iterdir():
        if entry.suffix in template_suffixes and entry.is_file():
            names.append(entry.name)
    names.sort()
    return names
get_template function · python · L38-L62 (25 LOC)
src/agent_gov/reporting/templates/__init__.py
def get_template(name: str) -> str:
    """Read a built-in template and return its text.

    Parameters
    ----------
    name:
        Template file name (e.g. ``"governance_report.md.j2"``), looked up
        directly under ``_TEMPLATES_DIR``.

    Returns
    -------
    str
        The template file's contents, decoded as UTF-8.

    Raises
    ------
    FileNotFoundError
        If ``name`` does not refer to a file in the templates directory.
        The message lists the names returned by :func:`list_templates`.
    """
    candidate = _TEMPLATES_DIR / name
    if candidate.is_file():
        return candidate.read_text(encoding="utf-8")

    available = list_templates()
    raise FileNotFoundError(
        f"Template {name!r} not found. Available templates: {available!r}"
    )
write_template function · python · L65-L91 (27 LOC)
src/agent_gov/reporting/templates/__init__.py
def write_template(name: str, destination: str | Path) -> Path:
    """Write the text of a built-in template to *destination*.

    Handy for scaffolding custom templates that start from a built-in one.

    Parameters
    ----------
    name:
        Template file name to copy; read via :func:`get_template`.
    destination:
        Destination file path.  Missing parent directories are created.

    Returns
    -------
    Path
        Resolved path of the written file.

    Raises
    ------
    FileNotFoundError
        If the named template does not exist.
    """
    # Fetch the template first so an unknown name fails before any directory
    # is created.
    template_text = get_template(name)
    target = Path(destination).resolve()
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(template_text, encoding="utf-8")
    return target
get_template function · python · L92-L115 (24 LOC)
src/agent_gov/reporting/templates.py
def get_template(name: str) -> str:
    """Look up a built-in policy template in ``TEMPLATES``.

    Parameters
    ----------
    name:
        Template name.  Use :func:`list_templates` to discover available names.

    Returns
    -------
    str
        The value stored in ``TEMPLATES`` under ``name``.

    Raises
    ------
    KeyError
        If ``name`` is not a key of ``TEMPLATES``.  The message lists the
        names returned by :func:`list_templates`.
    """
    if name in TEMPLATES:
        return TEMPLATES[name]

    available = list_templates()
    raise KeyError(
        f"Unknown template: {name!r}. Available templates: {available}"
    )
list_templates function · python · L118-L126 (9 LOC)
src/agent_gov/reporting/templates.py
def list_templates() -> list[str]:
    """List the keys of ``TEMPLATES`` in sorted order.

    Returns
    -------
    list[str]
        Template names in ascending order.
    """
    names = list(TEMPLATES.keys())
    names.sort()
    return names
write_template function · python · L129-L154 (26 LOC)
src/agent_gov/reporting/templates.py
def write_template(name: str, output_path: str | Path) -> Path:
    """Write a built-in policy template to ``output_path``.

    Parent directories are created automatically if they do not exist.

    Parameters
    ----------
    name:
        Template name, as returned by :func:`list_templates`.
    output_path:
        Destination file path (typically ending in ``.yaml``).  Either a
        :class:`~pathlib.Path` or a plain string is accepted.

    Returns
    -------
    Path
        The path of the written file as a :class:`~pathlib.Path`.  It is
        equal to ``output_path`` as given; it is *not* resolved to an
        absolute path.

    Raises
    ------
    KeyError
        If ``name`` is not a recognised built-in template.
    """
    # Look the template up first so an unknown name raises before any
    # directory is created on disk.
    content = get_template(name)

    # Coerce so that string paths work too; a Path input yields an equal Path.
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(content, encoding="utf-8")
    return destination
CostLimitRule.evaluate method · python · L55-L153 (99 LOC)
src/agent_gov/rules/cost_limit.py
    def evaluate(
        self,
        action: dict[str, object],
        config: dict[str, object],
    ) -> RuleVerdict:
        """Check action cost against configured thresholds.

        Parameters
        ----------
        action:
            Must contain the key identified by ``cost_field`` (default
            ``"cost"``).  The value must be numeric (``int`` or ``float``).
            Missing cost field is treated as ``0.0``.
        config:
            Supported keys: ``max_cost_per_action``, ``max_cost_aggregate``,
            ``cost_field``.

        Returns
        -------
        RuleVerdict
            ``passed=False`` when either threshold is exceeded.
        """
        cost_field: str = str(config.get("cost_field", "cost"))
        max_per_action: float = float(config.get("max_cost_per_action", 0))
        max_aggregate: float = float(config.get("max_cost_aggregate", 0))

        raw_cost = action.get(cost_field, 0)
        try:
            action_cost = float(raw_c
Repobility · code-quality intelligence platform · https://repobility.com
CostLimitRule.validate_config method · python · L155-L166 (12 LOC)
src/agent_gov/rules/cost_limit.py
    def validate_config(self, config: dict[str, object]) -> list[str]:
        """Check that the configured cost thresholds are non-negative numbers.

        Only ``max_cost_per_action`` and ``max_cost_aggregate`` are inspected;
        a key that is absent (or set to ``None``) is not an error.

        Parameters
        ----------
        config:
            Rule configuration mapping.

        Returns
        -------
        list[str]
            One message per invalid threshold; empty when nothing is wrong.
        """
        problems: list[str] = []
        for threshold_key in ("max_cost_per_action", "max_cost_aggregate"):
            raw = config.get(threshold_key)
            if raw is None:
                continue
            try:
                numeric = float(raw)  # type: ignore[arg-type]
            except (TypeError, ValueError):
                problems.append(
                    f"cost_limit: {threshold_key!r} must be a number, got {raw!r}."
                )
                continue
            if numeric < 0:
                problems.append(f"cost_limit: {threshold_key!r} must be non-negative.")
        return problems
KeywordBlockRule.evaluate method · python · L46-L111 (66 LOC)
src/agent_gov/rules/keyword_block.py
    def evaluate(
        self,
        action: dict[str, object],
        config: dict[str, object],
    ) -> RuleVerdict:
        """Search action string values for blocked keywords.

        Parameters
        ----------
        action:
            Arbitrary action dict.  All string values (recursive) are scanned.
        config:
            Supported keys: ``keywords`` (list[str]), ``case_sensitive`` (bool),
            ``match_whole_word`` (bool).

        Returns
        -------
        RuleVerdict
            ``passed=False`` when any blocked keyword is found.
        """
        raw_keywords = config.get("keywords", [])
        if not isinstance(raw_keywords, list):
            raw_keywords = [str(raw_keywords)]
        keywords: list[str] = [str(k) for k in raw_keywords]

        case_sensitive: bool = bool(config.get("case_sensitive", False))
        match_whole_word: bool = bool(config.get("match_whole_word", False))

        if not keywords:
            return RuleVerdict(
KeywordBlockRule.validate_config method · python · L113-L125 (13 LOC)
src/agent_gov/rules/keyword_block.py
    def validate_config(self, config: dict[str, object]) -> list[str]:
        """Check that ``keywords`` is present, is a list, and is not empty.

        Parameters
        ----------
        config:
            Rule configuration mapping.

        Returns
        -------
        list[str]
            A single-element list describing the first problem found, or an
            empty list when ``keywords`` is a non-empty list.
        """
        configured = config.get("keywords")

        # The three failure modes are mutually exclusive, so report the first
        # one that applies and stop.
        if configured is None:
            return ["keyword_block: 'keywords' is not configured."]
        if not isinstance(configured, list):
            return [
                f"keyword_block: 'keywords' must be a list, got {type(configured).__name__}."
            ]
        if not configured:
            return ["keyword_block: 'keywords' list must not be empty."]
        return []
_extract_strings function · python · L128-L142 (15 LOC)
src/agent_gov/rules/keyword_block.py
def _extract_strings(
    data: dict[str, object],
    prefix: str = "",
) -> list[tuple[str, str]]:
    """Collect every string value in *data* together with its field path.

    Nested dicts are walked recursively and their keys are joined with
    ``.``; list values are delegated to ``_extract_strings_from_list``.
    Values of any other type are ignored.

    Parameters
    ----------
    data:
        Mapping to walk.
    prefix:
        Path of *data* within its parent; empty for the top-level mapping.

    Returns
    -------
    list[tuple[str, str]]
        ``(field_path, string_value)`` pairs in iteration order.
    """
    collected: list[tuple[str, str]] = []
    for field_name, field_value in data.items():
        # Top-level keys are used as-is; nested keys get a dotted path.
        field_path = field_name if not prefix else f"{prefix}.{field_name}"

        if isinstance(field_value, str):
            collected.append((field_path, field_value))
            continue

        if isinstance(field_value, dict):
            collected += _extract_strings(field_value, field_path)
        elif isinstance(field_value, list):
            collected += _extract_strings_from_list(field_value, field_path)
    return collected
_extract_strings_from_list function · python · L145-L159 (15 LOC)
src/agent_gov/rules/keyword_block.py
def _extract_strings_from_list(
    data: list[object],
    prefix: str,
) -> list[tuple[str, str]]:
    """Collect every string found in *data* together with its indexed path.

    Each element's path is ``prefix`` followed by ``[index]``.  Nested lists
    are walked recursively; dict elements are delegated to
    ``_extract_strings``.  Elements of any other type are ignored.

    Parameters
    ----------
    data:
        List to walk.
    prefix:
        Path of *data* within the enclosing structure.

    Returns
    -------
    list[tuple[str, str]]
        ``(field_path, string_value)`` pairs in list order.
    """
    collected: list[tuple[str, str]] = []
    for position, element in enumerate(data):
        element_path = f"{prefix}[{position}]"

        if isinstance(element, str):
            collected.append((element_path, element))
            continue

        if isinstance(element, dict):
            collected += _extract_strings(element, element_path)
        elif isinstance(element, list):
            collected += _extract_strings_from_list(element, element_path)
    return collected
_matches function · python · L162-L181 (20 LOC)
src/agent_gov/rules/keyword_block.py
def _matches(
    text: str,
    keyword: str,
    *,
    case_sensitive: bool,
    whole_word: bool,
) -> bool:
    """Report whether ``keyword`` occurs in ``text``.

    Parameters
    ----------
    text:
        String to search.
    keyword:
        Literal keyword to look for (never interpreted as a regex).
    case_sensitive:
        When ``False`` both strings are lower-cased before comparing.
    whole_word:
        When ``True`` the keyword must not be directly preceded or followed
        by an ASCII letter, digit, or underscore.

    Returns
    -------
    bool
        ``True`` when a match is found.
    """
    if case_sensitive:
        haystack, needle = text, keyword
    else:
        haystack, needle = text.lower(), keyword.lower()

    if whole_word:
        # Look-around assertions rather than ``\b`` so that keywords which
        # begin or end with a non-word character are still delimited properly.
        bounded = "".join(
            (r"(?<![A-Za-z0-9_])", re.escape(needle), r"(?![A-Za-z0-9_])")
        )
        return re.search(bounded, haystack) is not None

    return needle in haystack
PiiCheckRule.evaluate method · python · L76-L136 (61 LOC)
src/agent_gov/rules/pii_check.py
    def evaluate(
        self,
        action: dict[str, object],
        config: dict[str, object],
    ) -> RuleVerdict:
        """Scan action values for PII patterns.

        Parameters
        ----------
        action:
            The agent action to inspect.
        config:
            Supported keys: ``check_ssn``, ``check_credit_card``,
            ``check_email``, ``check_phone`` (all ``bool``, default ``True``).

        Returns
        -------
        RuleVerdict
            ``passed=False`` when any PII pattern is detected.
        """
        check_ssn: bool = bool(config.get("check_ssn", True))
        check_credit_card: bool = bool(config.get("check_credit_card", True))
        check_email: bool = bool(config.get("check_email", True))
        check_phone: bool = bool(config.get("check_phone", True))

        active_patterns: list[tuple[str, re.Pattern[str]]] = []
        if check_ssn:
            active_patterns.append(("ssn", _SSN_PATTERN))
        if check_credit_ca
PiiCheckRule._scan_dict method · python · L138-L153 (16 LOC)
src/agent_gov/rules/pii_check.py
    def _scan_dict(
        self,
        data: dict[str, object],
        patterns: list[tuple[str, re.Pattern[str]]],
        prefix: str,
        matches: list[_PiiMatch],
    ) -> None:
        """Walk *data* and scan every string value it contains.

        String values go to ``_scan_string``, nested dicts are walked
        recursively, and list values go to ``_scan_list``.  Values of any
        other type are skipped.  Hits are appended to *matches* in place.

        Parameters
        ----------
        data:
            Mapping to walk.
        patterns:
            ``(pattern_name, compiled_pattern)`` pairs to apply.
        prefix:
            Path of *data* within its parent; empty for the top level.
        matches:
            Accumulator that receives the hits.
        """
        for field_name, field_value in data.items():
            # Top-level keys are used as-is; nested keys get a dotted path.
            field_path = field_name if not prefix else f"{prefix}.{field_name}"

            if isinstance(field_value, str):
                self._scan_string(field_value, field_path, patterns, matches)
                continue
            if isinstance(field_value, dict):
                self._scan_dict(field_value, patterns, field_path, matches)
                continue
            if isinstance(field_value, list):
                self._scan_list(field_value, field_path, patterns, matches)
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
PiiCheckRule._scan_list method · python · L155-L168 (14 LOC)
src/agent_gov/rules/pii_check.py
    def _scan_list(
        self,
        data: list[object],
        prefix: str,
        patterns: list[tuple[str, re.Pattern[str]]],
        matches: list[_PiiMatch],
    ) -> None:
        """Recursively scan list elements for PII patterns.

        String elements go to ``_scan_string``, dict elements to
        ``_scan_dict``, and nested lists are walked recursively.  Elements of
        any other type are skipped.  Hits are appended to *matches* in place.

        Parameters
        ----------
        data:
            List to walk.
        prefix:
            Path of *data* within the enclosing structure; each element's
            path is ``prefix`` followed by ``[index]``.
        patterns:
            ``(pattern_name, compiled_pattern)`` pairs to apply.
        matches:
            Accumulator that receives the hits.
        """
        for index, item in enumerate(data):
            path = f"{prefix}[{index}]"
            if isinstance(item, str):
                self._scan_string(item, path, patterns, matches)
            elif isinstance(item, dict):
                self._scan_dict(item, patterns, path, matches)
            elif isinstance(item, list):
                # Lists nested directly inside lists must be walked too;
                # otherwise strings such as [["123-45-6789"]] are never scanned.
                self._scan_list(item, path, patterns, matches)
PiiCheckRule._scan_string method · python · L170-L186 (17 LOC)
src/agent_gov/rules/pii_check.py
    def _scan_string(
        self,
        text: str,
        field_path: str,
        patterns: list[tuple[str, re.Pattern[str]]],
        matches: list[_PiiMatch],
    ) -> None:
        """Run every active pattern over one string and record the hits.

        Parameters
        ----------
        text:
            String value to scan.
        field_path:
            Path of the value within the action; stored on each hit.
        patterns:
            ``(pattern_name, compiled_pattern)`` pairs to apply, in order.
        matches:
            Accumulator; one ``_PiiMatch`` is appended per regex match.
        """
        for label, regex in patterns:
            # One record per occurrence, in the order finditer yields them.
            matches.extend(
                _PiiMatch(
                    pattern_name=label,
                    matched_value=hit.group(),
                    field_path=field_path,
                )
                for hit in regex.finditer(text)
            )
PiiCheckRule.validate_config method · python · L188-L195 (8 LOC)
src/agent_gov/rules/pii_check.py
    def validate_config(self, config: dict[str, object]) -> list[str]:
        """Report config keys that are not one of the known ``check_*`` flags.

        Only key names are checked; the values are not inspected.

        Parameters
        ----------
        config:
            Rule configuration mapping.

        Returns
        -------
        list[str]
            One message per unrecognised key, in the mapping's iteration
            order; empty when every key is known.
        """
        recognised = {"check_ssn", "check_credit_card", "check_email", "check_phone"}
        return [
            f"Unknown config key {option!r} for pii_check rule."
            for option in config
            if option not in recognised
        ]
‹ prevpage 4 / 5next ›