← back to invincible-jha__agent-memory

Function bodies 288 total

All specs Real LLM only Function bodies
_KeepOlderStrategy.apply method · python · L328-L366 (39 LOC)
src/agent_memory/contradiction/resolver.py
    def apply(self, contradiction: Contradiction) -> Resolution:
        entry_a = contradiction.entry_a
        entry_b = contradiction.entry_b

        from datetime import timezone as tz

        def _utc(dt_val: object) -> object:
            from datetime import datetime

            if not isinstance(dt_val, datetime):
                return dt_val
            if dt_val.tzinfo is None:
                return dt_val.replace(tzinfo=tz.utc)
            return dt_val.astimezone(tz.utc)

        ts_a = _utc(entry_a.timestamp)
        ts_b = _utc(entry_b.timestamp)

        from datetime import datetime

        if isinstance(ts_a, datetime) and isinstance(ts_b, datetime):
            if ts_a <= ts_b:
                kept = entry_a
                archived = entry_b
            else:
                kept = entry_b
                archived = entry_a
        else:
            kept = entry_a
            archived = entry_b

        return Resolution(
            strategy_used="keep_older",
SimpleContradictionResolver.__init__ method · python · L394-L400 (7 LOC)
src/agent_memory/contradiction/resolver.py
    def __init__(self, default_strategy: str = "keep_newer") -> None:
        if default_strategy not in _VALID_STRATEGY_NAMES:
            raise ValueError(
                f"Unknown strategy {default_strategy!r}. "
                f"Valid options: {sorted(_VALID_STRATEGY_NAMES)!r}"
            )
        self._default_strategy = default_strategy
SimpleContradictionResolver.resolve method · python · L406-L444 (39 LOC)
src/agent_memory/contradiction/resolver.py
    def resolve(
        self,
        contradiction: Contradiction,
        strategy: Optional[str] = None,
    ) -> Resolution:
        """Resolve a contradiction using the specified strategy.

        Parameters
        ----------
        contradiction:
            The detected contradiction to resolve.
        strategy:
            Strategy name override.  Falls back to ``default_strategy`` when
            ``None``.  Must be one of ``'keep_newer'``, ``'keep_older'``,
            ``'keep_both_with_context'``, ``'flag_for_review'``, ``'merge'``.

        Returns
        -------
        Resolution
            The resolution outcome.

        Raises
        ------
        ValueError
            If the strategy name is not recognised.
        """
        active_strategy = strategy if strategy is not None else self._default_strategy

        registry = _get_strategy_registry()
        strategy_obj = registry.get(active_strategy)

        if strategy_obj is None:
            raise ValueE
ScanResult.summary method · python · L42-L50 (9 LOC)
src/agent_memory/contradiction/scanner.py
    def summary(self) -> dict[str, object]:
        return {
            "scanned_at": self.scanned_at.isoformat(),
            "scanned_count": self.scanned_count,
            "skipped_count": self.skipped_count,
            "contradiction_count": self.contradiction_count,
            "early_terminated": self.early_terminated,
            "scan_duration_seconds": round(self.scan_duration_seconds, 3),
        }
ContradictionScanner.__init__ method · python · L80-L91 (12 LOC)
src/agent_memory/contradiction/scanner.py
    def __init__(
        self,
        detector: Optional[ContradictionDetector] = None,
        max_contradictions: int = 0,
        similarity_threshold: float = 0.25,
        high_importance_threshold: float = _HIGH_IMPORTANCE_THRESHOLD,
    ) -> None:
        self._detector = detector or ContradictionDetector(
            similarity_threshold=similarity_threshold
        )
        self._max_contradictions = max_contradictions
        self._high_importance_threshold = high_importance_threshold
ContradictionScanner.scan method · python · L93-L118 (26 LOC)
src/agent_memory/contradiction/scanner.py
    def scan(
        self,
        store: MemoryStore,
        scope: ScanScope = ScanScope.ALL,
    ) -> ScanResult:
        """Scan the store for contradictions within the given scope.

        Parameters
        ----------
        store:
            The memory store to scan.
        scope:
            Which entries to include. Defaults to ``ALL``.

        Returns
        -------
        ScanResult
            Contains all detected contradiction pairs and scan metadata.
        """
        import time

        start_time = time.monotonic()
        candidates = self._collect_candidates(store, scope)
        result = self._pairwise_scan(candidates)
        result.scan_duration_seconds = time.monotonic() - start_time
        return result
ContradictionScanner.scan_entries method · python · L120-L140 (21 LOC)
src/agent_memory/contradiction/scanner.py
    def scan_entries(
        self,
        entries: Sequence[MemoryEntry],
    ) -> ScanResult:
        """Scan an explicit list of entries rather than a store.

        Parameters
        ----------
        entries:
            The entries to check for contradictions.

        Returns
        -------
        ScanResult
        """
        import time

        start_time = time.monotonic()
        result = self._pairwise_scan(list(entries))
        result.scan_duration_seconds = time.monotonic() - start_time
        return result
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
ContradictionScanner.scan_against method · python · L142-L179 (38 LOC)
src/agent_memory/contradiction/scanner.py
    def scan_against(
        self,
        entry: MemoryEntry,
        store: MemoryStore,
        scope: ScanScope = ScanScope.ALL,
    ) -> ScanResult:
        """Find contradictions between a single entry and all store entries.

        Useful for checking a newly added entry against the existing store
        without a full pairwise scan.

        Parameters
        ----------
        entry:
            The entry to check.
        store:
            The store to search within.
        scope:
            Restricts which stored entries are checked.

        Returns
        -------
        ScanResult
        """
        import time

        start_time = time.monotonic()
        candidates = self._collect_candidates(store, scope)
        # Exclude the entry itself if it is already in the store
        candidates = [c for c in candidates if c.memory_id != entry.memory_id]

        pairs = self._detector.detect_for_entry(entry, candidates)
        result = ScanResult(
            scanne
ContradictionScanner._collect_candidates method · python · L185-L208 (24 LOC)
src/agent_memory/contradiction/scanner.py
    def _collect_candidates(
        self,
        store: MemoryStore,
        scope: ScanScope,
    ) -> list[MemoryEntry]:
        """Return the filtered set of entries to scan based on scope."""
        layer_map: dict[ScanScope, MemoryLayer] = {
            ScanScope.WORKING: MemoryLayer.WORKING,
            ScanScope.EPISODIC: MemoryLayer.EPISODIC,
            ScanScope.SEMANTIC: MemoryLayer.SEMANTIC,
            ScanScope.PROCEDURAL: MemoryLayer.PROCEDURAL,
        }

        if scope in layer_map:
            return list(store.all(layer=layer_map[scope]))

        if scope == ScanScope.HIGH_IMPORTANCE:
            return [
                e for e in store.all()
                if e.importance_score >= self._high_importance_threshold
            ]

        # ScanScope.ALL
        return list(store.all())
ContradictionScanner._pairwise_scan method · python · L210-L237 (28 LOC)
src/agent_memory/contradiction/scanner.py
    def _pairwise_scan(self, entries: list[MemoryEntry]) -> ScanResult:
        """Run pairwise contradiction detection with optional early termination."""
        found: list[ContradictionPair] = []
        skipped = 0
        early_terminated = False

        for i in range(len(entries)):
            for j in range(i + 1, len(entries)):
                if self._max_contradictions > 0 and len(found) >= self._max_contradictions:
                    # Count remaining unchecked pairs as skipped
                    remaining = (len(entries) - i - 1) * len(entries) // 2
                    skipped += max(0, remaining)
                    early_terminated = True
                    break

                pair = self._detector._compare(entries[i], entries[j])
                if pair is not None:
                    found.append(pair)

            if early_terminated:
                break

        return ScanResult(
            scanned_count=len(entries),
            contradiction_pairs=foun
KeepNewerStrategy.apply method · python · L56-L98 (43 LOC)
src/agent_memory/contradiction/strategies.py
    def apply(self, contradiction: Contradiction) -> Resolution:
        """Apply recency-wins logic to the contradiction.

        Parameters
        ----------
        contradiction:
            The contradiction to resolve.

        Returns
        -------
        Resolution
            Resolution that retains the newer entry and archives the older.
        """
        entry_a = contradiction.entry_a
        entry_b = contradiction.entry_b

        # Normalise timestamps to UTC-aware for safe comparison
        ts_a = _to_utc(entry_a.timestamp)
        ts_b = _to_utc(entry_b.timestamp)

        if ts_a >= ts_b:
            kept = entry_a
            archived = entry_b
            rationale = (
                f"Kept entry '{entry_a.id}' (timestamp {entry_a.timestamp.isoformat()}) "
                f"over '{entry_b.id}' (timestamp {entry_b.timestamp.isoformat()}) "
                f"— recency wins."
            )
        else:
            kept = entry_b
            archived = entry_a
KeepBothStrategy.apply method · python · L114-L142 (29 LOC)
src/agent_memory/contradiction/strategies.py
    def apply(self, contradiction: Contradiction) -> Resolution:
        """Apply keep-both logic to the contradiction.

        Parameters
        ----------
        contradiction:
            The contradiction to resolve.

        Returns
        -------
        Resolution
            Resolution that retains both entries with an annotation note.
        """
        entry_a = contradiction.entry_a
        entry_b = contradiction.entry_b

        annotation = (
            f"Both entries retained with contradiction annotation. "
            f"Type: {contradiction.contradiction_type.value}. "
            f"Similarity: {contradiction.similarity_score:.4f}. "
            f"Explanation: {contradiction.explanation}"
        )

        return Resolution(
            strategy_used="keep_both_with_context",
            kept_entries=(entry_a, entry_b),
            archived_entries=(),
            notes=annotation,
        )
FlagForReviewStrategy.apply method · python · L158-L186 (29 LOC)
src/agent_memory/contradiction/strategies.py
    def apply(self, contradiction: Contradiction) -> Resolution:
        """Flag both entries for manual review.

        Parameters
        ----------
        contradiction:
            The contradiction to resolve.

        Returns
        -------
        Resolution
            Resolution that keeps both entries and flags them for review.
        """
        entry_a = contradiction.entry_a
        entry_b = contradiction.entry_b

        review_note = (
            f"PENDING_REVIEW: entries '{entry_a.id}' and '{entry_b.id}' "
            f"contradict each other ({contradiction.contradiction_type.value}, "
            f"confidence={contradiction.confidence:.4f}). "
            f"Human review required. Explanation: {contradiction.explanation}"
        )

        return Resolution(
            strategy_used="flag_for_review",
            kept_entries=(entry_a, entry_b),
            archived_entries=(),
            notes=review_note,
        )
MergeStrategy.apply method · python · L208-L250 (43 LOC)
src/agent_memory/contradiction/strategies.py
    def apply(self, contradiction: Contradiction) -> Resolution:
        """Synthesise a merged entry from the two contradicting entries.

        Parameters
        ----------
        contradiction:
            The contradiction to resolve.

        Returns
        -------
        Resolution
            Resolution with a single new merged entry in ``kept_entries``
            and both originals in ``archived_entries``.
        """
        entry_a = contradiction.entry_a
        entry_b = contradiction.entry_b

        # Use the newer entry's timestamp for the merge
        ts_a = _to_utc(entry_a.timestamp)
        ts_b = _to_utc(entry_b.timestamp)
        merge_timestamp = entry_a.timestamp if ts_a >= ts_b else entry_b.timestamp

        merged_content = entry_a.content + self.MERGE_SEPARATOR + entry_b.content
        merged_source = f"merged:{entry_a.source}+{entry_b.source}"

        merged_entry = MemoryEntry(
            content=merged_content,
            timestamp=merge_timestam
_to_utc function · python · L258-L266 (9 LOC)
src/agent_memory/contradiction/strategies.py
def _to_utc(dt_value: object) -> object:
    """Normalise a datetime to UTC-aware.  Returns the input unchanged if not a datetime."""
    from datetime import datetime, timezone

    if not isinstance(dt_value, datetime):
        return dt_value
    if dt_value.tzinfo is None:
        return dt_value.replace(tzinfo=timezone.utc)
    return dt_value.astimezone(timezone.utc)
If a scraper extracted this row, it came from Repobility (https://repobility.com)
Memory.__init__ method · python · L35-L40 (6 LOC)
src/agent_memory/convenience.py
    def __init__(self) -> None:
        from agent_memory.unified.memory import UnifiedMemory
        from agent_memory.unified.config import MemoryConfig

        config = MemoryConfig(storage_backend="memory")
        self._memory = UnifiedMemory(config=config)
Memory.add method · python · L42-L89 (48 LOC)
src/agent_memory/convenience.py
    def add(
        self,
        text: str,
        user_id: str = "default-user",
        layer: str = "semantic",
    ) -> Any:
        """Add a text memory.

        Parameters
        ----------
        text:
            Text content to memorize.
        user_id:
            User identifier for multi-user isolation.
        layer:
            Memory layer — one of ``"working"``, ``"episodic"``,
            ``"semantic"`` (default), or ``"procedural"``.

        Returns
        -------
        MemoryEntry
            The stored memory entry with its assigned ID.

        Example
        -------
        ::

            m = Memory()
            entry = m.add("The Eiffel Tower is in Paris", user_id="alice")
            print(entry.entry_id)
        """
        from agent_memory.memory.types import MemoryEntry, MemoryLayer, MemorySource

        layer_map: dict[str, MemoryLayer] = {
            "working": MemoryLayer.WORKING,
            "episodic": MemoryLayer.EPISODIC,
            "
Memory.search method · python · L91-L115 (25 LOC)
src/agent_memory/convenience.py
    def search(self, query: str, limit: int = 10) -> list[Any]:
        """Search stored memories by text query.

        Parameters
        ----------
        query:
            Free-text search query.
        limit:
            Maximum number of results to return.

        Returns
        -------
        list[MemoryEntry]
            List of matching memory entries ordered by relevance.

        Example
        -------
        ::

            m = Memory()
            m.add("Python is a programming language")
            results = m.search("programming")
            print(results[0].entry.content if results else "empty")
        """
        return self._memory.search(query, limit=limit)
PolicyConfig.__post_init__ method · python · L55-L63 (9 LOC)
src/agent_memory/forgetting/policy.py
    def __post_init__(self) -> None:
        if self.half_life_hours <= 0:
            raise ValueError(
                f"half_life_hours must be > 0, got {self.half_life_hours}"
            )
        if not 0.0 <= self.forget_threshold <= 1.0:
            raise ValueError(
                f"forget_threshold must be in [0, 1], got {self.forget_threshold}"
            )
ForgettingPolicy.register method · python · L118-L146 (29 LOC)
src/agent_memory/forgetting/policy.py
    def register(
        self,
        entry_id: str,
        safety_critical: bool = False,
        initial_score: float = 1.0,
    ) -> None:
        """Register a memory entry for decay tracking.

        Parameters
        ----------
        entry_id:
            Unique identifier for the entry.
        safety_critical:
            Whether this entry is exempt from forgetting.
        initial_score:
            Starting decay score (default 1.0).

        Raises
        ------
        ValueError
            If the entry is already registered.
        """
        if entry_id in self._records:
            raise ValueError(f"Entry {entry_id!r} is already registered.")
        self._records[entry_id] = DecayRecord(
            entry_id=entry_id,
            current_score=min(1.0, max(0.0, initial_score)),
            safety_critical=safety_critical,
        )
ForgettingPolicy.deregister method · python · L148-L156 (9 LOC)
src/agent_memory/forgetting/policy.py
    def deregister(self, entry_id: str) -> bool:
        """Remove an entry from decay tracking.

        Returns True if removed, False if not found.
        """
        if entry_id not in self._records:
            return False
        del self._records[entry_id]
        return True
ForgettingPolicy.reinforce method · python · L162-L182 (21 LOC)
src/agent_memory/forgetting/policy.py
    def reinforce(self, entry_id: str) -> bool:
        """Record an access event, resetting the decay timer.

        Parameters
        ----------
        entry_id:
            The entry that was accessed.

        Returns
        -------
        bool
            True if reinforced, False if entry not found or
            ``reinforce_on_access`` is disabled.
        """
        record = self._records.get(entry_id)
        if record is None:
            return False
        if self._config.reinforce_on_access:
            record.current_score = 1.0
            record.last_reinforced = _utcnow()
        return True
ForgettingPolicy.compute_score method · python · L188-L213 (26 LOC)
src/agent_memory/forgetting/policy.py
    def compute_score(
        self,
        entry_id: str,
        now: Optional[datetime] = None,
    ) -> Optional[float]:
        """Compute the current decay score for an entry.

        Parameters
        ----------
        entry_id:
            The entry to score.
        now:
            Reference time (defaults to UTC now).

        Returns
        -------
        float | None
            Current decay score in [0, 1], or None if entry not found.
        """
        record = self._records.get(entry_id)
        if record is None:
            return None
        if record.safety_critical and self._config.safety_critical_exempt:
            return 1.0
        elapsed = record.elapsed_hours(now)
        return self._apply_decay(record.current_score, elapsed)
Repobility · severity-and-effort ranking · https://repobility.com
ForgettingPolicy.update_scores method · python · L215-L227 (13 LOC)
src/agent_memory/forgetting/policy.py
    def update_scores(self, now: Optional[datetime] = None) -> None:
        """Update the stored score for all records using current time.

        This is called by the scheduler before sweeping for forgotten entries.
        """
        reference = now or _utcnow()
        for record in self._records.values():
            if record.safety_critical and self._config.safety_critical_exempt:
                continue
            elapsed = record.elapsed_hours(reference)
            new_score = self._apply_decay(record.current_score, elapsed)
            record.current_score = new_score
            record.last_reinforced = reference  # Reset timer after snapshot
ForgettingPolicy.forgotten_entries method · python · L233-L252 (20 LOC)
src/agent_memory/forgetting/policy.py
    def forgotten_entries(self, now: Optional[datetime] = None) -> list[str]:
        """Return IDs of entries whose decay score is at or below the threshold.

        Parameters
        ----------
        now:
            Reference time (defaults to UTC now).

        Returns
        -------
        list[str]
            Entry IDs that should be forgotten.
        """
        result: list[str] = []
        reference = now or _utcnow()
        for entry_id, record in self._records.items():
            score = self.compute_score(entry_id, now=reference)
            if score is not None and score <= self._config.forget_threshold:
                result.append(entry_id)
        return result
ForgettingPolicy.purge_forgotten method · python · L254-L270 (17 LOC)
src/agent_memory/forgetting/policy.py
    def purge_forgotten(self, now: Optional[datetime] = None) -> list[str]:
        """Remove forgotten entries from tracking and return their IDs.

        Parameters
        ----------
        now:
            Reference time (defaults to UTC now).

        Returns
        -------
        list[str]
            Entry IDs that were purged.
        """
        to_purge = self.forgotten_entries(now=now)
        for entry_id in to_purge:
            self._records.pop(entry_id, None)
        return to_purge
ForgettingPolicy.all_scores method · python · L276-L293 (18 LOC)
src/agent_memory/forgetting/policy.py
    def all_scores(self, now: Optional[datetime] = None) -> dict[str, float]:
        """Return current decay scores for all tracked entries.

        Parameters
        ----------
        now:
            Reference time (defaults to UTC now).

        Returns
        -------
        dict[str, float]
            Maps entry_id to current decay score.
        """
        reference = now or _utcnow()
        return {
            entry_id: self.compute_score(entry_id, now=reference) or 0.0
            for entry_id in self._records
        }
ForgettingPolicy._apply_decay method · python · L299-L322 (24 LOC)
src/agent_memory/forgetting/policy.py
    def _apply_decay(self, current_score: float, elapsed_hours: float) -> float:
        """Apply the configured decay function to a score."""
        if elapsed_hours <= 0.0:
            return current_score

        config = self._config
        half_life = config.half_life_hours

        if config.decay_function == DecayFunction.EXPONENTIAL:
            # score(t) = score_0 * (0.5)^(t / half_life)
            decay_factor = math.pow(0.5, elapsed_hours / half_life)
            new_score = current_score * decay_factor

        elif config.decay_function == DecayFunction.LINEAR:
            # score(t) = score_0 - (1/half_life) * elapsed_hours
            # half_life_hours represents time to go from 1.0 to 0.0
            rate = 1.0 / half_life
            new_score = current_score - rate * elapsed_hours

        else:  # STEP
            # Score drops to 0 after half_life_hours have elapsed
            new_score = 0.0 if elapsed_hours >= half_life else current_score

        return max
ForgettingScheduler.sweep method · python · L373-L401 (29 LOC)
src/agent_memory/forgetting/policy.py
    def sweep(self, now: Optional[datetime] = None) -> SweepResult:
        """Run a single forgetting sweep.

        Updates all decay scores and purges entries below the forget threshold.

        Parameters
        ----------
        now:
            Reference time (defaults to UTC now).

        Returns
        -------
        SweepResult
            Details of what was purged.
        """
        reference = now or _utcnow()
        entries_before = self._policy.tracked_count()
        self._policy.update_scores(now=reference)
        forgotten_ids = self._policy.purge_forgotten(now=reference)
        entries_after = self._policy.tracked_count()

        result = SweepResult(
            forgotten_ids=forgotten_ids,
            sweep_time=reference,
            entries_before=entries_before,
            entries_after=entries_after,
        )
        self._sweep_history.append(result)
        return result
FreshnessDecay.__init__ method · python · L30-L41 (12 LOC)
src/agent_memory/freshness/decay.py
    def __init__(
        self,
        mode: str = "exponential",
        half_life_hours: float = 48.0,
    ) -> None:
        if mode not in self.MODES:
            raise ValueError(f"mode must be one of {sorted(self.MODES)}, got {mode!r}")
        if half_life_hours <= 0:
            raise ValueError("half_life_hours must be positive")
        self._mode = mode
        self._half_life = half_life_hours
        self._lambda = math.log(2.0) / half_life_hours
FreshnessDecay.compute method · python · L47-L61 (15 LOC)
src/agent_memory/freshness/decay.py
    def compute(self, age_hours: float) -> float:
        """Return freshness retention in [0, 1] for the given age."""
        age = max(0.0, age_hours)
        if self._mode == "linear":
            return max(0.0, 1.0 - age / (self._half_life * 2.0))
        if self._mode == "exponential":
            return math.exp(-self._lambda * age)
        # step
        result = 1.0
        for threshold, retention in self._STEP_THRESHOLDS:
            if age >= threshold:
                result = retention
            else:
                break
        return max(0.0, result)
Open data scored by Repobility · https://repobility.com
RefreshTrigger.mark_for_refresh method · python · L23-L29 (7 LOC)
src/agent_memory/freshness/refresh.py
    def mark_for_refresh(self, entry: MemoryEntry) -> MemoryEntry:
        """Return a new entry with the ``needs_refresh`` metadata flag set."""
        now_iso = datetime.now(timezone.utc).isoformat()
        updated_meta = dict(entry.metadata)
        updated_meta[_NEEDS_REFRESH_KEY] = "true"
        updated_meta[_REFRESH_REQUESTED_AT_KEY] = now_iso
        return entry.model_copy(update={"metadata": updated_meta})
RefreshTrigger.clear_refresh_flag method · python · L31-L44 (14 LOC)
src/agent_memory/freshness/refresh.py
    def clear_refresh_flag(self, entry: MemoryEntry) -> MemoryEntry:
        """Return a new entry with the ``needs_refresh`` flag cleared and
        ``last_verified`` set to now."""
        now_iso = datetime.now(timezone.utc).isoformat()
        updated_meta = dict(entry.metadata)
        updated_meta.pop(_NEEDS_REFRESH_KEY, None)
        updated_meta.pop(_REFRESH_REQUESTED_AT_KEY, None)
        updated_meta["last_verified"] = now_iso
        return entry.model_copy(
            update={
                "metadata": updated_meta,
                "freshness_score": 1.0,
            }
        )
FreshnessScorer.score_and_update method · python · L41-L46 (6 LOC)
src/agent_memory/freshness/scorer.py
    def score_and_update(
        self, entry: MemoryEntry, now: datetime | None = None
    ) -> MemoryEntry:
        """Return a new MemoryEntry with the freshness_score field updated."""
        new_score = self.score(entry, now)
        return entry.model_copy(update={"freshness_score": new_score})
FreshnessScorer._reference_time method · python · L52-L59 (8 LOC)
src/agent_memory/freshness/scorer.py
    def _reference_time(self, entry: MemoryEntry) -> datetime:
        raw = entry.metadata.get("last_verified")
        if raw:
            try:
                return datetime.fromisoformat(raw)
            except ValueError:
                pass
        return entry.last_accessed
StaleMemoryDetector.__init__ method · python · L24-L30 (7 LOC)
src/agent_memory/freshness/validator.py
    def __init__(
        self,
        scorer: FreshnessScorer | None = None,
        threshold: float = 0.3,
    ) -> None:
        self._scorer = scorer or FreshnessScorer()
        self._threshold = threshold
StaleMemoryDetector.find_stale method · python · L40-L46 (7 LOC)
src/agent_memory/freshness/validator.py
    def find_stale(
        self,
        entries: Sequence[MemoryEntry],
        now: Optional[datetime] = None,
    ) -> list[MemoryEntry]:
        """Return all entries in the sequence that are stale."""
        return [e for e in entries if self.is_stale(e, now)]
StaleMemoryDetector.freshness_report method · python · L48-L62 (15 LOC)
src/agent_memory/freshness/validator.py
    def freshness_report(
        self,
        entries: Sequence[MemoryEntry],
        now: Optional[datetime] = None,
    ) -> list[dict[str, object]]:
        """Return a summary dict for each entry with ID, score, and stale flag."""
        return [
            {
                "memory_id": e.memory_id,
                "freshness_score": self._scorer.score(e, now),
                "is_stale": self.is_stale(e, now),
                "content_preview": e.content[:80],
            }
            for e in entries
        ]
DecayCurve.decay method · python · L18-L30 (13 LOC)
src/agent_memory/importance/decay.py
    def decay(self, age_hours: float) -> float:
        """Return retention factor for a memory of the given age.

        Parameters
        ----------
        age_hours:
            How many hours have elapsed since the memory was created/verified.

        Returns
        -------
        float
            Value in [0.0, 1.0]; 1.0 means fully fresh, 0.0 means fully decayed.
        """
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
StepDecay.__init__ method · python · L91-L97 (7 LOC)
src/agent_memory/importance/decay.py
    def __init__(
        self,
        thresholds: list[tuple[float, float]] | None = None,
    ) -> None:
        raw = thresholds if thresholds is not None else self._DEFAULT_THRESHOLDS
        # Sort ascending by age
        self._thresholds = sorted(raw, key=lambda t: t[0])
StepDecay.decay method · python · L99-L107 (9 LOC)
src/agent_memory/importance/decay.py
    def decay(self, age_hours: float) -> float:
        """Step function: use the retention of the last applicable threshold."""
        result = 1.0
        for age_limit, retention in self._thresholds:
            if age_hours >= age_limit:
                result = retention
            else:
                break
        return max(0.0, result)
_age_hours function · python · L14-L21 (8 LOC)
src/agent_memory/importance/gc.py
def _age_hours(entry: MemoryEntry) -> float:
    """Return the age of an entry in hours from now."""
    now = datetime.now(timezone.utc)
    created = entry.created_at
    if created.tzinfo is None:
        created = created.replace(tzinfo=timezone.utc)
    delta = now - created
    return delta.total_seconds() / 3600.0
MemoryGarbageCollector.__init__ method · python · L42-L51 (10 LOC)
src/agent_memory/importance/gc.py
    def __init__(
        self,
        store: MemoryStore,
        decay_curve: DecayCurve | None = None,
        threshold: float = 0.05,
    ) -> None:
        self._store = store
        self._decay = decay_curve or ExponentialDecay(half_life_hours=48.0)
        self._threshold = threshold
        self._protector = SafetyMemoryProtector()
MemoryGarbageCollector.candidates method · python · L62-L73 (12 LOC)
src/agent_memory/importance/gc.py
    def candidates(self, entries: Sequence[MemoryEntry] | None = None) -> list[MemoryEntry]:
        """Return entries eligible for garbage collection.

        Parameters
        ----------
        entries:
            Explicit list of entries to evaluate; if None, all entries in
            the store are evaluated.
        """
        pool = list(entries) if entries is not None else list(self._store.all())
        unprotected = self._protector.filter_unprotected(pool)
        return [e for e in unprotected if self.effective_importance(e) < self._threshold]
MemoryGarbageCollector.collect method · python · L75-L92 (18 LOC)
src/agent_memory/importance/gc.py
    def collect(self, dry_run: bool = False) -> list[str]:
        """Remove eligible entries from the store.

        Parameters
        ----------
        dry_run:
            If True, identify candidates but do not actually delete them.

        Returns
        -------
        list[str]
            Memory IDs that were (or would be) removed.
        """
        evicted_ids = [e.memory_id for e in self.candidates()]
        if not dry_run:
            for mid in evicted_ids:
                self._store.delete(mid)
        return evicted_ids
MemoryGarbageCollector.stats method · python · L94-L105 (12 LOC)
src/agent_memory/importance/gc.py
    def stats(self) -> dict[str, int]:
        """Return a summary of current store state relative to GC threshold."""
        all_entries = list(self._store.all())
        unprotected = self._protector.filter_unprotected(all_entries)
        protected = self._protector.filter_protected(all_entries)
        eligible = [e for e in unprotected if self.effective_importance(e) < self._threshold]
        return {
            "total": len(all_entries),
            "protected": len(protected),
            "unprotected": len(unprotected),
            "eligible_for_gc": len(eligible),
        }
ImportanceScorer.score method · python · L55-L66 (12 LOC)
src/agent_memory/importance/scorer.py
    def score(self, entry: MemoryEntry) -> float:
        """Return an importance score in [0.0, 1.0] for the given entry."""
        if entry.safety_critical:
            return 1.0

        keyword_score = self._keyword_score(entry.content)
        source_weight = _SOURCE_WEIGHTS.get(entry.source, 0.5)
        layer_base = _LAYER_BASE.get(entry.layer, 0.5)

        # Weighted blend: keywords carry most weight, source and layer add context
        raw = keyword_score * 0.5 + source_weight * 0.3 + layer_base * 0.2
        return min(1.0, max(0.0, raw))
If a scraper extracted this row, it came from Repobility (https://repobility.com)
ImportanceScorer._keyword_score method · python · L77-L86 (10 LOC)
src/agent_memory/importance/scorer.py
    def _keyword_score(self, content: str) -> float:
        """Scan content for importance-indicating keywords."""
        words = set(re.findall(r"[a-z0-9]+", content.lower()))
        if words & _CRITICAL_KEYWORDS:
            return 1.0
        if words & _HIGH_KEYWORDS:
            return 0.75
        if words & _MEDIUM_KEYWORDS:
            return 0.55
        return 0.4  # neutral / no signal
Mem0EnhancedMemory.__init__ method · python · L169-L196 (28 LOC)
src/agent_memory/integrations/mem0_adapter.py
    def __init__(
        self,
        user_id: str = "default",
        mem0_client: Optional[Any] = None,
        mem0_config: Optional[dict[str, Any]] = None,
        contradiction_threshold: float = 0.25,
        block_on_contradiction: bool = False,
        freshness_weight: float = 0.3,
    ) -> None:
        if not 0.0 <= contradiction_threshold <= 1.0:
            raise ValueError(
                f"contradiction_threshold must be in [0.0, 1.0], got {contradiction_threshold}"
            )
        if not 0.0 <= freshness_weight <= 1.0:
            raise ValueError(
                f"freshness_weight must be in [0.0, 1.0], got {freshness_weight}"
            )

        self._user_id = user_id
        self._contradiction_threshold = contradiction_threshold
        self._block_on_contradiction = block_on_contradiction
        self._freshness_weight = freshness_weight
        if mem0_client is not None:
            self._client = mem0_client
        elif mem0_config is not None:
 
Mem0EnhancedMemory.add_with_contradiction_check method · python · L202-L279 (78 LOC)
src/agent_memory/integrations/mem0_adapter.py
    def add_with_contradiction_check(
        self,
        text: str,
        metadata: Optional[dict[str, Any]] = None,
        source: MemorySource = MemorySource.AGENT_INFERENCE,
    ) -> AddResult:
        """Add a memory after checking for contradictions with existing entries.

        Fetches semantically similar existing memories from Mem0 and checks
        the candidate text against them using AumOS ContradictionDetector.
        If contradictions are found and ``block_on_contradiction`` is True,
        the add is aborted.

        Parameters
        ----------
        text:
            The memory text to store.
        metadata:
            Optional key-value metadata to attach to the memory.
        source:
            AumOS memory source classification (informational only).

        Returns
        -------
        AddResult
            Contains the Mem0 response, detected contradictions, and whether
            the add was blocked.
        """
        if not text.strip():
‹ prevpage 3 / 6next ›