Function bodies 227 total
StorageBackend.exists method · python · L85-L96 (12 LOC)src/agent_session_linker/storage/base.py
def exists(self, session_id: str) -> bool:
"""Return True if an entry for ``session_id`` exists.
Parameters
----------
session_id:
The session identifier to test.
Returns
-------
bool
"""FilesystemBackend._path_for method · python · L46-L61 (16 LOC)src/agent_session_linker/storage/filesystem.py
def _path_for(self, session_id: str) -> Path:
"""Return the file path for ``session_id``.
Parameters
----------
session_id:
The session identifier.
Returns
-------
Path
Absolute path to the JSON file.
"""
# Guard against path traversal attacks.
safe_name = os.path.basename(session_id)
return self._storage_dir / f"{safe_name}{_FILE_EXTENSION}"FilesystemBackend.save method · python · L67-L81 (15 LOC)src/agent_session_linker/storage/filesystem.py
def save(self, session_id: str, payload: str) -> None:
"""Write ``payload`` to ``<storage_dir>/<session_id>.json``.
The directory is created if it does not yet exist.
Parameters
----------
session_id:
Storage key.
payload:
UTF-8 string to write.
"""
self._ensure_dir()
path = self._path_for(session_id)
path.write_text(payload, encoding="utf-8")FilesystemBackend.load method · python · L83-L106 (24 LOC)src/agent_session_linker/storage/filesystem.py
def load(self, session_id: str) -> str:
"""Read and return the payload for ``session_id``.
Parameters
----------
session_id:
The session to retrieve.
Returns
-------
str
File contents as a UTF-8 string.
Raises
------
KeyError
If the file does not exist.
"""
path = self._path_for(session_id)
if not path.exists():
raise KeyError(
f"Session {session_id!r} not found at {path}"
)
return path.read_text(encoding="utf-8")FilesystemBackend.list method · python · L108-L123 (16 LOC)src/agent_session_linker/storage/filesystem.py
def list(self) -> list[str]:
"""Return all session IDs present in the storage directory.
Returns
-------
list[str]
Session IDs derived from file stems. Empty list if the
directory does not yet exist.
"""
if not self._storage_dir.exists():
return []
return [
path.stem
for path in self._storage_dir.glob(f"*{_FILE_EXTENSION}")
if path.is_file()
]FilesystemBackend.delete method · python · L125-L143 (19 LOC)src/agent_session_linker/storage/filesystem.py
def delete(self, session_id: str) -> None:
"""Remove the file for ``session_id``.
Parameters
----------
session_id:
The session to delete.
Raises
------
KeyError
If the file does not exist.
"""
path = self._path_for(session_id)
if not path.exists():
raise KeyError(
f"Session {session_id!r} not found at {path}"
)
path.unlink()InMemoryBackend.load method · python · L38-L49 (12 LOC)src/agent_session_linker/storage/memory.py
def load(self, session_id: str) -> str:
"""Return the payload for ``session_id``.
Raises
------
KeyError
If ``session_id`` is not in the store.
"""
try:
return self._store[session_id]
except KeyError:
raise KeyError(f"Session {session_id!r} not found in InMemoryBackend.") from NoneRepobility · severity-and-effort ranking · https://repobility.com
InMemoryBackend.delete method · python · L55-L66 (12 LOC)src/agent_session_linker/storage/memory.py
def delete(self, session_id: str) -> None:
"""Remove ``session_id`` from the store.
Raises
------
KeyError
If ``session_id`` is not in the store.
"""
try:
del self._store[session_id]
except KeyError:
raise KeyError(f"Session {session_id!r} not found in InMemoryBackend.") from NoneRedisBackend.__init__ method · python · L48-L74 (27 LOC)src/agent_session_linker/storage/redis.py
def __init__(
self,
host: str = "localhost",
port: int = 6379,
db: int = 0,
password: str | None = None,
key_prefix: str = "agent_session:",
ttl_seconds: int | None = None,
url: str | None = None,
) -> None:
try:
import redis as redis_module # noqa: PLC0415
except ImportError as exc:
raise ImportError(_REDIS_IMPORT_ERROR) from exc
if url is not None:
self._client = redis_module.Redis.from_url(url)
else:
self._client = redis_module.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=True,
)
self._key_prefix = key_prefix
self._ttl_seconds = ttl_secondsRedisBackend._key method · python · L80-L93 (14 LOC)src/agent_session_linker/storage/redis.py
def _key(self, session_id: str) -> str:
"""Return the full Redis key for ``session_id``.
Parameters
----------
session_id:
Raw session identifier.
Returns
-------
str
Prefixed Redis key.
"""
return f"{self._key_prefix}{session_id}"RedisBackend.save method · python · L99-L113 (15 LOC)src/agent_session_linker/storage/redis.py
def save(self, session_id: str, payload: str) -> None:
"""Write ``payload`` to Redis under the prefixed key.
Parameters
----------
session_id:
Storage key.
payload:
UTF-8 string payload.
"""
key = self._key(session_id)
if self._ttl_seconds is not None:
self._client.setex(key, self._ttl_seconds, payload)
else:
self._client.set(key, payload)RedisBackend.load method · python · L115-L136 (22 LOC)src/agent_session_linker/storage/redis.py
def load(self, session_id: str) -> str:
"""Return the payload for ``session_id``.
Parameters
----------
session_id:
The session to retrieve.
Returns
-------
str
Stored payload.
Raises
------
KeyError
If no key exists for ``session_id``.
"""
value = self._client.get(self._key(session_id))
if value is None:
raise KeyError(f"Session {session_id!r} not found in RedisBackend.")
return str(value)RedisBackend.list method · python · L138-L158 (21 LOC)src/agent_session_linker/storage/redis.py
def list(self) -> list[str]:
"""Return all session IDs stored under the configured prefix.
Uses a Redis SCAN to avoid blocking the server.
Returns
-------
list[str]
Session IDs with the prefix stripped.
"""
prefix = self._key_prefix
prefix_len = len(prefix)
session_ids: list[str] = []
cursor: int = 0
while True:
cursor, keys = self._client.scan(cursor=cursor, match=f"{prefix}*", count=100)
for key in keys:
session_ids.append(str(key)[prefix_len:])
if cursor == 0:
break
return session_idsRedisBackend.delete method · python · L160-L175 (16 LOC)src/agent_session_linker/storage/redis.py
def delete(self, session_id: str) -> None:
"""Remove the key for ``session_id``.
Parameters
----------
session_id:
The session to delete.
Raises
------
KeyError
If no key exists for ``session_id``.
"""
deleted = self._client.delete(self._key(session_id))
if deleted == 0:
raise KeyError(f"Session {session_id!r} not found in RedisBackend.")S3Backend.__init__ method · python · L43-L64 (22 LOC)src/agent_session_linker/storage/s3.py
def __init__(
self,
bucket_name: str,
prefix: str = "agent-sessions/",
region_name: str | None = None,
aws_access_key_id: str | None = None,
aws_secret_access_key: str | None = None,
endpoint_url: str | None = None,
) -> None:
try:
import boto3 # noqa: PLC0415
except ImportError as exc:
raise ImportError(_BOTO3_IMPORT_ERROR) from exc
session = boto3.session.Session(
region_name=region_name,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
self._s3 = session.client("s3", endpoint_url=endpoint_url)
self._bucket = bucket_name
self._prefix = prefixSame scanner, your repo: https://repobility.com — Repobility
S3Backend._object_key method · python · L70-L83 (14 LOC)src/agent_session_linker/storage/s3.py
def _object_key(self, session_id: str) -> str:
"""Return the full S3 object key for ``session_id``.
Parameters
----------
session_id:
Raw session identifier.
Returns
-------
str
S3 object key string.
"""
return f"{self._prefix}{session_id}.json"S3Backend.save method · python · L89-L104 (16 LOC)src/agent_session_linker/storage/s3.py
def save(self, session_id: str, payload: str) -> None:
"""Write ``payload`` to S3.
Parameters
----------
session_id:
Storage key.
payload:
UTF-8 string (typically JSON).
"""
self._s3.put_object(
Bucket=self._bucket,
Key=self._object_key(session_id),
Body=payload.encode("utf-8"),
ContentType="application/json",
)S3Backend.load method · python · L106-L141 (36 LOC)src/agent_session_linker/storage/s3.py
def load(self, session_id: str) -> str:
"""Download and return the payload for ``session_id``.
Parameters
----------
session_id:
The session to retrieve.
Returns
-------
str
Decoded payload string.
Raises
------
KeyError
If the object does not exist in S3.
"""
try:
response = self._s3.get_object(
Bucket=self._bucket, Key=self._object_key(session_id)
)
return response["Body"].read().decode("utf-8")
except self._s3.exceptions.NoSuchKey:
raise KeyError(
f"Session {session_id!r} not found in S3Backend "
f"(bucket={self._bucket!r})."
) from None
except Exception as exc:
# Wrap unexpected S3/network errors in a descriptive KeyError.
error_code = getattr(getattr(exc, "response", None), "Error", {}).get("Code", S3Backend.list method · python · L143-L162 (20 LOC)src/agent_session_linker/storage/s3.py
def list(self) -> list[str]:
"""List all session IDs stored under the configured prefix.
Uses paginated ``list_objects_v2`` to handle large buckets.
Returns
-------
list[str]
Session IDs with prefix and ``.json`` suffix stripped.
"""
prefix_len = len(self._prefix)
session_ids: list[str] = []
paginator = self._s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=self._bucket, Prefix=self._prefix):
for obj in page.get("Contents", []):
key: str = obj["Key"]
# Strip prefix and .json suffix.
if key.startswith(self._prefix) and key.endswith(".json"):
session_ids.append(key[prefix_len:-5])
return session_idsS3Backend.delete method · python · L164-L182 (19 LOC)src/agent_session_linker/storage/s3.py
def delete(self, session_id: str) -> None:
"""Remove the S3 object for ``session_id``.
Parameters
----------
session_id:
The session to delete.
Raises
------
KeyError
If the object does not exist.
"""
if not self.exists(session_id):
raise KeyError(
f"Session {session_id!r} not found in S3Backend "
f"(bucket={self._bucket!r})."
)
self._s3.delete_object(Bucket=self._bucket, Key=self._object_key(session_id))S3Backend.exists method · python · L184-L194 (11 LOC)src/agent_session_linker/storage/s3.py
def exists(self, session_id: str) -> bool:
"""Return True if the S3 object for ``session_id`` exists."""
try:
self._s3.head_object(Bucket=self._bucket, Key=self._object_key(session_id))
return True
except Exception as exc:
error_code = getattr(getattr(exc, "response", None), "Error", {}).get("Code", "")
if error_code in ("404", "NoSuchKey"):
return False
# Unexpected errors bubble up.
raiseSQLiteBackend._get_connection method · python · L58-L72 (15 LOC)src/agent_session_linker/storage/sqlite.py
def _get_connection(self) -> sqlite3.Connection:
"""Open (or reopen) a connection and ensure the table exists.
Returns
-------
sqlite3.Connection
A ready-to-use connection with row_factory set.
"""
self._db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(_CREATE_TABLE_SQL)
conn.commit()
return connSQLiteBackend.save method · python · L78-L89 (12 LOC)src/agent_session_linker/storage/sqlite.py
def save(self, session_id: str, payload: str) -> None:
"""Upsert ``payload`` for ``session_id`` in the sessions table.
Parameters
----------
session_id:
Storage key.
payload:
UTF-8 JSON string.
"""
with self._get_connection() as conn:
conn.execute(_UPSERT_SQL, (session_id, payload))Want this analysis on your repo? https://repobility.com/scan/
SQLiteBackend.load method · python · L91-L115 (25 LOC)src/agent_session_linker/storage/sqlite.py
def load(self, session_id: str) -> str:
"""Return the payload row for ``session_id``.
Parameters
----------
session_id:
The session to retrieve.
Returns
-------
str
Raw payload string.
Raises
------
KeyError
If no row exists for ``session_id``.
"""
with self._get_connection() as conn:
row = conn.execute(
"SELECT payload FROM sessions WHERE session_id = ?", (session_id,)
).fetchone()
if row is None:
raise KeyError(f"Session {session_id!r} not found in SQLiteBackend.")
return str(row["payload"])SQLiteBackend.list method · python · L117-L128 (12 LOC)src/agent_session_linker/storage/sqlite.py
def list(self) -> list[str]:
"""Return all session IDs stored in the database.
Returns
-------
list[str]
"""
with self._get_connection() as conn:
rows = conn.execute(
"SELECT session_id FROM sessions ORDER BY saved_at DESC"
).fetchall()
return [str(row["session_id"]) for row in rows]SQLiteBackend.delete method · python · L130-L148 (19 LOC)src/agent_session_linker/storage/sqlite.py
def delete(self, session_id: str) -> None:
"""Remove the row for ``session_id``.
Parameters
----------
session_id:
The session to delete.
Raises
------
KeyError
If no row exists for ``session_id``.
"""
with self._get_connection() as conn:
cursor = conn.execute(
"DELETE FROM sessions WHERE session_id = ?", (session_id,)
)
if cursor.rowcount == 0:
raise KeyError(f"Session {session_id!r} not found in SQLiteBackend.")SQLiteBackend.exists method · python · L150-L156 (7 LOC)src/agent_session_linker/storage/sqlite.py
def exists(self, session_id: str) -> bool:
"""Return True if a row for ``session_id`` exists."""
with self._get_connection() as conn:
row = conn.execute(
"SELECT 1 FROM sessions WHERE session_id = ?", (session_id,)
).fetchone()
return row is not None‹ prevpage 5 / 5