Module aws_lambda_powertools.utilities.batch

Batch processing utility

Source code
# -*- coding: utf-8 -*-

"""
Batch processing utility
"""

from aws_lambda_powertools.utilities.batch.base import (
    BasePartialProcessor,
    BatchProcessor,
    EventType,
    FailureResponse,
    SuccessResponse,
    batch_processor,
)
from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
from aws_lambda_powertools.utilities.batch.sqs import PartialSQSProcessor, sqs_batch_processor

__all__ = (
    "BatchProcessor",
    "BasePartialProcessor",
    "ExceptionInfo",
    "EventType",
    "FailureResponse",
    "PartialSQSProcessor",
    "SuccessResponse",
    "batch_processor",
    "sqs_batch_processor",
)

Sub-modules

aws_lambda_powertools.utilities.batch.base

Batch processing utilities

aws_lambda_powertools.utilities.batch.exceptions

Batch processing exceptions

aws_lambda_powertools.utilities.batch.sqs

Batch SQS utilities

Functions

def batch_processor(handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor)

Middleware to handle batch event processing

Parameters

handler : Callable
Lambda's handler
event : Dict
Lambda's Event
context : Dict
Lambda's Context
record_handler : Callable
Callable to process each record from the batch
processor : BasePartialProcessor
Batch Processor to handle partial failure cases

Examples

Processes Lambda's event with PartialSQSProcessor

>>> from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
>>>
>>> def record_handler(record):
...     return record["body"]
>>>
>>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
... def handler(event, context):
...     return {"StatusCode": 200}

Limitations

  • Async batch processors
Source code
@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor
):
    """
    Middleware to handle batch event processing

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    processor: BasePartialProcessor
        Batch Processor to handle partial failure cases

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**

        >>> from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
        >>>
        >>> def record_handler(record):
        ...     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
        ... def handler(event, context):
        ...     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors

    """
    records = event["Records"]

    with processor(records, record_handler):
        processor.process()

    return handler(event, context)
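
The processor argument accepts any BasePartialProcessor. A minimal sketch pairing the decorator with the BatchProcessor class documented below (the record_handler body is illustrative):

import json

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor

processor = BatchProcessor(event_type=EventType.SQS)

def record_handler(record):
    # Each record is deserialized into an SQSRecord data class by BatchProcessor;
    # raising here marks only this record as failed.
    return json.loads(record.body)

@batch_processor(record_handler=record_handler, processor=processor)
def handler(event, context):
    # Report only the failed items back to the event source
    return processor.response()
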
def sqs_batch_processor(handler: Callable, event: Dict, context: Dict, record_handler: Callable, config: Optional[botocore.config.Config] = None, suppress_exception: bool = False, boto3_session: Optional[boto3.session.Session] = None)

Middleware to handle SQS batch event processing

Parameters

handler : Callable
Lambda's handler
event : Dict
Lambda's Event
context : Dict
Lambda's Context
record_handler : Callable
Callable to process each record from the batch
config : Config
botocore config object
suppress_exception : bool, optional
Suppress exception raised if any messages fail processing, by default False
boto3_session : boto3.session.Session, optional
Boto3 session to use for AWS API communication

Examples

Processes Lambda's event with PartialSQSProcessor

>>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor
>>>
>>> def record_handler(record):
...     return record["body"]
>>>
>>> @sqs_batch_processor(record_handler=record_handler)
... def handler(event, context):
...     return {"StatusCode": 200}

Limitations

  • Async batch processors
Source code
@lambda_handler_decorator
def sqs_batch_processor(
    handler: Callable,
    event: Dict,
    context: Dict,
    record_handler: Callable,
    config: Optional[Config] = None,
    suppress_exception: bool = False,
    boto3_session: Optional[boto3.session.Session] = None,
):
    """
    Middleware to handle SQS batch event processing

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    config: Config
        botocore config object
    suppress_exception: bool, optional
        Suppress exception raised if any messages fail processing, by default False
    boto3_session: boto3.session.Session, optional
        Boto3 session to use for AWS API communication

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**

        >>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor
        >>>
        >>> def record_handler(record):
        ...     return record["body"]
        >>>
        >>> @sqs_batch_processor(record_handler=record_handler)
        ... def handler(event, context):
        ...     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors

    """
    config = config or Config()
    session = boto3_session or boto3.session.Session()

    processor = PartialSQSProcessor(config=config, suppress_exception=suppress_exception, boto3_session=session)

    records = event["Records"]

    with processor(records, record_handler):
        processor.process()

    return handler(event, context)
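
For completeness, a minimal sketch with suppress_exception=True, which still deletes successfully processed messages but swallows the failure exception instead of raising it (the record_handler body is illustrative):

>>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor
>>>
>>> def record_handler(record):
...     return record["body"]
>>>
>>> @sqs_batch_processor(record_handler=record_handler, suppress_exception=True)
... def handler(event, context):
...     return {"StatusCode": 200}
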

Classes

class BasePartialProcessor

Abstract class for batch processors.

Source code
class BasePartialProcessor(ABC):
    """
    Abstract class for batch processors.
    """

    def __init__(self):
        self.success_messages: List[BatchEventTypes] = []
        self.fail_messages: List[BatchEventTypes] = []
        self.exceptions: List[ExceptionInfo] = []

    @abstractmethod
    def _prepare(self):
        """
        Prepare context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _clean(self):
        """
        Clear context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _process_record(self, record: dict):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return [self._process_record(record) for record in self.records]

    def __enter__(self):
        self._prepare()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self._clean()

    def __call__(self, records: List[dict], handler: Callable):
        """
        Set instance attributes before execution

        Parameters
        ----------
        records: List[dict]
            List with objects to be processed.
        handler: Callable
            Callable to process "records" entries.
        """
        self.records = records
        self.handler = handler
        return self

    def success_handler(self, record, result: Any) -> SuccessResponse:
        """
        Keeps track of batch records that were processed successfully

        Parameters
        ----------
        record: Any
            record that succeeded processing
        result: Any
            result from record handler

        Returns
        -------
        SuccessResponse
            "success", result, original record
        """
        entry = ("success", result, record)
        self.success_messages.append(record)
        return entry

    def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
        """
        Keeps track of batch records that failed processing

        Parameters
        ----------
        record: Any
            record that failed processing
        exception: ExceptionInfo
            Exception information containing type, value, and traceback (sys.exc_info())

        Returns
        -------
        FailureResponse
            "fail", exceptions args, original record
        """
        exception_string = f"{exception[0]}:{exception[1]}"
        entry = ("fail", exception_string, record)
        logger.debug(f"Record processing exception: {exception_string}")
        self.exceptions.append(exception)
        self.fail_messages.append(record)
        return entry

Ancestors

  • abc.ABC

Subclasses

  • BatchProcessor
  • PartialSQSProcessor

Methods

def failure_handler(self, record, exception: Tuple[Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]]) ‑> Tuple[str, str, Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord, BatchTypeModels]]

Keeps track of batch records that failed processing

Parameters

record : Any
record that failed processing
exception : ExceptionInfo
Exception information containing type, value, and traceback (sys.exc_info())

Returns

FailureResponse
"fail", exceptions args, original record
Source code
def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
    """
    Keeps track of batch records that failed processing

    Parameters
    ----------
    record: Any
        record that failed processing
    exception: ExceptionInfo
        Exception information containing type, value, and traceback (sys.exc_info())

    Returns
    -------
    FailureResponse
        "fail", exceptions args, original record
    """
    exception_string = f"{exception[0]}:{exception[1]}"
    entry = ("fail", exception_string, record)
    logger.debug(f"Record processing exception: {exception_string}")
    self.exceptions.append(exception)
    self.fail_messages.append(record)
    return entry
def process(self) ‑> List[Tuple]

Call instance's handler for each record.

Source code
def process(self) -> List[Tuple]:
    """
    Call instance's handler for each record.
    """
    return [self._process_record(record) for record in self.records]
def success_handler(self, record, result: Any) ‑> Tuple[str, Any, Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord, BatchTypeModels]]

Keeps track of batch records that were processed successfully

Parameters

record : Any
record that succeeded processing
result : Any
result from record handler

Returns

SuccessResponse
"success", result, original record
Source code
def success_handler(self, record, result: Any) -> SuccessResponse:
    """
    Keeps track of batch records that were processed successfully

    Parameters
    ----------
    record: Any
        record that succeeded processing
    result: Any
        result from record handler

    Returns
    -------
    SuccessResponse
        "success", result, original record
    """
    entry = ("success", result, record)
    self.success_messages.append(record)
    return entry
class BatchProcessor (event_type: EventType, model: Optional["BatchTypeModels"] = None)

Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.

Example

Process batch triggered by SQS

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch triggered by Kinesis Data Streams

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
    logger.info(record.kinesis.data_as_text)
    payload: dict = record.kinesis.data_as_json()
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Process batch triggered by DynamoDB Streams

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: DynamoDBRecord):
    logger.info(record.dynamodb.new_image)
    payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value)
    # alternatively:
    # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image  # noqa: E800
    # payload = changes.get("Message").raw_event -> {"S": "<payload>"}
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        processed_messages = processor.process() # kick off processing, return list[tuple]

    return processor.response()
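
The dict returned by response() follows the partial-failure contract used by Lambda event source mappings (the ReportBatchItemFailures function response type). For SQS the identifier is the record's messageId; for Kinesis and DynamoDB Streams it is the record's sequence number, as the collector methods in the source below show. With two failed records it looks like this (identifiers illustrative):

{
    "batchItemFailures": [
        {"itemIdentifier": "059f36b4-87a3-44ab-83d2-661975830a7d"},
        {"itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"},
    ]
}
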

Raises

BatchProcessingError
When all batch records fail processing

Process batch and partially report failed items

Parameters

event_type : EventType
Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
model : Optional["BatchTypeModels"]
Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord

Exceptions

BatchProcessingError
Raised when the entire batch has failed processing

Source code
class BatchProcessor(BasePartialProcessor):
    """Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.


    Example
    -------

    ## Process batch triggered by SQS

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = BatchProcessor(event_type=EventType.SQS)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: SQSRecord):
        payload: str = record.body
        if payload:
            item: dict = json.loads(payload)
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```

    ## Process batch triggered by Kinesis Data Streams

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: KinesisStreamRecord):
        logger.info(record.kinesis.data_as_text)
        payload: dict = record.kinesis.data_as_json()
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```


    ## Process batch triggered by DynamoDB Streams

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: DynamoDBRecord):
        logger.info(record.dynamodb.new_image)
        payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value)
        # alternatively:
        # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image  # noqa: E800
        # payload = changes.get("Message").raw_event -> {"S": "<payload>"}
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    def lambda_handler(event, context: LambdaContext):
        batch = event["Records"]
        with processor(records=batch, handler=record_handler):
            processed_messages = processor.process() # kick off processing, return list[tuple]

        return processor.response()
    ```


    Raises
    ------
    BatchProcessingError
        When all batch records fail processing
    """

    DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []}

    def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None):
        """Process batch and partially report failed items

        Parameters
        ----------
        event_type: EventType
            Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
        model: Optional["BatchTypeModels"]
            Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord

        Exceptions
        ----------
        BatchProcessingError
            Raised when the entire batch has failed processing
        """
        self.event_type = event_type
        self.model = model
        self.batch_response = copy.deepcopy(self.DEFAULT_RESPONSE)
        self._COLLECTOR_MAPPING = {
            EventType.SQS: self._collect_sqs_failures,
            EventType.KinesisDataStreams: self._collect_kinesis_failures,
            EventType.DynamoDBStreams: self._collect_dynamodb_failures,
        }
        self._DATA_CLASS_MAPPING = {
            EventType.SQS: SQSRecord,
            EventType.KinesisDataStreams: KinesisStreamRecord,
            EventType.DynamoDBStreams: DynamoDBRecord,
        }

        super().__init__()

    def response(self):
        """Batch items that failed processing, if any"""
        return self.batch_response

    def _prepare(self):
        """
        Remove results from previous execution.
        """
        self.success_messages.clear()
        self.fail_messages.clear()
        self.exceptions.clear()
        self.batch_response = copy.deepcopy(self.DEFAULT_RESPONSE)

    def _process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            result = self.handler(record=data)
            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())

    def _clean(self):
        """
        Report messages to be deleted in case of partial failure.
        """

        if not self._has_messages_to_report():
            return

        if self._entire_batch_failed():
            raise BatchProcessingError(
                msg=f"All records failed processing. {len(self.exceptions)} individual errors logged "
                f"separately below.",
                child_exceptions=self.exceptions,
            )

        messages = self._get_messages_to_report()
        self.batch_response = {"batchItemFailures": messages}

    def _has_messages_to_report(self) -> bool:
        if self.fail_messages:
            return True

        logger.debug(f"All {len(self.success_messages)} records successfully processed")
        return False

    def _entire_batch_failed(self) -> bool:
        return len(self.exceptions) == len(self.records)

    def _get_messages_to_report(self) -> List[Dict[str, str]]:
        """
        Format messages to use in batch deletion
        """
        return self._COLLECTOR_MAPPING[self.event_type]()

    # Event Source Data Classes follow python idioms for fields
    # while Parser/Pydantic follows the event field names to the latter
    def _collect_sqs_failures(self):
        failures = []
        for msg in self.fail_messages:
            msg_id = msg.messageId if self.model else msg.message_id
            failures.append({"itemIdentifier": msg_id})
        return failures

    def _collect_kinesis_failures(self):
        failures = []
        for msg in self.fail_messages:
            msg_id = msg.kinesis.sequenceNumber if self.model else msg.kinesis.sequence_number
            failures.append({"itemIdentifier": msg_id})
        return failures

    def _collect_dynamodb_failures(self):
        failures = []
        for msg in self.fail_messages:
            msg_id = msg.dynamodb.SequenceNumber if self.model else msg.dynamodb.sequence_number
            failures.append({"itemIdentifier": msg_id})
        return failures

    @overload
    def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels":
        ...  # pragma: no cover

    @overload
    def _to_batch_type(self, record: dict, event_type: EventType) -> EventSourceDataClassTypes:
        ...  # pragma: no cover

    def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["BatchTypeModels"] = None):
        if model is not None:
            return model.parse_obj(record)
        return self._DATA_CLASS_MAPPING[event_type](record)

Ancestors

  • BasePartialProcessor
  • abc.ABC

Class variables

var DEFAULT_RESPONSE : Dict[str, List[Optional[dict]]]

Methods

def response(self)

Batch items that failed processing, if any

Source code
def response(self):
    """Batch items that failed processing, if any"""
    return self.batch_response

Inherited members

  • BasePartialProcessor: failure_handler, process, success_handler

class EventType (Enum)

Enumeration of supported event sources: SQS, Kinesis Data Streams, and DynamoDB Streams.

Source code
class EventType(Enum):
    SQS = "SQS"
    KinesisDataStreams = "KinesisDataStreams"
    DynamoDBStreams = "DynamoDBStreams"

Ancestors

  • enum.Enum

Class variables

var DynamoDBStreams
var KinesisDataStreams
var SQS
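
The chosen member tells BatchProcessor which field to report as the failure identifier: messageId for SQS, and the sequence number for Kinesis Data Streams and DynamoDB Streams. A minimal usage sketch:

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

# Match the member to the Lambda event source of this function
processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
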
class PartialSQSProcessor (config: Optional[botocore.config.Config] = None, suppress_exception: bool = False, boto3_session: Optional[boto3.session.Session] = None)

Amazon SQS batch processor to delete successes from the Queue.

The whole batch will be processed, even if failures occur. After all records are processed, SQSBatchProcessingError will be raised if there were any failures, causing messages to be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception=True.

Parameters

config : Config
botocore config object
suppress_exception : bool, optional
Suppress exception raised if any messages fail processing, by default False
boto3_session : boto3.session.Session, optional
Boto3 session to use for AWS API communication

Example

Process batch triggered by SQS

>>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
>>>
>>> def record_handler(record):
...     return record["body"]
>>>
>>> def handler(event, context):
...     records = event["Records"]
...     processor = PartialSQSProcessor()
...
...     with processor(records=records, handler=record_handler):
...         result = processor.process()
...
...     # If a partial failure occurred, all successful executions
...     # have been deleted from the queue on context exit.
...
...     return result
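
When suppress_exception is left at its default, a partial failure raises SQSBatchProcessingError on context exit. A hedged sketch of surfacing it explicitly (SQSBatchProcessingError is assumed importable from the exceptions sub-module listed above):

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError

def record_handler(record):
    return record["body"]

def handler(event, context):
    processor = PartialSQSProcessor()
    try:
        with processor(records=event["Records"], handler=record_handler):
            processor.process()
    except SQSBatchProcessingError:
        # Successful messages were already deleted from the queue;
        # re-raising returns the failed ones for redelivery.
        raise
    return {"StatusCode": 200}
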

Initializes the SQS client.

Source code
class PartialSQSProcessor(BasePartialProcessor):
    """
    Amazon SQS batch processor to delete successes from the Queue.

    The whole batch will be processed, even if failures occur. After all records are processed,
    SQSBatchProcessingError will be raised if there were any failures, causing messages to
    be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception=True.

    Parameters
    ----------
    config: Config
        botocore config object
    suppress_exception: bool, optional
        Suppress exception raised if any messages fail processing, by default False
    boto3_session: boto3.session.Session, optional
        Boto3 session to use for AWS API communication


    Example
    -------
    **Process batch triggered by SQS**

        >>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
        >>>
        >>> def record_handler(record):
        ...     return record["body"]
        >>>
        >>> def handler(event, context):
        ...     records = event["Records"]
        ...     processor = PartialSQSProcessor()
        ...
        ...     with processor(records=records, handler=record_handler):
        ...         result = processor.process()
        ...
        ...     # If a partial failure occurred, all successful executions
        ...     # have been deleted from the queue on context exit.
        ...
        ...     return result

    """

    def __init__(
        self,
        config: Optional[Config] = None,
        suppress_exception: bool = False,
        boto3_session: Optional[boto3.session.Session] = None,
    ):
        """
        Initializes the SQS client.
        """
        config = config or Config()
        session = boto3_session or boto3.session.Session()
        self.client = session.client("sqs", config=config)
        self.suppress_exception = suppress_exception
        self.max_message_batch = 10

        super().__init__()

    def _get_queue_url(self) -> Optional[str]:
        """
        Format QueueUrl from first records entry
        """
        if not getattr(self, "records", None):
            return None

        *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":")
        return f"{self.client._endpoint.host}/{account_id}/{queue_name}"

    def _get_entries_to_clean(self) -> List[Dict[str, str]]:
        """
        Format messages to use in batch deletion
        """
        entries = []
        # success_messages has generic type of union of SQS, Dynamodb and Kinesis Streams records or Pydantic models.
        # Here we get SQS Record only
        messages = cast(List[SQSRecord], self.success_messages)
        for msg in messages:
            entries.append({"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]})
        return entries

    def _process_record(self, record) -> Tuple:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: Any
            An object to be processed.
        """
        try:
            result = self.handler(record=record)
            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=record, exception=sys.exc_info())

    def _prepare(self):
        """
        Remove results from previous execution.
        """
        self.success_messages.clear()
        self.fail_messages.clear()

    def _clean(self) -> Optional[List]:
        """
        Delete messages from Queue in case of partial failure.
        """

        # If all messages were successful, fall back to the default SQS -
        # Lambda behavior which deletes messages if Lambda responds successfully
        if not self.fail_messages:
            logger.debug(f"All {len(self.success_messages)} records successfully processed")
            return None

        queue_url = self._get_queue_url()
        if queue_url is None:
            logger.debug("No queue url found")
            return None

        entries_to_remove = self._get_entries_to_clean()
        # Batch delete up to 10 messages at a time (SQS limit)
        max_workers = math.ceil(len(entries_to_remove) / self.max_message_batch)

        if entries_to_remove:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures, results = [], []
                while entries_to_remove:
                    futures.append(
                        executor.submit(
                            self._delete_messages, queue_url, entries_to_remove[: self.max_message_batch], self.client
                        )
                    )
                    entries_to_remove = entries_to_remove[self.max_message_batch :]
                for future in as_completed(futures):
                    try:
                        logger.debug("Deleted batch of processed messages from SQS")
                        results.append(future.result())
                    except Exception:
                        logger.exception("Couldn't remove batch of processed messages from SQS")
                        raise
        if self.suppress_exception:
            logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed")
        else:
            logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception")
            raise SQSBatchProcessingError(
                msg=f"Not all records processed successfully. {len(self.exceptions)} individual errors logged "
                f"separately below.",
                child_exceptions=self.exceptions,
            )

        return results

    def _delete_messages(self, queue_url: str, entries_to_remove: List, sqs_client: Any):
        delete_message_response = sqs_client.delete_message_batch(
            QueueUrl=queue_url,
            Entries=entries_to_remove,
        )
        return delete_message_response

Ancestors

  • BasePartialProcessor
  • abc.ABC

Inherited members

  • BasePartialProcessor: failure_handler, process, success_handler