← back to invincible-jha__aumai-transactions

Function bodies 14 total

All specs Real LLM only Function bodies
_load_manager function · python · L18-L28 (11 LOC)
src/aumai_transactions/cli.py
def _load_manager() -> TransactionManager:
    """Build a TransactionManager populated from the default state file.

    A fresh manager is always created.  If the default state file does not
    exist it is returned empty; otherwise the file is read as UTF-8 text,
    parsed as JSON, and each entry is validated into a ``Transaction`` and
    registered with the manager.

    Returns:
        The manager holding every transaction found in the state file.
    """
    manager = TransactionManager()
    if not _DEFAULT_STATE_FILE.exists():
        # Nothing persisted yet: start from an empty registry.
        return manager

    serialized = _DEFAULT_STATE_FILE.read_text(encoding="utf-8")
    entries: list[dict[str, object]] = json.loads(serialized)
    for payload in entries:
        manager.register_transaction(Transaction.model_validate(payload))
    return manager
create_cmd function · python · L53-L69 (17 LOC)
src/aumai_transactions/cli.py
def create_cmd(timeout_seconds: int) -> None:
    """Create a new pending transaction and print its ID."""
    # Load persisted state, open the new transaction, and write the state
    # back before reporting, so the echoed ID refers to a saved transaction.
    manager = _load_manager()
    tx = manager.begin(timeout_seconds=timeout_seconds)
    _save_manager(manager)

    # Key order is significant: it is the order of the emitted JSON document.
    summary = {
        "transaction_id": tx.transaction_id,
        "state": tx.state.value,
        "created_at": tx.created_at.isoformat(),
        "timeout_seconds": tx.timeout_seconds,
    }
    click.echo(json.dumps(summary, indent=2))
status_cmd function · python · L74-L92 (19 LOC)
src/aumai_transactions/cli.py
def status_cmd(tx_id: str) -> None:
    """Display the current status of a transaction."""
    # Look the transaction up in the persisted state; an unknown ID is
    # reported through Click's exception type rather than a raw error.
    tx = _load_manager().get_transaction(tx_id)
    if tx is None:
        raise click.ClickException(f"Transaction not found: {tx_id}")

    # Key order is significant: it is the order of the emitted JSON document.
    report = {
        "transaction_id": tx.transaction_id,
        "state": tx.state.value,
        "steps": len(tx.steps),  # only the step count is reported
        "created_at": tx.created_at.isoformat(),
        "timeout_seconds": tx.timeout_seconds,
    }
    rendered = json.dumps(report, indent=2)
    click.echo(rendered)
TransactionManager.begin method · python · L45-L63 (19 LOC)
src/aumai_transactions/core.py
    def begin(self, timeout_seconds: int = 60) -> Transaction:
        """Open a fresh transaction and track it in this manager.

        The transaction receives a random UUID4 string as its identifier,
        starts in the *pending* state, and is stamped with the current UTC
        time.

        Args:
            timeout_seconds: Lifetime limit, in seconds, stored on the
                transaction.

        Returns:
            The newly created :class:`~aumai_transactions.models.Transaction`,
            already stored in the manager's registry under its identifier.
        """
        new_id = str(uuid.uuid4())
        started_at = datetime.now(tz=timezone.utc)
        new_tx = Transaction(
            transaction_id=new_id,
            state=TransactionState.pending,
            created_at=started_at,
            timeout_seconds=timeout_seconds,
        )
        self._transactions[new_id] = new_tx
        return new_tx
TransactionManager.add_step method · python · L65-L103 (39 LOC)
src/aumai_transactions/core.py
    def add_step(
        self,
        tx: Transaction,
        agent_id: str,
        action: str,
        data: dict[str, object],
        compensating_action: str | None = None,
    ) -> TransactionStep:
        """Append a step to *tx*.

        Args:
            tx: The transaction to extend.
            agent_id: Agent responsible for this step.
            action: Action descriptor string.
            data: Payload for the action.
            compensating_action: Optional undo action for rollback.

        Returns:
            The newly created :class:`~aumai_transactions.models.TransactionStep`.

        Raises:
            ValueError: When *tx* is not in the *pending* state.
        """
        if tx.state != TransactionState.pending:
            raise ValueError(
                f"Cannot add steps to transaction {tx.transaction_id!r} "
                f"in state {tx.state.value!r}; only 'pending' transactions accept new steps."
            )

        step = TransactionStep(
TransactionManager.commit method · python · L105-L160 (56 LOC)
src/aumai_transactions/core.py
    def commit(self, tx: Transaction) -> TransactionResult:
        """Execute all steps in order, rolling back on any failure.

        The transaction transitions to *active* before execution begins, and
        then to either *committed* or *rolled_back* when finished.

        Args:
            tx: The transaction to commit.

        Returns:
            A :class:`~aumai_transactions.models.TransactionResult` describing the outcome.

        Raises:
            ValueError: When *tx* is not in *pending* state.
        """
        if tx.state != TransactionState.pending:
            raise ValueError(
                f"Transaction {tx.transaction_id!r} is in state {tx.state.value!r}; "
                "only 'pending' transactions can be committed."
            )

        self._set_state(tx, TransactionState.active)

        # Check timeout
        if self._is_timed_out(tx):
            self._set_state(tx, TransactionState.failed)
            return TransactionResult(
                t
TransactionManager.rollback method · python · L162-L180 (19 LOC)
src/aumai_transactions/core.py
    def rollback(self, tx: Transaction) -> TransactionResult:
        """Compensate every step of *tx* and mark it *rolled_back*.

        All steps of the transaction are handed to the rollback helper, which
        processes them in reverse order.  The method is intended for
        *pending* or *active* transactions; note that it does not itself
        check the current state before rolling back.

        Args:
            tx: The transaction to roll back.

        Returns:
            A :class:`~aumai_transactions.models.TransactionResult` in the
            *rolled_back* state whose ``completed_steps`` holds the step IDs
            returned by the rollback helper.
        """
        every_step_id = [item.step_id for item in tx.steps]
        undone = self._execute_rollback(tx, every_step_id)

        final_state = TransactionState.rolled_back
        self._set_state(tx, final_state)

        outcome = TransactionResult(
            transaction_id=tx.transaction_id,
            state=final_state,
            completed_steps=undone,
        )
        return outcome
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
TransactionManager.get_transaction method · python · L182-L191 (10 LOC)
src/aumai_transactions/core.py
    def get_transaction(self, transaction_id: str) -> Transaction | None:
        """Look up a transaction in the registry by its identifier.

        Args:
            transaction_id: Identifier of the wanted transaction.

        Returns:
            The matching :class:`~aumai_transactions.models.Transaction`, or
            *None* when no transaction is stored under that identifier.
        """
        found = self._transactions.get(transaction_id)
        return found
TransactionManager.get_all_transactions method · python · L193-L199 (7 LOC)
src/aumai_transactions/core.py
    def get_all_transactions(self) -> list[Transaction]:
        """Snapshot the registry's transactions into a new list.

        Returns:
            A freshly built list of every
            :class:`~aumai_transactions.models.Transaction` currently in the
            registry, in the registry's iteration order.
        """
        registry = self._transactions
        return [*registry.values()]
TransactionManager.register_transaction method · python · L201-L209 (9 LOC)
src/aumai_transactions/core.py
    def register_transaction(self, tx: Transaction) -> None:
        """Store an already-built transaction in the registry.

        Mainly used when restoring transactions from persistent storage.  The
        transaction is keyed by its own ``transaction_id``; an entry already
        stored under that identifier is replaced.

        Args:
            tx: The transaction to register.
        """
        key = tx.transaction_id
        self._transactions[key] = tx
TransactionManager._execute_rollback method · python · L221-L240 (20 LOC)
src/aumai_transactions/core.py
    def _execute_rollback(
        self, tx: Transaction, completed_step_ids: list[str]
    ) -> list[str]:
        """Run compensating actions for completed steps in reverse order.

        Walks ``tx.steps`` from last to first.  For every step whose ID is in
        *completed_step_ids*, the handler registered under the step's
        ``compensating_action`` is invoked with that action name and the
        step's ``data`` (when the step names a compensating action and a
        handler is registered for it).

        Compensation is best-effort: a handler that raises does not abort the
        rollback, so the remaining steps are still compensated.  The failure
        is logged with its traceback instead of being silently discarded.

        Args:
            tx: The transaction whose steps are being compensated.
            completed_step_ids: IDs of the steps that should be compensated.

        Returns:
            The IDs of the visited steps, in reverse step order.  A step is
            listed even when it has no compensating action, no handler is
            registered for it, or its handler raised.
        """
        # Imported locally so this method does not depend on the module's
        # top-level import block already providing ``logging``.
        import logging

        logger = logging.getLogger(__name__)
        completed_set = set(completed_step_ids)
        compensated: list[str] = []

        for step in reversed(tx.steps):
            if step.step_id not in completed_set:
                continue

            action = step.compensating_action
            handler = self._handlers.get(action) if action is not None else None
            if handler is not None:
                try:
                    handler(action, step.data)
                except Exception:  # noqa: BLE001
                    # Best-effort compensation: keep unwinding the remaining
                    # steps, but leave a trace of what failed.
                    logger.exception(
                        "Compensating action %r failed for step %r of "
                        "transaction %r; continuing rollback.",
                        action,
                        step.step_id,
                        tx.transaction_id,
                    )
            compensated.append(step.step_id)

        return compensated
TransactionManager._set_state method · python · L242-L256 (15 LOC)
src/aumai_transactions/core.py
    def _set_state(self, tx: Transaction, state: TransactionState) -> None:
        """Move *tx* to *state* in place and keep the registry pointing at it.

        The state is assigned directly on the transaction object (Pydantic v2
        models are mutable by default), so every holder of that object sees
        the new state at once.  The registry entry for the transaction's ID
        is then (re)bound to that same object, keeping lookups through
        :meth:`get_transaction` consistent with what callers hold.

        Mutating in place, rather than storing a modified copy, means there
        is never a stale copy in the caller's hands while a different one
        sits in the registry.

        Args:
            tx: The transaction to update.
            state: The state to assign.
        """
        tx.state = state  # type: ignore[assignment]
        registry = self._transactions
        registry[tx.transaction_id] = tx
SagaOrchestrator.register method · python · L278-L293 (16 LOC)
src/aumai_transactions/core.py
    def register(
        self,
        agent_id: str,
        action: str,
        data: dict[str, object],
        compensating_action: str | None = None,
    ) -> None:
        """Queue one participant step for this saga.

        The step is only recorded here, as an
        ``(agent_id, action, data, compensating_action)`` tuple appended to
        the saga's step list; nothing is executed by this call.

        Args:
            agent_id: Participant agent identifier.
            action: Action to execute.
            data: Action payload (stored by reference, not copied).
            compensating_action: Undo action for rollback, if any.
        """
        participant = (agent_id, action, data, compensating_action)
        self._steps.append(participant)
SagaOrchestrator.execute method · python · L295-L313 (19 LOC)
src/aumai_transactions/core.py
    def execute(self, timeout_seconds: int = 60) -> TransactionResult:
        """Run every registered step as one saga transaction.

        A new transaction is begun on the underlying manager, each registered
        step is added to it in registration order, and the transaction is
        then committed.

        Args:
            timeout_seconds: Transaction timeout passed to the manager.

        Returns:
            The :class:`~aumai_transactions.models.TransactionResult`
            returned by the manager's commit.
        """
        manager = self._manager
        tx = manager.begin(timeout_seconds=timeout_seconds)

        for participant in self._steps:
            agent_id, action, data, compensating_action = participant
            manager.add_step(
                tx,
                agent_id=agent_id,
                action=action,
                data=data,
                compensating_action=compensating_action,
            )

        outcome = manager.commit(tx)
        return outcome