Kinesis Data Streamsで受信したデータを、Firehose経由でS3バケットに保存する
以下のページで、Lambda関数で生成したデータをKinesis Data Streamsで受け取り、Kinessis Data Analyticsで分析する方法をご紹介しました。
今回はKinesis Data Firehoseを用いて、S3バケットにデータを保存することを目指します。
構築する環境
S3バケットに保存するストリームデータは、Lambda関数で生成します。
関数のランタイム環境はPython3.8とします。
関数が生成したデータをKinesis Data Streamsでキャプチャします。
Finesis Data Firehoseの配信ストリームを作成します。
データの送信元にData Streamsを指定します。
データの送信先にS3バケットを指定します。
CloudFormationテンプレートファイル
上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートファイルを設置しています。
https://github.com/awstut-an-r/awstut-saa/tree/main/04/003
テンプレートファイルのポイント解説
Lambda関数
Resources:
DataSourceLambda:
Type: AWS::Lambda::Function
Properties:
Architectures:
- !Ref Architecture
Code:
ZipFile: |
import boto3
import datetime
import json
import os
import random
STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
LIMIT = 10
def get_data():
return {
'EVENT_TIME': datetime.datetime.now().isoformat(),
'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
'PRICE': round(random.random() * 100, 2)}
def generate(stream_name, kinesis_client, limit):
for i in range(limit):
data = get_data()
print(data)
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data).encode('utf-8'),
PartitionKey="partitionkey")
def lambda_handler(event, context):
generate(STREAM_NAME, boto3.client('kinesis'), LIMIT)
Environment:
Variables:
KINESIS_STREAM_NAME: !Ref KinesisDataStreamName
Handler: !Ref Handler
Role: !GetAtt DataSourceLambdaRole.Arn
Runtime: !Ref Runtime
Timeout: !Ref Timeout
Code language: YAML (yaml)
Lambda関数で実行するコードをインライン表記で定義します。
詳細につきましては、以下のページをご確認ください。
実行するコードの内容は、以下のAWS公式ページの内容を参考にしました。
https://docs.aws.amazon.com/ja_jp/streams/latest/dev/get-started-exercise.html
変更点としては、生成するデータに上限を設定したことです。
今回はデータを10個生成します。
以下が本関数用のIAMロールです。
Resources:
DataSourceLambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- lambda.amazonaws.com
Policies:
- PolicyName: DataSourceLambdaPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- kinesis:PutRecord
Resource:
- !Ref KinesisDataStreamArn
Code language: YAML (yaml)
LambdaからKinesis Data Streamsにアクセスするための権限を与えます。
具体的には、kinesis:PutRecordアクションを許可します。
Kinesis Data Streams
Resources:
KinesisDataStream:
Type: AWS::Kinesis::Stream
Properties:
Name: !Sub "${Prefix}-DataStream"
ShardCount: !Ref ShardCount
Code language: YAML (yaml)
Data Streamsを作成します。
特別な設定は不要です。
シャード数を「1」に設定します。
Kinesis Data Firehose
Resources:
KinesisFirehoseDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: !Ref KinesisFirehoseDeliveryStreamName
DeliveryStreamType: KinesisStreamAsSource
KinesisStreamSourceConfiguration:
KinesisStreamARN: !GetAtt KinesisDataStream.Arn
RoleARN: !GetAtt KinesisStreamSourceRole.Arn
S3DestinationConfiguration:
BucketARN: !Ref BucketArn
CloudWatchLoggingOptions:
Enabled: true
LogGroupName: !Ref LogGroup
LogStreamName: !Ref LogStream
CompressionFormat: UNCOMPRESSED
Prefix: firehose/
RoleARN: !GetAtt KinesisS3DestinationRole.Arn
Code language: YAML (yaml)
Data Firehoseを作成します。
今回はData Streamsをデータ送信元としますので、2つのプロパティを設定します。
1つ目はDeliveryStreamTypeプロパティです。
本プロパティには「KinesisStreamAsSource」を指定します。
2点目はKinesisStreamSourceConfigurationプロパティです。
先述のData Streamsを指定します。
そして今回はS3バケットをデータ送信先としますので、S3DestinationConfigurationプロパティを設定します。
BucketARNプロパティに送信先のバケットを指定します。
以下がData Firehoseに関する2つのIAMロールです。
AWS公式ページに取り上げられているアクセス権の設定例を参考にします。
https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/controlling-access.html#using-iam-s3
まずData Streamsからデータを受信するために使用するIAMロールです。
Resources:
KinesisStreamSourceRole:
Type: AWS::IAM::Role
DeletionPolicy: Delete
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- firehose.amazonaws.com
Policies:
- PolicyName: KinesisStreamSourcePolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- kinesis:DescribeStream
- kinesis:GetShardIterator
- kinesis:GetRecords
- kinesis:ListShards
Resource:
- !GetAtt KinesisDataStream.Arn
Code language: YAML (yaml)
Data Streamsにアクセスしてデータ(レコード)を取得するために必要な権限を与えます。
次にS3バケットにデータを送信するために使用するIAMロールです。
Resources:
KinesisS3DestinationRole:
Type: AWS::IAM::Role
DeletionPolicy: Delete
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- firehose.amazonaws.com
Policies:
- PolicyName: KinesisS3DestinationPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !Ref BucketArn
- !Sub "${BucketArn}/*"
- Effect: Allow
Action:
- logs:PutLogEvents
Resource:
- !GetAtt LogGroup.Arn
Code language: YAML (yaml)
S3バケットにオブジェクトをアップロードするために必要な権限を与えます。
S3
Resources:
Bucket:
Type: AWS::S3::Bucket
Properties:
AccessControl: Private
BucketName: !Ref Prefix
Code language: YAML (yaml)
S3バケットを作成します。
特別な設定は不要です。
環境構築
CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。
CloudFormationスタックを作成し、スタック内のリソースを確認する
CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。
各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。
- Lambda関数:saa-04-003-LambdaStack-T6C4JQMN9F-DataSourceLambda-zPRZbuXE1gEC
- Kinesis Data Streams:saa-04-003-DataStream
- Kinesis Data Firehose:saa-04-003-FirehoseDeliveryStream
- S3バケット:saa-04-003
AWS Management Consoleから各リソースを確認します。
Lambda関数を確認します。
正常に関数が作成されています。
Data Streamsを確認します。
Data Streamsも正常に作成されています。
プロビジョニングされたシャード数が「1」であることがわかります。
Data Firehoseを確認します。
Data Firehoseも正常に作成されています。
S3バケットへの送信(バッファ)に関する設定(サイズ、間隔)は、デフォルトの値です。
S3バケットを確認します。
バケットが正常に作成されています。
現時点ではバケットは空です。
動作確認
準備が整いましたので、Lambda関数を実行して、テストデータを生成します。
確かにテストデータが10個生成されました。
生成したデータはKinesis Data Streamsに配信されるはずです。
改めてData Streamsを確認します。
モニタリングの値を確認すると、GetRecordsの値が「10」です。
つまりLambda関数が生成したデータを、確かにData Streamsが受信したということです。
続いてKinesis Data Firehoseを確認します。
モニタリングの値を確認すると、Records read from Kinesis Data Streams (Sum)の値が「10」です。
つまり確かにData Streamsからデータを受信したということです。
最後にS3バケットを確認します。
確かにバケットにファイルが配置されています。
中身を確認します。
{"EVENT_TIME": "2023-05-04T08:30:21.351077", "TICKER": "AMZN", "PRICE": 73.84}{"EVENT_TIME": "2023-05-04T08:30:21.596428", "TICKER": "AAPL", "PRICE": 20.01}{"EVENT_TIME": "2023-05-04T08:30:21.630810", "TICKER": "MSFT", "PRICE": 17.05}{"EVENT_TIME": "2023-05-04T08:30:21.670717", "TICKER": "INTC", "PRICE": 58.96}{"EVENT_TIME": "2023-05-04T08:30:21.690740", "TICKER": "MSFT", "PRICE": 25.8}{"EVENT_TIME": "2023-05-04T08:30:21.730801", "TICKER": "AAPL", "PRICE": 85.31}{"EVENT_TIME": "2023-05-04T08:30:21.770721", "TICKER": "MSFT", "PRICE": 88.21}{"EVENT_TIME": "2023-05-04T08:30:21.790719", "TICKER": "INTC", "PRICE": 37.07}{"EVENT_TIME": "2023-05-04T08:30:21.830794", "TICKER": "INTC", "PRICE": 26.3}{"EVENT_TIME": "2023-05-04T08:30:21.850728", "TICKER": "TBV", "PRICE": 96.26}
Code language: JSON / JSON with Comments (json)
先ほどLambda関数が生成したデータです。
このようにKinesis Data Streamsで受信したストリームデータを、Firehose経由でS3バケットに保存することができます。
まとめ
Kinesis Data Streamsに配信されるストリームデータを、Kinesis Data Firehoseを用いて、S3バケットに保存する方法を確認しました。