AWS IoTルールでMQTTメッセージをDynamoDBに保存する
この記事では、AWS IoTルールを使用して、MQTTメッセージの内容をDynamoDBに保存する方法を確認します。
DynamoDB (dynamoDB) アクションは、MQTT メッセージの全部または一部を Amazon DynamoDB テーブルに書き込みます。
DynamoDB
今回は、AWS IoTルールとDynamoDBテーブル、IAMロールをCloudFormationで自動構築し、メッセージを効率的に保存する方法を解説します。これにより、IoTデータの管理と分析を容易に行えるようになります。
今回は以下のページを参考に、DynamoDBに保存する方法を確認します。
構成
- AWS IoTルール:トピックに発行されたメッセージをDynamoDBに保存します。
- IAMロール:IoTルールがDynamoDBにアクセスするための権限を付与します。
- DynamoDBテーブル:パーティションキー・ソートキーを定義したテーブルを作成します。
リソース
AWS IoTルール
SQLでルールを定義し、5つのデータで構成されるメッセージを作成します。さらに、FROM句を用いてデータを取得するトピックを設定します。
temperature, humidity, barometer
これらのデータは、元のトピックに配信されたメッセージに含まれる各データを使用します。
wind_velocity, wind_bearing
これらの2つのデータは、メッセージ内のwindというデータに格納されている値を参照します。それぞれvelocityおよびbearingというネストされた値です。
FROM句
FROM句でデータを取得するトピック名を指定します。今回の構成では、トピックdevice/+/dataを対象とします。
AWS IoTルールアクション
DynamoDB向けアクションに関するポイントは以下の3点です。
DynamoDBとパーティションキー・ソートキーの情報
テーブル名、テーブルの両キーの名前・型を設定します。
パーティションキー・ソートキーの値
これらの値を設定する際に、置換テンプレートを使用します。
代替テンプレートを使用して、ルールがトリガーされてアクションを実行したときに返される JSON データを拡張できます。 AWS IoT ${代替テンプレートの構文は式です。式には SELECT 句}、WHERE 句、 AWS IoT およびでサポートされている任意の式を使用できます。 AWS IoT ルールアクション
置換テンプレート
置換テンプレート内では、関数を利用することができます。
SQL 式の SELECT 句または WHERE 句では、以下の組み込み関数を使用できます。
関数
パーティションキーの値として、timestamp()関数を使用して、メッセージ取得時のUNIX時間を設定します。
ルールエンジンが監視する 1970 年 1 月 1 日 (木) の 00:00:00 からの現在のタイムスタンプをミリ秒単位で返します。
timestamp()
ソートキーの値として、cast()関数およびtopic()関数を使用して、トピック名をスラッシュで分割し、2番目の文字列を取得後、それを数値に変換します。
値を 1 つのデータ型から別のデータ型へ変換します。
cast()
ルールをトリガーしたメッセージが送信されたトピックを返します。パラメータが指定されていない場合、トピック全体が返されます。
topic(10 進数)
AWS IoTルールの基本的な事項については、以下のページをご確認ください。
IAMロール
信頼ポリシーでiot.amazonaws.comサービスがこのIAMロールを引き受けることを許可します。IAMロールにアタッチしたIAMポリシーでは、dynamodb:PutItemアクションを許可します。対象としてDynamoDBのARNを指定し、このテーブルに対して書き込むことを許可します。
DynamoDBテーブル
AWS IoTから送信されてきたメッセージを保存するためのDynamoDBテーブルです。パーティションキー・ソートキーの名前や型は、先述のIoTルールアクションの内容と一致させます。
DynamoDBに関する基本的な事項は以下のページをご確認ください。
CloudFormationテンプレート
AWS IoTルール
Resources:
TopicRule:
Type: AWS::IoT::TopicRule
Properties:
RuleName: wx_data_ddb
TopicRulePayload:
Actions:
- DynamoDB:
HashKeyField: !Ref PartitionKey
HashKeyType: NUMBER
HashKeyValue: ${timestamp()}
RangeKeyField: !Ref SortKey
RangeKeyType: NUMBER
RangeKeyValue: ${cast(topic(2) AS DECIMAL)}
RoleArn: !GetAtt TopicRuleRole.Arn
TableName: !Ref Table
AwsIotSqlVersion: 2016-03-23
RuleDisabled: false
Sql: !Sub |
SELECT temperature, humidity, barometer,
wind.velocity as wind_velocity,
wind.bearing as wind_bearing,
FROM '${TopicName}'
Code language: YAML (yaml)
IAMロール
Resources:
TopicRuleRole:
Type: AWS::IAM::Role
DeletionPolicy: Delete
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- iot.amazonaws.com
Policies:
- PolicyName: TopicRulePolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- dynamodb:PutItem
Resource:
- !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${Table}"
Code language: YAML (yaml)
DynamoDBテーブル
Resources:
Table:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
- AttributeName: !Ref PartitionKey
AttributeType: N
- AttributeName: !Ref SortKey
AttributeType: N
BillingMode: PAY_PER_REQUEST
KeySchema:
- AttributeName: !Ref PartitionKey
KeyType: HASH
- AttributeName: !Ref SortKey
KeyType: RANGE
TableClass: STANDARD
TableName: wx_data
Code language: YAML (yaml)
テンプレート全体
動作確認
まず、トピックdevice/+/dataをサブスクライブします。
次に、トピックdevice/32/dataにメッセージを発行します。
サブスクライブしたトピックでメッセージを受信できました。このトピックはワイルドカード表記により、メッセージを受け取ることができます。
DynamoDBテーブルを確認します。
テーブルにアイテムが追加されていました。
パーティションキーはメッセージ発行時のUNIX時間で、ソートキーはトピック名に含まれる数値です。どちらもIoTアクションで指定したものです。
SQL式のSELECT句で指定した値はpayloadというアトリビュート名で保存されています。temperature、humidity、barometerはメッセージのものと同名、wind_velocityおよびwind_bearingはSELECT句で指定した名前です。
まとめ
AWS IoTルールを使用して、MQTTメッセージをDynamoDBに保存する方法を確認しました。CloudFormationを利用して、AWS IoTルール、IAMロール、DynamoDBテーブルを自動構築しました。この仕組みにより、IoTデータの効率的な保存とリアルタイムなデータ処理が可能になります。