Function bodies 13 total
_load_manager function · python · L17-L26 (10 LOC)src/aumai_toolwatch/cli.py
def _load_manager() -> WatchManager:
    """Build a WatchManager seeded with the baselines persisted on disk.

    Reads ``baselines.json`` from the default state directory when the file
    exists, passes every entry through ``ToolFingerprint.model_validate`` and
    registers the result as a baseline. Only baselines are read here; the
    alerts file is not touched.

    Returns:
        A manager holding the stored baselines, or an empty manager when no
        baselines file exists.
    """
    watch = WatchManager()
    store = _DEFAULT_STATE_DIR / "baselines.json"
    if not store.exists():
        return watch

    payload = json.loads(store.read_text(encoding="utf-8"))
    for fingerprint in map(ToolFingerprint.model_validate, payload):
        watch.add_baseline(fingerprint)
    return watch
def _save_manager(manager: WatchManager) -> None:
    """Persist the manager's baselines and alerts to the state directory.

    ``baselines.json`` is rewritten with every baseline the manager holds.
    ``alerts.json`` is extended: the alerts returned by
    ``manager.get_alerts()`` are appended to whatever the file already
    contains. Calling this twice with the same manager therefore appends
    the same alerts twice.

    Both files are written through a temporary sibling file that is then
    renamed over the target, so an interrupted write cannot leave truncated
    JSON behind for the next load to fail on.

    Args:
        manager: The manager whose state should be written out.
    """
    _DEFAULT_STATE_DIR.mkdir(parents=True, exist_ok=True)

    baselines_path = _DEFAULT_STATE_DIR / "baselines.json"
    data = [fp.model_dump(mode="json") for fp in manager.get_all_baselines()]
    _write_json_atomic(baselines_path, data)

    alerts_path = _DEFAULT_STATE_DIR / "alerts.json"
    alerts_data = [a.model_dump(mode="json") for a in manager.get_alerts()]
    existing: list[dict[str, object]] = []
    if alerts_path.exists():
        existing = json.loads(alerts_path.read_text(encoding="utf-8"))
    existing.extend(alerts_data)
    _write_json_atomic(alerts_path, existing)


def _write_json_atomic(path: Path, payload: object) -> None:
    """Write *payload* to *path* as indented JSON via a temp-file swap.

    Args:
        path: Final location of the JSON file.
        payload: JSON-serialisable value to store.
    """
    # Serialise before touching the disk so a bad payload changes nothing.
    text = json.dumps(payload, indent=2)
    scratch = path.with_name(path.name + ".tmp")
    scratch.write_text(text, encoding="utf-8")
    # The rename only happens once the full content is on disk; readers see
    # either the old file or the new one (atomic on POSIX file systems).
    scratch.replace(path)
def baseline_cmd(tool: str, schema_file: str, tool_version: str) -> None:
    """Fingerprint a tool's schema and record it as that tool's baseline.

    The stored state is loaded, the new fingerprint replaces any baseline
    kept under the same tool name, the state is saved again, and a JSON
    summary of the fingerprint is echoed.

    Args:
        tool: Name the fingerprint is created for.
        schema_file: Path to a UTF-8 JSON file containing the tool schema.
        tool_version: Version string passed to the fingerprinter.
    """
    schema_text = Path(schema_file).read_text(encoding="utf-8")
    schema: dict[str, object] = json.loads(schema_text)

    # This command has no sample responses, so an empty list is supplied.
    baseline = ToolFingerprinter().fingerprint(
        tool, schema, [], version=tool_version
    )

    store = _load_manager()
    store.add_baseline(baseline)
    _save_manager(store)

    summary = {
        "tool_name": baseline.tool_name,
        "schema_hash": baseline.schema_hash,
        "response_pattern_hash": baseline.response_pattern_hash,
        "captured_at": baseline.captured_at.isoformat(),
    }
    click.echo(json.dumps(summary, indent=2))
def check_cmd(tool: str, schema_file: str, tool_version: str) -> None:
    """Check a tool against its stored baseline and report any mutation.

    The schema file is fingerprinted and compared with the baseline kept for
    *tool*. A detected mutation is echoed as a JSON object; otherwise a
    "no mutation" line is echoed. State is saved after the check so that a
    newly registered baseline or a new alert is persisted.

    When no baseline exists yet, ``WatchManager.check`` registers the current
    fingerprint as the baseline and returns ``None``. Nothing was compared in
    that case, so a notice is written to stderr in addition to the usual
    stdout line (stdout output is unchanged).

    Args:
        tool: Name of the tool to check.
        schema_file: Path to a UTF-8 JSON file containing the tool schema.
        tool_version: Version string passed to the fingerprinter.
    """
    schema: dict[str, object] = json.loads(
        Path(schema_file).read_text(encoding="utf-8")
    )
    fingerprinter = ToolFingerprinter()
    current_fp = fingerprinter.fingerprint(tool, schema, [], version=tool_version)

    manager = _load_manager()
    # Must be captured before check(): check() itself creates the baseline
    # when it is missing, after which the two cases are indistinguishable.
    had_baseline = manager.get_baseline(tool) is not None
    alert = manager.check(tool, current_fp)
    _save_manager(manager)

    if alert is not None:
        click.echo(
            json.dumps(
                {
                    "tool_name": alert.tool_name,
                    "change_type": alert.change_type,
                    "severity": alert.severity,
                    "detected_at": alert.detected_at.isoformat(),
                },
                indent=2,
            )
        )
        return

    if not had_baseline:
        click.echo(
            f"No baseline found for tool '{tool}'; "
            "the current fingerprint was stored as the baseline.",
            err=True,
        )
    click.echo(f"No mutation detected for tool '{tool}'.")
def alerts_cmd() -> None:
    """Echo the contents of ``alerts.json`` as indented JSON.

    When the alerts file is missing, or its parsed content is empty, the
    line "No alerts recorded." is echoed instead.
    """
    log_file = _DEFAULT_STATE_DIR / "alerts.json"
    recorded: list[dict[str, object]] = (
        json.loads(log_file.read_text(encoding="utf-8"))
        if log_file.exists()
        else []
    )
    if recorded:
        click.echo(json.dumps(recorded, indent=2))
    else:
        click.echo("No alerts recorded.")
def fingerprint(
self,
tool_name: str,
schema: dict[str, object],
sample_responses: list[dict[str, object]],
version: str = "unknown",
) -> ToolFingerprint:
"""Create a :class:`ToolFingerprint` for a tool.
Args:
tool_name: Unique tool identifier.
schema: The tool's JSON schema (input/output parameter definition).
sample_responses: A list of representative response dicts.
version: Optional version string for the tool.
Returns:
A :class:`ToolFingerprint` capturing the current state of the tool.
"""
schema_hash = _sha256(_stable_json(schema))
response_pattern_hash = _sha256(self._summarise_responses(sample_responses))
return ToolFingerprint(
tool_name=tool_name,
version=version,
schema_hash=schema_hash,
response_pattern_hash=response_pattern_hash,
captured_at=datetiToolFingerprinter._summarise_responses method · python · L67-L81 (15 LOC)src/aumai_toolwatch/core.py
def _summarise_responses(self, responses: list[dict[str, object]]) -> str:
    """Condense sample responses into a string describing their shape.

    For every top-level key seen in any response, the names of the Python
    types of its values are collected. Keys and type names are both sorted
    before serialisation, so the result depends on which keys and types
    occur, not on response order or on the values themselves.

    Args:
        responses: Response dicts to summarise; may be empty.

    Returns:
        The ``_stable_json`` serialisation of the key-to-type-names mapping.
    """
    seen: dict[str, set[str]] = {}
    for payload in responses:
        for field, item in payload.items():
            if field not in seen:
                seen[field] = set()
            seen[field].add(type(item).__name__)

    # Sort keys and per-key type names so the output is order-independent.
    shape = {}
    for field in sorted(seen):
        shape[field] = sorted(seen[field])
    return _stable_json(shape)
MutationDetector.detect_mutation method · python · L87-L123 (37 LOC)src/aumai_toolwatch/core.py
def detect_mutation(
self, old: ToolFingerprint, new: ToolFingerprint
) -> MutationAlert | None:
"""Compare *old* and *new* fingerprints and return an alert if they differ.
Args:
old: The baseline fingerprint.
new: The freshly captured fingerprint.
Returns:
A :class:`MutationAlert` when a difference is detected, or *None*.
"""
schema_changed = old.schema_hash != new.schema_hash
response_changed = old.response_pattern_hash != new.response_pattern_hash
if not schema_changed and not response_changed:
return None
# Determine the most specific change type
if schema_changed and response_changed:
change_type = "behavior_change"
elif schema_changed:
change_type = "schema_change"
else:
change_type = "response_change"
num_changes = int(schema_changed) + int(response_changed)
severity = _SEVWatchManager.add_baseline method · python · L137-L143 (7 LOC)src/aumai_toolwatch/core.py
def add_baseline(self, fingerprint: ToolFingerprint) -> None:
    """Register *fingerprint* as the baseline for the tool it names.

    The fingerprint is stored under its own ``tool_name``; the assignment
    overwrites whatever was stored under that name before.

    Args:
        fingerprint: Fingerprint to keep as the baseline.
    """
    key = fingerprint.tool_name
    self._baselines[key] = fingerprint
def check(self, tool_name: str, current: ToolFingerprint) -> MutationAlert | None:
    """Compare *current* with the baseline stored for *tool_name*.

    A tool without a baseline gets *current* registered as its baseline and
    no comparison takes place. Otherwise the detector compares baseline and
    *current*; an alert it produces is appended to the manager's alert list.
    The stored baseline is left unchanged by a comparison.

    Args:
        tool_name: Name of the tool being checked.
        current: Freshly captured fingerprint.

    Returns:
        The alert produced by the detector, or *None* when there was no
        baseline yet or the detector reported no difference.
    """
    known = self._baselines.get(tool_name)
    if known is not None:
        finding = self._detector.detect_mutation(known, current)
        if finding is not None:
            self._alerts.append(finding)
        return finding

    # No baseline yet: adopt the current fingerprint as the baseline.
    self._baselines[tool_name] = current
    return None
def get_alerts(self) -> list[MutationAlert]:
    """Return a copy of the alerts this manager has recorded.

    Returns:
        A new list with the recorded :class:`MutationAlert` objects in the
        order they were appended; changing it does not affect the manager.
    """
    return [*self._alerts]
def get_baseline(self, tool_name: str) -> ToolFingerprint | None:
    """Look up the baseline stored for *tool_name*.

    Args:
        tool_name: Name of the tool whose baseline is wanted.

    Returns:
        The stored :class:`ToolFingerprint`, or *None* when nothing is
        stored under that name.
    """
    registry = self._baselines
    return registry.get(tool_name)
def get_all_baselines(self) -> list[ToolFingerprint]:
    """Return every baseline fingerprint currently stored.

    Returns:
        A new list of the stored :class:`ToolFingerprint` objects, in the
        iteration order of the underlying registry.
    """
    return [*self._baselines.values()]