How to Save MQTT Messages to DynamoDB Using AWS IoT Rules
In this article, we will explore how to use AWS IoT Rules to save the content of MQTT messages to DynamoDB.
The DynamoDB (dynamoDB) action writes all or part of an MQTT message to an Amazon DynamoDB table.
DynamoDB
We will use CloudFormation to create the AWS IoT Rule, the DynamoDB table, and the IAM role together, so that published messages are saved without manual setup. Keeping the messages in a table makes the IoT data easier to manage and analyze.
We referred to the following page to confirm how to save data to DynamoDB:
Configuration
- AWS IoT Rule: Saves messages published to a topic into DynamoDB.
- IAM Role: Grants permissions for the IoT Rule to access DynamoDB.
- DynamoDB Table: Creates a table with defined partition and sort keys.
Resources
AWS IoT Rule
We define the rule using SQL. The SELECT clause picks five data points from the incoming message, and the FROM clause specifies the topic from which to retrieve data.
temperature, humidity, barometer
These data points use the respective values included in the message delivered to the original topic.
wind_velocity, wind_bearing
These two data points reference values stored in the wind data within the message. They are nested values named velocity and bearing, respectively.
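To make the effect of the SELECT clause concrete, the following minimal sketch mimics it in plain Python. It is not how the rules engine is implemented; the function name apply_select and the sample values are assumptions made for illustration only.

```python
from typing import Any


def apply_select(message: dict[str, Any]) -> dict[str, Any]:
    """Mimic the rule's SELECT clause on an already-decoded JSON message."""
    wind = message.get("wind", {})
    return {
        # Top-level values keep their original names.
        "temperature": message.get("temperature"),
        "humidity": message.get("humidity"),
        "barometer": message.get("barometer"),
        # wind.velocity AS wind_velocity
        "wind_velocity": wind.get("velocity"),
        # wind.bearing AS wind_bearing
        "wind_bearing": wind.get("bearing"),
    }


# Hypothetical message; the values are invented for illustration.
sample_message = {
    "temperature": 28,
    "humidity": 80,
    "barometer": 1013,
    "wind": {"velocity": 22, "bearing": 255},
}

selected = apply_select(sample_message)
print(selected)
```

The nested wind object is flattened into two top-level names, which is what the aliases in the SELECT clause are for.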
FROM Clause
In the FROM clause, we specify the topic name from which to retrieve data. In this configuration, we target the topic device/+/data.
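The + in device/+/data is the MQTT single-level wildcard. As a rough illustration of which topics such a filter accepts, here is a small sketch of topic filter matching. The helper topic_matches is hypothetical and only approximates the MQTT matching rules for well-formed filters.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter using + and # wildcards."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(filter_parts):
        if part == "#":
            # Multi-level wildcard: accepts everything from this level on.
            return True
        if i >= len(topic_parts):
            # The topic has fewer levels than the filter requires.
            return False
        if part != "+" and part != topic_parts[i]:
            # A literal level must match exactly; + accepts any single level.
            return False
    # Without a trailing #, both must have the same number of levels.
    return len(filter_parts) == len(topic_parts)


print(topic_matches("device/+/data", "device/32/data"))
print(topic_matches("device/+/data", "device/32/status"))
```

With this filter, a message published to device/32/data triggers the rule, while device/32/status or device/32/data/extra would not.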
AWS IoT Rule Action
There are three key points regarding the action for DynamoDB:
Information about DynamoDB and Partition/Sort Keys
We set the table name and the names and types of the table’s partition and sort keys.
Values for Partition and Sort Keys
When setting these values, we use substitution templates.
You can use a substitution template to augment the JSON data returned when a rule is triggered and AWS IoT performs an action. The syntax for a substitution template is ${expression}, where expression can be any expression supported by AWS IoT in SELECT clauses, WHERE clauses, and AWS IoT rule actions.
Substitution templates
You can use built-in functions within substitution templates.
You can use the following built-in functions in the SELECT or WHERE clauses of your SQL expressions.
Functions
For the partition key value, we use the timestamp() function to set the UNIX time, in milliseconds, at which the rules engine receives the message.
Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970, as observed by the AWS IoT rules engine.
timestamp()
For the sort key value, we use the cast() and topic() functions to split the topic name by slashes, retrieve the second string, and convert it to a number.
Converts a value from one data type to another.
cast()
Returns the topic to which the message that triggered the rule was sent. If no parameter is specified, the entire topic is returned.
topic(Decimal)
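The two key values can be approximated in plain Python to show what the substitution templates evaluate to. This is only a sketch under the assumption that topic(n) counts segments from 1; the helper names are hypothetical and not part of any AWS SDK.

```python
import time
from decimal import Decimal


def topic_segment(topic: str, index: int) -> str:
    """Return the index-th topic segment, counting from 1 like topic(n)."""
    return topic.split("/")[index - 1]


def sort_key_value(topic: str) -> Decimal:
    """Approximate ${cast(topic(2) AS DECIMAL)}."""
    return Decimal(topic_segment(topic, 2))


def partition_key_value() -> int:
    """Approximate ${timestamp()}: milliseconds since the UNIX epoch."""
    return int(time.time() * 1000)


print(topic_segment("device/32/data", 2))
print(sort_key_value("device/32/data"))
print(partition_key_value())
```

For the topic device/32/data, the second segment is the string 32, and the cast turns it into a number that fits the NUMBER type declared for the sort key.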
For more details, please refer to the following page:
IAM Role
In the trust policy, we allow the iot.amazonaws.com service to assume this IAM role. The IAM policy attached to the IAM role permits the dynamodb:PutItem action. We specify the ARN of the DynamoDB table as the resource, allowing write operations to this table.
DynamoDB Table
This DynamoDB table is used to store messages sent from AWS IoT. The names and types of the partition and sort keys match the contents specified in the earlier IoT Rule action.
For the basics of DynamoDB, please refer to the following page:
CloudFormation Template
AWS IoT Rule
Resources:
  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: wx_data_ddb
      TopicRulePayload:
        Actions:
          - DynamoDB:
              # Partition key: time at which the rules engine receives the message
              HashKeyField: !Ref PartitionKey
              HashKeyType: NUMBER
              HashKeyValue: ${timestamp()}
              # Sort key: second topic segment, converted to a number
              RangeKeyField: !Ref SortKey
              RangeKeyType: NUMBER
              RangeKeyValue: ${cast(topic(2) AS DECIMAL)}
              RoleArn: !GetAtt TopicRuleRole.Arn
              TableName: !Ref Table
        AwsIotSqlVersion: 2016-03-23
        RuleDisabled: false
        Sql: !Sub |
          SELECT temperature, humidity, barometer,
            wind.velocity as wind_velocity,
            wind.bearing as wind_bearing
          FROM '${TopicName}'
IAM Role
Resources:
  TopicRuleRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - iot.amazonaws.com
      Policies:
        - PolicyName: TopicRulePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:PutItem
                Resource:
                  - !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${Table}"
DynamoDB Table
Resources:
  Table:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: !Ref PartitionKey
          AttributeType: N
        - AttributeName: !Ref SortKey
          AttributeType: N
      BillingMode: PAY_PER_REQUEST
      KeySchema:
        - AttributeName: !Ref PartitionKey
          KeyType: HASH
        - AttributeName: !Ref SortKey
          KeyType: RANGE
      TableClass: STANDARD
      TableName: wx_data
Full Template
Verification
First, subscribe to the topic device/+/data.
Next, publish a message to the topic device/32/data.
The message arrives on the subscription to device/+/data, because the single-level wildcard + matches the 32 segment of the published topic.
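The publish step can also be scripted instead of using the console. The following is a minimal sketch using boto3's iot-data client, assuming that boto3 is installed and that credentials and a region are already configured. The message values are invented, and the function names are hypothetical.

```python
import json


def build_payload() -> bytes:
    """Build a sample message containing the fields the rule selects."""
    # Hypothetical values, chosen only for illustration.
    message = {
        "temperature": 28,
        "humidity": 80,
        "barometer": 1013,
        "wind": {"velocity": 22, "bearing": 255},
    }
    return json.dumps(message).encode("utf-8")


def publish_sample(topic: str = "device/32/data") -> None:
    """Publish the sample message to the given topic."""
    # Imported here so that build_payload() can be used without boto3.
    import boto3

    client = boto3.client("iot-data")
    client.publish(topic=topic, qos=1, payload=build_payload())


if __name__ == "__main__":
    print(build_payload().decode("utf-8"))
    # publish_sample()  # uncomment to publish to AWS IoT
```

The publish call is left commented out so that running the script without AWS access only prints the payload.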
Check the DynamoDB table.
An item has been added to the table.
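The same check can be done from a script. Here is a minimal sketch that scans the table with boto3, again assuming boto3 is installed and credentials are configured. The table name wx_data comes from the template; the helpers to_plain and fetch_items are hypothetical. A scan is acceptable for a small test table but reads every item, so it is not something to run against a large one.

```python
from decimal import Decimal
from typing import Any


def to_plain(value: Any) -> Any:
    """Convert the Decimal values boto3 returns into int or float."""
    if isinstance(value, Decimal):
        if value == value.to_integral_value():
            return int(value)
        return float(value)
    if isinstance(value, dict):
        return {key: to_plain(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_plain(item) for item in value]
    return value


def fetch_items(table_name: str = "wx_data", limit: int = 10) -> list[dict[str, Any]]:
    """Read up to `limit` items from the table with a scan."""
    # Imported here so that to_plain() can be used without boto3.
    import boto3

    table = boto3.resource("dynamodb").Table(table_name)
    response = table.scan(Limit=limit)
    return [to_plain(item) for item in response["Items"]]


if __name__ == "__main__":
    print(to_plain({"device": Decimal("32"), "readings": [Decimal("1.5")]}))
    # print(fetch_items())  # uncomment to read from DynamoDB
```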
The partition key is the UNIX time, in milliseconds, at which the rules engine processed the message, and the sort key is the numerical value included in the topic name. Both were specified in the rule's DynamoDB action.
The values specified in the SELECT clause of the SQL expression are saved under an attribute named payload. temperature, humidity, and barometer have the same names as in the message, while wind_velocity and wind_bearing are named as specified in the SELECT clause.
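Putting the pieces together, the stored item can be pictured roughly as follows. This is a sketch of the item's shape, not the exact storage format: we assume here that payload appears as a nested map. The key attribute names come from template parameters, so the names timestamp and device_id below are hypothetical placeholders.

```python
from decimal import Decimal
from typing import Any


def expected_item(
    topic: str,
    message: dict[str, Any],
    received_at_ms: int,
    partition_key: str = "timestamp",  # hypothetical attribute name
    sort_key: str = "device_id",  # hypothetical attribute name
) -> dict[str, Any]:
    """Sketch the item that the rule is expected to write for one message."""
    wind = message.get("wind", {})
    return {
        # Value of ${timestamp()} at the moment the rule ran.
        partition_key: received_at_ms,
        # Value of ${cast(topic(2) AS DECIMAL)}.
        sort_key: Decimal(topic.split("/")[1]),
        # Result of the SELECT clause, stored under "payload".
        "payload": {
            "temperature": message.get("temperature"),
            "humidity": message.get("humidity"),
            "barometer": message.get("barometer"),
            "wind_velocity": wind.get("velocity"),
            "wind_bearing": wind.get("bearing"),
        },
    }


item = expected_item(
    "device/32/data",
    {"temperature": 28, "humidity": 80, "barometer": 1013,
     "wind": {"velocity": 22, "bearing": 255}},
    received_at_ms=1700000000000,  # fixed value for a reproducible example
)
print(item)
```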
Conclusion
We have confirmed how to use AWS IoT Rules to save MQTT messages to DynamoDB. Using CloudFormation, we created the AWS IoT Rule, IAM role, and DynamoDB table together. With this setup, each message published to a matching topic is stored in the table as it arrives, ready for later analysis.