Module aws_lambda_powertools.utilities.parser.models.sns

Expand source code
from datetime import datetime
from typing import Dict, List, Optional
from typing import Type as TypingType
from typing import Union

from pydantic import BaseModel, root_validator
from pydantic.networks import HttpUrl

from aws_lambda_powertools.utilities.parser.types import Literal


class SnsMsgAttributeModel(BaseModel):
    Type: str
    Value: str


class SnsNotificationModel(BaseModel):
    Subject: Optional[str]
    TopicArn: str
    UnsubscribeUrl: HttpUrl
    Type: Literal["Notification"]
    MessageAttributes: Optional[Dict[str, SnsMsgAttributeModel]]
    Message: Union[str, TypingType[BaseModel]]
    MessageId: str
    SigningCertUrl: HttpUrl
    Signature: str
    Timestamp: datetime
    SignatureVersion: str

    @root_validator(pre=True, allow_reuse=True)
    def check_sqs_protocol(cls, values):
        sqs_rewritten_keys = ("UnsubscribeURL", "SigningCertURL")
        if any(key in sqs_rewritten_keys for key in values):
            values["UnsubscribeUrl"] = values.pop("UnsubscribeURL")
            values["SigningCertUrl"] = values.pop("SigningCertURL")
        return values


class SnsRecordModel(BaseModel):
    EventSource: Literal["aws:sns"]
    EventVersion: str
    EventSubscriptionArn: str
    Sns: SnsNotificationModel


class SnsModel(BaseModel):
    Records: List[SnsRecordModel]

Classes

class SnsModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class SnsModel(BaseModel):
    Records: List[SnsRecordModel]

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var Records : List[SnsRecordModel]
class SnsMsgAttributeModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class SnsMsgAttributeModel(BaseModel):
    Type: str
    Value: str

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var Type : str
var Value : str
class SnsNotificationModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class SnsNotificationModel(BaseModel):
    Subject: Optional[str]
    TopicArn: str
    UnsubscribeUrl: HttpUrl
    Type: Literal["Notification"]
    MessageAttributes: Optional[Dict[str, SnsMsgAttributeModel]]
    Message: Union[str, TypingType[BaseModel]]
    MessageId: str
    SigningCertUrl: HttpUrl
    Signature: str
    Timestamp: datetime
    SignatureVersion: str

    @root_validator(pre=True, allow_reuse=True)
    def check_sqs_protocol(cls, values):
        sqs_rewritten_keys = ("UnsubscribeURL", "SigningCertURL")
        if any(key in sqs_rewritten_keys for key in values):
            values["UnsubscribeUrl"] = values.pop("UnsubscribeURL")
            values["SigningCertUrl"] = values.pop("SigningCertURL")
        return values

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var Message : Union[str, Type[pydantic.main.BaseModel]]
var MessageAttributes : Optional[Dict[str, SnsMsgAttributeModel]]
var MessageId : str
var Signature : str
var SignatureVersion : str
var SigningCertUrl : pydantic.networks.HttpUrl
var Subject : Optional[str]
var Timestamp : datetime.datetime
var TopicArn : str
var Type : Literal['Notification']
var UnsubscribeUrl : pydantic.networks.HttpUrl

Static methods

def check_sqs_protocol(values)
Expand source code
@root_validator(pre=True, allow_reuse=True)
def check_sqs_protocol(cls, values):
    sqs_rewritten_keys = ("UnsubscribeURL", "SigningCertURL")
    if any(key in sqs_rewritten_keys for key in values):
        values["UnsubscribeUrl"] = values.pop("UnsubscribeURL")
        values["SigningCertUrl"] = values.pop("SigningCertURL")
    return values
class SnsRecordModel (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class SnsRecordModel(BaseModel):
    EventSource: Literal["aws:sns"]
    EventVersion: str
    EventSubscriptionArn: str
    Sns: SnsNotificationModel

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var EventSource : Literal['aws:sns']
var EventSubscriptionArn : str
var EventVersion : str
var SnsSnsNotificationModel