Module aws_lambda_powertools.utilities.batch
Batch processing utility
Source code

```python
# -*- coding: utf-8 -*-
"""
Batch processing utility
"""
from aws_lambda_powertools.utilities.batch.base import (
    BasePartialProcessor,
    BatchProcessor,
    EventType,
    FailureResponse,
    SuccessResponse,
    batch_processor,
)
from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
from aws_lambda_powertools.utilities.batch.sqs import PartialSQSProcessor, sqs_batch_processor

__all__ = (
    "BatchProcessor",
    "BasePartialProcessor",
    "ExceptionInfo",
    "EventType",
    "FailureResponse",
    "PartialSQSProcessor",
    "SuccessResponse",
    "batch_processor",
    "sqs_batch_processor",
)
```
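At a glance, the typical entry point is `BatchProcessor` combined with the `batch_processor` decorator. A minimal sketch (the JSON body parsing is illustrative, not part of the API):

```python
import json

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(event_type=EventType.SQS)

def record_handler(record: SQSRecord):
    # Illustrative: raising here marks this record as failed
    return json.loads(record.body)

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    # Report any failed items back to the event source
    return processor.response()
```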
Sub-modules
aws_lambda_powertools.utilities.batch.base
-
Batch processing utilities
aws_lambda_powertools.utilities.batch.exceptions
-
Batch processing exceptions
aws_lambda_powertools.utilities.batch.sqs
-
Batch SQS utilities
Functions
def batch_processor(handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor)
-
Middleware to handle batch event processing
Parameters
handler : Callable
- Lambda's handler
event : Dict
- Lambda's event
context : Dict
- Lambda's context
record_handler : Callable
- Callable to process each record from the batch
processor : BasePartialProcessor
- Batch processor to handle partial failure cases
Examples
Processes Lambda's event with PartialSQSProcessor
>>> from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
>>>
>>> def record_handler(record):
>>>     return record["body"]
>>>
>>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
>>> def handler(event, context):
>>>     return {"StatusCode": 200}
Limitations
- Async batch processors
Source code

```python
@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor
):
    """
    Middleware to handle batch event processing

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    processor: BasePartialProcessor
        Batch Processor to handle partial failure cases

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**

        >>> from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors
    """
    records = event["Records"]

    with processor(records, record_handler):
        processor.process()

    return handler(event, context)
```
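The decorator body above is the whole contract: bind the records and a record handler via the processor's `__call__`, run `process()` inside the context manager (so `_prepare` and `_clean` fire on entry and exit), then invoke the wrapped handler. The same flow can be written without the decorator; a sketch assuming an SQS trigger:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

processor = BatchProcessor(event_type=EventType.SQS)

def record_handler(record):
    return record.body  # records arrive as SQSRecord data classes

def lambda_handler(event, context):
    records = event["Records"]
    with processor(records, record_handler):  # entering runs _prepare()
        processor.process()
    # leaving the block runs _clean(), which builds the failure report
    return processor.response()
```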
def sqs_batch_processor(handler: Callable, event: Dict, context: Dict, record_handler: Callable, config: Optional[botocore.config.Config] = None, suppress_exception: bool = False, boto3_session: Optional[boto3.session.Session] = None)
-
Middleware to handle SQS batch event processing
Parameters
handler : Callable
- Lambda's handler
event : Dict
- Lambda's event
context : Dict
- Lambda's context
record_handler : Callable
- Callable to process each record from the batch
config : Config, optional
- botocore config object
suppress_exception : bool, optional
- Suppress the exception raised if any messages fail processing, by default False
boto3_session : boto3.session.Session, optional
- Boto3 session to use for AWS API communication
Examples
Processes Lambda's event with PartialSQSProcessor
>>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor
>>>
>>> def record_handler(record):
>>>     return record["body"]
>>>
>>> @sqs_batch_processor(record_handler=record_handler)
>>> def handler(event, context):
>>>     return {"StatusCode": 200}
Limitations
- Async batch processors
Source code

```python
@lambda_handler_decorator
def sqs_batch_processor(
    handler: Callable,
    event: Dict,
    context: Dict,
    record_handler: Callable,
    config: Optional[Config] = None,
    suppress_exception: bool = False,
    boto3_session: Optional[boto3.session.Session] = None,
):
    """
    Middleware to handle SQS batch event processing

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    config: Config
        botocore config object
    suppress_exception: bool, optional
        Suppress exception raised if any messages fail processing, by default False
    boto3_session : boto3.session.Session, optional
        Boto3 session to use for AWS API communication

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**

        >>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @sqs_batch_processor(record_handler=record_handler)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors
    """
    config = config or Config()
    session = boto3_session or boto3.session.Session()

    processor = PartialSQSProcessor(config=config, suppress_exception=suppress_exception, boto3_session=session)

    records = event["Records"]

    with processor(records, record_handler):
        processor.process()

    return handler(event, context)
```
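For context, the middleware reads `event["Records"]` and, on partial failure, deletes successful messages using their `messageId`/`receiptHandle`, deriving the queue URL from `eventSourceARN`. A hypothetical minimal event (all values are placeholders) showing only the fields this utility relies on:

```python
# Hypothetical SQS event; only the fields read by this utility are shown
event = {
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnK...",
            "body": "{\"message\": \"hello\"}",
            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
        }
    ]
}
```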
Classes
class BasePartialProcessor
-
Abstract class for batch processors.
Source code

```python
class BasePartialProcessor(ABC):
    """
    Abstract class for batch processors.
    """

    def __init__(self):
        self.success_messages: List[BatchEventTypes] = []
        self.fail_messages: List[BatchEventTypes] = []
        self.exceptions: List[ExceptionInfo] = []

    @abstractmethod
    def _prepare(self):
        """
        Prepare context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _clean(self):
        """
        Clear context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _process_record(self, record: dict):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return [self._process_record(record) for record in self.records]

    def __enter__(self):
        self._prepare()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self._clean()

    def __call__(self, records: List[dict], handler: Callable):
        """
        Set instance attributes before execution

        Parameters
        ----------
        records: List[dict]
            List with objects to be processed.
        handler: Callable
            Callable to process "records" entries.
        """
        self.records = records
        self.handler = handler
        return self

    def success_handler(self, record, result: Any) -> SuccessResponse:
        """
        Keeps track of batch records that were processed successfully

        Parameters
        ----------
        record: Any
            record that succeeded processing
        result: Any
            result from record handler

        Returns
        -------
        SuccessResponse
            "success", result, original record
        """
        entry = ("success", result, record)
        self.success_messages.append(record)
        return entry

    def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
        """
        Keeps track of batch records that failed processing

        Parameters
        ----------
        record: Any
            record that failed processing
        exception: ExceptionInfo
            Exception information containing type, value, and traceback (sys.exc_info())

        Returns
        -------
        FailureResponse
            "fail", exceptions args, original record
        """
        exception_string = f"{exception[0]}:{exception[1]}"
        entry = ("fail", exception_string, record)
        logger.debug(f"Record processing exception: {exception_string}")
        self.exceptions.append(exception)
        self.fail_messages.append(record)
        return entry
```
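To make the abstract contract concrete, here is a minimal, hypothetical subclass; only `_prepare`, `_clean`, and `_process_record` must be implemented, while the `success_handler`/`failure_handler` bookkeeping is inherited:

```python
import sys
from typing import Tuple

from aws_lambda_powertools.utilities.batch import BasePartialProcessor

class InMemoryProcessor(BasePartialProcessor):
    """Hypothetical processor that only tracks results in memory."""

    def _prepare(self):
        # Reset state left over from a previous invocation
        self.success_messages.clear()
        self.fail_messages.clear()
        self.exceptions.clear()

    def _clean(self):
        # Nothing external to release in this sketch
        pass

    def _process_record(self, record: dict) -> Tuple:
        try:
            result = self.handler(record=record)
            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=record, exception=sys.exc_info())
```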
Ancestors
- abc.ABC
Subclasses
- BatchProcessor
- PartialSQSProcessor
Methods
def failure_handler(self, record, exception: Tuple[Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]]) ‑> Tuple[str, str, Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord, BatchTypeModels]]
-
Keeps track of batch records that failed processing
Parameters
record : Any
- record that failed processing
exception : ExceptionInfo
- Exception information containing type, value, and traceback (sys.exc_info())
Returns
FailureResponse
- "fail", exception args, original record
Source code

```python
def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
    """
    Keeps track of batch records that failed processing

    Parameters
    ----------
    record: Any
        record that failed processing
    exception: ExceptionInfo
        Exception information containing type, value, and traceback (sys.exc_info())

    Returns
    -------
    FailureResponse
        "fail", exceptions args, original record
    """
    exception_string = f"{exception[0]}:{exception[1]}"
    entry = ("fail", exception_string, record)
    logger.debug(f"Record processing exception: {exception_string}")
    self.exceptions.append(exception)
    self.fail_messages.append(record)
    return entry
```
def process(self) ‑> List[Tuple]
-
Call instance's handler for each record.
Source code

```python
def process(self) -> List[Tuple]:
    """
    Call instance's handler for each record.
    """
    return [self._process_record(record) for record in self.records]
```
def success_handler(self, record, result: Any) ‑> Tuple[str, Any, Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord, BatchTypeModels]]
-
Keeps track of batch records that were processed successfully
Parameters
record : Any
- record that succeeded processing
result : Any
- result from record handler
Returns
SuccessResponse
- "success", result, original record
Source code

```python
def success_handler(self, record, result: Any) -> SuccessResponse:
    """
    Keeps track of batch records that were processed successfully

    Parameters
    ----------
    record: Any
        record that succeeded processing
    result: Any
        result from record handler

    Returns
    -------
    SuccessResponse
        "success", result, original record
    """
    entry = ("success", result, record)
    self.success_messages.append(record)
    return entry
```
class BatchProcessor (event_type: EventType, model: Optional["BatchTypeModels"] = None)
-
Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.
Example
Process batch triggered by SQS
```python
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()

@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
```
Process batch triggered by Kinesis Data Streams
```python
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()

@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
    logger.info(record.kinesis.data_as_text)
    payload: dict = record.kinesis.data_as_json()
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
```
Process batch triggered by DynamoDB Streams

```python
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()

@tracer.capture_method
def record_handler(record: DynamoDBRecord):
    logger.info(record.dynamodb.new_image)
    payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value)
    # alternatively:
    # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image  # noqa: E800
    # payload = change.get("Message").raw_event -> {"S": "<payload>"}
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        processed_messages = processor.process()  # kick off processing, return list[tuple]

    return processor.response()
```
Raises
BatchProcessingError
- When all batch records fail processing
Process batch and partially report failed items
Parameters
event_type : EventType
- Whether this is an SQS, DynamoDB Streams, or Kinesis Data Streams event
model : Optional["BatchTypeModels"]
- Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, or KinesisDataStreamRecord
Exceptions
BatchProcessingError
- Raised when the entire batch has failed processing
Source code

````python
class BatchProcessor(BasePartialProcessor):
    """Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.

    Example
    -------
    ## Process batch triggered by SQS

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext

    processor = BatchProcessor(event_type=EventType.SQS)
    tracer = Tracer()
    logger = Logger()

    @tracer.capture_method
    def record_handler(record: SQSRecord):
        payload: str = record.body
        if payload:
            item: dict = json.loads(payload)
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```

    ## Process batch triggered by Kinesis Data Streams

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext

    processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
    tracer = Tracer()
    logger = Logger()

    @tracer.capture_method
    def record_handler(record: KinesisStreamRecord):
        logger.info(record.kinesis.data_as_text)
        payload: dict = record.kinesis.data_as_json()
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```

    ## Process batch triggered by DynamoDB Streams

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext

    processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
    tracer = Tracer()
    logger = Logger()

    @tracer.capture_method
    def record_handler(record: DynamoDBRecord):
        logger.info(record.dynamodb.new_image)
        payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value)
        # alternatively:
        # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image  # noqa: E800
        # payload = change.get("Message").raw_event -> {"S": "<payload>"}
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    def lambda_handler(event, context: LambdaContext):
        batch = event["Records"]
        with processor(records=batch, handler=record_handler):
            processed_messages = processor.process()  # kick off processing, return list[tuple]

        return processor.response()
    ```

    Raises
    ------
    BatchProcessingError
        When all batch records fail processing
    """

    DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []}

    def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None):
        """Process batch and partially report failed items

        Parameters
        ----------
        event_type: EventType
            Whether this is an SQS, DynamoDB Streams, or Kinesis Data Streams event
        model: Optional["BatchTypeModels"]
            Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord

        Exceptions
        ----------
        BatchProcessingError
            Raised when the entire batch has failed processing
        """
        self.event_type = event_type
        self.model = model
        self.batch_response = copy.deepcopy(self.DEFAULT_RESPONSE)
        self._COLLECTOR_MAPPING = {
            EventType.SQS: self._collect_sqs_failures,
            EventType.KinesisDataStreams: self._collect_kinesis_failures,
            EventType.DynamoDBStreams: self._collect_dynamodb_failures,
        }
        self._DATA_CLASS_MAPPING = {
            EventType.SQS: SQSRecord,
            EventType.KinesisDataStreams: KinesisStreamRecord,
            EventType.DynamoDBStreams: DynamoDBRecord,
        }

        super().__init__()

    def response(self):
        """Batch items that failed processing, if any"""
        return self.batch_response

    def _prepare(self):
        """
        Remove results from previous execution.
        """
        self.success_messages.clear()
        self.fail_messages.clear()
        self.exceptions.clear()
        self.batch_response = copy.deepcopy(self.DEFAULT_RESPONSE)

    def _process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            result = self.handler(record=data)
            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())

    def _clean(self):
        """
        Report messages to be deleted in case of partial failure.
        """
        if not self._has_messages_to_report():
            return

        if self._entire_batch_failed():
            raise BatchProcessingError(
                msg=f"All records failed processing. {len(self.exceptions)} individual errors logged "
                f"separately below.",
                child_exceptions=self.exceptions,
            )

        messages = self._get_messages_to_report()
        self.batch_response = {"batchItemFailures": messages}

    def _has_messages_to_report(self) -> bool:
        if self.fail_messages:
            return True

        logger.debug(f"All {len(self.success_messages)} records successfully processed")
        return False

    def _entire_batch_failed(self) -> bool:
        return len(self.exceptions) == len(self.records)

    def _get_messages_to_report(self) -> List[Dict[str, str]]:
        """
        Format messages to use in batch deletion
        """
        return self._COLLECTOR_MAPPING[self.event_type]()

    # Event Source Data Classes follow python idioms for fields
    # while Parser/Pydantic follows the event field names to the letter
    def _collect_sqs_failures(self):
        failures = []
        for msg in self.fail_messages:
            msg_id = msg.messageId if self.model else msg.message_id
            failures.append({"itemIdentifier": msg_id})
        return failures

    def _collect_kinesis_failures(self):
        failures = []
        for msg in self.fail_messages:
            msg_id = msg.kinesis.sequenceNumber if self.model else msg.kinesis.sequence_number
            failures.append({"itemIdentifier": msg_id})
        return failures

    def _collect_dynamodb_failures(self):
        failures = []
        for msg in self.fail_messages:
            msg_id = msg.dynamodb.SequenceNumber if self.model else msg.dynamodb.sequence_number
            failures.append({"itemIdentifier": msg_id})
        return failures

    @overload
    def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels":
        ...  # pragma: no cover

    @overload
    def _to_batch_type(self, record: dict, event_type: EventType) -> EventSourceDataClassTypes:
        ...  # pragma: no cover

    def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["BatchTypeModels"] = None):
        if model is not None:
            return model.parse_obj(record)

        return self._DATA_CLASS_MAPPING[event_type](record)
````
Ancestors
- BasePartialProcessor
- abc.ABC
Class variables
var DEFAULT_RESPONSE : Dict[str, List[Optional[dict]]]
Methods
def response(self)
-
Batch items that failed processing, if any
Source code

```python
def response(self):
    """Batch items that failed processing, if any"""
    return self.batch_response
```
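As a sketch of the returned shape: processing a two-record batch where one record fails yields one entry in `batchItemFailures`. (Lambda acts on this payload only when the event source mapping enables the `ReportBatchItemFailures` response type.) Record contents below are illustrative:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

processor = BatchProcessor(event_type=EventType.SQS)

def record_handler(record):
    if record.body == "bad":
        raise ValueError("illustrative failure")
    return record.body

records = [
    {"messageId": "1", "body": "ok"},
    {"messageId": "2", "body": "bad"},
]

with processor(records, record_handler):
    processor.process()

print(processor.response())  # {'batchItemFailures': [{'itemIdentifier': '2'}]}
```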
Inherited members
- BasePartialProcessor: failure_handler, process, success_handler
class EventType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Source code

```python
class EventType(Enum):
    SQS = "SQS"
    KinesisDataStreams = "KinesisDataStreams"
    DynamoDBStreams = "DynamoDBStreams"
```
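The value passed to `BatchProcessor` selects both the data class handed to the record handler and how failed item identifiers are collected. For instance:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

# One processor per trigger type; records are parsed as SQSRecord,
# KinesisStreamRecord, or DynamoDBRecord respectively
sqs_processor = BatchProcessor(event_type=EventType.SQS)
kinesis_processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
dynamodb_processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
```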
Ancestors
- enum.Enum
Class variables
var DynamoDBStreams
var KinesisDataStreams
var SQS
class PartialSQSProcessor (config: Optional[botocore.config.Config] = None, suppress_exception: bool = False, boto3_session: Optional[boto3.session.Session] = None)
-
Amazon SQS batch processor to delete successes from the Queue.
The whole batch will be processed, even if failures occur. After all records are processed, SQSBatchProcessingError will be raised if there were any failures, causing messages to be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception.
Parameters
config : Config, optional
- botocore config object
suppress_exception : bool, optional
- Suppress the exception raised if any messages fail processing, by default False
boto3_session : boto3.session.Session, optional
- Boto3 session to use for AWS API communication
Example
Process batch triggered by SQS
>>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
>>>
>>> def record_handler(record):
>>>     return record["body"]
>>>
>>> def handler(event, context):
>>>     records = event["Records"]
>>>     processor = PartialSQSProcessor()
>>>
>>>     with processor(records=records, handler=record_handler):
>>>         result = processor.process()
>>>
>>>     # In case a partial failure occurred, all successful executions
>>>     # have been deleted from the queue after the context's exit.
>>>
>>>     return result
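If raising SQSBatchProcessingError on partial failure is undesirable, the processor can be constructed with suppress_exception=True; a minimal variation of the example above:

>>> processor = PartialSQSProcessor(suppress_exception=True)
>>>
>>> with processor(records=records, handler=record_handler):
>>>     result = processor.process()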
Initializes the SQS client.
Source code

```python
class PartialSQSProcessor(BasePartialProcessor):
    """
    Amazon SQS batch processor to delete successes from the Queue.

    The whole batch will be processed, even if failures occur. After all records are processed,
    SQSBatchProcessingError will be raised if there were any failures, causing messages to
    be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception.

    Parameters
    ----------
    config: Config
        botocore config object
    suppress_exception: bool, optional
        Suppress exception raised if any messages fail processing, by default False
    boto3_session : boto3.session.Session, optional
        Boto3 session to use for AWS API communication

    Example
    -------
    **Process batch triggered by SQS**

        >>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> def handler(event, context):
        >>>     records = event["Records"]
        >>>     processor = PartialSQSProcessor()
        >>>
        >>>     with processor(records=records, handler=record_handler):
        >>>         result = processor.process()
        >>>
        >>>     # In case a partial failure occurred, all successful executions
        >>>     # have been deleted from the queue after the context's exit.
        >>>
        >>>     return result
    """

    def __init__(
        self,
        config: Optional[Config] = None,
        suppress_exception: bool = False,
        boto3_session: Optional[boto3.session.Session] = None,
    ):
        """
        Initializes the SQS client.
        """
        config = config or Config()
        session = boto3_session or boto3.session.Session()
        self.client = session.client("sqs", config=config)
        self.suppress_exception = suppress_exception
        self.max_message_batch = 10

        super().__init__()

    def _get_queue_url(self) -> Optional[str]:
        """
        Format QueueUrl from first records entry
        """
        if not getattr(self, "records", None):
            return None

        *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":")
        return f"{self.client._endpoint.host}/{account_id}/{queue_name}"

    def _get_entries_to_clean(self) -> List[Dict[str, str]]:
        """
        Format messages to use in batch deletion
        """
        entries = []
        # success_messages has generic type of union of SQS, Dynamodb and Kinesis Streams records or Pydantic models.
        # Here we get SQS Record only
        messages = cast(List[SQSRecord], self.success_messages)
        for msg in messages:
            entries.append({"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]})
        return entries

    def _process_record(self, record) -> Tuple:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: Any
            An object to be processed.
        """
        try:
            result = self.handler(record=record)
            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=record, exception=sys.exc_info())

    def _prepare(self):
        """
        Remove results from previous execution.
        """
        self.success_messages.clear()
        self.fail_messages.clear()

    def _clean(self) -> Optional[List]:
        """
        Delete messages from Queue in case of partial failure.
        """
        # If all messages were successful, fall back to the default SQS -
        # Lambda behavior which deletes messages if Lambda responds successfully
        if not self.fail_messages:
            logger.debug(f"All {len(self.success_messages)} records successfully processed")
            return None

        queue_url = self._get_queue_url()
        if queue_url is None:
            logger.debug("No queue url found")
            return None

        entries_to_remove = self._get_entries_to_clean()
        # Batch delete up to 10 messages at a time (SQS limit)
        max_workers = math.ceil(len(entries_to_remove) / self.max_message_batch)

        if entries_to_remove:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures, results = [], []
                while entries_to_remove:
                    futures.append(
                        executor.submit(
                            self._delete_messages, queue_url, entries_to_remove[: self.max_message_batch], self.client
                        )
                    )
                    entries_to_remove = entries_to_remove[self.max_message_batch :]
                for future in as_completed(futures):
                    try:
                        logger.debug("Deleted batch of processed messages from SQS")
                        results.append(future.result())
                    except Exception:
                        logger.exception("Couldn't remove batch of processed messages from SQS")
                        raise

        if self.suppress_exception:
            logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed")
        else:
            logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception")
            raise SQSBatchProcessingError(
                msg=f"Not all records processed successfully. {len(self.exceptions)} individual errors logged "
                f"separately below.",
                child_exceptions=self.exceptions,
            )

        return results

    def _delete_messages(self, queue_url: str, entries_to_remove: List, sqs_client: Any):
        delete_message_response = sqs_client.delete_message_batch(
            QueueUrl=queue_url,
            Entries=entries_to_remove,
        )
        return delete_message_response
```
Ancestors
- BasePartialProcessor
- abc.ABC
Inherited members
- BasePartialProcessor: failure_handler, process, success_handler