Function bodies 288 total
EpisodicConsolidator.similarity method · python · L219-L236 (18 LOC)src/agent_memory/consolidation/consolidator.py
def similarity(self, entry_a: MemoryEntry, entry_b: MemoryEntry) -> float:
    """Score how alike the text of two memory entries is.

    Each entry's ``content`` is tokenised and turned into a term vector; the
    result is whatever ``_cosine_similarity`` returns for the two vectors.

    Parameters
    ----------
    entry_a:
        First memory entry.
    entry_b:
        Second memory entry.

    Returns
    -------
    float
        Cosine similarity of the two term vectors.
    """
    term_vectors = [
        _build_term_vector(_tokenise(entry.content))
        for entry in (entry_a, entry_b)
    ]
    return _cosine_similarity(*term_vectors)
# EpisodicConsolidator._greedy_cluster method · python · L242-L275 (34 LOC)src/agent_memory/consolidation/consolidator.py
def _greedy_cluster(
self,
entries: list[MemoryEntry],
vectors: dict[str, dict[str, float]],
) -> list[set[str]]:
"""Greedy single-pass clustering algorithm.
Each entry is assigned to the first cluster whose centroid has
cosine similarity >= threshold, or starts a new cluster otherwise.
Cluster centroids are recomputed as members join.
"""
clusters: list[set[str]] = []
cluster_centroids: list[dict[str, float]] = []
for entry in entries:
entry_vec = vectors[entry.memory_id]
assigned = False
for cluster_index, centroid in enumerate(cluster_centroids):
similarity = _cosine_similarity(entry_vec, centroid)
if similarity >= self._similarity_threshold:
clusters[cluster_index].add(entry.memory_id)
# Update centroid: average with incoming vector
cluster_centroids[cluster_EpisodicConsolidator._average_vectors method · python · L277-L293 (17 LOC)src/agent_memory/consolidation/consolidator.py
def _average_vectors(
self,
current_centroid: dict[str, float],
new_vector: dict[str, float],
current_size: int,
) -> dict[str, float]:
"""Return an updated centroid by incorporating a new member vector."""
all_terms = set(current_centroid) | set(new_vector)
weight_old = (current_size - 1) / current_size
weight_new = 1.0 / current_size
return {
term: (
current_centroid.get(term, 0.0) * weight_old
+ new_vector.get(term, 0.0) * weight_new
)
for term in all_terms
}EpisodicConsolidator._compute_centroid_terms method · python · L295-L302 (8 LOC)src/agent_memory/consolidation/consolidator.py
def _compute_centroid_terms(self, members: list[MemoryEntry]) -> list[str]:
    """Pick the highest-weighted terms across a cluster's members.

    The term-vector weights of every member are summed per term.  Terms are
    then ordered by that sum, largest first, and the list is cut to
    ``self._max_centroid_terms`` entries.
    """
    totals: dict[str, float] = defaultdict(float)
    for member in members:
        member_vector = _build_term_vector(_tokenise(member.content))
        for term, weight in member_vector.items():
            totals[term] += weight
    # Sorting the keys by their total is stable, so tied terms keep the
    # order in which they were first seen.
    ordered = sorted(totals, key=totals.__getitem__, reverse=True)
    return ordered[: self._max_centroid_terms]
# EpisodicConsolidator._make_semantic_entry method · python · L304-L320 (17 LOC)src/agent_memory/consolidation/consolidator.py
def _make_semantic_entry(self, cluster: EpisodeCluster) -> MemoryEntry:
    """Build the semantic-layer entry that summarises one episode cluster.

    The content sentence lists the cluster's centroid terms and the number of
    episodes; the metadata records the cluster id, the episode count and up to
    five source episode ids.
    """
    episode_total = len(cluster.episode_ids)
    joined_terms = ", ".join(cluster.centroid_terms)
    summary_text = (
        f"Consolidated pattern from {episode_total} episodes: {joined_terms}."
    )
    # Importance is the cluster average scaled by 1.1 and capped at 1.0.
    boosted_importance = min(1.0, cluster.average_importance * 1.1)
    entry_metadata = {
        "cluster_id": str(cluster.cluster_id),
        "episode_count": str(episode_total),
        # Only the first five episode ids are recorded.
        "source_episodes": ",".join(cluster.episode_ids[:5]),
    }
    return MemoryEntry(
        content=summary_text,
        layer=MemoryLayer.SEMANTIC,
        importance_score=boosted_importance,
        source=MemorySource.AGENT_INFERENCE,
        metadata=entry_metadata,
    )
# ExtractedPattern.top_entities method · python · L81-L94 (14 LOC)src/agent_memory/consolidation/pattern_extractor.py
def top_entities(self, limit: int = 5) -> list[tuple[str, int]]:
    """Return the most frequent entities.

    Parameters
    ----------
    limit:
        Maximum number of results.  Values below zero are treated as zero.

    Returns
    -------
    list[tuple[str, int]]
        Sorted (entity, count) pairs, highest first.  Entities with equal
        counts keep the order they have in ``self.entities``.
    """
    # A negative slice bound would drop items from the *end* of the ranking
    # instead of limiting it, so clamp before slicing.
    keep = max(limit, 0)
    ranked = sorted(self.entities.items(), key=lambda pair: pair[1], reverse=True)
    return ranked[:keep]
# ExtractedPattern.top_actions method · python · L96-L109 (14 LOC)src/agent_memory/consolidation/pattern_extractor.py
def top_actions(self, limit: int = 5) -> list[tuple[str, int]]:
    """Return the most frequent actions.

    Parameters
    ----------
    limit:
        Maximum number of results.  Values below zero are treated as zero.

    Returns
    -------
    list[tuple[str, int]]
        Sorted (action, count) pairs, highest first.  Actions with equal
        counts keep the order they have in ``self.actions``.
    """
    # A negative slice bound would drop items from the *end* of the ranking
    # instead of limiting it, so clamp before slicing.
    keep = max(limit, 0)
    ranked = sorted(self.actions.items(), key=lambda pair: pair[1], reverse=True)
    return ranked[:keep]
# Repobility (the analyzer behind this table) · https://repobility.com
ExtractedPattern.to_dict method · python · L111-L118 (8 LOC)src/agent_memory/consolidation/pattern_extractor.py
def to_dict(self) -> dict[str, object]:
    """Return a plain-dict snapshot of this pattern.

    Only the ten highest-ranked entities and the ten highest-ranked actions
    are included, each as a ``{name: count}`` mapping.
    """
    payload: dict[str, object] = {}
    payload["entities"] = dict(self.top_entities(10))
    payload["actions"] = dict(self.top_actions(10))
    payload["total_episodes"] = self.total_episodes
    payload["coverage_ratio"] = self.coverage_ratio
    return payload
# PatternExtractor.__init__ method · python · L142-L152 (11 LOC)src/agent_memory/consolidation/pattern_extractor.py
def __init__(
self,
min_frequency: int = 2,
max_entities: int = 20,
max_actions: int = 10,
) -> None:
if min_frequency < 1:
raise ValueError(f"min_frequency must be >= 1, got {min_frequency}")
self._min_frequency = min_frequency
self._max_entities = max_entities
self._max_actions = max_actionsPatternExtractor.extract method · python · L154-L219 (66 LOC)src/agent_memory/consolidation/pattern_extractor.py
def extract(self, texts: list[str]) -> ExtractedPattern:
"""Extract recurring entity and action patterns from text snippets.
Parameters
----------
texts:
List of text strings to analyse (e.g. episode content fields).
Returns
-------
ExtractedPattern
Entities and actions found above the minimum frequency threshold.
"""
if not texts:
return ExtractedPattern(
entities={},
actions={},
total_episodes=0,
coverage_ratio=0.0,
)
entity_counter: Counter[str] = Counter()
action_counter: Counter[str] = Counter()
covered_count = 0
for text in texts:
tokens = _tokenise(text)
token_set = set(tokens)
episode_entities = {t for t in token_set if _is_likely_entity(t)}
episode_actions = {t for t in token_set if _is_likely_action(t)}
PatternExtractor.extract_from_cluster method · python · L221-L241 (21 LOC)src/agent_memory/consolidation/pattern_extractor.py
def extract_from_cluster(
    self,
    episodes: list[object],
    content_attr: str = "content",
) -> ExtractedPattern:
    """Run :meth:`extract` over the text held by a list of objects.

    Parameters
    ----------
    episodes:
        Objects carrying their text in the attribute named by
        ``content_attr`` (e.g. MemoryEntry).
    content_attr:
        Name of the attribute to read text from. Defaults to ``"content"``.

    Returns
    -------
    ExtractedPattern
        Result of ``self.extract`` on the collected texts.
    """
    usable_texts: list[str] = []
    for episode in episodes:
        raw = getattr(episode, content_attr, "")
        # Missing attributes and falsy values (e.g. "") are left out.
        if raw:
            usable_texts.append(str(raw))
    return self.extract(usable_texts)
# ContradictionDetector.__init__ method · python · L57-L62 (6 LOC)src/agent_memory/contradiction/detector.py
def __init__(
    self,
    similarity_threshold: float = _TOPIC_SIMILARITY_THRESHOLD,
) -> None:
    """Create a detector with its own ``EntityAttributeMatcher``.

    Parameters
    ----------
    similarity_threshold:
        Stored as ``_threshold``.  Defaults to the module constant
        ``_TOPIC_SIMILARITY_THRESHOLD``.
    """
    threshold = similarity_threshold
    self._threshold = threshold
    self._matcher = EntityAttributeMatcher()
# ContradictionDetector.detect method · python · L64-L75 (12 LOC)src/agent_memory/contradiction/detector.py
def detect(self, entries: Sequence[MemoryEntry]) -> ContradictionReport:
    """Compare every unordered pair of entries and collect contradictions.

    Parameters
    ----------
    entries:
        Entries to scan.  They are copied into a list first, so any
        sequence is accepted.

    Returns
    -------
    ContradictionReport
        Report whose ``total_entries_scanned`` is the number of entries and
        whose ``contradiction_pairs`` holds every non-``None`` result of
        ``self._compare``.
    """
    from itertools import combinations

    entry_list = list(entries)
    report = ContradictionReport(total_entries_scanned=len(entry_list))
    # combinations() yields each (earlier, later) pair exactly once, in the
    # same order as the nested index loops it replaces.
    for first, second in combinations(entry_list, 2):
        pair = self._compare(first, second)
        if pair is not None:
            report.contradiction_pairs.append(pair)
    return report
# ContradictionDetector.detect_for_entry method · python · L77-L90 (14 LOC)src/agent_memory/contradiction/detector.py
def detect_for_entry(
    self,
    entry: MemoryEntry,
    candidates: Sequence[MemoryEntry],
) -> list[ContradictionPair]:
    """Collect contradictions between ``entry`` and each candidate.

    Candidates whose ``memory_id`` equals ``entry.memory_id`` are skipped.
    Every other candidate is passed to ``self._compare`` and the
    non-``None`` results are returned in candidate order.
    """
    comparisons = (
        self._compare(entry, other)
        for other in candidates
        if other.memory_id != entry.memory_id
    )
    return [found for found in comparisons if found is not None]
# ContradictionDetector._compare method · python · L96-L135 (40 LOC)src/agent_memory/contradiction/detector.py
def _compare(
self, entry_a: MemoryEntry, entry_b: MemoryEntry
) -> ContradictionPair | None:
similarity = self._matcher.similarity(entry_a.content, entry_b.content)
if similarity < self._threshold:
return None
triples_a = self._matcher.extract_triples(entry_a.content)
triples_b = self._matcher.extract_triples(entry_b.content)
conflicts = self._matcher.conflicts(triples_a, triples_b)
if conflicts:
conflict_descs = [
f"entity={c[0].entity!r} attr={c[0].attribute!r}: "
f"{c[0].value!r} vs {c[1].value!r}"
for c in conflicts
]
return ContradictionPair(
entry_a_id=entry_a.memory_id,
entry_b_id=entry_b.memory_id,
entry_a_content=entry_a.content,
entry_b_content=entry_b.content,
conflict_description="; ".join(conflict_descs),
similarity_scRepobility — same analyzer, your code, free for public repos · /scan/
ContradictionDetector._has_negation_conflict method · python · L137-L144 (8 LOC)src/agent_memory/contradiction/detector.py
def _has_negation_conflict(self, text_a: str, text_b: str) -> bool:
"""Return True if exactly one text contains a negation marker."""
negation_re = re.compile(
r"\b(not|never|no|cannot|can't|won't|isn't|aren't)\b", re.IGNORECASE
)
a_has = bool(negation_re.search(text_a))
b_has = bool(negation_re.search(text_b))
return a_has != b_has_tf function · python · L179-L185 (7 LOC)src/agent_memory/contradiction/detector.py
def _tf(tokens: list[str]) -> dict[str, float]:
"""Compute raw term frequency (count / document length)."""
if not tokens:
return {}
counter = Counter(tokens)
length = len(tokens)
return {term: count / length for term, count in counter.items()}_build_tfidf_vector function · python · L188-L204 (17 LOC)src/agent_memory/contradiction/detector.py
def _build_tfidf_vector(
    tokens: list[str],
    df_map: dict[str, int],
    num_docs: int,
) -> dict[str, float]:
    """Weight a tokenised document's term frequencies by smoothed IDF.

    IDF is ``log((num_docs + 1) / (df + 1)) + 1`` where ``df`` is the term's
    entry in ``df_map`` (0 when absent) and ``num_docs`` is the corpus size.

    Returns
    -------
    dict[str, float]
        ``tf * idf`` for every term of the document.
    """
    return {
        term: frequency
        * (math.log((num_docs + 1) / (df_map.get(term, 0) + 1)) + 1.0)
        for term, frequency in _tf(tokens).items()
    }
# _cosine_similarity function · python · L207-L220 (14 LOC)src/agent_memory/contradiction/detector.py
def _cosine_similarity(vec_a: dict[str, float], vec_b: dict[str, float]) -> float:
"""Compute cosine similarity between two sparse TF-IDF vectors."""
if not vec_a or not vec_b:
return 0.0
shared_terms = vec_a.keys() & vec_b.keys()
dot = sum(vec_a[t] * vec_b[t] for t in shared_terms)
norm_a = math.sqrt(sum(v * v for v in vec_a.values()))
norm_b = math.sqrt(sum(v * v for v in vec_b.values()))
if norm_a == 0.0 or norm_b == 0.0:
return 0.0
return dot / (norm_a * norm_b)_pairwise_cosine function · python · L223-L247 (25 LOC)src/agent_memory/contradiction/detector.py
def _pairwise_cosine(text_a: str, text_b: str) -> float:
    """Return the TF-IDF cosine similarity of two text strings.

    Document frequencies come from a corpus made of just these two texts, so
    the IDF weights are relative to the pair.  The original note directs
    callers with a larger corpus to ``TFIDFContradictionDetector``.

    Returns
    -------
    float
        1.0 when both texts tokenise to nothing, 0.0 when exactly one does,
        otherwise the cosine similarity of the two TF-IDF vectors.
    """
    tokens_a = _tokenise(text_a)
    tokens_b = _tokenise(text_b)
    if not tokens_a:
        return 0.0 if tokens_b else 1.0
    if not tokens_b:
        return 0.0
    # Each document counts a term at most once towards its document frequency.
    doc_freq: dict[str, int] = {}
    for distinct_tokens in (set(tokens_a), set(tokens_b)):
        for token in distinct_tokens:
            doc_freq[token] = doc_freq.get(token, 0) + 1
    weighted = [
        _build_tfidf_vector(tokens, doc_freq, num_docs=2)
        for tokens in (tokens_a, tokens_b)
    ]
    return _cosine_similarity(*weighted)
# _temporal_distance_seconds function · python · L256-L279 (24 LOC)src/agent_memory/contradiction/detector.py
def _temporal_distance_seconds(
entry_a: LightweightMemoryEntry,
entry_b: LightweightMemoryEntry,
) -> float:
"""Return absolute temporal distance in seconds between two entries."""
from datetime import timezone
def to_utc(dt_val: object) -> object:
from datetime import datetime
if not isinstance(dt_val, datetime):
return dt_val
if dt_val.tzinfo is None:
return dt_val.replace(tzinfo=timezone.utc)
return dt_val.astimezone(timezone.utc)
ts_a = to_utc(entry_a.timestamp)
ts_b = to_utc(entry_b.timestamp)
from datetime import datetime
if isinstance(ts_a, datetime) and isinstance(ts_b, datetime):
return abs((ts_a - ts_b).total_seconds())
return 0.0_classify_contradiction_type function · python · L282-L334 (53 LOC)src/agent_memory/contradiction/detector.py
def _classify_contradiction_type(
entry_a: LightweightMemoryEntry,
entry_b: LightweightMemoryEntry,
similarity: float,
temporal_seconds: float,
negation_conflict: bool,
) -> tuple[ContradictionType, float, str]:
"""Classify the contradiction type and return (type, confidence, explanation).
Classification priority:
1. DIRECT_NEGATION — exactly one entry contains negation words and
similarity is high enough to indicate they discuss
the same claim.
2. TEMPORAL_INCONSISTENCY — entries are very similar but far apart in time.
3. FACTUAL_CONFLICT — high similarity with no negation; treated as a
factual claim conflict.
4. PREFERENCE_CHANGE — moderate similarity with temporal gap — preference
or stance has shifted.
"""
if negation_conflict and similarity >= 0.4:
confidence = min(1.0, 0.5 + similarity * 0.5)
explanatioTFIDFContradictionDetector.__init__ method · python · L377-L391 (15 LOC)src/agent_memory/contradiction/detector.py
def __init__(
self,
similarity_threshold: float = 0.85,
temporal_weight: float = 0.3,
) -> None:
if not 0.0 <= similarity_threshold <= 1.0:
raise ValueError(
f"similarity_threshold must be in [0, 1], got {similarity_threshold!r}"
)
if not 0.0 <= temporal_weight <= 1.0:
raise ValueError(
f"temporal_weight must be in [0, 1], got {temporal_weight!r}"
)
self._similarity_threshold = similarity_threshold
self._temporal_weight = temporal_weightRepobility · severity-and-effort ranking · https://repobility.com
TFIDFContradictionDetector.detect method · python · L405-L427 (23 LOC)src/agent_memory/contradiction/detector.py
def detect(
    self,
    memories: list[LightweightMemoryEntry],
) -> list[Contradiction]:
    """Scan all entries pairwise and return detected contradictions.

    Parameters
    ----------
    memories:
        The memory entries to scan.  Every unordered pair is checked once,
        so the number of checks grows quadratically with the input size.
        Any iterable is accepted.

    Returns
    -------
    list[Contradiction]
        Every non-``None`` result of ``self.check_pair``, in pair order.
        Empty list if none found.
    """
    from itertools import combinations

    # combinations() yields each (earlier, later) pair exactly once, in the
    # same order as the nested index loops it replaces.
    checked = (
        self.check_pair(first, second)
        for first, second in combinations(memories, 2)
    )
    return [found for found in checked if found is not None]
# TFIDFContradictionDetector.check_pair method · python · L429-L484 (56 LOC)src/agent_memory/contradiction/detector.py
def check_pair(
self,
a: LightweightMemoryEntry,
b: LightweightMemoryEntry,
) -> Contradiction | None:
"""Check a single pair of entries for contradictions.
Returns ``None`` if no contradiction is detected; otherwise returns a
``Contradiction`` describing the conflict.
Parameters
----------
a:
The first memory entry.
b:
The second memory entry.
Returns
-------
Contradiction | None
"""
# Fast path: use pre-computed embeddings when available for both entries
if a.embedding is not None and b.embedding is not None:
similarity = _cosine_similarity_dense(a.embedding, b.embedding)
else:
similarity = _pairwise_cosine(a.content, b.content)
if similarity < self._similarity_threshold:
return None
negation_conflict = _has_negation(a.content) != _has_negation(b.content)
_cosine_similarity_dense function · python · L492-L508 (17 LOC)src/agent_memory/contradiction/detector.py
def _cosine_similarity_dense(vec_a: list[float], vec_b: list[float]) -> float:
"""Compute cosine similarity between two dense float vectors."""
if len(vec_a) != len(vec_b):
raise ValueError(
f"Embedding dimension mismatch: {len(vec_a)} vs {len(vec_b)}"
)
if not vec_a:
return 0.0
dot = sum(a * b for a, b in zip(vec_a, vec_b))
norm_a = math.sqrt(sum(v * v for v in vec_a))
norm_b = math.sqrt(sum(v * v for v in vec_b))
if norm_a == 0.0 or norm_b == 0.0:
return 0.0
return dot / (norm_a * norm_b)EntityAttributeMatcher.extract_triples method · python · L38-L54 (17 LOC)src/agent_memory/contradiction/matcher.py
def extract_triples(self, content: str) -> list[Triple]:
"""Extract structured triples from free-form content.

The text is split on ``.``, ``!``, ``?`` and ``;``.  Each non-empty, stripped
fragment is tried against the entries of ``_PATTERNS``; a ``Triple`` is
recorded when both the normalised entity (group 1) and the normalised value
(group 2) are non-empty.
"""
results: list[Triple] = []
# Sentence-like fragments: split on any of . ! ? ;
sentences = re.split(r"[.!?;]", content)
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
# The third element of each _PATTERNS entry is not used here.
for pattern, attribute, _ in _PATTERNS:
# presumably compiled regexes, so match() anchors at the fragment start — verify
match = pattern.match(sentence)
if match:
entity = _normalise(match.group(1))
value = _normalise(match.group(2))
if entity and value:
results.append(Triple(entity=entity, attribute=attribute, value=value))
# NOTE(review): indentation is lost in this listing, so it is unclear whether
# this break belongs to `if match:` or to `if entity and value:` — confirm
# against the original file before restructuring this loop.
break
return resultsEntityAttributeMatcher.conflicts method · python · L56-L71 (16 LOC)src/agent_memory/contradiction/matcher.py
def conflicts(
    self,
    triples_a: Sequence[Triple],
    triples_b: Sequence[Triple],
) -> list[tuple[Triple, Triple]]:
    """Return every cross pair of triples that disagree on a value.

    A pair conflicts when both triples share ``entity`` and ``attribute`` but
    their ``value`` differs.  Pairs are ordered by ``triples_a`` first, then
    by ``triples_b``.
    """
    return [
        (left, right)
        for left in triples_a
        for right in triples_b
        if left.entity == right.entity
        and left.attribute == right.attribute
        and left.value != right.value
    ]
# EntityAttributeMatcher.similarity method · python · L73-L81 (9 LOC)src/agent_memory/contradiction/matcher.py
def similarity(self, text_a: str, text_b: str) -> float:
    """Return the Jaccard similarity of the two texts' token sets.

    Tokens are runs of ``a-z`` / ``0-9`` taken from the lower-cased text.
    Two texts without any tokens score 1.0.
    """
    words_a, words_b = (
        set(re.findall(r"[a-z0-9]+", text.lower())) for text in (text_a, text_b)
    )
    combined = words_a | words_b
    # The union is empty exactly when both token sets are empty.
    if not combined:
        return 1.0
    return len(words_a & words_b) / len(combined)
# ContradictionReport.mark_resolved method · python · L48-L63 (16 LOC)src/agent_memory/contradiction/report.py
def mark_resolved(
    self,
    entry_a_id: str,
    entry_b_id: str,
    strategy: str,
    winner_id: str,
) -> None:
    """Flag the first stored pair that links the two ids as resolved.

    The ids match in either order.  The matching pair gets ``resolved=True``
    plus the given ``resolution_strategy`` and ``winner_id``; nothing happens
    when no pair matches.
    """
    wanted = ((entry_a_id, entry_b_id), (entry_b_id, entry_a_id))
    for candidate in self.contradiction_pairs:
        if (candidate.entry_a_id, candidate.entry_b_id) not in wanted:
            continue
        candidate.resolved = True
        candidate.resolution_strategy = strategy
        candidate.winner_id = winner_id
        return
# ContradictionReport.summary method · python · L65-L72 (8 LOC)src/agent_memory/contradiction/report.py
def summary(self) -> dict[str, object]:
    """Return the report's headline figures as a plain dictionary.

    ``resolved`` is derived as ``contradiction_count - unresolved_count``.
    """
    total = self.contradiction_count
    still_open = self.unresolved_count
    figures: dict[str, object] = {
        "generated_at": self.generated_at.isoformat(),
        "total_scanned": self.total_entries_scanned,
    }
    figures["total_contradictions"] = total
    figures["unresolved"] = still_open
    figures["resolved"] = total - still_open
    return figures
# Source: Repobility analyzer · https://repobility.com
ResolutionStrategyBase.resolve method · python · L99-L114 (16 LOC)src/agent_memory/contradiction/resolution.py
def resolve(self, existing: MemoryEntry, new: MemoryEntry) -> ResolutionResult:
    """Decide how to reconcile *existing* with the contradicting *new* entry.

    This base implementation has no body of its own.

    Parameters
    ----------
    existing:
        The memory entry currently stored.
    new:
        The incoming memory entry that contradicts *existing*.

    Returns
    -------
    ResolutionResult
        The resolution outcome.
    """
    ...
# ContradictionResolver.__init__ method · python · L153-L174 (22 LOC)src/agent_memory/contradiction/resolution.py
def __init__(self, default_strategy: str = "latest_wins") -> None:
# Lazy import to avoid circular dependencies at module load time
from agent_memory.contradiction.resolution_strategies.latest_wins import LatestWinsStrategy
from agent_memory.contradiction.resolution_strategies.merge import MergeResolutionStrategy
from agent_memory.contradiction.resolution_strategies.user_confirmation import UserConfirmationStrategy
self._registry: dict[str, ResolutionStrategyBase] = {}
# Register built-in strategies
for strategy in (
LatestWinsStrategy(),
UserConfirmationStrategy(),
MergeResolutionStrategy(),
):
self._registry[strategy.name] = strategy
if default_strategy not in self._registry:
raise ValueError(
f"Unknown default strategy {default_strategy!r}. "
f"Valid options: {sorted(self._registry.keys())!r}"
)
ContradictionResolver.register_strategy method · python · L186-L215 (30 LOC)src/agent_memory/contradiction/resolution.py
def register_strategy(
self,
name: str,
strategy: ResolutionStrategyBase,
) -> None:
"""Register a custom resolution strategy.
Parameters
----------
name:
The strategy name used to look it up at resolution time.
Overrides any existing strategy with the same name.
strategy:
An instance of a :class:`ResolutionStrategyBase` subclass.
Example
-------
>>> class MyStrategy(ResolutionStrategyBase):
... @property
... def name(self): return "my_strategy"
... def resolve(self, existing, new): ...
>>> resolver = ContradictionResolver()
>>> resolver.register_strategy("my_strategy", MyStrategy())
"""
if not isinstance(strategy, ResolutionStrategyBase):
raise TypeError(
f"strategy must be a ResolutionStrategyBase instance, "
f"got {type(strategy).__name__}"
ContradictionResolver.resolve method · python · L217-L254 (38 LOC)src/agent_memory/contradiction/resolution.py
def resolve(
self,
existing: MemoryEntry,
new: MemoryEntry,
strategy: str | None = None,
) -> ResolutionResult:
"""Resolve the contradiction between *existing* and *new*.
Parameters
----------
existing:
The memory entry currently stored.
new:
The incoming memory entry that contradicts *existing*.
strategy:
Name of the strategy to use. Falls back to
:attr:`default_strategy` when ``None``.
Returns
-------
ResolutionResult
The resolution outcome from the selected strategy.
Raises
------
ValueError
If the requested strategy name is not registered.
"""
active_name = strategy if strategy is not None else self._default_strategy
strategy_obj = self._registry.get(active_name)
if strategy_obj is None:
raise ValueError(
f"UnknowContradictionResolver.resolve_batch method · python · L256-L278 (23 LOC)src/agent_memory/contradiction/resolution.py
def resolve_batch(
    self,
    pairs: list[tuple[MemoryEntry, MemoryEntry]],
    strategy: str | None = None,
) -> list[ResolutionResult]:
    """Run :meth:`resolve` on each pair with one shared strategy choice.

    Parameters
    ----------
    pairs:
        List of ``(existing, new)`` memory entry pairs.
    strategy:
        Strategy name passed through to ``resolve`` for every pair
        (``None`` included).

    Returns
    -------
    list[ResolutionResult]
        One result per input pair, in the same order.
    """
    outcomes: list[ResolutionResult] = []
    for existing, new in pairs:
        outcomes.append(self.resolve(existing, new, strategy=strategy))
    return outcomes
# LatestWinsStrategy.resolve method · python · L54-L99 (46 LOC)src/agent_memory/contradiction/resolution_strategies/latest_wins.py
def resolve(self, existing: MemoryEntry, new: MemoryEntry) -> ResolutionResult:
"""Resolve by keeping the newer of the two entries.
Parameters
----------
existing:
The entry already in memory.
new:
The incoming entry that contradicts *existing*.
Returns
-------
ResolutionResult
``action="replace"`` if *new* is more recent; otherwise
``action="replace"`` is returned with *existing* as content
(stable tie-break: keep existing when timestamps are equal).
"""
ts_existing = _to_utc(existing.timestamp)
ts_new = _to_utc(new.timestamp)
if ts_new > ts_existing:
# Incoming entry is newer — replace existing with new
return ResolutionResult(
strategy_used=self.name,
action="replace",
resolved_content=new.content,
requires_user_input=False,
_to_utc function · python · L107-L115 (9 LOC)src/agent_memory/contradiction/resolution_strategies/latest_wins.py
def _to_utc(dt: object) -> object:
"""Normalise a datetime to UTC-aware. Returns input unchanged if not datetime."""
from datetime import datetime
if not isinstance(dt, datetime):
return dt
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)MergeResolutionStrategy.resolve method · python · L74-L109 (36 LOC)src/agent_memory/contradiction/resolution_strategies/merge.py
def resolve(self, existing: MemoryEntry, new: MemoryEntry) -> ResolutionResult:
"""Merge the content of *existing* and *new* into a single entry.
Parameters
----------
existing:
The entry already in memory.
new:
The incoming entry that contradicts *existing*.
Returns
-------
ResolutionResult
Result with ``action="merge"`` and the merged content in
``resolved_content``. ``requires_user_input=False``.
"""
merged_content = _merge_contents(
existing.content,
new.content,
separator=self._separator,
)
unique_sentences_count = len(_split_sentences(merged_content))
explanation = (
f"Merged {len(_split_sentences(existing.content))} sentence(s) from existing "
f"and {len(_split_sentences(new.content))} sentence(s) from new entry into "
f"{unique_sentences_count} uniquRepobility (the analyzer behind this table) · https://repobility.com
_split_sentences function · python · L117-L132 (16 LOC)src/agent_memory/contradiction/resolution_strategies/merge.py
def _split_sentences(text: str) -> list[str]:
"""Split *text* into sentences by common punctuation boundaries.
Parameters
----------
text:
Input text to split.
Returns
-------
list[str]
List of non-empty, stripped sentence strings.
"""
# Split on sentence-ending punctuation followed by whitespace or end-of-string
parts = re.split(r"(?<=[.!?])\s+", text.strip())
return [p.strip() for p in parts if p.strip()]_merge_contents function · python · L140-L180 (41 LOC)src/agent_memory/contradiction/resolution_strategies/merge.py
def _merge_contents(existing: str, new: str, separator: str = " ") -> str:
"""Merge two content strings by combining unique sentences.
Parameters
----------
existing:
Content of the existing memory entry.
new:
Content of the new memory entry.
separator:
String used between sentences in the output.
Returns
-------
str
Merged content string.
"""
existing_sentences = _split_sentences(existing)
new_sentences = _split_sentences(new)
# Build ordered union, deduplicating by normalised form
seen_normalised: set[str] = set()
merged: list[str] = []
for sentence in existing_sentences:
key = _normalise_sentence(sentence)
if key not in seen_normalised:
seen_normalised.add(key)
merged.append(sentence)
for sentence in new_sentences:
key = _normalise_sentence(sentence)
if key not in seen_normalised:
seen_normalised.add(key)
UserConfirmationStrategy.__init__ method · python · L65-L70 (6 LOC)src/agent_memory/contradiction/resolution_strategies/user_confirmation.py
def __init__(self, prompt_template: str | None = None) -> None:
self._prompt_template = (
prompt_template
if prompt_template is not None
else self._DEFAULT_PROMPT_TEMPLATE
)UserConfirmationStrategy.resolve method · python · L76-L102 (27 LOC)src/agent_memory/contradiction/resolution_strategies/user_confirmation.py
def resolve(self, existing: MemoryEntry, new: MemoryEntry) -> ResolutionResult:
    """Hand the decision to the user instead of picking a winner.

    The explanation is the prompt template filled with both entries'
    content, each passed through ``_truncate(..., 200)``.

    Parameters
    ----------
    existing:
        The entry already in memory.
    new:
        The incoming entry that contradicts *existing*.

    Returns
    -------
    ResolutionResult
        Result with ``action="defer"``, ``resolved_content=None``,
        and ``requires_user_input=True``.
    """
    snippets = {
        "existing": _truncate(existing.content, 200),
        "new": _truncate(new.content, 200),
    }
    prompt_text = self._prompt_template.format(**snippets)
    return ResolutionResult(
        strategy_used=self.name,
        action="defer",
        resolved_content=None,
        requires_user_input=True,
        explanation=prompt_text,
    )
# ContradictionResolver.resolve method · python · L92-L147 (56 LOC)src/agent_memory/contradiction/resolver.py
def resolve(
self,
contradiction: ContradictionPair,
entry_a: MemoryEntry,
entry_b: MemoryEntry,
strategy: Optional[ResolutionStrategy] = None,
manual_winner_id: Optional[str] = None,
) -> ResolutionResult:
"""Resolve a contradiction between two memory entries.
Parameters
----------
contradiction:
The detected contradiction pair to resolve.
entry_a:
The first memory entry in the pair.
entry_b:
The second memory entry in the pair.
strategy:
Override strategy; falls back to ``default_strategy`` if None.
manual_winner_id:
Required when ``strategy`` is ``MANUAL``; must be the ID of
either entry_a or entry_b.
Returns
-------
ResolutionResult
Structured resolution outcome with winner, loser, and rationale.
Raises
------
ValueError
ContradictionResolver.resolve_batch method · python · L149-L171 (23 LOC)src/agent_memory/contradiction/resolver.py
def resolve_batch(
    self,
    pairs: list[tuple[ContradictionPair, MemoryEntry, MemoryEntry]],
    strategy: Optional[ResolutionStrategy] = None,
) -> list[ResolutionResult]:
    """Run :meth:`resolve` on each tuple with one shared strategy choice.

    Parameters
    ----------
    pairs:
        List of ``(ContradictionPair, entry_a, entry_b)`` tuples.
    strategy:
        Strategy passed through to ``resolve`` for every tuple
        (``None`` included).

    Returns
    -------
    list[ResolutionResult]
        One result per input pair, in the same order.
    """
    outcomes: list[ResolutionResult] = []
    for pair, entry_a, entry_b in pairs:
        outcomes.append(self.resolve(pair, entry_a, entry_b, strategy=strategy))
    return outcomes
# ContradictionResolver._resolve_recency method · python · L177-L206 (30 LOC)src/agent_memory/contradiction/resolver.py
def _resolve_recency(
self,
contradiction: ContradictionPair,
entry_a: MemoryEntry,
entry_b: MemoryEntry,
) -> ResolutionResult:
"""The more recently created entry wins."""
created_a = _to_utc(entry_a.created_at)
created_b = _to_utc(entry_b.created_at)
if created_a >= created_b:
winner, loser = entry_a, entry_b
rationale = (
f"Entry {entry_a.memory_id!r} is newer "
f"({entry_a.created_at.isoformat()} >= {entry_b.created_at.isoformat()})."
)
else:
winner, loser = entry_b, entry_a
rationale = (
f"Entry {entry_b.memory_id!r} is newer "
f"({entry_b.created_at.isoformat()} > {entry_a.created_at.isoformat()})."
)
return ResolutionResult(
contradiction_pair=contradiction,
strategy_used=ResolutionStrategy.RECENCY_WINS,
winner_id=winner.ContradictionResolver._resolve_source_priority method · python · L208-L237 (30 LOC)src/agent_memory/contradiction/resolver.py
def _resolve_source_priority(
self,
contradiction: ContradictionPair,
entry_a: MemoryEntry,
entry_b: MemoryEntry,
) -> ResolutionResult:
"""The entry from the more authoritative source wins."""
priority_a = _SOURCE_PRIORITY.get(entry_a.source.value, 0)
priority_b = _SOURCE_PRIORITY.get(entry_b.source.value, 0)
if priority_a >= priority_b:
winner, loser = entry_a, entry_b
rationale = (
f"Source {entry_a.source.value!r} (priority={priority_a}) >= "
f"{entry_b.source.value!r} (priority={priority_b})."
)
else:
winner, loser = entry_b, entry_a
rationale = (
f"Source {entry_b.source.value!r} (priority={priority_b}) > "
f"{entry_a.source.value!r} (priority={priority_a})."
)
return ResolutionResult(
contradiction_pair=contradiction,
strategy_used=RRepobility — same analyzer, your code, free for public repos · /scan/
ContradictionResolver._resolve_confidence method · python · L239-L268 (30 LOC)src/agent_memory/contradiction/resolver.py
def _resolve_confidence(
self,
contradiction: ContradictionPair,
entry_a: MemoryEntry,
entry_b: MemoryEntry,
) -> ResolutionResult:
"""The entry with higher importance_score * freshness_score wins."""
score_a = entry_a.composite_score
score_b = entry_b.composite_score
if score_a >= score_b:
winner, loser = entry_a, entry_b
rationale = (
f"Entry {entry_a.memory_id!r} has higher composite score "
f"({score_a:.4f} >= {score_b:.4f})."
)
else:
winner, loser = entry_b, entry_a
rationale = (
f"Entry {entry_b.memory_id!r} has higher composite score "
f"({score_b:.4f} > {score_a:.4f})."
)
return ResolutionResult(
contradiction_pair=contradiction,
strategy_used=ResolutionStrategy.CONFIDENCE_BASED,
winner_id=winner.memory_id,
lContradictionResolver._resolve_manual method · python · L270-L300 (31 LOC)src/agent_memory/contradiction/resolver.py
def _resolve_manual(
self,
contradiction: ContradictionPair,
entry_a: MemoryEntry,
entry_b: MemoryEntry,
manual_winner_id: Optional[str],
) -> ResolutionResult:
"""The caller explicitly designates the winner."""
valid_ids = {entry_a.memory_id, entry_b.memory_id}
if manual_winner_id not in valid_ids:
return ResolutionResult(
contradiction_pair=contradiction,
strategy_used=ResolutionStrategy.MANUAL,
winner_id="",
loser_id="",
rationale=(
f"Manual resolution requested but winner_id {manual_winner_id!r} "
f"is not one of {sorted(valid_ids)!r}. Requires human review."
),
requires_manual_review=True,
)
winner_id = manual_winner_id
loser_id = entry_b.memory_id if winner_id == entry_a.memory_id else entry_a.memory_id
return R_get_strategy_registry function · python · L312-L322 (11 LOC)src/agent_memory/contradiction/resolver.py
def _get_strategy_registry() -> dict[str, object]:
    """Return the module-level strategy registry, filling it on first use.

    While ``_STRATEGY_REGISTRY`` is empty it is rebuilt with one instance of
    each built-in strategy, keyed by strategy name.
    """
    global _STRATEGY_REGISTRY
    if _STRATEGY_REGISTRY:
        return _STRATEGY_REGISTRY
    _STRATEGY_REGISTRY = {
        "keep_newer": KeepNewerStrategy(),
        "keep_older": _KeepOlderStrategy(),
        "keep_both_with_context": KeepBothStrategy(),
        "flag_for_review": FlagForReviewStrategy(),
        "merge": MergeStrategy(),
    }
    return _STRATEGY_REGISTRY