Function bodies 288 total
InMemoryStorage.delete method · python · L37-L42 (6 LOC)src/agent_memory/storage/memory_store.py
def delete(self, key: str) -> bool:
    """Drop the entry stored under ``key`` (a memory_id).

    Returns ``True`` when an entry was removed and ``False`` when nothing
    was stored under that key.
    """
    try:
        del self._store[key]
    except KeyError:
        # Nothing stored under this memory_id.
        return False
    return True

# InMemoryStorage.search method · python · L44-L65 (22 LOC)src/agent_memory/storage/memory_store.py
def search(
    self,
    query: str,
    layer: Optional[MemoryLayer] = None,
    limit: int = 20,
) -> Sequence[MemoryEntry]:
    """Case-insensitive substring search across all stored content.

    The query is lower-cased and split on whitespace; an entry matches only
    when every resulting token occurs in its lower-cased content. A query
    with no tokens (empty or whitespace-only) therefore matches every entry.

    Parameters
    ----------
    query:
        Free-text query string.
    layer:
        When given, only entries whose ``layer`` equals it are considered.
    limit:
        Maximum number of entries to return. Values <= 0 yield an empty
        list.

    Returns
    -------
    Sequence[MemoryEntry]
        Matching entries, in the iteration order of the underlying store.
    """
    # The cap is only checked after an append, so without this guard a
    # non-positive limit would still hand back the first match.
    if limit <= 0:
        return []
    tokens = query.lower().split()
    results: list[MemoryEntry] = []
    for entry in self._store.values():
        if layer is not None and entry.layer != layer:
            continue
        content_lower = entry.content.lower()
        if all(token in content_lower for token in tokens):
            results.append(entry)
            if len(results) >= limit:
                break
    return results

# InMemoryStorage.list_keys method · python · L67-L77 (11 LOC)src/agent_memory/storage/memory_store.py
def list_keys(
    self,
    layer: Optional[MemoryLayer] = None,
    limit: int = 1000,
) -> list[str]:
    """List stored memory_id values, optionally restricted to one layer.

    Keys come back in the store's iteration order and the list is cut to
    its first ``limit`` items by slicing.
    """
    selected = [
        memory_id
        for memory_id, entry in self._store.items()
        if layer is None or entry.layer == layer
    ]
    return selected[:limit]

# InMemoryStorage.clear method · python · L79-L88 (10 LOC)src/agent_memory/storage/memory_store.py
def clear(self, layer: Optional[MemoryLayer] = None) -> int:
    """Delete stored entries and report how many were removed.

    With no ``layer`` the whole store is emptied; otherwise only entries
    whose ``layer`` equals the given one are deleted.
    """
    if layer is not None:
        # Collect the ids first so the dict is not mutated while iterating.
        doomed = [
            memory_id
            for memory_id, entry in self._store.items()
            if entry.layer == layer
        ]
        for memory_id in doomed:
            del self._store[memory_id]
        return len(doomed)
    removed = len(self._store)
    self._store.clear()
    return removed

# InMemoryStorage.load_all method · python · L96-L106 (11 LOC)src/agent_memory/storage/memory_store.py
def load_all(
    self,
    layer: Optional[MemoryLayer] = None,
    limit: int = 1000,
) -> list[MemoryEntry]:
    """Return stored entries, optionally restricted to one layer.

    Entries come back in the store's iteration order and the list is cut
    to its first ``limit`` items by slicing.
    """
    selected = [
        entry
        for entry in self._store.values()
        if layer is None or entry.layer == layer
    ]
    return selected[:limit]

# RedisStorage.__init__ method · python · L59-L78 (20 LOC)src/agent_memory/storage/redis_store.py
def __init__(
    self,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: Optional[str] = None,
    ttl_seconds: int = 0,
    client: Optional[redis_lib.Redis] = None,  # type: ignore[type-arg]
) -> None:
    """Set up the Redis client and remember the configured TTL.

    When ``client`` is supplied it is used as-is and the connection
    arguments (``host``, ``port``, ``db``, ``password``) are ignored.
    Otherwise a new client is created from them with
    ``decode_responses=True``. ``ttl_seconds`` is stored unchanged.
    """
    if client is None:
        client = redis_lib.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True,
        )
    self._client: redis_lib.Redis = client  # type: ignore[type-arg]
    self._ttl = ttl_seconds

# RedisStorage.save method · python · L84-L95 (12 LOC)src/agent_memory/storage/redis_store.py
def save(self, entry: MemoryEntry) -> None:
    """Write ``entry`` to Redis and register its id in the index sets.

    All commands are queued on a single pipeline: the serialised fields are
    written to the entry's hash, an expiry is queued for that hash only when
    the configured TTL is positive, and the memory_id is added to both the
    global id set and the set for the entry's layer.
    """
    hash_key = _entry_key(entry.memory_id)
    fields = _serialise(entry)
    pipe = self._client.pipeline()
    pipe.hset(hash_key, mapping=fields)  # type: ignore[arg-type]
    if self._ttl > 0:
        # The expiry applies to the hash key only, not to the index sets.
        pipe.expire(hash_key, self._ttl)
    pipe.sadd(_ALL_KEYS_SET, entry.memory_id)
    pipe.sadd(_layer_set_key(entry.layer), entry.memory_id)
    pipe.execute()

# Repobility analyzer · published findings · https://repobility.com
RedisStorage.load method · python · L97-L102 (6 LOC)src/agent_memory/storage/redis_store.py
def load(self, key: str) -> Optional[MemoryEntry]:
    """Fetch the entry stored under ``key`` (a memory_id).

    Returns ``None`` when the entry's hash is empty or absent; otherwise
    the hash is turned back into a ``MemoryEntry`` via ``_deserialise``.
    """
    fields = self._client.hgetall(_entry_key(key))
    return _deserialise(fields) if fields else None  # type: ignore[arg-type]

# RedisStorage.delete method · python · L104-L123 (20 LOC)src/agent_memory/storage/redis_store.py
def delete(self, key: str) -> bool:
    """Delete an entry and its index references.

    Parameters
    ----------
    key:
        The memory_id of the entry to remove.

    Returns
    -------
    bool
        ``True`` if the entry's hash existed and was deleted, ``False``
        otherwise. Even when the hash is missing, ``key`` is removed from
        the global id set and from every layer set so that no stale id is
        left in the indexes.
    """
    entry_key = _entry_key(key)
    # The stored hash is the only record of the entry's layer, so read it
    # before deleting anything.
    data = self._client.hgetall(entry_key)
    pipeline = self._client.pipeline()
    if not data:
        # save() attaches the TTL to the hash only, so an expired entry
        # leaves its id behind in the index sets. The layer is unknown
        # here, so purge the id from every layer set.
        pipeline.srem(_ALL_KEYS_SET, key)
        for layer in MemoryLayer:
            pipeline.srem(_layer_set_key(layer), key)
        pipeline.execute()
        return False
    pipeline.delete(entry_key)
    pipeline.srem(_ALL_KEYS_SET, key)
    layer_value = data.get("layer", "")
    if layer_value:
        try:
            layer = MemoryLayer(layer_value)
        except ValueError:
            # Unrecognised layer value: no layer set can be derived from
            # it, so only the hash and the global index are cleaned up.
            pass
        else:
            pipeline.srem(_layer_set_key(layer), key)
    pipeline.execute()
    return True

# RedisStorage.search method · python · L125-L142 (18 LOC)src/agent_memory/storage/redis_store.py
def search(
    self,
    query: str,
    layer: Optional[MemoryLayer] = None,
    limit: int = 20,
) -> Sequence[MemoryEntry]:
    """Case-insensitive substring search over entry content.

    Candidates are fetched with ``self.load_all(layer=layer,
    limit=limit * 10)`` and filtered in Python, so entries beyond that scan
    window are never examined. The query is lower-cased and split on
    whitespace; every token must occur in an entry's lower-cased content.

    Parameters
    ----------
    query:
        Free-text query string.
    layer:
        Optional layer filter, passed through to ``load_all``.
    limit:
        Maximum number of entries to return. Values <= 0 yield an empty
        list.

    Returns
    -------
    Sequence[MemoryEntry]
        Matching entries in the order ``load_all`` returned them.
    """
    # The cap is only checked after an append, and a negative limit would
    # also produce a negative scan window, so reject non-positive limits
    # before touching Redis.
    if limit <= 0:
        return []
    entries = self.load_all(layer=layer, limit=limit * 10)
    tokens = query.lower().split()
    results: list[MemoryEntry] = []
    for entry in entries:
        content_lower = entry.content.lower()
        if all(token in content_lower for token in tokens):
            results.append(entry)
            if len(results) >= limit:
                break
    return results

# RedisStorage.list_keys method · python · L144-L155 (12 LOC)src/agent_memory/storage/redis_store.py
def list_keys(
    self,
    layer: Optional[MemoryLayer] = None,
    limit: int = 1000,
) -> list[str]:
    """List indexed memory_id values, sorted, capped at ``limit``.

    Reads the global id set when ``layer`` is ``None`` and the layer's own
    set otherwise.
    """
    if layer is not None:
        members = self._client.smembers(_layer_set_key(layer))
    else:
        members = self._client.smembers(_ALL_KEYS_SET)
    # Set members arrive unordered; sort so the slice is deterministic.
    ordered = sorted(members)
    return ordered[:limit]  # type: ignore[return-value]

# RedisStorage.clear method · python · L157-L186 (30 LOC)src/agent_memory/storage/redis_store.py
def clear(self, layer: Optional[MemoryLayer] = None) -> int:
"""Remove entries. Returns count deleted."""
if layer is None:
all_ids: list[str] = list(self._client.smembers(_ALL_KEYS_SET)) # type: ignore[arg-type]
else:
all_ids = list(self._client.smembers(_layer_set_key(layer))) # type: ignore[arg-type]
if not all_ids:
return 0
pipeline = self._client.pipeline()
for memory_id in all_ids:
pipeline.delete(_entry_key(memory_id))
pipeline.execute()
# Rebuild index sets
if layer is None:
self._client.delete(_ALL_KEYS_SET)
# Also clear all layer sets
for lyr in MemoryLayer:
self._client.delete(_layer_set_key(lyr))
else:
# Remove cleared IDs from global and layer sets
pipeline = self._client.pipeline()
for memory_id in all_ids:
pipeline.srem(_ALL_KEYS_SET,RedisStorage.ping method · python · L188-L193 (6 LOC)src/agent_memory/storage/redis_store.py
def ping(self) -> bool:
    """Report whether the Redis client answers a PING.

    Returns the truthiness of the client's ``ping()`` result, or ``False``
    when the call raises ``redis_lib.ConnectionError``.
    """
    try:
        reachable = bool(self._client.ping())
    except redis_lib.ConnectionError:
        reachable = False
    return reachable

# _serialise function · python · L201-L215 (15 LOC)src/agent_memory/storage/redis_store.py
def _serialise(entry: MemoryEntry) -> dict[str, str]:
    """Flatten a MemoryEntry into a string-to-string mapping.

    Enum-like fields contribute their ``.value``, numbers are converted
    with ``str()``, datetimes with ``isoformat()``, ``safety_critical``
    becomes ``"1"`` or ``"0"`` and ``metadata`` is JSON-encoded.
    """
    flat: dict[str, str] = {}
    flat["memory_id"] = entry.memory_id
    flat["content"] = entry.content
    flat["layer"] = entry.layer.value
    flat["importance_score"] = str(entry.importance_score)
    flat["freshness_score"] = str(entry.freshness_score)
    flat["source"] = entry.source.value
    flat["created_at"] = entry.created_at.isoformat()
    flat["last_accessed"] = entry.last_accessed.isoformat()
    flat["access_count"] = str(entry.access_count)
    if entry.safety_critical:
        flat["safety_critical"] = "1"
    else:
        flat["safety_critical"] = "0"
    flat["metadata"] = json.dumps(entry.metadata)
    return flat

# _parse_dt function · python · L218-L223 (6 LOC)src/agent_memory/storage/redis_store.py
def _parse_dt(value: str) -> datetime:
    """Parse an ISO 8601 string into a timezone-aware datetime.

    A value that carries no offset is tagged as UTC; a value that already
    has tzinfo is returned as parsed.
    """
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is not None:
        return parsed
    return parsed.replace(tzinfo=timezone.utc)

# Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
_deserialise function · python · L226-L241 (16 LOC)src/agent_memory/storage/redis_store.py
def _deserialise(data: dict[str, str]) -> MemoryEntry:
    """Rebuild a MemoryEntry from the flat string mapping read from Redis.

    ``memory_id``, ``content``, ``layer``, ``created_at`` and
    ``last_accessed`` are required keys. The remaining fields fall back to
    defaults when absent: metadata ``{}``, importance ``0.5``, freshness
    ``1.0``, source ``"agent_inference"``, access count ``0`` and
    safety_critical ``False``.
    """
    metadata: dict[str, str] = json.loads(data.get("metadata", "{}"))
    memory_id = data["memory_id"]
    content = data["content"]
    layer = MemoryLayer(data["layer"])
    importance = float(data.get("importance_score", "0.5"))
    freshness = float(data.get("freshness_score", "1.0"))
    source = MemorySource(data.get("source", "agent_inference"))
    created_at = _parse_dt(data["created_at"])
    last_accessed = _parse_dt(data["last_accessed"])
    access_count = int(data.get("access_count", "0"))
    # The flag is stored as "1"/"0"; anything other than "1" reads as False.
    is_critical = data.get("safety_critical", "0") == "1"
    return MemoryEntry(
        memory_id=memory_id,
        content=content,
        layer=layer,
        importance_score=importance,
        freshness_score=freshness,
        source=source,
        created_at=created_at,
        last_accessed=last_accessed,
        access_count=access_count,
        safety_critical=is_critical,
        metadata=metadata,
    )

# SQLiteStorage.__init__ method · python · L131-L139 (9 LOC)src/agent_memory/storage/sqlite_store.py
def __init__(
    self,
    db_path: str | Path = ":memory:",
    timeout: float = 5.0,
) -> None:
    """Record the connection settings, connect, and initialise the schema.

    ``db_path`` is stored as a string and ``timeout`` unchanged. The
    connection returned by ``self._connect()`` is kept on ``self._conn``
    before ``self._initialise_schema()`` runs.
    """
    self._db_path, self._timeout = str(db_path), timeout
    # Both settings must be in place before _connect() is called.
    self._conn = self._connect()
    self._initialise_schema()

# SQLiteStorage.load method · python · L151-L157 (7 LOC)src/agent_memory/storage/sqlite_store.py
def load(self, key: str) -> Optional[MemoryEntry]:
    """Fetch the entry stored under ``key`` (a memory_id).

    Runs the ``_SELECT_BY_ID`` statement with ``key`` as its only parameter
    and converts the first row via ``_row_to_entry``; returns ``None`` when
    the query yields no row.
    """
    row = self._conn.execute(_SELECT_BY_ID, (key,)).fetchone()
    return None if row is None else _row_to_entry(row)

# SQLiteStorage.search method · python · L165-L175 (11 LOC)src/agent_memory/storage/sqlite_store.py
def search(
    self,
    query: str,
    layer: Optional[MemoryLayer] = None,
    limit: int = 20,
) -> Sequence[MemoryEntry]:
    """Search entries, preferring the FTS path over the fallback path.

    ``_fts_search`` is tried first; if it raises
    ``sqlite3.OperationalError`` the same arguments are handed to
    ``_fallback_search`` instead.
    """
    try:
        hits = self._fts_search(query, layer=layer, limit=limit)
    except sqlite3.OperationalError:
        hits = self._fallback_search(query, layer=layer, limit=limit)
    return hits

# SQLiteStorage.list_keys method · python · L177-L187 (11 LOC)src/agent_memory/storage/sqlite_store.py
def list_keys(
    self,
    layer: Optional[MemoryLayer] = None,
    limit: int = 1000,
) -> list[str]:
    """Return up to ``limit`` stored memory_id values.

    Uses ``_LIST_KEYS_ALL`` when no layer is given and ``_LIST_KEYS_LAYER``
    (parameterised with ``layer.value``) otherwise. The ordering is
    whatever those statements define; it was documented as newest first.
    """
    if layer is not None:
        rows = self._conn.execute(_LIST_KEYS_LAYER, (layer.value, limit)).fetchall()
    else:
        rows = self._conn.execute(_LIST_KEYS_ALL, (limit,)).fetchall()
    # Each row's first column is the memory_id.
    return [row[0] for row in rows]

# SQLiteStorage.clear method · python · L189-L196 (8 LOC)src/agent_memory/storage/sqlite_store.py
def clear(self, layer: Optional[MemoryLayer] = None) -> int:
    """Delete entries and return the affected row count.

    Executes ``_CLEAR_ALL`` when no layer is given and ``_CLEAR_LAYER``
    (parameterised with ``layer.value``) otherwise. The statement runs
    inside ``with self._conn`` and the returned number is the cursor's
    ``rowcount``.
    """
    with self._conn:
        if layer is not None:
            cursor = self._conn.execute(_CLEAR_LAYER, (layer.value,))
        else:
            cursor = self._conn.execute(_CLEAR_ALL)
    deleted = cursor.rowcount
    return deleted

# SQLiteStorage.count method · python · L198-L205 (8 LOC)src/agent_memory/storage/sqlite_store.py
def count(self, layer: Optional[MemoryLayer] = None) -> int:
    """Return how many entries are stored, optionally for one layer.

    Executes ``_COUNT_ALL`` or ``_COUNT_LAYER`` and returns the first
    column of the first row, or ``0`` when no row comes back.
    """
    if layer is not None:
        row = self._conn.execute(_COUNT_LAYER, (layer.value,)).fetchone()
    else:
        row = self._conn.execute(_COUNT_ALL).fetchone()
    if not row:
        return 0
    return row[0]

# SQLiteStorage._connect method · python · L225-L231 (7 LOC)src/agent_memory/storage/sqlite_store.py
def _connect(self) -> sqlite3.Connection:
    """Open the SQLite connection described by the instance settings.

    Connects to ``self._db_path`` with ``self._timeout``, switches the row
    factory to ``sqlite3.Row`` and issues the journal-mode and foreign-key
    PRAGMAs before returning the connection.
    """
    connection = sqlite3.connect(self._db_path, timeout=self._timeout)
    connection.row_factory = sqlite3.Row
    startup_pragmas = (
        # WAL mode for better concurrent reads (no-op for :memory:)
        "PRAGMA journal_mode=WAL",
        "PRAGMA foreign_keys=ON",
    )
    for pragma in startup_pragmas:
        connection.execute(pragma)
    return connection

# Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
SQLiteStorage._fts_search method · python · L238-L250 (13 LOC)src/agent_memory/storage/sqlite_store.py
def _fts_search(
    self,
    query: str,
    layer: Optional[MemoryLayer],
    limit: int,
) -> list[MemoryEntry]:
    """Run the full-text search statement and convert the rows to entries.

    The query is first passed through ``_escape_fts_query``. Without a
    layer ``_FTS_SEARCH_ALL`` is executed; with one, ``_FTS_SEARCH_LAYER``
    is executed with ``layer.value`` as an extra parameter.
    """
    # Escape special FTS5 characters to avoid syntax errors.
    match_expr = _escape_fts_query(query)
    if layer is not None:
        params = (match_expr, layer.value, limit)
        rows = self._conn.execute(_FTS_SEARCH_LAYER, params).fetchall()
    else:
        rows = self._conn.execute(_FTS_SEARCH_ALL, (match_expr, limit)).fetchall()
    return [_row_to_entry(row) for row in rows]

# SQLiteStorage._fallback_search method · python · L252-L265 (14 LOC)src/agent_memory/storage/sqlite_store.py
def _fallback_search(
    self,
    query: str,
    layer: Optional[MemoryLayer],
    limit: int,
) -> list[MemoryEntry]:
    """Run the non-FTS search statement and convert the rows to entries.

    The whole lower-cased query is wrapped in ``%`` on both sides and bound
    as a single pattern parameter. Without a layer ``_FALLBACK_SEARCH_ALL``
    is executed; with one, ``_FALLBACK_SEARCH_LAYER`` is executed with
    ``layer.value`` as an extra parameter.
    """
    # NOTE(review): the query is embedded unescaped, so any % or _ in it
    # presumably acts as a LIKE wildcard — confirm against the SQL text.
    pattern = f"%{query.lower()}%"
    if layer is not None:
        rows = self._conn.execute(
            _FALLBACK_SEARCH_LAYER, (pattern, layer.value, limit)
        ).fetchall()
    else:
        rows = self._conn.execute(_FALLBACK_SEARCH_ALL, (pattern, limit)).fetchall()
    return [_row_to_entry(row) for row in rows]

# _entry_to_row function · python · L273-L284 (12 LOC)src/agent_memory/storage/sqlite_store.py
def _entry_to_row(entry: MemoryEntry) -> tuple[str, str, str, str, str, str, str, float]:
    """Build the eight-column row tuple for a MemoryEntry.

    Column order: id, key, type, content, metadata, created_at, updated_at,
    importance. The memory_id fills both id and key, the layer's value is
    the type, metadata is JSON-encoded, and ``last_accessed`` is written to
    the updated_at column.
    """
    memory_id = entry.memory_id  # used for both the id and key columns
    layer_name = entry.layer.value
    metadata_json = json.dumps(entry.metadata)
    created = entry.created_at.isoformat()
    updated = entry.last_accessed.isoformat()
    return (
        memory_id,
        memory_id,
        layer_name,
        entry.content,
        metadata_json,
        created,
        updated,
        entry.importance_score,
    )

# _row_to_entry function · python · L287-L299 (13 LOC)src/agent_memory/storage/sqlite_store.py
def _row_to_entry(row: sqlite3.Row) -> MemoryEntry:
    """Turn a database row back into a MemoryEntry.

    Reads the ``id``, ``content``, ``type``, ``importance``, ``metadata``,
    ``created_at`` and ``updated_at`` columns. An empty or NULL metadata
    column is treated as ``{}``. Fields that are not passed to the
    constructor keep the MemoryEntry defaults.
    """
    metadata: dict[str, str] = json.loads(row["metadata"] or "{}")
    memory_id = row["id"]
    content = row["content"]
    layer = MemoryLayer(row["type"])
    importance = row["importance"]
    created_at = _parse_dt(row["created_at"])
    # The updated_at column carries the entry's last_accessed timestamp.
    last_accessed = _parse_dt(row["updated_at"])
    return MemoryEntry(
        memory_id=memory_id,
        content=content,
        layer=layer,
        importance_score=importance,
        metadata=metadata,
        created_at=created_at,
        last_accessed=last_accessed,
    )

# MemoryConfig.__post_init__ method · python · L70-L96 (27 LOC)src/agent_memory/unified/config.py
def __post_init__(self) -> None:
if self.working_capacity < 1:
raise ValueError(f"working_capacity must be >= 1, got {self.working_capacity}")
if self.episodic_max_items < 0:
raise ValueError(
f"episodic_max_items must be >= 0, got {self.episodic_max_items}"
)
if self.consolidation_interval < 0:
raise ValueError(
f"consolidation_interval must be >= 0, got {self.consolidation_interval}"
)
if not 0.0 <= self.importance_threshold <= 1.0:
raise ValueError(
f"importance_threshold must be in [0, 1], got {self.importance_threshold}"
)
valid_decay = {"linear", "exponential", "step"}
if self.decay_function not in valid_decay:
raise ValueError(
f"decay_function must be one of {sorted(valid_decay)!r}, "
f"got {self.decay_function!r}"
)
valid_backends = _build_storage_backend function · python · L21-L41 (21 LOC)src/agent_memory/unified/memory.py
def _build_storage_backend(config: MemoryConfig) -> StorageBackend:
    """Instantiate the storage backend named by ``config.storage_backend``.

    ``"sqlite"`` yields a ``SQLiteStorage`` on ``config.sqlite_path``,
    ``"redis"`` yields a ``RedisStorage`` built from the ``redis_*``
    settings, and any other value yields an ``InMemoryStorage``. Each
    backend module is imported only when its branch is taken.
    """
    backend_name = config.storage_backend
    if backend_name == "sqlite":
        from agent_memory.storage.sqlite_store import SQLiteStorage

        return SQLiteStorage(db_path=config.sqlite_path)
    elif backend_name == "redis":
        from agent_memory.storage.redis_store import RedisStorage

        redis_options = dict(
            host=config.redis_host,
            port=config.redis_port,
            db=config.redis_db,
            ttl_seconds=config.redis_ttl_seconds,
        )
        return RedisStorage(**redis_options)
    else:
        # Default: in-memory
        from agent_memory.storage.memory_store import InMemoryStorage

        return InMemoryStorage()

# UnifiedMemory.__init__ method · python · L62-L81 (20 LOC)src/agent_memory/unified/memory.py
def __init__(
    self,
    config: Optional[MemoryConfig] = None,
    storage: Optional[StorageBackend] = None,
) -> None:
    """Wire up the configuration, storage backend, layers and subsystems.

    Parameters
    ----------
    config:
        Memory configuration; a default ``MemoryConfig()`` is created when
        ``None``.
    storage:
        Durable backend to use; when ``None`` one is built from the config
        via ``_build_storage_backend``.
    """
    # Compare against None instead of relying on truthiness: a supplied
    # object that evaluates falsy (for example a backend defining __len__
    # while empty) must not be silently replaced by a default.
    self._config = config if config is not None else MemoryConfig()
    self._storage = (
        storage if storage is not None else _build_storage_backend(self._config)
    )
    # Cognitive layers
    self._working = WorkingMemory(capacity=self._config.working_capacity)
    self._episodic = EpisodicMemory()
    self._semantic = SemanticMemory()
    self._procedural = ProceduralMemory()
    # Subsystems
    self._importance_scorer = ImportanceScorer()
    self._freshness_scorer = FreshnessScorer()
    self._provenance_tracker = ProvenanceTracker()
    # Number of store() calls made so far.
    self._store_count = 0

# UnifiedMemory.store method · python · L95-L128 (34 LOC)src/agent_memory/unified/memory.py
def store(self, entry: MemoryEntry) -> MemoryEntry:
"""Store a memory entry across the appropriate cognitive layer and backend.
Scoring and provenance tagging are applied according to config before
the entry is written. Automatic consolidation is triggered when the
``consolidation_interval`` threshold is met.
Parameters
----------
entry:
The memory entry to store.
Returns
-------
MemoryEntry
The final (possibly score-updated) entry that was persisted.
"""
processed = self._apply_processing(entry)
# Route to the correct cognitive layer
layer_store = self._layer_store(processed.layer)
layer_store.store(processed)
# Persist to durable backend
self._storage.save(processed)
self._store_count += 1
if (
self._config.consolidation_interval > 0
and self._store_count % self._config.consoliRepobility · code-quality intelligence platform · https://repobility.com
UnifiedMemory.recall method · python · L130-L162 (33 LOC)src/agent_memory/unified/memory.py
def recall(self, memory_id: str) -> Optional[MemoryEntry]:
"""Retrieve a single entry by memory_id.
Checks in-process layers first (fast path), then falls back to the
durable storage backend. The entry's access metadata is updated.
Parameters
----------
memory_id:
The ID of the entry to retrieve.
Returns
-------
MemoryEntry | None
The entry, or None if not found anywhere.
"""
# Fast path: in-process layers
for layer_store in self._all_layer_stores():
entry = layer_store.retrieve(memory_id)
if entry is not None:
touched = entry.touch()
layer_store.store(touched)
self._storage.save(touched)
return touched
# Slow path: durable storage
entry = self._storage.load(memory_id)
if entry is not None:
touched = entry.touch()
self._storaUnifiedMemory.forget method · python · L164-L183 (20 LOC)src/agent_memory/unified/memory.py
def forget(self, memory_id: str) -> bool:
    """Delete an entry from every in-process layer and from storage.

    Every layer store and then the storage backend is asked to delete the
    id, whether or not an earlier delete already succeeded.

    Parameters
    ----------
    memory_id:
        The ID of the entry to remove.

    Returns
    -------
    bool
        True if at least one of the delete calls reported a removal.
    """
    # Build a full list (not a lazy any()) so no layer's delete is skipped.
    layer_hits = [
        bool(layer_store.delete(memory_id))
        for layer_store in self._all_layer_stores()
    ]
    storage_hit = bool(self._storage.delete(memory_id))
    return any(layer_hits) or storage_hit

# UnifiedMemory.search method · python · L185-L226 (42 LOC)src/agent_memory/unified/memory.py
def search(
self,
query: str,
layer: Optional[MemoryLayer] = None,
limit: int = 20,
) -> Sequence[MemoryEntry]:
"""Search for entries matching a query string.
Searches the durable storage backend (which may support FTS), then
de-duplicates against entries returned from in-process layers for
completeness.
Parameters
----------
query:
Free-text query string.
layer:
Optional layer filter.
limit:
Maximum number of results.
Returns
-------
Sequence[MemoryEntry]
Matching entries, ordered by backend relevance.
"""
backend_results = list(self._storage.search(query, layer=layer, limit=limit))
seen_ids = {e.memory_id for e in backend_results}
# Include any in-process matches not yet in storage
tokens = query.lower().split()
for layer_store in self._all_layer_storeUnifiedMemory.consolidate method · python · L228-L253 (26 LOC)src/agent_memory/unified/memory.py
def consolidate(self) -> dict[str, int]:
"""Run a garbage-collection pass across all in-process layers.
Removes entries whose effective importance (importance * decay) falls
below the configured threshold. Safety-critical entries are never
removed.
Returns
-------
dict[str, int]
Summary with keys ``"collected"`` and ``"retained"``.
"""
collected = 0
for layer_store in self._all_layer_stores():
gc = MemoryGarbageCollector(
store=layer_store,
threshold=self._config.importance_threshold,
)
evicted_ids = gc.collect(dry_run=False)
collected += len(evicted_ids)
# Sync deletions to durable storage
for memory_id in evicted_ids:
self._storage.delete(memory_id)
total = sum(s.count() for s in self._all_layer_stores())
return {"collected": collected, "retained": totUnifiedMemory.stats method · python · L255-L283 (29 LOC)src/agent_memory/unified/memory.py
def stats(self) -> dict[str, object]:
"""Return aggregate statistics about the current memory state.
Returns
-------
dict[str, object]
Contains total counts per layer, overall totals, and backend type.
"""
layer_counts: dict[str, int] = {}
total_in_process = 0
for layer in MemoryLayer:
layer_store = self._layer_store(layer)
layer_counts[layer.value] = layer_store.count()
total_in_process += layer_counts[layer.value]
storage_total = self._storage.count()
return {
"layer_counts": layer_counts,
"total_in_process": total_in_process,
"total_in_storage": storage_total,
"store_calls": self._store_count,
"working_capacity": self._config.working_capacity,
"working_utilisation": round(
layer_counts.get("working", 0) / max(1, self._config.working_capacity), 3
),
UnifiedMemory._layer_store method · python · L289-L297 (9 LOC)src/agent_memory/unified/memory.py
def _layer_store(self, layer: MemoryLayer) -> MemoryStore:
    """Look up the in-process MemoryStore that backs ``layer``.

    Raises ``KeyError`` for a layer that is not one of WORKING, EPISODIC,
    SEMANTIC or PROCEDURAL.
    """
    stores_by_layer: dict[MemoryLayer, MemoryStore] = {}
    stores_by_layer[MemoryLayer.WORKING] = self._working
    stores_by_layer[MemoryLayer.EPISODIC] = self._episodic
    stores_by_layer[MemoryLayer.SEMANTIC] = self._semantic
    stores_by_layer[MemoryLayer.PROCEDURAL] = self._procedural
    return stores_by_layer[layer]

# UnifiedMemory._apply_processing method · python · L306-L315 (10 LOC)src/agent_memory/unified/memory.py
def _apply_processing(self, entry: MemoryEntry) -> MemoryEntry:
    """Run the enabled processing steps over ``entry`` and return the result.

    Steps run in a fixed order — importance scoring, freshness scoring,
    provenance tagging — and each one is skipped unless its config flag
    (``auto_score_importance``, ``auto_score_freshness``,
    ``auto_tag_provenance``) is truthy. Each step receives the output of
    the previous enabled step.
    """
    processed = entry
    if self._config.auto_score_importance:
        processed = self._importance_scorer.score_and_update(processed)
    if self._config.auto_score_freshness:
        processed = self._freshness_scorer.score_and_update(processed)
    if self._config.auto_tag_provenance:
        processed = self._provenance_tracker.tag(processed)
    return processed

# ‹ prevpage 6 / 6