Function bodies 288 total
Mem0EnhancedMemory.search_hybrid method · python · L285-L331 (47 LOC)src/agent_memory/integrations/mem0_adapter.py
def search_hybrid(
self,
query: str,
top_k: int = 10,
filters: Optional[dict[str, Any]] = None,
) -> list[MemoryResult]:
"""Search Mem0 and re-rank results using a hybrid AumOS score.
Retrieves up to ``top_k * 2`` raw results from Mem0, then computes
a hybrid score for each result by combining Mem0's raw similarity
score with a freshness heuristic derived from the result's metadata.
Returns the top ``top_k`` results sorted by hybrid score (descending).
Parameters
----------
query:
The semantic query string.
top_k:
Maximum number of results to return after re-ranking.
filters:
Optional Mem0 filter dict passed to the search call.
Returns
-------
list[MemoryResult]
Re-ranked memory results, highest hybrid score first.
"""
if top_k < 1:
raise ValueError(f"top_k must be >Mem0EnhancedMemory.get_all method · python · L333-L352 (20 LOC)src/agent_memory/integrations/mem0_adapter.py
def get_all(self, limit: int = 100) -> list[MemoryResult]:
"""Retrieve all memories for the current user.
Parameters
----------
limit:
Maximum number of memories to fetch. Default: 100.
Returns
-------
list[MemoryResult]
All stored memories sorted by hybrid score descending.
"""
raw_results: list[Any] = self._client.get_all(
user_id=self._user_id,
limit=limit,
)
scored = [self._to_memory_result(raw) for raw in raw_results]
scored.sort(key=lambda r: r.hybrid_score, reverse=True)
return scoredMem0EnhancedMemory.delete method · python · L354-L373 (20 LOC)src/agent_memory/integrations/mem0_adapter.py
def delete(self, memory_id: str) -> bool:
"""Delete a memory entry by its Mem0 identifier.
Parameters
----------
memory_id:
The Mem0 memory identifier.
Returns
-------
bool
True when deletion succeeded; False on error.
"""
try:
self._client.delete(memory_id=memory_id)
logger.debug("Deleted memory %r", memory_id)
return True
except Exception as exc: # noqa: BLE001
logger.warning("Failed to delete memory %r: %s", memory_id, exc)
return FalseMem0EnhancedMemory._check_contradictions method · python · L379-L426 (48 LOC)src/agent_memory/integrations/mem0_adapter.py
def _check_contradictions(self, candidate_text: str) -> list[ContradictionWarning]:
"""Search for existing memories and check for contradictions."""
try:
similar_raw = self._client.search(
query=candidate_text,
user_id=self._user_id,
limit=20,
)
except Exception as exc: # noqa: BLE001
logger.warning("Contradiction pre-check search failed: %s", exc)
return []
if not similar_raw:
return []
existing_entries = [self._raw_to_lightweight(raw) for raw in similar_raw]
warnings: list[ContradictionWarning] = []
for existing in existing_entries:
similarity = _pairwise_cosine(candidate_text, existing.content)
if similarity < self._contradiction_threshold:
continue
# Check for negation conflict (one sentence negates the other)
candidate_has_neg = _has_negation(candiMem0EnhancedMemory._raw_to_lightweight method · python · L428-L441 (14 LOC)src/agent_memory/integrations/mem0_adapter.py
def _raw_to_lightweight(self, raw: Any) -> LightweightEntry:
"""Convert a Mem0 result dict to a LightweightEntry for contradiction checking."""
if isinstance(raw, dict):
memory_id = str(raw.get("id", "") or "")
content = str(raw.get("memory", "") or raw.get("text", "") or "")
else:
memory_id = str(getattr(raw, "id", "") or "")
content = str(getattr(raw, "memory", "") or getattr(raw, "text", "") or "")
return LightweightEntry(
id=memory_id or "unknown",
content=content,
timestamp=datetime.now(timezone.utc),
)Mem0EnhancedMemory._to_memory_result method · python · L443-L465 (23 LOC)src/agent_memory/integrations/mem0_adapter.py
def _to_memory_result(self, raw: Any) -> MemoryResult:
"""Convert a Mem0 search result to a MemoryResult with hybrid scoring."""
if isinstance(raw, dict):
memory_id = str(raw.get("id", "") or "")
content = str(raw.get("memory", "") or raw.get("text", "") or "")
raw_score = float(raw.get("score", 0.5) or 0.5)
metadata: dict[str, Any] = dict(raw.get("metadata", {}) or {})
else:
memory_id = str(getattr(raw, "id", "") or "")
content = str(getattr(raw, "memory", "") or getattr(raw, "text", "") or "")
raw_score = float(getattr(raw, "score", 0.5) or 0.5)
metadata = dict(getattr(raw, "metadata", {}) or {})
hybrid_score = self._compute_hybrid_score(raw_score=raw_score, metadata=metadata)
return MemoryResult(
memory_id=memory_id,
content=content,
raw_score=raw_score,
hybrid_score=hybrid_score,
metMem0EnhancedMemory._compute_hybrid_score method · python · L467-L498 (32 LOC)src/agent_memory/integrations/mem0_adapter.py
def _compute_hybrid_score(
self,
raw_score: float,
metadata: dict[str, Any],
) -> float:
"""Compute a hybrid relevance score combining Mem0 similarity with freshness.
The hybrid score blends the Mem0 raw similarity score with a freshness
signal. Freshness is estimated from the ``created_at`` metadata field
when present; otherwise a neutral score of 0.5 is used.
Score formula::
hybrid = (1 - freshness_weight) * raw_score
+ freshness_weight * freshness_estimate
Parameters
----------
raw_score:
Mem0 cosine similarity score for this result.
metadata:
Metadata dict from the Mem0 result.
Returns
-------
float
Hybrid score in [0.0, 1.0].
"""
freshness = self._estimate_freshness(metadata)
similarity_weight = 1.0 - self._freshness_weight
hybrid = (similarity_weight *Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
Mem0EnhancedMemory._estimate_freshness method · python · L500-L526 (27 LOC)src/agent_memory/integrations/mem0_adapter.py
def _estimate_freshness(self, metadata: dict[str, Any]) -> float:
"""Estimate a freshness score from result metadata.
Returns a value in [0.0, 1.0] where 1.0 = created in the last hour
and 0.0 = older than 30 days.
"""
created_at_raw = metadata.get("created_at") or metadata.get("timestamp")
if created_at_raw is None:
return 0.5 # unknown freshness — neutral
try:
if isinstance(created_at_raw, datetime):
created_at = created_at_raw
else:
created_at = datetime.fromisoformat(str(created_at_raw))
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
now = datetime.now(timezone.utc)
age_seconds = max(0.0, (now - created_at).total_seconds())
thirty_days_seconds = 30 * 24 * 3600
# Linear decay from 1.0 (age=0) to 0.0 (age >= 30 days)
return max(0.Mem0EnhancedMemory.__repr__ method · python · L528-L533 (6 LOC)src/agent_memory/integrations/mem0_adapter.py
def __repr__(self) -> str:
return (
f"Mem0EnhancedMemory("
f"user_id={self._user_id!r}, "
f"block_on_contradiction={self._block_on_contradiction!r})"
)MemoryStore.search method · python · L38-L51 (14 LOC)src/agent_memory/memory/base.py
def search(
self,
query: str,
layer: Optional[MemoryLayer] = None,
limit: int = 20,
) -> Sequence[MemoryEntry]:
"""Full-text search; default implementation does substring matching."""
tokens = query.lower().split()
results: list[MemoryEntry] = []
for entry in self.all(layer=layer):
content_lower = entry.content.lower()
if all(token in content_lower for token in tokens):
results.append(entry)
return results[:limit]EpisodicMemory.clear method · python · L49-L57 (9 LOC)src/agent_memory/memory/episodic.py
def clear(self, layer: Optional[MemoryLayer] = None) -> int:
if layer is None:
count = len(self._entries)
self._entries.clear()
return count
to_delete = [mid for mid, e in self._entries.items() if e.layer == layer]
for mid in to_delete:
del self._entries[mid]
return len(to_delete)EpisodicMemory.query_range method · python · L63-L78 (16 LOC)src/agent_memory/memory/episodic.py
def query_range(
self,
start: datetime,
end: datetime,
layer: Optional[MemoryLayer] = None,
) -> Sequence[MemoryEntry]:
"""Return entries whose ``created_at`` falls within [start, end]."""
# Normalise to UTC-aware for safe comparison
start_utc = _to_utc(start)
end_utc = _to_utc(end)
results = [
e
for e in self.all(layer=layer)
if start_utc <= _to_utc(e.created_at) <= end_utc
]
return resultsProceduralMemory.delete method · python · L75-L83 (9 LOC)src/agent_memory/memory/procedural.py
def delete(self, memory_id: str) -> bool:
if memory_id not in self._entries:
return False
entry = self._entries.pop(memory_id)
proc_name = entry.metadata.get("procedure_name", "")
if proc_name:
self._name_index.pop(proc_name, None)
self._procedures.pop(proc_name, None)
return TrueProceduralMemory.clear method · python · L85-L95 (11 LOC)src/agent_memory/memory/procedural.py
def clear(self, layer: Optional[MemoryLayer] = None) -> int:
if layer is None:
count = len(self._entries)
self._entries.clear()
self._procedures.clear()
self._name_index.clear()
return count
to_delete = [mid for mid, e in self._entries.items() if e.layer == layer]
for mid in to_delete:
self.delete(mid)
return len(to_delete)ProceduralMemory.search method · python · L97-L112 (16 LOC)src/agent_memory/memory/procedural.py
def search(
self,
query: str,
layer: Optional[MemoryLayer] = None,
limit: int = 20,
) -> Sequence[MemoryEntry]:
"""Keyword-based search across procedure names and descriptions."""
query_words = _keywords(query)
results: list[tuple[int, MemoryEntry]] = []
for entry in self.all(layer=layer):
entry_words = _keywords(entry.content)
overlap = len(query_words & entry_words)
if overlap > 0:
results.append((overlap, entry))
results.sort(key=lambda x: x[0], reverse=True)
return [e for _, e in results[:limit]]Repobility analyzer · published findings · https://repobility.com
ProceduralMemory.store_procedure method · python · L118-L126 (9 LOC)src/agent_memory/memory/procedural.py
def store_procedure(
self,
procedure: Procedure,
entry: MemoryEntry,
) -> None:
"""Register a typed Procedure alongside its MemoryEntry."""
self.store(entry)
self._procedures[procedure.name] = procedure
self._name_index[procedure.name] = entry.memory_idProceduralMemory.retrieve_entry_by_name method · python · L132-L137 (6 LOC)src/agent_memory/memory/procedural.py
def retrieve_entry_by_name(self, name: str) -> Optional[MemoryEntry]:
"""Return the MemoryEntry for the named procedure."""
mid = self._name_index.get(name)
if mid is None:
return None
return self._entries.get(mid)ProceduralMemory.search_procedures method · python · L139-L149 (11 LOC)src/agent_memory/memory/procedural.py
def search_procedures(self, query: str) -> list[Procedure]:
"""Return procedures whose name or description matches query keywords."""
query_words = _keywords(query)
matches: list[tuple[int, Procedure]] = []
for proc in self._procedures.values():
proc_words = _keywords(proc.name) | _keywords(proc.description)
overlap = len(query_words & proc_words)
if overlap > 0:
matches.append((overlap, proc))
matches.sort(key=lambda x: x[0], reverse=True)
return [p for _, p in matches]SemanticMemory.__init__ method · python · L41-L46 (6 LOC)src/agent_memory/memory/semantic.py
def __init__(self) -> None:
self._entries: dict[str, MemoryEntry] = {}
# Inverted index: token -> set of memory_ids
self._inverted: dict[str, set[str]] = defaultdict(set)
# Entity index: entity_name -> set of memory_ids
self._entity_index: dict[str, set[str]] = defaultdict(set)SemanticMemory.store method · python · L52-L57 (6 LOC)src/agent_memory/memory/semantic.py
def store(self, entry: MemoryEntry) -> None:
# Remove old index entries if updating
if entry.memory_id in self._entries:
self._remove_from_index(entry.memory_id)
self._entries[entry.memory_id] = entry
self._add_to_index(entry)SemanticMemory.delete method · python · L73-L78 (6 LOC)src/agent_memory/memory/semantic.py
def delete(self, memory_id: str) -> bool:
if memory_id not in self._entries:
return False
self._remove_from_index(memory_id)
del self._entries[memory_id]
return TrueSemanticMemory.clear method · python · L80-L90 (11 LOC)src/agent_memory/memory/semantic.py
def clear(self, layer: Optional[MemoryLayer] = None) -> int:
if layer is None:
count = len(self._entries)
self._entries.clear()
self._inverted.clear()
self._entity_index.clear()
return count
to_delete = [mid for mid, e in self._entries.items() if e.layer == layer]
for mid in to_delete:
self.delete(mid)
return len(to_delete)SemanticMemory.search method · python · L92-L104 (13 LOC)src/agent_memory/memory/semantic.py
def search(
self,
query: str,
layer: Optional[MemoryLayer] = None,
limit: int = 20,
) -> Sequence[MemoryEntry]:
"""TF-IDF ranked keyword search over stored entries."""
tokens = _tokenise(query)
if not tokens:
return []
scores = self._tfidf_scores(tokens, layer=layer)
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return [self._entries[mid] for mid, _ in ranked[:limit] if mid in self._entries]Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
SemanticMemory._add_to_index method · python · L123-L129 (7 LOC)src/agent_memory/memory/semantic.py
def _add_to_index(self, entry: MemoryEntry) -> None:
tokens = _tokenise(entry.content)
for token in set(tokens):
self._inverted[token].add(entry.memory_id)
entity = entry.metadata.get("entity", "").lower()
if entity:
self._entity_index[entity].add(entry.memory_id)SemanticMemory._remove_from_index method · python · L131-L144 (14 LOC)src/agent_memory/memory/semantic.py
def _remove_from_index(self, memory_id: str) -> None:
entry = self._entries.get(memory_id)
if entry is None:
return
tokens = _tokenise(entry.content)
for token in set(tokens):
self._inverted[token].discard(memory_id)
if not self._inverted[token]:
del self._inverted[token]
entity = entry.metadata.get("entity", "").lower()
if entity:
self._entity_index[entity].discard(memory_id)
if not self._entity_index[entity]:
del self._entity_index[entity]SemanticMemory._tfidf_scores method · python · L146-L174 (29 LOC)src/agent_memory/memory/semantic.py
def _tfidf_scores(
self,
tokens: list[str],
layer: Optional[MemoryLayer] = None,
) -> dict[str, float]:
"""Compute a simple TF-IDF score per candidate document."""
num_docs = len(self._entries) or 1
candidate_ids: set[str] = set()
for token in tokens:
candidate_ids.update(self._inverted.get(token, set()))
if layer is not None:
candidate_ids = {
mid for mid in candidate_ids
if mid in self._entries and self._entries[mid].layer == layer
}
scores: dict[str, float] = {}
for mid in candidate_ids:
entry = self._entries[mid]
doc_tokens = _tokenise(entry.content)
tf_sum = 0.0
for token in tokens:
tf = doc_tokens.count(token) / (len(doc_tokens) or 1)
df = len(self._inverted.get(token, set()))
idf = math.log((num_docs + 1) / (df + 1)) + 1.0
ImportanceLevel.from_score method · python · L42-L52 (11 LOC)src/agent_memory/memory/types.py
def from_score(score: float) -> "ImportanceLevel":
"""Map a numeric score [0, 1] to an ImportanceLevel."""
if score >= 0.9:
return ImportanceLevel.CRITICAL
if score >= 0.7:
return ImportanceLevel.HIGH
if score >= 0.5:
return ImportanceLevel.MEDIUM
if score >= 0.3:
return ImportanceLevel.LOW
return ImportanceLevel.TRIVIALMemoryEntry.touch method · python · L90-L97 (8 LOC)src/agent_memory/memory/types.py
def touch(self) -> "MemoryEntry":
"""Return a new entry with updated access timestamp and count."""
return self.model_copy(
update={
"last_accessed": _utcnow(),
"access_count": self.access_count + 1,
}
)WorkingMemory.__init__ method · python · L22-L29 (8 LOC)src/agent_memory/memory/working.py
def __init__(self, capacity: int = 64) -> None:
if capacity < 1:
raise ValueError(f"capacity must be >= 1, got {capacity}")
self._capacity = capacity
# deque maxlen enforces FIFO eviction automatically
self._queue: collections.deque[MemoryEntry] = collections.deque(maxlen=capacity)
# index for O(1) retrieval by ID
self._index: dict[str, MemoryEntry] = {}WorkingMemory.store method · python · L35-L50 (16 LOC)src/agent_memory/memory/working.py
def store(self, entry: MemoryEntry) -> None:
"""Add or replace an entry. Replacement does not change queue order."""
if entry.memory_id in self._index:
# Update in-place within the deque
for i, existing in enumerate(self._queue):
if existing.memory_id == entry.memory_id:
self._queue[i] = entry
self._index[entry.memory_id] = entry
return
# New entry: if queue is at capacity, deque drops leftmost automatically
# We must remove the evicted entry from the index first
if len(self._queue) == self._capacity:
evicted = self._queue[0]
self._index.pop(evicted.memory_id, None)
self._queue.append(entry)
self._index[entry.memory_id] = entryWorkingMemory.delete method · python · L66-L75 (10 LOC)src/agent_memory/memory/working.py
def delete(self, memory_id: str) -> bool:
if memory_id not in self._index:
return False
new_queue: collections.deque[MemoryEntry] = collections.deque(maxlen=self._capacity)
for entry in self._queue:
if entry.memory_id != memory_id:
new_queue.append(entry)
self._queue = new_queue
del self._index[memory_id]
return TrueCitation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
WorkingMemory.clear method · python · L77-L86 (10 LOC)src/agent_memory/memory/working.py
def clear(self, layer: Optional[MemoryLayer] = None) -> int:
if layer is None:
count = len(self._queue)
self._queue.clear()
self._index.clear()
return count
to_delete = [e.memory_id for e in self._queue if e.layer == layer]
for mid in to_delete:
self.delete(mid)
return len(to_delete)AutoMemorizeMiddleware.__init__ method · python · L52-L65 (14 LOC)src/agent_memory/middleware/auto_memorize.py
def __init__(
self,
memory: UnifiedMemory,
store_user_turns: bool = True,
store_agent_turns: bool = True,
min_content_length: int = 5,
importance_boost: float = 0.0,
) -> None:
self._memory = memory
self._store_user = store_user_turns
self._store_agent = store_agent_turns
self._min_length = min_content_length
self._importance_boost = max(0.0, min(1.0, importance_boost))
self._stored_count = 0AutoMemorizeMiddleware.record method · python · L72-L107 (36 LOC)src/agent_memory/middleware/auto_memorize.py
def record(self, interaction: Interaction) -> list[MemoryEntry]:
"""Process one interaction turn and store relevant entries.
Parameters
----------
interaction:
The interaction turn to record.
Returns
-------
list[MemoryEntry]
The stored entries (0–2 entries depending on config).
"""
stored: list[MemoryEntry] = []
if self._store_user and len(interaction.user_input) >= self._min_length:
user_entry = self._make_entry(
content=interaction.user_input,
source=MemorySource.USER_INPUT,
interaction=interaction,
role="user",
)
stored.append(self._memory.store(user_entry))
self._stored_count += 1
if self._store_agent and len(interaction.agent_response) >= self._min_length:
agent_entry = self._make_entry(
content=interaction.agent_response,
AutoMemorizeMiddleware.record_raw method · python · L109-L144 (36 LOC)src/agent_memory/middleware/auto_memorize.py
def record_raw(
self,
user_input: str,
agent_response: str,
session_id: str = "",
turn_id: str = "",
extra_metadata: Optional[dict[str, str]] = None,
) -> list[MemoryEntry]:
"""Convenience wrapper: build an Interaction and call ``record()``.
Parameters
----------
user_input:
The user's message.
agent_response:
The agent's reply.
session_id:
Optional session identifier.
turn_id:
Optional turn counter or UUID.
extra_metadata:
Additional metadata to attach to both stored entries.
Returns
-------
list[MemoryEntry]
Stored entries.
"""
interaction = Interaction(
user_input=user_input,
agent_response=agent_response,
session_id=session_id,
turn_id=turn_id,
extra_metadata=extra_metadata or {},
)
AutoMemorizeMiddleware._make_entry method · python · L150-L180 (31 LOC)src/agent_memory/middleware/auto_memorize.py
def _make_entry(
self,
content: str,
source: MemorySource,
interaction: Interaction,
role: str,
) -> MemoryEntry:
"""Build a MemoryEntry for an interaction turn."""
metadata: dict[str, str] = {
"role": role,
"occurred_at": interaction.occurred_at.isoformat(),
}
if interaction.session_id:
metadata["session_id"] = interaction.session_id
if interaction.turn_id:
metadata["turn_id"] = interaction.turn_id
metadata.update(interaction.extra_metadata)
entry = MemoryEntry(
content=content,
layer=MemoryLayer.EPISODIC,
source=source,
created_at=interaction.occurred_at,
metadata=metadata,
)
if self._importance_boost > 0.0:
boosted = min(1.0, entry.importance_score + self._importance_boost)
entry = entry.model_copy(update={"importance_score": boostedContextBuilder.__init__ method · python · L65-L77 (13 LOC)src/agent_memory/middleware/context_builder.py
def __init__(
self,
memory: UnifiedMemory,
layer_order: Optional[list[MemoryLayer]] = None,
chars_per_token: int = _CHARS_PER_TOKEN,
include_metadata: bool = False,
separator: str = "\n\n",
) -> None:
self._memory = memory
self._layer_order = layer_order or _DEFAULT_LAYER_ORDER
self._chars_per_token = max(1, chars_per_token)
self._include_metadata = include_metadata
self._separator = separatorContextBuilder.build method · python · L79-L124 (46 LOC)src/agent_memory/middleware/context_builder.py
def build(
self,
query: str,
token_budget: int = 2048,
min_importance: float = 0.0,
top_per_layer: int = 10,
) -> str:
"""Build a context string relevant to the given query.
Parameters
----------
query:
The current user query or topic; used to rank retrieved entries.
token_budget:
Maximum number of tokens for the entire context block.
min_importance:
Exclude entries with importance_score below this threshold.
top_per_layer:
Maximum entries retrieved per layer before budget trimming.
Returns
-------
str
Formatted context string ready for inclusion in a prompt.
"""
char_budget = token_budget * self._chars_per_token
sections: list[str] = []
used_chars = 0
for layer in self._layer_order:
if used_chars >= char_budget:
break
ContextBuilder.build_from_entries method · python · L126-L155 (30 LOC)src/agent_memory/middleware/context_builder.py
def build_from_entries(
self,
entries: Sequence[MemoryEntry],
token_budget: int = 2048,
) -> str:
"""Build a context string from an explicit list of entries.
Parameters
----------
entries:
Pre-selected entries to format.
token_budget:
Character budget (token_budget * chars_per_token).
Returns
-------
str
Formatted context string.
"""
char_budget = token_budget * self._chars_per_token
sorted_entries = sorted(entries, key=lambda e: e.composite_score, reverse=True)
lines: list[str] = []
used = 0
for entry in sorted_entries:
line = self._format_entry(entry)
if used + len(line) > char_budget:
break
lines.append(line)
used += len(line)
return "\n".join(lines)Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
ContextBuilder._build_section method · python · L165-L202 (38 LOC)src/agent_memory/middleware/context_builder.py
def _build_section(
self,
layer: MemoryLayer,
query: str,
char_budget: int,
min_importance: float,
top_per_layer: int,
) -> str:
"""Build a single section for one memory layer."""
search_results = self._memory.search(query, layer=layer, limit=top_per_layer)
entries = [
e for e in search_results
if e.importance_score >= min_importance
]
if not entries:
return ""
# Sort by composite score descending
entries_sorted = sorted(entries, key=lambda e: e.composite_score, reverse=True)
heading = _LAYER_HEADINGS.get(layer, layer.value.replace("_", " ").title())
header_line = f"[{heading}]"
lines: list[str] = [header_line]
used = len(header_line)
for entry in entries_sorted:
line = self._format_entry(entry)
if used + len(line) + 1 > char_budget:
break
lines.ContextBuilder._format_entry method · python · L204-L212 (9 LOC)src/agent_memory/middleware/context_builder.py
def _format_entry(self, entry: MemoryEntry) -> str:
"""Format a single MemoryEntry as a context line."""
content = entry.content.strip()
if not self._include_metadata:
return f"- {content}"
importance = f"{entry.importance_score:.2f}"
source = entry.source.value
return f"- {content} [src={source}, imp={importance}]"PluginNotFoundError.__init__ method · python · L55-L62 (8 LOC)src/agent_memory/plugins/registry.py
def __init__(self, name: str, registry_name: str) -> None:
self.plugin_name = name
self.registry_name = registry_name
super().__init__(
f"Plugin {name!r} is not registered in the {registry_name!r} registry. "
f"Available plugins: {name!r} was not found. "
"Check that the package is installed and its entry-points are declared."
)PluginAlreadyRegisteredError.__init__ method · python · L68-L74 (7 LOC)src/agent_memory/plugins/registry.py
def __init__(self, name: str, registry_name: str) -> None:
self.plugin_name = name
self.registry_name = registry_name
super().__init__(
f"Plugin {name!r} is already registered in the {registry_name!r} registry. "
"Use a unique name or explicitly deregister the existing entry first."
)PluginRegistry.register method · python · L100-L147 (48 LOC)src/agent_memory/plugins/registry.py
def register(self, name: str) -> Callable[[type[T]], type[T]]:
"""Return a class decorator that registers the decorated class.
Parameters
----------
name:
The unique string key for this plugin.
Returns
-------
Callable[[type[T]], type[T]]
A decorator that registers the class and returns it unchanged,
allowing it to remain usable as a normal class.
Raises
------
PluginAlreadyRegisteredError
If ``name`` is already in use in this registry.
TypeError
If the decorated class does not subclass ``base_class``.
Example
-------
::
@registry.register("my-plugin")
class MyPlugin(BasePlugin):
...
"""
def decorator(cls: type[T]) -> type[T]:
if name in self._plugins:
raise PluginAlreadyRegisteredError(name, self._name)
if not (PluginRegistry.register_class method · python · L149-L182 (34 LOC)src/agent_memory/plugins/registry.py
def register_class(self, name: str, cls: type[T]) -> None:
"""Register a class directly without using the decorator syntax.
This is useful for programmatic registration, such as inside
``load_entrypoints``.
Parameters
----------
name:
The unique string key for this plugin.
cls:
The class to register. Must subclass ``base_class``.
Raises
------
PluginAlreadyRegisteredError
If ``name`` is already registered.
TypeError
If ``cls`` is not a subclass of ``base_class``.
"""
if name in self._plugins:
raise PluginAlreadyRegisteredError(name, self._name)
if not (isinstance(cls, type) and issubclass(cls, self._base_class)):
raise TypeError(
f"Cannot register {cls!r} under {name!r}: "
f"it must be a subclass of {self._base_class.__name__}."
)
self._plugins[naPluginRegistry.deregister method · python · L184-L200 (17 LOC)src/agent_memory/plugins/registry.py
def deregister(self, name: str) -> None:
"""Remove a plugin from the registry.
Parameters
----------
name:
The key previously passed to ``register``.
Raises
------
PluginNotFoundError
If ``name`` is not currently registered.
"""
if name not in self._plugins:
raise PluginNotFoundError(name, self._name)
del self._plugins[name]
logger.debug("Deregistered plugin %r from registry %r", name, self._name)PluginRegistry.get method · python · L206-L227 (22 LOC)src/agent_memory/plugins/registry.py
def get(self, name: str) -> type[T]:
"""Return the class registered under ``name``.
Parameters
----------
name:
The key used when registering the plugin.
Returns
-------
type[T]
The registered class (not an instance).
Raises
------
PluginNotFoundError
If no plugin is registered under ``name``.
"""
try:
return self._plugins[name]
except KeyError:
raise PluginNotFoundError(name, self._name) from NoneRepobility analyzer · published findings · https://repobility.com
PluginRegistry.list_plugins method · python · L229-L237 (9 LOC)src/agent_memory/plugins/registry.py
def list_plugins(self) -> list[str]:
"""Return a sorted list of all registered plugin names.
Returns
-------
list[str]
Plugin names in alphabetical order.
"""
return sorted(self._plugins)PluginRegistry.__repr__ method · python · L247-L252 (6 LOC)src/agent_memory/plugins/registry.py
def __repr__(self) -> str:
return (
f"PluginRegistry(name={self._name!r}, "
f"base_class={self._base_class.__name__}, "
f"plugins={self.list_plugins()})"
)PluginRegistry.load_entrypoints method · python · L258-L311 (54 LOC)src/agent_memory/plugins/registry.py
def load_entrypoints(self, group: str) -> None:
"""Discover and register plugins declared as package entry-points.
Iterates over all installed distributions that declare entry-points
in ``group``. Each entry-point value is imported and registered
under the entry-point name.
Plugins that are already registered (e.g., from a previous call)
are skipped with a debug-level log entry rather than raising an
error. This makes repeated calls to ``load_entrypoints`` idempotent.
Parameters
----------
group:
The entry-point group name, e.g. "agent_memory.plugins".
Example
-------
In a downstream package's ``pyproject.toml``::
[agent_memory.plugins]
my-processor = "my_package.processors:MyProcessor"
Then at runtime::
registry.load_entrypoints("agent_memory.plugins")
"""
entry_points = importlib.metadata.entry_point