Function bodies 288 total
_KeepOlderStrategy.apply method · python · L328-L366 (39 LOC)src/agent_memory/contradiction/resolver.py
def apply(self, contradiction: Contradiction) -> Resolution:
entry_a = contradiction.entry_a
entry_b = contradiction.entry_b
from datetime import timezone as tz
def _utc(dt_val: object) -> object:
from datetime import datetime
if not isinstance(dt_val, datetime):
return dt_val
if dt_val.tzinfo is None:
return dt_val.replace(tzinfo=tz.utc)
return dt_val.astimezone(tz.utc)
ts_a = _utc(entry_a.timestamp)
ts_b = _utc(entry_b.timestamp)
from datetime import datetime
if isinstance(ts_a, datetime) and isinstance(ts_b, datetime):
if ts_a <= ts_b:
kept = entry_a
archived = entry_b
else:
kept = entry_b
archived = entry_a
else:
kept = entry_a
archived = entry_b
return Resolution(
strategy_used="keep_older",SimpleContradictionResolver.__init__ method · python · L394-L400 (7 LOC)src/agent_memory/contradiction/resolver.py
def __init__(self, default_strategy: str = "keep_newer") -> None:
if default_strategy not in _VALID_STRATEGY_NAMES:
raise ValueError(
f"Unknown strategy {default_strategy!r}. "
f"Valid options: {sorted(_VALID_STRATEGY_NAMES)!r}"
)
self._default_strategy = default_strategySimpleContradictionResolver.resolve method · python · L406-L444 (39 LOC)src/agent_memory/contradiction/resolver.py
def resolve(
self,
contradiction: Contradiction,
strategy: Optional[str] = None,
) -> Resolution:
"""Resolve a contradiction using the specified strategy.
Parameters
----------
contradiction:
The detected contradiction to resolve.
strategy:
Strategy name override. Falls back to ``default_strategy`` when
``None``. Must be one of ``'keep_newer'``, ``'keep_older'``,
``'keep_both_with_context'``, ``'flag_for_review'``, ``'merge'``.
Returns
-------
Resolution
The resolution outcome.
Raises
------
ValueError
If the strategy name is not recognised.
"""
active_strategy = strategy if strategy is not None else self._default_strategy
registry = _get_strategy_registry()
strategy_obj = registry.get(active_strategy)
if strategy_obj is None:
raise ValueEScanResult.summary method · python · L42-L50 (9 LOC)src/agent_memory/contradiction/scanner.py
def summary(self) -> dict[str, object]:
return {
"scanned_at": self.scanned_at.isoformat(),
"scanned_count": self.scanned_count,
"skipped_count": self.skipped_count,
"contradiction_count": self.contradiction_count,
"early_terminated": self.early_terminated,
"scan_duration_seconds": round(self.scan_duration_seconds, 3),
}ContradictionScanner.__init__ method · python · L80-L91 (12 LOC)src/agent_memory/contradiction/scanner.py
def __init__(
self,
detector: Optional[ContradictionDetector] = None,
max_contradictions: int = 0,
similarity_threshold: float = 0.25,
high_importance_threshold: float = _HIGH_IMPORTANCE_THRESHOLD,
) -> None:
self._detector = detector or ContradictionDetector(
similarity_threshold=similarity_threshold
)
self._max_contradictions = max_contradictions
self._high_importance_threshold = high_importance_thresholdContradictionScanner.scan method · python · L93-L118 (26 LOC)src/agent_memory/contradiction/scanner.py
def scan(
self,
store: MemoryStore,
scope: ScanScope = ScanScope.ALL,
) -> ScanResult:
"""Scan the store for contradictions within the given scope.
Parameters
----------
store:
The memory store to scan.
scope:
Which entries to include. Defaults to ``ALL``.
Returns
-------
ScanResult
Contains all detected contradiction pairs and scan metadata.
"""
import time
start_time = time.monotonic()
candidates = self._collect_candidates(store, scope)
result = self._pairwise_scan(candidates)
result.scan_duration_seconds = time.monotonic() - start_time
return resultContradictionScanner.scan_entries method · python · L120-L140 (21 LOC)src/agent_memory/contradiction/scanner.py
def scan_entries(
self,
entries: Sequence[MemoryEntry],
) -> ScanResult:
"""Scan an explicit list of entries rather than a store.
Parameters
----------
entries:
The entries to check for contradictions.
Returns
-------
ScanResult
"""
import time
start_time = time.monotonic()
result = self._pairwise_scan(list(entries))
result.scan_duration_seconds = time.monotonic() - start_time
return resultProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
ContradictionScanner.scan_against method · python · L142-L179 (38 LOC)src/agent_memory/contradiction/scanner.py
def scan_against(
self,
entry: MemoryEntry,
store: MemoryStore,
scope: ScanScope = ScanScope.ALL,
) -> ScanResult:
"""Find contradictions between a single entry and all store entries.
Useful for checking a newly added entry against the existing store
without a full pairwise scan.
Parameters
----------
entry:
The entry to check.
store:
The store to search within.
scope:
Restricts which stored entries are checked.
Returns
-------
ScanResult
"""
import time
start_time = time.monotonic()
candidates = self._collect_candidates(store, scope)
# Exclude the entry itself if it is already in the store
candidates = [c for c in candidates if c.memory_id != entry.memory_id]
pairs = self._detector.detect_for_entry(entry, candidates)
result = ScanResult(
scanneContradictionScanner._collect_candidates method · python · L185-L208 (24 LOC)src/agent_memory/contradiction/scanner.py
def _collect_candidates(
self,
store: MemoryStore,
scope: ScanScope,
) -> list[MemoryEntry]:
"""Return the filtered set of entries to scan based on scope."""
layer_map: dict[ScanScope, MemoryLayer] = {
ScanScope.WORKING: MemoryLayer.WORKING,
ScanScope.EPISODIC: MemoryLayer.EPISODIC,
ScanScope.SEMANTIC: MemoryLayer.SEMANTIC,
ScanScope.PROCEDURAL: MemoryLayer.PROCEDURAL,
}
if scope in layer_map:
return list(store.all(layer=layer_map[scope]))
if scope == ScanScope.HIGH_IMPORTANCE:
return [
e for e in store.all()
if e.importance_score >= self._high_importance_threshold
]
# ScanScope.ALL
return list(store.all())ContradictionScanner._pairwise_scan method · python · L210-L237 (28 LOC)src/agent_memory/contradiction/scanner.py
def _pairwise_scan(self, entries: list[MemoryEntry]) -> ScanResult:
"""Run pairwise contradiction detection with optional early termination."""
found: list[ContradictionPair] = []
skipped = 0
early_terminated = False
for i in range(len(entries)):
for j in range(i + 1, len(entries)):
if self._max_contradictions > 0 and len(found) >= self._max_contradictions:
# Count remaining unchecked pairs as skipped
remaining = (len(entries) - i - 1) * len(entries) // 2
skipped += max(0, remaining)
early_terminated = True
break
pair = self._detector._compare(entries[i], entries[j])
if pair is not None:
found.append(pair)
if early_terminated:
break
return ScanResult(
scanned_count=len(entries),
contradiction_pairs=founKeepNewerStrategy.apply method · python · L56-L98 (43 LOC)src/agent_memory/contradiction/strategies.py
def apply(self, contradiction: Contradiction) -> Resolution:
"""Apply recency-wins logic to the contradiction.
Parameters
----------
contradiction:
The contradiction to resolve.
Returns
-------
Resolution
Resolution that retains the newer entry and archives the older.
"""
entry_a = contradiction.entry_a
entry_b = contradiction.entry_b
# Normalise timestamps to UTC-aware for safe comparison
ts_a = _to_utc(entry_a.timestamp)
ts_b = _to_utc(entry_b.timestamp)
if ts_a >= ts_b:
kept = entry_a
archived = entry_b
rationale = (
f"Kept entry '{entry_a.id}' (timestamp {entry_a.timestamp.isoformat()}) "
f"over '{entry_b.id}' (timestamp {entry_b.timestamp.isoformat()}) "
f"— recency wins."
)
else:
kept = entry_b
archived = entry_aKeepBothStrategy.apply method · python · L114-L142 (29 LOC)src/agent_memory/contradiction/strategies.py
def apply(self, contradiction: Contradiction) -> Resolution:
"""Apply keep-both logic to the contradiction.
Parameters
----------
contradiction:
The contradiction to resolve.
Returns
-------
Resolution
Resolution that retains both entries with an annotation note.
"""
entry_a = contradiction.entry_a
entry_b = contradiction.entry_b
annotation = (
f"Both entries retained with contradiction annotation. "
f"Type: {contradiction.contradiction_type.value}. "
f"Similarity: {contradiction.similarity_score:.4f}. "
f"Explanation: {contradiction.explanation}"
)
return Resolution(
strategy_used="keep_both_with_context",
kept_entries=(entry_a, entry_b),
archived_entries=(),
notes=annotation,
)FlagForReviewStrategy.apply method · python · L158-L186 (29 LOC)src/agent_memory/contradiction/strategies.py
def apply(self, contradiction: Contradiction) -> Resolution:
"""Flag both entries for manual review.
Parameters
----------
contradiction:
The contradiction to resolve.
Returns
-------
Resolution
Resolution that keeps both entries and flags them for review.
"""
entry_a = contradiction.entry_a
entry_b = contradiction.entry_b
review_note = (
f"PENDING_REVIEW: entries '{entry_a.id}' and '{entry_b.id}' "
f"contradict each other ({contradiction.contradiction_type.value}, "
f"confidence={contradiction.confidence:.4f}). "
f"Human review required. Explanation: {contradiction.explanation}"
)
return Resolution(
strategy_used="flag_for_review",
kept_entries=(entry_a, entry_b),
archived_entries=(),
notes=review_note,
)MergeStrategy.apply method · python · L208-L250 (43 LOC)src/agent_memory/contradiction/strategies.py
def apply(self, contradiction: Contradiction) -> Resolution:
"""Synthesise a merged entry from the two contradicting entries.
Parameters
----------
contradiction:
The contradiction to resolve.
Returns
-------
Resolution
Resolution with a single new merged entry in ``kept_entries``
and both originals in ``archived_entries``.
"""
entry_a = contradiction.entry_a
entry_b = contradiction.entry_b
# Use the newer entry's timestamp for the merge
ts_a = _to_utc(entry_a.timestamp)
ts_b = _to_utc(entry_b.timestamp)
merge_timestamp = entry_a.timestamp if ts_a >= ts_b else entry_b.timestamp
merged_content = entry_a.content + self.MERGE_SEPARATOR + entry_b.content
merged_source = f"merged:{entry_a.source}+{entry_b.source}"
merged_entry = MemoryEntry(
content=merged_content,
timestamp=merge_timestam_to_utc function · python · L258-L266 (9 LOC)src/agent_memory/contradiction/strategies.py
def _to_utc(dt_value: object) -> object:
"""Normalise a datetime to UTC-aware. Returns the input unchanged if not a datetime."""
from datetime import datetime, timezone
if not isinstance(dt_value, datetime):
return dt_value
if dt_value.tzinfo is None:
return dt_value.replace(tzinfo=timezone.utc)
return dt_value.astimezone(timezone.utc)If a scraper extracted this row, it came from Repobility (https://repobility.com)
Memory.__init__ method · python · L35-L40 (6 LOC)src/agent_memory/convenience.py
def __init__(self) -> None:
from agent_memory.unified.memory import UnifiedMemory
from agent_memory.unified.config import MemoryConfig
config = MemoryConfig(storage_backend="memory")
self._memory = UnifiedMemory(config=config)Memory.add method · python · L42-L89 (48 LOC)src/agent_memory/convenience.py
def add(
self,
text: str,
user_id: str = "default-user",
layer: str = "semantic",
) -> Any:
"""Add a text memory.
Parameters
----------
text:
Text content to memorize.
user_id:
User identifier for multi-user isolation.
layer:
Memory layer — one of ``"working"``, ``"episodic"``,
``"semantic"`` (default), or ``"procedural"``.
Returns
-------
MemoryEntry
The stored memory entry with its assigned ID.
Example
-------
::
m = Memory()
entry = m.add("The Eiffel Tower is in Paris", user_id="alice")
print(entry.entry_id)
"""
from agent_memory.memory.types import MemoryEntry, MemoryLayer, MemorySource
layer_map: dict[str, MemoryLayer] = {
"working": MemoryLayer.WORKING,
"episodic": MemoryLayer.EPISODIC,
"Memory.search method · python · L91-L115 (25 LOC)src/agent_memory/convenience.py
def search(self, query: str, limit: int = 10) -> list[Any]:
"""Search stored memories by text query.
Parameters
----------
query:
Free-text search query.
limit:
Maximum number of results to return.
Returns
-------
list[MemoryEntry]
List of matching memory entries ordered by relevance.
Example
-------
::
m = Memory()
m.add("Python is a programming language")
results = m.search("programming")
print(results[0].entry.content if results else "empty")
"""
return self._memory.search(query, limit=limit)PolicyConfig.__post_init__ method · python · L55-L63 (9 LOC)src/agent_memory/forgetting/policy.py
def __post_init__(self) -> None:
if self.half_life_hours <= 0:
raise ValueError(
f"half_life_hours must be > 0, got {self.half_life_hours}"
)
if not 0.0 <= self.forget_threshold <= 1.0:
raise ValueError(
f"forget_threshold must be in [0, 1], got {self.forget_threshold}"
)ForgettingPolicy.register method · python · L118-L146 (29 LOC)src/agent_memory/forgetting/policy.py
def register(
self,
entry_id: str,
safety_critical: bool = False,
initial_score: float = 1.0,
) -> None:
"""Register a memory entry for decay tracking.
Parameters
----------
entry_id:
Unique identifier for the entry.
safety_critical:
Whether this entry is exempt from forgetting.
initial_score:
Starting decay score (default 1.0).
Raises
------
ValueError
If the entry is already registered.
"""
if entry_id in self._records:
raise ValueError(f"Entry {entry_id!r} is already registered.")
self._records[entry_id] = DecayRecord(
entry_id=entry_id,
current_score=min(1.0, max(0.0, initial_score)),
safety_critical=safety_critical,
)ForgettingPolicy.deregister method · python · L148-L156 (9 LOC)src/agent_memory/forgetting/policy.py
def deregister(self, entry_id: str) -> bool:
"""Remove an entry from decay tracking.
Returns True if removed, False if not found.
"""
if entry_id not in self._records:
return False
del self._records[entry_id]
return TrueForgettingPolicy.reinforce method · python · L162-L182 (21 LOC)src/agent_memory/forgetting/policy.py
def reinforce(self, entry_id: str) -> bool:
"""Record an access event, resetting the decay timer.
Parameters
----------
entry_id:
The entry that was accessed.
Returns
-------
bool
True if reinforced, False if entry not found or
``reinforce_on_access`` is disabled.
"""
record = self._records.get(entry_id)
if record is None:
return False
if self._config.reinforce_on_access:
record.current_score = 1.0
record.last_reinforced = _utcnow()
return TrueForgettingPolicy.compute_score method · python · L188-L213 (26 LOC)src/agent_memory/forgetting/policy.py
def compute_score(
self,
entry_id: str,
now: Optional[datetime] = None,
) -> Optional[float]:
"""Compute the current decay score for an entry.
Parameters
----------
entry_id:
The entry to score.
now:
Reference time (defaults to UTC now).
Returns
-------
float | None
Current decay score in [0, 1], or None if entry not found.
"""
record = self._records.get(entry_id)
if record is None:
return None
if record.safety_critical and self._config.safety_critical_exempt:
return 1.0
elapsed = record.elapsed_hours(now)
return self._apply_decay(record.current_score, elapsed)Repobility · severity-and-effort ranking · https://repobility.com
ForgettingPolicy.update_scores method · python · L215-L227 (13 LOC)src/agent_memory/forgetting/policy.py
def update_scores(self, now: Optional[datetime] = None) -> None:
"""Update the stored score for all records using current time.
This is called by the scheduler before sweeping for forgotten entries.
"""
reference = now or _utcnow()
for record in self._records.values():
if record.safety_critical and self._config.safety_critical_exempt:
continue
elapsed = record.elapsed_hours(reference)
new_score = self._apply_decay(record.current_score, elapsed)
record.current_score = new_score
record.last_reinforced = reference # Reset timer after snapshotForgettingPolicy.forgotten_entries method · python · L233-L252 (20 LOC)src/agent_memory/forgetting/policy.py
def forgotten_entries(self, now: Optional[datetime] = None) -> list[str]:
"""Return IDs of entries whose decay score is at or below the threshold.
Parameters
----------
now:
Reference time (defaults to UTC now).
Returns
-------
list[str]
Entry IDs that should be forgotten.
"""
result: list[str] = []
reference = now or _utcnow()
for entry_id, record in self._records.items():
score = self.compute_score(entry_id, now=reference)
if score is not None and score <= self._config.forget_threshold:
result.append(entry_id)
return resultForgettingPolicy.purge_forgotten method · python · L254-L270 (17 LOC)src/agent_memory/forgetting/policy.py
def purge_forgotten(self, now: Optional[datetime] = None) -> list[str]:
"""Remove forgotten entries from tracking and return their IDs.
Parameters
----------
now:
Reference time (defaults to UTC now).
Returns
-------
list[str]
Entry IDs that were purged.
"""
to_purge = self.forgotten_entries(now=now)
for entry_id in to_purge:
self._records.pop(entry_id, None)
return to_purgeForgettingPolicy.all_scores method · python · L276-L293 (18 LOC)src/agent_memory/forgetting/policy.py
def all_scores(self, now: Optional[datetime] = None) -> dict[str, float]:
"""Return current decay scores for all tracked entries.
Parameters
----------
now:
Reference time (defaults to UTC now).
Returns
-------
dict[str, float]
Maps entry_id to current decay score.
"""
reference = now or _utcnow()
return {
entry_id: self.compute_score(entry_id, now=reference) or 0.0
for entry_id in self._records
}ForgettingPolicy._apply_decay method · python · L299-L322 (24 LOC)src/agent_memory/forgetting/policy.py
def _apply_decay(self, current_score: float, elapsed_hours: float) -> float:
"""Apply the configured decay function to a score."""
if elapsed_hours <= 0.0:
return current_score
config = self._config
half_life = config.half_life_hours
if config.decay_function == DecayFunction.EXPONENTIAL:
# score(t) = score_0 * (0.5)^(t / half_life)
decay_factor = math.pow(0.5, elapsed_hours / half_life)
new_score = current_score * decay_factor
elif config.decay_function == DecayFunction.LINEAR:
# score(t) = score_0 - (1/half_life) * elapsed_hours
# half_life_hours represents time to go from 1.0 to 0.0
rate = 1.0 / half_life
new_score = current_score - rate * elapsed_hours
else: # STEP
# Score drops to 0 after half_life_hours have elapsed
new_score = 0.0 if elapsed_hours >= half_life else current_score
return maxForgettingScheduler.sweep method · python · L373-L401 (29 LOC)src/agent_memory/forgetting/policy.py
def sweep(self, now: Optional[datetime] = None) -> SweepResult:
"""Run a single forgetting sweep.
Updates all decay scores and purges entries below the forget threshold.
Parameters
----------
now:
Reference time (defaults to UTC now).
Returns
-------
SweepResult
Details of what was purged.
"""
reference = now or _utcnow()
entries_before = self._policy.tracked_count()
self._policy.update_scores(now=reference)
forgotten_ids = self._policy.purge_forgotten(now=reference)
entries_after = self._policy.tracked_count()
result = SweepResult(
forgotten_ids=forgotten_ids,
sweep_time=reference,
entries_before=entries_before,
entries_after=entries_after,
)
self._sweep_history.append(result)
return resultFreshnessDecay.__init__ method · python · L30-L41 (12 LOC)src/agent_memory/freshness/decay.py
def __init__(
self,
mode: str = "exponential",
half_life_hours: float = 48.0,
) -> None:
if mode not in self.MODES:
raise ValueError(f"mode must be one of {sorted(self.MODES)}, got {mode!r}")
if half_life_hours <= 0:
raise ValueError("half_life_hours must be positive")
self._mode = mode
self._half_life = half_life_hours
self._lambda = math.log(2.0) / half_life_hoursFreshnessDecay.compute method · python · L47-L61 (15 LOC)src/agent_memory/freshness/decay.py
def compute(self, age_hours: float) -> float:
"""Return freshness retention in [0, 1] for the given age."""
age = max(0.0, age_hours)
if self._mode == "linear":
return max(0.0, 1.0 - age / (self._half_life * 2.0))
if self._mode == "exponential":
return math.exp(-self._lambda * age)
# step
result = 1.0
for threshold, retention in self._STEP_THRESHOLDS:
if age >= threshold:
result = retention
else:
break
return max(0.0, result)Open data scored by Repobility · https://repobility.com
RefreshTrigger.mark_for_refresh method · python · L23-L29 (7 LOC)src/agent_memory/freshness/refresh.py
def mark_for_refresh(self, entry: MemoryEntry) -> MemoryEntry:
"""Return a new entry with the ``needs_refresh`` metadata flag set."""
now_iso = datetime.now(timezone.utc).isoformat()
updated_meta = dict(entry.metadata)
updated_meta[_NEEDS_REFRESH_KEY] = "true"
updated_meta[_REFRESH_REQUESTED_AT_KEY] = now_iso
return entry.model_copy(update={"metadata": updated_meta})RefreshTrigger.clear_refresh_flag method · python · L31-L44 (14 LOC)src/agent_memory/freshness/refresh.py
def clear_refresh_flag(self, entry: MemoryEntry) -> MemoryEntry:
"""Return a new entry with the ``needs_refresh`` flag cleared and
``last_verified`` set to now."""
now_iso = datetime.now(timezone.utc).isoformat()
updated_meta = dict(entry.metadata)
updated_meta.pop(_NEEDS_REFRESH_KEY, None)
updated_meta.pop(_REFRESH_REQUESTED_AT_KEY, None)
updated_meta["last_verified"] = now_iso
return entry.model_copy(
update={
"metadata": updated_meta,
"freshness_score": 1.0,
}
)FreshnessScorer.score_and_update method · python · L41-L46 (6 LOC)src/agent_memory/freshness/scorer.py
def score_and_update(
self, entry: MemoryEntry, now: datetime | None = None
) -> MemoryEntry:
"""Return a new MemoryEntry with the freshness_score field updated."""
new_score = self.score(entry, now)
return entry.model_copy(update={"freshness_score": new_score})FreshnessScorer._reference_time method · python · L52-L59 (8 LOC)src/agent_memory/freshness/scorer.py
def _reference_time(self, entry: MemoryEntry) -> datetime:
raw = entry.metadata.get("last_verified")
if raw:
try:
return datetime.fromisoformat(raw)
except ValueError:
pass
return entry.last_accessedStaleMemoryDetector.__init__ method · python · L24-L30 (7 LOC)src/agent_memory/freshness/validator.py
def __init__(
self,
scorer: FreshnessScorer | None = None,
threshold: float = 0.3,
) -> None:
self._scorer = scorer or FreshnessScorer()
self._threshold = thresholdStaleMemoryDetector.find_stale method · python · L40-L46 (7 LOC)src/agent_memory/freshness/validator.py
def find_stale(
self,
entries: Sequence[MemoryEntry],
now: Optional[datetime] = None,
) -> list[MemoryEntry]:
"""Return all entries in the sequence that are stale."""
return [e for e in entries if self.is_stale(e, now)]StaleMemoryDetector.freshness_report method · python · L48-L62 (15 LOC)src/agent_memory/freshness/validator.py
def freshness_report(
self,
entries: Sequence[MemoryEntry],
now: Optional[datetime] = None,
) -> list[dict[str, object]]:
"""Return a summary dict for each entry with ID, score, and stale flag."""
return [
{
"memory_id": e.memory_id,
"freshness_score": self._scorer.score(e, now),
"is_stale": self.is_stale(e, now),
"content_preview": e.content[:80],
}
for e in entries
]DecayCurve.decay method · python · L18-L30 (13 LOC)src/agent_memory/importance/decay.py
def decay(self, age_hours: float) -> float:
"""Return retention factor for a memory of the given age.
Parameters
----------
age_hours:
How many hours have elapsed since the memory was created/verified.
Returns
-------
float
Value in [0.0, 1.0]; 1.0 means fully fresh, 0.0 means fully decayed.
"""Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
StepDecay.__init__ method · python · L91-L97 (7 LOC)src/agent_memory/importance/decay.py
def __init__(
self,
thresholds: list[tuple[float, float]] | None = None,
) -> None:
raw = thresholds if thresholds is not None else self._DEFAULT_THRESHOLDS
# Sort ascending by age
self._thresholds = sorted(raw, key=lambda t: t[0])StepDecay.decay method · python · L99-L107 (9 LOC)src/agent_memory/importance/decay.py
def decay(self, age_hours: float) -> float:
"""Step function: use the retention of the last applicable threshold."""
result = 1.0
for age_limit, retention in self._thresholds:
if age_hours >= age_limit:
result = retention
else:
break
return max(0.0, result)_age_hours function · python · L14-L21 (8 LOC)src/agent_memory/importance/gc.py
def _age_hours(entry: MemoryEntry) -> float:
"""Return the age of an entry in hours from now."""
now = datetime.now(timezone.utc)
created = entry.created_at
if created.tzinfo is None:
created = created.replace(tzinfo=timezone.utc)
delta = now - created
return delta.total_seconds() / 3600.0MemoryGarbageCollector.__init__ method · python · L42-L51 (10 LOC)src/agent_memory/importance/gc.py
def __init__(
self,
store: MemoryStore,
decay_curve: DecayCurve | None = None,
threshold: float = 0.05,
) -> None:
self._store = store
self._decay = decay_curve or ExponentialDecay(half_life_hours=48.0)
self._threshold = threshold
self._protector = SafetyMemoryProtector()MemoryGarbageCollector.candidates method · python · L62-L73 (12 LOC)src/agent_memory/importance/gc.py
def candidates(self, entries: Sequence[MemoryEntry] | None = None) -> list[MemoryEntry]:
"""Return entries eligible for garbage collection.
Parameters
----------
entries:
Explicit list of entries to evaluate; if None, all entries in
the store are evaluated.
"""
pool = list(entries) if entries is not None else list(self._store.all())
unprotected = self._protector.filter_unprotected(pool)
return [e for e in unprotected if self.effective_importance(e) < self._threshold]MemoryGarbageCollector.collect method · python · L75-L92 (18 LOC)src/agent_memory/importance/gc.py
def collect(self, dry_run: bool = False) -> list[str]:
"""Remove eligible entries from the store.
Parameters
----------
dry_run:
If True, identify candidates but do not actually delete them.
Returns
-------
list[str]
Memory IDs that were (or would be) removed.
"""
evicted_ids = [e.memory_id for e in self.candidates()]
if not dry_run:
for mid in evicted_ids:
self._store.delete(mid)
return evicted_idsMemoryGarbageCollector.stats method · python · L94-L105 (12 LOC)src/agent_memory/importance/gc.py
def stats(self) -> dict[str, int]:
"""Return a summary of current store state relative to GC threshold."""
all_entries = list(self._store.all())
unprotected = self._protector.filter_unprotected(all_entries)
protected = self._protector.filter_protected(all_entries)
eligible = [e for e in unprotected if self.effective_importance(e) < self._threshold]
return {
"total": len(all_entries),
"protected": len(protected),
"unprotected": len(unprotected),
"eligible_for_gc": len(eligible),
}ImportanceScorer.score method · python · L55-L66 (12 LOC)src/agent_memory/importance/scorer.py
def score(self, entry: MemoryEntry) -> float:
"""Return an importance score in [0.0, 1.0] for the given entry."""
if entry.safety_critical:
return 1.0
keyword_score = self._keyword_score(entry.content)
source_weight = _SOURCE_WEIGHTS.get(entry.source, 0.5)
layer_base = _LAYER_BASE.get(entry.layer, 0.5)
# Weighted blend: keywords carry most weight, source and layer add context
raw = keyword_score * 0.5 + source_weight * 0.3 + layer_base * 0.2
return min(1.0, max(0.0, raw))If a scraper extracted this row, it came from Repobility (https://repobility.com)
ImportanceScorer._keyword_score method · python · L77-L86 (10 LOC)src/agent_memory/importance/scorer.py
def _keyword_score(self, content: str) -> float:
"""Scan content for importance-indicating keywords."""
words = set(re.findall(r"[a-z0-9]+", content.lower()))
if words & _CRITICAL_KEYWORDS:
return 1.0
if words & _HIGH_KEYWORDS:
return 0.75
if words & _MEDIUM_KEYWORDS:
return 0.55
return 0.4 # neutral / no signalMem0EnhancedMemory.__init__ method · python · L169-L196 (28 LOC)src/agent_memory/integrations/mem0_adapter.py
def __init__(
self,
user_id: str = "default",
mem0_client: Optional[Any] = None,
mem0_config: Optional[dict[str, Any]] = None,
contradiction_threshold: float = 0.25,
block_on_contradiction: bool = False,
freshness_weight: float = 0.3,
) -> None:
if not 0.0 <= contradiction_threshold <= 1.0:
raise ValueError(
f"contradiction_threshold must be in [0.0, 1.0], got {contradiction_threshold}"
)
if not 0.0 <= freshness_weight <= 1.0:
raise ValueError(
f"freshness_weight must be in [0.0, 1.0], got {freshness_weight}"
)
self._user_id = user_id
self._contradiction_threshold = contradiction_threshold
self._block_on_contradiction = block_on_contradiction
self._freshness_weight = freshness_weight
if mem0_client is not None:
self._client = mem0_client
elif mem0_config is not None:
Mem0EnhancedMemory.add_with_contradiction_check method · python · L202-L279 (78 LOC)src/agent_memory/integrations/mem0_adapter.py
def add_with_contradiction_check(
self,
text: str,
metadata: Optional[dict[str, Any]] = None,
source: MemorySource = MemorySource.AGENT_INFERENCE,
) -> AddResult:
"""Add a memory after checking for contradictions with existing entries.
Fetches semantically similar existing memories from Mem0 and checks
the candidate text against them using AumOS ContradictionDetector.
If contradictions are found and ``block_on_contradiction`` is True,
the add is aborted.
Parameters
----------
text:
The memory text to store.
metadata:
Optional key-value metadata to attach to the memory.
source:
AumOS memory source classification (informational only).
Returns
-------
AddResult
Contains the Mem0 response, detected contradictions, and whether
the add was blocked.
"""
if not text.strip():