Function bodies 212 total
EvidenceCollector.export_json method · python · L232-L242 (11 LOC)src/agent_gov/dashboard/evidence_collector.py
def export_json(self, path: Path) -> None:
"""Write all entries to a JSONL file at *path*.
Parameters
----------
path:
Destination file path. Parent directories must exist.
"""
with path.open("w", encoding="utf-8") as fh:
for entry in self._entries:
fh.write(json.dumps(entry.to_dict(), ensure_ascii=False) + "\n")EvidenceCollector.export_dict method · python · L244-L255 (12 LOC)src/agent_gov/dashboard/evidence_collector.py
def export_dict(self) -> dict[str, object]:
"""Export all entries as a serialisable dictionary.
Returns
-------
dict[str, object]
Dictionary with ``entries`` list and ``count`` integer.
"""
return {
"count": self.count,
"entries": [e.to_dict() for e in self._entries],
}EvidenceCollector.load_json method · python · L258-L282 (25 LOC)src/agent_gov/dashboard/evidence_collector.py
def load_json(cls, path: Path) -> EvidenceCollector:
"""Load entries from a JSONL file produced by :meth:`export_json`.
Parameters
----------
path:
Source file path.
Returns
-------
EvidenceCollector
New collector pre-populated with entries from the file.
"""
collector = cls()
with path.open("r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
collector.record(EvidenceEntry.from_dict(data))
except (json.JSONDecodeError, KeyError, ValueError):
continue # skip malformed lines
return collectorPostureScore.grade method · python · L62-L79 (18 LOC)src/agent_gov/dashboard/posture_scorer.py
def grade(self) -> str:
"""Return a letter-grade interpretation of the overall score.
Returns
-------
str
One of ``"A"``, ``"B"``, ``"C"``, ``"D"``, ``"F"``.
"""
score = self.overall_score
if score >= 90:
return "A"
if score >= 80:
return "B"
if score >= 70:
return "C"
if score >= 60:
return "D"
return "F"PostureScore.to_dict method · python · L81-L97 (17 LOC)src/agent_gov/dashboard/posture_scorer.py
def to_dict(self) -> dict[str, object]:
"""Serialise to a plain dictionary.
Returns
-------
dict[str, object]
"""
return {
"overall_score": round(self.overall_score, 2),
"grade": self.grade(),
"per_policy": {k: round(v, 2) for k, v in self.per_policy.items()},
"total_entries": self.total_entries,
"pass_count": self.pass_count,
"fail_count": self.fail_count,
"skip_count": self.skip_count,
"computed_at": self.computed_at.isoformat(),
}PostureScorer.score method · python · L124-L164 (41 LOC)src/agent_gov/dashboard/posture_scorer.py
def score(self, evidence: list[EvidenceEntry]) -> PostureScore:
"""Compute compliance posture from *evidence*.
Parameters
----------
evidence:
List of :class:`~agent_gov.dashboard.evidence_collector.EvidenceEntry`
objects to score.
Returns
-------
PostureScore
Aggregate and per-policy scores.
"""
pass_count = sum(1 for e in evidence if e.result == "pass")
fail_count = sum(1 for e in evidence if e.result == "fail")
skip_count = sum(1 for e in evidence if e.result == "skip")
overall = self._compute_score(pass_count, fail_count, skip_count)
# Group by policy
policy_groups: dict[str, list[EvidenceEntry]] = {}
for entry in evidence:
policy_groups.setdefault(entry.policy_id, []).append(entry)
per_policy: dict[str, float] = {}
for policy_id, group in policy_groups.items():
p = sum(1 for e inPostureScorer.score_trend method · python · L166-L181 (16 LOC)src/agent_gov/dashboard/posture_scorer.py
def score_trend(
self, evidence_windows: list[list[EvidenceEntry]]
) -> list[float]:
"""Compute a trend of overall scores across multiple evidence windows.
Parameters
----------
evidence_windows:
Ordered list of evidence slices (e.g. one per day).
Returns
-------
list[float]
Overall score for each window, in the same order.
"""
return [self.score(window).overall_score for window in evidence_windows]Repobility · MCP-ready · https://repobility.com
PostureScorer._compute_score method · python · L183-L191 (9 LOC)src/agent_gov/dashboard/posture_scorer.py
def _compute_score(
self, pass_count: int, fail_count: int, skip_count: int
) -> float:
"""Internal score computation."""
weighted_skip = skip_count * self._skip_weight
denominator = pass_count + fail_count + weighted_skip
if denominator == 0:
return 0.0
return (pass_count / denominator) * 100.0ReportGenerator.__init__ method · python · L46-L52 (7 LOC)src/agent_gov/dashboard/report_generator.py
def __init__(
self,
system_name: str = "AI System",
include_context: bool = False,
) -> None:
self._system_name = system_name
self._include_context = include_contextReportGenerator.generate_markdown method · python · L58-L150 (93 LOC)src/agent_gov/dashboard/report_generator.py
def generate_markdown(
self,
evidence: list[EvidenceEntry],
posture: PostureScore,
) -> str:
"""Generate an auditor-ready Markdown compliance report.
Parameters
----------
evidence:
List of evidence entries to include.
posture:
Pre-computed posture score.
Returns
-------
str
Complete Markdown document.
"""
lines: list[str] = []
# Title
lines.append(f"# Compliance Report — {self._system_name}")
lines.append("")
lines.append(f"**Generated:** {_utc_now_str()}")
lines.append(f"**Evidence entries:** {len(evidence)}")
lines.append("")
lines.append("---")
lines.append("")
# Posture summary
lines.append("## Compliance Posture Summary")
lines.append("")
lines.append(f"| Metric | Value |")
lines.append(f"|--------|-------|")
lines.appenReportGenerator.generate_json method · python · L152-L198 (47 LOC)src/agent_gov/dashboard/report_generator.py
def generate_json(
self,
evidence: list[EvidenceEntry],
posture: PostureScore,
) -> dict[str, object]:
"""Generate a structured JSON-serialisable compliance report dictionary.
Parameters
----------
evidence:
List of evidence entries.
posture:
Pre-computed posture score.
Returns
-------
dict[str, object]
JSON-serialisable report dictionary.
"""
evidence_records: list[dict[str, object]] = []
for entry in evidence:
record: dict[str, object] = {
"timestamp": entry.timestamp.isoformat(),
"policy_id": entry.policy_id,
"rule_id": entry.rule_id,
"result": entry.result,
}
if self._include_context:
record["context"] = entry.context
evidence_records.append(record)
return {
"system_name": self._sReportGenerator.generate_json_string method · python · L200-L226 (27 LOC)src/agent_gov/dashboard/report_generator.py
def generate_json_string(
self,
evidence: list[EvidenceEntry],
posture: PostureScore,
indent: int = 2,
) -> str:
"""Return the JSON report as a formatted string.
Parameters
----------
evidence:
List of evidence entries.
posture:
Pre-computed posture score.
indent:
JSON indentation level.
Returns
-------
str
Formatted JSON string.
"""
return json.dumps(
self.generate_json(evidence, posture),
indent=indent,
ensure_ascii=False,
)ReportGenerator.write_markdown method · python · L228-L248 (21 LOC)src/agent_gov/dashboard/report_generator.py
def write_markdown(
self,
evidence: list[EvidenceEntry],
posture: PostureScore,
path: object,
) -> None:
"""Write the Markdown report to a file at *path*.
Parameters
----------
evidence:
List of evidence entries.
posture:
Pre-computed posture score.
path:
:class:`pathlib.Path` or string path for the output file.
"""
from pathlib import Path
content = self.generate_markdown(evidence, posture)
Path(path).write_text(content, encoding="utf-8")ReportGenerator.write_json method · python · L250-L273 (24 LOC)src/agent_gov/dashboard/report_generator.py
def write_json(
self,
evidence: list[EvidenceEntry],
posture: PostureScore,
path: object,
indent: int = 2,
) -> None:
"""Write the JSON report to a file at *path*.
Parameters
----------
evidence:
List of evidence entries.
posture:
Pre-computed posture score.
path:
:class:`pathlib.Path` or string path for the output file.
indent:
JSON indentation level.
"""
from pathlib import Path
content = self.generate_json_string(evidence, posture, indent=indent)
Path(path).write_text(content, encoding="utf-8")FrameworkReport.to_dict method · python · L123-L143 (21 LOC)src/agent_gov/frameworks/base.py
def to_dict(self) -> dict[str, object]:
"""Serialise the report to a plain dictionary."""
return {
"framework": self.framework,
"score": self.score,
"score_percent": self.score_percent,
"total": len(self.results),
"passed": self.passed_count,
"failed": self.failed_count,
"unknown": self.unknown_count,
"results": [
{
"id": r.item.id,
"name": r.item.name,
"category": r.item.category,
"status": r.status,
"evidence": r.evidence,
}
for r in self.results
],
}Want this analysis on your repo? https://repobility.com/scan/
ComplianceFramework.run_check method · python · L168-L181 (14 LOC)src/agent_gov/frameworks/base.py
def run_check(self, evidence: dict[str, object]) -> FrameworkReport:
"""Evaluate evidence against every item in the checklist.
Parameters
----------
evidence:
Key/value mapping where keys correspond to checklist item IDs
or other evidence fields recognised by this framework.
Returns
-------
FrameworkReport
Report containing one :class:`CheckResult` per checklist item.
"""EUAIActClassifier.classify method · python · L225-L320 (96 LOC)src/agent_gov/frameworks/eu_ai_act_classifier.py
def classify(
self,
system_description: str,
use_cases: list[str] | None = None,
data_categories: list[str] | None = None,
) -> RiskClassification:
"""Determine the risk level of an AI system.
Parameters
----------
system_description:
Free-text description of the AI system and its purpose.
use_cases:
Optional list of intended use-case strings. These are included
in the keyword search corpus alongside ``system_description``.
data_categories:
Optional list of data category strings (e.g. ``"biometric data"``,
``"financial data"``). Also included in the search corpus.
Returns
-------
RiskClassification
The determined risk classification with obligations and reasoning.
"""
description_lower = system_description.lower()
use_case_text = " ".join(use_cases or []).lower()
data_tAnnexIVDocumentation.to_markdown method · python · L138-L236 (99 LOC)src/agent_gov/frameworks/eu_ai_act_docs.py
def to_markdown(self) -> str:
"""Generate a Markdown representation of the Annex IV documentation.
The output includes all seven Annex IV sections with proper headings,
JSON code blocks for structured metrics, and bullet-pointed limitations.
Returns
-------
str
A multi-section Markdown document.
"""
sections: list[str] = [
f"# Technical Documentation: {self.system_name}",
"",
"**Per EU AI Act (Regulation (EU) 2024/1689), Annex IV**",
f"**Generated:** {self.generated_at}",
f"**Provider:** {self.provider_name}",
f"**System Version:** {self.system_version}",
"",
"---",
"",
"## 1. General Description of the AI System (Annex IV(1))",
"",
"### Intended Purpose",
self.intended_purpose,
"",
"### System Description",
self.sysAnnexIVDocumentation.export method · python · L238-L271 (34 LOC)src/agent_gov/frameworks/eu_ai_act_docs.py
def export(self, output_dir: str) -> list[str]:
"""Export the documentation package to files on disk.
Creates the output directory if it does not exist, then writes:
- ``annex-iv-technical-documentation.md`` — human-readable Markdown.
- ``annex-iv-data.json`` — full dataclass serialised to JSON.
Parameters
----------
output_dir:
Path to the directory where files should be written.
Returns
-------
list[str]
Absolute paths of all files created, in the order they were written.
"""
out_path = Path(output_dir)
out_path.mkdir(parents=True, exist_ok=True)
created: list[str] = []
md_path = out_path / "annex-iv-technical-documentation.md"
md_path.write_text(self.to_markdown(), encoding="utf-8")
created.append(str(md_path))
json_path = out_path / "annex-iv-data.json"
json_path.write_text(
json.dumps(dEuAiActFramework.checklist method · python · L49-L125 (77 LOC)src/agent_gov/frameworks/eu_ai_act.py
def checklist(self) -> list[ChecklistItem]:
"""Return the 8-item EU AI Act checklist."""
return [
ChecklistItem(
id="A6",
name="Risk Classification",
description=(
"Article 6: The AI system has been assessed and classified "
"according to the EU AI Act risk categories (unacceptable, high, "
"limited, minimal)."
),
category="risk",
),
ChecklistItem(
id="A9",
name="Risk Management System",
description=(
"Article 9: A risk management system is established, implemented, "
"documented, and maintained for the lifecycle of the AI system."
),
category="risk",
),
ChecklistItem(
id="A10",
name="Data Governance",
deEuAiActFramework.run_check method · python · L127-L150 (24 LOC)src/agent_gov/frameworks/eu_ai_act.py
def run_check(self, evidence: dict[str, object]) -> FrameworkReport:
"""Evaluate evidence against the 8 EU AI Act checklist items.
Parameters
----------
evidence:
Dict keyed by article ID (``"A6"``, ``"A9"``, etc.). Each value
may be:
- A mapping with ``"status"`` (``"pass"``/``"fail"``/``"unknown"``)
and optional ``"evidence"`` string.
- A plain truthy/falsy value (converted to ``"pass"``/``"fail"``).
- Missing key → ``"unknown"``.
Returns
-------
FrameworkReport
"""
results: list[CheckResult] = []
for item in self.checklist():
item_evidence = evidence.get(item.id)
status, evidence_text = _resolve_evidence(item_evidence)
results.append(CheckResult(item=item, status=status, evidence=evidence_text))
return FrameworkReport(framework=self.name, results=results)_resolve_evidence function · python · L153-L163 (11 LOC)src/agent_gov/frameworks/eu_ai_act.py
def _resolve_evidence(value: object) -> tuple[str, str]:
"""Resolve a raw evidence value to a (status, evidence_text) pair."""
if value is None:
return "unknown", "No evidence provided."
if isinstance(value, dict):
raw_status = str(value.get("status", "unknown")).lower()
status = raw_status if raw_status in ("pass", "fail", "unknown") else "unknown"
evidence_text = str(value.get("evidence", ""))
return status, evidence_text
# Treat truthy scalar as pass, falsy as fail
return ("pass" if value else "fail"), str(value)GdprFramework.checklist method · python · L54-L128 (75 LOC)src/agent_gov/frameworks/gdpr.py
def checklist(self) -> list[ChecklistItem]:
"""Return the 7-item GDPR checklist."""
return [
ChecklistItem(
id="P1",
name="Lawful Basis for Processing",
description=(
"Article 5(1)(a): Personal data is processed lawfully, fairly, and "
"transparently. A valid lawful basis (consent, contract, legal "
"obligation, vital interests, public task, or legitimate interests) "
"is identified and documented."
),
category="lawfulness",
),
ChecklistItem(
id="P2",
name="Purpose Limitation",
description=(
"Article 5(1)(b): Personal data is collected for specified, explicit, "
"and legitimate purposes. Data is not further processed in a manner "
"incompatible with those purposes."Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
GdprFramework.run_check method · python · L130-L149 (20 LOC)src/agent_gov/frameworks/gdpr.py
def run_check(self, evidence: dict[str, object]) -> FrameworkReport:
"""Evaluate evidence against the 7 GDPR principle checklist items.
Parameters
----------
evidence:
Dict keyed by principle ID. Same resolution rules as
:meth:`~agent_gov.frameworks.eu_ai_act.EuAiActFramework.run_check`.
Returns
-------
FrameworkReport
"""
results: list[CheckResult] = []
for item in self.checklist():
item_evidence = evidence.get(item.id)
status, evidence_text = _resolve_evidence(item_evidence)
results.append(CheckResult(item=item, status=status, evidence=evidence_text))
return FrameworkReport(framework=self.name, results=results)HipaaFramework.checklist method · python · L54-L108 (55 LOC)src/agent_gov/frameworks/hipaa.py
def checklist(self) -> list[ChecklistItem]:
"""Return the 5-item HIPAA Security Rule checklist."""
return [
ChecklistItem(
id="SC1",
name="Access Controls",
description=(
"164.312(a)(1): Implement technical policies and procedures for "
"electronic information systems that maintain ePHI to allow access "
"only to those persons or software programs that have been granted "
"access rights."
),
category="access",
),
ChecklistItem(
id="SC2",
name="Audit Controls",
description=(
"164.312(b): Implement hardware, software, and/or procedural "
"mechanisms that record and examine activity in information systems "
"that contain or use ePHI."
),
cHipaaFramework.run_check method · python · L110-L129 (20 LOC)src/agent_gov/frameworks/hipaa.py
def run_check(self, evidence: dict[str, object]) -> FrameworkReport:
"""Evaluate evidence against the 5 HIPAA Security Rule checklist items.
Parameters
----------
evidence:
Dict keyed by safeguard ID. Same resolution rules as
:meth:`~agent_gov.frameworks.eu_ai_act.EuAiActFramework.run_check`.
Returns
-------
FrameworkReport
"""
results: list[CheckResult] = []
for item in self.checklist():
item_evidence = evidence.get(item.id)
status, evidence_text = _resolve_evidence(item_evidence)
results.append(CheckResult(item=item, status=status, evidence=evidence_text))
return FrameworkReport(framework=self.name, results=results)Soc2Framework.checklist method · python · L53-L105 (53 LOC)src/agent_gov/frameworks/soc2.py
def checklist(self) -> list[ChecklistItem]:
"""Return the 5-item SOC 2 Trust Services Criteria checklist."""
return [
ChecklistItem(
id="CC6",
name="Security",
description=(
"CC6 — Logical and Physical Access Controls: The system is protected "
"against unauthorised access using logical and physical access controls "
"including authentication, authorisation, and access monitoring."
),
category="security",
),
ChecklistItem(
id="A1",
name="Availability",
description=(
"A1 — Availability: The system is available for operation and use "
"as committed or agreed, and meets defined performance and uptime "
"service level objectives."
),
category="availability",
Soc2Framework.run_check method · python · L107-L126 (20 LOC)src/agent_gov/frameworks/soc2.py
def run_check(self, evidence: dict[str, object]) -> FrameworkReport:
"""Evaluate evidence against the 5 SOC 2 Trust Services Criteria.
Parameters
----------
evidence:
Dict keyed by criteria ID. Same resolution rules as
:meth:`~agent_gov.frameworks.eu_ai_act.EuAiActFramework.run_check`.
Returns
-------
FrameworkReport
"""
results: list[CheckResult] = []
for item in self.checklist():
item_evidence = evidence.get(item.id)
status, evidence_text = _resolve_evidence(item_evidence)
results.append(CheckResult(item=item, status=status, evidence=evidence_text))
return FrameworkReport(framework=self.name, results=results)AgentCoreBridge.__init__ method · python · L69-L80 (12 LOC)src/agent_gov/integration/agentcore_bridge.py
def __init__(
self,
policy: PolicyConfig,
audit_logger: Optional[AuditLogger] = None,
*,
agent_id_field: str = "agent_id",
) -> None:
self._policy = policy
self._audit_logger = audit_logger
self._agent_id_field = agent_id_field
self._evaluator = PolicyEvaluator()
self._connected = FalseAgentCoreBridge.connect method · python · L92-L121 (30 LOC)src/agent_gov/integration/agentcore_bridge.py
def connect(self) -> bool:
"""Subscribe to the agentcore event bus.
Returns
-------
bool
``True`` when successfully connected, ``False`` when
agentcore-sdk is not available.
"""
if not _AGENTCORE_AVAILABLE:
logger.warning(
"agentcore-sdk is not installed. "
"Install with: pip install agent-gov[agentcore]"
)
return False
try:
# Subscribe to agent action events via agentcore event bus.
# The actual API depends on the agentcore-sdk version.
event_bus = agentcore.get_event_bus() # type: ignore[union-attr]
event_bus.subscribe("agent.action", self._handle_event)
self._connected = True
logger.info(
"AgentCoreBridge connected; evaluating against policy %r",
self._policy.name,
)
return True
except Exception:
AgentCoreBridge.disconnect method · python · L123-L134 (12 LOC)src/agent_gov/integration/agentcore_bridge.py
def disconnect(self) -> None:
"""Unsubscribe from the agentcore event bus."""
if not _AGENTCORE_AVAILABLE or not self._connected:
return
try:
event_bus = agentcore.get_event_bus() # type: ignore[union-attr]
event_bus.unsubscribe("agent.action", self._handle_event)
self._connected = False
logger.info("AgentCoreBridge disconnected.")
except Exception:
logger.exception("Failed to disconnect AgentCoreBridge.")Repobility — the code-quality scanner for AI-generated software · https://repobility.com
AgentCoreBridge._handle_event method · python · L136-L163 (28 LOC)src/agent_gov/integration/agentcore_bridge.py
def _handle_event(self, event: dict[str, object]) -> None:
"""Handle an incoming agentcore event.
Parameters
----------
event:
Raw event payload from the agentcore event bus.
"""
try:
report = self._evaluator.evaluate(self._policy, event)
agent_id = str(event.get(self._agent_id_field, "unknown"))
if self._audit_logger is not None:
self._audit_logger.log_from_report(report, agent_id=agent_id)
if not report.passed:
logger.warning(
"Policy %r FAILED for agent %r: %d violation(s), highest severity=%s",
self._policy.name,
agent_id,
report.violation_count,
report.highest_severity,
)
except Exception:
logger.exception(
"AgentCoreBridge failed to evaluate event against policy %r.",
seAgentCoreBridge.evaluate_event method · python · L165-L190 (26 LOC)src/agent_gov/integration/agentcore_bridge.py
def evaluate_event(
self,
event: dict[str, object],
agent_id: str = "unknown",
) -> bool:
"""Manually evaluate a single event (without the event bus).
Useful for testing the bridge configuration without a live
agentcore connection.
Parameters
----------
event:
Action payload to evaluate.
agent_id:
Agent identifier for audit logging.
Returns
-------
bool
``True`` when the event passes the policy.
"""
report = self._evaluator.evaluate(self._policy, event)
if self._audit_logger is not None:
self._audit_logger.log_from_report(report, agent_id=agent_id)
return report.passedMappingResult.to_dict method · python · L113-L129 (17 LOC)src/agent_gov/multi_framework/mapper.py
def to_dict(self) -> dict[str, object]:
"""Serialise to a plain dictionary."""
return {
"source_framework": self.source_framework.value,
"source_requirement_id": self.source_requirement_id,
"source_requirement_name": self.source_requirement_name,
"matches": [
{
"framework": m.framework.value,
"requirement_id": m.requirement_id,
"requirement_name": m.requirement_name,
"similarity_score": m.similarity_score,
"shared_tags": sorted(m.shared_tags),
}
for m in self.matches
],
}_jaccard_similarity function · python · L360-L368 (9 LOC)src/agent_gov/multi_framework/mapper.py
def _jaccard_similarity(set_a: frozenset[str], set_b: frozenset[str]) -> float:
"""Compute Jaccard similarity between two tag sets."""
if not set_a and not set_b:
return 0.0
intersection = len(set_a & set_b)
union = len(set_a | set_b)
if union == 0:
return 0.0
return intersection / unionCrossFrameworkMapper.__init__ method · python · L394-L400 (7 LOC)src/agent_gov/multi_framework/mapper.py
def __init__(self, *, similarity_threshold: float = 0.2) -> None:
self._similarity_threshold = similarity_threshold
self._catalog = _build_requirement_catalog()
self._index: dict[tuple[SupportedFramework, str], FrameworkRequirement] = {}
for requirements in self._catalog.values():
for req in requirements:
self._index[(req.framework, req.requirement_id)] = reqCrossFrameworkMapper.get_requirement method · python · L402-L412 (11 LOC)src/agent_gov/multi_framework/mapper.py
def get_requirement(
self,
framework: str | SupportedFramework,
requirement_id: str,
) -> Optional[FrameworkRequirement]:
"""Look up a specific requirement by framework and ID.
Returns None if not found.
"""
fw = SupportedFramework(framework) if isinstance(framework, str) else framework
return self._index.get((fw, requirement_id))CrossFrameworkMapper.map_requirement method · python · L414-L471 (58 LOC)src/agent_gov/multi_framework/mapper.py
def map_requirement(
self,
framework: str | SupportedFramework,
requirement_id: str,
*,
exclude_same_framework: bool = True,
) -> MappingResult:
"""Find equivalent requirements in other frameworks.
Parameters
----------
framework:
Source framework name or enum value.
requirement_id:
ID of the requirement to map.
exclude_same_framework:
When True, results from the same framework are excluded.
Returns
-------
MappingResult
Matches sorted by descending similarity score.
Raises
------
KeyError
When the source requirement is not found in the catalog.
"""
fw = SupportedFramework(framework) if isinstance(framework, str) else framework
source = self._index.get((fw, requirement_id))
if source is None:
raise KeyError(f"Requirement {requirement_id!r}CrossFrameworkMapper.map_all_requirements method · python · L473-L486 (14 LOC)src/agent_gov/multi_framework/mapper.py
def map_all_requirements(
self,
framework: str | SupportedFramework,
) -> list[MappingResult]:
"""Map every requirement in a framework to other frameworks.
Returns one MappingResult per requirement in the source framework.
"""
fw = SupportedFramework(framework) if isinstance(framework, str) else framework
results: list[MappingResult] = []
for req in self._catalog.get(fw, []):
result = self.map_requirement(fw, req.requirement_id)
results.append(result)
return resultsRepobility · MCP-ready · https://repobility.com
CrossFrameworkMapper.list_requirements method · python · L488-L494 (7 LOC)src/agent_gov/multi_framework/mapper.py
def list_requirements(
self,
framework: str | SupportedFramework,
) -> list[FrameworkRequirement]:
"""Return all requirements in a given framework."""
fw = SupportedFramework(framework) if isinstance(framework, str) else framework
return list(self._catalog.get(fw, []))CrossFrameworkMapper.find_by_category method · python · L496-L517 (22 LOC)src/agent_gov/multi_framework/mapper.py
def find_by_category(
self,
category: str,
*,
frameworks: Optional[list[SupportedFramework]] = None,
) -> list[FrameworkRequirement]:
"""Return all requirements matching a category across frameworks.
Parameters
----------
category:
Category string to filter by (e.g. ``"transparency"``).
frameworks:
If provided, restrict search to these frameworks.
"""
results: list[FrameworkRequirement] = []
target_frameworks = frameworks or list(self._catalog.keys())
for fw in target_frameworks:
for req in self._catalog.get(fw, []):
if req.category == category:
results.append(req)
return resultsCrossFrameworkMapper.find_by_tag method · python · L519-L525 (7 LOC)src/agent_gov/multi_framework/mapper.py
def find_by_tag(self, tag: str) -> list[FrameworkRequirement]:
"""Return all requirements whose control_tags contain the given tag."""
return [
req
for req in self._index.values()
if tag in req.control_tags
]ControlGroup.to_dict method · python · L77-L92 (16 LOC)src/agent_gov/multi_framework/overlap_analyzer.py
def to_dict(self) -> dict[str, object]:
"""Serialise to a plain dictionary."""
return {
"shared_tag": self.shared_tag,
"frameworks_covered": sorted(fw.value for fw in self.frameworks_covered),
"requirement_count": len(self.requirements),
"requirements": [
{
"framework": r.framework.value,
"requirement_id": r.requirement_id,
"name": r.name,
"category": r.category,
}
for r in self.requirements
],
}OverlapReport.groups_for_framework method · python · L121-L126 (6 LOC)src/agent_gov/multi_framework/overlap_analyzer.py
def groups_for_framework(self, framework: SupportedFramework) -> list[ControlGroup]:
"""Return groups that include requirements from the given framework."""
return [
g for g in self.control_groups
if framework in g.frameworks_covered
]OverlapReport.to_dict method · python · L128-L143 (16 LOC)src/agent_gov/multi_framework/overlap_analyzer.py
def to_dict(self) -> dict[str, object]:
"""Serialise to a plain dictionary."""
return {
"total_requirements_analyzed": self.total_requirements_analyzed,
"control_group_count": len(self.control_groups),
"shared_controls": [
{
"tag": sc.tag,
"cross_framework_count": sc.cross_framework_count,
"requirement_count": sc.requirement_count,
"frameworks": sorted(fw.value for fw in sc.frameworks),
}
for sc in self.shared_controls
],
"control_groups": [g.to_dict() for g in self.control_groups],
}OverlapAnalyzer.analyze method · python · L172-L224 (53 LOC)src/agent_gov/multi_framework/overlap_analyzer.py
def analyze(self) -> OverlapReport:
"""Run a full overlap analysis across all supported frameworks.
Returns
-------
OverlapReport
Groups of overlapping requirements with shared control tags.
"""
all_requirements: list[FrameworkRequirement] = []
for fw in SupportedFramework:
all_requirements.extend(self._mapper.list_requirements(fw))
# Collect all tags and which frameworks + requirements use them
tag_to_requirements: dict[str, list[FrameworkRequirement]] = {}
tag_to_frameworks: dict[str, set[SupportedFramework]] = {}
for req in all_requirements:
for tag in req.control_tags:
tag_to_requirements.setdefault(tag, []).append(req)
tag_to_frameworks.setdefault(tag, set()).add(req.framework)
# Build shared controls and groups filtered by min_frameworks
shared_controls: list[SharedControl] = []
control_groups: OverlapAnalyzer.find_redundant_requirements method · python · L226-L261 (36 LOC)src/agent_gov/multi_framework/overlap_analyzer.py
def find_redundant_requirements(
self,
framework: SupportedFramework,
*,
similarity_threshold: float = 0.5,
) -> list[tuple[FrameworkRequirement, list[FrameworkRequirement]]]:
"""Find requirements in a framework that are highly redundant with others.
Returns a list of (requirement, similar_requirements_from_other_frameworks).
A requirement is considered redundant if it has at least one match in
another framework above the similarity threshold.
Parameters
----------
framework:
The framework to analyze for redundant requirements.
similarity_threshold:
Minimum Jaccard similarity for two requirements to be considered
redundant. Defaults to 0.5.
"""
source_requirements = self._mapper.list_requirements(framework)
result: list[tuple[FrameworkRequirement, list[FrameworkRequirement]]] = []
for source_req in source_requiremWant this analysis on your repo? https://repobility.com/scan/
_GenericRegistry.register method · python · L59-L78 (20 LOC)src/agent_gov/plugins/registry.py
def register(self, name: str) -> Callable[[type[T]], type[T]]:
"""Decorator that registers a class under ``name``."""
def decorator(cls: type[T]) -> type[T]:
if name in self._entries:
logger.warning(
"Overwriting existing registration %r in registry %r.",
name,
self._name,
)
self._entries[name] = cls
logger.debug(
"Registered %r -> %s in %r registry.",
name,
cls.__qualname__, # type: ignore[attr-defined]
self._name,
)
return cls
return decorator_GenericRegistry.get method · python · L84-L97 (14 LOC)src/agent_gov/plugins/registry.py
def get(self, name: str) -> type[T]:
"""Return the class registered under ``name``.
Raises
------
KeyError
If no class is registered under ``name``.
"""
if name not in self._entries:
raise KeyError(
f"{name!r} is not registered in the {self._name!r} registry. "
f"Available: {sorted(self._entries)!r}"
)
return self._entries[name]_GenericRegistry.load_entrypoints method · python · L112-L139 (28 LOC)src/agent_gov/plugins/registry.py
def load_entrypoints(self, group: str) -> None:
"""Load and register classes declared as package entry-points.
Already-registered names are skipped (idempotent).
Parameters
----------
group:
Entry-point group to scan.
"""
for ep in importlib.metadata.entry_points(group=group):
if ep.name in self._entries:
logger.debug(
"Entry-point %r already registered in %r; skipping.",
ep.name,
self._name,
)
continue
try:
cls = ep.load()
self.register_class(ep.name, cls)
logger.debug("Loaded entry-point %r into %r registry.", ep.name, self._name)
except Exception:
logger.exception(
"Failed to load entry-point %r from group %r; skipping.",
ep.name,
group,