Function bodies 212 total
_populate_builtins function · python · L166-L182 (17 LOC)src/agent_gov/plugins/registry.py
def _populate_builtins() -> None:
"""Register all built-in rules and frameworks into the global singletons."""
from agent_gov.rules.cost_limit import CostLimitRule
from agent_gov.rules.keyword_block import KeywordBlockRule
from agent_gov.rules.pii_check import PiiCheckRule
from agent_gov.rules.role_check import RoleCheckRule
for rule_cls in (PiiCheckRule, RoleCheckRule, CostLimitRule, KeywordBlockRule):
rule_registry.register_class(rule_cls.name, rule_cls)
from agent_gov.frameworks.eu_ai_act import EuAiActFramework
from agent_gov.frameworks.gdpr import GdprFramework
from agent_gov.frameworks.hipaa import HipaaFramework
from agent_gov.frameworks.soc2 import Soc2Framework
for fw_cls in (EuAiActFramework, GdprFramework, HipaaFramework, Soc2Framework):
framework_registry.register_class(fw_cls.name, fw_cls)LibraryPolicyInstaller.install method · python · L87-L203 (117 LOC)src/agent_gov/policies/installer.py
def install(
self,
source: str | Path,
target: str | Path,
*,
domain: Optional[str] = None,
overwrite: bool = True,
dry_run: bool = False,
) -> list[InstallResult]:
"""Install policies from *source* into *target*.
Parameters
----------
source:
Directory containing source ``.yaml`` policy files. Scanned
recursively.
target:
Destination directory. Created if it does not exist (unless
``dry_run`` is ``True``).
domain:
When provided, only install policies whose ``domain`` matches.
overwrite:
When ``False``, skip files that already exist in *target*.
dry_run:
When ``True``, report what would be installed without writing any
files or creating directories.
Returns
-------
list[InstallResult]
One entry per discovered YAML file.
LibraryPolicyInstaller.list_available method · python · L205-L231 (27 LOC)src/agent_gov/policies/installer.py
def list_available(
self,
source: str | Path,
*,
domain: Optional[str] = None,
) -> list[str]:
"""List policy IDs available in the source directory.
Parameters
----------
source:
Directory to scan for policy YAML files.
domain:
When provided, only return IDs for the given domain.
Returns
-------
list[str]
Sorted list of policy ``id`` values found.
"""
try:
policies = self._loader.load_directory(
source, recursive=True, domain_filter=domain
)
except LibraryPolicyLoadError:
return []
return sorted(policy.id for policy in policies)LibraryPolicyLoader.load_file method · python · L62-L108 (47 LOC)src/agent_gov/policies/loader.py
def load_file(self, path: str | Path) -> LibraryPolicyConfig:
"""Load a single library policy YAML file.
Parameters
----------
path:
Filesystem path to the ``.yaml`` or ``.yml`` file.
Returns
-------
LibraryPolicyConfig
Validated library policy configuration.
Raises
------
LibraryPolicyLoadError
If the file does not exist, is not valid YAML, or fails
Pydantic schema validation.
"""
resolved = Path(path).resolve()
if not resolved.exists():
raise LibraryPolicyLoadError(resolved, "file does not exist")
if not resolved.is_file():
raise LibraryPolicyLoadError(resolved, "path is not a file")
try:
raw_text = resolved.read_text(encoding="utf-8")
except OSError as exc:
raise LibraryPolicyLoadError(resolved, f"cannot read file: {exc}") from exc
try:
LibraryPolicyLoader.load_directory method · python · L110-L174 (65 LOC)src/agent_gov/policies/loader.py
def load_directory(
self,
directory: str | Path,
*,
recursive: bool = True,
domain_filter: str | None = None,
) -> list[LibraryPolicyConfig]:
"""Load all library YAML policy files from a directory.
Parameters
----------
directory:
Path to a directory containing ``.yaml`` / ``.yml`` policy files.
recursive:
When ``True`` (default), scan subdirectories as well. Set to
``False`` to scan only the top-level directory.
domain_filter:
When provided, only policies whose ``domain`` value matches this
string are returned.
Returns
-------
list[LibraryPolicyConfig]
All successfully validated policies, sorted by file path.
Individual file failures are logged as warnings and skipped.
Raises
------
LibraryPolicyLoadError
If ``directory`` does not exist or is noLibraryPolicyValidator.validate_dict method · python · L100-L166 (67 LOC)src/agent_gov/policies/validator.py
def validate_dict(self, data: object) -> ValidationResult:
"""Validate a raw Python object (from YAML parsing) against the schema.
Parameters
----------
data:
The parsed YAML document, expected to be a ``dict``.
Returns
-------
ValidationResult
Contains ``valid=True`` with no errors, or ``valid=False``
with a populated ``errors`` list.
"""
errors: list[str] = []
if not isinstance(data, dict):
return ValidationResult(valid=False, errors=["Policy document must be a YAML mapping (dict)."])
# Required top-level fields
for required_field in _REQUIRED_TOP_LEVEL_FIELDS:
if required_field not in data:
errors.append(f"Missing required field: '{required_field}'.")
# Domain validation
domain_value = data.get("domain")
if domain_value is not None and domain_value not in _VALID_DOMAINS:
LibraryPolicyValidator.validate_file method · python · L168-L207 (40 LOC)src/agent_gov/policies/validator.py
def validate_file(self, path: str | Path) -> ValidationResult:
"""Validate a YAML policy file on disk.
Parameters
----------
path:
Filesystem path to the ``.yaml`` or ``.yml`` policy file.
Returns
-------
ValidationResult
Contains ``valid=True`` with no errors, or ``valid=False``
with a populated ``errors`` list describing each problem.
"""
resolved = Path(path).resolve()
if not resolved.exists():
return ValidationResult(
valid=False, errors=[f"File does not exist: {resolved}"]
)
if not resolved.is_file():
return ValidationResult(
valid=False, errors=[f"Path is not a file: {resolved}"]
)
try:
raw_text = resolved.read_text(encoding="utf-8")
except OSError as exc:
return ValidationResult(
valid=False, errors=[f"Cannot read fRepobility · code-quality intelligence platform · https://repobility.com
LibraryPolicyValidator.assert_valid method · python · L209-L230 (22 LOC)src/agent_gov/policies/validator.py
def assert_valid(self, data: object) -> LibraryPolicyConfig:
"""Validate and return the parsed policy, raising on failure.
Parameters
----------
data:
The parsed YAML document as a Python ``dict``.
Returns
-------
LibraryPolicyConfig
The validated policy model.
Raises
------
LibraryPolicyValidationError
If validation fails.
"""
result = self.validate_dict(data)
if not result.valid:
raise LibraryPolicyValidationError(result.errors)
return LibraryPolicyConfig.model_validate(data)RuleResolutionError.__init__ method · python · L39-L44 (6 LOC)src/agent_gov/policy/evaluator.py
def __init__(self, rule_type: str) -> None:
self.rule_type = rule_type
super().__init__(
f"No rule implementation registered for type {rule_type!r}. "
"Register the rule with the rule registry before evaluating."
)PolicyEvaluator._register_builtins method · python · L68-L77 (10 LOC)src/agent_gov/policy/evaluator.py
def _register_builtins(self) -> None:
"""Register the four built-in rule implementations."""
from agent_gov.rules.cost_limit import CostLimitRule
from agent_gov.rules.keyword_block import KeywordBlockRule
from agent_gov.rules.pii_check import PiiCheckRule
from agent_gov.rules.role_check import RoleCheckRule
for rule_cls in (PiiCheckRule, RoleCheckRule, CostLimitRule, KeywordBlockRule):
instance = rule_cls()
self._rules[instance.name] = instancePolicyEvaluator.register_rule method · python · L79-L89 (11 LOC)src/agent_gov/policy/evaluator.py
def register_rule(self, rule: PolicyRule) -> None:
"""Register a custom rule implementation.
Parameters
----------
rule:
An instantiated :class:`~agent_gov.policy.rule.PolicyRule`.
If a rule with the same name is already registered it is replaced.
"""
logger.debug("Registering rule %r", rule.name)
self._rules[rule.name] = rulePolicyEvaluator.evaluate method · python · L95-L175 (81 LOC)src/agent_gov/policy/evaluator.py
def evaluate(
self,
policy: PolicyConfig,
action: dict[str, object],
) -> EvaluationReport:
"""Evaluate an agent action against all enabled rules in a policy.
Parameters
----------
policy:
Validated :class:`~agent_gov.policy.schema.PolicyConfig` to
evaluate the action against.
action:
Arbitrary dictionary describing the agent action.
Returns
-------
EvaluationReport
Aggregated result; ``report.passed`` is ``True`` only when every
enabled rule passes.
Raises
------
RuleResolutionError
If :attr:`strict` is ``True`` and a rule type cannot be resolved.
"""
verdicts: list[RuleVerdict] = []
overall_passed = True
timestamp = datetime.now(timezone.utc)
for rule_config in policy.enabled_rules:
rule = self._rules.get(rule_config.type)
if ruPolicyLoader.load_file method · python · L48-L92 (45 LOC)src/agent_gov/policy/loader.py
def load_file(self, path: str | Path) -> PolicyConfig:
"""Load a single YAML policy file.
Parameters
----------
path:
Filesystem path to the ``.yaml`` or ``.yml`` file.
Returns
-------
PolicyConfig
Validated policy configuration.
Raises
------
PolicyLoadError
If the file does not exist, is not valid YAML, or fails
Pydantic validation.
"""
resolved = Path(path).resolve()
if not resolved.exists():
raise PolicyLoadError(resolved, "file does not exist")
if not resolved.is_file():
raise PolicyLoadError(resolved, "path is not a file")
try:
raw_text = resolved.read_text(encoding="utf-8")
except OSError as exc:
raise PolicyLoadError(resolved, f"cannot read file: {exc}") from exc
try:
data = yaml.safe_load(raw_text)
except yaml.YAMLError PolicyLoader.load_directory method · python · L94-L144 (51 LOC)src/agent_gov/policy/loader.py
def load_directory(
self,
directory: str | Path,
*,
recursive: bool = False,
) -> list[PolicyConfig]:
"""Load all YAML policy files from a directory.
Parameters
----------
directory:
Path to a directory containing ``.yaml`` / ``.yml`` files.
recursive:
When ``True``, scan subdirectories as well.
Returns
-------
list[PolicyConfig]
All successfully validated policies, sorted by filename.
Raises
------
PolicyLoadError
If ``directory`` does not exist or is not a directory.
Individual file failures are logged as warnings and skipped.
"""
resolved = Path(directory).resolve()
if not resolved.exists():
raise PolicyLoadError(resolved, "directory does not exist")
if not resolved.is_dir():
raise PolicyLoadError(resolved, "path is not a directory")
PolicyLoader.load_string method · python · L146-L182 (37 LOC)src/agent_gov/policy/loader.py
def load_string(self, content: str, *, source_name: str = "<string>") -> PolicyConfig:
"""Load a policy from a YAML string.
Useful for testing or loading policies from configuration stores.
Parameters
----------
content:
Raw YAML text.
source_name:
Logical name used in error messages.
Returns
-------
PolicyConfig
Validated policy configuration.
Raises
------
PolicyLoadError
If the content is not valid YAML or fails schema validation.
"""
fake_path = Path(source_name)
try:
data = yaml.safe_load(content)
except yaml.YAMLError as exc:
raise PolicyLoadError(fake_path, f"YAML parse error: {exc}") from exc
if not isinstance(data, dict):
raise PolicyLoadError(fake_path, "YAML root must be a mapping (dict)")
try:
policy = PolicyConfig.model_validatProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
EvaluationReport.to_dict method · python · L65-L75 (11 LOC)src/agent_gov/policy/result.py
def to_dict(self) -> dict[str, object]:
"""Serialise the report to a plain dictionary."""
return {
"policy_name": self.policy_name,
"action": self.action,
"passed": self.passed,
"timestamp": self.timestamp.isoformat(),
"violation_count": self.violation_count,
"highest_severity": self.highest_severity,
"verdicts": [v.to_dict() for v in self.verdicts],
}EvaluationReport.summary method · python · L77-L84 (8 LOC)src/agent_gov/policy/result.py
def summary(self) -> str:
"""Return a one-line human-readable summary."""
status = "PASS" if self.passed else "FAIL"
return (
f"[{status}] policy={self.policy_name!r} "
f"violations={self.violation_count} "
f"highest_severity={self.highest_severity}"
)RuleVerdict.to_dict method · python · L59-L67 (9 LOC)src/agent_gov/policy/rule.py
def to_dict(self) -> dict[str, object]:
"""Serialise verdict to a plain dictionary."""
return {
"rule_name": self.rule_name,
"passed": self.passed,
"severity": self.severity,
"message": self.message,
"details": self.details,
}PolicyRule.evaluate method · python · L84-L104 (21 LOC)src/agent_gov/policy/rule.py
def evaluate(
self,
action: dict[str, object],
config: dict[str, object],
) -> RuleVerdict:
"""Evaluate an agent action against this rule.
Parameters
----------
action:
Arbitrary key/value dictionary describing the agent action being
evaluated. The keys available depend on the calling system.
config:
The ``params`` dict from the matching
:class:`~agent_gov.policy.schema.RuleConfig` instance.
Returns
-------
RuleVerdict
The result of this rule's evaluation.
"""PolicyRule.validate_config method · python · L106-L124 (19 LOC)src/agent_gov/policy/rule.py
def validate_config(self, config: dict[str, object]) -> list[str]:
"""Validate the rule's configuration parameters.
Override this method to provide config validation at policy load time.
Return a list of human-readable error strings; an empty list means the
config is valid.
Parameters
----------
config:
The ``params`` dict from the matching
:class:`~agent_gov.policy.schema.RuleConfig` instance.
Returns
-------
list[str]
Validation error messages. Empty list means valid.
"""
return []ReportGenerator.governance_report method · python · L47-L124 (78 LOC)src/agent_gov/reporting/generator.py
def governance_report(
self,
*,
policy_name: Optional[str] = None,
evaluation_reports: Optional[list[EvaluationReport]] = None,
title: str = "Agent Governance Report",
) -> dict[str, object]:
"""Build a governance report payload.
Parameters
----------
policy_name:
Filter audit entries to a specific policy name.
evaluation_reports:
Recent :class:`~agent_gov.policy.result.EvaluationReport` objects
to include in the report.
title:
Report title string.
Returns
-------
dict[str, object]
Structured report data suitable for rendering.
"""
generated_at = datetime.now(timezone.utc).isoformat()
# Build audit summary
audit_summary: dict[str, object] = {}
if self._reader is not None:
all_stats = self._reader.stats()
if policy_name:
policy_ReportGenerator.compliance_report method · python · L126-L162 (37 LOC)src/agent_gov/reporting/generator.py
def compliance_report(
self,
*,
framework_reports: list[FrameworkReport],
title: str = "Compliance Framework Report",
) -> dict[str, object]:
"""Build a compliance framework report payload.
Parameters
----------
framework_reports:
List of :class:`~agent_gov.frameworks.base.FrameworkReport` objects.
title:
Report title string.
Returns
-------
dict[str, object]
Structured report data suitable for rendering.
"""
generated_at = datetime.now(timezone.utc).isoformat()
frameworks_data: list[dict[str, object]] = [
report.to_dict() for report in framework_reports
]
overall_score: float = 0.0
if framework_reports:
overall_score = sum(r.score for r in framework_reports) / len(framework_reports)
return {
"title": title,
"generated_at": generated_at,
ReportGenerator.full_report method · python · L164-L207 (44 LOC)src/agent_gov/reporting/generator.py
def full_report(
self,
*,
policy_name: Optional[str] = None,
evaluation_reports: Optional[list[EvaluationReport]] = None,
framework_reports: Optional[list[FrameworkReport]] = None,
title: str = "Full Agent Governance and Compliance Report",
) -> dict[str, object]:
"""Build a combined governance and compliance report.
Parameters
----------
policy_name:
Optional policy name filter for audit data.
evaluation_reports:
Optional list of evaluation reports.
framework_reports:
Optional list of framework check reports.
title:
Report title string.
Returns
-------
dict[str, object]
Combined report payload.
"""
gov_report = self.governance_report(
policy_name=policy_name,
evaluation_reports=evaluation_reports,
title=title,
)
compliance:Repobility analyzer · published findings · https://repobility.com
JsonReporter.render method · python · L38-L57 (20 LOC)src/agent_gov/reporting/json_report.py
def render(self, payload: dict[str, object]) -> str:
"""Render a report payload to a JSON string.
Parameters
----------
payload:
Structured report data (produced by
:class:`~agent_gov.reporting.generator.ReportGenerator`).
Returns
-------
str
Pretty-printed JSON string.
"""
return json.dumps(
payload,
indent=self._indent,
ensure_ascii=self._ensure_ascii,
default=str, # Fallback for datetime and other non-JSON types
)JsonReporter.write method · python · L59-L78 (20 LOC)src/agent_gov/reporting/json_report.py
def write(self, payload: dict[str, object], output_path: str | Path) -> Path:
"""Render and write a report to a JSON file.
Parameters
----------
payload:
Structured report data.
output_path:
Destination file path. Parent directories are created if needed.
Returns
-------
Path
The resolved path of the written file.
"""
resolved = Path(output_path).resolve()
resolved.parent.mkdir(parents=True, exist_ok=True)
json_content = self.render(payload)
resolved.write_text(json_content, encoding="utf-8")
return resolvedMarkdownReporter.__init__ method · python · L45-L53 (9 LOC)src/agent_gov/reporting/markdown.py
def __init__(self) -> None:
self._env: Optional[object] = None
if _JINJA2_AVAILABLE:
self._env = Environment( # type: ignore[assignment]
loader=FileSystemLoader(str(_TEMPLATES_DIR)),
autoescape=select_autoescape([]),
trim_blocks=True,
lstrip_blocks=True,
)MarkdownReporter.render_governance method · python · L55-L75 (21 LOC)src/agent_gov/reporting/markdown.py
def render_governance(self, payload: dict[str, object]) -> str:
"""Render a governance report payload to Markdown.
Parameters
----------
payload:
Structured governance report data produced by
:class:`~agent_gov.reporting.generator.ReportGenerator`.
Returns
-------
str
Markdown-formatted report string.
"""
if self._env is not None:
from jinja2 import Environment
env = self._env # type: ignore[assignment]
template = env.get_template("governance_report.md.j2") # type: ignore[union-attr]
return template.render(**payload) # type: ignore[union-attr]
return _fallback_governance(payload)MarkdownReporter.render_compliance method · python · L77-L96 (20 LOC)src/agent_gov/reporting/markdown.py
def render_compliance(self, payload: dict[str, object]) -> str:
"""Render a compliance framework report payload to Markdown.
Parameters
----------
payload:
Structured compliance report data.
Returns
-------
str
Markdown-formatted report string.
"""
if self._env is not None:
from jinja2 import Environment
env = self._env # type: ignore[assignment]
template = env.get_template("compliance_report.md.j2") # type: ignore[union-attr]
return template.render(**payload) # type: ignore[union-attr]
return _fallback_compliance(payload)MarkdownReporter.write_governance method · python · L98-L120 (23 LOC)src/agent_gov/reporting/markdown.py
def write_governance(
self,
payload: dict[str, object],
output_path: str | Path,
) -> Path:
"""Render and write a governance report to a Markdown file.
Parameters
----------
payload:
Structured governance report data.
output_path:
Destination file path.
Returns
-------
Path
Resolved path of the written file.
"""
resolved = Path(output_path).resolve()
resolved.parent.mkdir(parents=True, exist_ok=True)
resolved.write_text(self.render_governance(payload), encoding="utf-8")
return resolvedMarkdownReporter.write_compliance method · python · L122-L144 (23 LOC)src/agent_gov/reporting/markdown.py
def write_compliance(
self,
payload: dict[str, object],
output_path: str | Path,
) -> Path:
"""Render and write a compliance report to a Markdown file.
Parameters
----------
payload:
Structured compliance report data.
output_path:
Destination file path.
Returns
-------
Path
Resolved path of the written file.
"""
resolved = Path(output_path).resolve()
resolved.parent.mkdir(parents=True, exist_ok=True)
resolved.write_text(self.render_compliance(payload), encoding="utf-8")
return resolved_fallback_governance function · python · L147-L183 (37 LOC)src/agent_gov/reporting/markdown.py
def _fallback_governance(payload: dict[str, object]) -> str:
"""Plain-text Markdown fallback when Jinja2 is not installed."""
lines: list[str] = [
f"# {payload.get('title', 'Agent Governance Report')}",
"",
f"**Generated:** {payload.get('generated_at', 'N/A')} ",
f"**Policy:** {payload.get('policy_name', 'All')} ",
f"**Pass Rate:** {payload.get('pass_rate_percent', 'N/A')}%",
"",
"## Audit Summary",
"",
]
audit = payload.get("audit_summary", {})
if isinstance(audit, dict):
lines.append(f"- Total entries: {audit.get('total_entries', 0)}")
lines.append(f"- Passed: {audit.get('pass_count', 0)}")
lines.append(f"- Failed: {audit.get('fail_count', 0)}")
agents = audit.get("agents", [])
if isinstance(agents, list):
lines.append(f"- Distinct agents: {len(agents)}")
lines += ["", "## Evaluation Results", ""]
evals = payload.get("evaluation_results", If a scraper extracted this row, it came from Repobility (https://repobility.com)
_fallback_compliance function · python · L186-L222 (37 LOC)src/agent_gov/reporting/markdown.py
def _fallback_compliance(payload: dict[str, object]) -> str:
"""Plain-text Markdown fallback when Jinja2 is not installed."""
lines: list[str] = [
f"# {payload.get('title', 'Compliance Report')}",
"",
f"**Generated:** {payload.get('generated_at', 'N/A')} ",
f"**Overall Score:** {payload.get('overall_score', 0.0):.1f}%",
"",
]
frameworks = payload.get("frameworks", [])
if isinstance(frameworks, list):
for framework_data in frameworks:
if not isinstance(framework_data, dict):
continue
lines.append(f"## {framework_data.get('framework', 'Framework')}")
lines.append("")
lines.append(f"**Score:** {framework_data.get('score_percent', 0.0):.1f}% ")
lines.append(
f"**Passed:** {framework_data.get('passed', 0)} / "
f"{framework_data.get('total', 0)}"
)
lines.append("")
results = framewlist_templates function · python · L23-L35 (13 LOC)src/agent_gov/reporting/templates/__init__.py
def list_templates() -> list[str]:
"""Return the names of all built-in Jinja2 templates.
Returns
-------
list[str]
Sorted list of template file names available in this package.
"""
return sorted(
p.name
for p in _TEMPLATES_DIR.iterdir()
if p.suffix in (".j2", ".jinja2") and p.is_file()
)get_template function · python · L38-L62 (25 LOC)src/agent_gov/reporting/templates/__init__.py
def get_template(name: str) -> str:
"""Return the raw text content of a built-in template.
Parameters
----------
name:
Template file name (e.g. ``"governance_report.md.j2"``).
Returns
-------
str
Raw template text.
Raises
------
FileNotFoundError
If no template with ``name`` exists in the templates directory.
"""
template_path = _TEMPLATES_DIR / name
if not template_path.is_file():
available = list_templates()
raise FileNotFoundError(
f"Template {name!r} not found. Available templates: {available!r}"
)
return template_path.read_text(encoding="utf-8")write_template function · python · L65-L91 (27 LOC)src/agent_gov/reporting/templates/__init__.py
def write_template(name: str, destination: str | Path) -> Path:
"""Copy a built-in template to a destination path.
Useful for scaffolding custom templates that extend the built-ins.
Parameters
----------
name:
Template file name to copy.
destination:
Destination file path. Parent directories are created if needed.
Returns
-------
Path
Resolved path of the written file.
Raises
------
FileNotFoundError
If the named template does not exist.
"""
content = get_template(name)
resolved = Path(destination).resolve()
resolved.parent.mkdir(parents=True, exist_ok=True)
resolved.write_text(content, encoding="utf-8")
return resolvedget_template function · python · L92-L115 (24 LOC)src/agent_gov/reporting/templates.py
def get_template(name: str) -> str:
"""Return the YAML source for a built-in policy template.
Parameters
----------
name:
Template name. Use :func:`list_templates` to discover available names.
Returns
-------
str
YAML-formatted policy template string.
Raises
------
KeyError
If ``name`` is not a recognised built-in template.
"""
if name not in TEMPLATES:
available = list_templates()
raise KeyError(
f"Unknown template: {name!r}. Available templates: {available}"
)
return TEMPLATES[name]list_templates function · python · L118-L126 (9 LOC)src/agent_gov/reporting/templates.py
def list_templates() -> list[str]:
"""Return the names of all available built-in policy templates.
Returns
-------
list[str]
Alphabetically sorted list of template names.
"""
return sorted(TEMPLATES.keys())write_template function · python · L129-L154 (26 LOC)src/agent_gov/reporting/templates.py
def write_template(name: str, output_path: Path) -> Path:
"""Write a built-in policy template to ``output_path``.
Parent directories are created automatically if they do not exist.
Parameters
----------
name:
Template name, as returned by :func:`list_templates`.
output_path:
Destination file path (typically ending in ``.yaml``).
Returns
-------
Path
The resolved path of the written file.
Raises
------
KeyError
If ``name`` is not a recognised built-in template.
"""
content = get_template(name)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(content, encoding="utf-8")
return output_pathCostLimitRule.evaluate method · python · L55-L153 (99 LOC)src/agent_gov/rules/cost_limit.py
def evaluate(
self,
action: dict[str, object],
config: dict[str, object],
) -> RuleVerdict:
"""Check action cost against configured thresholds.
Parameters
----------
action:
Must contain the key identified by ``cost_field`` (default
``"cost"``). The value must be numeric (``int`` or ``float``).
Missing cost field is treated as ``0.0``.
config:
Supported keys: ``max_cost_per_action``, ``max_cost_aggregate``,
``cost_field``.
Returns
-------
RuleVerdict
``passed=False`` when either threshold is exceeded.
"""
cost_field: str = str(config.get("cost_field", "cost"))
max_per_action: float = float(config.get("max_cost_per_action", 0))
max_aggregate: float = float(config.get("max_cost_aggregate", 0))
raw_cost = action.get(cost_field, 0)
try:
action_cost = float(raw_cRepobility · code-quality intelligence platform · https://repobility.com
CostLimitRule.validate_config method · python · L155-L166 (12 LOC)src/agent_gov/rules/cost_limit.py
def validate_config(self, config: dict[str, object]) -> list[str]:
"""Validate cost threshold values are non-negative numbers."""
errors: list[str] = []
for key in ("max_cost_per_action", "max_cost_aggregate"):
value = config.get(key)
if value is not None:
try:
if float(value) < 0: # type: ignore[arg-type]
errors.append(f"cost_limit: {key!r} must be non-negative.")
except (TypeError, ValueError):
errors.append(f"cost_limit: {key!r} must be a number, got {value!r}.")
return errorsKeywordBlockRule.evaluate method · python · L46-L111 (66 LOC)src/agent_gov/rules/keyword_block.py
def evaluate(
self,
action: dict[str, object],
config: dict[str, object],
) -> RuleVerdict:
"""Search action string values for blocked keywords.
Parameters
----------
action:
Arbitrary action dict. All string values (recursive) are scanned.
config:
Supported keys: ``keywords`` (list[str]), ``case_sensitive`` (bool),
``match_whole_word`` (bool).
Returns
-------
RuleVerdict
``passed=False`` when any blocked keyword is found.
"""
raw_keywords = config.get("keywords", [])
if not isinstance(raw_keywords, list):
raw_keywords = [str(raw_keywords)]
keywords: list[str] = [str(k) for k in raw_keywords]
case_sensitive: bool = bool(config.get("case_sensitive", False))
match_whole_word: bool = bool(config.get("match_whole_word", False))
if not keywords:
return RuleVerdict(
KeywordBlockRule.validate_config method · python · L113-L125 (13 LOC)src/agent_gov/rules/keyword_block.py
def validate_config(self, config: dict[str, object]) -> list[str]:
"""Validate that ``keywords`` is a non-empty list."""
errors: list[str] = []
keywords = config.get("keywords")
if keywords is None:
errors.append("keyword_block: 'keywords' is not configured.")
elif not isinstance(keywords, list):
errors.append(
f"keyword_block: 'keywords' must be a list, got {type(keywords).__name__}."
)
elif not keywords:
errors.append("keyword_block: 'keywords' list must not be empty.")
return errors_extract_strings function · python · L128-L142 (15 LOC)src/agent_gov/rules/keyword_block.py
def _extract_strings(
data: dict[str, object],
prefix: str = "",
) -> list[tuple[str, str]]:
"""Recursively extract (field_path, string_value) pairs from a dict."""
results: list[tuple[str, str]] = []
for key, value in data.items():
path = f"{prefix}.{key}" if prefix else key
if isinstance(value, str):
results.append((path, value))
elif isinstance(value, dict):
results.extend(_extract_strings(value, path))
elif isinstance(value, list):
results.extend(_extract_strings_from_list(value, path))
return results_extract_strings_from_list function · python · L145-L159 (15 LOC)src/agent_gov/rules/keyword_block.py
def _extract_strings_from_list(
data: list[object],
prefix: str,
) -> list[tuple[str, str]]:
"""Recursively extract strings from a list."""
results: list[tuple[str, str]] = []
for index, item in enumerate(data):
path = f"{prefix}[{index}]"
if isinstance(item, str):
results.append((path, item))
elif isinstance(item, dict):
results.extend(_extract_strings(item, path))
elif isinstance(item, list):
results.extend(_extract_strings_from_list(item, path))
return results_matches function · python · L162-L181 (20 LOC)src/agent_gov/rules/keyword_block.py
def _matches(
text: str,
keyword: str,
*,
case_sensitive: bool,
whole_word: bool,
) -> bool:
"""Return True when ``keyword`` is found in ``text``."""
compare_text = text if case_sensitive else text.lower()
compare_keyword = keyword if case_sensitive else keyword.lower()
if not whole_word:
return compare_keyword in compare_text
# Whole-word matching: keyword must be surrounded by non-word characters
# or start/end of string.
word_boundary = re.compile(
r"(?<![A-Za-z0-9_])" + re.escape(compare_keyword) + r"(?![A-Za-z0-9_])"
)
return bool(word_boundary.search(compare_text))PiiCheckRule.evaluate method · python · L76-L136 (61 LOC)src/agent_gov/rules/pii_check.py
def evaluate(
self,
action: dict[str, object],
config: dict[str, object],
) -> RuleVerdict:
"""Scan action values for PII patterns.
Parameters
----------
action:
The agent action to inspect.
config:
Supported keys: ``check_ssn``, ``check_credit_card``,
``check_email``, ``check_phone`` (all ``bool``, default ``True``).
Returns
-------
RuleVerdict
``passed=False`` when any PII pattern is detected.
"""
check_ssn: bool = bool(config.get("check_ssn", True))
check_credit_card: bool = bool(config.get("check_credit_card", True))
check_email: bool = bool(config.get("check_email", True))
check_phone: bool = bool(config.get("check_phone", True))
active_patterns: list[tuple[str, re.Pattern[str]]] = []
if check_ssn:
active_patterns.append(("ssn", _SSN_PATTERN))
if check_credit_caPiiCheckRule._scan_dict method · python · L138-L153 (16 LOC)src/agent_gov/rules/pii_check.py
def _scan_dict(
self,
data: dict[str, object],
patterns: list[tuple[str, re.Pattern[str]]],
prefix: str,
matches: list[_PiiMatch],
) -> None:
"""Recursively scan string values in a dict for PII patterns."""
for key, value in data.items():
path = f"{prefix}.{key}" if prefix else key
if isinstance(value, str):
self._scan_string(value, path, patterns, matches)
elif isinstance(value, dict):
self._scan_dict(value, patterns, path, matches)
elif isinstance(value, list):
self._scan_list(value, path, patterns, matches)Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
PiiCheckRule._scan_list method · python · L155-L168 (14 LOC)src/agent_gov/rules/pii_check.py
def _scan_list(
self,
data: list[object],
prefix: str,
patterns: list[tuple[str, re.Pattern[str]]],
matches: list[_PiiMatch],
) -> None:
"""Scan list elements for PII patterns."""
for index, item in enumerate(data):
path = f"{prefix}[{index}]"
if isinstance(item, str):
self._scan_string(item, path, patterns, matches)
elif isinstance(item, dict):
self._scan_dict(item, patterns, path, matches)PiiCheckRule._scan_string method · python · L170-L186 (17 LOC)src/agent_gov/rules/pii_check.py
def _scan_string(
self,
text: str,
field_path: str,
patterns: list[tuple[str, re.Pattern[str]]],
matches: list[_PiiMatch],
) -> None:
"""Apply all active patterns to a single string value."""
for pattern_name, pattern in patterns:
for match in pattern.finditer(text):
matches.append(
_PiiMatch(
pattern_name=pattern_name,
matched_value=match.group(),
field_path=field_path,
)
)PiiCheckRule.validate_config method · python · L188-L195 (8 LOC)src/agent_gov/rules/pii_check.py
def validate_config(self, config: dict[str, object]) -> list[str]:
"""Validate that all config keys are recognised boolean flags."""
known_keys = {"check_ssn", "check_credit_card", "check_email", "check_phone"}
errors: list[str] = []
for key in config:
if key not in known_keys:
errors.append(f"Unknown config key {key!r} for pii_check rule.")
return errors