Module aws_lambda_powertools.utilities.data_classes.rabbit_mq_event

Expand source code
import base64
import json
from typing import Any, Dict, List

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class BasicProperties(DictWrapper):
    @property
    def content_type(self) -> str:
        return self["contentType"]

    @property
    def content_encoding(self) -> str:
        return self["contentEncoding"]

    @property
    def headers(self) -> Dict[str, Any]:
        return self["headers"]

    @property
    def delivery_mode(self) -> int:
        return self["deliveryMode"]

    @property
    def priority(self) -> int:
        return self["priority"]

    @property
    def correlation_id(self) -> str:
        return self["correlationId"]

    @property
    def reply_to(self) -> str:
        return self["replyTo"]

    @property
    def expiration(self) -> str:
        return self["expiration"]

    @property
    def message_id(self) -> str:
        return self["messageId"]

    @property
    def timestamp(self) -> str:
        return self["timestamp"]

    @property
    def get_type(self) -> str:
        return self["type"]

    @property
    def user_id(self) -> str:
        return self["userId"]

    @property
    def app_id(self) -> str:
        return self["appId"]

    @property
    def cluster_id(self) -> str:
        return self["clusterId"]

    @property
    def body_size(self) -> int:
        return self["bodySize"]


class RabbitMessage(DictWrapper):
    @property
    def basic_properties(self) -> BasicProperties:
        return BasicProperties(self["basicProperties"])

    @property
    def redelivered(self) -> bool:
        return self["redelivered"]

    @property
    def data(self) -> str:
        return self["data"]

    @property
    def decoded_data(self) -> str:
        """Decodes the data as a str"""
        return base64.b64decode(self.data.encode()).decode()

    @property
    def json_data(self) -> Any:
        """Parses the data as json"""
        if self._json_data is None:
            self._json_data = json.loads(self.decoded_data)
        return self._json_data


class RabbitMQEvent(DictWrapper):
    """Represents a Rabbit MQ event sent to Lambda

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
    - https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/
    """

    def __init__(self, data: Dict[str, Any]):
        super().__init__(data)
        self._rmq_messages_by_queue = {
            key: [RabbitMessage(message) for message in messages]
            for key, messages in self["rmqMessagesByQueue"].items()
        }

    @property
    def event_source(self) -> str:
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceArn"]

    @property
    def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
        return self._rmq_messages_by_queue

Classes

class BasicProperties (data: Dict[str, Any])

Provides a single read only access to a wrapper dict

Expand source code
class BasicProperties(DictWrapper):
    @property
    def content_type(self) -> str:
        return self["contentType"]

    @property
    def content_encoding(self) -> str:
        return self["contentEncoding"]

    @property
    def headers(self) -> Dict[str, Any]:
        return self["headers"]

    @property
    def delivery_mode(self) -> int:
        return self["deliveryMode"]

    @property
    def priority(self) -> int:
        return self["priority"]

    @property
    def correlation_id(self) -> str:
        return self["correlationId"]

    @property
    def reply_to(self) -> str:
        return self["replyTo"]

    @property
    def expiration(self) -> str:
        return self["expiration"]

    @property
    def message_id(self) -> str:
        return self["messageId"]

    @property
    def timestamp(self) -> str:
        return self["timestamp"]

    @property
    def get_type(self) -> str:
        return self["type"]

    @property
    def user_id(self) -> str:
        return self["userId"]

    @property
    def app_id(self) -> str:
        return self["appId"]

    @property
    def cluster_id(self) -> str:
        return self["clusterId"]

    @property
    def body_size(self) -> int:
        return self["bodySize"]

Ancestors

Instance variables

var app_id : str
Expand source code
@property
def app_id(self) -> str:
    return self["appId"]
var body_size : int
Expand source code
@property
def body_size(self) -> int:
    return self["bodySize"]
var cluster_id : str
Expand source code
@property
def cluster_id(self) -> str:
    return self["clusterId"]
var content_encoding : str
Expand source code
@property
def content_encoding(self) -> str:
    return self["contentEncoding"]
var content_type : str
Expand source code
@property
def content_type(self) -> str:
    return self["contentType"]
var correlation_id : str
Expand source code
@property
def correlation_id(self) -> str:
    return self["correlationId"]
var delivery_mode : int
Expand source code
@property
def delivery_mode(self) -> int:
    return self["deliveryMode"]
var expiration : str
Expand source code
@property
def expiration(self) -> str:
    return self["expiration"]
var get_type : str
Expand source code
@property
def get_type(self) -> str:
    return self["type"]
var headers : Dict[str, Any]
Expand source code
@property
def headers(self) -> Dict[str, Any]:
    return self["headers"]
var message_id : str
Expand source code
@property
def message_id(self) -> str:
    return self["messageId"]
var priority : int
Expand source code
@property
def priority(self) -> int:
    return self["priority"]
var reply_to : str
Expand source code
@property
def reply_to(self) -> str:
    return self["replyTo"]
var timestamp : str
Expand source code
@property
def timestamp(self) -> str:
    return self["timestamp"]
var user_id : str
Expand source code
@property
def user_id(self) -> str:
    return self["userId"]

Inherited members

class RabbitMQEvent (data: Dict[str, Any])
Expand source code
class RabbitMQEvent(DictWrapper):
    """Represents a Rabbit MQ event sent to Lambda

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
    - https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/
    """

    def __init__(self, data: Dict[str, Any]):
        super().__init__(data)
        self._rmq_messages_by_queue = {
            key: [RabbitMessage(message) for message in messages]
            for key, messages in self["rmqMessagesByQueue"].items()
        }

    @property
    def event_source(self) -> str:
        return self["eventSource"]

    @property
    def event_source_arn(self) -> str:
        """The Amazon Resource Name (ARN) of the event source"""
        return self["eventSourceArn"]

    @property
    def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
        return self._rmq_messages_by_queue

Ancestors

Instance variables

var event_source : str
Expand source code
@property
def event_source(self) -> str:
    return self["eventSource"]
var event_source_arn : str

The Amazon Resource Name (ARN) of the event source

Expand source code
@property
def event_source_arn(self) -> str:
    """The Amazon Resource Name (ARN) of the event source"""
    return self["eventSourceArn"]
var rmq_messages_by_queue : Dict[str, List[RabbitMessage]]
Expand source code
@property
def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
    return self._rmq_messages_by_queue

Inherited members

class RabbitMessage (data: Dict[str, Any])

Provides a single read only access to a wrapper dict

Expand source code
class RabbitMessage(DictWrapper):
    @property
    def basic_properties(self) -> BasicProperties:
        return BasicProperties(self["basicProperties"])

    @property
    def redelivered(self) -> bool:
        return self["redelivered"]

    @property
    def data(self) -> str:
        return self["data"]

    @property
    def decoded_data(self) -> str:
        """Decodes the data as a str"""
        return base64.b64decode(self.data.encode()).decode()

    @property
    def json_data(self) -> Any:
        """Parses the data as json"""
        if self._json_data is None:
            self._json_data = json.loads(self.decoded_data)
        return self._json_data

Ancestors

Instance variables

var basic_propertiesBasicProperties
Expand source code
@property
def basic_properties(self) -> BasicProperties:
    return BasicProperties(self["basicProperties"])
var data : str
Expand source code
@property
def data(self) -> str:
    return self["data"]
var decoded_data : str

Decodes the data as a str

Expand source code
@property
def decoded_data(self) -> str:
    """Decodes the data as a str"""
    return base64.b64decode(self.data.encode()).decode()
var json_data : Any

Parses the data as json

Expand source code
@property
def json_data(self) -> Any:
    """Parses the data as json"""
    if self._json_data is None:
        self._json_data = json.loads(self.decoded_data)
    return self._json_data
var redelivered : bool
Expand source code
@property
def redelivered(self) -> bool:
    return self["redelivered"]

Inherited members