Function bodies 212 total
_extract_severity function · python · L276-L281 (6 LOC)src/agent_gov/authoring/nl_compiler.py
def _extract_severity(text_lower: str) -> str:
"""Extract severity from normalized text."""
for keyword, severity in _SEVERITY_KEYWORDS.items():
if keyword in text_lower:
return severity
return "medium"_extract_cost_limit function · python · L284-L293 (10 LOC)src/agent_gov/authoring/nl_compiler.py
def _extract_cost_limit(text: str) -> Optional[float]:
"""Extract a numeric cost limit from text like 'max $5.00' or 'limit to 2.50'."""
pattern = r"\$?\s*(\d+(?:\.\d+)?)"
match = re.search(pattern, text)
if match:
try:
return float(match.group(1))
except ValueError:
return None
return None_make_rule_name function · python · L301-L306 (6 LOC)src/agent_gov/authoring/nl_compiler.py
def _make_rule_name(action: str, subject: str, target: str) -> str:
"""Generate a slug rule name from action, subject, and target."""
parts = [action, subject.replace(" ", "-")]
if target and target != "any":
parts.append(target.replace("_", "-"))
return "-".join(p for p in parts if p)NlCompiler.__init__ method · python · L340-L347 (8 LOC)src/agent_gov/authoring/nl_compiler.py
def __init__(
self,
*,
policy_name: str = "generated-policy",
strict: bool = False,
) -> None:
self._default_policy_name = policy_name
self._strict = strictNlCompiler.parse_statement method · python · L349-L399 (51 LOC)src/agent_gov/authoring/nl_compiler.py
def parse_statement(self, statement: str) -> ParsedStatement:
"""Parse a single natural language statement into a ParsedStatement.
Parameters
----------
statement:
A plain English policy statement.
Returns
-------
ParsedStatement
Parsed intermediate representation.
Raises
------
NlCompilerError
If strict mode is enabled and the statement cannot be parsed.
"""
cleaned = statement.strip()
text_lower = cleaned.lower()
action = _extract_action(text_lower)
subject, _rule_type, params = _extract_subject(text_lower)
target = _extract_target(text_lower)
severity = _extract_severity(text_lower)
# Enhance params with extracted values
if subject == "cost":
cost_limit = _extract_cost_limit(cleaned)
if cost_limit is not None:
params["max_cost"] = cost_limit
NlCompiler.compile_statement method · python · L401-L446 (46 LOC)src/agent_gov/authoring/nl_compiler.py
def compile_statement(self, statement: str) -> Optional[CompiledRule]:
"""Compile a single natural language statement to a CompiledRule.
Returns None if the statement cannot be compiled and strict=False.
Raises
------
NlCompilerError
If strict=True and compilation fails.
"""
text_lower = statement.strip().lower()
subject, rule_type, default_params = _extract_subject(text_lower)
action = _extract_action(text_lower)
target = _extract_target(text_lower)
severity = _extract_severity(text_lower)
if not subject:
if self._strict:
raise NlCompilerError(
f"Cannot determine rule subject from: {statement!r}"
)
return None
params = dict(default_params)
# Enhance params from statement
if subject == "cost":
cost_limit = _extract_cost_limit(statement)
if cost_limiNlCompiler.compile method · python · L448-L482 (35 LOC)src/agent_gov/authoring/nl_compiler.py
def compile(
self,
statement: str,
*,
policy_name: Optional[str] = None,
description: str = "",
) -> CompiledPolicy:
"""Compile a single statement into a one-rule policy.
Parameters
----------
statement:
Natural language policy statement.
policy_name:
Override for the policy name.
description:
Optional policy description.
Returns
-------
CompiledPolicy
Policy with one compiled rule (or zero rules if compilation failed).
"""
name = policy_name or self._default_policy_name
policy = CompiledPolicy(
name=name,
description=description or f"Policy generated from: {statement}",
source_statements=[statement],
)
rule = self.compile_statement(statement)
if rule is not None:
policy.rules.append(rule)
else:
policy.waAll rows scored by the Repobility analyzer (https://repobility.com)
NlCompiler.compile_many method · python · L484-L530 (47 LOC)src/agent_gov/authoring/nl_compiler.py
def compile_many(
self,
statements: list[str],
*,
policy_name: Optional[str] = None,
description: str = "",
) -> CompiledPolicy:
"""Compile multiple statements into a single multi-rule policy.
Parameters
----------
statements:
List of natural language policy statements.
policy_name:
Override for the policy name.
description:
Optional policy description.
Returns
-------
CompiledPolicy
Policy aggregating all successfully compiled rules.
"""
name = policy_name or self._default_policy_name
policy = CompiledPolicy(
name=name,
description=description or "Policy generated from natural language statements.",
source_statements=list(statements),
)
seen_names: set[str] = set()
for statement in statements:
rule = self.compile_statementNlCompiler.compile_text_block method · python · L532-L562 (31 LOC)src/agent_gov/authoring/nl_compiler.py
def compile_text_block(
self,
text: str,
*,
policy_name: Optional[str] = None,
description: str = "",
) -> CompiledPolicy:
"""Compile a multi-line text block where each line is a statement.
Empty lines and lines starting with ``#`` are treated as comments and
skipped.
Parameters
----------
text:
Multi-line string with one statement per line.
policy_name:
Override for the policy name.
description:
Optional policy description.
"""
statements = [
line.strip()
for line in text.splitlines()
if line.strip() and not line.strip().startswith("#")
]
return self.compile_many(
statements,
policy_name=policy_name,
description=description,
)version_command function · python · L48-L57 (10 LOC)src/agent_gov/cli/main.py
def version_command() -> None:
"""Show detailed version information."""
import platform
console.print(Panel.fit(
f"[bold cyan]agent-gov[/bold cyan] [green]v{__version__}[/green]\n"
f"Python {platform.python_version()} | {platform.system()} {platform.machine()}",
title="Version Info",
border_style="cyan",
))check_command function · python · L92-L167 (76 LOC)src/agent_gov/cli/main.py
def check_command(
policy_path: str,
action_json: str,
audit_log_path: Optional[str],
agent_id: str,
) -> None:
"""Evaluate an action against a policy and print the verdict."""
from agent_gov.policy.evaluator import PolicyEvaluator
from agent_gov.policy.loader import PolicyLoader, PolicyLoadError
# Load policy
loader = PolicyLoader()
try:
policy = loader.load_file(policy_path)
except PolicyLoadError as exc:
err_console.print(f"[red]Error loading policy:[/red] {exc}")
sys.exit(1)
# Parse action
try:
action: dict[str, object] = json.loads(action_json)
except json.JSONDecodeError as exc:
err_console.print(f"[red]Invalid JSON for --action:[/red] {exc}")
sys.exit(1)
if not isinstance(action, dict):
err_console.print("[red]--action must be a JSON object (dict).[/red]")
sys.exit(1)
# Evaluate
evaluator = PolicyEvaluator()
report = evaluator.evaluate(policaudit_show_command function · python · L195-L233 (39 LOC)src/agent_gov/cli/main.py
def audit_show_command(log_path: str, last_n: int) -> None:
"""Show the most recent audit log entries."""
from agent_gov.audit.reader import AuditReader
reader = AuditReader(log_path)
if not Path(log_path).exists():
console.print(f"[yellow]Audit log not found:[/yellow] {log_path}")
return
entries = reader.last(last_n)
if not entries:
console.print("[yellow]No audit entries found.[/yellow]")
return
table = Table(title=f"Last {last_n} Audit Entries", box=None, padding=(0, 1))
table.add_column("Timestamp", style="dim")
table.add_column("Agent ID", style="cyan")
table.add_column("Action Type")
table.add_column("Policy")
table.add_column("Verdict", justify="center")
for entry in reversed(entries):
verdict_text = (
Text("PASS", style="green") if entry.verdict == "pass" else Text("FAIL", style="red")
)
table.add_row(
entry.timestamp.strftime("%Y-%m-%d %H:%M:%audit_query_command function · python · L261-L319 (59 LOC)src/agent_gov/cli/main.py
def audit_query_command(
log_path: str,
agent_id: Optional[str],
action_type: Optional[str],
verdict: Optional[str],
policy_name: Optional[str],
since_str: Optional[str],
limit: int,
) -> None:
"""Query the audit log with filters."""
from agent_gov.audit.reader import AuditReader
since: Optional[datetime] = None
if since_str:
try:
since = datetime.fromisoformat(since_str)
if since.tzinfo is None:
since = since.replace(tzinfo=timezone.utc)
except ValueError:
err_console.print(f"[red]Invalid --since date:[/red] {since_str!r}")
sys.exit(1)
reader = AuditReader(log_path)
if not Path(log_path).exists():
console.print(f"[yellow]Audit log not found:[/yellow] {log_path}")
return
entries = reader.query(
agent_id=agent_id,
action_type=action_type,
verdict=verdict,
policy_name=policy_name,
since=since,
report_generate_command function · python · L368-L409 (42 LOC)src/agent_gov/cli/main.py
def report_generate_command(
policy_path: str,
output_format: str,
output_path: Optional[str],
audit_log_path: str,
title: str,
) -> None:
"""Generate a governance report from a policy and audit log."""
from agent_gov.audit.reader import AuditReader
from agent_gov.policy.loader import PolicyLoader, PolicyLoadError
from agent_gov.reporting.generator import ReportGenerator
from agent_gov.reporting.json_report import JsonReporter
from agent_gov.reporting.markdown import MarkdownReporter
# Load policy
loader = PolicyLoader()
try:
policy = loader.load_file(policy_path)
except PolicyLoadError as exc:
err_console.print(f"[red]Error loading policy:[/red] {exc}")
sys.exit(1)
# Build reader if audit log exists
audit_reader: Optional[AuditReader] = None
if Path(audit_log_path).exists():
audit_reader = AuditReader(audit_log_path)
generator = ReportGenerator(audit_reader=audit_reader)
frameworks_list_command function · python · L423-L441 (19 LOC)src/agent_gov/cli/main.py
def frameworks_list_command() -> None:
"""List all available compliance frameworks."""
from agent_gov.plugins.registry import framework_registry
table = Table(title="Available Compliance Frameworks", box=None, padding=(0, 1))
table.add_column("Name", style="cyan")
table.add_column("Version")
table.add_column("Description")
for fw_name in framework_registry.list_names():
fw_cls = framework_registry.get(fw_name)
instance = fw_cls()
table.add_row(
fw_name,
getattr(instance, "version", "—"),
getattr(instance, "description", "")[:80],
)
console.print(table)Powered by Repobility — scan your code at https://repobility.com
frameworks_check_command function · python · L464-L539 (76 LOC)src/agent_gov/cli/main.py
def frameworks_check_command(
framework_name: str,
evidence_path: Optional[str],
output_path: Optional[str],
) -> None:
"""Run a compliance checklist against evidence."""
import yaml
from agent_gov.plugins.registry import framework_registry
from agent_gov.reporting.json_report import JsonReporter
if framework_name not in framework_registry:
available = framework_registry.list_names()
err_console.print(
f"[red]Unknown framework:[/red] {framework_name!r}. "
f"Available: {available!r}"
)
sys.exit(1)
fw_cls = framework_registry.get(framework_name)
framework = fw_cls()
evidence: dict[str, object] = {}
if evidence_path:
try:
raw = Path(evidence_path).read_text(encoding="utf-8")
loaded = yaml.safe_load(raw)
if isinstance(loaded, dict):
evidence = loaded
except Exception as exc:
err_console.print(f"[red]Errorinit_command function · python · L563-L594 (32 LOC)src/agent_gov/cli/main.py
def init_command(preset: str, output_path: str) -> None:
"""Initialize a project with a policy pack."""
import importlib.resources
# Load the pack from the package data
packs_dir = Path(__file__).parent.parent / "packs"
pack_file = packs_dir / f"{preset}.yaml"
if not pack_file.exists():
err_console.print(f"[red]Pack file not found:[/red] {pack_file}")
sys.exit(1)
destination = Path(output_path)
if destination.exists():
if not click.confirm(
f"[yellow]{output_path}[/yellow] already exists. Overwrite?", default=False
):
console.print("[yellow]Aborted.[/yellow]")
return
destination.parent.mkdir(parents=True, exist_ok=True)
destination.write_text(pack_file.read_text(encoding="utf-8"), encoding="utf-8")
console.print(
f"[green]Initialized[/green] [cyan]{preset}[/cyan] policy pack → [bold]{output_path}[/bold]"
)
console.print(
"\nNext steps:\n"
plugins_list_command function · python · L608-L639 (32 LOC)src/agent_gov/cli/main.py
def plugins_list_command() -> None:
"""List all registered rules and compliance frameworks."""
from agent_gov.plugins.registry import framework_registry, rule_registry
# Rules table
rule_table = Table(title="Registered Rules", box=None, padding=(0, 1))
rule_table.add_column("Name", style="cyan")
rule_table.add_column("Class")
for rule_name in rule_registry.list_names():
rule_cls = rule_registry.get(rule_name)
rule_table.add_row(rule_name, rule_cls.__qualname__)
console.print(rule_table)
console.print()
# Frameworks table
fw_table = Table(title="Registered Frameworks", box=None, padding=(0, 1))
fw_table.add_column("Name", style="cyan")
fw_table.add_column("Version")
fw_table.add_column("Class")
for fw_name in framework_registry.list_names():
fw_cls = framework_registry.get(fw_name)
instance = fw_cls()
fw_table.add_row(
fw_name,
getattr(instance, "version", "—"classify_command function · python · L677-L729 (53 LOC)src/agent_gov/cli/main.py
def classify_command(
description: str,
use_case: tuple[str, ...],
data_category: tuple[str, ...],
output_format: str,
) -> None:
"""Classify an AI system's risk level per the EU AI Act (Annex III)."""
import dataclasses
from agent_gov.frameworks.eu_ai_act_classifier import EUAIActClassifier
classifier = EUAIActClassifier()
result = classifier.classify(
system_description=description,
use_cases=list(use_case) if use_case else None,
data_categories=list(data_category) if data_category else None,
)
if output_format == "json":
click.echo(json.dumps(dataclasses.asdict(result), indent=2))
return
# Rich table output
level_colours: dict[str, str] = {
"unacceptable": "red",
"high": "yellow",
"limited": "cyan",
"minimal": "green",
}
colour = level_colours.get(result.level.value, "white")
console.print(
Panel.fit(
f"[bold {colour}]{resuldocument_command function · python · L771-L815 (45 LOC)src/agent_gov/cli/main.py
def document_command(
system_name: str,
provider: str,
description: str,
output: str,
output_format: str,
) -> None:
"""Generate EU AI Act Annex IV technical documentation."""
from pathlib import Path as _Path
from agent_gov.frameworks.eu_ai_act_docs import AnnexIVDocumentation
doc = AnnexIVDocumentation(
system_name=system_name,
provider_name=provider,
system_description=description,
intended_purpose=description,
)
out_path = _Path(output)
out_path.mkdir(parents=True, exist_ok=True)
created: list[str] = []
if output_format in ("markdown", "both"):
md_path = out_path / "annex-iv-technical-documentation.md"
md_path.write_text(doc.to_markdown(), encoding="utf-8")
created.append(str(md_path))
if output_format in ("json", "both"):
import dataclasses
json_path = out_path / "annex-iv-data.json"
json_path.write_text(
json.dumps(dataclasspolicy_list_command function · python · L850-L921 (72 LOC)src/agent_gov/cli/main.py
def policy_list_command(
source_dir: Optional[str],
domain: Optional[str],
severity: Optional[str],
) -> None:
"""List available governance policies from the policy library."""
from pathlib import Path as _Path
from agent_gov.policies.loader import LibraryPolicyLoadError, LibraryPolicyLoader
# Resolve source directory: use built-in library if not specified
if source_dir:
resolved_source = _Path(source_dir)
else:
resolved_source = _Path(__file__).parent.parent.parent.parent.parent / "policies"
if not resolved_source.exists():
# Fall back to relative path from cwd
resolved_source = _Path.cwd() / "policies"
if not resolved_source.exists():
err_console.print(
f"[red]Policy library directory not found:[/red] {resolved_source}\n"
"Use [cyan]--source[/cyan] to specify the policies directory."
)
sys.exit(1)
loader = LibraryPolicyLoader()
try:
policy_install_command function · python · L960-L1032 (73 LOC)src/agent_gov/cli/main.py
def policy_install_command(
source_dir: str,
target_dir: str,
domain: Optional[str],
no_overwrite: bool,
dry_run: bool,
) -> None:
"""Install governance policies from a source directory into a target directory."""
from agent_gov.policies.installer import LibraryPolicyInstaller
installer = LibraryPolicyInstaller()
try:
results = installer.install(
source=source_dir,
target=target_dir,
domain=domain,
overwrite=not no_overwrite,
dry_run=dry_run,
)
except ValueError as exc:
err_console.print(f"[red]Install error:[/red] {exc}")
sys.exit(1)
if not results:
console.print("[yellow]No policy files found in source directory.[/yellow]")
return
dry_run_label = " [dim](dry-run)[/dim]" if dry_run else ""
table = Table(
title=f"Policy Installation{dry_run_label}",
box=None,
padding=(0, 1),
)
table.add_columnpolicy_validate_command function · python · L1040-L1066 (27 LOC)src/agent_gov/cli/main.py
def policy_validate_command(policy_file: str) -> None:
"""Validate a governance policy YAML file against the library schema."""
from pathlib import Path as _Path
from agent_gov.policies.validator import LibraryPolicyValidator
validator = LibraryPolicyValidator()
result = validator.validate_file(_Path(policy_file))
if result.valid:
console.print(
Panel.fit(
f"[bold green]VALID[/bold green] [cyan]{policy_file}[/cyan]",
title="Policy Validation",
border_style="green",
)
)
else:
error_lines = "\n".join(f" • {err}" for err in result.errors)
console.print(
Panel.fit(
f"[bold red]INVALID[/bold red] [cyan]{policy_file}[/cyan]\n\n{error_lines}",
title="Policy Validation",
border_style="red",
)
)
sys.exit(2)Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
ComplianceRequirement.to_dict method · python · L75-L84 (10 LOC)src/agent_gov/compliance_cost/calculator.py
def to_dict(self) -> dict[str, object]:
"""Serialise to a plain dictionary."""
return {
"framework": self.framework,
"requirement_id": self.requirement_id,
"description": self.description,
"automation_level": self.automation_level,
"estimated_hours_manual": self.estimated_hours_manual,
"estimated_hours_automated": self.estimated_hours_automated,
}CostReport.to_dict method · python · L132-L147 (16 LOC)src/agent_gov/compliance_cost/calculator.py
def to_dict(self) -> dict[str, object]:
"""Serialise to a plain dictionary."""
return {
"framework": self.framework,
"total_requirements": self.total_requirements,
"automated_count": self.automated_count,
"semi_automated_count": self.semi_automated_count,
"manual_count": self.manual_count,
"total_hours_manual": round(self.total_hours_manual, 2),
"total_hours_automated": round(self.total_hours_automated, 2),
"total_cost_manual": round(self.total_cost_manual, 2),
"total_cost_with_automation": round(self.total_cost_with_automation, 2),
"savings_percentage": round(self.savings_percentage, 2),
"hourly_rate": self.hourly_rate,
"requirement_details": list(self.requirement_details),
}CostReport.summary method · python · L149-L157 (9 LOC)src/agent_gov/compliance_cost/calculator.py
def summary(self) -> str:
"""Return a one-line human-readable summary."""
return (
f"Framework: {self.framework} | "
f"Requirements: {self.total_requirements} | "
f"Manual cost: ${self.total_cost_manual:,.0f} | "
f"Automated cost: ${self.total_cost_with_automation:,.0f} | "
f"Savings: {self.savings_percentage:.1f}%"
)ComparisonReport.to_dict method · python · L175-L183 (9 LOC)src/agent_gov/compliance_cost/calculator.py
def to_dict(self) -> dict[str, object]:
"""Serialise to a plain dictionary."""
return {
"framework": self.framework,
"scenarios": [
{"label": label, "report": report.to_dict()}
for label, report in self.scenarios
],
}ComplianceCostCalculator.calculate method · python · L215-L239 (25 LOC)src/agent_gov/compliance_cost/calculator.py
def calculate(
self,
framework: str,
automation_coverage: dict[str, str],
) -> CostReport:
"""Calculate compliance costs for *framework* under the given scenario.
Parameters
----------
framework:
Framework name: ``"eu_ai_act"``, ``"gdpr"``, or ``"hipaa"``.
automation_coverage:
Mapping of requirement_id to overridden automation_level string.
Requirements not in this dict use their default level from the
framework catalogue.
Returns
-------
CostReport
Detailed cost breakdown.
"""
from agent_gov.compliance_cost.framework_maps import get_requirements
requirements = get_requirements(framework)
return self._compute_report(framework, requirements, automation_coverage)ComplianceCostCalculator.compare_scenarios method · python · L241-L273 (33 LOC)src/agent_gov/compliance_cost/calculator.py
def compare_scenarios(
self,
framework: str,
scenarios: list[dict[str, object]], # each: {"label": str, "automation_coverage": dict[str, str]}
) -> ComparisonReport:
"""Compare multiple automation scenarios for the same framework.
Parameters
----------
framework:
Framework name.
scenarios:
List of scenario dicts, each with:
- ``"label"`` (str): human-readable scenario name.
- ``"automation_coverage"`` (dict[str, str]): per-requirement
automation level overrides (same format as :meth:`calculate`).
Returns
-------
ComparisonReport
Structured comparison of all scenarios.
"""
pairs: list[tuple[str, CostReport]] = []
for scenario in scenarios:
label = str(scenario.get("label", "unnamed"))
coverage = dict(scenario.get("automation_coverage", {}))
report = self.calComplianceCostCalculator.calculate_with_custom_requirements method · python · L275-L294 (20 LOC)src/agent_gov/compliance_cost/calculator.py
def calculate_with_custom_requirements(
self,
requirements: list[ComplianceRequirement],
automation_coverage: dict[str, str],
) -> CostReport:
"""Calculate costs from a custom list of requirements.
Parameters
----------
requirements:
List of :class:`ComplianceRequirement` objects.
automation_coverage:
Per-requirement automation level overrides.
Returns
-------
CostReport
"""
framework = requirements[0].framework if requirements else "custom"
return self._compute_report(framework, requirements, automation_coverage)ComplianceCostCalculator._compute_report method · python · L305-L374 (70 LOC)src/agent_gov/compliance_cost/calculator.py
def _compute_report(
self,
framework: str,
requirements: list[ComplianceRequirement],
automation_coverage: dict[str, str],
) -> CostReport:
"""Compute cost report from requirements and automation coverage."""
automated_count = 0
semi_automated_count = 0
manual_count = 0
total_hours_manual = 0.0
total_hours_automated = 0.0
details: list[dict[str, object]] = []
for req in requirements:
# Apply coverage override or use default
effective_level = automation_coverage.get(req.requirement_id, req.automation_level)
# Determine effective hours based on level
hours_used = self._hours_for_level(
req,
effective_level,
)
cost_manual = req.cost_manual(self._hourly_rate)
cost_with_automation = hours_used * self._hourly_rate
if effective_level == "fully_automated":
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
ComplianceCostCalculator._hours_for_level method · python · L376-L388 (13 LOC)src/agent_gov/compliance_cost/calculator.py
def _hours_for_level(
self,
req: ComplianceRequirement,
effective_level: str,
) -> float:
"""Return the applicable hours estimate given the effective automation level."""
if effective_level == "fully_automated":
return req.estimated_hours_automated
if effective_level == "semi_automated":
# Blend: midpoint of manual and automated hours
return (req.estimated_hours_manual + req.estimated_hours_automated) / 2.0
# manual
return req.estimated_hours_manualget_requirements function · python · L340-L360 (21 LOC)src/agent_gov/compliance_cost/framework_maps.py
def get_requirements(framework: str) -> list[ComplianceRequirement]:
"""Return the requirement list for the given framework name.
Parameters
----------
framework:
One of ``"eu_ai_act"``, ``"gdpr"``, ``"hipaa"``.
Returns
-------
list[ComplianceRequirement]
Raises
------
KeyError
If *framework* is not registered.
"""
if framework not in FRAMEWORK_REGISTRY:
available = ", ".join(sorted(FRAMEWORK_REGISTRY.keys()))
raise KeyError(f"Unknown framework {framework!r}. Available: {available}")
return list(FRAMEWORK_REGISTRY[framework])CostReportRenderer.to_markdown method · python · L36-L83 (48 LOC)src/agent_gov/compliance_cost/report.py
def to_markdown(self, report: CostReport) -> str:
"""Render a :class:`CostReport` as a Markdown document.
Parameters
----------
report:
The cost report to render.
Returns
-------
str
Formatted Markdown string.
"""
s = self._symbol
lines: list[str] = [
f"# Cost-of-Compliance Report — {report.framework.replace('_', ' ').title()}",
"",
"## Summary",
"",
f"| Metric | Value |",
f"|--------|-------|",
f"| Total Requirements | {report.total_requirements} |",
f"| Fully Automated | {report.automated_count} |",
f"| Semi-Automated | {report.semi_automated_count} |",
f"| Manual | {report.manual_count} |",
f"| Hourly Rate | {s}{report.hourly_rate:,.2f} |",
f"| Manual Total Hours | {report.total_hours_manual:,.1f} |",
f"| Automated Total CostReportRenderer.to_text_summary method · python · L85-L107 (23 LOC)src/agent_gov/compliance_cost/report.py
def to_text_summary(self, report: CostReport) -> str:
"""Render a compact single-paragraph text summary.
Parameters
----------
report:
The cost report to summarise.
Returns
-------
str
Single-paragraph summary string.
"""
return (
f"{report.framework.replace('_', ' ').upper()}: "
f"{report.total_requirements} requirements, "
f"{report.automated_count} fully automated, "
f"{report.semi_automated_count} semi-automated, "
f"{report.manual_count} manual. "
f"Manual cost: {self._symbol}{report.total_cost_manual:,.0f} "
f"| Automated cost: {self._symbol}{report.total_cost_with_automation:,.0f} "
f"| Savings: {report.savings_percentage:.1f}%."
)CostReportRenderer.comparison_to_markdown method · python · L109-L144 (36 LOC)src/agent_gov/compliance_cost/report.py
def comparison_to_markdown(self, comparison: ComparisonReport) -> str:
"""Render a :class:`ComparisonReport` as a Markdown comparison table.
Parameters
----------
comparison:
The comparison report to render.
Returns
-------
str
Formatted Markdown comparison document.
"""
s = self._symbol
lines: list[str] = [
f"# Compliance Cost Comparison — {comparison.framework.replace('_', ' ').title()}",
"",
"## Scenario Comparison",
"",
"| Scenario | Automated | Manual | Total Cost | Savings |",
"|----------|-----------|--------|------------|---------|",
]
for label, report in comparison.scenarios:
lines.append(
f"| {label} | {report.automated_count} | {report.manual_count} "
f"| {s}{report.total_cost_with_automation:,.0f} "
f"| {report.savings_GovernanceEngine.__init__ method · python · L41-L56 (16 LOC)src/agent_gov/convenience.py
def __init__(self, policy_path: str | None = None) -> None:
from agent_gov.policy.evaluator import PolicyEvaluator
from agent_gov.policy.schema import PolicyConfig, RuleConfig
self._evaluator = PolicyEvaluator()
if policy_path is not None:
from agent_gov.policy.loader import PolicyLoader
loader = PolicyLoader()
self._policy = loader.load_file(policy_path)
else:
self._policy = PolicyConfig(
name="quickstart-default",
version="1.0",
rules=[],
)GovernanceEngine.evaluate method · python · L58-L81 (24 LOC)src/agent_gov/convenience.py
def evaluate(self, action: dict[str, Any]) -> Any:
"""Evaluate an agent action against the active policy.
Parameters
----------
action:
Dict describing the agent action. Common keys include
``action`` (action type), ``agent_id``, and action-specific
fields like ``query`` or ``path``.
Returns
-------
EvaluationReport
Report with ``.passed`` bool and per-rule verdicts.
Example
-------
::
engine = GovernanceEngine()
report = engine.evaluate({"action": "search", "query": "test"})
assert report.passed
"""
return self._evaluator.evaluate(self._policy, action)_build_handler function · python · L40-L185 (146 LOC)src/agent_gov/dashboard/dashboard_server.py
def _build_handler(
collector: EvidenceCollector,
scorer: PostureScorer,
generator: ReportGenerator,
) -> type[BaseHTTPRequestHandler]:
"""Build a ``BaseHTTPRequestHandler`` subclass bound to the given services.
Parameters
----------
collector:
Evidence collector instance.
scorer:
Posture scorer instance.
generator:
Report generator instance.
Returns
-------
type[BaseHTTPRequestHandler]
A request handler class with the services in closure.
"""
class _Handler(BaseHTTPRequestHandler):
"""Handles GET requests for the compliance dashboard API."""
_collector = collector
_scorer = scorer
_generator = generator
def log_message(self, fmt: str, *args: object) -> None:
"""Suppress default access log output."""
pass # noqa: PIE790
def do_GET(self) -> None: # noqa: N802
"""Route GET requests to the appropriate endpoint."All rows scored by the Repobility analyzer (https://repobility.com)
DashboardServer.__init__ method · python · L213-L226 (14 LOC)src/agent_gov/dashboard/dashboard_server.py
def __init__(
self,
collector: EvidenceCollector,
scorer: PostureScorer,
generator: ReportGenerator,
host: str = "127.0.0.1",
port: int = 8765,
) -> None:
self._collector = collector
self._scorer = scorer
self._generator = generator
self._host = host
self._port = port
self._server: HTTPServer | None = NoneDashboardServer.build_server method · python · L228-L238 (11 LOC)src/agent_gov/dashboard/dashboard_server.py
def build_server(self) -> HTTPServer:
"""Build and return the underlying ``HTTPServer`` without starting it.
Returns
-------
HTTPServer
"""
handler_cls = _build_handler(self._collector, self._scorer, self._generator)
server = HTTPServer((self._host, self._port), handler_cls)
self._server = server
return serverDashboardServer.serve_forever method · python · L240-L252 (13 LOC)src/agent_gov/dashboard/dashboard_server.py
def serve_forever(self) -> None:
"""Start the HTTP server and block until interrupted.
Raises
------
KeyboardInterrupt
Propagated from the underlying server; triggers clean shutdown.
"""
server = self.build_server()
try:
server.serve_forever()
finally:
server.server_close()create_fastapi_app function · python · L270-L337 (68 LOC)src/agent_gov/dashboard/dashboard_server.py
def create_fastapi_app(
collector: EvidenceCollector,
scorer: PostureScorer,
generator: ReportGenerator,
) -> object:
"""Create a FastAPI ASGI application for the compliance dashboard.
This function is provided as an *optional* integration point. It
requires ``fastapi`` to be installed. When not available, raises
``ImportError`` with a helpful message.
Parameters
----------
collector:
Evidence collector instance.
scorer:
Posture scorer instance.
generator:
Report generator instance.
Returns
-------
fastapi.FastAPI
ASGI application.
Raises
------
ImportError
When ``fastapi`` is not installed.
"""
try:
import fastapi # type: ignore[import]
except ImportError as exc:
raise ImportError(
"FastAPI is not installed. Install it with: pip install fastapi"
) from exc
app = fastapi.FastAPI(title="agent-gov Compliance DashboardEvidenceEntry.to_dict method · python · L59-L73 (15 LOC)src/agent_gov/dashboard/evidence_collector.py
def to_dict(self) -> dict[str, object]:
"""Serialise the entry to a plain dictionary.
Returns
-------
dict[str, object]
JSON-serialisable representation.
"""
return {
"timestamp": self.timestamp.isoformat(),
"policy_id": self.policy_id,
"rule_id": self.rule_id,
"result": self.result,
"context": self.context,
}EvidenceEntry.from_dict method · python · L76-L103 (28 LOC)src/agent_gov/dashboard/evidence_collector.py
def from_dict(cls, data: dict[str, object]) -> EvidenceEntry:
"""Reconstruct an entry from a dictionary produced by :meth:`to_dict`.
Parameters
----------
data:
Dictionary with at least ``timestamp``, ``policy_id``,
``rule_id``, ``result``, and ``context`` keys.
Returns
-------
EvidenceEntry
"""
raw_ts = data.get("timestamp", "")
try:
ts = datetime.fromisoformat(str(raw_ts))
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
ts = datetime.now(timezone.utc)
return cls(
timestamp=ts,
policy_id=str(data.get("policy_id", "")),
rule_id=str(data.get("rule_id", "")),
result=str(data.get("result", "skip")),
context=dict(data.get("context", {})),
)EvidenceCollector.record method · python · L129-L139 (11 LOC)src/agent_gov/dashboard/evidence_collector.py
def record(self, entry: EvidenceEntry) -> None:
"""Append *entry* to the evidence log.
Parameters
----------
entry:
The :class:`EvidenceEntry` to record.
"""
self._entries.append(entry)
if self._max_entries is not None and len(self._entries) > self._max_entries:
self._entries.pop(0)EvidenceCollector.record_many method · python · L141-L150 (10 LOC)src/agent_gov/dashboard/evidence_collector.py
def record_many(self, entries: list[EvidenceEntry]) -> None:
"""Append multiple entries in insertion order.
Parameters
----------
entries:
Sequence of :class:`EvidenceEntry` objects to record.
"""
for entry in entries:
self.record(entry)Powered by Repobility — scan your code at https://repobility.com
EvidenceCollector.query method · python · L156-L197 (42 LOC)src/agent_gov/dashboard/evidence_collector.py
def query(
self,
policy_id: str | None = None,
rule_id: str | None = None,
result: str | None = None,
since: datetime | None = None,
until: datetime | None = None,
) -> list[EvidenceEntry]:
"""Return entries matching all supplied filter criteria.
Parameters
----------
policy_id:
Filter to entries with this policy identifier.
rule_id:
Filter to entries with this rule identifier.
result:
Filter to entries with this result (``"pass"`` / ``"fail"`` / ``"skip"``).
since:
Include only entries at or after this UTC timestamp.
until:
Include only entries at or before this UTC timestamp.
Returns
-------
list[EvidenceEntry]
Matching entries in insertion order.
"""
results: list[EvidenceEntry] = []
for entry in self._entries:
if policy_id is not NoEvidenceCollector.all_entries method · python · L199-L206 (8 LOC)src/agent_gov/dashboard/evidence_collector.py
def all_entries(self) -> list[EvidenceEntry]:
"""Return all stored entries in insertion order.
Returns
-------
list[EvidenceEntry]
"""
return list(self._entries)EvidenceCollector.rule_ids method · python · L217-L226 (10 LOC)src/agent_gov/dashboard/evidence_collector.py
def rule_ids(self, policy_id: str | None = None) -> list[str]:
"""Return a sorted deduplicated list of rule IDs.
Parameters
----------
policy_id:
When provided, limit to rule IDs under this policy.
"""
entries = self.query(policy_id=policy_id) if policy_id else self._entries
return sorted({e.rule_id for e in entries})