← back to invincible-jha__agent-memory

Function bodies 288 total

All specs Real LLM only Function bodies
run_benchmark function · python · L29-L86 (58 LOC)
benchmarks/bench_memory_write.py
def run_benchmark(
    n_iterations: int = 1000,
    seed: int = 42,
) -> dict[str, object]:
    """Measure write throughput and per-write latency.

    Parameters
    ----------
    n_iterations:
        Number of individual write operations to measure.
    seed:
        Reproducibility seed.

    Returns
    -------
    dict with throughput (memories/sec) and per-write latency stats.
    """
    entries, _ = generate_memory_dataset(
        n_memories=n_iterations, seed=seed, layer=MemoryLayer.SEMANTIC
    )
    store = InMemoryStorage()

    individual_latencies_ms: list[float] = []

    # Warmup — write 10 entries to prime any interpreter JIT paths
    warmup_count = min(10, len(entries))
    for entry in entries[:warmup_count]:
        store.save(entry)
    store.clear()

    # Timed run
    overall_start = time.perf_counter()
    for entry in entries:
        start = time.perf_counter()
        store.save(entry)
        elapsed_ms = (time.perf_counter() - start) * 1000
        in
_measure_query_latencies function · python · L29-L71 (43 LOC)
benchmarks/bench_retrieval_latency.py
def _measure_query_latencies(
    store_size: int,
    n_queries: int,
    seed: int,
) -> dict[str, float]:
    """Populate a store of store_size entries and run n_queries, return stats.

    Parameters
    ----------
    store_size:
        Number of memories in the store.
    n_queries:
        Number of search operations to measure.
    seed:
        Reproducibility seed.

    Returns
    -------
    dict with p50/p95/p99/mean latencies in milliseconds.
    """
    store = build_store(n_memories=store_size, seed=seed)
    queries = [generate_query_for_topic(t) for t in TOPIC_NAMES] * (
        n_queries // len(TOPIC_NAMES) + 1
    )
    queries = queries[:n_queries]

    latencies_ms: list[float] = []
    for query in queries:
        start = time.perf_counter()
        store.search(query, layer=MemoryLayer.SEMANTIC, limit=10)
        elapsed_ms = (time.perf_counter() - start) * 1000
        latencies_ms.append(elapsed_ms)

    sorted_lats = sorted(latencies_ms)
    n = len(sorted_
run_benchmark function · python · L74-L112 (39 LOC)
benchmarks/bench_retrieval_latency.py
def run_benchmark(
    store_sizes: list[int] | None = None,
    n_queries: int = 200,
    seed: int = 42,
) -> dict[str, object]:
    """Measure retrieval latency across multiple store sizes.

    Parameters
    ----------
    store_sizes:
        List of store sizes to evaluate. Defaults to [100, 1000, 10000].
    n_queries:
        Queries to run per store size.
    seed:
        Reproducibility seed.

    Returns
    -------
    dict with latency stats per store size.
    """
    if store_sizes is None:
        store_sizes = [100, 1_000, 10_000]

    size_results: dict[str, dict[str, float]] = {}
    for size in store_sizes:
        print(f"  Measuring store_size={size:,} with {n_queries} queries...")
        size_results[str(size)] = _measure_query_latencies(
            store_size=size,
            n_queries=n_queries,
            seed=seed,
        )

    return {
        "benchmark": "retrieval_latency",
        "store_sizes": store_sizes,
        "n_queries_per_size": n_querie
precision_at_k function · python · L38-L63 (26 LOC)
benchmarks/bench_retrieval_precision.py
def precision_at_k(
    retrieved_ids: list[str],
    relevant_ids: set[str],
    k: int,
) -> float:
    """Compute precision@k: fraction of top-k results that are relevant.

    Parameters
    ----------
    retrieved_ids:
        Ordered list of memory_ids returned by the search.
    relevant_ids:
        Set of memory_ids that are relevant (same topic).
    k:
        Cut-off rank.

    Returns
    -------
    float
        Precision in [0.0, 1.0].
    """
    if k <= 0:
        return 0.0
    top_k = retrieved_ids[:k]
    hits = sum(1 for mid in top_k if mid in relevant_ids)
    return hits / k
recall_at_k function · python · L66-L91 (26 LOC)
benchmarks/bench_retrieval_precision.py
def recall_at_k(
    retrieved_ids: list[str],
    relevant_ids: set[str],
    k: int,
) -> float:
    """Compute recall@k: fraction of relevant items found in top-k.

    Parameters
    ----------
    retrieved_ids:
        Ordered list of memory_ids returned by the search.
    relevant_ids:
        Complete set of relevant memory_ids.
    k:
        Cut-off rank.

    Returns
    -------
    float
        Recall in [0.0, 1.0].
    """
    if not relevant_ids:
        return 0.0
    top_k = set(retrieved_ids[:k])
    hits = len(top_k & relevant_ids)
    return hits / len(relevant_ids)
run_benchmark function · python · L94-L183 (90 LOC)
benchmarks/bench_retrieval_precision.py
def run_benchmark(
    n_memories: int = 1000,
    k_values: list[int] | None = None,
    seed: int = 42,
) -> dict[str, object]:
    """Measure retrieval precision@k and recall@k.

    Parameters
    ----------
    n_memories:
        Total memories inserted into the store.
    k_values:
        List of k cut-offs to evaluate. Defaults to [1, 5, 10].
    seed:
        Fixed seed for reproducibility.

    Returns
    -------
    dict
        Benchmark results with precision/recall per k value.
    """
    if k_values is None:
        k_values = [1, 5, 10]

    entries, ground_truth = generate_memory_dataset(
        n_memories=n_memories, seed=seed, layer=MemoryLayer.SEMANTIC
    )
    store = InMemoryStorage()
    for entry in entries:
        store.save(entry)

    per_topic_results: list[dict[str, float]] = []
    latencies_ms: list[float] = []

    for topic in TOPIC_NAMES:
        query = generate_query_for_topic(topic)
        relevant_ids: set[str] = set(ground_truth.get(topic, 
_fmt_table function · python · L34-L41 (8 LOC)
benchmarks/compare.py
def _fmt_table(rows: list[tuple[str, str]], title: str) -> None:
    """Print a simple two-column table."""
    col1_width = max(len(r[0]) for r in rows) + 2
    print(f"\n{'=' * 60}")
    print(f"  {title}")
    print(f"{'=' * 60}")
    for key, value in rows:
        print(f"  {key:<{col1_width}} {value}")
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
display_precision_results function · python · L51-L64 (14 LOC)
benchmarks/compare.py
def display_precision_results(data: dict[str, object]) -> None:
    """Display precision/recall benchmark results."""
    metrics = data.get("metrics", {})
    rows: list[tuple[str, str]] = []
    for k in [1, 5, 10]:
        rows.append((f"Precision@{k}", f"{metrics.get(f'mean_precision_at_{k}', 'N/A'):.4f}"))
        rows.append((f"Recall@{k}", f"{metrics.get(f'mean_recall_at_{k}', 'N/A'):.4f}"))
        rows.append((f"F1@{k}", f"{metrics.get(f'mean_f1_at_{k}', 'N/A'):.4f}"))
    lat = data.get("query_latency_ms", {})
    rows.append(("Query latency p50 (ms)", str(lat.get("p50", "N/A"))))
    rows.append(("Query latency p95 (ms)", str(lat.get("p95", "N/A"))))
    rows.append(("Query latency p99 (ms)", str(lat.get("p99", "N/A"))))
    _fmt_table(rows, "Retrieval Precision Results")
    print(f"\n  Note: {COMPETITOR_NOTES['retrieval_precision']}")
display_latency_results function · python · L67-L76 (10 LOC)
benchmarks/compare.py
def display_latency_results(data: dict[str, object]) -> None:
    """Display latency benchmark results across store sizes."""
    size_results = data.get("results_by_store_size", {})
    rows: list[tuple[str, str]] = []
    for size, stats in size_results.items():
        rows.append((f"Store size {size} — p50 (ms)", str(stats.get("p50_ms"))))
        rows.append((f"Store size {size} — p95 (ms)", str(stats.get("p95_ms"))))
        rows.append((f"Store size {size} — p99 (ms)", str(stats.get("p99_ms"))))
    _fmt_table(rows, "Retrieval Latency by Store Size")
    print(f"\n  Note: {COMPETITOR_NOTES['retrieval_latency']}")
display_write_results function · python · L79-L90 (12 LOC)
benchmarks/compare.py
def display_write_results(data: dict[str, object]) -> None:
    """Display write throughput benchmark results."""
    lat = data.get("per_write_latency_ms", {})
    rows: list[tuple[str, str]] = [
        ("Throughput (memories/sec)", str(data.get("throughput_memories_per_second"))),
        ("Per-write p50 (ms)", str(lat.get("p50"))),
        ("Per-write p95 (ms)", str(lat.get("p95"))),
        ("Per-write p99 (ms)", str(lat.get("p99"))),
        ("Total elapsed (sec)", str(data.get("total_elapsed_seconds"))),
    ]
    _fmt_table(rows, "Write Throughput Results")
    print(f"\n  Note: {COMPETITOR_NOTES['memory_write_throughput']}")
main function · python · L93-L121 (29 LOC)
benchmarks/compare.py
def main() -> None:
    """Load all result files and display comparison tables."""
    results_dir = Path(__file__).parent / "results"

    baseline = _load_json(results_dir / "baseline.json")
    latency = _load_json(results_dir / "latency_baseline.json")
    write = _load_json(results_dir / "write_baseline.json")

    if baseline:
        display_precision_results(baseline)
    else:
        print("No baseline.json found. Run bench_retrieval_precision.py first.")

    if latency:
        display_latency_results(latency)
    else:
        print("No latency_baseline.json found. Run bench_retrieval_latency.py first.")

    if write:
        display_write_results(write)
    else:
        print("No write_baseline.json found. Run bench_memory_write.py first.")

    print("\n" + "=" * 60)
    print("  To regenerate all results:")
    print("    python benchmarks/bench_retrieval_precision.py")
    print("    python benchmarks/bench_retrieval_latency.py")
    print("    python benchmarks/benc
build_store function · python · L24-L47 (24 LOC)
benchmarks/conftest.py
def build_store(n_memories: int, seed: int = 42) -> InMemoryStorage:
    """Return an InMemoryStorage pre-populated with n_memories entries.

    Parameters
    ----------
    n_memories:
        Number of MemoryEntry objects to insert.
    seed:
        Fixed seed for reproducibility.

    Returns
    -------
    InMemoryStorage
        Populated store ready for benchmarking.
    """
    entries, _ = generate_memory_dataset(
        n_memories=n_memories,
        seed=seed,
        layer=MemoryLayer.SEMANTIC,
    )
    store = InMemoryStorage()
    for entry in entries:
        store.save(entry)
    return store
_generate_content function · python · L57-L67 (11 LOC)
benchmarks/datasets/synthetic_memories.py
def _generate_content(topic: str, keywords: list[str], rng: random.Random) -> str:
    """Generate a deterministic sentence for a topic using its keywords."""
    keyword = rng.choice(keywords)
    templates = [
        f"The {keyword} is essential for understanding {topic.replace('_', ' ')} concepts.",
        f"When working with {topic.replace('_', ' ')}, remember that {keyword} matters most.",
        f"A common pattern in {topic.replace('_', ' ')} involves configuring the {keyword} correctly.",
        f"The {keyword} component of {topic.replace('_', ' ')} requires careful attention.",
        f"Experienced engineers use {keyword} as a core primitive in {topic.replace('_', ' ')}.",
    ]
    return rng.choice(templates)
generate_memory_dataset function · python · L70-L114 (45 LOC)
benchmarks/datasets/synthetic_memories.py
def generate_memory_dataset(
    n_memories: int = 1000,
    seed: int = 42,
    layer: MemoryLayer = MemoryLayer.SEMANTIC,
) -> tuple[list[MemoryEntry], dict[str, list[str]]]:
    """Generate a reproducible synthetic memory dataset.

    Parameters
    ----------
    n_memories:
        Total number of MemoryEntry objects to generate.
    seed:
        Random seed for reproducibility.
    layer:
        The MemoryLayer to assign to all generated entries.

    Returns
    -------
    tuple of:
        - list of MemoryEntry objects
        - dict mapping topic name to list of memory_ids for that topic
          (the ground truth for retrieval evaluation)
    """
    rng = random.Random(seed)
    entries: list[MemoryEntry] = []
    ground_truth: dict[str, list[str]] = {topic: [] for topic in TOPIC_NAMES}

    for index in range(n_memories):
        topic = TOPIC_NAMES[index % len(TOPIC_NAMES)]
        keywords = TOPIC_KEYWORDS[topic]
        content = _generate_content(topic, keywords, r
generate_query_for_topic function · python · L117-L123 (7 LOC)
benchmarks/datasets/synthetic_memories.py
def generate_query_for_topic(topic: str) -> str:
    """Return the primary search keyword for a topic.

    The first keyword in the topic's vocabulary is used as the canonical
    query. This ensures the query matches only entries from that topic.
    """
    return TOPIC_KEYWORDS[topic][0]
Repobility · code-quality intelligence platform · https://repobility.com
generate_ground_truth_entries function · python · L126-L163 (38 LOC)
benchmarks/datasets/synthetic_memories.py
def generate_ground_truth_entries(
    n_memories: int = 100,
    seed: int = 42,
    layer: MemoryLayer = MemoryLayer.SEMANTIC,
) -> list[GroundTruthEntry]:
    """Generate GroundTruthEntry objects for precision/recall evaluation.

    Parameters
    ----------
    n_memories:
        Total number of entries to generate.
    seed:
        Random seed for reproducibility.
    layer:
        Memory layer for all entries.

    Returns
    -------
    list of GroundTruthEntry
        Each entry has its memory and the canonical topic+keywords.
    """
    entries, ground_truth = generate_memory_dataset(n_memories, seed, layer)
    result: list[GroundTruthEntry] = []
    memory_to_topic: dict[str, str] = {}
    for topic, memory_ids in ground_truth.items():
        for memory_id in memory_ids:
            memory_to_topic[memory_id] = topic

    for entry in entries:
        topic = memory_to_topic[entry.memory_id]
        result.append(
            GroundTruthEntry(
                entry=en
AnthropicMemoryBridge.store_conversation method · python · L28-L48 (21 LOC)
src/agent_memory/adapters/anthropic_sdk.py
    def store_conversation(self, conversation_id: str, messages: list[Any]) -> str:
        """Persist a conversation's messages under a conversation ID.

        Returns a unique memory entry ID.
        """
        entry_id = str(uuid.uuid4())
        entry: dict[str, Any] = {
            "id": entry_id,
            "type": "conversation",
            "conversation_id": conversation_id,
            "messages": [str(m) for m in messages],
            "count": len(messages),
        }
        self._items.append(entry)
        logger.debug(
            "Stored conversation conversation_id=%s entry_id=%s messages=%d",
            conversation_id,
            entry_id,
            len(messages),
        )
        return entry_id
AnthropicMemoryBridge.retrieve_context method · python · L50-L59 (10 LOC)
src/agent_memory/adapters/anthropic_sdk.py
    def retrieve_context(self, query: str) -> list[dict[str, Any]]:
        """Retrieve memory entries whose messages contain the query string.

        Returns matching entries across all conversations.
        """
        results: list[dict[str, Any]] = []
        for item in self._items:
            if any(query.lower() in msg.lower() for msg in item.get("messages", [])):
                results.append(item)
        return results
CrewAIMemoryBridge.store_task_result method · python · L28-L42 (15 LOC)
src/agent_memory/adapters/crewai.py
    def store_task_result(self, task_name: str, result: Any) -> str:
        """Persist a task result to the memory store.

        Returns a unique memory entry ID.
        """
        entry_id = str(uuid.uuid4())
        entry: dict[str, Any] = {
            "id": entry_id,
            "type": "task_result",
            "task_name": task_name,
            "result": str(result) if result is not None else "",
        }
        self._items.append(entry)
        logger.debug("Stored task result for task=%s id=%s", task_name, entry_id)
        return entry_id
CrewAIMemoryBridge.retrieve_knowledge method · python · L44-L54 (11 LOC)
src/agent_memory/adapters/crewai.py
    def retrieve_knowledge(self, query: str) -> list[dict[str, Any]]:
        """Retrieve stored knowledge entries that match the query.

        Returns task result entries (naive full-scan; replace with vector
        search in production).
        """
        results: list[dict[str, Any]] = []
        for item in self._items:
            if query.lower() in item.get("task_name", "").lower() or query.lower() in item.get("result", "").lower():
                results.append(item)
        return results
LangChainMemoryBridge.store_conversation method · python · L29-L43 (15 LOC)
src/agent_memory/adapters/langchain.py
    def store_conversation(self, messages: list[Any]) -> str:
        """Persist a list of LangChain messages to the memory store.

        Returns a unique memory entry ID for later retrieval.
        """
        entry_id = str(uuid.uuid4())
        entry: dict[str, Any] = {
            "id": entry_id,
            "type": "conversation",
            "messages": [str(m) for m in messages],
            "count": len(messages),
        }
        self._items.append(entry)
        logger.debug("Stored conversation with id=%s, messages=%d", entry_id, len(messages))
        return entry_id
LangChainMemoryBridge.retrieve_context method · python · L45-L51 (7 LOC)
src/agent_memory/adapters/langchain.py
    def retrieve_context(self, query: str, k: int = 5) -> list[dict[str, Any]]:
        """Retrieve the most recent k memory entries relevant to the query.

        Returns a list of memory entry dictionaries (naive recency ranking).
        """
        conversation_items = [item for item in self._items if item.get("type") == "conversation"]
        return conversation_items[-k:]
MicrosoftMemoryBridge.store_turn method · python · L28-L42 (15 LOC)
src/agent_memory/adapters/microsoft_agents.py
    def store_turn(self, conversation_id: str, turn: Any) -> str:
        """Persist a single conversation turn for a given conversation.

        Returns a unique memory entry ID.
        """
        entry_id = str(uuid.uuid4())
        entry: dict[str, Any] = {
            "id": entry_id,
            "type": "turn",
            "conversation_id": conversation_id,
            "turn": str(turn) if turn is not None else "",
        }
        self._items.append(entry)
        logger.debug("Stored turn for conversation_id=%s entry_id=%s", conversation_id, entry_id)
        return entry_id
Repobility (the analyzer behind this table) · https://repobility.com
MicrosoftMemoryBridge.retrieve_state method · python · L44-L55 (12 LOC)
src/agent_memory/adapters/microsoft_agents.py
    def retrieve_state(self, conversation_id: str) -> dict[str, Any]:
        """Retrieve the aggregated state for a conversation.

        Returns a dict with turn count and the most recent turn text.
        """
        turns = [item for item in self._items if item.get("conversation_id") == conversation_id]
        latest_turn = turns[-1].get("turn", "") if turns else ""
        return {
            "conversation_id": conversation_id,
            "turn_count": len(turns),
            "latest_turn": latest_turn,
        }
OpenAIMemoryBridge.store_messages method · python · L28-L43 (16 LOC)
src/agent_memory/adapters/openai_agents.py
    def store_messages(self, thread_id: str, messages: list[Any]) -> str:
        """Persist a list of messages for a given thread.

        Returns a unique memory entry ID.
        """
        entry_id = str(uuid.uuid4())
        entry: dict[str, Any] = {
            "id": entry_id,
            "type": "thread_messages",
            "thread_id": thread_id,
            "messages": [str(m) for m in messages],
            "count": len(messages),
        }
        self._items.append(entry)
        logger.debug("Stored %d messages for thread=%s id=%s", len(messages), thread_id, entry_id)
        return entry_id
OpenAIMemoryBridge.retrieve_context method · python · L45-L56 (12 LOC)
src/agent_memory/adapters/openai_agents.py
    def retrieve_context(self, thread_id: str, query: str) -> list[dict[str, Any]]:
        """Retrieve message entries for a thread that contain the query string.

        Returns matching entries in insertion order.
        """
        results: list[dict[str, Any]] = []
        for item in self._items:
            if item.get("thread_id") != thread_id:
                continue
            if any(query.lower() in msg.lower() for msg in item.get("messages", [])):
                results.append(item)
        return results
TierStats.to_dict method · python · L106-L116 (11 LOC)
src/agent_memory/budget/memory_budget.py
    def to_dict(self) -> dict[str, object]:
        """Serialise to a plain dictionary."""
        return {
            "tier": self.tier.value,
            "entry_count": self.entry_count,
            "token_count": self.token_count,
            "capacity_entries": self.capacity_entries,
            "capacity_tokens": self.capacity_tokens,
            "entries_utilisation": round(self.entries_utilisation, 4),
            "tokens_utilisation": round(self.tokens_utilisation, 4),
        }
MemoryBudget.__init__ method · python · L162-L171 (10 LOC)
src/agent_memory/budget/memory_budget.py
    def __init__(
        self,
        hot_config: Optional[TierConfig] = None,
        warm_config: Optional[TierConfig] = None,
        cold_config: Optional[TierConfig] = None,
    ) -> None:
        self._hot_config = hot_config or _DEFAULT_HOT_CONFIG
        self._warm_config = warm_config or _DEFAULT_WARM_CONFIG
        self._cold_config = cold_config or _DEFAULT_COLD_CONFIG
        self._entries: dict[str, _BudgetEntry] = {}
MemoryBudget.add method · python · L177-L210 (34 LOC)
src/agent_memory/budget/memory_budget.py
    def add(
        self,
        entry_id: str,
        content: str,
        initial_tier: MemoryTier = MemoryTier.COLD,
        importance_score: float = 0.5,
    ) -> None:
        """Add an entry to the budget system.

        Parameters
        ----------
        entry_id:
            Unique identifier for this entry.
        content:
            The text content (used for token estimation).
        initial_tier:
            Starting tier for the entry. Defaults to COLD.
        importance_score:
            Importance score in [0, 1] — used when evicting from over-capacity tiers.

        Raises
        ------
        ValueError
            If an entry with the same ``entry_id`` already exists.
        """
        if entry_id in self._entries:
            raise ValueError(f"Entry {entry_id!r} already exists in budget.")
        self._entries[entry_id] = _BudgetEntry(
            entry_id=entry_id,
            content=content,
            tier=initial_tier,
            importanc
MemoryBudget.remove method · python · L212-L228 (17 LOC)
src/agent_memory/budget/memory_budget.py
    def remove(self, entry_id: str) -> bool:
        """Remove an entry from the budget.

        Parameters
        ----------
        entry_id:
            The entry to remove.

        Returns
        -------
        bool
            True if removed, False if not found.
        """
        if entry_id not in self._entries:
            return False
        del self._entries[entry_id]
        return True
MemoryBudget.record_access method · python · L230-L249 (20 LOC)
src/agent_memory/budget/memory_budget.py
    def record_access(self, entry_id: str) -> Optional[MemoryTier]:
        """Record an access event for an entry, potentially promoting it.

        Parameters
        ----------
        entry_id:
            The entry that was accessed.

        Returns
        -------
        MemoryTier | None
            The new tier if the entry was promoted, otherwise None.
            Returns None if the entry is not found.
        """
        entry = self._entries.get(entry_id)
        if entry is None:
            return None
        entry.access_count += 1
        entry.last_accessed = _utcnow()
        return self._maybe_promote(entry)
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
MemoryBudget.rebalance method · python · L266-L290 (25 LOC)
src/agent_memory/budget/memory_budget.py
    def rebalance(self) -> dict[str, list[str]]:
        """Run a full rebalance pass — demote under-accessed entries.

        Returns
        -------
        dict[str, list[str]]
            Maps ``"promoted"`` and ``"demoted"`` to lists of affected entry IDs.
        """
        promoted: list[str] = []
        demoted: list[str] = []

        for entry in list(self._entries.values()):
            new_tier = self._evaluate_tier(entry)
            if new_tier != entry.tier:
                if _tier_rank(new_tier) > _tier_rank(entry.tier):
                    promoted.append(entry.entry_id)
                else:
                    demoted.append(entry.entry_id)
                entry.tier = new_tier

        # Enforce capacity after reassignment
        for tier in MemoryTier:
            self._enforce_capacity(tier)

        return {"promoted": promoted, "demoted": demoted}
MemoryBudget.stats method · python · L296-L325 (30 LOC)
src/agent_memory/budget/memory_budget.py
    def stats(self, tier: Optional[MemoryTier] = None) -> list[TierStats]:
        """Return tier statistics.

        Parameters
        ----------
        tier:
            If provided, return stats for that tier only. Otherwise returns
            stats for all three tiers.

        Returns
        -------
        list[TierStats]
            One TierStats per requested tier.
        """
        tiers = [tier] if tier else list(MemoryTier)
        result: list[TierStats] = []
        for t in tiers:
            tier_entries = [e for e in self._entries.values() if e.tier == t]
            token_count = sum(e.token_cost for e in tier_entries)
            config = self._config_for(t)
            result.append(
                TierStats(
                    tier=t,
                    entry_count=len(tier_entries),
                    token_count=token_count,
                    capacity_entries=config.max_entries,
                    capacity_tokens=config.max_tokens,
                )
MemoryBudget.budget_summary method · python · L331-L342 (12 LOC)
src/agent_memory/budget/memory_budget.py
    def budget_summary(self) -> dict[str, object]:
        """Return a human-readable summary of budget utilisation."""
        all_stats = self.stats()
        return {
            s.tier.value: {
                "entries": s.entry_count,
                "tokens": s.token_count,
                "entries_pct": round(s.entries_utilisation * 100, 1),
                "tokens_pct": round(s.tokens_utilisation * 100, 1),
            }
            for s in all_stats
        }
MemoryBudget._config_for method · python · L348-L353 (6 LOC)
src/agent_memory/budget/memory_budget.py
    def _config_for(self, tier: MemoryTier) -> TierConfig:
        if tier == MemoryTier.HOT:
            return self._hot_config
        if tier == MemoryTier.WARM:
            return self._warm_config
        return self._cold_config
MemoryBudget._evaluate_tier method · python · L355-L368 (14 LOC)
src/agent_memory/budget/memory_budget.py
    def _evaluate_tier(self, entry: _BudgetEntry) -> MemoryTier:
        """Determine the correct tier for an entry based on access count."""
        access = entry.access_count
        current = entry.tier

        if current == MemoryTier.COLD and access >= self._cold_config.promote_threshold:
            return MemoryTier.WARM
        if current == MemoryTier.WARM and access >= self._warm_config.promote_threshold:
            return MemoryTier.HOT
        if current == MemoryTier.WARM and access < self._warm_config.demote_threshold:
            return MemoryTier.COLD
        if current == MemoryTier.HOT and access < self._hot_config.demote_threshold:
            return MemoryTier.WARM
        return current
MemoryBudget._maybe_promote method · python · L370-L377 (8 LOC)
src/agent_memory/budget/memory_budget.py
    def _maybe_promote(self, entry: _BudgetEntry) -> Optional[MemoryTier]:
        """Check if an entry should be promoted after an access event."""
        new_tier = self._evaluate_tier(entry)
        if new_tier != entry.tier and _tier_rank(new_tier) > _tier_rank(entry.tier):
            entry.tier = new_tier
            self._enforce_capacity(new_tier)
            return new_tier
        return None
MemoryBudget._enforce_capacity method · python · L379-L397 (19 LOC)
src/agent_memory/budget/memory_budget.py
    def _enforce_capacity(self, tier: MemoryTier) -> None:
        """Evict lowest-importance entries from a tier if over capacity."""
        config = self._config_for(tier)
        tier_entries = [e for e in self._entries.values() if e.tier == tier]

        # Sort by importance ascending — evict lowest importance first
        tier_entries.sort(key=lambda e: (e.importance_score, e.access_count))

        # Enforce entry count limit
        while len(tier_entries) > config.max_entries:
            victim = tier_entries.pop(0)
            self._demote_one(victim)

        # Enforce token limit
        token_total = sum(e.token_cost for e in tier_entries)
        while token_total > config.max_tokens and tier_entries:
            victim = tier_entries.pop(0)
            token_total -= victim.token_cost
            self._demote_one(victim)
MemoryBudget._demote_one method · python · L399-L404 (6 LOC)
src/agent_memory/budget/memory_budget.py
    def _demote_one(self, entry: _BudgetEntry) -> None:
        """Move an entry to the next lower tier."""
        if entry.tier == MemoryTier.HOT:
            entry.tier = MemoryTier.WARM
        elif entry.tier == MemoryTier.WARM:
            entry.tier = MemoryTier.COLD
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
_get_memory function · python · L29-L38 (10 LOC)
src/agent_memory/cli/main.py
def _get_memory(db_path: str, backend: str) -> object:
    """Build and return a UnifiedMemory instance based on CLI options."""
    from agent_memory.unified.config import MemoryConfig
    from agent_memory.unified.memory import UnifiedMemory

    config = MemoryConfig(
        storage_backend=backend,  # type: ignore[arg-type]
        sqlite_path=db_path,
    )
    return UnifiedMemory(config=config)
store_command function · python · L141-L178 (38 LOC)
src/agent_memory/cli/main.py
def store_command(
    ctx: click.Context,
    content: str,
    layer: str,
    source: str,
    importance: Optional[float],
    safety_critical: bool,
    metadata_pairs: tuple[str, ...],
) -> None:
    """Store a memory CONTENT string into the specified layer."""
    from agent_memory.memory.types import MemoryEntry, MemoryLayer, MemorySource

    memory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])

    metadata: dict[str, str] = {}
    for pair in metadata_pairs:
        if "=" not in pair:
            console.print(f"[red]Invalid metadata pair {pair!r} — expected KEY=VALUE[/red]")
            sys.exit(1)
        key, _, value = pair.partition("=")
        metadata[key.strip()] = value.strip()

    entry = MemoryEntry(
        content=content,
        layer=MemoryLayer(layer),
        source=MemorySource(source),
        safety_critical=safety_critical,
        metadata=metadata,
    )
    if importance is not None:
        entry = entry.model_copy(update={"importance_sco
recall_command function · python · L190-L222 (33 LOC)
src/agent_memory/cli/main.py
def recall_command(ctx: click.Context, memory_id: str, json_output: bool) -> None:
    """Retrieve and display a memory entry by MEMORY_ID."""
    memory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])
    entry = memory.recall(memory_id)  # type: ignore[attr-defined]

    if entry is None:
        console.print(f"[yellow]No entry found for ID:[/yellow] {memory_id}")
        sys.exit(1)

    if json_output:
        click.echo(entry.model_dump_json(indent=2))
        return

    table = Table(box=box.SIMPLE, show_header=False)
    table.add_column("Field", style="bold cyan")
    table.add_column("Value")

    table.add_row("ID", entry.memory_id)
    table.add_row("Layer", entry.layer.value)
    table.add_row("Source", entry.source.value)
    table.add_row("Importance", f"{entry.importance_score:.4f}")
    table.add_row("Freshness", f"{entry.freshness_score:.4f}")
    table.add_row("Composite", f"{entry.composite_score:.4f}")
    table.add_row("Safety Critical", str(entry.safety_cr
search_command function · python · L241-L285 (45 LOC)
src/agent_memory/cli/main.py
def search_command(
    ctx: click.Context,
    query: str,
    layer: Optional[str],
    limit: int,
    json_output: bool,
) -> None:
    """Search memory entries matching QUERY."""
    from agent_memory.memory.types import MemoryLayer

    memory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])
    layer_enum = MemoryLayer(layer) if layer else None
    results = memory.search(query=query, layer=layer_enum, limit=limit)  # type: ignore[attr-defined]

    if json_output:
        click.echo(
            json.dumps([json.loads(e.model_dump_json()) for e in results], indent=2)
        )
        return

    if not results:
        console.print(f"[yellow]No results for:[/yellow] {query!r}")
        return

    table = Table(
        "Rank",
        "ID",
        "Layer",
        "Importance",
        "Freshness",
        "Content Preview",
        box=box.SIMPLE_HEAD,
        title=f"Search results for {query!r}",
    )
    for rank, entry in enumerate(results, start=1):
        prev
stats_command function · python · L296-L316 (21 LOC)
src/agent_memory/cli/main.py
def stats_command(ctx: click.Context, json_output: bool) -> None:
    """Show aggregate memory statistics."""
    memory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])
    stats = memory.stats()  # type: ignore[attr-defined]

    if json_output:
        click.echo(json.dumps(stats, indent=2))
        return

    table = Table("Metric", "Value", box=box.SIMPLE, show_header=True, title="Memory Stats")
    layer_counts = stats.get("layer_counts", {})
    for layer_name, count in layer_counts.items():  # type: ignore[union-attr]
        table.add_row(f"  {layer_name}", str(count))
    table.add_row("Total (in-process)", str(stats.get("total_in_process", 0)))
    table.add_row("Total (storage)", str(stats.get("total_in_storage", 0)))
    table.add_row("Store calls", str(stats.get("store_calls", 0)))
    table.add_row("Working utilisation", str(stats.get("working_utilisation", 0)))
    table.add_row("Storage backend", str(stats.get("storage_backend", "")))
    table.add_row("Decay fun
consolidate_command function · python · L332-L361 (30 LOC)
src/agent_memory/cli/main.py
def consolidate_command(ctx: click.Context, dry_run: bool) -> None:
    """Run a garbage-collection pass to remove stale, low-importance entries."""
    from agent_memory.importance.gc import MemoryGarbageCollector

    memory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])

    if dry_run:
        from agent_memory.memory.types import MemoryLayer

        console.print("[dim]Dry-run mode — no entries will be deleted.[/dim]")
        total_candidates = 0
        for layer in MemoryLayer:
            layer_store = memory._layer_store(layer)  # type: ignore[attr-defined]
            gc = MemoryGarbageCollector(store=layer_store)
            candidates = gc.candidates()
            if candidates:
                console.print(
                    f"  [yellow]{len(candidates)}[/yellow] candidate(s) in "
                    f"[cyan]{layer.value}[/cyan]"
                )
                total_candidates += len(candidates)
        console.print(f"Total candidates: [yellow]{total_candid
scan_contradictions_command function · python · L389-L471 (83 LOC)
src/agent_memory/cli/main.py
def scan_contradictions_command(
    ctx: click.Context,
    scope: str,
    max_contradictions: int,
    json_output: bool,
) -> None:
    """Scan memory for contradicting entries and report findings."""
    from agent_memory.contradiction.scanner import ContradictionScanner, ScanScope
    from agent_memory.memory.types import MemoryLayer
    from agent_memory.unified.memory import UnifiedMemory

    memory: UnifiedMemory = _get_memory(ctx.obj["db_path"], ctx.obj["backend"])  # type: ignore[assignment]
    scanner = ContradictionScanner(max_contradictions=max_contradictions)
    scan_scope = ScanScope(scope)

    # Load entries from durable storage, filtered by scope
    if scan_scope == ScanScope.ALL:
        entries = memory.storage.load_all(limit=5000)
    elif scan_scope == ScanScope.HIGH_IMPORTANCE:
        all_entries = memory.storage.load_all(limit=5000)
        entries = [e for e in all_entries if e.importance_score >= 0.6]
    else:
        # Map scope to layer
        layer_
_build_term_vector function · python · L40-L48 (9 LOC)
src/agent_memory/consolidation/consolidator.py
def _build_term_vector(tokens: list[str]) -> dict[str, float]:
    """Build a normalised term-frequency vector from token list."""
    if not tokens:
        return {}
    counts: dict[str, int] = defaultdict(int)
    for token in tokens:
        counts[token] += 1
    length = len(tokens)
    return {term: count / length for term, count in counts.items()}
Repobility · code-quality intelligence platform · https://repobility.com
_cosine_similarity function · python · L51-L60 (10 LOC)
src/agent_memory/consolidation/consolidator.py
def _cosine_similarity(vec_a: dict[str, float], vec_b: dict[str, float]) -> float:
    """Compute cosine similarity between two term-frequency vectors."""
    if not vec_a or not vec_b:
        return 0.0
    dot_product = sum(vec_a.get(term, 0.0) * vec_b.get(term, 0.0) for term in vec_a)
    magnitude_a = math.sqrt(sum(v * v for v in vec_a.values()))
    magnitude_b = math.sqrt(sum(v * v for v in vec_b.values()))
    if magnitude_a == 0.0 or magnitude_b == 0.0:
        return 0.0
    return dot_product / (magnitude_a * magnitude_b)
EpisodicConsolidator.__init__ method · python · L142-L158 (17 LOC)
src/agent_memory/consolidation/consolidator.py
    def __init__(
        self,
        similarity_threshold: float = 0.35,
        min_cluster_size: int = 2,
        max_centroid_terms: int = 10,
    ) -> None:
        if not 0.0 <= similarity_threshold <= 1.0:
            raise ValueError(
                f"similarity_threshold must be in [0, 1], got {similarity_threshold}"
            )
        if min_cluster_size < 1:
            raise ValueError(
                f"min_cluster_size must be >= 1, got {min_cluster_size}"
            )
        self._similarity_threshold = similarity_threshold
        self._min_cluster_size = min_cluster_size
        self._max_centroid_terms = max_centroid_terms
EpisodicConsolidator.consolidate method · python · L164-L217 (54 LOC)
src/agent_memory/consolidation/consolidator.py
    def consolidate(self, episodes: Sequence[MemoryEntry]) -> ConsolidationResult:
        """Run consolidation over a collection of episodic entries.

        Parameters
        ----------
        episodes:
            Episodic MemoryEntry objects to cluster. Non-episodic entries are
            silently skipped.

        Returns
        -------
        ConsolidationResult
            Clusters found and new semantic entries derived from them.
        """
        episodic_only = [e for e in episodes if e.layer == MemoryLayer.EPISODIC]
        result = ConsolidationResult(episodes_processed=len(episodic_only))

        if not episodic_only:
            return result

        # Build TF vectors for each episode
        vectors: dict[str, dict[str, float]] = {
            entry.memory_id: _build_term_vector(_tokenise(entry.content))
            for entry in episodic_only
        }

        # Greedy single-pass clustering
        raw_clusters = self._greedy_cluster(episodic_only, vectors)
page 1 / 6next ›