Function bodies 14 total
_load_manager function · python · L18-L28 (11 LOC)src/aumai_transactions/cli.py
def _load_manager() -> TransactionManager:
    """Build a TransactionManager seeded from the default state file.

    When the state file does not exist an empty manager is returned.
    Otherwise the file is read as UTF-8 JSON, every entry is validated into
    a ``Transaction`` and registered with the manager.

    Returns:
        The manager holding every transaction found in the state file.
    """
    restored = TransactionManager()
    if not _DEFAULT_STATE_FILE.exists():
        return restored

    payload = _DEFAULT_STATE_FILE.read_text(encoding="utf-8")
    records: list[dict[str, object]] = json.loads(payload)
    for record in records:
        restored.register_transaction(Transaction.model_validate(record))
    return restored
def create_cmd(timeout_seconds: int) -> None:
    """Create a new pending transaction and print its ID."""
    # NOTE(review): the docstring is kept verbatim -- if this function is
    # registered as a click command it is presumably shown as help text.
    manager = _load_manager()
    created = manager.begin(timeout_seconds=timeout_seconds)
    # Persist before reporting so the printed ID refers to saved state.
    _save_manager(manager)

    summary = {
        "transaction_id": created.transaction_id,
        "state": created.state.value,
        "created_at": created.created_at.isoformat(),
        "timeout_seconds": created.timeout_seconds,
    }
    click.echo(json.dumps(summary, indent=2))
def status_cmd(tx_id: str) -> None:
    """Display the current status of a transaction."""
    # NOTE(review): the docstring is kept verbatim -- if this function is
    # registered as a click command it is presumably shown as help text.
    found = _load_manager().get_transaction(tx_id)
    if found is None:
        # Unknown IDs are reported through click's own error type.
        raise click.ClickException(f"Transaction not found: {tx_id}")

    report = {
        "transaction_id": found.transaction_id,
        "state": found.state.value,
        "steps": len(found.steps),
        "created_at": found.created_at.isoformat(),
        "timeout_seconds": found.timeout_seconds,
    }
    click.echo(json.dumps(report, indent=2))
def begin(self, timeout_seconds: int = 60) -> Transaction:
    """Open a fresh transaction and track it in this manager.

    The transaction receives a random UUID4 identifier, starts in the
    *pending* state and is stamped with the current UTC time.

    Args:
        timeout_seconds: Maximum lifetime before the transaction is
            considered timed out.

    Returns:
        The newly created :class:`~aumai_transactions.models.Transaction`.
    """
    new_id = str(uuid.uuid4())
    started_at = datetime.now(tz=timezone.utc)
    opened = Transaction(
        transaction_id=new_id,
        state=TransactionState.pending,
        created_at=started_at,
        timeout_seconds=timeout_seconds,
    )
    self._transactions[new_id] = opened
    return opened
def add_step(
self,
tx: Transaction,
agent_id: str,
action: str,
data: dict[str, object],
compensating_action: str | None = None,
) -> TransactionStep:
"""Append a step to *tx*.
Args:
tx: The transaction to extend.
agent_id: Agent responsible for this step.
action: Action descriptor string.
data: Payload for the action.
compensating_action: Optional undo action for rollback.
Returns:
The newly created :class:`~aumai_transactions.models.TransactionStep`.
Raises:
ValueError: When *tx* is not in the *pending* state.
"""
if tx.state != TransactionState.pending:
raise ValueError(
f"Cannot add steps to transaction {tx.transaction_id!r} "
f"in state {tx.state.value!r}; only 'pending' transactions accept new steps."
)
step = TransactionStep(
TransactionManager.commit method · python · L105-L160 (56 LOC)src/aumai_transactions/core.py
def commit(self, tx: Transaction) -> TransactionResult:
"""Execute all steps in order, rolling back on any failure.
The transaction transitions to *active* before execution begins, and
then to either *committed* or *rolled_back* when finished.
Args:
tx: The transaction to commit.
Returns:
A :class:`~aumai_transactions.models.TransactionResult` describing the outcome.
Raises:
ValueError: When *tx* is not in *pending* state.
"""
if tx.state != TransactionState.pending:
raise ValueError(
f"Transaction {tx.transaction_id!r} is in state {tx.state.value!r}; "
"only 'pending' transactions can be committed."
)
self._set_state(tx, TransactionState.active)
# Check timeout
if self._is_timed_out(tx):
self._set_state(tx, TransactionState.failed)
return TransactionResult(
tTransactionManager.rollback method · python · L162-L180 (19 LOC)src/aumai_transactions/core.py
def rollback(self, tx: Transaction) -> TransactionResult:
    """Undo *tx* by compensating its steps in reverse order.

    Meant for a *pending* or *active* transaction. The state of *tx* is not
    checked here; the transaction is always moved to *rolled_back*.

    Args:
        tx: The transaction to roll back.

    Returns:
        A :class:`~aumai_transactions.models.TransactionResult` in
        *rolled_back* state whose ``completed_steps`` are the step IDs
        reported by the compensation pass.
    """
    # Every step of the transaction is offered for compensation.
    step_ids = [entry.step_id for entry in tx.steps]
    undone = self._execute_rollback(tx, step_ids)

    final_state = TransactionState.rolled_back
    self._set_state(tx, final_state)
    return TransactionResult(
        transaction_id=tx.transaction_id,
        state=final_state,
        completed_steps=undone,
    )
TransactionManager.get_transaction method · python · L182-L191 (10 LOC)src/aumai_transactions/core.py
def get_transaction(self, transaction_id: str) -> Transaction | None:
    """Look up a transaction by its identifier.

    Args:
        transaction_id: The transaction identifier.

    Returns:
        The matching :class:`~aumai_transactions.models.Transaction`, or
        *None* when the manager holds no transaction with that identifier.
    """
    match = self._transactions.get(transaction_id, None)
    return match
def get_all_transactions(self) -> list[Transaction]:
    """Snapshot every transaction currently held by this manager.

    Returns:
        A new list of all :class:`~aumai_transactions.models.Transaction`
        objects; changing the list does not change the registry.
    """
    return [*self._transactions.values()]
def register_transaction(self, tx: Transaction) -> None:
    """Place an already-built transaction into the manager registry.

    Primarily used to restore state from persistent storage. An entry that
    is already stored under the same transaction ID is replaced.

    Args:
        tx: The transaction to register.
    """
    key = tx.transaction_id
    self._transactions[key] = tx
def _execute_rollback(
    self, tx: Transaction, completed_step_ids: list[str]
) -> list[str]:
    """Execute compensating actions for completed steps in reverse order.

    Args:
        tx: The transaction whose steps are walked, last step first.
        completed_step_ids: IDs of the steps to compensate; steps of *tx*
            whose ID is not listed are skipped.

    Returns:
        The step IDs recorded while walking *tx* in reverse order.

    NOTE(review): this listing lost its indentation, so the nesting level of
    the ``compensated.append`` call could not be confirmed. It is laid out
    here at loop level (every visited step is recorded, whether or not a
    handler ran) -- verify against the repository before relying on it.
    """
    # Converted to a set for the membership test inside the loop.
    completed_set = set(completed_step_ids)
    compensated: list[str] = []
    for step in reversed(tx.steps):
        if step.step_id not in completed_set:
            continue
        if step.compensating_action is not None:
            # Handlers are looked up by the compensating action's name; a
            # step whose compensating action has no handler runs nothing.
            handler = self._handlers.get(step.compensating_action)
            if handler is not None:
                try:
                    handler(step.compensating_action, step.data)
                except Exception:  # noqa: BLE001
                    pass  # Best-effort compensation
        compensated.append(step.step_id)
    return compensated
def _set_state(self, tx: Transaction, state: TransactionState) -> None:
    """Move *tx* into *state* and re-file it in the registry.

    The new state is assigned on the ``tx`` object itself rather than on a
    copy, so every caller holding that reference sees the change at once.
    The registry entry for the transaction ID is then pointed at that same
    object, which keeps :meth:`get_transaction` lookups consistent.

    Mutating in place is the single pattern used here; it avoids handing a
    stale copy back to the caller while a different copy sits in the
    registry.

    Args:
        tx: The transaction to update.
        state: The state to assign.
    """
    tx.state = state  # type: ignore[assignment]
    registry = self._transactions
    registry[tx.transaction_id] = tx
def register(
    self,
    agent_id: str,
    action: str,
    data: dict[str, object],
    compensating_action: str | None = None,
) -> None:
    """Queue one participant step for the saga.

    The step is stored as an ``(agent_id, action, data,
    compensating_action)`` tuple at the end of the pending step list.

    Args:
        agent_id: Participant agent identifier.
        action: Action to execute.
        data: Action payload.
        compensating_action: Undo action for rollback.
    """
    participant = (agent_id, action, data, compensating_action)
    self._steps.append(participant)
def execute(self, timeout_seconds: int = 60) -> TransactionResult:
    """Run every registered step as one saga transaction.

    A new transaction is begun on the manager, each registered step is
    added to it in registration order, and the transaction is committed.

    Args:
        timeout_seconds: Transaction timeout.

    Returns:
        The :class:`~aumai_transactions.models.TransactionResult` produced
        by the manager's commit.
    """
    manager = self._manager
    saga_tx = manager.begin(timeout_seconds=timeout_seconds)
    for participant in self._steps:
        agent_id, action, data, undo_action = participant
        manager.add_step(
            saga_tx,
            agent_id=agent_id,
            action=action,
            data=data,
            compensating_action=undo_action,
        )
    return manager.commit(saga_tx)