Function bodies 227 total
Session.load method · python · L73-L91 (19 LOC)src/agent_session_linker/convenience.py
def load(self, session_id: str) -> Any:
"""Load a session by its ID.
Parameters
----------
session_id:
ID of the session to load.
Returns
-------
SessionState
The loaded session state.
Raises
------
SessionNotFoundError
If no session with the given ID exists.
"""
return self._manager.load_session(session_id)Session.add_context method · python · L93-L103 (11 LOC)src/agent_session_linker/convenience.py
def add_context(self, key: str, value: str) -> None:
"""Add a key-value pair to the session preferences.
Parameters
----------
key:
Context key.
value:
Context value string.
"""
self._state.preferences[key] = value_remove_overlaps function · python · L150-L191 (42 LOC)src/agent_session_linker/entity/extractor.py
def _remove_overlaps(entities: list[Entity]) -> list[Entity]:
"""Remove lower-priority entities that overlap with higher-priority ones.
Priority order (highest first): URL, EMAIL, MONEY, DATE, ORG, PERSON, NUMBER.
Parameters
----------
entities:
Raw extracted entities, possibly overlapping.
Returns
-------
list[Entity]
Non-overlapping entities sorted by start offset.
"""
priority: dict[str, int] = {
"URL": 7,
"EMAIL": 6,
"MONEY": 5,
"DATE": 4,
"ORG": 3,
"PERSON": 2,
"NUMBER": 1,
}
sorted_entities = sorted(
entities,
key=lambda e: (-(priority.get(e.entity_type, 0)), e.start),
)
result: list[Entity] = []
occupied: list[tuple[int, int]] = []
for entity in sorted_entities:
overlaps = any(
entity.start < end and entity.end > start
for start, end in occupied
)
if not overlaps:
reEntityExtractor.__init__ method · python · L219-L227 (9 LOC)src/agent_session_linker/entity/extractor.py
def __init__(
self,
types: frozenset[str] | set[str] | None = None,
min_confidence: float = 0.5,
) -> None:
self.types: frozenset[str] = (
frozenset(types) if types is not None else self.SUPPORTED_TYPES
)
self.min_confidence = min_confidenceEntityExtractor.extract method · python · L233-L282 (50 LOC)src/agent_session_linker/entity/extractor.py
def extract(self, text: str) -> list[Entity]:
"""Extract all named entities from ``text``.
Parameters
----------
text:
The source text to analyse.
Returns
-------
list[Entity]
Non-overlapping entities in document order. When two patterns
match the same span, the higher-priority type wins.
"""
if not text:
return []
raw: list[Entity] = []
if "URL" in self.types:
raw.extend(self._extract_by_pattern(text, _URL_RE, "URL", confidence=1.0))
if "EMAIL" in self.types:
raw.extend(self._extract_by_pattern(text, _EMAIL_RE, "EMAIL", confidence=1.0))
if "MONEY" in self.types:
raw.extend(self._extract_by_pattern(text, _MONEY_RE, "MONEY", confidence=1.0))
if "DATE" in self.types:
raw.extend(self._extract_by_pattern(text, _DATE_RE, "DATE", confidence=0.95))
if "ORG" in self.tyEntityExtractor.extract_by_type method · python · L284-L299 (16 LOC)src/agent_session_linker/entity/extractor.py
def extract_by_type(self, text: str, entity_type: str) -> list[Entity]:
"""Extract only entities of a specific type.
Parameters
----------
text:
Source text.
entity_type:
One of the supported type labels.
Returns
-------
list[Entity]
Matching entities in document order.
"""
return [e for e in self.extract(text) if e.entity_type == entity_type]EntityExtractor._extract_by_pattern method · python · L306-L343 (38 LOC)src/agent_session_linker/entity/extractor.py
def _extract_by_pattern(
text: str,
pattern: re.Pattern[str],
entity_type: str,
confidence: float,
) -> list[Entity]:
"""Run a compiled regex over ``text`` and return Entity objects.
Parameters
----------
text:
Source text.
pattern:
Compiled regex pattern.
entity_type:
Type label for matched spans.
confidence:
Confidence to assign all matches.
Returns
-------
list[Entity]
Matched entities.
"""
entities: list[Entity] = []
for match in pattern.finditer(text):
matched_text = match.group().strip()
if matched_text:
entities.append(
Entity(
text=matched_text,
entity_type=entity_type,
start=match.start(),
end=match.end(),
Repobility · severity-and-effort ranking · https://repobility.com
_normalised_edit_distance function · python · L20-L71 (52 LOC)src/agent_session_linker/entity/linker.py
def _normalised_edit_distance(source: str, target: str) -> float:
"""Compute the normalised Levenshtein edit distance between two strings.
The normalised distance is in [0.0, 1.0], where 0.0 means identical
and 1.0 means completely different. Normalisation is by the length of
the longer string so the measure is symmetric.
Parameters
----------
source:
First string.
target:
Second string.
Returns
-------
float
Normalised edit distance in [0.0, 1.0].
"""
if source == target:
return 0.0
source_length = len(source)
target_length = len(target)
if source_length == 0:
return 1.0 if target_length > 0 else 0.0
if target_length == 0:
return 1.0
# Build a (source_length+1) x (target_length+1) DP matrix.
# Use two rolling rows to limit memory to O(min(m,n)).
if source_length < target_length:
source, target = target, source
source_length, target_lengEntityLinker.__init__ method · python · L98-L110 (13 LOC)src/agent_session_linker/entity/linker.py
def __init__(
self,
similarity_threshold: float = 0.8,
case_sensitive: bool = False,
require_same_type: bool = True,
) -> None:
if not 0.0 < similarity_threshold <= 1.0:
raise ValueError(
f"similarity_threshold must be in (0.0, 1.0], got {similarity_threshold!r}."
)
self.similarity_threshold = similarity_threshold
self.case_sensitive = case_sensitive
self.require_same_type = require_same_typeEntityLinker.link method · python · L116-L160 (45 LOC)src/agent_session_linker/entity/linker.py
def link(self, entity: Entity, known: list[Entity]) -> Entity | None:
"""Find the best matching known entity for a mention.
Parameters
----------
entity:
The entity mention to link.
known:
Catalogue of known entities to search.
Returns
-------
Entity | None
The best-matching known entity if similarity exceeds the
threshold, otherwise None.
"""
if not known:
return None
candidates = known
if self.require_same_type:
candidates = [k for k in known if k.entity_type == entity.entity_type]
if not candidates:
return None
mention_text = entity.text if self.case_sensitive else entity.text.lower()
best_entity: Entity | None = None
best_similarity: float = -1.0
for candidate in candidates:
candidate_text = (
candidate.text if self.case_sensitEntityLinker.link_all method · python · L162-L182 (21 LOC)src/agent_session_linker/entity/linker.py
def link_all(
self,
entities: list[Entity],
known: list[Entity],
) -> list[tuple[Entity, Entity | None]]:
"""Link every entity in ``entities`` against the catalogue.
Parameters
----------
entities:
Mentions to link.
known:
Known entity catalogue.
Returns
-------
list[tuple[Entity, Entity | None]]
Each pair is ``(mention, linked_entity)``. The second element
is None when no match met the similarity threshold.
"""
return [(entity, self.link(entity, known)) for entity in entities]EntityLinker.similarity method · python · L184-L201 (18 LOC)src/agent_session_linker/entity/linker.py
def similarity(self, text_a: str, text_b: str) -> float:
"""Return the similarity (1 - normalised edit distance) between two strings.
Parameters
----------
text_a:
First string.
text_b:
Second string.
Returns
-------
float
Similarity score in [0.0, 1.0].
"""
source = text_a if self.case_sensitive else text_a.lower()
target = text_b if self.case_sensitive else text_b.lower()
return 1.0 - _normalised_edit_distance(source, target)EntityTracker.update method · python · L88-L124 (37 LOC)src/agent_session_linker/entity/tracker.py
def update(self, entities: list[Entity]) -> None:
"""Record a batch of newly observed entities.
Each entity is merged into the tracker. If it has been seen before
(same normalised text + type), its frequency is incremented and
``last_seen`` is refreshed. Otherwise a new ``TrackedEntity`` is
created.
Parameters
----------
entities:
Entities extracted from a single turn or text block.
"""
now = datetime.now(timezone.utc)
for entity in entities:
normalised = entity.text if self.case_sensitive else entity.text.lower()
key = (normalised, entity.entity_type)
if key in self._entities:
tracked = self._entities[key]
tracked.frequency += 1
tracked.last_seen = now
# Update running average confidence.
old_total = tracked.confidence * (tracked.frequency - 1)
traEntityTracker.get_top method · python · L126-L146 (21 LOC)src/agent_session_linker/entity/tracker.py
def get_top(self, n: int, entity_type: str | None = None) -> list[TrackedEntity]:
"""Return the top-``n`` most frequently observed entities.
Parameters
----------
n:
Maximum number of results to return.
entity_type:
When provided, restrict results to this entity type.
Returns
-------
list[TrackedEntity]
Entities sorted by frequency descending, then by ``last_seen``
descending as a tie-breaker.
"""
candidates = list(self._entities.values())
if entity_type is not None:
candidates = [e for e in candidates if e.entity_type == entity_type]
candidates.sort(key=lambda e: (e.frequency, e.last_seen), reverse=True)
return candidates[:n]EntityTracker.get_by_type method · python · L148-L166 (19 LOC)src/agent_session_linker/entity/tracker.py
def get_by_type(self, entity_type: str) -> list[TrackedEntity]:
"""Return all tracked entities of a given type, most frequent first.
Parameters
----------
entity_type:
Category label to filter by (e.g. "PERSON", "ORG").
Returns
-------
list[TrackedEntity]
Matching entities sorted by frequency descending.
"""
matching = [
e for e in self._entities.values()
if e.entity_type == entity_type
]
matching.sort(key=lambda e: e.frequency, reverse=True)
return matchingAbout: code-quality intelligence by Repobility · https://repobility.com
EntityTracker.get_all method · python · L168-L176 (9 LOC)src/agent_session_linker/entity/tracker.py
def get_all(self) -> list[TrackedEntity]:
"""Return all tracked entities sorted by frequency descending.
Returns
-------
list[TrackedEntity]
All tracked entities.
"""
return sorted(self._entities.values(), key=lambda e: e.frequency, reverse=True)EntityTracker.get method · python · L178-L194 (17 LOC)src/agent_session_linker/entity/tracker.py
def get(self, text: str, entity_type: str) -> TrackedEntity | None:
"""Look up a specific tracked entity by text and type.
Parameters
----------
text:
Surface form (normalised according to ``case_sensitive`` setting).
entity_type:
Category label.
Returns
-------
TrackedEntity | None
The tracked entity if found, else None.
"""
normalised = text if self.case_sensitive else text.lower()
return self._entities.get((normalised, entity_type))HandoffConfig.__post_init__ method · python · L83-L88 (6 LOC)src/agent_session_linker/handoff/context_handoff.py
def __post_init__(self) -> None:
if self.max_segments is not None and self.max_segments < 0:
raise ValueError(
f"max_segments must be non-negative or None, "
f"got {self.max_segments!r}."
)HandoffPayload.from_json method · python · L162-L174 (13 LOC)src/agent_session_linker/handoff/context_handoff.py
def from_json(cls, json_str: str) -> "HandoffPayload":
"""Deserialise from a JSON string produced by :meth:`to_json`.
Parameters
----------
json_str:
JSON string previously produced by :meth:`to_json`.
Returns
-------
HandoffPayload
"""
return cls.model_validate_json(json_str)HandoffPayload.summary_line method · python · L176-L183 (8 LOC)src/agent_session_linker/handoff/context_handoff.py
def summary_line(self) -> str:
"""Return a one-line human-readable description."""
return (
f"HandoffPayload {self.handoff_id[:8]} | "
f"{self.source_agent_id} -> {self.target_agent_id} | "
f"{self.segment_count} segments, {self.task_count} tasks, "
f"{self.entity_count} entities | reason={self.handoff_reason!r}"
)HandoffBuilder.build method · python · L210-L277 (68 LOC)src/agent_session_linker/handoff/context_handoff.py
def build(
self,
source_session: SessionState,
target_agent_id: str,
handoff_reason: str = "",
extra_metadata: dict[str, object] | None = None,
) -> HandoffPayload:
"""Construct a :class:`HandoffPayload` from *source_session*.
Parameters
----------
source_session:
The session to extract context from.
target_agent_id:
The receiving agent.
handoff_reason:
Why this handoff is happening.
extra_metadata:
Additional key-value annotations to include.
Returns
-------
HandoffPayload
"""
cfg = self._config
# Segments
segments = list(source_session.segments)
if cfg.segment_types:
segments = [s for s in segments if s.segment_type in cfg.segment_types]
if cfg.max_segments is not None:
if cfg.max_segments == 0:
segments = []
_segment_to_dict function · python · L285-L295 (11 LOC)src/agent_session_linker/handoff/context_handoff.py
def _segment_to_dict(seg: ContextSegment) -> dict[str, object]:
return {
"segment_id": seg.segment_id,
"role": seg.role,
"content": seg.content,
"token_count": seg.token_count,
"segment_type": seg.segment_type,
"timestamp": seg.timestamp.isoformat(),
"turn_index": seg.turn_index,
"metadata": seg.metadata,
}_entity_to_dict function · python · L298-L306 (9 LOC)src/agent_session_linker/handoff/context_handoff.py
def _entity_to_dict(entity: EntityReference) -> dict[str, object]:
return {
"entity_id": entity.entity_id,
"canonical_name": entity.canonical_name,
"entity_type": entity.entity_type,
"aliases": entity.aliases,
"attributes": entity.attributes,
"confidence": entity.confidence,
}Repobility (the analyzer behind this table) · https://repobility.com
_task_to_dict function · python · L309-L318 (10 LOC)src/agent_session_linker/handoff/context_handoff.py
def _task_to_dict(task: TaskState) -> dict[str, object]:
return {
"task_id": task.task_id,
"title": task.title,
"description": task.description,
"status": task.status.value,
"priority": task.priority,
"tags": task.tags,
"notes": task.notes,
}SessionChain.__init__ method · python · L32-L38 (7 LOC)src/agent_session_linker/linking/chain.py
def __init__(
self,
manager: SessionManager,
initial_session_ids: list[str] | None = None,
) -> None:
self._manager = manager
self._chain: list[str] = list(initial_session_ids or [])SessionChain.append method · python · L44-L53 (10 LOC)src/agent_session_linker/linking/chain.py
def append(self, session_id: str) -> None:
"""Append a session ID to the end of the chain.
Parameters
----------
session_id:
The session to add. Duplicate IDs are allowed (the chain
records insertion order faithfully).
"""
self._chain.append(session_id)SessionChain.prepend method · python · L55-L63 (9 LOC)src/agent_session_linker/linking/chain.py
def prepend(self, session_id: str) -> None:
"""Prepend a session ID at the beginning of the chain.
Parameters
----------
session_id:
The session to add at position 0.
"""
self._chain.insert(0, session_id)SessionChain.remove method · python · L65-L78 (14 LOC)src/agent_session_linker/linking/chain.py
def remove(self, session_id: str) -> None:
"""Remove the first occurrence of ``session_id`` from the chain.
Parameters
----------
session_id:
Session to remove.
Raises
------
ValueError
If ``session_id`` is not in the chain.
"""
self._chain.remove(session_id)SessionChain.get_chain method · python · L84-L92 (9 LOC)src/agent_session_linker/linking/chain.py
def get_chain(self) -> list[str]:
"""Return the ordered list of session IDs (oldest first).
Returns
-------
list[str]
A shallow copy of the internal chain.
"""
return list(self._chain)SessionChain.get_sessions method · python · L94-L111 (18 LOC)src/agent_session_linker/linking/chain.py
def get_sessions(self) -> list[SessionState]:
"""Load and return all sessions in chain order.
Sessions that cannot be loaded (e.g. deleted from the backend) are
silently skipped.
Returns
-------
list[SessionState]
Loaded sessions in chain order (oldest first).
"""
states: list[SessionState] = []
for session_id in self._chain:
try:
states.append(self._manager.load_session(session_id))
except Exception: # noqa: BLE001
continue
return statesSessionChain.get_context_from_chain method · python · L113-L157 (45 LOC)src/agent_session_linker/linking/chain.py
def get_context_from_chain(self, n_recent: int) -> str:
"""Build a formatted context string from the ``n_recent`` newest sessions.
Loads the last ``n_recent`` sessions in the chain and concatenates
their segments into a single readable string, separated by session
boundaries. Sessions that cannot be loaded are skipped.
Parameters
----------
n_recent:
Number of most-recent sessions to include. Must be >= 1.
Returns
-------
str
Formatted context string. Empty string when the chain is empty
or no sessions could be loaded.
Raises
------
ValueError
If ``n_recent < 1``.
"""
if n_recent < 1:
raise ValueError(f"n_recent must be >= 1, got {n_recent!r}.")
recent_ids = self._chain[-n_recent:]
if not recent_ids:
return ""
parts: list[str] = []
for session_id in reRepobility — the code-quality scanner for AI-generated software · https://repobility.com
SessionChain.get_all_segments method · python · L159-L182 (24 LOC)src/agent_session_linker/linking/chain.py
def get_all_segments(self, n_recent: int | None = None) -> list[ContextSegment]:
"""Return all context segments from the chain (or the ``n_recent`` newest sessions).
Parameters
----------
n_recent:
When provided, only the last ``n_recent`` sessions are included.
When None, all sessions in the chain are included.
Returns
-------
list[ContextSegment]
Segments in document order (oldest session first, oldest segment
within each session first).
"""
session_ids = self._chain if n_recent is None else self._chain[-n_recent:]
segments: list[ContextSegment] = []
for session_id in session_ids:
try:
session = self._manager.load_session(session_id)
segments.extend(session.segments)
except Exception: # noqa: BLE001
continue
return segmentsSessionChain._format_segments method · python · L202-L219 (18 LOC)src/agent_session_linker/linking/chain.py
def _format_segments(segments: list[ContextSegment]) -> str:
"""Render a list of segments as a readable string.
Parameters
----------
segments:
Segments to render.
Returns
-------
str
Formatted string with role labels.
"""
lines: list[str] = []
for segment in segments:
role_label = segment.role.upper()
lines.append(f"{role_label}: {segment.content}")
return "\n".join(lines)LinkedSession.__repr__ method · python · L47-L53 (7 LOC)src/agent_session_linker/linking/session_linker.py
def __repr__(self) -> str:
return (
f"LinkedSession("
f"source={self.source_session_id!r}, "
f"target={self.target_session_id!r}, "
f"relationship={self.relationship!r})"
)SessionLinker.__init__ method · python · L75-L80 (6 LOC)src/agent_session_linker/linking/session_linker.py
def __init__(self, allow_self_links: bool = False) -> None:
self.allow_self_links = allow_self_links
# Maps session_id -> list of LinkedSession (outgoing links).
self._outgoing: dict[str, list[LinkedSession]] = {}
# Maps session_id -> list of LinkedSession (incoming links, for reverse lookup).
self._incoming: dict[str, list[LinkedSession]] = {}SessionLinker.link method · python · L86-L144 (59 LOC)src/agent_session_linker/linking/session_linker.py
def link(
self,
source_session_id: str,
target_session_id: str,
relationship: str,
*,
metadata: dict[str, str] | None = None,
) -> LinkedSession:
"""Create a directed relationship from ``source`` to ``target``.
Duplicate links (same source, target, and relationship) are silently
ignored — the existing link is returned unchanged.
Parameters
----------
source_session_id:
The session that initiates the relationship.
target_session_id:
The related session.
relationship:
Descriptive label for the relationship type.
metadata:
Optional additional key-value data to attach to the link.
Returns
-------
LinkedSession
The newly created (or existing duplicate) link.
Raises
------
ValueError
If ``source_session_id == target_session_id`` and
SessionLinker.unlink method · python · L146-L190 (45 LOC)src/agent_session_linker/linking/session_linker.py
def unlink(
self,
source_session_id: str,
target_session_id: str,
relationship: str,
) -> None:
"""Remove a specific directed link.
Parameters
----------
source_session_id:
Source of the link to remove.
target_session_id:
Target of the link to remove.
relationship:
Relationship label of the link to remove.
Raises
------
KeyError
If no matching link exists.
"""
outgoing = self._outgoing.get(source_session_id, [])
before = len(outgoing)
self._outgoing[source_session_id] = [
link for link in outgoing
if not (
link.target_session_id == target_session_id
and link.relationship == relationship
)
]
if len(self._outgoing[source_session_id]) == before:
raise KeyError(
f"No link from {source_sessSessionLinker.get_linked method · python · L196-L238 (43 LOC)src/agent_session_linker/linking/session_linker.py
def get_linked(
self,
session_id: str,
relationship: str | None = None,
direction: str = "both",
) -> list[LinkedSession]:
"""Return all links connected to ``session_id``.
Parameters
----------
session_id:
The session to query.
relationship:
When provided, filter results to only this relationship type.
direction:
One of ``"outgoing"``, ``"incoming"``, or ``"both"`` (default).
Returns
-------
list[LinkedSession]
Matching links sorted by ``created_at`` ascending.
"""
links: list[LinkedSession] = []
if direction in ("outgoing", "both"):
links.extend(self._outgoing.get(session_id, []))
if direction in ("incoming", "both"):
links.extend(self._incoming.get(session_id, []))
# De-duplicate (a link may appear in both directions when queried both ways
# — here we SessionLinker.get_related_session_ids method · python · L240-L264 (25 LOC)src/agent_session_linker/linking/session_linker.py
def get_related_session_ids(
self,
session_id: str,
relationship: str | None = None,
) -> list[str]:
"""Return the IDs of all sessions linked to ``session_id``.
Parameters
----------
session_id:
The session to query.
relationship:
Optional filter.
Returns
-------
list[str]
Unique session IDs (excluding ``session_id`` itself).
"""
related: set[str] = set()
for link in self.get_linked(session_id, relationship=relationship):
related.add(link.source_session_id)
related.add(link.target_session_id)
related.discard(session_id)
return sorted(related)Repobility · severity-and-effort ranking · https://repobility.com
SessionLinker.export_links method · python · L270-L293 (24 LOC)src/agent_session_linker/linking/session_linker.py
def export_links(self) -> list[dict[str, object]]:
"""Export all links as a list of dicts for persistence.
Returns
-------
list[dict[str, object]]
Serialisable representation of all links.
"""
all_links: set[int] = set()
result: list[dict[str, object]] = []
for links in self._outgoing.values():
for link in links:
if id(link) not in all_links:
all_links.add(id(link))
result.append(
{
"source_session_id": link.source_session_id,
"target_session_id": link.target_session_id,
"relationship": link.relationship,
"created_at": link.created_at.isoformat(),
"metadata": link.metadata,
}
)
return resultSessionLinker.import_links method · python · L295-L322 (28 LOC)src/agent_session_linker/linking/session_linker.py
def import_links(self, data: list[dict[str, object]]) -> None:
"""Restore links from a previously exported list.
Existing links are preserved; only new links are added.
Parameters
----------
data:
List of dicts as produced by ``export_links``.
"""
for record in data:
linked = LinkedSession(
source_session_id=str(record["source_session_id"]),
target_session_id=str(record["target_session_id"]),
relationship=str(record["relationship"]),
created_at=datetime.fromisoformat(str(record["created_at"])),
metadata=dict(record.get("metadata", {})), # type: ignore[arg-type]
)
# Check for duplicate before inserting.
existing_outgoing = self._outgoing.get(linked.source_session_id, [])
is_duplicate = any(
e.target_session_id == linked.target_session_id
and e.CheckpointRecord.to_dict method · python · L62-L71 (10 LOC)src/agent_session_linker/middleware/checkpoint.py
def to_dict(self) -> dict[str, object]:
"""Serialise to a plain dict for JSON storage."""
return {
"checkpoint_id": self.checkpoint_id,
"session_id": self.session_id,
"label": self.label,
"created_at": self.created_at.isoformat(),
"segment_count": self.segment_count,
"token_count": self.token_count,
}CheckpointRecord.from_dict method · python · L74-L83 (10 LOC)src/agent_session_linker/middleware/checkpoint.py
def from_dict(cls, data: dict[str, object]) -> CheckpointRecord:
"""Deserialise from a dict produced by ``to_dict``."""
return cls(
checkpoint_id=str(data["checkpoint_id"]),
session_id=str(data["session_id"]),
label=str(data["label"]),
created_at=datetime.fromisoformat(str(data["created_at"])),
segment_count=int(str(data["segment_count"])),
token_count=int(str(data["token_count"])),
)CheckpointManager.__init__ method · python · L114-L123 (10 LOC)src/agent_session_linker/middleware/checkpoint.py
def __init__(
self,
backend: StorageBackend,
manager: SessionManager | None = None,
max_checkpoints_per_session: int = 10,
) -> None:
self._backend = backend
self._manager = manager
self._serializer = SessionSerializer(validate_checksum=False)
self.max_checkpoints_per_session = max_checkpoints_per_sessionCheckpointManager.create_checkpoint method · python · L129-L184 (56 LOC)src/agent_session_linker/middleware/checkpoint.py
def create_checkpoint(
self,
session: SessionState,
label: str = "",
) -> CheckpointRecord:
"""Snapshot ``session`` and store it as a named checkpoint.
Parameters
----------
session:
The current session state to snapshot.
label:
Optional human-readable label (e.g. "before-refactor",
"post-planning-step"). Defaults to an ISO timestamp.
Returns
-------
CheckpointRecord
Metadata about the newly created checkpoint.
"""
now = datetime.now(timezone.utc)
existing = self._load_index(session.session_id)
# Determine next checkpoint sequence number.
sequence = len(existing)
checkpoint_id = _build_checkpoint_key(session.session_id, sequence)
# Evict oldest checkpoint if at capacity.
if len(existing) >= self.max_checkpoints_per_session and existing:
oldest = existing[0]
CheckpointManager.restore_checkpoint method · python · L186-L209 (24 LOC)src/agent_session_linker/middleware/checkpoint.py
def restore_checkpoint(self, checkpoint_id: str) -> SessionState:
"""Load and return the session state stored in a checkpoint.
Parameters
----------
checkpoint_id:
The ``checkpoint_id`` from a ``CheckpointRecord``.
Returns
-------
SessionState
The session state at the time the checkpoint was taken.
Raises
------
KeyError
If no checkpoint with ``checkpoint_id`` exists.
"""
if not self._backend.exists(checkpoint_id):
raise KeyError(f"Checkpoint {checkpoint_id!r} not found.")
raw = self._backend.load(checkpoint_id)
state = self._serializer.from_json(raw)
logger.debug("CheckpointManager: restored checkpoint %r", checkpoint_id)
return stateCheckpointManager.list_checkpoints method · python · L211-L224 (14 LOC)src/agent_session_linker/middleware/checkpoint.py
def list_checkpoints(self, session_id: str) -> list[CheckpointRecord]:
"""Return all checkpoint records for ``session_id``, oldest first.
Parameters
----------
session_id:
The session whose checkpoints to list.
Returns
-------
list[CheckpointRecord]
Checkpoint records in creation order (oldest first).
"""
return self._load_index(session_id)About: code-quality intelligence by Repobility · https://repobility.com
CheckpointManager.delete_checkpoint method · python · L226-L254 (29 LOC)src/agent_session_linker/middleware/checkpoint.py
def delete_checkpoint(self, checkpoint_id: str, session_id: str) -> None:
"""Remove a checkpoint and update the session index.
Parameters
----------
checkpoint_id:
The checkpoint to delete.
session_id:
The owning session ID (needed to update the index).
Raises
------
KeyError
If the checkpoint does not exist.
"""
if not self._backend.exists(checkpoint_id):
raise KeyError(f"Checkpoint {checkpoint_id!r} not found.")
self._backend.delete(checkpoint_id)
index = self._load_index(session_id)
updated = [r for r in index if r.checkpoint_id != checkpoint_id]
self._save_index(session_id, updated)
logger.debug(
"CheckpointManager: deleted checkpoint %r from session %r",
checkpoint_id,
session_id,
)CheckpointManager._load_index method · python · L263-L270 (8 LOC)src/agent_session_linker/middleware/checkpoint.py
def _load_index(self, session_id: str) -> list[CheckpointRecord]:
"""Load the checkpoint index for a session."""
index_key = self._index_key(session_id)
if not self._backend.exists(index_key):
return []
raw = self._backend.load(index_key)
records_data: list[dict[str, object]] = json.loads(raw)
return [CheckpointRecord.from_dict(record) for record in records_data]_build_checkpoint_key function · python · L284-L299 (16 LOC)src/agent_session_linker/middleware/checkpoint.py
def _build_checkpoint_key(session_id: str, sequence: int) -> str:
"""Build a deterministic storage key for a checkpoint.
Parameters
----------
session_id:
Owning session ID.
sequence:
Monotonically increasing sequence number.
Returns
-------
str
Storage key string.
"""
return f"{_CHECKPOINT_KEY_PREFIX}{session_id}__{sequence:04d}"