Kinesis Data Streamsで受信したデータを、Firehose経由でS3バケットに保存する

Kinesis Data Streamsで受信したデータを、Firehose経由でS3バケットに保存する

以下のページで、Lambda関数で生成したデータをKinesis Data Streamsで受け取り、Kinessis Data Analyticsで分析する方法をご紹介しました。

あわせて読みたい
Kinesis Data Analytics入門 Lambdaで生成したストリーミングデータをリアルタイム分析 【Kinesis Data Analyticsでストリーミングデータをリアルタイム分析する】 Kinesis Data Analyticsの入門ということで、AWS公式で紹介されている構成を、実際に構築し...

今回はKinesis Data Firehoseを用いて、S3バケットにデータを保存することを目指します。

構築する環境

Diagram of storing data in S3 bucket received by Kinesis Data Streams via Firehose.

S3バケットに保存するストリームデータは、Lambda関数で生成します。
関数のランタイム環境はPython3.8とします。

関数が生成したデータをKinesis Data Streamsでキャプチャします。

Finesis Data Firehoseの配信ストリームを作成します。
データの送信元にData Streamsを指定します。
データの送信先にS3バケットを指定します。

CloudFormationテンプレートファイル

上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートファイルを設置しています。

https://github.com/awstut-an-r/awstut-saa/tree/main/04/003

テンプレートファイルのポイント解説

Lambda関数

Resources:
  DataSourceLambda:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        ZipFile: |
          import boto3
          import datetime
          import json
          import os
          import random

          STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
          LIMIT = 10

          def get_data():
            return {
              'EVENT_TIME': datetime.datetime.now().isoformat(),
              'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
              'PRICE': round(random.random() * 100, 2)}

          def generate(stream_name, kinesis_client, limit):
            for i in range(limit):
              data = get_data()
              print(data)
              kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data).encode('utf-8'),
                PartitionKey="partitionkey")

          def lambda_handler(event, context):
            generate(STREAM_NAME, boto3.client('kinesis'), LIMIT)
      Environment:
        Variables:
          KINESIS_STREAM_NAME: !Ref KinesisDataStreamName
      Handler: !Ref Handler
      Role: !GetAtt DataSourceLambdaRole.Arn
      Runtime: !Ref Runtime
      Timeout: !Ref Timeout
Code language: YAML (yaml)

Lambda関数で実行するコードをインライン表記で定義します。
詳細につきましては、以下のページをご確認ください。

あわせて読みたい
CloudFormationでLambdaを作成する3パータン(S3/インライン/コンテナ) 【CloudFormationでLambdaを作成する】 CloudFormationでLambdaを作成する場合、大別すると以下の3パターンあります。 S3バケットにコードをアップロードする インライ...

実行するコードの内容は、以下のAWS公式ページの内容を参考にしました。

https://docs.aws.amazon.com/ja_jp/streams/latest/dev/get-started-exercise.html

変更点としては、生成するデータに上限を設定したことです。
今回はデータを10個生成します。

以下が本関数用のIAMロールです。

Resources:
  DataSourceLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: DataSourceLambdaPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:PutRecord
                Resource:
                  - !Ref KinesisDataStreamArn
Code language: YAML (yaml)

LambdaからKinesis Data Streamsにアクセスするための権限を与えます。
具体的には、kinesis:PutRecordアクションを許可します。

Kinesis Data Streams

Resources:
  KinesisDataStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub "${Prefix}-DataStream"
      ShardCount: !Ref ShardCount
Code language: YAML (yaml)

Data Streamsを作成します。

特別な設定は不要です。
シャード数を「1」に設定します。

Kinesis Data Firehose

Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Ref KinesisFirehoseDeliveryStreamName
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !GetAtt KinesisDataStream.Arn
        RoleARN: !GetAtt KinesisStreamSourceRole.Arn
      S3DestinationConfiguration:
        BucketARN: !Ref BucketArn
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref LogGroup
          LogStreamName: !Ref LogStream
        CompressionFormat: UNCOMPRESSED
        Prefix: firehose/
        RoleARN: !GetAtt KinesisS3DestinationRole.Arn
Code language: YAML (yaml)

Data Firehoseを作成します。

今回はData Streamsをデータ送信元としますので、2つのプロパティを設定します。

1つ目はDeliveryStreamTypeプロパティです。
本プロパティには「KinesisStreamAsSource」を指定します。

2点目はKinesisStreamSourceConfigurationプロパティです。
先述のData Streamsを指定します。

そして今回はS3バケットをデータ送信先としますので、S3DestinationConfigurationプロパティを設定します。
BucketARNプロパティに送信先のバケットを指定します。

以下がData Firehoseに関する2つのIAMロールです。

AWS公式ページに取り上げられているアクセス権の設定例を参考にします。

https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/controlling-access.html#using-iam-s3

まずData Streamsからデータを受信するために使用するIAMロールです。

Resources:
  KinesisStreamSourceRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisStreamSourcePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:ListShards
                Resource:
                  - !GetAtt KinesisDataStream.Arn
Code language: YAML (yaml)

Data Streamsにアクセスしてデータ(レコード)を取得するために必要な権限を与えます。

次にS3バケットにデータを送信するために使用するIAMロールです。

Resources:
  KinesisS3DestinationRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisS3DestinationPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !Ref BucketArn
                  - !Sub "${BucketArn}/*"
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !GetAtt LogGroup.Arn
Code language: YAML (yaml)

S3バケットにオブジェクトをアップロードするために必要な権限を与えます。

S3

Resources:
  Bucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketName: !Ref Prefix
Code language: YAML (yaml)

S3バケットを作成します。

特別な設定は不要です。

環境構築

CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。

CloudFormationスタックを作成し、スタック内のリソースを確認する

CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。

あわせて読みたい
CloudFormationのネストされたスタックで環境を構築する 【CloudFormationのネストされたスタックで環境を構築する方法】 CloudFormationにおけるネストされたスタックを検証します。 CloudFormationでは、スタックをネストす...

各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。

  • Lambda関数:saa-04-003-LambdaStack-T6C4JQMN9F-DataSourceLambda-zPRZbuXE1gEC
  • Kinesis Data Streams:saa-04-003-DataStream
  • Kinesis Data Firehose:saa-04-003-FirehoseDeliveryStream
  • S3バケット:saa-04-003

AWS Management Consoleから各リソースを確認します。

Lambda関数を確認します。

Detail of Lambda 1.

正常に関数が作成されています。

Data Streamsを確認します。

Detail of Kinesis 1.

Data Streamsも正常に作成されています。
プロビジョニングされたシャード数が「1」であることがわかります。

Data Firehoseを確認します。

Detail of Kinesis 2.
Detail of Kinesis 3.

Data Firehoseも正常に作成されています。
S3バケットへの送信(バッファ)に関する設定(サイズ、間隔)は、デフォルトの値です。

S3バケットを確認します。

Detail of S3 1.

バケットが正常に作成されています。
現時点ではバケットは空です。

動作確認

準備が整いましたので、Lambda関数を実行して、テストデータを生成します。

Detail of Lambda 2.

確かにテストデータが10個生成されました。

生成したデータはKinesis Data Streamsに配信されるはずです。

改めてData Streamsを確認します。

Detail of Kinesis 4.

モニタリングの値を確認すると、GetRecordsの値が「10」です。
つまりLambda関数が生成したデータを、確かにData Streamsが受信したということです。

続いてKinesis Data Firehoseを確認します。

Detail of Kinesis 5.

モニタリングの値を確認すると、Records read from Kinesis Data Streams (Sum)の値が「10」です。
つまり確かにData Streamsからデータを受信したということです。

最後にS3バケットを確認します。

Detail of S3 2.

確かにバケットにファイルが配置されています。

中身を確認します。

{"EVENT_TIME": "2023-05-04T08:30:21.351077", "TICKER": "AMZN", "PRICE": 73.84}{"EVENT_TIME": "2023-05-04T08:30:21.596428", "TICKER": "AAPL", "PRICE": 20.01}{"EVENT_TIME": "2023-05-04T08:30:21.630810", "TICKER": "MSFT", "PRICE": 17.05}{"EVENT_TIME": "2023-05-04T08:30:21.670717", "TICKER": "INTC", "PRICE": 58.96}{"EVENT_TIME": "2023-05-04T08:30:21.690740", "TICKER": "MSFT", "PRICE": 25.8}{"EVENT_TIME": "2023-05-04T08:30:21.730801", "TICKER": "AAPL", "PRICE": 85.31}{"EVENT_TIME": "2023-05-04T08:30:21.770721", "TICKER": "MSFT", "PRICE": 88.21}{"EVENT_TIME": "2023-05-04T08:30:21.790719", "TICKER": "INTC", "PRICE": 37.07}{"EVENT_TIME": "2023-05-04T08:30:21.830794", "TICKER": "INTC", "PRICE": 26.3}{"EVENT_TIME": "2023-05-04T08:30:21.850728", "TICKER": "TBV", "PRICE": 96.26}Code language: JSON / JSON with Comments (json)

先ほどLambda関数が生成したデータです。

このようにKinesis Data Streamsで受信したストリームデータを、Firehose経由でS3バケットに保存することができます。

まとめ

Kinesis Data Streamsに配信されるストリームデータを、Kinesis Data Firehoseを用いて、S3バケットに保存する方法を確認しました。