← back to invincible-jha__agent-memory

Function bodies 288 total

All specs Real LLM only Function bodies
InMemoryStorage.delete method · python · L37-L42 (6 LOC)
src/agent_memory/storage/memory_store.py
    def delete(self, key: str) -> bool:
        """Drop the entry stored under ``key`` (a memory_id).

        Returns
        -------
        bool
            ``True`` when an entry was present and has been removed,
            ``False`` when nothing was stored under ``key``.
        """
        try:
            del self._store[key]
        except KeyError:
            # Nothing stored under this id; report that to the caller.
            return False
        return True
InMemoryStorage.search method · python · L44-L65 (22 LOC)
src/agent_memory/storage/memory_store.py
    def search(
        self,
        query: str,
        layer: Optional[MemoryLayer] = None,
        limit: int = 20,
    ) -> Sequence[MemoryEntry]:
        """Case-insensitive substring search across all stored content.

        The query is lower-cased and split on whitespace; an entry matches
        only when every resulting token occurs somewhere in its lower-cased
        content.  A query with no tokens (empty or whitespace only) therefore
        matches every entry.

        Parameters
        ----------
        query:
            Free-text query string.
        layer:
            When given, only entries whose ``layer`` equals it are considered.
        limit:
            Maximum number of entries to return.  A value of zero or less
            yields an empty result.

        Returns
        -------
        Sequence[MemoryEntry]
            Matching entries in store iteration order, at most ``limit``.
        """
        # Guard first: the in-loop check only fires after an append, so without
        # this a non-positive limit could still let one entry through.
        if limit <= 0:
            return []

        tokens = query.lower().split()
        results: list[MemoryEntry] = []
        for entry in self._store.values():
            if layer is not None and entry.layer != layer:
                continue
            content_lower = entry.content.lower()
            if all(token in content_lower for token in tokens):
                results.append(entry)
                # The result count only changes on a match, so check here.
                if len(results) >= limit:
                    break
        return results
InMemoryStorage.list_keys method · python · L67-L77 (11 LOC)
src/agent_memory/storage/memory_store.py
    def list_keys(
        self,
        layer: Optional[MemoryLayer] = None,
        limit: int = 1000,
    ) -> list[str]:
        """List the memory_id keys held in the store.

        Parameters
        ----------
        layer:
            When given, only keys whose entry has this ``layer`` are listed.
        limit:
            Applied as a slice bound (``[:limit]``) to the collected keys.

        Returns
        -------
        list[str]
            Keys in store iteration order.
        """
        matching = [
            memory_id
            for memory_id, stored in self._store.items()
            if layer is None or stored.layer == layer
        ]
        return matching[:limit]
InMemoryStorage.clear method · python · L79-L88 (10 LOC)
src/agent_memory/storage/memory_store.py
    def clear(self, layer: Optional[MemoryLayer] = None) -> int:
        """Delete stored entries and report how many were removed.

        Parameters
        ----------
        layer:
            When given, only entries whose ``layer`` equals it are removed;
            otherwise the whole store is emptied.

        Returns
        -------
        int
            Number of entries removed.
        """
        if layer is not None:
            # Collect the ids first so the dict is not mutated mid-iteration.
            doomed = [
                memory_id
                for memory_id, stored in self._store.items()
                if stored.layer == layer
            ]
            for memory_id in doomed:
                del self._store[memory_id]
            return len(doomed)

        removed = len(self._store)
        self._store.clear()
        return removed
InMemoryStorage.load_all method · python · L96-L106 (11 LOC)
src/agent_memory/storage/memory_store.py
    def load_all(
        self,
        layer: Optional[MemoryLayer] = None,
        limit: int = 1000,
    ) -> list[MemoryEntry]:
        """Return the stored entries, optionally restricted to one layer.

        Parameters
        ----------
        layer:
            When given, only entries whose ``layer`` equals it are returned.
        limit:
            Applied as a slice bound (``[:limit]``) to the collected entries.

        Returns
        -------
        list[MemoryEntry]
            Entries in store iteration order.
        """
        selected = [
            stored
            for stored in self._store.values()
            if layer is None or stored.layer == layer
        ]
        return selected[:limit]
RedisStorage.__init__ method · python · L59-L78 (20 LOC)
src/agent_memory/storage/redis_store.py
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        ttl_seconds: int = 0,
        client: Optional[redis_lib.Redis] = None,  # type: ignore[type-arg]
    ) -> None:
        if client is not None:
            self._client: redis_lib.Redis = client  # type: ignore[type-arg]
        else:
            self._client = redis_lib.Redis(
                host=host,
                port=port,
                db=db,
                password=password,
                decode_responses=True,
            )
        self._ttl = ttl_seconds
RedisStorage.save method · python · L84-L95 (12 LOC)
src/agent_memory/storage/redis_store.py
    def save(self, entry: MemoryEntry) -> None:
        """Write ``entry`` to Redis as a hash and register it in the index sets.

        All commands are queued on one pipeline and sent with a single
        ``execute()`` call:

        * the serialised entry is written with ``hset``,
        * when ``self._ttl`` is positive, an expiry is set on that hash,
        * the memory_id is added to the global id set and to the set of the
          entry's layer.
        """
        redis_key = _entry_key(entry.memory_id)
        fields = _serialise(entry)

        pipe = self._client.pipeline()
        pipe.hset(redis_key, mapping=fields)  # type: ignore[arg-type]
        if self._ttl > 0:
            # The expiry is applied to the entry hash only; the index sets
            # below are written without one.
            pipe.expire(redis_key, self._ttl)
        for index_key in (_ALL_KEYS_SET, _layer_set_key(entry.layer)):
            pipe.sadd(index_key, entry.memory_id)
        pipe.execute()
Repobility analyzer · published findings · https://repobility.com
RedisStorage.load method · python · L97-L102 (6 LOC)
src/agent_memory/storage/redis_store.py
    def load(self, key: str) -> Optional[MemoryEntry]:
        """Fetch the entry stored for memory_id ``key``.

        Returns
        -------
        MemoryEntry | None
            The deserialised entry, or ``None`` when ``hgetall`` returns
            nothing for the entry's key.
        """
        raw = self._client.hgetall(_entry_key(key))
        return _deserialise(raw) if raw else None  # type: ignore[arg-type]
RedisStorage.delete method · python · L104-L123 (20 LOC)
src/agent_memory/storage/redis_store.py
    def delete(self, key: str) -> bool:
        """Remove the entry for memory_id ``key`` and its index memberships.

        Returns
        -------
        bool
            ``False`` when no hash exists for the entry, ``True`` once the
            removal commands have been executed.
        """
        redis_key = _entry_key(key)
        # The stored hash is read first because it records which layer set
        # the id has to be removed from.
        stored = self._client.hgetall(redis_key)
        if not stored:
            return False

        pipe = self._client.pipeline()
        pipe.delete(redis_key)
        pipe.srem(_ALL_KEYS_SET, key)

        layer_value = stored.get("layer", "")
        if layer_value:
            try:
                pipe.srem(_layer_set_key(MemoryLayer(layer_value)), key)
            except ValueError:
                # Stored layer value could not be turned into a layer key;
                # skip the layer-set cleanup and still delete the entry.
                pass

        pipe.execute()
        return True
RedisStorage.search method · python · L125-L142 (18 LOC)
src/agent_memory/storage/redis_store.py
    def search(
        self,
        query: str,
        layer: Optional[MemoryLayer] = None,
        limit: int = 20,
    ) -> Sequence[MemoryEntry]:
        """Substring search across loaded entries (scans the content field).

        The query is lower-cased and split on whitespace; an entry matches
        only when every token occurs in its lower-cased content.

        Parameters
        ----------
        query:
            Free-text query string.
        layer:
            Optional layer filter, forwarded to ``load_all``.
        limit:
            Maximum number of entries to return.  A value of zero or less
            yields an empty result without loading anything.

        Returns
        -------
        Sequence[MemoryEntry]
            Matching entries in the order ``load_all`` produced them.

        Notes
        -----
        Only the ``limit * 10`` entries requested from ``load_all`` are
        scanned, so matches outside that window are not returned.
        """
        # Guard first: the in-loop check only fires after an entry has been
        # examined, so a non-positive limit could otherwise return one match
        # (and a negative limit would pass a negative window to load_all).
        if limit <= 0:
            return []

        entries = self.load_all(layer=layer, limit=limit * 10)
        tokens = query.lower().split()
        results: list[MemoryEntry] = []
        for entry in entries:
            content_lower = entry.content.lower()
            if all(token in content_lower for token in tokens):
                results.append(entry)
                if len(results) >= limit:
                    break
        return results
RedisStorage.list_keys method · python · L144-L155 (12 LOC)
src/agent_memory/storage/redis_store.py
    def list_keys(
        self,
        layer: Optional[MemoryLayer] = None,
        limit: int = 1000,
    ) -> list[str]:
        """List stored memory_id values from the Redis index sets.

        Parameters
        ----------
        layer:
            When given, the set for that layer is read; otherwise the global
            id set is read.
        limit:
            Applied as a slice bound (``[:limit]``) after sorting.

        Returns
        -------
        list[str]
            The set members in sorted order.
        """
        index_key = _ALL_KEYS_SET if layer is None else _layer_set_key(layer)
        members = self._client.smembers(index_key)
        # Set members come back unordered; sorting makes the output stable.
        ordered = sorted(members)
        return ordered[:limit]  # type: ignore[return-value]
RedisStorage.clear method · python · L157-L186 (30 LOC)
src/agent_memory/storage/redis_store.py
    def clear(self, layer: Optional[MemoryLayer] = None) -> int:
        """Remove entries. Returns count deleted."""
        if layer is None:
            all_ids: list[str] = list(self._client.smembers(_ALL_KEYS_SET))  # type: ignore[arg-type]
        else:
            all_ids = list(self._client.smembers(_layer_set_key(layer)))  # type: ignore[arg-type]

        if not all_ids:
            return 0

        pipeline = self._client.pipeline()
        for memory_id in all_ids:
            pipeline.delete(_entry_key(memory_id))
        pipeline.execute()

        # Rebuild index sets
        if layer is None:
            self._client.delete(_ALL_KEYS_SET)
            # Also clear all layer sets
            for lyr in MemoryLayer:
                self._client.delete(_layer_set_key(lyr))
        else:
            # Remove cleared IDs from global and layer sets
            pipeline = self._client.pipeline()
            for memory_id in all_ids:
                pipeline.srem(_ALL_KEYS_SET,
RedisStorage.ping method · python · L188-L193 (6 LOC)
src/agent_memory/storage/redis_store.py
    def ping(self) -> bool:
        """Check connectivity by issuing ``ping`` on the client.

        Returns
        -------
        bool
            The truthiness of the client's reply, or ``False`` when the call
            raises ``redis_lib.ConnectionError``.
        """
        try:
            reply = self._client.ping()
        except redis_lib.ConnectionError:
            return False
        return bool(reply)
_serialise function · python · L201-L215 (15 LOC)
src/agent_memory/storage/redis_store.py
def _serialise(entry: MemoryEntry) -> dict[str, str]:
    """Convert a MemoryEntry to a flat dict of strings for Redis hset."""
    return {
        "memory_id": entry.memory_id,
        "content": entry.content,
        "layer": entry.layer.value,
        "importance_score": str(entry.importance_score),
        "freshness_score": str(entry.freshness_score),
        "source": entry.source.value,
        "created_at": entry.created_at.isoformat(),
        "last_accessed": entry.last_accessed.isoformat(),
        "access_count": str(entry.access_count),
        "safety_critical": "1" if entry.safety_critical else "0",
        "metadata": json.dumps(entry.metadata),
    }
_parse_dt function · python · L218-L223 (6 LOC)
src/agent_memory/storage/redis_store.py
def _parse_dt(value: str) -> datetime:
    """Parse an ISO 8601 datetime string, attaching UTC if naive."""
    dt = datetime.fromisoformat(value)
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
_deserialise function · python · L226-L241 (16 LOC)
src/agent_memory/storage/redis_store.py
def _deserialise(data: dict[str, str]) -> MemoryEntry:
    """Build a MemoryEntry from the string mapping read out of a Redis hash.

    ``memory_id``, ``content``, ``layer``, ``created_at`` and
    ``last_accessed`` are required keys; the remaining fields fall back to
    the defaults written below when absent.
    """
    field = data.get
    metadata: dict[str, str] = json.loads(field("metadata", "{}"))
    return MemoryEntry(
        memory_id=data["memory_id"],
        content=data["content"],
        layer=MemoryLayer(data["layer"]),
        importance_score=float(field("importance_score", "0.5")),
        freshness_score=float(field("freshness_score", "1.0")),
        source=MemorySource(field("source", "agent_inference")),
        created_at=_parse_dt(data["created_at"]),
        last_accessed=_parse_dt(data["last_accessed"]),
        access_count=int(field("access_count", "0")),
        # The flag is stored as "1"/"0"; anything other than "1" reads as False.
        safety_critical=field("safety_critical", "0") == "1",
        metadata=metadata,
    )
SQLiteStorage.__init__ method · python · L131-L139 (9 LOC)
src/agent_memory/storage/sqlite_store.py
    def __init__(
        self,
        db_path: str | Path = ":memory:",
        timeout: float = 5.0,
    ) -> None:
        self._db_path = str(db_path)
        self._timeout = timeout
        self._conn = self._connect()
        self._initialise_schema()
SQLiteStorage.load method · python · L151-L157 (7 LOC)
src/agent_memory/storage/sqlite_store.py
    def load(self, key: str) -> Optional[MemoryEntry]:
        """Fetch the entry stored for memory_id ``key``.

        Returns
        -------
        MemoryEntry | None
            The converted row, or ``None`` when the query yields no row.
        """
        row = self._conn.execute(_SELECT_BY_ID, (key,)).fetchone()
        return None if row is None else _row_to_entry(row)
SQLiteStorage.search method · python · L165-L175 (11 LOC)
src/agent_memory/storage/sqlite_store.py
    def search(
        self,
        query: str,
        layer: Optional[MemoryLayer] = None,
        limit: int = 20,
    ) -> Sequence[MemoryEntry]:
        """Search entries, preferring the FTS path over the LIKE fallback.

        ``_fts_search`` is tried first; if it raises
        ``sqlite3.OperationalError`` the same arguments are handed to
        ``_fallback_search`` instead.
        """
        try:
            matches = self._fts_search(query, layer=layer, limit=limit)
        except sqlite3.OperationalError:
            matches = self._fallback_search(query, layer=layer, limit=limit)
        return matches
SQLiteStorage.list_keys method · python · L177-L187 (11 LOC)
src/agent_memory/storage/sqlite_store.py
    def list_keys(
        self,
        layer: Optional[MemoryLayer] = None,
        limit: int = 1000,
    ) -> list[str]:
        """Return up to ``limit`` stored memory_id values.

        Ordering comes from the SQL statements themselves (documented as
        newest first).  With ``layer`` given, the layer-filtered statement is
        used with ``layer.value`` as its first parameter.
        """
        if layer is None:
            sql, params = _LIST_KEYS_ALL, (limit,)
        else:
            sql, params = _LIST_KEYS_LAYER, (layer.value, limit)
        rows = self._conn.execute(sql, params).fetchall()
        # The first column of each row is the key.
        return [row[0] for row in rows]
SQLiteStorage.clear method · python · L189-L196 (8 LOC)
src/agent_memory/storage/sqlite_store.py
    def clear(self, layer: Optional[MemoryLayer] = None) -> int:
        """Delete entries, optionally only those of one layer.

        The statement runs inside ``with self._conn``, i.e. under the
        connection's context manager.

        Returns
        -------
        int
            The cursor's ``rowcount`` for the delete statement.
        """
        with self._conn:
            cursor = (
                self._conn.execute(_CLEAR_ALL)
                if layer is None
                else self._conn.execute(_CLEAR_LAYER, (layer.value,))
            )
        return cursor.rowcount
SQLiteStorage.count method · python · L198-L205 (8 LOC)
src/agent_memory/storage/sqlite_store.py
    def count(self, layer: Optional[MemoryLayer] = None) -> int:
        """Count stored entries, optionally only those of one layer.

        Returns
        -------
        int
            First column of the count query's row, or ``0`` when the query
            yields no row (or a falsy one).
        """
        cursor = (
            self._conn.execute(_COUNT_ALL)
            if layer is None
            else self._conn.execute(_COUNT_LAYER, (layer.value,))
        )
        row = cursor.fetchone()
        if not row:
            return 0
        return row[0]
SQLiteStorage._connect method · python · L225-L231 (7 LOC)
src/agent_memory/storage/sqlite_store.py
    def _connect(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self._db_path, timeout=self._timeout)
        conn.row_factory = sqlite3.Row
        # Enable WAL mode for better concurrent reads (no-op for :memory:)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        return conn
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
SQLiteStorage._fts_search method · python · L238-L250 (13 LOC)
src/agent_memory/storage/sqlite_store.py
    def _fts_search(
        self,
        query: str,
        layer: Optional[MemoryLayer],
        limit: int,
    ) -> list[MemoryEntry]:
        """Run the FTS query and convert the resulting rows to entries.

        The query text is passed through ``_escape_fts_query`` before being
        bound, to avoid FTS5 syntax errors from special characters.
        """
        fts_query = _escape_fts_query(query)
        if layer is None:
            sql, params = _FTS_SEARCH_ALL, (fts_query, limit)
        else:
            sql, params = _FTS_SEARCH_LAYER, (fts_query, layer.value, limit)
        rows = self._conn.execute(sql, params).fetchall()
        return [_row_to_entry(row) for row in rows]
SQLiteStorage._fallback_search method · python · L252-L265 (14 LOC)
src/agent_memory/storage/sqlite_store.py
    def _fallback_search(
        self,
        query: str,
        layer: Optional[MemoryLayer],
        limit: int,
    ) -> list[MemoryEntry]:
        """Run the LIKE-based query and convert the resulting rows to entries.

        The whole lower-cased query is wrapped in ``%`` wildcards and bound
        as a single pattern.
        """
        # NOTE(review): any "%" or "_" inside the query reaches LIKE
        # unescaped and will act as a wildcard.
        pattern = f"%{query.lower()}%"
        if layer is None:
            sql, params = _FALLBACK_SEARCH_ALL, (pattern, limit)
        else:
            sql, params = _FALLBACK_SEARCH_LAYER, (pattern, layer.value, limit)
        rows = self._conn.execute(sql, params).fetchall()
        return [_row_to_entry(row) for row in rows]
_entry_to_row function · python · L273-L284 (12 LOC)
src/agent_memory/storage/sqlite_store.py
def _entry_to_row(entry: MemoryEntry) -> tuple[str, str, str, str, str, str, str, float]:
    """Convert a MemoryEntry to a database row tuple."""
    return (
        entry.memory_id,                         # id
        entry.memory_id,                         # key (same as id for now)
        entry.layer.value,                       # type
        entry.content,                           # content
        json.dumps(entry.metadata),              # metadata
        entry.created_at.isoformat(),            # created_at
        entry.last_accessed.isoformat(),         # updated_at
        entry.importance_score,                  # importance
    )
_row_to_entry function · python · L287-L299 (13 LOC)
src/agent_memory/storage/sqlite_store.py
def _row_to_entry(row: sqlite3.Row) -> MemoryEntry:
    """Build a MemoryEntry from a database row.

    Only the columns read below are used; every other MemoryEntry field is
    left to its constructor default.  A NULL or empty ``metadata`` column is
    treated as an empty JSON object.
    """
    metadata: dict[str, str] = json.loads(row["metadata"] or "{}")
    entry_fields = {
        "memory_id": row["id"],
        "content": row["content"],
        "layer": MemoryLayer(row["type"]),
        "importance_score": row["importance"],
        "metadata": metadata,
        "created_at": _parse_dt(row["created_at"]),
        # The updated_at column holds the entry's last-access timestamp.
        "last_accessed": _parse_dt(row["updated_at"]),
    }
    return MemoryEntry(**entry_fields)
MemoryConfig.__post_init__ method · python · L70-L96 (27 LOC)
src/agent_memory/unified/config.py
    def __post_init__(self) -> None:
        if self.working_capacity < 1:
            raise ValueError(f"working_capacity must be >= 1, got {self.working_capacity}")
        if self.episodic_max_items < 0:
            raise ValueError(
                f"episodic_max_items must be >= 0, got {self.episodic_max_items}"
            )
        if self.consolidation_interval < 0:
            raise ValueError(
                f"consolidation_interval must be >= 0, got {self.consolidation_interval}"
            )
        if not 0.0 <= self.importance_threshold <= 1.0:
            raise ValueError(
                f"importance_threshold must be in [0, 1], got {self.importance_threshold}"
            )
        valid_decay = {"linear", "exponential", "step"}
        if self.decay_function not in valid_decay:
            raise ValueError(
                f"decay_function must be one of {sorted(valid_decay)!r}, "
                f"got {self.decay_function!r}"
            )
        valid_backends = 
_build_storage_backend function · python · L21-L41 (21 LOC)
src/agent_memory/unified/memory.py
def _build_storage_backend(config: MemoryConfig) -> StorageBackend:
    """Instantiate the storage backend named by ``config.storage_backend``.

    ``"sqlite"`` yields a SQLiteStorage at ``config.sqlite_path``,
    ``"redis"`` yields a RedisStorage built from the ``redis_*`` settings,
    and any other value yields an InMemoryStorage.  Each backend module is
    imported only inside the branch that needs it.
    """
    backend_name = config.storage_backend
    backend: StorageBackend

    if backend_name == "sqlite":
        from agent_memory.storage.sqlite_store import SQLiteStorage

        backend = SQLiteStorage(db_path=config.sqlite_path)
    elif backend_name == "redis":
        from agent_memory.storage.redis_store import RedisStorage

        # NOTE(review): no password is forwarded to RedisStorage here.
        backend = RedisStorage(
            host=config.redis_host,
            port=config.redis_port,
            db=config.redis_db,
            ttl_seconds=config.redis_ttl_seconds,
        )
    else:
        # Default: in-memory
        from agent_memory.storage.memory_store import InMemoryStorage

        backend = InMemoryStorage()

    return backend
UnifiedMemory.__init__ method · python · L62-L81 (20 LOC)
src/agent_memory/unified/memory.py
    def __init__(
        self,
        config: Optional[MemoryConfig] = None,
        storage: Optional[StorageBackend] = None,
    ) -> None:
        """Wire up the configuration, storage backend, layers and subsystems.

        Parameters
        ----------
        config:
            Memory configuration.  A default ``MemoryConfig()`` is created
            when this is ``None``.
        storage:
            Durable storage backend.  When ``None``, one is built from the
            configuration via ``_build_storage_backend``.
        """
        # Compare against None explicitly: ``x or default`` would silently
        # replace a caller-supplied object that evaluates as falsy (for
        # example a backend defining ``__len__`` while it is still empty).
        self._config = config if config is not None else MemoryConfig()
        self._storage = (
            storage if storage is not None else _build_storage_backend(self._config)
        )

        # Cognitive layers
        self._working = WorkingMemory(capacity=self._config.working_capacity)
        self._episodic = EpisodicMemory()
        self._semantic = SemanticMemory()
        self._procedural = ProceduralMemory()

        # Subsystems
        self._importance_scorer = ImportanceScorer()
        self._freshness_scorer = FreshnessScorer()
        self._provenance_tracker = ProvenanceTracker()

        # Number of store() calls made so far.
        self._store_count = 0
UnifiedMemory.store method · python · L95-L128 (34 LOC)
src/agent_memory/unified/memory.py
    def store(self, entry: MemoryEntry) -> MemoryEntry:
        """Store a memory entry across the appropriate cognitive layer and backend.

        Scoring and provenance tagging are applied according to config before
        the entry is written. Automatic consolidation is triggered when the
        ``consolidation_interval`` threshold is met.

        Parameters
        ----------
        entry:
            The memory entry to store.

        Returns
        -------
        MemoryEntry
            The final (possibly score-updated) entry that was persisted.
        """
        processed = self._apply_processing(entry)

        # Route to the correct cognitive layer
        layer_store = self._layer_store(processed.layer)
        layer_store.store(processed)

        # Persist to durable backend
        self._storage.save(processed)

        self._store_count += 1
        if (
            self._config.consolidation_interval > 0
            and self._store_count % self._config.consoli
Repobility · code-quality intelligence platform · https://repobility.com
UnifiedMemory.recall method · python · L130-L162 (33 LOC)
src/agent_memory/unified/memory.py
    def recall(self, memory_id: str) -> Optional[MemoryEntry]:
        """Retrieve a single entry by memory_id.

        Checks in-process layers first (fast path), then falls back to the
        durable storage backend.  The entry's access metadata is updated.

        Parameters
        ----------
        memory_id:
            The ID of the entry to retrieve.

        Returns
        -------
        MemoryEntry | None
            The entry, or None if not found anywhere.
        """
        # Fast path: in-process layers
        for layer_store in self._all_layer_stores():
            entry = layer_store.retrieve(memory_id)
            if entry is not None:
                touched = entry.touch()
                layer_store.store(touched)
                self._storage.save(touched)
                return touched

        # Slow path: durable storage
        entry = self._storage.load(memory_id)
        if entry is not None:
            touched = entry.touch()
            self._stora
UnifiedMemory.forget method · python · L164-L183 (20 LOC)
src/agent_memory/unified/memory.py
    def forget(self, memory_id: str) -> bool:
        """Delete an entry from every in-process layer and the storage backend.

        Every layer store is asked to delete the id, whether or not an
        earlier one already reported success, and the durable backend is
        asked last.

        Parameters
        ----------
        memory_id:
            The ID of the entry to remove.

        Returns
        -------
        bool
            True if at least one layer store or the backend reported a
            removal.
        """
        layer_hits = [
            layer_store.delete(memory_id) for layer_store in self._all_layer_stores()
        ]
        storage_hit = self._storage.delete(memory_id)
        return any(layer_hits) or bool(storage_hit)
UnifiedMemory.search method · python · L185-L226 (42 LOC)
src/agent_memory/unified/memory.py
    def search(
        self,
        query: str,
        layer: Optional[MemoryLayer] = None,
        limit: int = 20,
    ) -> Sequence[MemoryEntry]:
        """Search for entries matching a query string.

        Searches the durable storage backend (which may support FTS), then
        de-duplicates against entries returned from in-process layers for
        completeness.

        Parameters
        ----------
        query:
            Free-text query string.
        layer:
            Optional layer filter.
        limit:
            Maximum number of results.

        Returns
        -------
        Sequence[MemoryEntry]
            Matching entries, ordered by backend relevance.
        """
        backend_results = list(self._storage.search(query, layer=layer, limit=limit))
        seen_ids = {e.memory_id for e in backend_results}

        # Include any in-process matches not yet in storage
        tokens = query.lower().split()
        for layer_store in self._all_layer_store
UnifiedMemory.consolidate method · python · L228-L253 (26 LOC)
src/agent_memory/unified/memory.py
    def consolidate(self) -> dict[str, int]:
        """Run a garbage-collection pass across all in-process layers.

        Removes entries whose effective importance (importance * decay) falls
        below the configured threshold.  Safety-critical entries are never
        removed.

        Returns
        -------
        dict[str, int]
            Summary with keys ``"collected"`` and ``"retained"``.
        """
        collected = 0
        for layer_store in self._all_layer_stores():
            gc = MemoryGarbageCollector(
                store=layer_store,
                threshold=self._config.importance_threshold,
            )
            evicted_ids = gc.collect(dry_run=False)
            collected += len(evicted_ids)
            # Sync deletions to durable storage
            for memory_id in evicted_ids:
                self._storage.delete(memory_id)

        total = sum(s.count() for s in self._all_layer_stores())
        return {"collected": collected, "retained": tot
UnifiedMemory.stats method · python · L255-L283 (29 LOC)
src/agent_memory/unified/memory.py
    def stats(self) -> dict[str, object]:
        """Return aggregate statistics about the current memory state.

        Returns
        -------
        dict[str, object]
            Contains total counts per layer, overall totals, and backend type.
        """
        layer_counts: dict[str, int] = {}
        total_in_process = 0
        for layer in MemoryLayer:
            layer_store = self._layer_store(layer)
            layer_counts[layer.value] = layer_store.count()
            total_in_process += layer_counts[layer.value]

        storage_total = self._storage.count()

        return {
            "layer_counts": layer_counts,
            "total_in_process": total_in_process,
            "total_in_storage": storage_total,
            "store_calls": self._store_count,
            "working_capacity": self._config.working_capacity,
            "working_utilisation": round(
                layer_counts.get("working", 0) / max(1, self._config.working_capacity), 3
            ),
   
UnifiedMemory._layer_store method · python · L289-L297 (9 LOC)
src/agent_memory/unified/memory.py
    def _layer_store(self, layer: MemoryLayer) -> MemoryStore:
        """Look up the in-process store that backs ``layer``.

        Raises
        ------
        KeyError
            If ``layer`` is not one of the four layers mapped below.
        """
        return {
            MemoryLayer.WORKING: self._working,
            MemoryLayer.EPISODIC: self._episodic,
            MemoryLayer.SEMANTIC: self._semantic,
            MemoryLayer.PROCEDURAL: self._procedural,
        }[layer]
UnifiedMemory._apply_processing method · python · L306-L315 (10 LOC)
src/agent_memory/unified/memory.py
    def _apply_processing(self, entry: MemoryEntry) -> MemoryEntry:
        """Apply scoring and provenance tagging in the configured order."""
        result = entry
        if self._config.auto_score_importance:
            result = self._importance_scorer.score_and_update(result)
        if self._config.auto_score_freshness:
            result = self._freshness_scorer.score_and_update(result)
        if self._config.auto_tag_provenance:
            result = self._provenance_tracker.tag(result)
        return result
‹ prevpage 6 / 6