Function bodies 288 total
run_benchmark function · python · L29-L86 (58 LOC)benchmarks/bench_memory_write.py
def run_benchmark(
n_iterations: int = 1000,
seed: int = 42,
) -> dict[str, object]:
"""Measure write throughput and per-write latency.
Parameters
----------
n_iterations:
Number of individual write operations to measure.
seed:
Reproducibility seed.
Returns
-------
dict with throughput (memories/sec) and per-write latency stats.
"""
entries, _ = generate_memory_dataset(
n_memories=n_iterations, seed=seed, layer=MemoryLayer.SEMANTIC
)
store = InMemoryStorage()
individual_latencies_ms: list[float] = []
# Warmup — write 10 entries to prime any interpreter JIT paths
warmup_count = min(10, len(entries))
for entry in entries[:warmup_count]:
store.save(entry)
store.clear()
# Timed run
overall_start = time.perf_counter()
for entry in entries:
start = time.perf_counter()
store.save(entry)
elapsed_ms = (time.perf_counter() - start) * 1000
in_measure_query_latencies function · python · L29-L71 (43 LOC)benchmarks/bench_retrieval_latency.py
def _measure_query_latencies(
store_size: int,
n_queries: int,
seed: int,
) -> dict[str, float]:
"""Populate a store of store_size entries and run n_queries, return stats.
Parameters
----------
store_size:
Number of memories in the store.
n_queries:
Number of search operations to measure.
seed:
Reproducibility seed.
Returns
-------
dict with p50/p95/p99/mean latencies in milliseconds.
"""
store = build_store(n_memories=store_size, seed=seed)
queries = [generate_query_for_topic(t) for t in TOPIC_NAMES] * (
n_queries // len(TOPIC_NAMES) + 1
)
queries = queries[:n_queries]
latencies_ms: list[float] = []
for query in queries:
start = time.perf_counter()
store.search(query, layer=MemoryLayer.SEMANTIC, limit=10)
elapsed_ms = (time.perf_counter() - start) * 1000
latencies_ms.append(elapsed_ms)
sorted_lats = sorted(latencies_ms)
n = len(sorted_run_benchmark function · python · L74-L112 (39 LOC)benchmarks/bench_retrieval_latency.py
def run_benchmark(
store_sizes: list[int] | None = None,
n_queries: int = 200,
seed: int = 42,
) -> dict[str, object]:
"""Measure retrieval latency across multiple store sizes.
Parameters
----------
store_sizes:
List of store sizes to evaluate. Defaults to [100, 1000, 10000].
n_queries:
Queries to run per store size.
seed:
Reproducibility seed.
Returns
-------
dict with latency stats per store size.
"""
if store_sizes is None:
store_sizes = [100, 1_000, 10_000]
size_results: dict[str, dict[str, float]] = {}
for size in store_sizes:
print(f" Measuring store_size={size:,} with {n_queries} queries...")
size_results[str(size)] = _measure_query_latencies(
store_size=size,
n_queries=n_queries,
seed=seed,
)
return {
"benchmark": "retrieval_latency",
"store_sizes": store_sizes,
"n_queries_per_size": n_querieprecision_at_k function · python · L38-L63 (26 LOC)benchmarks/bench_retrieval_precision.py
def precision_at_k(
    retrieved_ids: list[str],
    relevant_ids: set[str],
    k: int,
) -> float:
    """Return precision@k for a ranked result list.

    The number of relevant ids among the first ``k`` retrieved ids is divided
    by ``k`` itself (not by the number of ids actually returned), so a result
    list shorter than ``k`` lowers the score.

    Parameters
    ----------
    retrieved_ids:
        Ranked memory ids as returned by the search, best match first.
    relevant_ids:
        Ids considered relevant for the query.
    k:
        Cut-off rank.

    Returns
    -------
    float
        ``hits / k``, or ``0.0`` when ``k`` is not positive.
    """
    if k <= 0:
        return 0.0
    matched = 0
    for candidate in retrieved_ids[:k]:
        if candidate in relevant_ids:
            matched += 1
    return matched / k
# recall_at_k function · python · L66-L91 (26 LOC)benchmarks/bench_retrieval_precision.py
def recall_at_k(
    retrieved_ids: list[str],
    relevant_ids: set[str],
    k: int,
) -> float:
    """Compute recall@k: fraction of relevant items found in the top-k.

    Parameters
    ----------
    retrieved_ids:
        Ordered list of memory_ids returned by the search.
    relevant_ids:
        Complete set of relevant memory_ids.
    k:
        Cut-off rank.

    Returns
    -------
    float
        Recall in [0.0, 1.0]. ``0.0`` when there are no relevant ids or when
        ``k`` is not positive.
    """
    # A negative k would otherwise slice "everything but the last |k| ids"
    # and report spurious hits; treat it like precision_at_k does.
    if k <= 0 or not relevant_ids:
        return 0.0
    # Set intersection de-duplicates repeated ids in the retrieved list.
    hits = len(set(retrieved_ids[:k]) & relevant_ids)
    return hits / len(relevant_ids)
# run_benchmark function · python · L94-L183 (90 LOC)benchmarks/bench_retrieval_precision.py
def run_benchmark(
n_memories: int = 1000,
k_values: list[int] | None = None,
seed: int = 42,
) -> dict[str, object]:
"""Measure retrieval precision@k and recall@k.
Parameters
----------
n_memories:
Total memories inserted into the store.
k_values:
List of k cut-offs to evaluate. Defaults to [1, 5, 10].
seed:
Fixed seed for reproducibility.
Returns
-------
dict
Benchmark results with precision/recall per k value.
"""
if k_values is None:
k_values = [1, 5, 10]
entries, ground_truth = generate_memory_dataset(
n_memories=n_memories, seed=seed, layer=MemoryLayer.SEMANTIC
)
store = InMemoryStorage()
for entry in entries:
store.save(entry)
per_topic_results: list[dict[str, float]] = []
latencies_ms: list[float] = []
for topic in TOPIC_NAMES:
query = generate_query_for_topic(topic)
relevant_ids: set[str] = set(ground_truth.get(topic, _fmt_table function · python · L34-L41 (8 LOC)benchmarks/compare.py
def _fmt_table(rows: list[tuple[str, str]], title: str) -> None:
    """Print a simple two-column table to stdout.

    Parameters
    ----------
    rows:
        ``(label, value)`` pairs, printed one per line. May be empty, in
        which case only the title banner is printed.
    title:
        Heading printed between two 60-character ``=`` rules.
    """
    # ``default=0`` keeps an empty ``rows`` list from raising ValueError.
    col1_width = max((len(r[0]) for r in rows), default=0) + 2
    rule = "=" * 60
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)
    for key, value in rows:
        # Left-align the label so the value column lines up.
        print(f" {key:<{col1_width}} {value}")
# Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
display_precision_results function · python · L51-L64 (14 LOC)benchmarks/compare.py
def display_precision_results(data: dict[str, object]) -> None:
    """Display precision/recall benchmark results.

    Parameters
    ----------
    data:
        Result mapping. Reads ``data["metrics"]`` for the
        ``mean_precision_at_k`` / ``mean_recall_at_k`` / ``mean_f1_at_k``
        values (k in 1, 5, 10) and ``data["query_latency_ms"]`` for the
        ``p50`` / ``p95`` / ``p99`` latencies. Missing values are shown as
        ``N/A``.
    """

    def _fmt_metric(value: object) -> str:
        # Numbers get four decimals; anything that does not support the
        # float format spec (e.g. the 'N/A' placeholder) is shown verbatim
        # instead of raising ValueError.
        try:
            return format(value, ".4f")
        except (TypeError, ValueError):
            return str(value)

    metrics = data.get("metrics", {})
    rows: list[tuple[str, str]] = []
    for k in [1, 5, 10]:
        rows.append((f"Precision@{k}", _fmt_metric(metrics.get(f"mean_precision_at_{k}", "N/A"))))
        rows.append((f"Recall@{k}", _fmt_metric(metrics.get(f"mean_recall_at_{k}", "N/A"))))
        rows.append((f"F1@{k}", _fmt_metric(metrics.get(f"mean_f1_at_{k}", "N/A"))))
    lat = data.get("query_latency_ms", {})
    rows.append(("Query latency p50 (ms)", str(lat.get("p50", "N/A"))))
    rows.append(("Query latency p95 (ms)", str(lat.get("p95", "N/A"))))
    rows.append(("Query latency p99 (ms)", str(lat.get("p99", "N/A"))))
    _fmt_table(rows, "Retrieval Precision Results")
    print(f"\n Note: {COMPETITOR_NOTES['retrieval_precision']}")
# display_latency_results function · python · L67-L76 (10 LOC)benchmarks/compare.py
def display_latency_results(data: dict[str, object]) -> None:
    """Print the p50/p95/p99 retrieval latencies for every store size.

    Reads ``data["results_by_store_size"]``, a mapping from store size to a
    stats mapping with ``p50_ms`` / ``p95_ms`` / ``p99_ms`` keys. A missing
    key is rendered as ``None``.
    """
    per_size = data.get("results_by_store_size", {})
    table_rows: list[tuple[str, str]] = []
    for store_size, latency in per_size.items():
        table_rows.extend(
            [
                (f"Store size {store_size} — p50 (ms)", str(latency.get("p50_ms"))),
                (f"Store size {store_size} — p95 (ms)", str(latency.get("p95_ms"))),
                (f"Store size {store_size} — p99 (ms)", str(latency.get("p99_ms"))),
            ]
        )
    _fmt_table(table_rows, "Retrieval Latency by Store Size")
    print(f"\n Note: {COMPETITOR_NOTES['retrieval_latency']}")
# display_write_results function · python · L79-L90 (12 LOC)benchmarks/compare.py
def display_write_results(data: dict[str, object]) -> None:
    """Print the write-throughput benchmark as a two-column table.

    Reads ``throughput_memories_per_second``, ``total_elapsed_seconds`` and
    the ``per_write_latency_ms`` mapping (``p50`` / ``p95`` / ``p99``) from
    ``data``. A missing value is rendered as ``None``.
    """
    per_write = data.get("per_write_latency_ms", {})
    table_rows: list[tuple[str, str]] = []
    table_rows.append(
        ("Throughput (memories/sec)", str(data.get("throughput_memories_per_second")))
    )
    table_rows.append(("Per-write p50 (ms)", str(per_write.get("p50"))))
    table_rows.append(("Per-write p95 (ms)", str(per_write.get("p95"))))
    table_rows.append(("Per-write p99 (ms)", str(per_write.get("p99"))))
    table_rows.append(("Total elapsed (sec)", str(data.get("total_elapsed_seconds"))))
    _fmt_table(table_rows, "Write Throughput Results")
    print(f"\n Note: {COMPETITOR_NOTES['memory_write_throughput']}")
# main function · python · L93-L121 (29 LOC)benchmarks/compare.py
def main() -> None:
"""Load all result files and display comparison tables."""
results_dir = Path(__file__).parent / "results"
baseline = _load_json(results_dir / "baseline.json")
latency = _load_json(results_dir / "latency_baseline.json")
write = _load_json(results_dir / "write_baseline.json")
if baseline:
display_precision_results(baseline)
else:
print("No baseline.json found. Run bench_retrieval_precision.py first.")
if latency:
display_latency_results(latency)
else:
print("No latency_baseline.json found. Run bench_retrieval_latency.py first.")
if write:
display_write_results(write)
else:
print("No write_baseline.json found. Run bench_memory_write.py first.")
print("\n" + "=" * 60)
print(" To regenerate all results:")
print(" python benchmarks/bench_retrieval_precision.py")
print(" python benchmarks/bench_retrieval_latency.py")
print(" python benchmarks/bencbuild_store function · python · L24-L47 (24 LOC)benchmarks/conftest.py
def build_store(n_memories: int, seed: int = 42) -> InMemoryStorage:
    """Create an InMemoryStorage filled with a synthetic SEMANTIC dataset.

    Parameters
    ----------
    n_memories:
        How many MemoryEntry objects to generate and save.
    seed:
        Seed forwarded to the dataset generator.

    Returns
    -------
    InMemoryStorage
        The populated store.
    """
    dataset = generate_memory_dataset(
        n_memories=n_memories,
        seed=seed,
        layer=MemoryLayer.SEMANTIC,
    )
    # The generator also returns ground truth, which is not needed here.
    generated_entries = dataset[0]
    populated = InMemoryStorage()
    for memory in generated_entries:
        populated.save(memory)
    return populated
# _generate_content function · python · L57-L67 (11 LOC)benchmarks/datasets/synthetic_memories.py
def _generate_content(topic: str, keywords: list[str], rng: random.Random) -> str:
    """Build one sentence about ``topic`` around a randomly chosen keyword.

    ``rng`` is consulted exactly twice: first to pick the keyword, then to
    pick one of five sentence templates, so output is reproducible for a
    seeded generator. Underscores in ``topic`` are shown as spaces.
    """
    chosen = rng.choice(keywords)
    subject = topic.replace('_', ' ')
    candidates = [
        f"The {chosen} is essential for understanding {subject} concepts.",
        f"When working with {subject}, remember that {chosen} matters most.",
        f"A common pattern in {subject} involves configuring the {chosen} correctly.",
        f"The {chosen} component of {subject} requires careful attention.",
        f"Experienced engineers use {chosen} as a core primitive in {subject}.",
    ]
    return rng.choice(candidates)
# generate_memory_dataset function · python · L70-L114 (45 LOC)benchmarks/datasets/synthetic_memories.py
def generate_memory_dataset(
n_memories: int = 1000,
seed: int = 42,
layer: MemoryLayer = MemoryLayer.SEMANTIC,
) -> tuple[list[MemoryEntry], dict[str, list[str]]]:
"""Generate a reproducible synthetic memory dataset.
Parameters
----------
n_memories:
Total number of MemoryEntry objects to generate.
seed:
Random seed for reproducibility.
layer:
The MemoryLayer to assign to all generated entries.
Returns
-------
tuple of:
- list of MemoryEntry objects
- dict mapping topic name to list of memory_ids for that topic
(the ground truth for retrieval evaluation)
"""
rng = random.Random(seed)
entries: list[MemoryEntry] = []
ground_truth: dict[str, list[str]] = {topic: [] for topic in TOPIC_NAMES}
for index in range(n_memories):
topic = TOPIC_NAMES[index % len(TOPIC_NAMES)]
keywords = TOPIC_KEYWORDS[topic]
content = _generate_content(topic, keywords, rgenerate_query_for_topic function · python · L117-L123 (7 LOC)benchmarks/datasets/synthetic_memories.py
def generate_query_for_topic(topic: str) -> str:
    """Return the canonical search query for ``topic``.

    The query is the first keyword listed for the topic in
    ``TOPIC_KEYWORDS``. Raises ``KeyError`` for an unknown topic.
    NOTE(review): the intent is that this keyword matches only entries of
    that topic — confirm the keyword lists do not overlap.
    """
    vocabulary = TOPIC_KEYWORDS[topic]
    return vocabulary[0]
# Repobility · code-quality intelligence platform · https://repobility.com
generate_ground_truth_entries function · python · L126-L163 (38 LOC)benchmarks/datasets/synthetic_memories.py
def generate_ground_truth_entries(
n_memories: int = 100,
seed: int = 42,
layer: MemoryLayer = MemoryLayer.SEMANTIC,
) -> list[GroundTruthEntry]:
"""Generate GroundTruthEntry objects for precision/recall evaluation.
Parameters
----------
n_memories:
Total number of entries to generate.
seed:
Random seed for reproducibility.
layer:
Memory layer for all entries.
Returns
-------
list of GroundTruthEntry
Each entry has its memory and the canonical topic+keywords.
"""
entries, ground_truth = generate_memory_dataset(n_memories, seed, layer)
result: list[GroundTruthEntry] = []
memory_to_topic: dict[str, str] = {}
for topic, memory_ids in ground_truth.items():
for memory_id in memory_ids:
memory_to_topic[memory_id] = topic
for entry in entries:
topic = memory_to_topic[entry.memory_id]
result.append(
GroundTruthEntry(
entry=enAnthropicMemoryBridge.store_conversation method · python · L28-L48 (21 LOC)src/agent_memory/adapters/anthropic_sdk.py
def store_conversation(self, conversation_id: str, messages: list[Any]) -> str:
    """Append a conversation record to the in-memory item list.

    Each message is stored as its ``str()`` form together with the
    conversation id and the message count.

    Returns the freshly generated UUID4 string identifying the record.
    """
    new_id = str(uuid.uuid4())
    rendered = list(map(str, messages))
    record: dict[str, Any] = {
        "id": new_id,
        "type": "conversation",
        "conversation_id": conversation_id,
        "messages": rendered,
        "count": len(messages),
    }
    self._items.append(record)
    logger.debug(
        "Stored conversation conversation_id=%s entry_id=%s messages=%d",
        conversation_id,
        new_id,
        len(messages),
    )
    return new_id
# AnthropicMemoryBridge.retrieve_context method · python · L50-L59 (10 LOC)src/agent_memory/adapters/anthropic_sdk.py
def retrieve_context(self, query: str) -> list[dict[str, Any]]:
    """Retrieve memory entries whose messages contain the query string.

    Matching is a case-insensitive substring test against every message of
    every stored item; items without a ``"messages"`` key never match.

    Returns matching entries across all conversations, in insertion order.
    """
    # Lower-case the query once instead of once per message.
    needle = query.lower()
    return [
        item
        for item in self._items
        if any(needle in msg.lower() for msg in item.get("messages", []))
    ]
# CrewAIMemoryBridge.store_task_result method · python · L28-L42 (15 LOC)src/agent_memory/adapters/crewai.py
def store_task_result(self, task_name: str, result: Any) -> str:
    """Append a task-result record to the in-memory item list.

    ``result`` is stored as its ``str()`` form; ``None`` is stored as an
    empty string.

    Returns the freshly generated UUID4 string identifying the record.
    """
    new_id = str(uuid.uuid4())
    if result is None:
        result_text = ""
    else:
        result_text = str(result)
    self._items.append(
        {
            "id": new_id,
            "type": "task_result",
            "task_name": task_name,
            "result": result_text,
        }
    )
    logger.debug("Stored task result for task=%s id=%s", task_name, new_id)
    return new_id
# CrewAIMemoryBridge.retrieve_knowledge method · python · L44-L54 (11 LOC)src/agent_memory/adapters/crewai.py
def retrieve_knowledge(self, query: str) -> list[dict[str, Any]]:
    """Retrieve stored knowledge entries that match the query.

    An item matches when the query is a case-insensitive substring of its
    ``task_name`` or of its ``result`` (a missing key counts as empty).

    Returns matching entries in insertion order (naive full-scan; replace
    with vector search in production).
    """
    # Lower-case the query once rather than twice per scanned item.
    needle = query.lower()
    return [
        item
        for item in self._items
        if needle in item.get("task_name", "").lower()
        or needle in item.get("result", "").lower()
    ]
# LangChainMemoryBridge.store_conversation method · python · L29-L43 (15 LOC)src/agent_memory/adapters/langchain.py
def store_conversation(self, messages: list[Any]) -> str:
    """Append a conversation record built from LangChain messages.

    Each message is stored as its ``str()`` form alongside the message
    count.

    Returns the freshly generated UUID4 string identifying the record.
    """
    new_id = str(uuid.uuid4())
    rendered = list(map(str, messages))
    record: dict[str, Any] = {
        "id": new_id,
        "type": "conversation",
        "messages": rendered,
        "count": len(messages),
    }
    self._items.append(record)
    logger.debug("Stored conversation with id=%s, messages=%d", new_id, len(messages))
    return new_id
# LangChainMemoryBridge.retrieve_context method · python · L45-L51 (7 LOC)src/agent_memory/adapters/langchain.py
def retrieve_context(self, query: str, k: int = 5) -> list[dict[str, Any]]:
    """Retrieve the ``k`` most recently stored conversation entries.

    Ranking is naive recency: ``query`` is accepted for interface
    compatibility but is not used to filter or score entries.

    Returns at most ``k`` entries of type ``"conversation"``, oldest first;
    an empty list when ``k`` is not positive.
    """
    # ``items[-0:]`` is the whole list and a negative k would drop leading
    # items, so non-positive k must be handled before slicing.
    if k <= 0:
        return []
    conversation_items = [item for item in self._items if item.get("type") == "conversation"]
    return conversation_items[-k:]
# MicrosoftMemoryBridge.store_turn method · python · L28-L42 (15 LOC)src/agent_memory/adapters/microsoft_agents.py
def store_turn(self, conversation_id: str, turn: Any) -> str:
    """Append a single-turn record for ``conversation_id``.

    ``turn`` is stored as its ``str()`` form; ``None`` is stored as an empty
    string.

    Returns the freshly generated UUID4 string identifying the record.
    """
    new_id = str(uuid.uuid4())
    if turn is None:
        turn_text = ""
    else:
        turn_text = str(turn)
    self._items.append(
        {
            "id": new_id,
            "type": "turn",
            "conversation_id": conversation_id,
            "turn": turn_text,
        }
    )
    logger.debug("Stored turn for conversation_id=%s entry_id=%s", conversation_id, new_id)
    return new_id
# Repobility (the analyzer behind this table) · https://repobility.com
MicrosoftMemoryBridge.retrieve_state method · python · L44-L55 (12 LOC)src/agent_memory/adapters/microsoft_agents.py
def retrieve_state(self, conversation_id: str) -> dict[str, Any]:
    """Summarise what is stored for one conversation.

    Returns a dict with the conversation id, the number of stored items
    carrying that id (``turn_count``) and the ``"turn"`` text of the last
    such item (``latest_turn``; empty string when there is none).
    """
    matched = 0
    newest = None
    for item in self._items:
        if item.get("conversation_id") == conversation_id:
            matched += 1
            newest = item
    latest_text = "" if newest is None else newest.get("turn", "")
    summary: dict[str, Any] = {"conversation_id": conversation_id}
    summary["turn_count"] = matched
    summary["latest_turn"] = latest_text
    return summary
# OpenAIMemoryBridge.store_messages method · python · L28-L43 (16 LOC)src/agent_memory/adapters/openai_agents.py
def store_messages(self, thread_id: str, messages: list[Any]) -> str:
    """Append a record holding the messages of one thread.

    Each message is stored as its ``str()`` form together with the thread id
    and the message count.

    Returns the freshly generated UUID4 string identifying the record.
    """
    new_id = str(uuid.uuid4())
    rendered = list(map(str, messages))
    record: dict[str, Any] = {
        "id": new_id,
        "type": "thread_messages",
        "thread_id": thread_id,
        "messages": rendered,
        "count": len(messages),
    }
    self._items.append(record)
    logger.debug("Stored %d messages for thread=%s id=%s", len(messages), thread_id, new_id)
    return new_id
# OpenAIMemoryBridge.retrieve_context method · python · L45-L56 (12 LOC)src/agent_memory/adapters/openai_agents.py
def retrieve_context(self, thread_id: str, query: str) -> list[dict[str, Any]]:
    """Retrieve message entries for a thread that contain the query string.

    Only items whose ``thread_id`` equals the given one are considered;
    matching is a case-insensitive substring test over their messages.

    Returns matching entries in insertion order.
    """
    # Lower-case the query once instead of once per message.
    needle = query.lower()
    return [
        item
        for item in self._items
        if item.get("thread_id") == thread_id
        and any(needle in msg.lower() for msg in item.get("messages", []))
    ]
# TierStats.to_dict method · python · L106-L116 (11 LOC)src/agent_memory/budget/memory_budget.py
def to_dict(self) -> dict[str, object]:
    """Return these tier statistics as a plain dictionary.

    The tier is emitted as its enum ``value`` and both utilisation figures
    are rounded to four decimal places.
    """
    payload: dict[str, object] = {"tier": self.tier.value}
    for field_name in (
        "entry_count",
        "token_count",
        "capacity_entries",
        "capacity_tokens",
    ):
        payload[field_name] = getattr(self, field_name)
    payload["entries_utilisation"] = round(self.entries_utilisation, 4)
    payload["tokens_utilisation"] = round(self.tokens_utilisation, 4)
    return payload
# MemoryBudget.__init__ method · python · L162-L171 (10 LOC)src/agent_memory/budget/memory_budget.py
def __init__(
    self,
    hot_config: Optional[TierConfig] = None,
    warm_config: Optional[TierConfig] = None,
    cold_config: Optional[TierConfig] = None,
) -> None:
    """Create an empty budget with one configuration per tier.

    Any configuration that is omitted (or otherwise falsy) is replaced by
    the corresponding module-level default.
    """
    self._hot_config = hot_config if hot_config else _DEFAULT_HOT_CONFIG
    self._warm_config = warm_config if warm_config else _DEFAULT_WARM_CONFIG
    self._cold_config = cold_config if cold_config else _DEFAULT_COLD_CONFIG
    # Tracked entries, keyed by entry id.
    self._entries: dict[str, _BudgetEntry] = {}
# MemoryBudget.add method · python · L177-L210 (34 LOC)src/agent_memory/budget/memory_budget.py
def add(
self,
entry_id: str,
content: str,
initial_tier: MemoryTier = MemoryTier.COLD,
importance_score: float = 0.5,
) -> None:
"""Add an entry to the budget system.
Parameters
----------
entry_id:
Unique identifier for this entry.
content:
The text content (used for token estimation).
initial_tier:
Starting tier for the entry. Defaults to COLD.
importance_score:
Importance score in [0, 1] — used when evicting from over-capacity tiers.
Raises
------
ValueError
If an entry with the same ``entry_id`` already exists.
"""
if entry_id in self._entries:
raise ValueError(f"Entry {entry_id!r} already exists in budget.")
self._entries[entry_id] = _BudgetEntry(
entry_id=entry_id,
content=content,
tier=initial_tier,
importancMemoryBudget.remove method · python · L212-L228 (17 LOC)src/agent_memory/budget/memory_budget.py
def remove(self, entry_id: str) -> bool:
    """Drop an entry from the budget if it is tracked.

    Parameters
    ----------
    entry_id:
        Identifier of the entry to drop.

    Returns
    -------
    bool
        True when the entry existed and was deleted, False otherwise.
    """
    try:
        del self._entries[entry_id]
    except KeyError:
        return False
    return True
# MemoryBudget.record_access method · python · L230-L249 (20 LOC)src/agent_memory/budget/memory_budget.py
def record_access(self, entry_id: str) -> Optional[MemoryTier]:
    """Register one access to an entry and promote it if it qualifies.

    Increments the entry's access counter, refreshes its last-accessed
    timestamp and then delegates the promotion decision to
    ``_maybe_promote``.

    Parameters
    ----------
    entry_id:
        Identifier of the accessed entry.

    Returns
    -------
    MemoryTier | None
        The tier the entry was promoted to, or None when no promotion
        happened or the entry is unknown.
    """
    tracked = self._entries.get(entry_id)
    if tracked is not None:
        tracked.access_count += 1
        tracked.last_accessed = _utcnow()
        return self._maybe_promote(tracked)
    return None
# Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
MemoryBudget.rebalance method · python · L266-L290 (25 LOC)src/agent_memory/budget/memory_budget.py
def rebalance(self) -> dict[str, list[str]]:
    """Re-evaluate the tier of every entry, then enforce tier capacities.

    Each entry is moved to the tier ``_evaluate_tier`` selects for it; the
    move is classified as a promotion or demotion by comparing tier ranks.
    Moves made afterwards by capacity enforcement are not reported.

    Returns
    -------
    dict[str, list[str]]
        ``"promoted"`` and ``"demoted"`` mapped to the ids of the entries
        whose tier changed during re-evaluation.
    """
    moved: dict[str, list[str]] = {"promoted": [], "demoted": []}
    # Iterate over a snapshot of the entries.
    for tracked in tuple(self._entries.values()):
        target = self._evaluate_tier(tracked)
        if target == tracked.tier:
            continue
        went_up = _tier_rank(target) > _tier_rank(tracked.tier)
        moved["promoted" if went_up else "demoted"].append(tracked.entry_id)
        tracked.tier = target
    # Enforce capacity after reassignment
    for tier in MemoryTier:
        self._enforce_capacity(tier)
    return moved
# MemoryBudget.stats method · python · L296-L325 (30 LOC)src/agent_memory/budget/memory_budget.py
def stats(self, tier: Optional[MemoryTier] = None) -> list[TierStats]:
"""Return tier statistics.
Parameters
----------
tier:
If provided, return stats for that tier only. Otherwise returns
stats for all three tiers.
Returns
-------
list[TierStats]
One TierStats per requested tier.
"""
tiers = [tier] if tier else list(MemoryTier)
result: list[TierStats] = []
for t in tiers:
tier_entries = [e for e in self._entries.values() if e.tier == t]
token_count = sum(e.token_cost for e in tier_entries)
config = self._config_for(t)
result.append(
TierStats(
tier=t,
entry_count=len(tier_entries),
token_count=token_count,
capacity_entries=config.max_entries,
capacity_tokens=config.max_tokens,
)
MemoryBudget.budget_summary method · python · L331-L342 (12 LOC)src/agent_memory/budget/memory_budget.py
def budget_summary(self) -> dict[str, object]:
    """Summarise utilisation per tier in a plain nested dictionary.

    The result is keyed by tier value; each item holds the entry and token
    counts plus both utilisation figures as percentages rounded to one
    decimal place.
    """
    summary: dict[str, object] = {}
    for tier_stats in self.stats():
        summary[tier_stats.tier.value] = {
            "entries": tier_stats.entry_count,
            "tokens": tier_stats.token_count,
            "entries_pct": round(tier_stats.entries_utilisation * 100, 1),
            "tokens_pct": round(tier_stats.tokens_utilisation * 100, 1),
        }
    return summary
# MemoryBudget._config_for method · python · L348-L353 (6 LOC)src/agent_memory/budget/memory_budget.py
def _config_for(self, tier: MemoryTier) -> TierConfig:
    """Return the configuration for ``tier``.

    HOT and WARM map to their own configurations; any other value falls
    through to the cold configuration.
    """
    explicit = (
        (MemoryTier.HOT, "_hot_config"),
        (MemoryTier.WARM, "_warm_config"),
    )
    for known_tier, attribute in explicit:
        if tier == known_tier:
            return getattr(self, attribute)
    return self._cold_config
# MemoryBudget._evaluate_tier method · python · L355-L368 (14 LOC)src/agent_memory/budget/memory_budget.py
def _evaluate_tier(self, entry: _BudgetEntry) -> MemoryTier:
    """Pick the tier an entry belongs in, judged by its access count.

    An entry moves at most one tier per evaluation: up when its access
    count reaches the current tier's promote threshold, down when it is
    below the current tier's demote threshold. COLD entries are never
    demoted and HOT entries are never promoted. For WARM entries promotion
    is checked before demotion.
    """
    hits = entry.access_count
    tier_now = entry.tier
    if tier_now == MemoryTier.COLD:
        if hits >= self._cold_config.promote_threshold:
            return MemoryTier.WARM
        return tier_now
    if tier_now == MemoryTier.WARM:
        if hits >= self._warm_config.promote_threshold:
            return MemoryTier.HOT
        if hits < self._warm_config.demote_threshold:
            return MemoryTier.COLD
        return tier_now
    if tier_now == MemoryTier.HOT and hits < self._hot_config.demote_threshold:
        return MemoryTier.WARM
    return tier_now
# MemoryBudget._maybe_promote method · python · L370-L377 (8 LOC)src/agent_memory/budget/memory_budget.py
def _maybe_promote(self, entry: _BudgetEntry) -> Optional[MemoryTier]:
    """Promote ``entry`` when its evaluated tier outranks its current one.

    On promotion the entry's tier is updated and the destination tier's
    capacity is enforced. Returns the new tier, or None when the entry
    stays where it is (demotions are never applied here).
    NOTE(review): capacity enforcement may immediately demote this same
    entry again while the promoted tier is still returned — confirm that
    callers tolerate this.
    """
    candidate = self._evaluate_tier(entry)
    if candidate == entry.tier:
        return None
    if not _tier_rank(candidate) > _tier_rank(entry.tier):
        return None
    entry.tier = candidate
    self._enforce_capacity(candidate)
    return candidate
# MemoryBudget._enforce_capacity method · python · L379-L397 (19 LOC)src/agent_memory/budget/memory_budget.py
def _enforce_capacity(self, tier: MemoryTier) -> None:
    """Demote lowest-importance entries from a tier that is over capacity.

    Entries of ``tier`` are ranked by ``(importance_score, access_count)``
    ascending and handed to ``_demote_one`` in that order: first until the
    entry-count limit holds, then until the token limit holds (or no
    entries remain).
    """
    config = self._config_for(tier)
    # Lowest importance first; ties broken by fewer accesses.
    ranked = sorted(
        (e for e in self._entries.values() if e.tier == tier),
        key=lambda e: (e.importance_score, e.access_count),
    )
    # Enforce entry count limit. Slicing avoids the O(n) cost of
    # repeatedly popping from the front of a list.
    overflow = max(0, len(ranked) - config.max_entries)
    for victim in ranked[:overflow]:
        self._demote_one(victim)
    survivors = ranked[overflow:]
    # Enforce token limit on whatever is left, same victim order.
    token_total = sum(e.token_cost for e in survivors)
    for victim in survivors:
        if token_total <= config.max_tokens:
            break
        token_total -= victim.token_cost
        self._demote_one(victim)
# MemoryBudget._demote_one method · python · L399-L404 (6 LOC)src/agent_memory/budget/memory_budget.py
def _demote_one(self, entry: _BudgetEntry) -> None:
    """Shift ``entry`` down exactly one tier.

    HOT becomes WARM and WARM becomes COLD; an entry in any other tier is
    left unchanged.
    """
    if entry.tier == MemoryTier.HOT:
        entry.tier = MemoryTier.WARM
        return
    if entry.tier == MemoryTier.WARM:
        entry.tier = MemoryTier.COLD
# Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
_get_memory function · python · L29-L38 (10 LOC)src/agent_memory/cli/main.py
def _get_memory(db_path: str, backend: str) -> object:
    """Construct the UnifiedMemory instance described by the CLI options.

    ``backend`` is passed as the storage backend and ``db_path`` as the
    SQLite path of a MemoryConfig. The project imports happen inside the
    function rather than at module import time.
    """
    from agent_memory.unified.memory import UnifiedMemory
    from agent_memory.unified.config import MemoryConfig

    settings = MemoryConfig(
        sqlite_path=db_path,
        storage_backend=backend,  # type: ignore[arg-type]
    )
    return UnifiedMemory(config=settings)
# store_command function · python · L141-L178 (38 LOC)src/agent_memory/cli/main.py
def store_command(
ctx: click.Context,
content: str,
layer: str,
source: str,
importance: Optional[float],
safety_critical: bool,
metadata_pairs: tuple[str, ...],
) -> None:
"""Store a memory CONTENT string into the specified layer."""
from agent_memory.memory.types import MemoryEntry, MemoryLayer, MemorySource
memory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])
metadata: dict[str, str] = {}
for pair in metadata_pairs:
if "=" not in pair:
console.print(f"[red]Invalid metadata pair {pair!r} — expected KEY=VALUE[/red]")
sys.exit(1)
key, _, value = pair.partition("=")
metadata[key.strip()] = value.strip()
entry = MemoryEntry(
content=content,
layer=MemoryLayer(layer),
source=MemorySource(source),
safety_critical=safety_critical,
metadata=metadata,
)
if importance is not None:
entry = entry.model_copy(update={"importance_scorecall_command function · python · L190-L222 (33 LOC)src/agent_memory/cli/main.py
def recall_command(ctx: click.Context, memory_id: str, json_output: bool) -> None:
"""Retrieve and display a memory entry by MEMORY_ID."""
memory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])
entry = memory.recall(memory_id) # type: ignore[attr-defined]
if entry is None:
console.print(f"[yellow]No entry found for ID:[/yellow] {memory_id}")
sys.exit(1)
if json_output:
click.echo(entry.model_dump_json(indent=2))
return
table = Table(box=box.SIMPLE, show_header=False)
table.add_column("Field", style="bold cyan")
table.add_column("Value")
table.add_row("ID", entry.memory_id)
table.add_row("Layer", entry.layer.value)
table.add_row("Source", entry.source.value)
table.add_row("Importance", f"{entry.importance_score:.4f}")
table.add_row("Freshness", f"{entry.freshness_score:.4f}")
table.add_row("Composite", f"{entry.composite_score:.4f}")
table.add_row("Safety Critical", str(entry.safety_crsearch_command function · python · L241-L285 (45 LOC)src/agent_memory/cli/main.py
def search_command(
ctx: click.Context,
query: str,
layer: Optional[str],
limit: int,
json_output: bool,
) -> None:
"""Search memory entries matching QUERY."""
from agent_memory.memory.types import MemoryLayer
memory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])
layer_enum = MemoryLayer(layer) if layer else None
results = memory.search(query=query, layer=layer_enum, limit=limit) # type: ignore[attr-defined]
if json_output:
click.echo(
json.dumps([json.loads(e.model_dump_json()) for e in results], indent=2)
)
return
if not results:
console.print(f"[yellow]No results for:[/yellow] {query!r}")
return
table = Table(
"Rank",
"ID",
"Layer",
"Importance",
"Freshness",
"Content Preview",
box=box.SIMPLE_HEAD,
title=f"Search results for {query!r}",
)
for rank, entry in enumerate(results, start=1):
prevstats_command function · python · L296-L316 (21 LOC)src/agent_memory/cli/main.py
def stats_command(ctx: click.Context, json_output: bool) -> None:
"""Show aggregate memory statistics."""
memory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])
stats = memory.stats() # type: ignore[attr-defined]
if json_output:
click.echo(json.dumps(stats, indent=2))
return
table = Table("Metric", "Value", box=box.SIMPLE, show_header=True, title="Memory Stats")
layer_counts = stats.get("layer_counts", {})
for layer_name, count in layer_counts.items(): # type: ignore[union-attr]
table.add_row(f" {layer_name}", str(count))
table.add_row("Total (in-process)", str(stats.get("total_in_process", 0)))
table.add_row("Total (storage)", str(stats.get("total_in_storage", 0)))
table.add_row("Store calls", str(stats.get("store_calls", 0)))
table.add_row("Working utilisation", str(stats.get("working_utilisation", 0)))
table.add_row("Storage backend", str(stats.get("storage_backend", "")))
table.add_row("Decay funconsolidate_command function · python · L332-L361 (30 LOC)src/agent_memory/cli/main.py
def consolidate_command(ctx: click.Context, dry_run: bool) -> None:
"""Run a garbage-collection pass to remove stale, low-importance entries."""
from agent_memory.importance.gc import MemoryGarbageCollector
memory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])
if dry_run:
from agent_memory.memory.types import MemoryLayer
console.print("[dim]Dry-run mode — no entries will be deleted.[/dim]")
total_candidates = 0
for layer in MemoryLayer:
layer_store = memory._layer_store(layer) # type: ignore[attr-defined]
gc = MemoryGarbageCollector(store=layer_store)
candidates = gc.candidates()
if candidates:
console.print(
f" [yellow]{len(candidates)}[/yellow] candidate(s) in "
f"[cyan]{layer.value}[/cyan]"
)
total_candidates += len(candidates)
console.print(f"Total candidates: [yellow]{total_candidscan_contradictions_command function · python · L389-L471 (83 LOC)src/agent_memory/cli/main.py
def scan_contradictions_command(
ctx: click.Context,
scope: str,
max_contradictions: int,
json_output: bool,
) -> None:
"""Scan memory for contradicting entries and report findings."""
from agent_memory.contradiction.scanner import ContradictionScanner, ScanScope
from agent_memory.memory.types import MemoryLayer
from agent_memory.unified.memory import UnifiedMemory
memory: UnifiedMemory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"]) # type: ignore[assignment]
scanner = ContradictionScanner(max_contradictions=max_contradictions)
scan_scope = ScanScope(scope)
# Load entries from durable storage, filtered by scope
if scan_scope == ScanScope.ALL:
entries = memory.storage.load_all(limit=5000)
elif scan_scope == ScanScope.HIGH_IMPORTANCE:
all_entries = memory.storage.load_all(limit=5000)
entries = [e for e in all_entries if e.importance_score >= 0.6]
else:
# Map scope to layer
layer__build_term_vector function · python · L40-L48 (9 LOC)src/agent_memory/consolidation/consolidator.py
def _build_term_vector(tokens: list[str]) -> dict[str, float]:
    """Turn a token list into a relative term-frequency mapping.

    Each distinct token maps to its occurrence count divided by the total
    number of tokens, so the values sum to 1. An empty token list yields an
    empty mapping.
    """
    total = len(tokens)
    if total == 0:
        return {}
    occurrences: dict[str, int] = {}
    for word in tokens:
        occurrences[word] = occurrences.get(word, 0) + 1
    vector: dict[str, float] = {}
    for word, seen in occurrences.items():
        vector[word] = seen / total
    return vector
# Repobility · code-quality intelligence platform · https://repobility.com
_cosine_similarity function · python · L51-L60 (10 LOC)src/agent_memory/consolidation/consolidator.py
def _cosine_similarity(vec_a: dict[str, float], vec_b: dict[str, float]) -> float:
    """Return the cosine of the angle between two sparse term vectors.

    Terms absent from a vector count as 0. The result is 0.0 when either
    vector is empty or has zero magnitude.
    """
    if not (vec_a and vec_b):
        return 0.0
    # Only terms of vec_a can contribute to the dot product.
    overlap = sum(weight * vec_b.get(term, 0.0) for term, weight in vec_a.items())
    norm_a = math.sqrt(sum(weight * weight for weight in vec_a.values()))
    norm_b = math.sqrt(sum(weight * weight for weight in vec_b.values()))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return overlap / (norm_a * norm_b)
# EpisodicConsolidator.__init__ method · python · L142-L158 (17 LOC)src/agent_memory/consolidation/consolidator.py
def __init__(
    self,
    similarity_threshold: float = 0.35,
    min_cluster_size: int = 2,
    max_centroid_terms: int = 10,
) -> None:
    """Validate and store the clustering parameters.

    Raises
    ------
    ValueError
        If ``similarity_threshold`` lies outside [0, 1] or
        ``min_cluster_size`` is smaller than 1. ``max_centroid_terms`` is
        stored without validation.
    """
    threshold_ok = 0.0 <= similarity_threshold <= 1.0
    if not threshold_ok:
        message = f"similarity_threshold must be in [0, 1], got {similarity_threshold}"
        raise ValueError(message)
    if min_cluster_size < 1:
        message = f"min_cluster_size must be >= 1, got {min_cluster_size}"
        raise ValueError(message)
    self._similarity_threshold = similarity_threshold
    self._min_cluster_size = min_cluster_size
    self._max_centroid_terms = max_centroid_terms
# EpisodicConsolidator.consolidate method · python · L164-L217 (54 LOC)src/agent_memory/consolidation/consolidator.py
def consolidate(self, episodes: Sequence[MemoryEntry]) -> ConsolidationResult:
"""Run consolidation over a collection of episodic entries.
Parameters
----------
episodes:
Episodic MemoryEntry objects to cluster. Non-episodic entries are
silently skipped.
Returns
-------
ConsolidationResult
Clusters found and new semantic entries derived from them.
"""
episodic_only = [e for e in episodes if e.layer == MemoryLayer.EPISODIC]
result = ConsolidationResult(episodes_processed=len(episodic_only))
if not episodic_only:
return result
# Build TF vectors for each episode
vectors: dict[str, dict[str, float]] = {
entry.memory_id: _build_term_vector(_tokenise(entry.content))
for entry in episodic_only
}
# Greedy single-pass clustering
raw_clusters = self._greedy_cluster(episodic_only, vectors)
page 1 / 6next ›