← back to invincible-jha__agent-memory

Function bodies 288 total

All specs Real LLM only Function bodies
EpisodicConsolidator.similarity method · python · L219-L236 (18 LOC)
src/agent_memory/consolidation/consolidator.py
    def similarity(self, entry_a: MemoryEntry, entry_b: MemoryEntry) -> float:
        """Score how alike the ``content`` of two memory entries is.

        Each entry's content is tokenised, turned into a term vector, and the
        two vectors are compared with ``_cosine_similarity``.

        Parameters
        ----------
        entry_a:
            First memory entry.
        entry_b:
            Second memory entry.

        Returns
        -------
        float
            Cosine similarity of the two term vectors.
        """
        # Build both vectors the same way, first entry first.
        term_vectors = [
            _build_term_vector(_tokenise(entry.content))
            for entry in (entry_a, entry_b)
        ]
        return _cosine_similarity(term_vectors[0], term_vectors[1])
EpisodicConsolidator._greedy_cluster method · python · L242-L275 (34 LOC)
src/agent_memory/consolidation/consolidator.py
    def _greedy_cluster(
        self,
        entries: list[MemoryEntry],
        vectors: dict[str, dict[str, float]],
    ) -> list[set[str]]:
        """Greedy single-pass clustering algorithm.

        Each entry is assigned to the first cluster whose centroid has
        cosine similarity >= threshold, or starts a new cluster otherwise.
        Cluster centroids are recomputed as members join.
        """
        clusters: list[set[str]] = []
        cluster_centroids: list[dict[str, float]] = []

        for entry in entries:
            entry_vec = vectors[entry.memory_id]
            assigned = False

            for cluster_index, centroid in enumerate(cluster_centroids):
                similarity = _cosine_similarity(entry_vec, centroid)
                if similarity >= self._similarity_threshold:
                    clusters[cluster_index].add(entry.memory_id)
                    # Update centroid: average with incoming vector
                    cluster_centroids[cluster_
EpisodicConsolidator._average_vectors method · python · L277-L293 (17 LOC)
src/agent_memory/consolidation/consolidator.py
    def _average_vectors(
        self,
        current_centroid: dict[str, float],
        new_vector: dict[str, float],
        current_size: int,
    ) -> dict[str, float]:
        """Return an updated centroid by incorporating a new member vector."""
        all_terms = set(current_centroid) | set(new_vector)
        weight_old = (current_size - 1) / current_size
        weight_new = 1.0 / current_size
        return {
            term: (
                current_centroid.get(term, 0.0) * weight_old
                + new_vector.get(term, 0.0) * weight_new
            )
            for term in all_terms
        }
EpisodicConsolidator._compute_centroid_terms method · python · L295-L302 (8 LOC)
src/agent_memory/consolidation/consolidator.py
    def _compute_centroid_terms(self, members: list[MemoryEntry]) -> list[str]:
        """Return the top N terms by aggregate TF across all cluster members."""
        term_totals: dict[str, float] = defaultdict(float)
        for member in members:
            for term, freq in _build_term_vector(_tokenise(member.content)).items():
                term_totals[term] += freq
        ranked = sorted(term_totals.items(), key=lambda x: x[1], reverse=True)
        return [term for term, _ in ranked[: self._max_centroid_terms]]
EpisodicConsolidator._make_semantic_entry method · python · L304-L320 (17 LOC)
src/agent_memory/consolidation/consolidator.py
    def _make_semantic_entry(self, cluster: EpisodeCluster) -> MemoryEntry:
        """Build the semantic-layer entry that summarises *cluster*.

        The content is a fixed sentence naming the number of episodes and the
        cluster's centroid terms.  The importance is the cluster's average
        importance scaled by 1.1 and capped at 1.0.  Metadata records the
        cluster id, the episode count, and up to the first five episode ids
        joined by commas.
        """
        episode_total = len(cluster.episode_ids)
        joined_terms = ", ".join(cluster.centroid_terms)
        boosted_importance = min(1.0, cluster.average_importance * 1.1)

        # Metadata values are all strings.
        provenance = {
            "cluster_id": str(cluster.cluster_id),
            "episode_count": str(episode_total),
            "source_episodes": ",".join(cluster.episode_ids[:5]),
        }
        return MemoryEntry(
            content=f"Consolidated pattern from {episode_total} episodes: {joined_terms}.",
            layer=MemoryLayer.SEMANTIC,
            importance_score=boosted_importance,
            source=MemorySource.AGENT_INFERENCE,
            metadata=provenance,
        )
ExtractedPattern.top_entities method · python · L81-L94 (14 LOC)
src/agent_memory/consolidation/pattern_extractor.py
    def top_entities(self, limit: int = 5) -> list[tuple[str, int]]:
        """Return top entities by frequency.

        Parameters
        ----------
        limit:
            Maximum number of results.

        Returns
        -------
        list[tuple[str, int]]
            Sorted (entity, count) pairs, highest first.
        """
        return sorted(self.entities.items(), key=lambda x: x[1], reverse=True)[:limit]
ExtractedPattern.top_actions method · python · L96-L109 (14 LOC)
src/agent_memory/consolidation/pattern_extractor.py
    def top_actions(self, limit: int = 5) -> list[tuple[str, int]]:
        """Return top actions by frequency.

        Parameters
        ----------
        limit:
            Maximum number of results.

        Returns
        -------
        list[tuple[str, int]]
            Sorted (action, count) pairs, highest first.
        """
        return sorted(self.actions.items(), key=lambda x: x[1], reverse=True)[:limit]
Repobility (the analyzer behind this table) · https://repobility.com
ExtractedPattern.to_dict method · python · L111-L118 (8 LOC)
src/agent_memory/consolidation/pattern_extractor.py
    def to_dict(self) -> dict[str, object]:
        """Serialise to a plain dictionary."""
        return {
            "entities": dict(self.top_entities(10)),
            "actions": dict(self.top_actions(10)),
            "total_episodes": self.total_episodes,
            "coverage_ratio": self.coverage_ratio,
        }
PatternExtractor.__init__ method · python · L142-L152 (11 LOC)
src/agent_memory/consolidation/pattern_extractor.py
    def __init__(
        self,
        min_frequency: int = 2,
        max_entities: int = 20,
        max_actions: int = 10,
    ) -> None:
        if min_frequency < 1:
            raise ValueError(f"min_frequency must be >= 1, got {min_frequency}")
        self._min_frequency = min_frequency
        self._max_entities = max_entities
        self._max_actions = max_actions
PatternExtractor.extract method · python · L154-L219 (66 LOC)
src/agent_memory/consolidation/pattern_extractor.py
    def extract(self, texts: list[str]) -> ExtractedPattern:
        """Extract recurring entity and action patterns from text snippets.

        Parameters
        ----------
        texts:
            List of text strings to analyse (e.g. episode content fields).

        Returns
        -------
        ExtractedPattern
            Entities and actions found above the minimum frequency threshold.
        """
        if not texts:
            return ExtractedPattern(
                entities={},
                actions={},
                total_episodes=0,
                coverage_ratio=0.0,
            )

        entity_counter: Counter[str] = Counter()
        action_counter: Counter[str] = Counter()
        covered_count = 0

        for text in texts:
            tokens = _tokenise(text)
            token_set = set(tokens)

            episode_entities = {t for t in token_set if _is_likely_entity(t)}
            episode_actions = {t for t in token_set if _is_likely_action(t)}

   
PatternExtractor.extract_from_cluster method · python · L221-L241 (21 LOC)
src/agent_memory/consolidation/pattern_extractor.py
    def extract_from_cluster(
        self,
        episodes: list[object],
        content_attr: str = "content",
    ) -> ExtractedPattern:
        """Run :meth:`extract` over the text held by a list of objects.

        Parameters
        ----------
        episodes:
            Objects to read text from (e.g. MemoryEntry).  An object without
            the attribute is treated as having empty text.
        content_attr:
            Name of the attribute to read text from. Defaults to ``"content"``.

        Returns
        -------
        ExtractedPattern
            Whatever :meth:`extract` returns for the collected texts.
        """
        texts: list[str] = []
        for episode in episodes:
            raw = getattr(episode, content_attr, "")
            # Falsy values (missing, empty, None) are dropped; the rest are
            # coerced to str.
            if raw:
                texts.append(str(raw))
        return self.extract(texts)
ContradictionDetector.__init__ method · python · L57-L62 (6 LOC)
src/agent_memory/contradiction/detector.py
    def __init__(
        self,
        similarity_threshold: float = _TOPIC_SIMILARITY_THRESHOLD,
    ) -> None:
        """Create a detector backed by a fresh ``EntityAttributeMatcher``.

        Parameters
        ----------
        similarity_threshold:
            Stored as ``_threshold``.  Defaults to the module-level
            ``_TOPIC_SIMILARITY_THRESHOLD``.
        """
        threshold, matcher = similarity_threshold, EntityAttributeMatcher()
        self._threshold = threshold
        self._matcher = matcher
ContradictionDetector.detect method · python · L64-L75 (12 LOC)
src/agent_memory/contradiction/detector.py
    def detect(self, entries: Sequence[MemoryEntry]) -> ContradictionReport:
        """Compare every unordered pair of entries and collect the conflicts.

        The input is copied to a list first, so any sequence-like iterable
        works.  Each pair is passed to ``_compare`` once, with the earlier
        entry first; every non-``None`` result is appended to the report.
        """
        snapshot = list(entries)
        report = ContradictionReport(total_entries_scanned=len(snapshot))

        for position, first in enumerate(snapshot):
            # Only look at entries after *first* so each pair is seen once.
            for second in snapshot[position + 1:]:
                pair = self._compare(first, second)
                if pair is None:
                    continue
                report.contradiction_pairs.append(pair)

        return report
ContradictionDetector.detect_for_entry method · python · L77-L90 (14 LOC)
src/agent_memory/contradiction/detector.py
    def detect_for_entry(
        self,
        entry: MemoryEntry,
        candidates: Sequence[MemoryEntry],
    ) -> list[ContradictionPair]:
        """Collect the contradictions between *entry* and each candidate.

        Candidates whose ``memory_id`` equals that of *entry* are skipped.
        Every other candidate is passed to ``_compare`` and the non-``None``
        results are returned in candidate order.
        """
        # Both generators are lazy, so the id check and the comparison still
        # run candidate by candidate.
        others = (
            candidate
            for candidate in candidates
            if not (candidate.memory_id == entry.memory_id)
        )
        outcomes = (self._compare(entry, candidate) for candidate in others)
        return [pair for pair in outcomes if pair is not None]
ContradictionDetector._compare method · python · L96-L135 (40 LOC)
src/agent_memory/contradiction/detector.py
    def _compare(
        self, entry_a: MemoryEntry, entry_b: MemoryEntry
    ) -> ContradictionPair | None:
        similarity = self._matcher.similarity(entry_a.content, entry_b.content)
        if similarity < self._threshold:
            return None

        triples_a = self._matcher.extract_triples(entry_a.content)
        triples_b = self._matcher.extract_triples(entry_b.content)
        conflicts = self._matcher.conflicts(triples_a, triples_b)

        if conflicts:
            conflict_descs = [
                f"entity={c[0].entity!r} attr={c[0].attribute!r}: "
                f"{c[0].value!r} vs {c[1].value!r}"
                for c in conflicts
            ]
            return ContradictionPair(
                entry_a_id=entry_a.memory_id,
                entry_b_id=entry_b.memory_id,
                entry_a_content=entry_a.content,
                entry_b_content=entry_b.content,
                conflict_description="; ".join(conflict_descs),
                similarity_sc
Repobility — same analyzer, your code, free for public repos · /scan/
ContradictionDetector._has_negation_conflict method · python · L137-L144 (8 LOC)
src/agent_memory/contradiction/detector.py
    def _has_negation_conflict(self, text_a: str, text_b: str) -> bool:
        """Return True if exactly one text contains a negation marker."""
        negation_re = re.compile(
            r"\b(not|never|no|cannot|can't|won't|isn't|aren't)\b", re.IGNORECASE
        )
        a_has = bool(negation_re.search(text_a))
        b_has = bool(negation_re.search(text_b))
        return a_has != b_has
_tf function · python · L179-L185 (7 LOC)
src/agent_memory/contradiction/detector.py
def _tf(tokens: list[str]) -> dict[str, float]:
    """Compute raw term frequency (count / document length)."""
    if not tokens:
        return {}
    counter = Counter(tokens)
    length = len(tokens)
    return {term: count / length for term, count in counter.items()}
_build_tfidf_vector function · python · L188-L204 (17 LOC)
src/agent_memory/contradiction/detector.py
def _build_tfidf_vector(
    tokens: list[str],
    df_map: dict[str, int],
    num_docs: int,
) -> dict[str, float]:
    """Weight a tokenised document's term frequencies by smoothed IDF.

    For every term the weight is ``tf * (log((N + 1) / (df + 1)) + 1)``,
    where ``tf`` comes from ``_tf``, ``N`` is *num_docs*, and ``df`` is the
    term's entry in *df_map* (``0`` when the term is absent from the map).
    """
    return {
        term: tf_val * (math.log((num_docs + 1) / (df_map.get(term, 0) + 1)) + 1.0)
        for term, tf_val in _tf(tokens).items()
    }
_cosine_similarity function · python · L207-L220 (14 LOC)
src/agent_memory/contradiction/detector.py
def _cosine_similarity(vec_a: dict[str, float], vec_b: dict[str, float]) -> float:
    """Compute cosine similarity between two sparse TF-IDF vectors."""
    if not vec_a or not vec_b:
        return 0.0

    shared_terms = vec_a.keys() & vec_b.keys()
    dot = sum(vec_a[t] * vec_b[t] for t in shared_terms)
    norm_a = math.sqrt(sum(v * v for v in vec_a.values()))
    norm_b = math.sqrt(sum(v * v for v in vec_b.values()))

    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0

    return dot / (norm_a * norm_b)
_pairwise_cosine function · python · L223-L247 (25 LOC)
src/agent_memory/contradiction/detector.py
def _pairwise_cosine(text_a: str, text_b: str) -> float:
    """Score two text strings by TF-IDF cosine similarity.

    The two texts form their own two-document corpus, so document frequency
    for any term is 1 or 2.  Two texts that both tokenise to nothing score
    ``1.0``; if only one of them is empty the score is ``0.0``.
    """
    tokens_a, tokens_b = _tokenise(text_a), _tokenise(text_b)

    if not tokens_a or not tokens_b:
        return 1.0 if not (tokens_a or tokens_b) else 0.0

    # Each distinct term counts once per document it appears in.
    df_map: dict[str, int] = {token: 1 for token in set(tokens_a)}
    for token in set(tokens_b):
        df_map[token] = df_map.get(token, 0) + 1

    weighted_a = _build_tfidf_vector(tokens_a, df_map, num_docs=2)
    weighted_b = _build_tfidf_vector(tokens_b, df_map, num_docs=2)
    return _cosine_similarity(weighted_a, weighted_b)
_temporal_distance_seconds function · python · L256-L279 (24 LOC)
src/agent_memory/contradiction/detector.py
def _temporal_distance_seconds(
    entry_a: LightweightMemoryEntry,
    entry_b: LightweightMemoryEntry,
) -> float:
    """Return absolute temporal distance in seconds between two entries."""
    from datetime import timezone

    def to_utc(dt_val: object) -> object:
        from datetime import datetime

        if not isinstance(dt_val, datetime):
            return dt_val
        if dt_val.tzinfo is None:
            return dt_val.replace(tzinfo=timezone.utc)
        return dt_val.astimezone(timezone.utc)

    ts_a = to_utc(entry_a.timestamp)
    ts_b = to_utc(entry_b.timestamp)

    from datetime import datetime

    if isinstance(ts_a, datetime) and isinstance(ts_b, datetime):
        return abs((ts_a - ts_b).total_seconds())
    return 0.0
_classify_contradiction_type function · python · L282-L334 (53 LOC)
src/agent_memory/contradiction/detector.py
def _classify_contradiction_type(
    entry_a: LightweightMemoryEntry,
    entry_b: LightweightMemoryEntry,
    similarity: float,
    temporal_seconds: float,
    negation_conflict: bool,
) -> tuple[ContradictionType, float, str]:
    """Classify the contradiction type and return (type, confidence, explanation).

    Classification priority:
    1. DIRECT_NEGATION   — exactly one entry contains negation words and
                           similarity is high enough to indicate they discuss
                           the same claim.
    2. TEMPORAL_INCONSISTENCY — entries are very similar but far apart in time.
    3. FACTUAL_CONFLICT  — high similarity with no negation; treated as a
                           factual claim conflict.
    4. PREFERENCE_CHANGE — moderate similarity with temporal gap — preference
                           or stance has shifted.
    """
    if negation_conflict and similarity >= 0.4:
        confidence = min(1.0, 0.5 + similarity * 0.5)
        explanatio
TFIDFContradictionDetector.__init__ method · python · L377-L391 (15 LOC)
src/agent_memory/contradiction/detector.py
    def __init__(
        self,
        similarity_threshold: float = 0.85,
        temporal_weight: float = 0.3,
    ) -> None:
        if not 0.0 <= similarity_threshold <= 1.0:
            raise ValueError(
                f"similarity_threshold must be in [0, 1], got {similarity_threshold!r}"
            )
        if not 0.0 <= temporal_weight <= 1.0:
            raise ValueError(
                f"temporal_weight must be in [0, 1], got {temporal_weight!r}"
            )
        self._similarity_threshold = similarity_threshold
        self._temporal_weight = temporal_weight
Repobility · severity-and-effort ranking · https://repobility.com
TFIDFContradictionDetector.detect method · python · L405-L427 (23 LOC)
src/agent_memory/contradiction/detector.py
    def detect(
        self,
        memories: list[LightweightMemoryEntry],
    ) -> list[Contradiction]:
        """Check every unordered pair of entries and gather the contradictions.

        Parameters
        ----------
        memories:
            The entries to scan.  Every pair is checked once via
            :meth:`check_pair`, so the work grows quadratically with the
            number of entries.

        Returns
        -------
        list[Contradiction]
            The non-``None`` results of :meth:`check_pair`, in scan order.
            Empty when nothing is found.
        """
        # Pair each entry only with the ones after it; the generator keeps
        # the calls in the same (i, j) order as a nested index loop.
        outcomes = (
            self.check_pair(first, second)
            for index, first in enumerate(memories)
            for second in memories[index + 1:]
        )
        return [found for found in outcomes if found is not None]
TFIDFContradictionDetector.check_pair method · python · L429-L484 (56 LOC)
src/agent_memory/contradiction/detector.py
    def check_pair(
        self,
        a: LightweightMemoryEntry,
        b: LightweightMemoryEntry,
    ) -> Contradiction | None:
        """Check a single pair of entries for contradictions.

        Returns ``None`` if no contradiction is detected; otherwise returns a
        ``Contradiction`` describing the conflict.

        Parameters
        ----------
        a:
            The first memory entry.
        b:
            The second memory entry.

        Returns
        -------
        Contradiction | None
        """
        # Fast path: use pre-computed embeddings when available for both entries
        if a.embedding is not None and b.embedding is not None:
            similarity = _cosine_similarity_dense(a.embedding, b.embedding)
        else:
            similarity = _pairwise_cosine(a.content, b.content)

        if similarity < self._similarity_threshold:
            return None

        negation_conflict = _has_negation(a.content) != _has_negation(b.content)
       
_cosine_similarity_dense function · python · L492-L508 (17 LOC)
src/agent_memory/contradiction/detector.py
def _cosine_similarity_dense(vec_a: list[float], vec_b: list[float]) -> float:
    """Compute cosine similarity between two dense float vectors."""
    if len(vec_a) != len(vec_b):
        raise ValueError(
            f"Embedding dimension mismatch: {len(vec_a)} vs {len(vec_b)}"
        )
    if not vec_a:
        return 0.0

    dot = sum(a * b for a, b in zip(vec_a, vec_b))
    norm_a = math.sqrt(sum(v * v for v in vec_a))
    norm_b = math.sqrt(sum(v * v for v in vec_b))

    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0

    return dot / (norm_a * norm_b)
EntityAttributeMatcher.extract_triples method · python · L38-L54 (17 LOC)
src/agent_memory/contradiction/matcher.py
    def extract_triples(self, content: str) -> list[Triple]:
        """Pull (entity, attribute, value) triples out of free-form text.

        The text is cut at ``.``, ``!``, ``?`` and ``;``.  Each non-empty
        piece is tried against the entries of ``_PATTERNS`` in order, anchored
        at the start of the piece.  The first pattern that yields a non-empty
        normalised entity and value produces one triple, and no further
        patterns are tried for that piece.
        """
        triples: list[Triple] = []
        for fragment in re.split(r"[.!?;]", content):
            clause = fragment.strip()
            if not clause:
                continue
            for pattern, attribute, _ in _PATTERNS:
                found = pattern.match(clause)
                if not found:
                    continue
                entity = _normalise(found.group(1))
                value = _normalise(found.group(2))
                if not (entity and value):
                    # A match with an empty side does not count; keep trying.
                    continue
                triples.append(Triple(entity=entity, attribute=attribute, value=value))
                break
        return triples
EntityAttributeMatcher.conflicts method · python · L56-L71 (16 LOC)
src/agent_memory/contradiction/matcher.py
    def conflicts(
        self,
        triples_a: Sequence[Triple],
        triples_b: Sequence[Triple],
    ) -> list[tuple[Triple, Triple]]:
        """Find triples that disagree across the two sequences.

        A pair conflicts when both triples name the same entity and attribute
        but carry different values.  Every combination of one triple from
        *triples_a* and one from *triples_b* is examined; the result lists the
        conflicting ``(a, b)`` pairs in that scan order.
        """
        return [
            (left, right)
            for left in triples_a
            for right in triples_b
            if left.entity == right.entity
            and left.attribute == right.attribute
            and left.value != right.value
        ]
EntityAttributeMatcher.similarity method · python · L73-L81 (9 LOC)
src/agent_memory/contradiction/matcher.py
    def similarity(self, text_a: str, text_b: str) -> float:
        """Return the Jaccard similarity of the two texts' token sets.

        Tokens are runs of ASCII letters and digits taken from the lower-cased
        text.  Two texts with no tokens at all score ``1.0``.
        """
        vocab_a, vocab_b = (
            set(re.findall(r"[a-z0-9]+", text.lower())) for text in (text_a, text_b)
        )
        combined = vocab_a | vocab_b
        if not combined:
            # Both token sets are empty.
            return 1.0
        return len(vocab_a & vocab_b) / len(combined)
ContradictionReport.mark_resolved method · python · L48-L63 (16 LOC)
src/agent_memory/contradiction/report.py
    def mark_resolved(
        self,
        entry_a_id: str,
        entry_b_id: str,
        strategy: str,
        winner_id: str,
    ) -> None:
        """Flag the first pair matching the two ids as resolved.

        The ids match in either order.  The matched pair gets
        ``resolved=True`` plus the given strategy and winner; later matching
        pairs are left alone.  Nothing happens when no pair matches.
        """
        accepted = ((entry_a_id, entry_b_id), (entry_b_id, entry_a_id))
        target = next(
            (
                pair
                for pair in self.contradiction_pairs
                if (pair.entry_a_id, pair.entry_b_id) in accepted
            ),
            None,
        )
        if target is None:
            return
        target.resolved = True
        target.resolution_strategy = strategy
        target.winner_id = winner_id
ContradictionReport.summary method · python · L65-L72 (8 LOC)
src/agent_memory/contradiction/report.py
    def summary(self) -> dict[str, object]:
        """Return the report's headline numbers as a plain dictionary.

        The generation time is given as an ISO-8601 string, and the resolved
        count is the contradiction count minus the unresolved count.
        """
        found = self.contradiction_count
        still_open = self.unresolved_count
        overview: dict[str, object] = {
            "generated_at": self.generated_at.isoformat(),
            "total_scanned": self.total_entries_scanned,
            "total_contradictions": found,
            "unresolved": still_open,
            "resolved": found - still_open,
        }
        return overview
Source: Repobility analyzer · https://repobility.com
ResolutionStrategyBase.resolve method · python · L99-L114 (16 LOC)
src/agent_memory/contradiction/resolution.py
    def resolve(self, existing: MemoryEntry, new: MemoryEntry) -> ResolutionResult:
        """Resolve the contradiction between *existing* and *new*.

        The body here is a bare ``...`` placeholder, so this definition does
        no work of its own; concrete strategies presumably override it.
        NOTE(review): confirm this method is declared abstract on the class.

        Parameters
        ----------
        existing:
            The memory entry currently stored.
        new:
            The incoming memory entry that contradicts *existing*.

        Returns
        -------
        ResolutionResult
            The resolution outcome.
        """
        ...
ContradictionResolver.__init__ method · python · L153-L174 (22 LOC)
src/agent_memory/contradiction/resolution.py
    def __init__(self, default_strategy: str = "latest_wins") -> None:
        # Lazy import to avoid circular dependencies at module load time
        from agent_memory.contradiction.resolution_strategies.latest_wins import LatestWinsStrategy
        from agent_memory.contradiction.resolution_strategies.merge import MergeResolutionStrategy
        from agent_memory.contradiction.resolution_strategies.user_confirmation import UserConfirmationStrategy

        self._registry: dict[str, ResolutionStrategyBase] = {}

        # Register built-in strategies
        for strategy in (
            LatestWinsStrategy(),
            UserConfirmationStrategy(),
            MergeResolutionStrategy(),
        ):
            self._registry[strategy.name] = strategy

        if default_strategy not in self._registry:
            raise ValueError(
                f"Unknown default strategy {default_strategy!r}. "
                f"Valid options: {sorted(self._registry.keys())!r}"
            )
       
ContradictionResolver.register_strategy method · python · L186-L215 (30 LOC)
src/agent_memory/contradiction/resolution.py
    def register_strategy(
        self,
        name: str,
        strategy: ResolutionStrategyBase,
    ) -> None:
        """Register a custom resolution strategy.

        Parameters
        ----------
        name:
            The strategy name used to look it up at resolution time.
            Overrides any existing strategy with the same name.
        strategy:
            An instance of a :class:`ResolutionStrategyBase` subclass.

        Example
        -------
        >>> class MyStrategy(ResolutionStrategyBase):
        ...     @property
        ...     def name(self): return "my_strategy"
        ...     def resolve(self, existing, new): ...
        >>> resolver = ContradictionResolver()
        >>> resolver.register_strategy("my_strategy", MyStrategy())
        """
        if not isinstance(strategy, ResolutionStrategyBase):
            raise TypeError(
                f"strategy must be a ResolutionStrategyBase instance, "
                f"got {type(strategy).__name__}"
ContradictionResolver.resolve method · python · L217-L254 (38 LOC)
src/agent_memory/contradiction/resolution.py
    def resolve(
        self,
        existing: MemoryEntry,
        new: MemoryEntry,
        strategy: str | None = None,
    ) -> ResolutionResult:
        """Resolve the contradiction between *existing* and *new*.

        Parameters
        ----------
        existing:
            The memory entry currently stored.
        new:
            The incoming memory entry that contradicts *existing*.
        strategy:
            Name of the strategy to use.  Falls back to
            :attr:`default_strategy` when ``None``.

        Returns
        -------
        ResolutionResult
            The resolution outcome from the selected strategy.

        Raises
        ------
        ValueError
            If the requested strategy name is not registered.
        """
        active_name = strategy if strategy is not None else self._default_strategy

        strategy_obj = self._registry.get(active_name)
        if strategy_obj is None:
            raise ValueError(
                f"Unknow
ContradictionResolver.resolve_batch method · python · L256-L278 (23 LOC)
src/agent_memory/contradiction/resolution.py
    def resolve_batch(
        self,
        pairs: list[tuple[MemoryEntry, MemoryEntry]],
        strategy: str | None = None,
    ) -> list[ResolutionResult]:
        """Resolve several contradiction pairs with one strategy choice.

        Parameters
        ----------
        pairs:
            List of ``(existing, new)`` memory entry pairs.
        strategy:
            Strategy name passed unchanged to :meth:`resolve` for every pair,
            including ``None``.

        Returns
        -------
        list[ResolutionResult]
            One result per input pair, in the same order.
        """
        outcomes: list[ResolutionResult] = []
        for pair in pairs:
            existing, new = pair
            outcomes.append(self.resolve(existing, new, strategy=strategy))
        return outcomes
LatestWinsStrategy.resolve method · python · L54-L99 (46 LOC)
src/agent_memory/contradiction/resolution_strategies/latest_wins.py
    def resolve(self, existing: MemoryEntry, new: MemoryEntry) -> ResolutionResult:
        """Resolve by keeping the newer of the two entries.

        Parameters
        ----------
        existing:
            The entry already in memory.
        new:
            The incoming entry that contradicts *existing*.

        Returns
        -------
        ResolutionResult
            ``action="replace"`` if *new* is more recent; otherwise
            ``action="replace"`` is returned with *existing* as content
            (stable tie-break: keep existing when timestamps are equal).
        """
        ts_existing = _to_utc(existing.timestamp)
        ts_new = _to_utc(new.timestamp)

        if ts_new > ts_existing:
            # Incoming entry is newer — replace existing with new
            return ResolutionResult(
                strategy_used=self.name,
                action="replace",
                resolved_content=new.content,
                requires_user_input=False,
           
_to_utc function · python · L107-L115 (9 LOC)
src/agent_memory/contradiction/resolution_strategies/latest_wins.py
def _to_utc(dt: object) -> object:
    """Normalise a datetime to UTC-aware.  Returns input unchanged if not datetime."""
    from datetime import datetime

    if not isinstance(dt, datetime):
        return dt
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)
MergeResolutionStrategy.resolve method · python · L74-L109 (36 LOC)
src/agent_memory/contradiction/resolution_strategies/merge.py
    def resolve(self, existing: MemoryEntry, new: MemoryEntry) -> ResolutionResult:
        """Merge the content of *existing* and *new* into a single entry.

        Parameters
        ----------
        existing:
            The entry already in memory.
        new:
            The incoming entry that contradicts *existing*.

        Returns
        -------
        ResolutionResult
            Result with ``action="merge"`` and the merged content in
            ``resolved_content``.  ``requires_user_input=False``.
        """
        merged_content = _merge_contents(
            existing.content,
            new.content,
            separator=self._separator,
        )

        unique_sentences_count = len(_split_sentences(merged_content))
        explanation = (
            f"Merged {len(_split_sentences(existing.content))} sentence(s) from existing "
            f"and {len(_split_sentences(new.content))} sentence(s) from new entry into "
            f"{unique_sentences_count} uniqu
Repobility (the analyzer behind this table) · https://repobility.com
_split_sentences function · python · L117-L132 (16 LOC)
src/agent_memory/contradiction/resolution_strategies/merge.py
def _split_sentences(text: str) -> list[str]:
    """Split *text* into sentences by common punctuation boundaries.

    Parameters
    ----------
    text:
        Input text to split.

    Returns
    -------
    list[str]
        List of non-empty, stripped sentence strings.
    """
    # Split on sentence-ending punctuation followed by whitespace or end-of-string
    parts = re.split(r"(?<=[.!?])\s+", text.strip())
    return [p.strip() for p in parts if p.strip()]
_merge_contents function · python · L140-L180 (41 LOC)
src/agent_memory/contradiction/resolution_strategies/merge.py
def _merge_contents(existing: str, new: str, separator: str = " ") -> str:
    """Merge two content strings by combining unique sentences.

    Parameters
    ----------
    existing:
        Content of the existing memory entry.
    new:
        Content of the new memory entry.
    separator:
        String used between sentences in the output.

    Returns
    -------
    str
        Merged content string.
    """
    existing_sentences = _split_sentences(existing)
    new_sentences = _split_sentences(new)

    # Build ordered union, deduplicating by normalised form
    seen_normalised: set[str] = set()
    merged: list[str] = []

    for sentence in existing_sentences:
        key = _normalise_sentence(sentence)
        if key not in seen_normalised:
            seen_normalised.add(key)
            merged.append(sentence)

    for sentence in new_sentences:
        key = _normalise_sentence(sentence)
        if key not in seen_normalised:
            seen_normalised.add(key)
       
UserConfirmationStrategy.__init__ method · python · L65-L70 (6 LOC)
src/agent_memory/contradiction/resolution_strategies/user_confirmation.py
    def __init__(self, prompt_template: str | None = None) -> None:
        self._prompt_template = (
            prompt_template
            if prompt_template is not None
            else self._DEFAULT_PROMPT_TEMPLATE
        )
UserConfirmationStrategy.resolve method · python · L76-L102 (27 LOC)
src/agent_memory/contradiction/resolution_strategies/user_confirmation.py
    def resolve(self, existing: MemoryEntry, new: MemoryEntry) -> ResolutionResult:
        """Hand the contradiction back to the user instead of deciding it.

        No content is chosen or produced here; the returned result only
        carries a prompt built from both entries' content.

        Parameters
        ----------
        existing:
            The entry already in memory.
        new:
            The incoming entry that contradicts *existing*.

        Returns
        -------
        ResolutionResult
            Result with ``action="defer"``, ``resolved_content=None`` and
            ``requires_user_input=True``; ``explanation`` holds the
            formatted prompt template.
        """
        render_prompt = self._prompt_template.format
        # Each content string goes through ``_truncate`` with 200 as its
        # second argument before it is substituted into the template.
        previews = {
            "existing": _truncate(existing.content, 200),
            "new": _truncate(new.content, 200),
        }
        prompt_text = render_prompt(**previews)

        return ResolutionResult(
            strategy_used=self.name,
            action="defer",
            resolved_content=None,
            requires_user_input=True,
            explanation=prompt_text,
        )
ContradictionResolver.resolve method · python · L92-L147 (56 LOC)
src/agent_memory/contradiction/resolver.py
    def resolve(
        self,
        contradiction: ContradictionPair,
        entry_a: MemoryEntry,
        entry_b: MemoryEntry,
        strategy: Optional[ResolutionStrategy] = None,
        manual_winner_id: Optional[str] = None,
    ) -> ResolutionResult:
        """Resolve a contradiction between two memory entries.

        Parameters
        ----------
        contradiction:
            The detected contradiction pair to resolve.
        entry_a:
            The first memory entry in the pair.
        entry_b:
            The second memory entry in the pair.
        strategy:
            Override strategy; falls back to ``default_strategy`` if None.
        manual_winner_id:
            Required when ``strategy`` is ``MANUAL``; must be the ID of
            either entry_a or entry_b.

        Returns
        -------
        ResolutionResult
            Structured resolution outcome with winner, loser, and rationale.

        Raises
        ------
        ValueError
          
ContradictionResolver.resolve_batch method · python · L149-L171 (23 LOC)
src/agent_memory/contradiction/resolver.py
    def resolve_batch(
        self,
        pairs: list[tuple[ContradictionPair, MemoryEntry, MemoryEntry]],
        strategy: Optional[ResolutionStrategy] = None,
    ) -> list[ResolutionResult]:
        """Run :meth:`resolve` over every pair, forwarding one shared strategy.

        Parameters
        ----------
        pairs:
            List of ``(ContradictionPair, entry_a, entry_b)`` tuples.
        strategy:
            Strategy passed to each :meth:`resolve` call.  ``None`` is
            forwarded unchanged, leaving the choice to :meth:`resolve`.

        Returns
        -------
        list[ResolutionResult]
            One result per input tuple, in input order; an empty list when
            *pairs* is empty.
        """
        outcomes: list[ResolutionResult] = []
        for contradiction, first_entry, second_entry in pairs:
            outcome = self.resolve(
                contradiction,
                first_entry,
                second_entry,
                strategy=strategy,
            )
            outcomes.append(outcome)
        return outcomes
ContradictionResolver._resolve_recency method · python · L177-L206 (30 LOC)
src/agent_memory/contradiction/resolver.py
    def _resolve_recency(
        self,
        contradiction: ContradictionPair,
        entry_a: MemoryEntry,
        entry_b: MemoryEntry,
    ) -> ResolutionResult:
        """The more recently created entry wins."""
        created_a = _to_utc(entry_a.created_at)
        created_b = _to_utc(entry_b.created_at)

        if created_a >= created_b:
            winner, loser = entry_a, entry_b
            rationale = (
                f"Entry {entry_a.memory_id!r} is newer "
                f"({entry_a.created_at.isoformat()} >= {entry_b.created_at.isoformat()})."
            )
        else:
            winner, loser = entry_b, entry_a
            rationale = (
                f"Entry {entry_b.memory_id!r} is newer "
                f"({entry_b.created_at.isoformat()} > {entry_a.created_at.isoformat()})."
            )

        return ResolutionResult(
            contradiction_pair=contradiction,
            strategy_used=ResolutionStrategy.RECENCY_WINS,
            winner_id=winner.
ContradictionResolver._resolve_source_priority method · python · L208-L237 (30 LOC)
src/agent_memory/contradiction/resolver.py
    def _resolve_source_priority(
        self,
        contradiction: ContradictionPair,
        entry_a: MemoryEntry,
        entry_b: MemoryEntry,
    ) -> ResolutionResult:
        """The entry from the more authoritative source wins."""
        priority_a = _SOURCE_PRIORITY.get(entry_a.source.value, 0)
        priority_b = _SOURCE_PRIORITY.get(entry_b.source.value, 0)

        if priority_a >= priority_b:
            winner, loser = entry_a, entry_b
            rationale = (
                f"Source {entry_a.source.value!r} (priority={priority_a}) >= "
                f"{entry_b.source.value!r} (priority={priority_b})."
            )
        else:
            winner, loser = entry_b, entry_a
            rationale = (
                f"Source {entry_b.source.value!r} (priority={priority_b}) > "
                f"{entry_a.source.value!r} (priority={priority_a})."
            )

        return ResolutionResult(
            contradiction_pair=contradiction,
            strategy_used=R
Repobility — same analyzer, your code, free for public repos · /scan/
ContradictionResolver._resolve_confidence method · python · L239-L268 (30 LOC)
src/agent_memory/contradiction/resolver.py
    def _resolve_confidence(
        self,
        contradiction: ContradictionPair,
        entry_a: MemoryEntry,
        entry_b: MemoryEntry,
    ) -> ResolutionResult:
        """The entry with higher importance_score * freshness_score wins."""
        score_a = entry_a.composite_score
        score_b = entry_b.composite_score

        if score_a >= score_b:
            winner, loser = entry_a, entry_b
            rationale = (
                f"Entry {entry_a.memory_id!r} has higher composite score "
                f"({score_a:.4f} >= {score_b:.4f})."
            )
        else:
            winner, loser = entry_b, entry_a
            rationale = (
                f"Entry {entry_b.memory_id!r} has higher composite score "
                f"({score_b:.4f} > {score_a:.4f})."
            )

        return ResolutionResult(
            contradiction_pair=contradiction,
            strategy_used=ResolutionStrategy.CONFIDENCE_BASED,
            winner_id=winner.memory_id,
            l
ContradictionResolver._resolve_manual method · python · L270-L300 (31 LOC)
src/agent_memory/contradiction/resolver.py
    def _resolve_manual(
        self,
        contradiction: ContradictionPair,
        entry_a: MemoryEntry,
        entry_b: MemoryEntry,
        manual_winner_id: Optional[str],
    ) -> ResolutionResult:
        """The caller explicitly designates the winner."""
        valid_ids = {entry_a.memory_id, entry_b.memory_id}
        if manual_winner_id not in valid_ids:
            return ResolutionResult(
                contradiction_pair=contradiction,
                strategy_used=ResolutionStrategy.MANUAL,
                winner_id="",
                loser_id="",
                rationale=(
                    f"Manual resolution requested but winner_id {manual_winner_id!r} "
                    f"is not one of {sorted(valid_ids)!r}. Requires human review."
                ),
                requires_manual_review=True,
            )

        winner_id = manual_winner_id
        loser_id = entry_b.memory_id if winner_id == entry_a.memory_id else entry_a.memory_id
        return R
_get_strategy_registry function · python · L312-L322 (11 LOC)
src/agent_memory/contradiction/resolver.py
def _get_strategy_registry() -> dict[str, object]:
    """Return the module-level strategy registry, populating it if empty.

    When ``_STRATEGY_REGISTRY`` is falsy, a fresh mapping from strategy name
    to a newly constructed strategy instance is built, stored back into the
    module global, and returned.  Otherwise the existing mapping is returned
    untouched.

    Returns
    -------
    dict[str, object]
        Mapping of strategy name to strategy instance.
    """
    global _STRATEGY_REGISTRY
    if _STRATEGY_REGISTRY:
        # Already populated: hand back the existing mapping.
        return _STRATEGY_REGISTRY

    registry = {
        "keep_newer": KeepNewerStrategy(),
        "keep_older": _KeepOlderStrategy(),
        "keep_both_with_context": KeepBothStrategy(),
        "flag_for_review": FlagForReviewStrategy(),
        "merge": MergeStrategy(),
    }
    _STRATEGY_REGISTRY = registry
    return registry
‹ prev · page 2 / 6 · next ›