← back to invincible-jha__agent-gov

Function bodies 212 total

All specs Real LLM only Function bodies
EvidenceCollector.export_json method · python · L232-L242 (11 LOC)
src/agent_gov/dashboard/evidence_collector.py
    def export_json(self, path: Path) -> None:
        """Serialise every collected entry to *path*, one JSON object per line.

        The output is JSON Lines: each entry's ``to_dict()`` payload is dumped
        with ``ensure_ascii=False`` and terminated by a newline.  The file is
        opened in ``"w"`` mode, so existing content is replaced.

        Parameters
        ----------
        path:
            Destination file path.  Parent directories must exist.
        """
        with path.open("w", encoding="utf-8") as sink:
            # Lazily render one line per entry and hand them over in a batch.
            sink.writelines(
                f"{json.dumps(item.to_dict(), ensure_ascii=False)}\n"
                for item in self._entries
            )
EvidenceCollector.export_dict method · python · L244-L255 (12 LOC)
src/agent_gov/dashboard/evidence_collector.py
    def export_dict(self) -> dict[str, object]:
        """Return all entries, plus their count, as a plain dictionary.

        Returns
        -------
        dict[str, object]
            Mapping with a ``count`` integer followed by an ``entries`` list
            holding each entry's ``to_dict()`` payload.
        """
        payload: dict[str, object] = {"count": self.count}
        payload["entries"] = [item.to_dict() for item in self._entries]
        return payload
EvidenceCollector.load_json method · python · L258-L282 (25 LOC)
src/agent_gov/dashboard/evidence_collector.py
    def load_json(cls, path: Path) -> EvidenceCollector:
        """Load entries from a JSONL file produced by :meth:`export_json`.

        Loading is best-effort: blank lines, lines that are not valid JSON,
        lines whose JSON value is not an object, and lines rejected with
        ``KeyError``/``ValueError`` while building or recording the entry are
        skipped silently.

        Parameters
        ----------
        path:
            Source file path.

        Returns
        -------
        EvidenceCollector
            New collector pre-populated with entries from the file.
        """
        collector = cls()
        with path.open("r", encoding="utf-8") as fh:
            for raw_line in fh:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    # ``json.loads`` also accepts scalars, arrays and null.
                    # Only a JSON object can describe an entry, so anything
                    # else is a malformed line, not a reason to abort.
                    if not isinstance(data, dict):
                        continue
                    collector.record(EvidenceEntry.from_dict(data))
                except (json.JSONDecodeError, KeyError, ValueError):
                    continue  # skip malformed lines
        return collector
PostureScore.grade method · python · L62-L79 (18 LOC)
src/agent_gov/dashboard/posture_scorer.py
    def grade(self) -> str:
        """Translate the overall score into a letter grade.

        Returns
        -------
        str
            One of ``"A"``, ``"B"``, ``"C"``, ``"D"``, ``"F"``.
        """
        value = self.overall_score
        # Minimum score for each letter, highest first; first match wins.
        for floor, letter in ((90, "A"), (80, "B"), (70, "C"), (60, "D")):
            if value >= floor:
                return letter
        return "F"
PostureScore.to_dict method · python · L81-L97 (17 LOC)
src/agent_gov/dashboard/posture_scorer.py
    def to_dict(self) -> dict[str, object]:
        """Convert the posture score into a plain dictionary.

        Scores are rounded to two decimal places and ``computed_at`` is
        rendered with ``isoformat()``.

        Returns
        -------
        dict[str, object]
        """
        payload: dict[str, object] = {}
        payload["overall_score"] = round(self.overall_score, 2)
        payload["grade"] = self.grade()
        payload["per_policy"] = {
            policy: round(value, 2) for policy, value in self.per_policy.items()
        }
        payload["total_entries"] = self.total_entries
        payload["pass_count"] = self.pass_count
        payload["fail_count"] = self.fail_count
        payload["skip_count"] = self.skip_count
        payload["computed_at"] = self.computed_at.isoformat()
        return payload
PostureScorer.score method · python · L124-L164 (41 LOC)
src/agent_gov/dashboard/posture_scorer.py
    def score(self, evidence: list[EvidenceEntry]) -> PostureScore:
        """Compute compliance posture from *evidence*.

        Parameters
        ----------
        evidence:
            List of :class:`~agent_gov.dashboard.evidence_collector.EvidenceEntry`
            objects to score.

        Returns
        -------
        PostureScore
            Aggregate and per-policy scores.
        """
        pass_count = sum(1 for e in evidence if e.result == "pass")
        fail_count = sum(1 for e in evidence if e.result == "fail")
        skip_count = sum(1 for e in evidence if e.result == "skip")

        overall = self._compute_score(pass_count, fail_count, skip_count)

        # Group by policy
        policy_groups: dict[str, list[EvidenceEntry]] = {}
        for entry in evidence:
            policy_groups.setdefault(entry.policy_id, []).append(entry)

        per_policy: dict[str, float] = {}
        for policy_id, group in policy_groups.items():
            p = sum(1 for e in
PostureScorer.score_trend method · python · L166-L181 (16 LOC)
src/agent_gov/dashboard/posture_scorer.py
    def score_trend(
        self, evidence_windows: list[list[EvidenceEntry]]
    ) -> list[float]:
        """Score each evidence window and return the overall scores in order.

        Parameters
        ----------
        evidence_windows:
            Ordered list of evidence slices (e.g. one per day).

        Returns
        -------
        list[float]
            Overall score for each window, in the same order.
        """
        trend: list[float] = []
        for window in evidence_windows:
            posture = self.score(window)
            trend.append(posture.overall_score)
        return trend
Repobility · MCP-ready · https://repobility.com
PostureScorer._compute_score method · python · L183-L191 (9 LOC)
src/agent_gov/dashboard/posture_scorer.py
    def _compute_score(
        self, pass_count: int, fail_count: int, skip_count: int
    ) -> float:
        """Return the pass percentage, with skips weighted by ``_skip_weight``.

        The result is ``pass_count`` divided by passes + fails + weighted
        skips, times 100.  It is ``0.0`` when that denominator is zero.
        """
        total_weight = pass_count + fail_count + skip_count * self._skip_weight
        if total_weight != 0:
            return (pass_count / total_weight) * 100.0
        return 0.0
ReportGenerator.__init__ method · python · L46-L52 (7 LOC)
src/agent_gov/dashboard/report_generator.py
    def __init__(
        self,
        system_name: str = "AI System",
        include_context: bool = False,
    ) -> None:
        """Store the report configuration.

        Parameters
        ----------
        system_name:
            Name kept on the instance as ``_system_name``.
        include_context:
            Flag kept on the instance as ``_include_context``.
        """
        self._system_name, self._include_context = system_name, include_context
ReportGenerator.generate_markdown method · python · L58-L150 (93 LOC)
src/agent_gov/dashboard/report_generator.py
    def generate_markdown(
        self,
        evidence: list[EvidenceEntry],
        posture: PostureScore,
    ) -> str:
        """Generate an auditor-ready Markdown compliance report.

        Parameters
        ----------
        evidence:
            List of evidence entries to include.
        posture:
            Pre-computed posture score.

        Returns
        -------
        str
            Complete Markdown document.
        """
        lines: list[str] = []

        # Title
        lines.append(f"# Compliance Report — {self._system_name}")
        lines.append("")
        lines.append(f"**Generated:** {_utc_now_str()}")
        lines.append(f"**Evidence entries:** {len(evidence)}")
        lines.append("")
        lines.append("---")
        lines.append("")

        # Posture summary
        lines.append("## Compliance Posture Summary")
        lines.append("")
        lines.append(f"| Metric | Value |")
        lines.append(f"|--------|-------|")
        lines.appen
ReportGenerator.generate_json method · python · L152-L198 (47 LOC)
src/agent_gov/dashboard/report_generator.py
    def generate_json(
        self,
        evidence: list[EvidenceEntry],
        posture: PostureScore,
    ) -> dict[str, object]:
        """Generate a structured JSON-serialisable compliance report dictionary.

        Parameters
        ----------
        evidence:
            List of evidence entries.
        posture:
            Pre-computed posture score.

        Returns
        -------
        dict[str, object]
            JSON-serialisable report dictionary.
        """
        evidence_records: list[dict[str, object]] = []
        for entry in evidence:
            record: dict[str, object] = {
                "timestamp": entry.timestamp.isoformat(),
                "policy_id": entry.policy_id,
                "rule_id": entry.rule_id,
                "result": entry.result,
            }
            if self._include_context:
                record["context"] = entry.context
            evidence_records.append(record)

        return {
            "system_name": self._s
ReportGenerator.generate_json_string method · python · L200-L226 (27 LOC)
src/agent_gov/dashboard/report_generator.py
    def generate_json_string(
        self,
        evidence: list[EvidenceEntry],
        posture: PostureScore,
        indent: int = 2,
    ) -> str:
        """Render the JSON report as a formatted string.

        The report dictionary comes from :meth:`generate_json` and is dumped
        with ``ensure_ascii=False``.

        Parameters
        ----------
        evidence:
            List of evidence entries.
        posture:
            Pre-computed posture score.
        indent:
            JSON indentation level.

        Returns
        -------
        str
            Formatted JSON string.
        """
        report = self.generate_json(evidence, posture)
        return json.dumps(report, ensure_ascii=False, indent=indent)
ReportGenerator.write_markdown method · python · L228-L248 (21 LOC)
src/agent_gov/dashboard/report_generator.py
    def write_markdown(
        self,
        evidence: list[EvidenceEntry],
        posture: PostureScore,
        path: object,
    ) -> None:
        """Render the Markdown report and save it to *path* as UTF-8.

        Parameters
        ----------
        evidence:
            List of evidence entries.
        posture:
            Pre-computed posture score.
        path:
            :class:`pathlib.Path` or string path for the output file.
        """
        from pathlib import Path

        # Render first so a failure in generation never touches the file.
        document = self.generate_markdown(evidence, posture)
        destination = Path(path)
        destination.write_text(document, encoding="utf-8")
ReportGenerator.write_json method · python · L250-L273 (24 LOC)
src/agent_gov/dashboard/report_generator.py
    def write_json(
        self,
        evidence: list[EvidenceEntry],
        posture: PostureScore,
        path: object,
        indent: int = 2,
    ) -> None:
        """Render the JSON report and save it to *path* as UTF-8.

        Parameters
        ----------
        evidence:
            List of evidence entries.
        posture:
            Pre-computed posture score.
        path:
            :class:`pathlib.Path` or string path for the output file.
        indent:
            JSON indentation level.
        """
        from pathlib import Path

        # Render first so a failure in generation never touches the file.
        document = self.generate_json_string(evidence, posture, indent=indent)
        destination = Path(path)
        destination.write_text(document, encoding="utf-8")
FrameworkReport.to_dict method · python · L123-L143 (21 LOC)
src/agent_gov/frameworks/base.py
    def to_dict(self) -> dict[str, object]:
        """Convert the report, including one record per result, to a dict."""
        payload: dict[str, object] = {
            "framework": self.framework,
            "score": self.score,
            "score_percent": self.score_percent,
            "total": len(self.results),
            "passed": self.passed_count,
            "failed": self.failed_count,
            "unknown": self.unknown_count,
        }
        # Flatten every check result together with its checklist item.
        records = []
        for outcome in self.results:
            checklist_item = outcome.item
            records.append(
                {
                    "id": checklist_item.id,
                    "name": checklist_item.name,
                    "category": checklist_item.category,
                    "status": outcome.status,
                    "evidence": outcome.evidence,
                }
            )
        payload["results"] = records
        return payload
Want this analysis on your repo? https://repobility.com/scan/
ComplianceFramework.run_check method · python · L168-L181 (14 LOC)
src/agent_gov/frameworks/base.py
    def run_check(self, evidence: dict[str, object]) -> FrameworkReport:
        """Evaluate evidence against every item in the checklist.

        This definition consists of a docstring only, so it specifies the
        contract rather than performing an evaluation.
        NOTE(review): presumably declared abstract by a decorator outside
        this view and overridden by each concrete framework — confirm.

        Parameters
        ----------
        evidence:
            Key/value mapping where keys correspond to checklist item IDs
            or other evidence fields recognised by this framework.

        Returns
        -------
        FrameworkReport
            Report containing one :class:`CheckResult` per checklist item.
        """
EUAIActClassifier.classify method · python · L225-L320 (96 LOC)
src/agent_gov/frameworks/eu_ai_act_classifier.py
    def classify(
        self,
        system_description: str,
        use_cases: list[str] | None = None,
        data_categories: list[str] | None = None,
    ) -> RiskClassification:
        """Determine the risk level of an AI system.

        Parameters
        ----------
        system_description:
            Free-text description of the AI system and its purpose.
        use_cases:
            Optional list of intended use-case strings.  These are included
            in the keyword search corpus alongside ``system_description``.
        data_categories:
            Optional list of data category strings (e.g. ``"biometric data"``,
            ``"financial data"``).  Also included in the search corpus.

        Returns
        -------
        RiskClassification
            The determined risk classification with obligations and reasoning.
        """
        description_lower = system_description.lower()
        use_case_text = " ".join(use_cases or []).lower()
        data_t
AnnexIVDocumentation.to_markdown method · python · L138-L236 (99 LOC)
src/agent_gov/frameworks/eu_ai_act_docs.py
    def to_markdown(self) -> str:
        """Generate a Markdown representation of the Annex IV documentation.

        The output includes all seven Annex IV sections with proper headings,
        JSON code blocks for structured metrics, and bullet-pointed limitations.

        Returns
        -------
        str
            A multi-section Markdown document.
        """
        sections: list[str] = [
            f"# Technical Documentation: {self.system_name}",
            "",
            "**Per EU AI Act (Regulation (EU) 2024/1689), Annex IV**",
            f"**Generated:** {self.generated_at}",
            f"**Provider:** {self.provider_name}",
            f"**System Version:** {self.system_version}",
            "",
            "---",
            "",
            "## 1. General Description of the AI System (Annex IV(1))",
            "",
            "### Intended Purpose",
            self.intended_purpose,
            "",
            "### System Description",
            self.sys
AnnexIVDocumentation.export method · python · L238-L271 (34 LOC)
src/agent_gov/frameworks/eu_ai_act_docs.py
    def export(self, output_dir: str) -> list[str]:
        """Export the documentation package to files on disk.

        Creates the output directory if it does not exist, then writes:

        - ``annex-iv-technical-documentation.md`` — human-readable Markdown.
        - ``annex-iv-data.json`` — full dataclass serialised to JSON.

        Parameters
        ----------
        output_dir:
            Path to the directory where files should be written.

        Returns
        -------
        list[str]
            Absolute paths of all files created, in the order they were written.
        """
        out_path = Path(output_dir)
        out_path.mkdir(parents=True, exist_ok=True)
        created: list[str] = []

        md_path = out_path / "annex-iv-technical-documentation.md"
        md_path.write_text(self.to_markdown(), encoding="utf-8")
        created.append(str(md_path))

        json_path = out_path / "annex-iv-data.json"
        json_path.write_text(
            json.dumps(d
EuAiActFramework.checklist method · python · L49-L125 (77 LOC)
src/agent_gov/frameworks/eu_ai_act.py
    def checklist(self) -> list[ChecklistItem]:
        """Return the 8-item EU AI Act checklist."""
        return [
            ChecklistItem(
                id="A6",
                name="Risk Classification",
                description=(
                    "Article 6: The AI system has been assessed and classified "
                    "according to the EU AI Act risk categories (unacceptable, high, "
                    "limited, minimal)."
                ),
                category="risk",
            ),
            ChecklistItem(
                id="A9",
                name="Risk Management System",
                description=(
                    "Article 9: A risk management system is established, implemented, "
                    "documented, and maintained for the lifecycle of the AI system."
                ),
                category="risk",
            ),
            ChecklistItem(
                id="A10",
                name="Data Governance",
                de
EuAiActFramework.run_check method · python · L127-L150 (24 LOC)
src/agent_gov/frameworks/eu_ai_act.py
    def run_check(self, evidence: dict[str, object]) -> FrameworkReport:
        """Build a report with one check result per EU AI Act checklist item.

        Parameters
        ----------
        evidence:
            Dict keyed by article ID (``"A6"``, ``"A9"``, etc.).  Each value
            may be:
            - A mapping with ``"status"`` (``"pass"``/``"fail"``/``"unknown"``)
              and optional ``"evidence"`` string.
            - A plain truthy/falsy value (converted to ``"pass"``/``"fail"``).
            - Missing key → ``"unknown"``.

        Returns
        -------
        FrameworkReport
        """
        # The single-element inner loop unpacks the (status, text) pair.
        outcomes = [
            CheckResult(item=entry, status=verdict, evidence=note)
            for entry in self.checklist()
            for verdict, note in (_resolve_evidence(evidence.get(entry.id)),)
        ]
        return FrameworkReport(framework=self.name, results=outcomes)
_resolve_evidence function · python · L153-L163 (11 LOC)
src/agent_gov/frameworks/eu_ai_act.py
def _resolve_evidence(value: object) -> tuple[str, str]:
    """Resolve a raw evidence value to a ``(status, evidence_text)`` pair.

    Parameters
    ----------
    value:
        Raw evidence for one checklist item.  ``None`` means no evidence was
        supplied; a mapping may carry ``"status"`` and ``"evidence"`` keys;
        any other value is judged by its truthiness.

    Returns
    -------
    tuple[str, str]
        ``status`` is one of ``"pass"``, ``"fail"`` or ``"unknown"``;
        ``evidence_text`` is the accompanying evidence rendered as a string.
    """
    # Imported locally so this function needs no extra module-level import.
    from collections.abc import Mapping

    if value is None:
        return "unknown", "No evidence provided."
    # Accept any mapping, not only ``dict``.  A read-only or custom mapping
    # would otherwise reach the truthiness branch below and be reported as
    # "pass" regardless of the status it declares.
    if isinstance(value, Mapping):
        raw_status = str(value.get("status", "unknown")).lower()
        # Unrecognised status strings are downgraded rather than trusted.
        status = raw_status if raw_status in ("pass", "fail", "unknown") else "unknown"
        evidence_text = str(value.get("evidence", ""))
        return status, evidence_text
    # Treat truthy scalar as pass, falsy as fail
    return ("pass" if value else "fail"), str(value)
GdprFramework.checklist method · python · L54-L128 (75 LOC)
src/agent_gov/frameworks/gdpr.py
    def checklist(self) -> list[ChecklistItem]:
        """Return the 7-item GDPR checklist."""
        return [
            ChecklistItem(
                id="P1",
                name="Lawful Basis for Processing",
                description=(
                    "Article 5(1)(a): Personal data is processed lawfully, fairly, and "
                    "transparently.  A valid lawful basis (consent, contract, legal "
                    "obligation, vital interests, public task, or legitimate interests) "
                    "is identified and documented."
                ),
                category="lawfulness",
            ),
            ChecklistItem(
                id="P2",
                name="Purpose Limitation",
                description=(
                    "Article 5(1)(b): Personal data is collected for specified, explicit, "
                    "and legitimate purposes.  Data is not further processed in a manner "
                    "incompatible with those purposes."
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
GdprFramework.run_check method · python · L130-L149 (20 LOC)
src/agent_gov/frameworks/gdpr.py
    def run_check(self, evidence: dict[str, object]) -> FrameworkReport:
        """Build a report with one check result per GDPR checklist item.

        Parameters
        ----------
        evidence:
            Dict keyed by principle ID.  Same resolution rules as
            :meth:`~agent_gov.frameworks.eu_ai_act.EuAiActFramework.run_check`.

        Returns
        -------
        FrameworkReport
        """
        # The single-element inner loop unpacks the (status, text) pair.
        outcomes = [
            CheckResult(item=entry, status=verdict, evidence=note)
            for entry in self.checklist()
            for verdict, note in (_resolve_evidence(evidence.get(entry.id)),)
        ]
        return FrameworkReport(framework=self.name, results=outcomes)
HipaaFramework.checklist method · python · L54-L108 (55 LOC)
src/agent_gov/frameworks/hipaa.py
    def checklist(self) -> list[ChecklistItem]:
        """Return the 5-item HIPAA Security Rule checklist."""
        return [
            ChecklistItem(
                id="SC1",
                name="Access Controls",
                description=(
                    "164.312(a)(1): Implement technical policies and procedures for "
                    "electronic information systems that maintain ePHI to allow access "
                    "only to those persons or software programs that have been granted "
                    "access rights."
                ),
                category="access",
            ),
            ChecklistItem(
                id="SC2",
                name="Audit Controls",
                description=(
                    "164.312(b): Implement hardware, software, and/or procedural "
                    "mechanisms that record and examine activity in information systems "
                    "that contain or use ePHI."
                ),
                c
HipaaFramework.run_check method · python · L110-L129 (20 LOC)
src/agent_gov/frameworks/hipaa.py
    def run_check(self, evidence: dict[str, object]) -> FrameworkReport:
        """Build a report with one check result per HIPAA checklist item.

        Parameters
        ----------
        evidence:
            Dict keyed by safeguard ID.  Same resolution rules as
            :meth:`~agent_gov.frameworks.eu_ai_act.EuAiActFramework.run_check`.

        Returns
        -------
        FrameworkReport
        """
        # The single-element inner loop unpacks the (status, text) pair.
        outcomes = [
            CheckResult(item=entry, status=verdict, evidence=note)
            for entry in self.checklist()
            for verdict, note in (_resolve_evidence(evidence.get(entry.id)),)
        ]
        return FrameworkReport(framework=self.name, results=outcomes)
Soc2Framework.checklist method · python · L53-L105 (53 LOC)
src/agent_gov/frameworks/soc2.py
    def checklist(self) -> list[ChecklistItem]:
        """Return the 5-item SOC 2 Trust Services Criteria checklist."""
        return [
            ChecklistItem(
                id="CC6",
                name="Security",
                description=(
                    "CC6 — Logical and Physical Access Controls: The system is protected "
                    "against unauthorised access using logical and physical access controls "
                    "including authentication, authorisation, and access monitoring."
                ),
                category="security",
            ),
            ChecklistItem(
                id="A1",
                name="Availability",
                description=(
                    "A1 — Availability: The system is available for operation and use "
                    "as committed or agreed, and meets defined performance and uptime "
                    "service level objectives."
                ),
                category="availability",
 
Soc2Framework.run_check method · python · L107-L126 (20 LOC)
src/agent_gov/frameworks/soc2.py
    def run_check(self, evidence: dict[str, object]) -> FrameworkReport:
        """Build a report with one check result per SOC 2 checklist item.

        Parameters
        ----------
        evidence:
            Dict keyed by criteria ID.  Same resolution rules as
            :meth:`~agent_gov.frameworks.eu_ai_act.EuAiActFramework.run_check`.

        Returns
        -------
        FrameworkReport
        """
        # The single-element inner loop unpacks the (status, text) pair.
        outcomes = [
            CheckResult(item=entry, status=verdict, evidence=note)
            for entry in self.checklist()
            for verdict, note in (_resolve_evidence(evidence.get(entry.id)),)
        ]
        return FrameworkReport(framework=self.name, results=outcomes)
AgentCoreBridge.__init__ method · python · L69-L80 (12 LOC)
src/agent_gov/integration/agentcore_bridge.py
    def __init__(
        self,
        policy: PolicyConfig,
        audit_logger: Optional[AuditLogger] = None,
        *,
        agent_id_field: str = "agent_id",
    ) -> None:
        """Store the bridge configuration and start in the disconnected state.

        Parameters
        ----------
        policy:
            Policy kept on the instance as ``_policy``.
        audit_logger:
            Optional logger kept as ``_audit_logger``; may be ``None``.
        agent_id_field:
            Field name kept as ``_agent_id_field``.
        """
        # Caller-supplied collaborators and settings.
        self._policy, self._audit_logger = policy, audit_logger
        self._agent_id_field = agent_id_field
        # Internal state owned by the bridge itself.
        self._evaluator = PolicyEvaluator()
        self._connected = False
AgentCoreBridge.connect method · python · L92-L121 (30 LOC)
src/agent_gov/integration/agentcore_bridge.py
    def connect(self) -> bool:
        """Subscribe to the agentcore event bus.

        Returns
        -------
        bool
            ``True`` when successfully connected, ``False`` when
            agentcore-sdk is not available.
        """
        if not _AGENTCORE_AVAILABLE:
            logger.warning(
                "agentcore-sdk is not installed. "
                "Install with: pip install agent-gov[agentcore]"
            )
            return False

        try:
            # Subscribe to agent action events via agentcore event bus.
            # The actual API depends on the agentcore-sdk version.
            event_bus = agentcore.get_event_bus()  # type: ignore[union-attr]
            event_bus.subscribe("agent.action", self._handle_event)
            self._connected = True
            logger.info(
                "AgentCoreBridge connected; evaluating against policy %r",
                self._policy.name,
            )
            return True
        except Exception:
AgentCoreBridge.disconnect method · python · L123-L134 (12 LOC)
src/agent_gov/integration/agentcore_bridge.py
    def disconnect(self) -> None:
        """Unsubscribe from the agentcore event bus.

        Does nothing when agentcore is unavailable or the bridge is not
        connected.  Any exception raised while unsubscribing is logged and
        not re-raised.
        """
        if not (_AGENTCORE_AVAILABLE and self._connected):
            return

        try:
            bus = agentcore.get_event_bus()  # type: ignore[union-attr]
            bus.unsubscribe("agent.action", self._handle_event)
            self._connected = False
            logger.info("AgentCoreBridge disconnected.")
        except Exception:
            logger.exception("Failed to disconnect AgentCoreBridge.")
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
AgentCoreBridge._handle_event method · python · L136-L163 (28 LOC)
src/agent_gov/integration/agentcore_bridge.py
    def _handle_event(self, event: dict[str, object]) -> None:
        """Handle an incoming agentcore event.

        Parameters
        ----------
        event:
            Raw event payload from the agentcore event bus.
        """
        try:
            report = self._evaluator.evaluate(self._policy, event)
            agent_id = str(event.get(self._agent_id_field, "unknown"))

            if self._audit_logger is not None:
                self._audit_logger.log_from_report(report, agent_id=agent_id)

            if not report.passed:
                logger.warning(
                    "Policy %r FAILED for agent %r: %d violation(s), highest severity=%s",
                    self._policy.name,
                    agent_id,
                    report.violation_count,
                    report.highest_severity,
                )
        except Exception:
            logger.exception(
                "AgentCoreBridge failed to evaluate event against policy %r.",
                se
AgentCoreBridge.evaluate_event method · python · L165-L190 (26 LOC)
src/agent_gov/integration/agentcore_bridge.py
    def evaluate_event(
        self,
        event: dict[str, object],
        agent_id: str = "unknown",
    ) -> bool:
        """Evaluate one event against the policy without using the event bus.

        Useful for testing the bridge configuration without a live
        agentcore connection.  When an audit logger is configured, the
        evaluation report is also logged through it.

        Parameters
        ----------
        event:
            Action payload to evaluate.
        agent_id:
            Agent identifier for audit logging.

        Returns
        -------
        bool
            ``True`` when the event passes the policy.
        """
        outcome = self._evaluator.evaluate(self._policy, event)
        auditor = self._audit_logger
        if auditor is None:
            return outcome.passed
        auditor.log_from_report(outcome, agent_id=agent_id)
        return outcome.passed
MappingResult.to_dict method · python · L113-L129 (17 LOC)
src/agent_gov/multi_framework/mapper.py
    def to_dict(self) -> dict[str, object]:
        """Convert the mapping result and its matches to a plain dictionary.

        Framework values are emitted through their ``.value`` attribute and
        each match's ``shared_tags`` as a sorted list.
        """
        payload: dict[str, object] = {
            "source_framework": self.source_framework.value,
            "source_requirement_id": self.source_requirement_id,
            "source_requirement_name": self.source_requirement_name,
        }
        match_records = []
        for match in self.matches:
            match_records.append(
                {
                    "framework": match.framework.value,
                    "requirement_id": match.requirement_id,
                    "requirement_name": match.requirement_name,
                    "similarity_score": match.similarity_score,
                    "shared_tags": sorted(match.shared_tags),
                }
            )
        payload["matches"] = match_records
        return payload
_jaccard_similarity function · python · L360-L368 (9 LOC)
src/agent_gov/multi_framework/mapper.py
def _jaccard_similarity(set_a: frozenset[str], set_b: frozenset[str]) -> float:
    """Compute the Jaccard similarity between two tag sets.

    The similarity is ``|A & B| / |A | B|``.

    Parameters
    ----------
    set_a:
        First tag set.
    set_b:
        Second tag set.

    Returns
    -------
    float
        A value in ``[0.0, 1.0]``.  Two empty sets yield ``0.0`` rather
        than raising a division-by-zero error.
    """
    union_size = len(set_a | set_b)
    if union_size == 0:
        # The union is empty only when both inputs are empty, so this single
        # guard covers the "nothing to compare" case.
        return 0.0
    return len(set_a & set_b) / union_size
CrossFrameworkMapper.__init__ method · python · L394-L400 (7 LOC)
src/agent_gov/multi_framework/mapper.py
    def __init__(self, *, similarity_threshold: float = 0.2) -> None:
        """Build the requirement catalog and a lookup index over it.

        Parameters
        ----------
        similarity_threshold:
            Stored on the instance as ``_similarity_threshold``.
        """
        self._similarity_threshold = similarity_threshold
        self._catalog = _build_requirement_catalog()
        # Flatten the catalog into a (framework, requirement_id) -> requirement
        # index; on duplicate keys the entry seen last wins.
        self._index: dict[tuple[SupportedFramework, str], FrameworkRequirement] = {
            (req.framework, req.requirement_id): req
            for framework_requirements in self._catalog.values()
            for req in framework_requirements
        }
CrossFrameworkMapper.get_requirement method · python · L402-L412 (11 LOC)
src/agent_gov/multi_framework/mapper.py
    def get_requirement(
        self,
        framework: str | SupportedFramework,
        requirement_id: str,
    ) -> Optional[FrameworkRequirement]:
        """Fetch a single requirement from the index.

        Parameters
        ----------
        framework:
            A framework enum member, or a string that is passed to the
            ``SupportedFramework`` constructor.
        requirement_id:
            Identifier of the requirement within that framework.

        Returns
        -------
        Optional[FrameworkRequirement]
            The indexed requirement, or ``None`` when the
            ``(framework, requirement_id)`` pair is not in the index.
        """
        if isinstance(framework, str):
            resolved = SupportedFramework(framework)
        else:
            resolved = framework
        lookup_key = (resolved, requirement_id)
        return self._index.get(lookup_key)
CrossFrameworkMapper.map_requirement method · python · L414-L471 (58 LOC)
src/agent_gov/multi_framework/mapper.py
    def map_requirement(
        self,
        framework: str | SupportedFramework,
        requirement_id: str,
        *,
        exclude_same_framework: bool = True,
    ) -> MappingResult:
        """Find equivalent requirements in other frameworks.

        Parameters
        ----------
        framework:
            Source framework name or enum value.
        requirement_id:
            ID of the requirement to map.
        exclude_same_framework:
            When True, results from the same framework are excluded.

        Returns
        -------
        MappingResult
            Matches sorted by descending similarity score.

        Raises
        ------
        KeyError
            When the source requirement is not found in the catalog.
        """
        fw = SupportedFramework(framework) if isinstance(framework, str) else framework
        source = self._index.get((fw, requirement_id))
        if source is None:
            raise KeyError(f"Requirement {requirement_id!r}
CrossFrameworkMapper.map_all_requirements method · python · L473-L486 (14 LOC)
src/agent_gov/multi_framework/mapper.py
    def map_all_requirements(
        self,
        framework: str | SupportedFramework,
    ) -> list[MappingResult]:
        """Run :meth:`map_requirement` for every requirement of a framework.

        Parameters
        ----------
        framework:
            A framework enum member, or a string that is passed to the
            ``SupportedFramework`` constructor.

        Returns
        -------
        list[MappingResult]
            One result per catalog requirement of the framework, in
            catalog order; empty when the framework has no catalog entry.
        """
        if isinstance(framework, str):
            resolved = SupportedFramework(framework)
        else:
            resolved = framework
        return [
            self.map_requirement(resolved, requirement.requirement_id)
            for requirement in self._catalog.get(resolved, [])
        ]
Repobility · MCP-ready · https://repobility.com
CrossFrameworkMapper.list_requirements method · python · L488-L494 (7 LOC)
src/agent_gov/multi_framework/mapper.py
    def list_requirements(
        self,
        framework: str | SupportedFramework,
    ) -> list[FrameworkRequirement]:
        """List every catalog requirement of one framework.

        Accepts an enum member or a string (converted through the
        ``SupportedFramework`` constructor).  The result is a fresh list,
        empty when the framework has no catalog entry.
        """
        if isinstance(framework, str):
            resolved = SupportedFramework(framework)
        else:
            resolved = framework
        catalog_entries = self._catalog.get(resolved, [])
        # Copy so callers cannot mutate the catalog's own list.
        return list(catalog_entries)
CrossFrameworkMapper.find_by_category method · python · L496-L517 (22 LOC)
src/agent_gov/multi_framework/mapper.py
    def find_by_category(
        self,
        category: str,
        *,
        frameworks: Optional[list[SupportedFramework]] = None,
    ) -> list[FrameworkRequirement]:
        """Collect the requirements whose category equals *category*.

        Parameters
        ----------
        category:
            Category string to match exactly (e.g. ``"transparency"``).
        frameworks:
            Frameworks to search.  ``None`` or an empty list searches
            every framework in the catalog.

        Returns
        -------
        list[FrameworkRequirement]
            Matching requirements, grouped in the order the frameworks
            are searched.
        """
        if frameworks:
            search_scope = frameworks
        else:
            # No (or an empty) restriction: fall back to the whole catalog.
            search_scope = list(self._catalog)
        return [
            requirement
            for fw in search_scope
            for requirement in self._catalog.get(fw, [])
            if requirement.category == category
        ]
CrossFrameworkMapper.find_by_tag method · python · L519-L525 (7 LOC)
src/agent_gov/multi_framework/mapper.py
    def find_by_tag(self, tag: str) -> list[FrameworkRequirement]:
        """Collect every indexed requirement whose ``control_tags`` include *tag*.

        Results follow the iteration order of the internal index.
        """
        tagged: list[FrameworkRequirement] = []
        for requirement in self._index.values():
            if tag in requirement.control_tags:
                tagged.append(requirement)
        return tagged
ControlGroup.to_dict method · python · L77-L92 (16 LOC)
src/agent_gov/multi_framework/overlap_analyzer.py
    def to_dict(self) -> dict[str, object]:
        """Convert this control group into a plain dictionary.

        Returns
        -------
        dict[str, object]
            The shared tag, the sorted values of the covered frameworks,
            the number of requirements, and one summary dictionary per
            requirement.
        """
        payload: dict[str, object] = {"shared_tag": self.shared_tag}
        payload["frameworks_covered"] = sorted(
            framework.value for framework in self.frameworks_covered
        )
        payload["requirement_count"] = len(self.requirements)
        requirement_summaries = []
        for requirement in self.requirements:
            requirement_summaries.append(
                {
                    "framework": requirement.framework.value,
                    "requirement_id": requirement.requirement_id,
                    "name": requirement.name,
                    "category": requirement.category,
                }
            )
        payload["requirements"] = requirement_summaries
        return payload
OverlapReport.groups_for_framework method · python · L121-L126 (6 LOC)
src/agent_gov/multi_framework/overlap_analyzer.py
    def groups_for_framework(self, framework: SupportedFramework) -> list[ControlGroup]:
        """Select the control groups whose ``frameworks_covered`` contains *framework*.

        The original ordering of ``control_groups`` is preserved.
        """
        selected: list[ControlGroup] = []
        for group in self.control_groups:
            if framework in group.frameworks_covered:
                selected.append(group)
        return selected
OverlapReport.to_dict method · python · L128-L143 (16 LOC)
src/agent_gov/multi_framework/overlap_analyzer.py
    def to_dict(self) -> dict[str, object]:
        """Convert this overlap report into a plain dictionary.

        Returns
        -------
        dict[str, object]
            The number of requirements analysed, the number of control
            groups, one summary dictionary per shared control (framework
            values sorted), and each control group's own ``to_dict()``.
        """
        payload: dict[str, object] = {
            "total_requirements_analyzed": self.total_requirements_analyzed,
            "control_group_count": len(self.control_groups),
        }
        shared_summaries = []
        for shared in self.shared_controls:
            shared_summaries.append(
                {
                    "tag": shared.tag,
                    "cross_framework_count": shared.cross_framework_count,
                    "requirement_count": shared.requirement_count,
                    "frameworks": sorted(
                        framework.value for framework in shared.frameworks
                    ),
                }
            )
        payload["shared_controls"] = shared_summaries
        payload["control_groups"] = [group.to_dict() for group in self.control_groups]
        return payload
OverlapAnalyzer.analyze method · python · L172-L224 (53 LOC)
src/agent_gov/multi_framework/overlap_analyzer.py
    def analyze(self) -> OverlapReport:
        """Run a full overlap analysis across all supported frameworks.

        Returns
        -------
        OverlapReport
            Groups of overlapping requirements with shared control tags.
        """
        all_requirements: list[FrameworkRequirement] = []
        for fw in SupportedFramework:
            all_requirements.extend(self._mapper.list_requirements(fw))

        # Collect all tags and which frameworks + requirements use them
        tag_to_requirements: dict[str, list[FrameworkRequirement]] = {}
        tag_to_frameworks: dict[str, set[SupportedFramework]] = {}

        for req in all_requirements:
            for tag in req.control_tags:
                tag_to_requirements.setdefault(tag, []).append(req)
                tag_to_frameworks.setdefault(tag, set()).add(req.framework)

        # Build shared controls and groups filtered by min_frameworks
        shared_controls: list[SharedControl] = []
        control_groups: 
OverlapAnalyzer.find_redundant_requirements method · python · L226-L261 (36 LOC)
src/agent_gov/multi_framework/overlap_analyzer.py
    def find_redundant_requirements(
        self,
        framework: SupportedFramework,
        *,
        similarity_threshold: float = 0.5,
    ) -> list[tuple[FrameworkRequirement, list[FrameworkRequirement]]]:
        """Find requirements in a framework that are highly redundant with others.

        Returns a list of (requirement, similar_requirements_from_other_frameworks).
        A requirement is considered redundant if it has at least one match in
        another framework above the similarity threshold.

        Parameters
        ----------
        framework:
            The framework to analyze for redundant requirements.
        similarity_threshold:
            Minimum Jaccard similarity for two requirements to be considered
            redundant. Defaults to 0.5.
        """
        source_requirements = self._mapper.list_requirements(framework)
        result: list[tuple[FrameworkRequirement, list[FrameworkRequirement]]] = []

        for source_req in source_requirem
Want this analysis on your repo? https://repobility.com/scan/
_GenericRegistry.register method · python · L59-L78 (20 LOC)
src/agent_gov/plugins/registry.py
    def register(self, name: str) -> Callable[[type[T]], type[T]]:
        """Return a class decorator that stores the class under ``name``.

        An existing registration under the same name is replaced, with a
        warning logged.  The decorated class is returned unchanged.
        """

        def decorator(cls: type[T]) -> type[T]:
            already_registered = name in self._entries
            if already_registered:
                # Replacing is allowed, but is surfaced so it is not silent.
                logger.warning(
                    "Overwriting existing registration %r in registry %r.",
                    name,
                    self._name,
                )
            self._entries[name] = cls
            qualified_name = cls.__qualname__  # type: ignore[attr-defined]
            logger.debug(
                "Registered %r -> %s in %r registry.",
                name,
                qualified_name,
                self._name,
            )
            return cls

        return decorator
_GenericRegistry.get method · python · L84-L97 (14 LOC)
src/agent_gov/plugins/registry.py
    def get(self, name: str) -> type[T]:
        """Look up the class registered under ``name``.

        Raises
        ------
        KeyError
            If ``name`` has no registration; the message lists the
            registered names in sorted order.
        """
        if name in self._entries:
            return self._entries[name]
        available = sorted(self._entries)
        raise KeyError(
            f"{name!r} is not registered in the {self._name!r} registry. "
            f"Available: {available!r}"
        )
_GenericRegistry.load_entrypoints method · python · L112-L139 (28 LOC)
src/agent_gov/plugins/registry.py
    def load_entrypoints(self, group: str) -> None:
        """Load and register classes declared as package entry-points.

        Already-registered names are skipped (idempotent).

        Parameters
        ----------
        group:
            Entry-point group to scan.
        """
        for ep in importlib.metadata.entry_points(group=group):
            if ep.name in self._entries:
                logger.debug(
                    "Entry-point %r already registered in %r; skipping.",
                    ep.name,
                    self._name,
                )
                continue
            try:
                cls = ep.load()
                self.register_class(ep.name, cls)
                logger.debug("Loaded entry-point %r into %r registry.", ep.name, self._name)
            except Exception:
                logger.exception(
                    "Failed to load entry-point %r from group %r; skipping.",
                    ep.name,
                    group,
         
‹ prevpage 3 / 5next ›