← back to invincible-jha__agent-gov

Function bodies 212 total

All specs Real LLM only Function bodies
_extract_severity function · python · L276-L281 (6 LOC)
src/agent_gov/authoring/nl_compiler.py
def _extract_severity(text_lower: str) -> str:
    """Extract severity from normalized text."""
    for keyword, severity in _SEVERITY_KEYWORDS.items():
        if keyword in text_lower:
            return severity
    return "medium"
_extract_cost_limit function · python · L284-L293 (10 LOC)
src/agent_gov/authoring/nl_compiler.py
def _extract_cost_limit(text: str) -> Optional[float]:
    """Extract a numeric cost limit from text like 'max $5.00' or 'limit to 2.50'."""
    pattern = r"\$?\s*(\d+(?:\.\d+)?)"
    match = re.search(pattern, text)
    if match:
        try:
            return float(match.group(1))
        except ValueError:
            return None
    return None
_make_rule_name function · python · L301-L306 (6 LOC)
src/agent_gov/authoring/nl_compiler.py
def _make_rule_name(action: str, subject: str, target: str) -> str:
    """Generate a slug rule name from action, subject, and target."""
    parts = [action, subject.replace(" ", "-")]
    if target and target != "any":
        parts.append(target.replace("_", "-"))
    return "-".join(p for p in parts if p)
NlCompiler.__init__ method · python · L340-L347 (8 LOC)
src/agent_gov/authoring/nl_compiler.py
    def __init__(
        self,
        *,
        policy_name: str = "generated-policy",
        strict: bool = False,
    ) -> None:
        self._default_policy_name = policy_name
        self._strict = strict
NlCompiler.parse_statement method · python · L349-L399 (51 LOC)
src/agent_gov/authoring/nl_compiler.py
    def parse_statement(self, statement: str) -> ParsedStatement:
        """Parse a single natural language statement into a ParsedStatement.

        Parameters
        ----------
        statement:
            A plain English policy statement.

        Returns
        -------
        ParsedStatement
            Parsed intermediate representation.

        Raises
        ------
        NlCompilerError
            If strict mode is enabled and the statement cannot be parsed.
        """
        cleaned = statement.strip()
        text_lower = cleaned.lower()

        action = _extract_action(text_lower)
        subject, _rule_type, params = _extract_subject(text_lower)
        target = _extract_target(text_lower)
        severity = _extract_severity(text_lower)

        # Enhance params with extracted values
        if subject == "cost":
            cost_limit = _extract_cost_limit(cleaned)
            if cost_limit is not None:
                params["max_cost"] = cost_limit

       
NlCompiler.compile_statement method · python · L401-L446 (46 LOC)
src/agent_gov/authoring/nl_compiler.py
    def compile_statement(self, statement: str) -> Optional[CompiledRule]:
        """Compile a single natural language statement to a CompiledRule.

        Returns None if the statement cannot be compiled and strict=False.

        Raises
        ------
        NlCompilerError
            If strict=True and compilation fails.
        """
        text_lower = statement.strip().lower()
        subject, rule_type, default_params = _extract_subject(text_lower)
        action = _extract_action(text_lower)
        target = _extract_target(text_lower)
        severity = _extract_severity(text_lower)

        if not subject:
            if self._strict:
                raise NlCompilerError(
                    f"Cannot determine rule subject from: {statement!r}"
                )
            return None

        params = dict(default_params)

        # Enhance params from statement
        if subject == "cost":
            cost_limit = _extract_cost_limit(statement)
            if cost_limi
NlCompiler.compile method · python · L448-L482 (35 LOC)
src/agent_gov/authoring/nl_compiler.py
    def compile(
        self,
        statement: str,
        *,
        policy_name: Optional[str] = None,
        description: str = "",
    ) -> CompiledPolicy:
        """Compile a single statement into a one-rule policy.

        Parameters
        ----------
        statement:
            Natural language policy statement.
        policy_name:
            Override for the policy name.
        description:
            Optional policy description.

        Returns
        -------
        CompiledPolicy
            Policy with one compiled rule (or zero rules if compilation failed).
        """
        name = policy_name or self._default_policy_name
        policy = CompiledPolicy(
            name=name,
            description=description or f"Policy generated from: {statement}",
            source_statements=[statement],
        )
        rule = self.compile_statement(statement)
        if rule is not None:
            policy.rules.append(rule)
        else:
            policy.wa
All rows scored by the Repobility analyzer (https://repobility.com)
NlCompiler.compile_many method · python · L484-L530 (47 LOC)
src/agent_gov/authoring/nl_compiler.py
    def compile_many(
        self,
        statements: list[str],
        *,
        policy_name: Optional[str] = None,
        description: str = "",
    ) -> CompiledPolicy:
        """Compile multiple statements into a single multi-rule policy.

        Parameters
        ----------
        statements:
            List of natural language policy statements.
        policy_name:
            Override for the policy name.
        description:
            Optional policy description.

        Returns
        -------
        CompiledPolicy
            Policy aggregating all successfully compiled rules.
        """
        name = policy_name or self._default_policy_name
        policy = CompiledPolicy(
            name=name,
            description=description or "Policy generated from natural language statements.",
            source_statements=list(statements),
        )

        seen_names: set[str] = set()
        for statement in statements:
            rule = self.compile_statement
NlCompiler.compile_text_block method · python · L532-L562 (31 LOC)
src/agent_gov/authoring/nl_compiler.py
    def compile_text_block(
        self,
        text: str,
        *,
        policy_name: Optional[str] = None,
        description: str = "",
    ) -> CompiledPolicy:
        """Compile a multi-line text block where each line is a statement.

        Empty lines and lines starting with ``#`` are treated as comments and
        skipped.

        Parameters
        ----------
        text:
            Multi-line string with one statement per line.
        policy_name:
            Override for the policy name.
        description:
            Optional policy description.
        """
        statements = [
            line.strip()
            for line in text.splitlines()
            if line.strip() and not line.strip().startswith("#")
        ]
        return self.compile_many(
            statements,
            policy_name=policy_name,
            description=description,
        )
version_command function · python · L48-L57 (10 LOC)
src/agent_gov/cli/main.py
def version_command() -> None:
    """Show detailed version information."""
    import platform

    console.print(Panel.fit(
        f"[bold cyan]agent-gov[/bold cyan] [green]v{__version__}[/green]\n"
        f"Python {platform.python_version()}  |  {platform.system()} {platform.machine()}",
        title="Version Info",
        border_style="cyan",
    ))
check_command function · python · L92-L167 (76 LOC)
src/agent_gov/cli/main.py
def check_command(
    policy_path: str,
    action_json: str,
    audit_log_path: Optional[str],
    agent_id: str,
) -> None:
    """Evaluate an action against a policy and print the verdict."""
    from agent_gov.policy.evaluator import PolicyEvaluator
    from agent_gov.policy.loader import PolicyLoader, PolicyLoadError

    # Load policy
    loader = PolicyLoader()
    try:
        policy = loader.load_file(policy_path)
    except PolicyLoadError as exc:
        err_console.print(f"[red]Error loading policy:[/red] {exc}")
        sys.exit(1)

    # Parse action
    try:
        action: dict[str, object] = json.loads(action_json)
    except json.JSONDecodeError as exc:
        err_console.print(f"[red]Invalid JSON for --action:[/red] {exc}")
        sys.exit(1)

    if not isinstance(action, dict):
        err_console.print("[red]--action must be a JSON object (dict).[/red]")
        sys.exit(1)

    # Evaluate
    evaluator = PolicyEvaluator()
    report = evaluator.evaluate(polic
audit_show_command function · python · L195-L233 (39 LOC)
src/agent_gov/cli/main.py
def audit_show_command(log_path: str, last_n: int) -> None:
    """Show the most recent audit log entries."""
    from agent_gov.audit.reader import AuditReader

    reader = AuditReader(log_path)
    if not Path(log_path).exists():
        console.print(f"[yellow]Audit log not found:[/yellow] {log_path}")
        return

    entries = reader.last(last_n)
    if not entries:
        console.print("[yellow]No audit entries found.[/yellow]")
        return

    table = Table(title=f"Last {last_n} Audit Entries", box=None, padding=(0, 1))
    table.add_column("Timestamp", style="dim")
    table.add_column("Agent ID", style="cyan")
    table.add_column("Action Type")
    table.add_column("Policy")
    table.add_column("Verdict", justify="center")

    for entry in reversed(entries):
        verdict_text = (
            Text("PASS", style="green") if entry.verdict == "pass" else Text("FAIL", style="red")
        )
        table.add_row(
            entry.timestamp.strftime("%Y-%m-%d %H:%M:%
audit_query_command function · python · L261-L319 (59 LOC)
src/agent_gov/cli/main.py
def audit_query_command(
    log_path: str,
    agent_id: Optional[str],
    action_type: Optional[str],
    verdict: Optional[str],
    policy_name: Optional[str],
    since_str: Optional[str],
    limit: int,
) -> None:
    """Query the audit log with filters."""
    from agent_gov.audit.reader import AuditReader

    since: Optional[datetime] = None
    if since_str:
        try:
            since = datetime.fromisoformat(since_str)
            if since.tzinfo is None:
                since = since.replace(tzinfo=timezone.utc)
        except ValueError:
            err_console.print(f"[red]Invalid --since date:[/red] {since_str!r}")
            sys.exit(1)

    reader = AuditReader(log_path)
    if not Path(log_path).exists():
        console.print(f"[yellow]Audit log not found:[/yellow] {log_path}")
        return

    entries = reader.query(
        agent_id=agent_id,
        action_type=action_type,
        verdict=verdict,
        policy_name=policy_name,
        since=since,
  
report_generate_command function · python · L368-L409 (42 LOC)
src/agent_gov/cli/main.py
def report_generate_command(
    policy_path: str,
    output_format: str,
    output_path: Optional[str],
    audit_log_path: str,
    title: str,
) -> None:
    """Generate a governance report from a policy and audit log."""
    from agent_gov.audit.reader import AuditReader
    from agent_gov.policy.loader import PolicyLoader, PolicyLoadError
    from agent_gov.reporting.generator import ReportGenerator
    from agent_gov.reporting.json_report import JsonReporter
    from agent_gov.reporting.markdown import MarkdownReporter

    # Load policy
    loader = PolicyLoader()
    try:
        policy = loader.load_file(policy_path)
    except PolicyLoadError as exc:
        err_console.print(f"[red]Error loading policy:[/red] {exc}")
        sys.exit(1)

    # Build reader if audit log exists
    audit_reader: Optional[AuditReader] = None
    if Path(audit_log_path).exists():
        audit_reader = AuditReader(audit_log_path)

    generator = ReportGenerator(audit_reader=audit_reader)
    
frameworks_list_command function · python · L423-L441 (19 LOC)
src/agent_gov/cli/main.py
def frameworks_list_command() -> None:
    """List all available compliance frameworks."""
    from agent_gov.plugins.registry import framework_registry

    table = Table(title="Available Compliance Frameworks", box=None, padding=(0, 1))
    table.add_column("Name", style="cyan")
    table.add_column("Version")
    table.add_column("Description")

    for fw_name in framework_registry.list_names():
        fw_cls = framework_registry.get(fw_name)
        instance = fw_cls()
        table.add_row(
            fw_name,
            getattr(instance, "version", "—"),
            getattr(instance, "description", "")[:80],
        )

    console.print(table)
Powered by Repobility — scan your code at https://repobility.com
frameworks_check_command function · python · L464-L539 (76 LOC)
src/agent_gov/cli/main.py
def frameworks_check_command(
    framework_name: str,
    evidence_path: Optional[str],
    output_path: Optional[str],
) -> None:
    """Run a compliance checklist against evidence."""
    import yaml

    from agent_gov.plugins.registry import framework_registry
    from agent_gov.reporting.json_report import JsonReporter

    if framework_name not in framework_registry:
        available = framework_registry.list_names()
        err_console.print(
            f"[red]Unknown framework:[/red] {framework_name!r}. "
            f"Available: {available!r}"
        )
        sys.exit(1)

    fw_cls = framework_registry.get(framework_name)
    framework = fw_cls()

    evidence: dict[str, object] = {}
    if evidence_path:
        try:
            raw = Path(evidence_path).read_text(encoding="utf-8")
            loaded = yaml.safe_load(raw)
            if isinstance(loaded, dict):
                evidence = loaded
        except Exception as exc:
            err_console.print(f"[red]Error
init_command function · python · L563-L594 (32 LOC)
src/agent_gov/cli/main.py
def init_command(preset: str, output_path: str) -> None:
    """Initialize a project with a policy pack."""
    import importlib.resources

    # Load the pack from the package data
    packs_dir = Path(__file__).parent.parent / "packs"
    pack_file = packs_dir / f"{preset}.yaml"

    if not pack_file.exists():
        err_console.print(f"[red]Pack file not found:[/red] {pack_file}")
        sys.exit(1)

    destination = Path(output_path)
    if destination.exists():
        if not click.confirm(
            f"[yellow]{output_path}[/yellow] already exists. Overwrite?", default=False
        ):
            console.print("[yellow]Aborted.[/yellow]")
            return

    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(pack_file.read_text(encoding="utf-8"), encoding="utf-8")

    console.print(
        f"[green]Initialized[/green] [cyan]{preset}[/cyan] policy pack → [bold]{output_path}[/bold]"
    )
    console.print(
        "\nNext steps:\n"
        
plugins_list_command function · python · L608-L639 (32 LOC)
src/agent_gov/cli/main.py
def plugins_list_command() -> None:
    """List all registered rules and compliance frameworks."""
    from agent_gov.plugins.registry import framework_registry, rule_registry

    # Rules table
    rule_table = Table(title="Registered Rules", box=None, padding=(0, 1))
    rule_table.add_column("Name", style="cyan")
    rule_table.add_column("Class")

    for rule_name in rule_registry.list_names():
        rule_cls = rule_registry.get(rule_name)
        rule_table.add_row(rule_name, rule_cls.__qualname__)

    console.print(rule_table)
    console.print()

    # Frameworks table
    fw_table = Table(title="Registered Frameworks", box=None, padding=(0, 1))
    fw_table.add_column("Name", style="cyan")
    fw_table.add_column("Version")
    fw_table.add_column("Class")

    for fw_name in framework_registry.list_names():
        fw_cls = framework_registry.get(fw_name)
        instance = fw_cls()
        fw_table.add_row(
            fw_name,
            getattr(instance, "version", "—"
classify_command function · python · L677-L729 (53 LOC)
src/agent_gov/cli/main.py
def classify_command(
    description: str,
    use_case: tuple[str, ...],
    data_category: tuple[str, ...],
    output_format: str,
) -> None:
    """Classify an AI system's risk level per the EU AI Act (Annex III)."""
    import dataclasses

    from agent_gov.frameworks.eu_ai_act_classifier import EUAIActClassifier

    classifier = EUAIActClassifier()
    result = classifier.classify(
        system_description=description,
        use_cases=list(use_case) if use_case else None,
        data_categories=list(data_category) if data_category else None,
    )

    if output_format == "json":
        click.echo(json.dumps(dataclasses.asdict(result), indent=2))
        return

    # Rich table output
    level_colours: dict[str, str] = {
        "unacceptable": "red",
        "high": "yellow",
        "limited": "cyan",
        "minimal": "green",
    }
    colour = level_colours.get(result.level.value, "white")

    console.print(
        Panel.fit(
            f"[bold {colour}]{resul
document_command function · python · L771-L815 (45 LOC)
src/agent_gov/cli/main.py
def document_command(
    system_name: str,
    provider: str,
    description: str,
    output: str,
    output_format: str,
) -> None:
    """Generate EU AI Act Annex IV technical documentation."""
    from pathlib import Path as _Path

    from agent_gov.frameworks.eu_ai_act_docs import AnnexIVDocumentation

    doc = AnnexIVDocumentation(
        system_name=system_name,
        provider_name=provider,
        system_description=description,
        intended_purpose=description,
    )

    out_path = _Path(output)
    out_path.mkdir(parents=True, exist_ok=True)
    created: list[str] = []

    if output_format in ("markdown", "both"):
        md_path = out_path / "annex-iv-technical-documentation.md"
        md_path.write_text(doc.to_markdown(), encoding="utf-8")
        created.append(str(md_path))

    if output_format in ("json", "both"):
        import dataclasses

        json_path = out_path / "annex-iv-data.json"
        json_path.write_text(
            json.dumps(dataclass
policy_list_command function · python · L850-L921 (72 LOC)
src/agent_gov/cli/main.py
def policy_list_command(
    source_dir: Optional[str],
    domain: Optional[str],
    severity: Optional[str],
) -> None:
    """List available governance policies from the policy library."""
    from pathlib import Path as _Path

    from agent_gov.policies.loader import LibraryPolicyLoadError, LibraryPolicyLoader

    # Resolve source directory: use built-in library if not specified
    if source_dir:
        resolved_source = _Path(source_dir)
    else:
        resolved_source = _Path(__file__).parent.parent.parent.parent.parent / "policies"
        if not resolved_source.exists():
            # Fall back to relative path from cwd
            resolved_source = _Path.cwd() / "policies"

    if not resolved_source.exists():
        err_console.print(
            f"[red]Policy library directory not found:[/red] {resolved_source}\n"
            "Use [cyan]--source[/cyan] to specify the policies directory."
        )
        sys.exit(1)

    loader = LibraryPolicyLoader()
    try:
     
policy_install_command function · python · L960-L1032 (73 LOC)
src/agent_gov/cli/main.py
def policy_install_command(
    source_dir: str,
    target_dir: str,
    domain: Optional[str],
    no_overwrite: bool,
    dry_run: bool,
) -> None:
    """Install governance policies from a source directory into a target directory."""
    from agent_gov.policies.installer import LibraryPolicyInstaller

    installer = LibraryPolicyInstaller()

    try:
        results = installer.install(
            source=source_dir,
            target=target_dir,
            domain=domain,
            overwrite=not no_overwrite,
            dry_run=dry_run,
        )
    except ValueError as exc:
        err_console.print(f"[red]Install error:[/red] {exc}")
        sys.exit(1)

    if not results:
        console.print("[yellow]No policy files found in source directory.[/yellow]")
        return

    dry_run_label = " [dim](dry-run)[/dim]" if dry_run else ""
    table = Table(
        title=f"Policy Installation{dry_run_label}",
        box=None,
        padding=(0, 1),
    )
    table.add_column
policy_validate_command function · python · L1040-L1066 (27 LOC)
src/agent_gov/cli/main.py
def policy_validate_command(policy_file: str) -> None:
    """Validate a governance policy YAML file against the library schema."""
    from pathlib import Path as _Path

    from agent_gov.policies.validator import LibraryPolicyValidator

    validator = LibraryPolicyValidator()
    result = validator.validate_file(_Path(policy_file))

    if result.valid:
        console.print(
            Panel.fit(
                f"[bold green]VALID[/bold green]  [cyan]{policy_file}[/cyan]",
                title="Policy Validation",
                border_style="green",
            )
        )
    else:
        error_lines = "\n".join(f"  • {err}" for err in result.errors)
        console.print(
            Panel.fit(
                f"[bold red]INVALID[/bold red]  [cyan]{policy_file}[/cyan]\n\n{error_lines}",
                title="Policy Validation",
                border_style="red",
            )
        )
        sys.exit(2)
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
ComplianceRequirement.to_dict method · python · L75-L84 (10 LOC)
src/agent_gov/compliance_cost/calculator.py
    def to_dict(self) -> dict[str, object]:
        """Serialise to a plain dictionary."""
        return {
            "framework": self.framework,
            "requirement_id": self.requirement_id,
            "description": self.description,
            "automation_level": self.automation_level,
            "estimated_hours_manual": self.estimated_hours_manual,
            "estimated_hours_automated": self.estimated_hours_automated,
        }
CostReport.to_dict method · python · L132-L147 (16 LOC)
src/agent_gov/compliance_cost/calculator.py
    def to_dict(self) -> dict[str, object]:
        """Serialise to a plain dictionary."""
        return {
            "framework": self.framework,
            "total_requirements": self.total_requirements,
            "automated_count": self.automated_count,
            "semi_automated_count": self.semi_automated_count,
            "manual_count": self.manual_count,
            "total_hours_manual": round(self.total_hours_manual, 2),
            "total_hours_automated": round(self.total_hours_automated, 2),
            "total_cost_manual": round(self.total_cost_manual, 2),
            "total_cost_with_automation": round(self.total_cost_with_automation, 2),
            "savings_percentage": round(self.savings_percentage, 2),
            "hourly_rate": self.hourly_rate,
            "requirement_details": list(self.requirement_details),
        }
CostReport.summary method · python · L149-L157 (9 LOC)
src/agent_gov/compliance_cost/calculator.py
    def summary(self) -> str:
        """Return a one-line human-readable summary."""
        return (
            f"Framework: {self.framework} | "
            f"Requirements: {self.total_requirements} | "
            f"Manual cost: ${self.total_cost_manual:,.0f} | "
            f"Automated cost: ${self.total_cost_with_automation:,.0f} | "
            f"Savings: {self.savings_percentage:.1f}%"
        )
ComparisonReport.to_dict method · python · L175-L183 (9 LOC)
src/agent_gov/compliance_cost/calculator.py
    def to_dict(self) -> dict[str, object]:
        """Serialise to a plain dictionary."""
        return {
            "framework": self.framework,
            "scenarios": [
                {"label": label, "report": report.to_dict()}
                for label, report in self.scenarios
            ],
        }
ComplianceCostCalculator.calculate method · python · L215-L239 (25 LOC)
src/agent_gov/compliance_cost/calculator.py
    def calculate(
        self,
        framework: str,
        automation_coverage: dict[str, str],
    ) -> CostReport:
        """Calculate compliance costs for *framework* under the given scenario.

        Parameters
        ----------
        framework:
            Framework name: ``"eu_ai_act"``, ``"gdpr"``, or ``"hipaa"``.
        automation_coverage:
            Mapping of requirement_id to overridden automation_level string.
            Requirements not in this dict use their default level from the
            framework catalogue.

        Returns
        -------
        CostReport
            Detailed cost breakdown.
        """
        from agent_gov.compliance_cost.framework_maps import get_requirements

        requirements = get_requirements(framework)
        return self._compute_report(framework, requirements, automation_coverage)
ComplianceCostCalculator.compare_scenarios method · python · L241-L273 (33 LOC)
src/agent_gov/compliance_cost/calculator.py
    def compare_scenarios(
        self,
        framework: str,
        scenarios: list[dict[str, object]],  # each: {"label": str, "automation_coverage": dict[str, str]}
    ) -> ComparisonReport:
        """Compare multiple automation scenarios for the same framework.

        Parameters
        ----------
        framework:
            Framework name.
        scenarios:
            List of scenario dicts, each with:
            - ``"label"`` (str): human-readable scenario name.
            - ``"automation_coverage"`` (dict[str, str]): per-requirement
              automation level overrides (same format as :meth:`calculate`).

        Returns
        -------
        ComparisonReport
            Structured comparison of all scenarios.
        """
        pairs: list[tuple[str, CostReport]] = []
        for scenario in scenarios:
            label = str(scenario.get("label", "unnamed"))
            coverage = dict(scenario.get("automation_coverage", {}))
            report = self.cal
ComplianceCostCalculator.calculate_with_custom_requirements method · python · L275-L294 (20 LOC)
src/agent_gov/compliance_cost/calculator.py
    def calculate_with_custom_requirements(
        self,
        requirements: list[ComplianceRequirement],
        automation_coverage: dict[str, str],
    ) -> CostReport:
        """Calculate costs from a custom list of requirements.

        Parameters
        ----------
        requirements:
            List of :class:`ComplianceRequirement` objects.
        automation_coverage:
            Per-requirement automation level overrides.

        Returns
        -------
        CostReport
        """
        framework = requirements[0].framework if requirements else "custom"
        return self._compute_report(framework, requirements, automation_coverage)
ComplianceCostCalculator._compute_report method · python · L305-L374 (70 LOC)
src/agent_gov/compliance_cost/calculator.py
    def _compute_report(
        self,
        framework: str,
        requirements: list[ComplianceRequirement],
        automation_coverage: dict[str, str],
    ) -> CostReport:
        """Compute cost report from requirements and automation coverage."""
        automated_count = 0
        semi_automated_count = 0
        manual_count = 0
        total_hours_manual = 0.0
        total_hours_automated = 0.0
        details: list[dict[str, object]] = []

        for req in requirements:
            # Apply coverage override or use default
            effective_level = automation_coverage.get(req.requirement_id, req.automation_level)

            # Determine effective hours based on level
            hours_used = self._hours_for_level(
                req,
                effective_level,
            )

            cost_manual = req.cost_manual(self._hourly_rate)
            cost_with_automation = hours_used * self._hourly_rate

            if effective_level == "fully_automated":
     
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
ComplianceCostCalculator._hours_for_level method · python · L376-L388 (13 LOC)
src/agent_gov/compliance_cost/calculator.py
    def _hours_for_level(
        self,
        req: ComplianceRequirement,
        effective_level: str,
    ) -> float:
        """Return the applicable hours estimate given the effective automation level."""
        if effective_level == "fully_automated":
            return req.estimated_hours_automated
        if effective_level == "semi_automated":
            # Blend: midpoint of manual and automated hours
            return (req.estimated_hours_manual + req.estimated_hours_automated) / 2.0
        # manual
        return req.estimated_hours_manual
get_requirements function · python · L340-L360 (21 LOC)
src/agent_gov/compliance_cost/framework_maps.py
def get_requirements(framework: str) -> list[ComplianceRequirement]:
    """Return the requirement list for the given framework name.

    Parameters
    ----------
    framework:
        One of ``"eu_ai_act"``, ``"gdpr"``, ``"hipaa"``.

    Returns
    -------
    list[ComplianceRequirement]

    Raises
    ------
    KeyError
        If *framework* is not registered.
    """
    if framework not in FRAMEWORK_REGISTRY:
        available = ", ".join(sorted(FRAMEWORK_REGISTRY.keys()))
        raise KeyError(f"Unknown framework {framework!r}. Available: {available}")
    return list(FRAMEWORK_REGISTRY[framework])
CostReportRenderer.to_markdown method · python · L36-L83 (48 LOC)
src/agent_gov/compliance_cost/report.py
    def to_markdown(self, report: CostReport) -> str:
        """Render a :class:`CostReport` as a Markdown document.

        Parameters
        ----------
        report:
            The cost report to render.

        Returns
        -------
        str
            Formatted Markdown string.
        """
        s = self._symbol
        lines: list[str] = [
            f"# Cost-of-Compliance Report — {report.framework.replace('_', ' ').title()}",
            "",
            "## Summary",
            "",
            f"| Metric | Value |",
            f"|--------|-------|",
            f"| Total Requirements | {report.total_requirements} |",
            f"| Fully Automated | {report.automated_count} |",
            f"| Semi-Automated | {report.semi_automated_count} |",
            f"| Manual | {report.manual_count} |",
            f"| Hourly Rate | {s}{report.hourly_rate:,.2f} |",
            f"| Manual Total Hours | {report.total_hours_manual:,.1f} |",
            f"| Automated Total 
CostReportRenderer.to_text_summary method · python · L85-L107 (23 LOC)
src/agent_gov/compliance_cost/report.py
    def to_text_summary(self, report: CostReport) -> str:
        """Render a compact single-paragraph text summary.

        Parameters
        ----------
        report:
            The cost report to summarise.

        Returns
        -------
        str
            Single-paragraph summary string.
        """
        return (
            f"{report.framework.replace('_', ' ').upper()}: "
            f"{report.total_requirements} requirements, "
            f"{report.automated_count} fully automated, "
            f"{report.semi_automated_count} semi-automated, "
            f"{report.manual_count} manual. "
            f"Manual cost: {self._symbol}{report.total_cost_manual:,.0f} "
            f"| Automated cost: {self._symbol}{report.total_cost_with_automation:,.0f} "
            f"| Savings: {report.savings_percentage:.1f}%."
        )
CostReportRenderer.comparison_to_markdown method · python · L109-L144 (36 LOC)
src/agent_gov/compliance_cost/report.py
    def comparison_to_markdown(self, comparison: ComparisonReport) -> str:
        """Render a :class:`ComparisonReport` as a Markdown comparison table.

        Parameters
        ----------
        comparison:
            The comparison report to render.

        Returns
        -------
        str
            Formatted Markdown comparison document.
        """
        s = self._symbol
        lines: list[str] = [
            f"# Compliance Cost Comparison — {comparison.framework.replace('_', ' ').title()}",
            "",
            "## Scenario Comparison",
            "",
            "| Scenario | Automated | Manual | Total Cost | Savings |",
            "|----------|-----------|--------|------------|---------|",
        ]
        for label, report in comparison.scenarios:
            lines.append(
                f"| {label} | {report.automated_count} | {report.manual_count} "
                f"| {s}{report.total_cost_with_automation:,.0f} "
                f"| {report.savings_
GovernanceEngine.__init__ method · python · L41-L56 (16 LOC)
src/agent_gov/convenience.py
    def __init__(self, policy_path: str | None = None) -> None:
        from agent_gov.policy.evaluator import PolicyEvaluator
        from agent_gov.policy.schema import PolicyConfig, RuleConfig

        self._evaluator = PolicyEvaluator()

        if policy_path is not None:
            from agent_gov.policy.loader import PolicyLoader
            loader = PolicyLoader()
            self._policy = loader.load_file(policy_path)
        else:
            self._policy = PolicyConfig(
                name="quickstart-default",
                version="1.0",
                rules=[],
            )
GovernanceEngine.evaluate method · python · L58-L81 (24 LOC)
src/agent_gov/convenience.py
    def evaluate(self, action: dict[str, Any]) -> Any:
        """Evaluate an agent action against the active policy.

        Parameters
        ----------
        action:
            Dict describing the agent action. Common keys include
            ``action`` (action type), ``agent_id``, and action-specific
            fields like ``query`` or ``path``.

        Returns
        -------
        EvaluationReport
            Report with ``.passed`` bool and per-rule verdicts.

        Example
        -------
        ::

            engine = GovernanceEngine()
            report = engine.evaluate({"action": "search", "query": "test"})
            assert report.passed
        """
        return self._evaluator.evaluate(self._policy, action)
_build_handler function · python · L40-L185 (146 LOC)
src/agent_gov/dashboard/dashboard_server.py
def _build_handler(
    collector: EvidenceCollector,
    scorer: PostureScorer,
    generator: ReportGenerator,
) -> type[BaseHTTPRequestHandler]:
    """Build a ``BaseHTTPRequestHandler`` subclass bound to the given services.

    Parameters
    ----------
    collector:
        Evidence collector instance.
    scorer:
        Posture scorer instance.
    generator:
        Report generator instance.

    Returns
    -------
    type[BaseHTTPRequestHandler]
        A request handler class with the services in closure.
    """

    class _Handler(BaseHTTPRequestHandler):
        """Handles GET requests for the compliance dashboard API."""

        _collector = collector
        _scorer = scorer
        _generator = generator

        def log_message(self, fmt: str, *args: object) -> None:
            """Suppress default access log output."""
            pass  # noqa: PIE790

        def do_GET(self) -> None:  # noqa: N802
            """Route GET requests to the appropriate endpoint."
All rows scored by the Repobility analyzer (https://repobility.com)
DashboardServer.__init__ method · python · L213-L226 (14 LOC)
src/agent_gov/dashboard/dashboard_server.py
    def __init__(
        self,
        collector: EvidenceCollector,
        scorer: PostureScorer,
        generator: ReportGenerator,
        host: str = "127.0.0.1",
        port: int = 8765,
    ) -> None:
        self._collector = collector
        self._scorer = scorer
        self._generator = generator
        self._host = host
        self._port = port
        self._server: HTTPServer | None = None
DashboardServer.build_server method · python · L228-L238 (11 LOC)
src/agent_gov/dashboard/dashboard_server.py
    def build_server(self) -> HTTPServer:
        """Build and return the underlying ``HTTPServer`` without starting it.

        Returns
        -------
        HTTPServer
        """
        handler_cls = _build_handler(self._collector, self._scorer, self._generator)
        server = HTTPServer((self._host, self._port), handler_cls)
        self._server = server
        return server
DashboardServer.serve_forever method · python · L240-L252 (13 LOC)
src/agent_gov/dashboard/dashboard_server.py
    def serve_forever(self) -> None:
        """Start the HTTP server and block until interrupted.

        Raises
        ------
        KeyboardInterrupt
            Propagated from the underlying server; triggers clean shutdown.
        """
        server = self.build_server()
        try:
            server.serve_forever()
        finally:
            server.server_close()
create_fastapi_app function · python · L270-L337 (68 LOC)
src/agent_gov/dashboard/dashboard_server.py
def create_fastapi_app(
    collector: EvidenceCollector,
    scorer: PostureScorer,
    generator: ReportGenerator,
) -> object:
    """Create a FastAPI ASGI application for the compliance dashboard.

    This function is provided as an *optional* integration point.  It
    requires ``fastapi`` to be installed.  When not available, raises
    ``ImportError`` with a helpful message.

    Parameters
    ----------
    collector:
        Evidence collector instance.
    scorer:
        Posture scorer instance.
    generator:
        Report generator instance.

    Returns
    -------
    fastapi.FastAPI
        ASGI application.

    Raises
    ------
    ImportError
        When ``fastapi`` is not installed.
    """
    try:
        import fastapi  # type: ignore[import]
    except ImportError as exc:
        raise ImportError(
            "FastAPI is not installed.  Install it with: pip install fastapi"
        ) from exc

    app = fastapi.FastAPI(title="agent-gov Compliance Dashboard
EvidenceEntry.to_dict method · python · L59-L73 (15 LOC)
src/agent_gov/dashboard/evidence_collector.py
    def to_dict(self) -> dict[str, object]:
        """Serialise the entry to a plain dictionary.

        Returns
        -------
        dict[str, object]
            JSON-serialisable representation.
        """
        return {
            "timestamp": self.timestamp.isoformat(),
            "policy_id": self.policy_id,
            "rule_id": self.rule_id,
            "result": self.result,
            "context": self.context,
        }
EvidenceEntry.from_dict method · python · L76-L103 (28 LOC)
src/agent_gov/dashboard/evidence_collector.py
    def from_dict(cls, data: dict[str, object]) -> EvidenceEntry:
        """Reconstruct an entry from a dictionary produced by :meth:`to_dict`.

        Parameters
        ----------
        data:
            Dictionary with at least ``timestamp``, ``policy_id``,
            ``rule_id``, ``result``, and ``context`` keys.

        Returns
        -------
        EvidenceEntry
        """
        raw_ts = data.get("timestamp", "")
        try:
            ts = datetime.fromisoformat(str(raw_ts))
            if ts.tzinfo is None:
                ts = ts.replace(tzinfo=timezone.utc)
        except (ValueError, TypeError):
            ts = datetime.now(timezone.utc)

        return cls(
            timestamp=ts,
            policy_id=str(data.get("policy_id", "")),
            rule_id=str(data.get("rule_id", "")),
            result=str(data.get("result", "skip")),
            context=dict(data.get("context", {})),
        )
EvidenceCollector.record method · python · L129-L139 (11 LOC)
src/agent_gov/dashboard/evidence_collector.py
    def record(self, entry: EvidenceEntry) -> None:
        """Append *entry* to the evidence log.

        Parameters
        ----------
        entry:
            The :class:`EvidenceEntry` to record.
        """
        self._entries.append(entry)
        if self._max_entries is not None and len(self._entries) > self._max_entries:
            self._entries.pop(0)
EvidenceCollector.record_many method · python · L141-L150 (10 LOC)
src/agent_gov/dashboard/evidence_collector.py
    def record_many(self, entries: list[EvidenceEntry]) -> None:
        """Append multiple entries in insertion order.

        Parameters
        ----------
        entries:
            Sequence of :class:`EvidenceEntry` objects to record.
        """
        for entry in entries:
            self.record(entry)
Powered by Repobility — scan your code at https://repobility.com
EvidenceCollector.query method · python · L156-L197 (42 LOC)
src/agent_gov/dashboard/evidence_collector.py
    def query(
        self,
        policy_id: str | None = None,
        rule_id: str | None = None,
        result: str | None = None,
        since: datetime | None = None,
        until: datetime | None = None,
    ) -> list[EvidenceEntry]:
        """Return entries matching all supplied filter criteria.

        Parameters
        ----------
        policy_id:
            Filter to entries with this policy identifier.
        rule_id:
            Filter to entries with this rule identifier.
        result:
            Filter to entries with this result (``"pass"`` / ``"fail"`` / ``"skip"``).
        since:
            Include only entries at or after this UTC timestamp.
        until:
            Include only entries at or before this UTC timestamp.

        Returns
        -------
        list[EvidenceEntry]
            Matching entries in insertion order.
        """
        results: list[EvidenceEntry] = []
        for entry in self._entries:
            if policy_id is not No
EvidenceCollector.all_entries method · python · L199-L206 (8 LOC)
src/agent_gov/dashboard/evidence_collector.py
    def all_entries(self) -> list[EvidenceEntry]:
        """Return all stored entries in insertion order.

        Returns
        -------
        list[EvidenceEntry]
        """
        return list(self._entries)
EvidenceCollector.rule_ids method · python · L217-L226 (10 LOC)
src/agent_gov/dashboard/evidence_collector.py
    def rule_ids(self, policy_id: str | None = None) -> list[str]:
        """Return a sorted deduplicated list of rule IDs.

        Parameters
        ----------
        policy_id:
            When provided, limit to rule IDs under this policy.
        """
        entries = self.query(policy_id=policy_id) if policy_id else self._entries
        return sorted({e.rule_id for e in entries})
‹ prevpage 2 / 5next ›