How to Save MQTT Messages to DynamoDB Using AWS IoT Rules
In this article, we will explore how to use AWS IoT Rules to save the content of MQTT messages to DynamoDB.
The DynamoDB (dynamoDB) action writes all or part of an MQTT message to an Amazon DynamoDB table.
DynamoDB
This time, we will automatically construct AWS IoT Rules, a DynamoDB table, and an IAM role using CloudFormation to efficiently save messages. This will make it easier to manage and analyze IoT data.
We referred to the following page to confirm how to save data to DynamoDB:
Configuration

- AWS IoT Rule: Saves messages published to a topic into DynamoDB.
- IAM Role: Grants permissions for the IoT Rule to access DynamoDB.
- DynamoDB Table: Creates a table with defined partition and sort keys.
Resources
AWS IoT Rule

We define the rule using SQL and create a message composed of five data points. Additionally, we use the FROM clause to specify the topic from which to retrieve data.
temperature, humidity, barometer
These data points use the respective values included in the message delivered to the original topic.
wind_velocity, wind_bearing
These two data points reference values stored in the wind data within the message. They are nested values named velocity and bearing, respectively.
ROM Clause
In the FROM clause, we specify the topic name from which to retrieve data. In this configuration, we target the topic device/+/data.
AWS IoT Rule Action

There are three key points regarding the action for DynamoDB:
Information about DynamoDB and Partition/Sort Keys
We set the table name and the names and types of the table’s partition and sort keys.
Values for Partition and Sort Keys
When setting these values, we use substitution templates.
You can use a substitution template to augment the JSON data returned when a rule is triggered and AWS IoT performs an action. The syntax for a substitution template is ${expression}, where expression can be any expression supported by AWS IoT in SELECT clauses, WHERE clauses, and AWS IoT rule actions.
Substitution templates
You can use built-in functions within substitution templates.
You can use the following built-in functions in the SELECT or WHERE clauses of your SQL expressions.
Functions
For the partition key value, we use the timestamp() function to set the UNIX time when the message is received.
Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970, as observed by the AWS IoT rules engine.
timestamp()
For the sort key value, we use the cast() and topic() functions to split the topic name by slashes, retrieve the second string, and convert it to a number.
Converts a value from one data type to another.
cast()
Returns the topic to which the message that triggered the rule was sent. If no parameter is specified, the entire topic is returned.
topic(Decimal)
For more details, please refer to the following page:

IAM Role


In the trust policy, we allow the iot.amazonaws.com service to assume this IAM role. The IAM policy attached to the IAM role permits the dynamodb:PutItem action. We specify the ARN of the DynamoDB table as the resource, allowing write operations to this table.
DynamoDB Table

This DynamoDB table is used to store messages sent from AWS IoT. The names and types of the partition and sort keys match the contents specified in the earlier IoT Rule action.
For the basics of DynamoDB, please refer to the following page:

CloudFormation Template
AWS IoT Rule
Resources:
  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: wx_data_ddb
      TopicRulePayload: 
        Actions: 
          - DynamoDB: 
              HashKeyField: !Ref PartitionKey
              HashKeyType: NUMBER
              HashKeyValue: ${timestamp()}
              RangeKeyField: !Ref SortKey
              RangeKeyType: NUMBER
              RangeKeyValue: ${cast(topic(2) AS DECIMAL)}
              RoleArn: !GetAtt TopicRuleRole.Arn
              TableName: !Ref Table
        AwsIotSqlVersion: 2016-03-23
        RuleDisabled: false
        Sql: !Sub |
          SELECT temperature, humidity, barometer,
            wind.velocity as wind_velocity,
            wind.bearing as wind_bearing,
          FROM '${TopicName}'
Code language: YAML (yaml)IAM Role
Resources:
  TopicRuleRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - iot.amazonaws.com
      Policies:
        - PolicyName: TopicRulePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:PutItem
                Resource:
                  - !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${Table}"
Code language: YAML (yaml)DynamoDB Table
Resources:
  Table:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: !Ref PartitionKey
          AttributeType: N
        - AttributeName: !Ref SortKey
          AttributeType: N
      BillingMode: PAY_PER_REQUEST
      KeySchema:
        - AttributeName: !Ref PartitionKey
          KeyType: HASH
        - AttributeName: !Ref SortKey
          KeyType: RANGE
      TableClass: STANDARD
      TableName: wx_data
Code language: YAML (yaml)Full Template
Verification
First, subscribe to the topic device/+/data.

Next, publish a message to the topic device/32/data.

You can receive the message on the subscribed topic device/+/data. This topic uses wildcard notation to receive messages.
Check the DynamoDB table.

An item has been added to the table.
The partition key is the UNIX time when the message was published, and the sort key is the numerical value included in the topic name. Both were specified in the IoT action.
The values specified in the SELECT clause of the SQL expression are saved under an attribute named payload. temperature, humidity, and barometer have the same names as in the message, while wind_velocity and wind_bearing are named as specified in the SELECT clause.
Conclusion
We have confirmed how to use AWS IoT Rules to save MQTT messages to DynamoDB. By utilizing CloudFormation, we automatically constructed the AWS IoT Rule, IAM role, and DynamoDB table. This setup enables efficient storage of IoT data and allows for real-time data processing.