Module aws_lambda_powertools.utilities.data_classes
Event Source Data Classes utility provides classes self-describing Lambda event sources.
Expand source code
"""
Event Source Data Classes utility provides classes self-describing Lambda event sources.
"""
from .alb_event import ALBEvent
from .api_gateway_proxy_event import APIGatewayProxyEvent, APIGatewayProxyEventV2
from .appsync_resolver_event import AppSyncResolverEvent
from .cloud_watch_logs_event import CloudWatchLogsEvent
from .code_pipeline_job_event import CodePipelineJobEvent
from .connect_contact_flow_event import ConnectContactFlowEvent
from .dynamo_db_stream_event import DynamoDBStreamEvent
from .event_bridge_event import EventBridgeEvent
from .event_source import event_source
from .kinesis_stream_event import KinesisStreamEvent
from .s3_event import S3Event
from .ses_event import SESEvent
from .sns_event import SNSEvent
from .sqs_event import SQSEvent
# Public API of the package: each name listed here is imported above from its
# dedicated sub-module and re-exported for `from ... import` convenience.
__all__ = [
"APIGatewayProxyEvent",
"APIGatewayProxyEventV2",
"AppSyncResolverEvent",
"ALBEvent",
"CloudWatchLogsEvent",
"CodePipelineJobEvent",
"ConnectContactFlowEvent",
"DynamoDBStreamEvent",
"EventBridgeEvent",
"KinesisStreamEvent",
"S3Event",
"SESEvent",
"SNSEvent",
"SQSEvent",
"event_source",
]
Sub-modules
aws_lambda_powertools.utilities.data_classes.active_mq_event
aws_lambda_powertools.utilities.data_classes.alb_event
aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event
aws_lambda_powertools.utilities.data_classes.api_gateway_proxy_event
aws_lambda_powertools.utilities.data_classes.appsync
aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event
aws_lambda_powertools.utilities.data_classes.appsync_resolver_event
aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event
aws_lambda_powertools.utilities.data_classes.code_pipeline_job_event
aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event
aws_lambda_powertools.utilities.data_classes.common
aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event
aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event
aws_lambda_powertools.utilities.data_classes.event_bridge_event
aws_lambda_powertools.utilities.data_classes.kinesis_stream_event
aws_lambda_powertools.utilities.data_classes.rabbit_mq_event
aws_lambda_powertools.utilities.data_classes.s3_event
aws_lambda_powertools.utilities.data_classes.s3_object_event
aws_lambda_powertools.utilities.data_classes.ses_event
aws_lambda_powertools.utilities.data_classes.sns_event
aws_lambda_powertools.utilities.data_classes.sqs_event
Functions
def event_source(handler: Callable[[Any, aws_lambda_powertools.utilities.lambda_context.LambdaContext], Any], event: Dict[str, Any], context: LambdaContext, data_class: Type[DictWrapper])
-
Middleware to create an instance of the passed in event source data class
Parameters
handler
:Callable
- Lambda's handler
event
:Dict
- Lambda's Event
context
:LambdaContext
- Lambda's Context
data_class
:Type[DictWrapper]
- Data class type to instantiate
Example
Sample usage
from aws_lambda_powertools.utilities.data_classes import S3Event, event_source @event_source(data_class=S3Event) def handler(event: S3Event, context): return {"key": event.object_key}
Expand source code
@lambda_handler_decorator
def event_source(
    handler: Callable[[Any, LambdaContext], Any],
    event: Dict[str, Any],
    context: LambdaContext,
    data_class: Type[DictWrapper],
):
    """Middleware that wraps the raw event in ``data_class`` before calling the handler

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    data_class: Type[DictWrapper]
        Data class type to instantiate

    Returns
    -------
    Any
        Whatever ``handler`` returns when called with the wrapped event and the context

    Example
    --------

    **Sample usage**

        from aws_lambda_powertools.utilities.data_classes import S3Event, event_source

        @event_source(data_class=S3Event)
        def handler(event: S3Event, context):
            return {"key": event.object_key}
    """
    # Instantiate the requested data class around the untouched event dict,
    # then hand control to the user's handler with the original context.
    typed_event = data_class(event)
    return handler(typed_event, context)
Classes
class ALBEvent (data: Dict[str, Any])
-
Application load balancer event
Documentation:
Expand source code
class ALBEvent(BaseProxyEvent):
    """Application load balancer event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-alb.html
    - https://docs.aws.amazon.com/elasticloadbalancing/latest/application/lambda-functions.html
    """

    @property
    def request_context(self) -> ALBEventRequestContext:
        """Request context view built over this event's underlying data."""
        raw_event = self._data
        return ALBEventRequestContext(raw_event)

    @property
    def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]:
        """Value stored under ``multiValueQueryStringParameters``, looked up with ``get``."""
        query_params = self.get("multiValueQueryStringParameters")
        return query_params

    @property
    def multi_value_headers(self) -> Optional[Dict[str, List[str]]]:
        """Value stored under ``multiValueHeaders``, looked up with ``get``."""
        headers = self.get("multiValueHeaders")
        return headers
Ancestors
Instance variables
var multi_value_headers : Optional[Dict[str, List[str]]]
-
Expand source code
@property def multi_value_headers(self) -> Optional[Dict[str, List[str]]]: return self.get("multiValueHeaders")
var multi_value_query_string_parameters : Optional[Dict[str, List[str]]]
-
Expand source code
@property def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]: return self.get("multiValueQueryStringParameters")
var request_context : ALBEventRequestContext
-
Expand source code
@property def request_context(self) -> ALBEventRequestContext: return ALBEventRequestContext(self._data)
Inherited members
class APIGatewayProxyEvent (data: Dict[str, Any])
-
AWS Lambda proxy V1
Documentation:
Expand source code
class APIGatewayProxyEvent(BaseProxyEvent):
    """AWS Lambda proxy V1

    Documentation:
    --------------
    - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html
    """

    # Properties below that use subscript access read keys the class treats as
    # mandatory; those using ``get`` are annotated Optional.

    @property
    def version(self) -> str:
        """Value stored under the ``version`` key."""
        payload_version = self["version"]
        return payload_version

    @property
    def resource(self) -> str:
        """Value stored under the ``resource`` key."""
        resource_path = self["resource"]
        return resource_path

    @property
    def multi_value_headers(self) -> Dict[str, List[str]]:
        """Value stored under the ``multiValueHeaders`` key."""
        headers = self["multiValueHeaders"]
        return headers

    @property
    def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]:
        """Value stored under ``multiValueQueryStringParameters``, looked up with ``get``."""
        query_params = self.get("multiValueQueryStringParameters")
        return query_params

    @property
    def request_context(self) -> APIGatewayEventRequestContext:
        """Request context view built over this event's underlying data."""
        raw_event = self._data
        return APIGatewayEventRequestContext(raw_event)

    @property
    def path_parameters(self) -> Optional[Dict[str, str]]:
        """Value stored under ``pathParameters``, looked up with ``get``."""
        params = self.get("pathParameters")
        return params

    @property
    def stage_variables(self) -> Optional[Dict[str, str]]:
        """Value stored under ``stageVariables``, looked up with ``get``."""
        variables = self.get("stageVariables")
        return variables
Ancestors
Instance variables
var multi_value_headers : Dict[str, List[str]]
-
Expand source code
@property def multi_value_headers(self) -> Dict[str, List[str]]: return self["multiValueHeaders"]
var multi_value_query_string_parameters : Optional[Dict[str, List[str]]]
-
Expand source code
@property def multi_value_query_string_parameters(self) -> Optional[Dict[str, List[str]]]: return self.get("multiValueQueryStringParameters")
var path_parameters : Optional[Dict[str, str]]
-
Expand source code
@property def path_parameters(self) -> Optional[Dict[str, str]]: return self.get("pathParameters")
var request_context : APIGatewayEventRequestContext
-
Expand source code
@property def request_context(self) -> APIGatewayEventRequestContext: return APIGatewayEventRequestContext(self._data)
var resource : str
-
Expand source code
@property def resource(self) -> str: return self["resource"]
var stage_variables : Optional[Dict[str, str]]
-
Expand source code
@property def stage_variables(self) -> Optional[Dict[str, str]]: return self.get("stageVariables")
var version : str
-
Expand source code
@property def version(self) -> str: return self["version"]
Inherited members
class APIGatewayProxyEventV2 (data: Dict[str, Any])
-
AWS Lambda proxy V2 event
Notes:
Format 2.0 doesn't have multiValueHeaders or multiValueQueryStringParameters fields. Duplicate headers are combined with commas and included in the headers field. Duplicate query strings are combined with commas and included in the queryStringParameters field.
Format 2.0 includes a new cookies field. All cookie headers in the request are combined with commas and added to the cookies field. In the response to the client, each cookie becomes a set-cookie header.
Documentation:
Expand source code
class APIGatewayProxyEventV2(BaseProxyEvent):
    """AWS Lambda proxy V2 event

    Notes:
    -----
    Format 2.0 doesn't have multiValueHeaders or multiValueQueryStringParameters fields. Duplicate headers
    are combined with commas and included in the headers field. Duplicate query strings are combined with
    commas and included in the queryStringParameters field.

    Format 2.0 includes a new cookies field. All cookie headers in the request are combined with commas and
    added to the cookies field. In the response to the client, each cookie becomes a set-cookie header.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html
    """

    @property
    def version(self) -> str:
        """Value stored under the ``version`` key."""
        payload_version = self["version"]
        return payload_version

    @property
    def route_key(self) -> str:
        """Value stored under the ``routeKey`` key."""
        key = self["routeKey"]
        return key

    @property
    def raw_path(self) -> str:
        """Value stored under the ``rawPath`` key."""
        unparsed_path = self["rawPath"]
        return unparsed_path

    @property
    def raw_query_string(self) -> str:
        """Value stored under the ``rawQueryString`` key."""
        unparsed_query = self["rawQueryString"]
        return unparsed_query

    @property
    def cookies(self) -> Optional[List[str]]:
        """Value stored under ``cookies``, looked up with ``get``."""
        cookie_list = self.get("cookies")
        return cookie_list

    @property
    def request_context(self) -> RequestContextV2:
        """Request context view built over this event's underlying data."""
        raw_event = self._data
        return RequestContextV2(raw_event)

    @property
    def path_parameters(self) -> Optional[Dict[str, str]]:
        """Value stored under ``pathParameters``, looked up with ``get``."""
        params = self.get("pathParameters")
        return params

    @property
    def stage_variables(self) -> Optional[Dict[str, str]]:
        """Value stored under ``stageVariables``, looked up with ``get``."""
        variables = self.get("stageVariables")
        return variables

    @property
    def path(self) -> str:
        """Raw path with the leading ``/<stage>`` segment removed.

        When the request context stage is ``$default`` the raw path is returned unchanged.
        """
        stage_name = self.request_context.stage
        if stage_name == "$default":
            return self.raw_path
        # NOTE(review): the prefix is dropped purely by length; nothing here checks
        # that raw_path actually begins with "/<stage>" - confirm this always holds.
        prefix_length = len("/" + stage_name)
        return self.raw_path[prefix_length:]

    @property
    def http_method(self) -> str:
        """The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT."""
        http_details = self.request_context.http
        return http_details.method
Ancestors
Instance variables
var cookies : Optional[List[str]]
-
Expand source code
@property def cookies(self) -> Optional[List[str]]: return self.get("cookies")
var path : str
-
Expand source code
@property def path(self) -> str: stage = self.request_context.stage if stage != "$default": return self.raw_path[len("/" + stage) :] return self.raw_path
var path_parameters : Optional[Dict[str, str]]
-
Expand source code
@property def path_parameters(self) -> Optional[Dict[str, str]]: return self.get("pathParameters")
var raw_path : str
-
Expand source code
@property def raw_path(self) -> str: return self["rawPath"]
var raw_query_string : str
-
Expand source code
@property def raw_query_string(self) -> str: return self["rawQueryString"]
var request_context : RequestContextV2
-
Expand source code
@property def request_context(self) -> RequestContextV2: return RequestContextV2(self._data)
var route_key : str
-
Expand source code
@property def route_key(self) -> str: return self["routeKey"]
var stage_variables : Optional[Dict[str, str]]
-
Expand source code
@property def stage_variables(self) -> Optional[Dict[str, str]]: return self.get("stageVariables")
var version : str
-
Expand source code
@property def version(self) -> str: return self["version"]
Inherited members
-
class AppSyncResolverEvent (data: dict)
-
AppSync resolver event
NOTE: AppSync Resolver Events can come in various shapes; this data class supports both the Amplify GraphQL directive @function and the Direct Lambda Resolver.
Documentation:
Expand source code
class AppSyncResolverEvent(DictWrapper):
    """AppSync resolver event

    **NOTE:** AppSync Resolver Events can come in various shapes; this data class
    supports both the Amplify GraphQL directive @function and the Direct Lambda Resolver

    Documentation:
    -------------
    - https://docs.aws.amazon.com/appsync/latest/devguide/resolver-context-reference.html
    - https://docs.amplify.aws/cli/graphql-transformer/function#structure-of-the-function-event
    """

    def __init__(self, data: dict):
        super().__init__(data)

        # When the event carries no (or an empty) "info" section, synthesise one
        # from the top-level "fieldName" / "typeName" entries instead.
        info: Optional[dict] = data.get("info") or {
            "fieldName": self.get("fieldName"),
            "parentTypeName": self.get("typeName"),
        }
        self._info = AppSyncResolverEventInfo(info)

    @property
    def type_name(self) -> str:
        """The name of the parent type for the field that is currently being resolved."""
        resolver_info = self.info
        return resolver_info.parent_type_name

    @property
    def field_name(self) -> str:
        """The name of the field that is currently being resolved."""
        resolver_info = self.info
        return resolver_info.field_name

    @property
    def arguments(self) -> Dict[str, Any]:
        """A map that contains all GraphQL arguments for this field."""
        field_arguments = self["arguments"]
        return field_arguments

    @property
    def identity(self) -> Union[None, AppSyncIdentityIAM, AppSyncIdentityCognito]:
        """An object that contains information about the caller.

        Depending on the type of identity found:

        - API_KEY authorization - returns None
        - AWS_IAM authorization - returns AppSyncIdentityIAM
        - AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito
        """
        raw_identity = self.get("identity")
        return get_identity_object(raw_identity)

    @property
    def source(self) -> Optional[Dict[str, Any]]:
        """A map that contains the resolution of the parent field."""
        parent_resolution = self.get("source")
        return parent_resolution

    @property
    def request_headers(self) -> Dict[str, str]:
        """Request headers"""
        request_section = self["request"]
        return request_section["headers"]

    @property
    def prev_result(self) -> Optional[Dict[str, Any]]:
        """It represents the result of whatever previous operation was executed in a pipeline resolver."""
        previous = self.get("prev")
        return previous.get("result") if previous else None

    @property
    def info(self) -> AppSyncResolverEventInfo:
        """The info section contains information about the GraphQL request."""
        return self._info

    @property
    def stash(self) -> Optional[dict]:
        """The stash is a map that is made available inside each resolver and function mapping template.
        The same stash instance lives through a single resolver execution. This means that you can use the
        stash to pass arbitrary data across request and response mapping templates, and across functions in
        a pipeline resolver."""
        shared_stash = self.get("stash")
        return shared_stash

    def get_header_value(
        self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False
    ) -> Optional[str]:
        """Get header value by name

        Parameters
        ----------
        name: str
            Header name
        default_value: str, optional
            Default value if no value was found by name
        case_sensitive: bool
            Whether to use a case-sensitive look up

        Returns
        -------
        str, optional
            Header value
        """
        # Delegates to the module-level helper of the same name, scoped to this
        # event's request headers.
        headers = self.request_headers
        return get_header_value(headers, name, default_value, case_sensitive)
Ancestors
Instance variables
var arguments : Dict[str, Any]
-
A map that contains all GraphQL arguments for this field.
Expand source code
@property def arguments(self) -> Dict[str, Any]: """A map that contains all GraphQL arguments for this field.""" return self["arguments"]
var field_name : str
-
The name of the field that is currently being resolved.
Expand source code
@property def field_name(self) -> str: """The name of the field that is currently being resolved.""" return self.info.field_name
var identity : Union[None, AppSyncIdentityIAM, AppSyncIdentityCognito]
-
An object that contains information about the caller.
Depending on the type of identity found:
- API_KEY authorization - returns None
- AWS_IAM authorization - returns AppSyncIdentityIAM
- AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito
Expand source code
@property def identity(self) -> Union[None, AppSyncIdentityIAM, AppSyncIdentityCognito]: """An object that contains information about the caller. Depending on the type of identify found: - API_KEY authorization - returns None - AWS_IAM authorization - returns AppSyncIdentityIAM - AMAZON_COGNITO_USER_POOLS authorization - returns AppSyncIdentityCognito """ return get_identity_object(self.get("identity"))
var info : AppSyncResolverEventInfo
-
The info section contains information about the GraphQL request.
Expand source code
@property def info(self) -> AppSyncResolverEventInfo: """The info section contains information about the GraphQL request.""" return self._info
var prev_result : Optional[Dict[str, Any]]
-
It represents the result of whatever previous operation was executed in a pipeline resolver.
Expand source code
@property def prev_result(self) -> Optional[Dict[str, Any]]: """It represents the result of whatever previous operation was executed in a pipeline resolver.""" prev = self.get("prev") if not prev: return None return prev.get("result")
var request_headers : Dict[str, str]
-
Request headers
Expand source code
@property def request_headers(self) -> Dict[str, str]: """Request headers""" return self["request"]["headers"]
var source : Optional[Dict[str, Any]]
-
A map that contains the resolution of the parent field.
Expand source code
@property def source(self) -> Optional[Dict[str, Any]]: """A map that contains the resolution of the parent field.""" return self.get("source")
var stash : Optional[dict]
-
The stash is a map that is made available inside each resolver and function mapping template. The same stash instance lives through a single resolver execution. This means that you can use the stash to pass arbitrary data across request and response mapping templates, and across functions in a pipeline resolver.
Expand source code
@property def stash(self) -> Optional[dict]: """The stash is a map that is made available inside each resolver and function mapping template. The same stash instance lives through a single resolver execution. This means that you can use the stash to pass arbitrary data across request and response mapping templates, and across functions in a pipeline resolver.""" return self.get("stash")
var type_name : str
-
The name of the parent type for the field that is currently being resolved.
Expand source code
@property def type_name(self) -> str: """The name of the parent type for the field that is currently being resolved.""" return self.info.parent_type_name
Methods
def get_header_value(self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False) ‑> Optional[str]
-
Get header value by name
Parameters
name
:str
- Header name
default_value
:str
, optional - Default value if no value was found by name
case_sensitive
:bool
- Whether to use a case-sensitive look up
Returns
str
, optional - Header value
Expand source code
def get_header_value( self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False ) -> Optional[str]: """Get header value by name Parameters ---------- name: str Header name default_value: str, optional Default value if no value was found by name case_sensitive: bool Whether to use a case-sensitive look up Returns ------- str, optional Header value """ return get_header_value(self.request_headers, name, default_value, case_sensitive)
Inherited members
class CloudWatchLogsEvent (data: Dict[str, Any])
-
CloudWatch Logs log stream event
You can use a Lambda function to monitor and analyze logs from an Amazon CloudWatch Logs log stream.
Documentation:
Expand source code
class CloudWatchLogsEvent(DictWrapper):
    """CloudWatch Logs log stream event

    You can use a Lambda function to monitor and analyze logs from an Amazon CloudWatch Logs log stream.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html
    """

    # Class-level defaults for the caches; the first successful decode/parse
    # stores the result on the instance, shadowing these.
    _decompressed_logs_data = None
    _json_logs_data = None

    @property
    def raw_logs_data(self) -> str:
        """The Base64-encoded, compressed value of the ``awslogs.data`` field."""
        awslogs_section = self["awslogs"]
        return awslogs_section["data"]

    @property
    def decompress_logs_data(self) -> bytes:
        """Decode and decompress log data (computed once, then cached)"""
        cached = self._decompressed_logs_data
        if cached is not None:
            return cached

        compressed = base64.b64decode(self.raw_logs_data)
        # wbits = MAX_WBITS | 32 asks zlib to auto-detect a zlib or gzip header.
        self._decompressed_logs_data = zlib.decompress(compressed, zlib.MAX_WBITS | 32)
        return self._decompressed_logs_data

    def parse_logs_data(self) -> CloudWatchLogsDecodedData:
        """Decode, decompress and parse json data as CloudWatchLogsDecodedData

        The parsed JSON is cached; a new ``CloudWatchLogsDecodedData`` wrapper is
        created around it on every call.
        """
        if self._json_logs_data is None:
            decoded_text = self.decompress_logs_data.decode("UTF-8")
            self._json_logs_data = json.loads(decoded_text)
        return CloudWatchLogsDecodedData(self._json_logs_data)
Ancestors
Instance variables
var decompress_logs_data : bytes
-
Decode and decompress log data
Expand source code
@property def decompress_logs_data(self) -> bytes: """Decode and decompress log data""" if self._decompressed_logs_data is None: payload = base64.b64decode(self.raw_logs_data) self._decompressed_logs_data = zlib.decompress(payload, zlib.MAX_WBITS | 32) return self._decompressed_logs_data
var raw_logs_data : str
-
The value of the data field is a Base64 encoded ZIP archive.
Expand source code
@property def raw_logs_data(self) -> str: """The value of the `data` field is a Base64 encoded ZIP archive.""" return self["awslogs"]["data"]
Methods
def parse_logs_data(self) ‑> CloudWatchLogsDecodedData
-
Decode, decompress and parse json data as CloudWatchLogsDecodedData
Expand source code
def parse_logs_data(self) -> CloudWatchLogsDecodedData: """Decode, decompress and parse json data as CloudWatchLogsDecodedData""" if self._json_logs_data is None: self._json_logs_data = json.loads(self.decompress_logs_data.decode("UTF-8")) return CloudWatchLogsDecodedData(self._json_logs_data)
Inherited members
class CodePipelineJobEvent (data: Dict[str, Any])
-
AWS CodePipeline Job Event
Documentation:
Expand source code
class CodePipelineJobEvent(DictWrapper):
    """AWS CodePipeline Job Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/codepipeline/latest/userguide/actions-invoke-lambda-function.html
    - https://docs.aws.amazon.com/lambda/latest/dg/services-codepipeline.html
    """

    def __init__(self, data: Dict[str, Any]):
        super().__init__(data)
        # Everything of interest lives under the "CodePipeline.job" key; keep a
        # direct reference so the properties below do not repeat the lookup.
        self._job = self["CodePipeline.job"]

    @property
    def get_id(self) -> str:
        """Job id"""
        job_id = self._job["id"]
        return job_id

    @property
    def account_id(self) -> str:
        """Account id"""
        account = self._job["accountId"]
        return account

    @property
    def data(self) -> CodePipelineData:
        """CodePipeline job data"""
        job_data = self._job["data"]
        return CodePipelineData(job_data)

    @property
    def user_parameters(self) -> str:
        """Action configuration user parameters"""
        configuration = self.data.action_configuration.configuration
        return configuration.user_parameters

    @property
    def decoded_user_parameters(self) -> Dict[str, Any]:
        """Json Decoded action configuration user parameters"""
        configuration = self.data.action_configuration.configuration
        return configuration.decoded_user_parameters

    @property
    def input_bucket_name(self) -> str:
        """Get the first input artifact bucket name"""
        first_artifact = self.data.input_artifacts[0]
        return first_artifact.location.s3_location.bucket_name

    @property
    def input_object_key(self) -> str:
        """Get the first input artifact object key (``s3_location.object_key``)

        NOTE(review): the previous docstring mentioned "unquote plus"; presumably
        the underlying ``object_key`` property URL-unquotes the key - confirm.
        """
        first_artifact = self.data.input_artifacts[0]
        return first_artifact.location.s3_location.object_key

    def setup_s3_client(self):
        """Creates an S3 client

        Uses the credentials passed in the event by CodePipeline. These
        credentials can be used to access the artifact bucket.

        Returns
        -------
        BaseClient
            An S3 client with the appropriate credentials
        """
        credentials = self.data.artifact_credentials
        return boto3.client(
            "s3",
            aws_access_key_id=credentials.access_key_id,
            aws_secret_access_key=credentials.secret_access_key,
            aws_session_token=credentials.session_token,
        )

    def find_input_artifact(self, artifact_name: str) -> Optional[CodePipelineArtifact]:
        """Find an input artifact by artifact name

        Parameters
        ----------
        artifact_name : str
            The name of the input artifact to look for

        Returns
        -------
        CodePipelineArtifact, None
            Matching CodePipelineArtifact if found
        """
        # First artifact whose name matches, or None when nothing matches.
        matches = (candidate for candidate in self.data.input_artifacts if candidate.name == artifact_name)
        return next(matches, None)

    def get_artifact(self, artifact_name: str, filename: str) -> Optional[str]:
        """Get a file within an artifact zip on s3

        Parameters
        ----------
        artifact_name : str
            Name of the S3 artifact to download
        filename : str
            The file name within the artifact zip to extract as a string

        Returns
        -------
        str, None
            Returns the file contents as a string, or None when no input
            artifact is named ``artifact_name``
        """
        artifact = self.find_input_artifact(artifact_name)
        if artifact is None:
            return None

        # The archive is downloaded into a named temporary file, which is removed
        # when the ``with`` block exits; the member is read before that happens.
        with tempfile.NamedTemporaryFile() as download_target:
            s3_client = self.setup_s3_client()
            s3_location = artifact.location.s3_location
            s3_client.download_file(s3_location.bucket_name, s3_location.key, download_target.name)
            with zipfile.ZipFile(download_target.name, "r") as archive:
                member_bytes = archive.read(filename)
                return member_bytes.decode("UTF-8")
Ancestors
Instance variables
var account_id : str
-
Account id
Expand source code
@property def account_id(self) -> str: """Account id""" return self._job["accountId"]
var data : CodePipelineData
-
CodePipeline job data
Expand source code
@property def data(self) -> CodePipelineData: """Code pipeline jab data""" return CodePipelineData(self._job["data"])
var decoded_user_parameters : Dict[str, Any]
-
Json Decoded action configuration user parameters
Expand source code
@property def decoded_user_parameters(self) -> Dict[str, Any]: """Json Decoded action configuration user parameters""" return self.data.action_configuration.configuration.decoded_user_parameters
var get_id : str
-
Job id
Expand source code
@property def get_id(self) -> str: """Job id""" return self._job["id"]
var input_bucket_name : str
-
Get the first input artifact bucket name
Expand source code
@property def input_bucket_name(self) -> str: """Get the first input artifact bucket name""" return self.data.input_artifacts[0].location.s3_location.bucket_name
var input_object_key : str
-
Get the first input artifact object key (unquote plus)
Expand source code
@property def input_object_key(self) -> str: """Get the first input artifact order key unquote plus""" return self.data.input_artifacts[0].location.s3_location.object_key
var user_parameters : str
-
Action configuration user parameters
Expand source code
@property def user_parameters(self) -> str: """Action configuration user parameters""" return self.data.action_configuration.configuration.user_parameters
Methods
def find_input_artifact(self, artifact_name: str) ‑> Optional[CodePipelineArtifact]
-
Find an input artifact by artifact name
Parameters
artifact_name
:str
- The name of the input artifact to look for
Returns
CodePipelineArtifact, None
- Matching CodePipelineArtifact if found
Expand source code
def find_input_artifact(self, artifact_name: str) -> Optional[CodePipelineArtifact]: """Find an input artifact by artifact name Parameters ---------- artifact_name : str The name of the input artifact to look for Returns ------- CodePipelineArtifact, None Matching CodePipelineArtifact if found """ for artifact in self.data.input_artifacts: if artifact.name == artifact_name: return artifact return None
def get_artifact(self, artifact_name: str, filename: str) ‑> Optional[str]
-
Get a file within an artifact zip on s3
Parameters
artifact_name
:str
- Name of the S3 artifact to download
filename
:str
- The file name within the artifact zip to extract as a string
Returns
str, None
- Returns the file contents as a string
Expand source code
def get_artifact(self, artifact_name: str, filename: str) -> Optional[str]: """Get a file within an artifact zip on s3 Parameters ---------- artifact_name : str Name of the S3 artifact to download filename : str The file name within the artifact zip to extract as a string Returns ------- str, None Returns the contents file contents as a string """ artifact = self.find_input_artifact(artifact_name) if artifact is None: return None with tempfile.NamedTemporaryFile() as tmp_file: s3 = self.setup_s3_client() bucket = artifact.location.s3_location.bucket_name key = artifact.location.s3_location.key s3.download_file(bucket, key, tmp_file.name) with zipfile.ZipFile(tmp_file.name, "r") as zip_file: return zip_file.read(filename).decode("UTF-8")
def setup_s3_client(self)
-
Creates an S3 client
Uses the credentials passed in the event by CodePipeline. These credentials can be used to access the artifact bucket.
Returns
BaseClient
- An S3 client with the appropriate credentials
Expand source code
def setup_s3_client(self): """Creates an S3 client Uses the credentials passed in the event by CodePipeline. These credentials can be used to access the artifact bucket. Returns ------- BaseClient An S3 client with the appropriate credentials """ return boto3.client( "s3", aws_access_key_id=self.data.artifact_credentials.access_key_id, aws_secret_access_key=self.data.artifact_credentials.secret_access_key, aws_session_token=self.data.artifact_credentials.session_token, )
Inherited members
class ConnectContactFlowEvent (data: Dict[str, Any])
-
Amazon Connect contact flow event
Documentation:
Expand source code
class ConnectContactFlowEvent(DictWrapper):
    """Amazon Connect contact flow event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/connect/latest/adminguide/connect-lambda-functions.html
    """

    @property
    def contact_data(self) -> ConnectContactFlowData:
        """This is always passed by Amazon Connect for every contact. Some parameters are optional."""
        details = self["Details"]
        return ConnectContactFlowData(details["ContactData"])

    @property
    def parameters(self) -> Dict[str, str]:
        """These are parameters specific to this call that were defined when you created the Lambda function."""
        details = self["Details"]
        return details["Parameters"]
Ancestors
Instance variables
var contact_data : ConnectContactFlowData
-
This is always passed by Amazon Connect for every contact. Some parameters are optional.
Expand source code
@property def contact_data(self) -> ConnectContactFlowData: """This is always passed by Amazon Connect for every contact. Some parameters are optional.""" return ConnectContactFlowData(self["Details"]["ContactData"])
var parameters : Dict[str, str]
-
These are parameters specific to this call that were defined when you created the Lambda function.
Expand source code
@property def parameters(self) -> Dict[str, str]: """These are parameters specific to this call that were defined when you created the Lambda function.""" return self["Details"]["Parameters"]
Inherited members
class DynamoDBStreamEvent (data: Dict[str, Any])
-
Dynamo DB Stream Event
Documentation:
Example
Process dynamodb stream events and use get_type and get_value for handling conversions
from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import ( AttributeValueType, AttributeValue, ) from aws_lambda_powertools.utilities.typing import LambdaContext @event_source(data_class=DynamoDBStreamEvent) def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): for record in event.records: key: AttributeValue = record.dynamodb.keys["id"] if key == AttributeValueType.Number: assert key.get_value == key.n_value print(key.get_value) elif key == AttributeValueType.Map: assert key.get_value == key.map_value print(key.get_value)
Expand source code
class DynamoDBStreamEvent(DictWrapper):
    """Dynamo DB Stream Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html

    Example
    -------
    **Process dynamodb stream events and use get_type and get_value for handling conversions**

        from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
        from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
            AttributeValueType,
            AttributeValue,
        )
        from aws_lambda_powertools.utilities.typing import LambdaContext


        @event_source(data_class=DynamoDBStreamEvent)
        def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
            for record in event.records:
                key: AttributeValue = record.dynamodb.keys["id"]
                if key == AttributeValueType.Number:
                    assert key.get_value == key.n_value
                    print(key.get_value)
                elif key == AttributeValueType.Map:
                    assert key.get_value == key.map_value
                    print(key.get_value)
    """

    @property
    def records(self) -> Iterator[DynamoDBRecord]:
        """Lazily yield one ``DynamoDBRecord`` per entry under the ``Records`` key."""
        # Still a generator: "Records" is only read once iteration starts, and
        # each access to this property produces a fresh iterator.
        yield from map(DynamoDBRecord, self["Records"])
Ancestors
Instance variables
var records : Iterator[DynamoDBRecord]
-
Expand source code
@property def records(self) -> Iterator[DynamoDBRecord]: for record in self["Records"]: yield DynamoDBRecord(record)
Inherited members
class EventBridgeEvent (data: Dict[str, Any])
-
Amazon EventBridge Event
Documentation:
Expand source code
class EventBridgeEvent(DictWrapper):
    """Amazon EventBridge Event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/eventbridge/latest/userguide/aws-events.html
    """

    @property
    def get_id(self) -> str:
        """A unique value is generated for every event. This can be helpful in tracing events as they move
        through rules to targets, and are processed."""
        # Note: this name conflicts with existing python builtins
        return self["id"]

    @property
    def version(self) -> str:
        """By default, this is set to 0 (zero) in all events."""
        return self["version"]

    @property
    def account(self) -> str:
        """The 12-digit number identifying an AWS account."""
        return self["account"]

    @property
    def time(self) -> str:
        """The event timestamp, which can be specified by the service originating the event.

        If the event spans a time interval, the service might choose to report the start time, so
        this value can be noticeably before the time the event is actually received.
        """
        return self["time"]

    @property
    def region(self) -> str:
        """Identifies the AWS region where the event originated."""
        return self["region"]

    @property
    def resources(self) -> List[str]:
        """This JSON array contains ARNs that identify resources that are involved in the event.
        Inclusion of these ARNs is at the discretion of the service."""
        return self["resources"]

    @property
    def source(self) -> str:
        """Identifies the service that sourced the event. All events sourced from within AWS begin with "aws." """
        return self["source"]

    @property
    def detail_type(self) -> str:
        """Identifies, in combination with the source field, the fields and values that appear in the detail field."""
        return self["detail-type"]

    @property
    def detail(self) -> Dict[str, Any]:
        """A JSON object, whose content is at the discretion of the service originating the event."""
        return self["detail"]

    @property
    def replay_name(self) -> Optional[str]:
        """Identifies whether the event is being replayed and what is the name of the replay.

        Returns None when the event has no "replay-name" key.
        """
        try:
            return self["replay-name"]
        except KeyError:
            # The key may be absent; honour the Optional[str] contract instead of raising.
            return None
Ancestors
Instance variables
var account : str
-
The 12-digit number identifying an AWS account.
Expand source code
@property
def account(self) -> str:
    """Return the 12-digit number that identifies the AWS account."""
    account_id = self["account"]
    return account_id
var detail : Dict[str, Any]
-
A JSON object, whose content is at the discretion of the service originating the event.
Expand source code
@property
def detail(self) -> Dict[str, Any]:
    """Return the event's "detail" JSON object; its content is up to the originating service."""
    detail_payload = self["detail"]
    return detail_payload
var detail_type : str
-
Identifies, in combination with the source field, the fields and values that appear in the detail field.
Expand source code
@property
def detail_type(self) -> str:
    """Return the "detail-type" value which, together with the source field, identifies
    the fields and values that appear in the detail field."""
    kind = self["detail-type"]
    return kind
var get_id : str
-
A unique value is generated for every event. This can be helpful in tracing events as they move through rules to targets, and are processed.
Expand source code
@property
def get_id(self) -> str:
    """Return the unique value generated for every event.

    This can be helpful in tracing events as they move through rules to targets, and are processed.
    """
    # Note: this name conflicts with existing python builtins
    event_id = self["id"]
    return event_id
var region : str
-
Identifies the AWS region where the event originated.
Expand source code
@property
def region(self) -> str:
    """Return the AWS region where the event originated."""
    origin_region = self["region"]
    return origin_region
var replay_name : Optional[str]
-
Identifies whether the event is being replayed and what is the name of the replay.
Expand source code
@property
def replay_name(self) -> Optional[str]:
    """Identifies whether the event is being replayed and what is the name of the replay.

    Returns None when the event has no "replay-name" key.
    """
    try:
        return self["replay-name"]
    except KeyError:
        # The key may be absent; honour the Optional[str] contract instead of raising.
        return None
var resources : List[str]
-
This JSON array contains ARNs that identify resources that are involved in the event. Inclusion of these ARNs is at the discretion of the service.
Expand source code
@property
def resources(self) -> List[str]:
    """Return the JSON array of ARNs identifying resources involved in the event.

    Inclusion of these ARNs is at the discretion of the service.
    """
    resource_arns = self["resources"]
    return resource_arns
var source : str
-
Identifies the service that sourced the event. All events sourced from within AWS begin with "aws."
Expand source code
@property
def source(self) -> str:
    """Return the service that sourced the event.

    All events sourced from within AWS begin with "aws."
    """
    origin_service = self["source"]
    return origin_service
var time : str
-
The event timestamp, which can be specified by the service originating the event.
If the event spans a time interval, the service might choose to report the start time, so this value can be noticeably before the time the event is actually received.
Expand source code
@property
def time(self) -> str:
    """Return the event timestamp, which can be specified by the originating service.

    If the event spans a time interval, the service might choose to report the start time, so
    this value can be noticeably before the time the event is actually received.
    """
    timestamp = self["time"]
    return timestamp
var version : str
-
By default, this is set to 0 (zero) in all events.
Expand source code
@property
def version(self) -> str:
    """Return the event version; by default, this is set to 0 (zero) in all events."""
    event_version = self["version"]
    return event_version
Inherited members
class KinesisStreamEvent (data: Dict[str, Any])
-
Expand source code
class KinesisStreamEvent(DictWrapper):
    """Kinesis stream event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
    """

    @property
    def records(self) -> Iterator[KinesisStreamRecord]:
        """Lazily yield a `KinesisStreamRecord` wrapper for each entry under "Records"."""
        yield from (KinesisStreamRecord(raw) for raw in self["Records"])
Ancestors
Instance variables
var records : Iterator[KinesisStreamRecord]
-
Expand source code
@property
def records(self) -> Iterator[KinesisStreamRecord]:
    """Lazily yield a `KinesisStreamRecord` wrapper for each entry under "Records"."""
    yield from (KinesisStreamRecord(raw) for raw in self["Records"])
Inherited members
class S3Event (data: Dict[str, Any])
-
S3 event notification
Documentation:
Expand source code
class S3Event(DictWrapper):
    """S3 event notification

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
    - https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
    - https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
    """

    @property
    def records(self) -> Iterator[S3EventRecord]:
        """Lazily yield an `S3EventRecord` wrapper for each entry under "Records"."""
        yield from (S3EventRecord(raw) for raw in self["Records"])

    @property
    def record(self) -> S3EventRecord:
        """Get the first S3 event record."""
        first_record = next(self.records)
        return first_record

    @property
    def bucket_name(self) -> str:
        """Get the bucket name of the first S3 event record."""
        first_raw = self["Records"][0]
        return first_raw["s3"]["bucket"]["name"]

    @property
    def object_key(self) -> str:
        """Get the object key of the first S3 event record, decoded with `unquote_plus`."""
        encoded_key = self["Records"][0]["s3"]["object"]["key"]
        return unquote_plus(encoded_key)
Ancestors
Instance variables
var bucket_name : str
-
Get the bucket name for the first s3 event record
Expand source code
@property
def bucket_name(self) -> str:
    """Get the bucket name of the first S3 event record."""
    first_raw = self["Records"][0]
    return first_raw["s3"]["bucket"]["name"]
var object_key : str
-
Get the object key for the first S3 event record, URL-decoded with unquote_plus
Expand source code
@property
def object_key(self) -> str:
    """Get the object key of the first S3 event record, decoded with `unquote_plus`."""
    encoded_key = self["Records"][0]["s3"]["object"]["key"]
    return unquote_plus(encoded_key)
var record : S3EventRecord
-
Get the first s3 event record
Expand source code
@property
def record(self) -> S3EventRecord:
    """Get the first S3 event record."""
    first_record = next(self.records)
    return first_record
var records : Iterator[S3EventRecord]
-
Expand source code
@property
def records(self) -> Iterator[S3EventRecord]:
    """Lazily yield an `S3EventRecord` wrapper for each entry under "Records"."""
    yield from (S3EventRecord(raw) for raw in self["Records"])
Inherited members
class SESEvent (data: Dict[str, Any])
-
Amazon SES to receive message event trigger
NOTE: There is a 30-second timeout on RequestResponse invocations.
Documentation:
Expand source code
class SESEvent(DictWrapper):
    """Amazon SES to receive message event trigger

    NOTE: There is a 30-second timeout on RequestResponse invocations.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-ses.html
    - https://docs.aws.amazon.com/ses/latest/DeveloperGuide/receiving-email-action-lambda.html
    """

    @property
    def records(self) -> Iterator[SESEventRecord]:
        """Lazily yield an `SESEventRecord` wrapper for each entry under "Records"."""
        yield from (SESEventRecord(raw) for raw in self["Records"])

    @property
    def record(self) -> SESEventRecord:
        """Return the first item produced by `records`."""
        first_record = next(self.records)
        return first_record

    @property
    def mail(self) -> SESMail:
        """Return `ses.mail` of the first record."""
        ses_payload = self.record.ses
        return ses_payload.mail

    @property
    def receipt(self) -> SESReceipt:
        """Return `ses.receipt` of the first record."""
        ses_payload = self.record.ses
        return ses_payload.receipt
Ancestors
Instance variables
var mail : SESMail
-
Expand source code
@property
def mail(self) -> SESMail:
    """Return `ses.mail` of the object exposed by `self.record`."""
    ses_payload = self.record.ses
    return ses_payload.mail
var receipt : SESReceipt
-
Expand source code
@property
def receipt(self) -> SESReceipt:
    """Return `ses.receipt` of the object exposed by `self.record`."""
    ses_payload = self.record.ses
    return ses_payload.receipt
var record : SESEventRecord
-
Expand source code
@property
def record(self) -> SESEventRecord:
    """Return the first item produced by `self.records`."""
    first_record = next(self.records)
    return first_record
var records : Iterator[SESEventRecord]
-
Expand source code
@property
def records(self) -> Iterator[SESEventRecord]:
    """Lazily yield an `SESEventRecord` wrapper for each entry under "Records"."""
    yield from (SESEventRecord(raw) for raw in self["Records"])
Inherited members
class SNSEvent (data: Dict[str, Any])
-
Expand source code
class SNSEvent(DictWrapper):
    """SNS Event

    Documentation:
    -------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html
    """

    @property
    def records(self) -> Iterator[SNSEventRecord]:
        """Lazily yield an `SNSEventRecord` wrapper for each entry under "Records"."""
        yield from (SNSEventRecord(raw) for raw in self["Records"])

    @property
    def record(self) -> SNSEventRecord:
        """Return the first SNS event record."""
        first_record = next(self.records)
        return first_record

    @property
    def sns_message(self) -> str:
        """Return the message of the first SNS event record."""
        sns_payload = self.record.sns
        return sns_payload.message
Ancestors
Instance variables
var record : SNSEventRecord
-
Return the first SNS event record
Expand source code
@property
def record(self) -> SNSEventRecord:
    """Return the first SNS event record."""
    first_record = next(self.records)
    return first_record
var records : Iterator[SNSEventRecord]
-
Expand source code
@property
def records(self) -> Iterator[SNSEventRecord]:
    """Lazily yield an `SNSEventRecord` wrapper for each entry under "Records"."""
    yield from (SNSEventRecord(raw) for raw in self["Records"])
var sns_message : str
-
Return the message for the first SNS event record
Expand source code
@property
def sns_message(self) -> str:
    """Return the message of the first SNS event record."""
    sns_payload = self.record.sns
    return sns_payload.message
Inherited members
class SQSEvent (data: Dict[str, Any])
-
Expand source code
class SQSEvent(DictWrapper):
    """SQS Event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
    """

    @property
    def records(self) -> Iterator[SQSRecord]:
        """Lazily yield an `SQSRecord` wrapper for each entry under "Records"."""
        yield from (SQSRecord(raw) for raw in self["Records"])
Ancestors
Instance variables
var records : Iterator[SQSRecord]
-
Expand source code
@property
def records(self) -> Iterator[SQSRecord]:
    """Lazily yield an `SQSRecord` wrapper for each entry under "Records"."""
    yield from (SQSRecord(raw) for raw in self["Records"])
Inherited members