← back to invincible-jha__aumai-toolwatch

Function bodies 13 total

All specs Real LLM only Function bodies
_load_manager function · python · L17-L26 (10 LOC)
src/aumai_toolwatch/cli.py
def _load_manager() -> WatchManager:
    """Load or initialise a WatchManager from disk."""
    manager = WatchManager()
    baselines_path = _DEFAULT_STATE_DIR / "baselines.json"
    if baselines_path.exists():
        raw = json.loads(baselines_path.read_text(encoding="utf-8"))
        for entry in raw:
            fp = ToolFingerprint.model_validate(entry)
            manager.add_baseline(fp)
    return manager
_save_manager function · python · L29-L42 (14 LOC)
src/aumai_toolwatch/cli.py
def _save_manager(manager: WatchManager) -> None:
    """Persist baselines to disk."""
    _DEFAULT_STATE_DIR.mkdir(parents=True, exist_ok=True)
    baselines_path = _DEFAULT_STATE_DIR / "baselines.json"
    data = [fp.model_dump(mode="json") for fp in manager.get_all_baselines()]
    baselines_path.write_text(json.dumps(data, indent=2), encoding="utf-8")

    alerts_path = _DEFAULT_STATE_DIR / "alerts.json"
    alerts_data = [a.model_dump(mode="json") for a in manager.get_alerts()]
    existing: list[dict[str, object]] = []
    if alerts_path.exists():
        existing = json.loads(alerts_path.read_text(encoding="utf-8"))
    existing.extend(alerts_data)
    alerts_path.write_text(json.dumps(existing, indent=2), encoding="utf-8")
baseline_cmd function · python · L67-L87 (21 LOC)
src/aumai_toolwatch/cli.py
def baseline_cmd(tool: str, schema_file: str, tool_version: str) -> None:
    """Capture and store a baseline fingerprint for a tool."""
    schema: dict[str, object] = json.loads(Path(schema_file).read_text(encoding="utf-8"))
    fingerprinter = ToolFingerprinter()
    fp = fingerprinter.fingerprint(tool, schema, [], version=tool_version)

    manager = _load_manager()
    manager.add_baseline(fp)
    _save_manager(manager)

    click.echo(
        json.dumps(
            {
                "tool_name": fp.tool_name,
                "schema_hash": fp.schema_hash,
                "response_pattern_hash": fp.response_pattern_hash,
                "captured_at": fp.captured_at.isoformat(),
            },
            indent=2,
        )
    )
check_cmd function · python · L106-L129 (24 LOC)
src/aumai_toolwatch/cli.py
def check_cmd(tool: str, schema_file: str, tool_version: str) -> None:
    """Check a tool against its stored baseline and report any mutations."""
    schema: dict[str, object] = json.loads(Path(schema_file).read_text(encoding="utf-8"))
    fingerprinter = ToolFingerprinter()
    current_fp = fingerprinter.fingerprint(tool, schema, [], version=tool_version)

    manager = _load_manager()
    alert = manager.check(tool, current_fp)
    _save_manager(manager)

    if alert is None:
        click.echo(f"No mutation detected for tool '{tool}'.")
    else:
        click.echo(
            json.dumps(
                {
                    "tool_name": alert.tool_name,
                    "change_type": alert.change_type,
                    "severity": alert.severity,
                    "detected_at": alert.detected_at.isoformat(),
                },
                indent=2,
            )
        )
alerts_cmd function · python · L133-L145 (13 LOC)
src/aumai_toolwatch/cli.py
def alerts_cmd() -> None:
    """List all recorded mutation alerts."""
    alerts_path = _DEFAULT_STATE_DIR / "alerts.json"
    if not alerts_path.exists():
        click.echo("No alerts recorded.")
        return

    raw: list[dict[str, object]] = json.loads(alerts_path.read_text(encoding="utf-8"))
    if not raw:
        click.echo("No alerts recorded.")
        return

    click.echo(json.dumps(raw, indent=2))
ToolFingerprinter.fingerprint method · python · L34-L61 (28 LOC)
src/aumai_toolwatch/core.py
    def fingerprint(
        self,
        tool_name: str,
        schema: dict[str, object],
        sample_responses: list[dict[str, object]],
        version: str = "unknown",
    ) -> ToolFingerprint:
        """Create a :class:`ToolFingerprint` for a tool.

        Args:
            tool_name: Unique tool identifier.
            schema: The tool's JSON schema (input/output parameter definition).
            sample_responses: A list of representative response dicts.
            version: Optional version string for the tool.

        Returns:
            A :class:`ToolFingerprint` capturing the current state of the tool.
        """
        schema_hash = _sha256(_stable_json(schema))
        response_pattern_hash = _sha256(self._summarise_responses(sample_responses))

        return ToolFingerprint(
            tool_name=tool_name,
            version=version,
            schema_hash=schema_hash,
            response_pattern_hash=response_pattern_hash,
            captured_at=dateti
ToolFingerprinter._summarise_responses method · python · L67-L81 (15 LOC)
src/aumai_toolwatch/core.py
    def _summarise_responses(self, responses: list[dict[str, object]]) -> str:
        """Reduce a list of response dicts to a structural fingerprint string.

        We collect the sorted union of top-level keys and their value types
        across all responses.  This captures behavioural shape without relying
        on specific values.
        """
        key_types: dict[str, set[str]] = {}
        for response in responses:
            for key, value in response.items():
                key_types.setdefault(key, set()).add(type(value).__name__)

        # Produce a stable representation
        summary = {k: sorted(v) for k, v in sorted(key_types.items())}
        return _stable_json(summary)
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
MutationDetector.detect_mutation method · python · L87-L123 (37 LOC)
src/aumai_toolwatch/core.py
    def detect_mutation(
        self, old: ToolFingerprint, new: ToolFingerprint
    ) -> MutationAlert | None:
        """Compare *old* and *new* fingerprints and return an alert if they differ.

        Args:
            old: The baseline fingerprint.
            new: The freshly captured fingerprint.

        Returns:
            A :class:`MutationAlert` when a difference is detected, or *None*.
        """
        schema_changed = old.schema_hash != new.schema_hash
        response_changed = old.response_pattern_hash != new.response_pattern_hash

        if not schema_changed and not response_changed:
            return None

        # Determine the most specific change type
        if schema_changed and response_changed:
            change_type = "behavior_change"
        elif schema_changed:
            change_type = "schema_change"
        else:
            change_type = "response_change"

        num_changes = int(schema_changed) + int(response_changed)
        severity = _SEV
WatchManager.add_baseline method · python · L137-L143 (7 LOC)
src/aumai_toolwatch/core.py
    def add_baseline(self, fingerprint: ToolFingerprint) -> None:
        """Store *fingerprint* as the trusted baseline for its tool.

        Args:
            fingerprint: Fingerprint to register as the new baseline.
        """
        self._baselines[fingerprint.tool_name] = fingerprint
WatchManager.check method · python · L145-L166 (22 LOC)
src/aumai_toolwatch/core.py
    def check(self, tool_name: str, current: ToolFingerprint) -> MutationAlert | None:
        """Compare *current* against the stored baseline for *tool_name*.

        If no baseline exists, the current fingerprint is registered as the
        baseline and *None* is returned.

        Args:
            tool_name: Name of the tool to check.
            current: The freshly captured fingerprint to compare.

        Returns:
            A :class:`MutationAlert` if a mutation is detected, otherwise *None*.
        """
        baseline = self._baselines.get(tool_name)
        if baseline is None:
            self._baselines[tool_name] = current
            return None

        alert = self._detector.detect_mutation(baseline, current)
        if alert is not None:
            self._alerts.append(alert)
        return alert
WatchManager.get_alerts method · python · L168-L174 (7 LOC)
src/aumai_toolwatch/core.py
    def get_alerts(self) -> list[MutationAlert]:
        """Return all alerts accumulated since the manager was created.

        Returns:
            List of :class:`MutationAlert` objects in detection order.
        """
        return list(self._alerts)
WatchManager.get_baseline method · python · L176-L185 (10 LOC)
src/aumai_toolwatch/core.py
    def get_baseline(self, tool_name: str) -> ToolFingerprint | None:
        """Return the stored baseline for *tool_name*, or *None*.

        Args:
            tool_name: Name of the tool to look up.

        Returns:
            The baseline :class:`ToolFingerprint`, or *None* if not registered.
        """
        return self._baselines.get(tool_name)
WatchManager.get_all_baselines method · python · L187-L193 (7 LOC)
src/aumai_toolwatch/core.py
    def get_all_baselines(self) -> list[ToolFingerprint]:
        """Return all stored baseline fingerprints.

        Returns:
            A list of all :class:`ToolFingerprint` objects in the registry.
        """
        return list(self._baselines.values())