Module aws_lambda_powertools.utilities.data_classes.rabbit_mq_event
Expand source code
import base64
import json
from typing import Any, Dict, List
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
class BasicProperties(DictWrapper):
@property
def content_type(self) -> str:
return self["contentType"]
@property
def content_encoding(self) -> str:
return self["contentEncoding"]
@property
def headers(self) -> Dict[str, Any]:
return self["headers"]
@property
def delivery_mode(self) -> int:
return self["deliveryMode"]
@property
def priority(self) -> int:
return self["priority"]
@property
def correlation_id(self) -> str:
return self["correlationId"]
@property
def reply_to(self) -> str:
return self["replyTo"]
@property
def expiration(self) -> str:
return self["expiration"]
@property
def message_id(self) -> str:
return self["messageId"]
@property
def timestamp(self) -> str:
return self["timestamp"]
@property
def get_type(self) -> str:
return self["type"]
@property
def user_id(self) -> str:
return self["userId"]
@property
def app_id(self) -> str:
return self["appId"]
@property
def cluster_id(self) -> str:
return self["clusterId"]
@property
def body_size(self) -> int:
return self["bodySize"]
class RabbitMessage(DictWrapper):
@property
def basic_properties(self) -> BasicProperties:
return BasicProperties(self["basicProperties"])
@property
def redelivered(self) -> bool:
return self["redelivered"]
@property
def data(self) -> str:
return self["data"]
@property
def decoded_data(self) -> str:
"""Decodes the data as a str"""
return base64.b64decode(self.data.encode()).decode()
@property
def json_data(self) -> Any:
"""Parses the data as json"""
if self._json_data is None:
self._json_data = json.loads(self.decoded_data)
return self._json_data
class RabbitMQEvent(DictWrapper):
"""Represents a Rabbit MQ event sent to Lambda
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/
"""
def __init__(self, data: Dict[str, Any]):
super().__init__(data)
self._rmq_messages_by_queue = {
key: [RabbitMessage(message) for message in messages]
for key, messages in self["rmqMessagesByQueue"].items()
}
@property
def event_source(self) -> str:
return self["eventSource"]
@property
def event_source_arn(self) -> str:
"""The Amazon Resource Name (ARN) of the event source"""
return self["eventSourceArn"]
@property
def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
return self._rmq_messages_by_queue
Classes
class BasicProperties (data: Dict[str, Any])
-
Provides a single read only access to a wrapper dict
Expand source code
class BasicProperties(DictWrapper): @property def content_type(self) -> str: return self["contentType"] @property def content_encoding(self) -> str: return self["contentEncoding"] @property def headers(self) -> Dict[str, Any]: return self["headers"] @property def delivery_mode(self) -> int: return self["deliveryMode"] @property def priority(self) -> int: return self["priority"] @property def correlation_id(self) -> str: return self["correlationId"] @property def reply_to(self) -> str: return self["replyTo"] @property def expiration(self) -> str: return self["expiration"] @property def message_id(self) -> str: return self["messageId"] @property def timestamp(self) -> str: return self["timestamp"] @property def get_type(self) -> str: return self["type"] @property def user_id(self) -> str: return self["userId"] @property def app_id(self) -> str: return self["appId"] @property def cluster_id(self) -> str: return self["clusterId"] @property def body_size(self) -> int: return self["bodySize"]
Ancestors
Instance variables
var app_id : str
-
Expand source code
@property def app_id(self) -> str: return self["appId"]
var body_size : int
-
Expand source code
@property def body_size(self) -> int: return self["bodySize"]
var cluster_id : str
-
Expand source code
@property def cluster_id(self) -> str: return self["clusterId"]
var content_encoding : str
-
Expand source code
@property def content_encoding(self) -> str: return self["contentEncoding"]
var content_type : str
-
Expand source code
@property def content_type(self) -> str: return self["contentType"]
var correlation_id : str
-
Expand source code
@property def correlation_id(self) -> str: return self["correlationId"]
var delivery_mode : int
-
Expand source code
@property def delivery_mode(self) -> int: return self["deliveryMode"]
var expiration : str
-
Expand source code
@property def expiration(self) -> str: return self["expiration"]
var get_type : str
-
Expand source code
@property def get_type(self) -> str: return self["type"]
var headers : Dict[str, Any]
-
Expand source code
@property def headers(self) -> Dict[str, Any]: return self["headers"]
var message_id : str
-
Expand source code
@property def message_id(self) -> str: return self["messageId"]
var priority : int
-
Expand source code
@property def priority(self) -> int: return self["priority"]
var reply_to : str
-
Expand source code
@property def reply_to(self) -> str: return self["replyTo"]
var timestamp : str
-
Expand source code
@property def timestamp(self) -> str: return self["timestamp"]
var user_id : str
-
Expand source code
@property def user_id(self) -> str: return self["userId"]
Inherited members
class RabbitMQEvent (data: Dict[str, Any])
-
Represents a Rabbit MQ event sent to Lambda
Documentation:
Expand source code
class RabbitMQEvent(DictWrapper): """Represents a Rabbit MQ event sent to Lambda Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html - https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/ """ def __init__(self, data: Dict[str, Any]): super().__init__(data) self._rmq_messages_by_queue = { key: [RabbitMessage(message) for message in messages] for key, messages in self["rmqMessagesByQueue"].items() } @property def event_source(self) -> str: return self["eventSource"] @property def event_source_arn(self) -> str: """The Amazon Resource Name (ARN) of the event source""" return self["eventSourceArn"] @property def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]: return self._rmq_messages_by_queue
Ancestors
Instance variables
var event_source : str
-
Expand source code
@property def event_source(self) -> str: return self["eventSource"]
var event_source_arn : str
-
The Amazon Resource Name (ARN) of the event source
Expand source code
@property def event_source_arn(self) -> str: """The Amazon Resource Name (ARN) of the event source""" return self["eventSourceArn"]
var rmq_messages_by_queue : Dict[str, List[RabbitMessage]]
-
Expand source code
@property def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]: return self._rmq_messages_by_queue
Inherited members
class RabbitMessage (data: Dict[str, Any])
-
Provides a single read only access to a wrapper dict
Expand source code
class RabbitMessage(DictWrapper): @property def basic_properties(self) -> BasicProperties: return BasicProperties(self["basicProperties"]) @property def redelivered(self) -> bool: return self["redelivered"] @property def data(self) -> str: return self["data"] @property def decoded_data(self) -> str: """Decodes the data as a str""" return base64.b64decode(self.data.encode()).decode() @property def json_data(self) -> Any: """Parses the data as json""" if self._json_data is None: self._json_data = json.loads(self.decoded_data) return self._json_data
Ancestors
Instance variables
var basic_properties : BasicProperties
-
Expand source code
@property def basic_properties(self) -> BasicProperties: return BasicProperties(self["basicProperties"])
var data : str
-
Expand source code
@property def data(self) -> str: return self["data"]
var decoded_data : str
-
Decodes the data as a str
Expand source code
@property def decoded_data(self) -> str: """Decodes the data as a str""" return base64.b64decode(self.data.encode()).decode()
var json_data : Any
-
Parses the data as json
Expand source code
@property def json_data(self) -> Any: """Parses the data as json""" if self._json_data is None: self._json_data = json.loads(self.decoded_data) return self._json_data
var redelivered : bool
-
Expand source code
@property def redelivered(self) -> bool: return self["redelivered"]
Inherited members