The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until one of the following happens first: a) your Lambda function returns a successful response, b) the records reach the maximum retry attempts, or c) the records expire.
With this utility, batch records are processed individually – only messages that failed to be processed return to the queue or stream for a further retry. This works when two mechanisms are in place:
ReportBatchItemFailures is set in your SQS, Kinesis, or DynamoDB event source properties
A specific response is returned so Lambda knows which records should not be deleted during partial responses (see the response sketch below)
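For reference, here is roughly the shape of the partial-failure response the utility returns on your behalf; the identifier value is only illustrative (SQS uses the message ID, while Kinesis and DynamoDB Streams use the sequence number):

```python
# Sketch of a partial batch response; the identifier below is illustrative.
# For SQS it is the messageId, for Kinesis/DynamoDB Streams the sequenceNumber.
response = {
    "batchItemFailures": [
        {"itemIdentifier": "<id of the record that failed processing>"},
    ]
}
```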
Warning: This utility lowers the chance of processing records more than once; it does not guarantee it.
We recommend implementing processing logic in an idempotent manner wherever possible.
You can find more details on how Lambda works with SQS, Kinesis, or DynamoDB in the AWS Documentation.
Regardless of whether you're using SQS, Kinesis Data Streams, or DynamoDB Streams, you must configure your Lambda function event source to use `ReportBatchItemFailures`.
You do not need any additional IAM permissions to use this utility, except for what each event source requires.
The remaining sections of the documentation will rely on these samples. For completeness, they demonstrate IAM permissions and a Dead Letter Queue where batch records are sent after two retries have been attempted.
```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: python3.9
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.lambda_handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records to DLQ from Kinesis/DynamoDB
        - Version: "2012-10-17"
          Statement:
            Effect: "Allow"
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        KinesisStream:
          Type: Kinesis
          Properties:
            Stream: !GetAtt SampleStream.Arn
            BatchSize: 100
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
```
```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: python3.9
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.lambda_handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records from Kinesis/DynamoDB
        - Version: "2012-10-17"
          Statement:
            Effect: "Allow"
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        DynamoDBStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt SampleTable.StreamArn
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      SSESpecification:
        SSEEnabled: yes
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
```
All records in the batch will be passed to this handler for processing, even if exceptions are thrown. Here's the behaviour after the batch completes:
All records successfully processed. We will return an empty list of item failures {'batchItemFailures': []}
Partial success with some exceptions. We will return a list of all item IDs/sequence numbers that failed processing
All records failed to be processed. We will raise BatchProcessingError exception with a list of all exceptions raised when processing
Warning
You will not have access to the processed messages within the Lambda Handler; use context manager for that.
All processing logic will and should be performed by the record_handler function.
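For context, here is a minimal sketch of wiring a record_handler to BatchProcessor for SQS without a custom model; the payload handling inside record_handler is illustrative:

```python
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    # raise an exception for any record you could not process;
    # returning successfully marks the record as processed
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
        logger.info(item)


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
```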
You can bring your own Pydantic models via the model parameter when inheriting from SqsRecordModel, KinesisDataStreamRecord, or DynamoDBStreamRecordModel
Inheritance is important because we need to access message IDs and sequence numbers from these records in the event of failure. Mypy is fully integrated with this utility, so it should identify whether you're passing an incorrect model.
```python
import json

from pydantic import BaseModel, validator

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
from aws_lambda_powertools.utilities.typing import LambdaContext


class Order(BaseModel):
    item: dict


class OrderSqsRecord(SqsRecordModel):
    body: Order

    # auto transform json string
    # so Pydantic can auto-initialize nested Order model
    @validator("body", pre=True)
    def transform_body_to_dict(cls, value: str):
        return json.loads(value)


processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: OrderSqsRecord):
    return record.body.item


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
```
```python
import json

from pydantic import BaseModel, validator

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser.models import (
    KinesisDataStreamRecord,
    KinesisDataStreamRecordPayload,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


class Order(BaseModel):
    item: dict


class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload):
    data: Order

    # auto transform json string
    # so Pydantic can auto-initialize nested Order model
    @validator("data", pre=True)
    def transform_message_to_dict(cls, value: str):
        # Powertools KinesisDataStreamRecordPayload already decodes b64 to str here
        return json.loads(value)


class OrderKinesisRecord(KinesisDataStreamRecord):
    kinesis: OrderKinesisPayloadRecord


processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: OrderKinesisRecord):
    return record.kinesis.data.item


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
```
```python
import json
from typing import Dict, Literal, Optional

from pydantic import BaseModel, validator

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser.models import (
    DynamoDBStreamChangedRecordModel,
    DynamoDBStreamRecordModel,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


class Order(BaseModel):
    item: dict


class OrderDynamoDB(BaseModel):
    Message: Order

    # auto transform json string
    # so Pydantic can auto-initialize nested Order model
    @validator("Message", pre=True)
    def transform_message_to_dict(cls, value: Dict[Literal["S"], str]):
        return json.loads(value["S"])


class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel):
    NewImage: Optional[OrderDynamoDB]
    OldImage: Optional[OrderDynamoDB]


class OrderDynamoDBRecord(DynamoDBStreamRecordModel):
    dynamodb: OrderDynamoDBChangeRecord


processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: OrderDynamoDBRecord):
    return record.dynamodb.NewImage.Message.item


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
```
You can create your own partial batch processor from scratch by inheriting the BasePartialProcessor class, and implementing _prepare(), _clean() and _process_record().
_process_record() – handles all processing logic for each individual message of a batch, including calling the record_handler (self.handler)
_prepare() – called once as part of the processor initialization
_clean() – teardown logic called once after _process_record completes
You can then use this class as a context manager, or pass it to batch_processor to use as a decorator on your Lambda handler function.
```python
import os
from random import randint

import boto3

from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor

table_name = os.getenv("TABLE_NAME", "table_not_found")


class MyPartialProcessor(BasePartialProcessor):
    """
    Process a record and stores successful results at a Amazon DynamoDB Table

    Parameters
    ----------
    table_name: str
        DynamoDB table name to write results to
    """

    def __init__(self, table_name: str):
        self.table_name = table_name
        super().__init__()

    def _prepare(self):
        # It's called once, *before* processing
        # Creates table resource and clean previous results
        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
        self.success_messages.clear()

    def _clean(self):
        # It's called once, *after* processing all records (closing the context manager)
        # Here we're sending, at once, all successful messages to a ddb table
        with self.ddb_table.batch_writer() as batch:
            for result in self.success_messages:
                batch.put_item(Item=result)

    def _process_record(self, record):
        # It handles how your record is processed
        # Here we're keeping the status of each run
        # where self.handler is the record_handler function passed as an argument
        try:
            result = self.handler(record)  # record_handler passed to decorator/context manager
            return self.success_handler(record, result)
        except Exception as exc:
            return self.failure_handler(record, exc)

    def success_handler(self, record, result):
        entry = ("success", result, record)
        message = {"age": result}
        self.success_messages.append(message)
        return entry


def record_handler(record):
    return randint(0, 100)


@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
def lambda_handler(event, context):
    return {"statusCode": 200}
```
Tracer response auto-capture for large batch sizes
When using Tracer to capture responses for each batch record processing, you might exceed 64 KB of tracing data depending on what your record_handler function returns, or how large your batch size is.
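If you hit that limit, one way around it, assuming you don't need the return values in your traces, is a sketch like the following that disables response capture on the record handler:

```python
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()


# capture_response=False stops Tracer from adding the handler's return value
# to the trace metadata, keeping segments small for large batches
@tracer.capture_method(capture_response=False)
def record_handler(record):
    ...


@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.response()
```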
As there are no external calls, you can easily unit test your code with BatchProcessor.
Example:
Given an SQS batch where the first batch record succeeds and the second fails processing, we should have a single item reported in the function response.
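Here's a self-contained pytest-style sketch of that scenario; the record payloads, queue ARN, and the "fail" flag convention are made up for illustration:

```python
import json

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record):
    body = json.loads(record.body)
    if body.get("fail"):
        raise ValueError("simulated processing failure")
    return body


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.response()


def sqs_record(message_id: str, body: dict) -> dict:
    # Minimal SQS record stub; real events carry more attributes
    return {
        "messageId": message_id,
        "receiptHandle": "test-receipt-handle",
        "body": json.dumps(body),
        "attributes": {},
        "messageAttributes": {},
        "md5OfBody": "",
        "eventSource": "aws:sqs",
        "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
        "awsRegion": "us-east-1",
    }


def test_second_record_failure_is_reported():
    event = {"Records": [sqs_record("1", {"fail": False}), sqs_record("2", {"fail": True})]}

    response = lambda_handler(event, context=None)

    assert len(response["batchItemFailures"]) == 1
    assert response["batchItemFailures"][0]["itemIdentifier"] == "2"
```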
Use the context manager when you want access to the processed messages, or to handle the BatchProcessingError exception raised when all records within the batch fail to be processed.
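For instance, here's a sketch of using the processor as a context manager inside the handler to inspect what was processed; the empty record_handler body is a placeholder:

```python
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record):
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        # returns a list of ("success"/"fail", result, record) tuples
        processed_messages = processor.process()

    logger.info(processed_messages)
    return processor.response()
```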
```python
from sentry_sdk import capture_exception

from aws_lambda_powertools.utilities.batch import BatchProcessor, FailureResponse


# Subclass BatchProcessor to customize how failed records are handled,
# e.g. forwarding the exception to Sentry before the default handling runs
class MyProcessor(BatchProcessor):
    def failure_handler(self, record, exception) -> FailureResponse:
        capture_exception()  # send exception to Sentry
        return super().failure_handler(record, exception)
```
Keep reading if you are using sqs_batch_processor or PartialSQSProcessor.
As of Nov 2021, this is no longer needed, as SQS, Kinesis, and DynamoDB Streams all offer this capability natively, with one caveat: it's an opt-in feature.
Being a native feature, we no longer need to instantiate boto3 or apply customizations like exception suppression; this lowers the cost of your Lambda function, as you can delegate deleting partial failures to Lambda.
The `config` and `boto3_session` parameters enable you to pass in a custom botocore config object or a custom boto3 session when using the `sqs_batch_processor` decorator or the `PartialSQSProcessor` class.
Custom config example
```python
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import sqs_batch_processor

config = Config(region_name="us-east-1")


def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value


@sqs_batch_processor(record_handler=record_handler, config=config)
def lambda_handler(event, context):
    return {"statusCode": 200}
```
```python
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

config = Config(region_name="us-east-1")


def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value


def lambda_handler(event, context):
    records = event["Records"]

    processor = PartialSQSProcessor(config=config)

    with processor(records, record_handler):
        result = processor.process()

    return result
```
Custom boto3 session example
```python
import boto3

from aws_lambda_powertools.utilities.batch import sqs_batch_processor

session = boto3.session.Session()


def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value


@sqs_batch_processor(record_handler=record_handler, boto3_session=session)
def lambda_handler(event, context):
    return {"statusCode": 200}
```
```python
import boto3

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

session = boto3.session.Session()


def record_handler(record):
    # This will be called for each individual message from a batch
    # It should raise an exception if the message was not processed successfully
    return_value = do_something_with(record["body"])
    return return_value


def lambda_handler(event, context):
    records = event["Records"]

    processor = PartialSQSProcessor(boto3_session=session)

    with processor(records, record_handler):
        result = processor.process()

    return result
```
If you want to disable the default behavior where SQSBatchProcessingError is raised if there are any errors, you can pass the suppress_exception boolean argument.
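For example, a brief sketch of passing it to either flavour of the legacy API:

```python
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, sqs_batch_processor


def record_handler(record):
    ...


# decorator flavour: SQSBatchProcessingError is no longer raised when some records fail
@sqs_batch_processor(record_handler=record_handler, suppress_exception=True)
def lambda_handler(event, context):
    return {"statusCode": 200}


# class flavour
processor = PartialSQSProcessor(suppress_exception=True)
```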