Module aws_lambda_powertools.utilities.idempotency.base

Expand source code
import logging
from copy import deepcopy
from typing import Any, Callable, Dict, Optional, Tuple

from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.exceptions import (
    IdempotencyAlreadyInProgressError,
    IdempotencyInconsistentStateError,
    IdempotencyItemAlreadyExistsError,
    IdempotencyItemNotFoundError,
    IdempotencyKeyError,
    IdempotencyPersistenceLayerError,
    IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
    STATUS_CONSTANTS,
    BasePersistenceLayer,
    DataRecord,
)

MAX_RETRIES = 2
logger = logging.getLogger(__name__)


def _prepare_data(data: Any) -> Any:
    """Prepare data for json serialization.

    We will convert Python dataclasses, pydantic models or event source data classes to a dict,
    otherwise return data as-is.
    """
    if hasattr(data, "__dataclass_fields__"):
        import dataclasses

        return dataclasses.asdict(data)

    if callable(getattr(data, "dict", None)):
        return data.dict()

    return getattr(data, "raw_event", data)


class IdempotencyHandler:
    """
    Base class to orchestrate calls to persistence layer.
    """

    def __init__(
        self,
        function: Callable,
        function_payload: Any,
        config: IdempotencyConfig,
        persistence_store: BasePersistenceLayer,
        function_args: Optional[Tuple] = None,
        function_kwargs: Optional[Dict] = None,
    ):
        """
        Initialize the IdempotencyHandler

        Parameters
        ----------
        function_payload: Any
            JSON Serializable payload to be hashed
        config: IdempotencyConfig
            Idempotency Configuration
        persistence_store : BasePersistenceLayer
            Instance of persistence layer to store idempotency records
        function_args: Optional[Tuple]
            Function arguments
        function_kwargs: Optional[Dict]
            Function keyword arguments
        """
        self.function = function
        self.data = deepcopy(_prepare_data(function_payload))
        self.fn_args = function_args
        self.fn_kwargs = function_kwargs

        persistence_store.configure(config, self.function.__name__)
        self.persistence_store = persistence_store

    def handle(self) -> Any:
        """
        Main entry point for handling idempotent execution of a function.

        Returns
        -------
        Any
            Function response

        """
        # IdempotencyInconsistentStateError can happen under rare but expected cases
        # when persistent state changes in the small time between put & get requests.
        # In most cases we can retry successfully on this exception.
        for i in range(MAX_RETRIES + 1):  # pragma: no cover
            try:
                return self._process_idempotency()
            except IdempotencyInconsistentStateError:
                if i == MAX_RETRIES:
                    raise  # Bubble up when exceeded max tries

    def _process_idempotency(self):
        try:
            # We call save_inprogress first as an optimization for the most common case where no idempotent record
            # already exists. If it succeeds, there's no need to call get_record.
            self.persistence_store.save_inprogress(data=self.data)
        except IdempotencyKeyError:
            raise
        except IdempotencyItemAlreadyExistsError:
            # Now we know the item already exists, we can retrieve it
            record = self._get_idempotency_record()
            return self._handle_for_status(record)
        except Exception as exc:
            raise IdempotencyPersistenceLayerError("Failed to save in progress record to idempotency store") from exc

        return self._get_function_response()

    def _get_idempotency_record(self) -> DataRecord:
        """
        Retrieve the idempotency record from the persistence layer.

        Raises
        ----------
        IdempotencyInconsistentStateError

        """
        try:
            data_record = self.persistence_store.get_record(data=self.data)
        except IdempotencyItemNotFoundError:
            # This code path will only be triggered if the record is removed between save_inprogress and get_record.
            logger.debug(
                f"An existing idempotency record was deleted before we could fetch it. Proceeding with {self.function}"
            )
            raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

        # Allow this exception to bubble up
        except IdempotencyValidationError:
            raise

        # Wrap remaining unhandled exceptions with IdempotencyPersistenceLayerError to ease exception handling for
        # clients
        except Exception as exc:
            raise IdempotencyPersistenceLayerError("Failed to get record from idempotency store") from exc

        return data_record

    def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]]:
        """
        Take appropriate action based on data_record's status

        Parameters
        ----------
        data_record: DataRecord

        Returns
        -------
        Optional[Dict[Any, Any]
            Function's response previously used for this idempotency key, if it has successfully executed already.

        Raises
        ------
        AlreadyInProgressError
            A function execution is already in progress
        IdempotencyInconsistentStateError
            The persistence store reports inconsistent states across different requests. Retryable.
        """
        # This code path will only be triggered if the record becomes expired between the save_inprogress call and here
        if data_record.status == STATUS_CONSTANTS["EXPIRED"]:
            raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

        if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            raise IdempotencyAlreadyInProgressError(
                f"Execution already in progress with idempotency key: "
                f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"
            )

        return data_record.response_json_as_dict()

    def _get_function_response(self):
        try:
            response = self.function(*self.fn_args, **self.fn_kwargs)
        except Exception as handler_exception:
            # We need these nested blocks to preserve function's exception in case the persistence store operation
            # also raises an exception
            try:
                self.persistence_store.delete_record(data=self.data, exception=handler_exception)
            except Exception as delete_exception:
                raise IdempotencyPersistenceLayerError(
                    "Failed to delete record from idempotency store"
                ) from delete_exception
            raise

        else:
            try:
                self.persistence_store.save_success(data=self.data, result=response)
            except Exception as save_exception:
                raise IdempotencyPersistenceLayerError(
                    "Failed to update record state to success in idempotency store"
                ) from save_exception

        return response

Classes

class IdempotencyHandler (function: Callable, function_payload: Any, config: IdempotencyConfig, persistence_store: BasePersistenceLayer, function_args: Optional[Tuple[]] = None, function_kwargs: Optional[Dict[~KT, ~VT]] = None)

Base class to orchestrate calls to persistence layer.

Initialize the IdempotencyHandler

Parameters

function_payload : Any
JSON Serializable payload to be hashed
config : IdempotencyConfig
Idempotency Configuration
persistence_store : BasePersistenceLayer
Instance of persistence layer to store idempotency records
function_args : Optional[Tuple]
Function arguments
function_kwargs : Optional[Dict]
Function keyword arguments
Expand source code
class IdempotencyHandler:
    """
    Base class to orchestrate calls to persistence layer.
    """

    def __init__(
        self,
        function: Callable,
        function_payload: Any,
        config: IdempotencyConfig,
        persistence_store: BasePersistenceLayer,
        function_args: Optional[Tuple] = None,
        function_kwargs: Optional[Dict] = None,
    ):
        """
        Initialize the IdempotencyHandler

        Parameters
        ----------
        function_payload: Any
            JSON Serializable payload to be hashed
        config: IdempotencyConfig
            Idempotency Configuration
        persistence_store : BasePersistenceLayer
            Instance of persistence layer to store idempotency records
        function_args: Optional[Tuple]
            Function arguments
        function_kwargs: Optional[Dict]
            Function keyword arguments
        """
        self.function = function
        self.data = deepcopy(_prepare_data(function_payload))
        self.fn_args = function_args
        self.fn_kwargs = function_kwargs

        persistence_store.configure(config, self.function.__name__)
        self.persistence_store = persistence_store

    def handle(self) -> Any:
        """
        Main entry point for handling idempotent execution of a function.

        Returns
        -------
        Any
            Function response

        """
        # IdempotencyInconsistentStateError can happen under rare but expected cases
        # when persistent state changes in the small time between put & get requests.
        # In most cases we can retry successfully on this exception.
        for i in range(MAX_RETRIES + 1):  # pragma: no cover
            try:
                return self._process_idempotency()
            except IdempotencyInconsistentStateError:
                if i == MAX_RETRIES:
                    raise  # Bubble up when exceeded max tries

    def _process_idempotency(self):
        try:
            # We call save_inprogress first as an optimization for the most common case where no idempotent record
            # already exists. If it succeeds, there's no need to call get_record.
            self.persistence_store.save_inprogress(data=self.data)
        except IdempotencyKeyError:
            raise
        except IdempotencyItemAlreadyExistsError:
            # Now we know the item already exists, we can retrieve it
            record = self._get_idempotency_record()
            return self._handle_for_status(record)
        except Exception as exc:
            raise IdempotencyPersistenceLayerError("Failed to save in progress record to idempotency store") from exc

        return self._get_function_response()

    def _get_idempotency_record(self) -> DataRecord:
        """
        Retrieve the idempotency record from the persistence layer.

        Raises
        ----------
        IdempotencyInconsistentStateError

        """
        try:
            data_record = self.persistence_store.get_record(data=self.data)
        except IdempotencyItemNotFoundError:
            # This code path will only be triggered if the record is removed between save_inprogress and get_record.
            logger.debug(
                f"An existing idempotency record was deleted before we could fetch it. Proceeding with {self.function}"
            )
            raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

        # Allow this exception to bubble up
        except IdempotencyValidationError:
            raise

        # Wrap remaining unhandled exceptions with IdempotencyPersistenceLayerError to ease exception handling for
        # clients
        except Exception as exc:
            raise IdempotencyPersistenceLayerError("Failed to get record from idempotency store") from exc

        return data_record

    def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]]:
        """
        Take appropriate action based on data_record's status

        Parameters
        ----------
        data_record: DataRecord

        Returns
        -------
        Optional[Dict[Any, Any]
            Function's response previously used for this idempotency key, if it has successfully executed already.

        Raises
        ------
        AlreadyInProgressError
            A function execution is already in progress
        IdempotencyInconsistentStateError
            The persistence store reports inconsistent states across different requests. Retryable.
        """
        # This code path will only be triggered if the record becomes expired between the save_inprogress call and here
        if data_record.status == STATUS_CONSTANTS["EXPIRED"]:
            raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

        if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            raise IdempotencyAlreadyInProgressError(
                f"Execution already in progress with idempotency key: "
                f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"
            )

        return data_record.response_json_as_dict()

    def _get_function_response(self):
        try:
            response = self.function(*self.fn_args, **self.fn_kwargs)
        except Exception as handler_exception:
            # We need these nested blocks to preserve function's exception in case the persistence store operation
            # also raises an exception
            try:
                self.persistence_store.delete_record(data=self.data, exception=handler_exception)
            except Exception as delete_exception:
                raise IdempotencyPersistenceLayerError(
                    "Failed to delete record from idempotency store"
                ) from delete_exception
            raise

        else:
            try:
                self.persistence_store.save_success(data=self.data, result=response)
            except Exception as save_exception:
                raise IdempotencyPersistenceLayerError(
                    "Failed to update record state to success in idempotency store"
                ) from save_exception

        return response

Methods

def handle(self) ‑> Any

Main entry point for handling idempotent execution of a function.

Returns

Any
Function response
Expand source code
def handle(self) -> Any:
    """
    Main entry point for handling idempotent execution of a function.

    Returns
    -------
    Any
        Function response

    """
    # IdempotencyInconsistentStateError can happen under rare but expected cases
    # when persistent state changes in the small time between put & get requests.
    # In most cases we can retry successfully on this exception.
    for i in range(MAX_RETRIES + 1):  # pragma: no cover
        try:
            return self._process_idempotency()
        except IdempotencyInconsistentStateError:
            if i == MAX_RETRIES:
                raise  # Bubble up when exceeded max tries