SAA

Kinesis Data Streamsで受信したデータを、Firehose経由でS3バケットに保存する

Kinesis Data Streamsで受信したデータを、Firehose経由でS3バケットに保存する

以下のページで、Lambda関数で生成したデータをKinesis Data Streamsで受け取り、Kinessis Data Analyticsで分析する方法をご紹介しました。

今回はKinesis Data Firehoseを用いて、S3バケットにデータを保存することを目指します。

構築する環境

Diagram of storing data in S3 bucket received by Kinesis Data Streams via Firehose.

S3バケットに保存するストリームデータは、Lambda関数で生成します。
関数のランタイム環境はPython3.8とします。

関数が生成したデータをKinesis Data Streamsでキャプチャします。

Finesis Data Firehoseの配信ストリームを作成します。
データの送信元にData Streamsを指定します。
データの送信先にS3バケットを指定します。

CloudFormationテンプレートファイル

上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートファイルを設置しています。

awstut-saa/04/003 at main · awstut-an-r/awstut-saa
Contribute to awstut-an-r/awstut-saa development by creating an account on GitHub.

テンプレートファイルのポイント解説

Lambda関数

Resources:
  DataSourceLambda:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        ZipFile: |
          import boto3
          import datetime
          import json
          import os
          import random
          
          STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
          LIMIT = 10
          
          def get_data():
            return {
              'EVENT_TIME': datetime.datetime.now().isoformat(),
              'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
              'PRICE': round(random.random() * 100, 2)}
          
          def generate(stream_name, kinesis_client, limit):
            for i in range(limit):
              data = get_data()
              print(data)
              kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data).encode('utf-8'),
                PartitionKey="partitionkey")
        
          def lambda_handler(event, context):
            generate(STREAM_NAME, boto3.client('kinesis'), LIMIT)
      Environment:
        Variables:
          KINESIS_STREAM_NAME: !Ref KinesisDataStreamName
      Handler: !Ref Handler
      Role: !GetAtt DataSourceLambdaRole.Arn
      Runtime: !Ref Runtime
      Timeout: !Ref Timeout
Code language: YAML (yaml)

Lambda関数で実行するコードをインライン表記で定義します。
詳細につきましては、以下のページをご確認ください。

実行するコードの内容は、以下のAWS公式ページの内容を参考にしました。

ステップ 3: Flink アプリケーション向けの Managed Service for Apache Flink を作成して実行する - Amazon Kinesis Data Streams
この演習では、データストリームをソースおよびシンクとして使用して、Flink アプリケーション向けの Managed Service for Apache Flink を作成します。

変更点としては、生成するデータに上限を設定したことです。
今回はデータを10個生成します。

以下が本関数用のIAMロールです。

Resources:
  DataSourceLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: DataSourceLambdaPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:PutRecord
                Resource:
                  - !Ref KinesisDataStreamArn
Code language: YAML (yaml)

LambdaからKinesis Data Streamsにアクセスするための権限を与えます。
具体的には、kinesis:PutRecordアクションを許可します。

Kinesis Data Streams

Resources:
  KinesisDataStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub "${Prefix}-DataStream"
      ShardCount: !Ref ShardCount
Code language: YAML (yaml)

Data Streamsを作成します。

特別な設定は不要です。
シャード数を「1」に設定します。

Kinesis Data Firehose

Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Ref KinesisFirehoseDeliveryStreamName
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration: 
        KinesisStreamARN: !GetAtt KinesisDataStream.Arn
        RoleARN: !GetAtt KinesisStreamSourceRole.Arn
      S3DestinationConfiguration: 
        BucketARN: !Ref BucketArn
        CloudWatchLoggingOptions: 
          Enabled: true
          LogGroupName: !Ref LogGroup
          LogStreamName: !Ref LogStream
        CompressionFormat: UNCOMPRESSED
        Prefix: firehose/
        RoleARN: !GetAtt KinesisS3DestinationRole.Arn
Code language: YAML (yaml)

Data Firehoseを作成します。

今回はData Streamsをデータ送信元としますので、2つのプロパティを設定します。

1つ目はDeliveryStreamTypeプロパティです。
本プロパティには「KinesisStreamAsSource」を指定します。

2点目はKinesisStreamSourceConfigurationプロパティです。
先述のData Streamsを指定します。

そして今回はS3バケットをデータ送信先としますので、S3DestinationConfigurationプロパティを設定します。
BucketARNプロパティに送信先のバケットを指定します。

以下がData Firehoseに関する2つのIAMロールです。

AWS公式ページに取り上げられているアクセス権の設定例を参考にします。

Amazon Data Firehose によるアクセスの制御 - Amazon Data Firehose
IAM を使用して、Amazon Data Firehose リソースへのアクセスを制御します。

まずData Streamsからデータを受信するために使用するIAMロールです。

Resources:
  KinesisStreamSourceRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisStreamSourcePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:ListShards
                Resource:
                  - !GetAtt KinesisDataStream.Arn
Code language: YAML (yaml)

Data Streamsにアクセスしてデータ(レコード)を取得するために必要な権限を与えます。

次にS3バケットにデータを送信するために使用するIAMロールです。

Resources:
  KinesisS3DestinationRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisS3DestinationPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !Ref BucketArn
                  - !Sub "${BucketArn}/*"
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !GetAtt LogGroup.Arn
Code language: YAML (yaml)

S3バケットにオブジェクトをアップロードするために必要な権限を与えます。

S3

Resources:
  Bucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketName: !Ref Prefix
Code language: YAML (yaml)

S3バケットを作成します。

特別な設定は不要です。

環境構築

CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。

CloudFormationスタックを作成し、スタック内のリソースを確認する

CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。

各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。

  • Lambda関数:saa-04-003-LambdaStack-T6C4JQMN9F-DataSourceLambda-zPRZbuXE1gEC
  • Kinesis Data Streams:saa-04-003-DataStream
  • Kinesis Data Firehose:saa-04-003-FirehoseDeliveryStream
  • S3バケット:saa-04-003

AWS Management Consoleから各リソースを確認します。

Lambda関数を確認します。

Detail of Lambda 1.

正常に関数が作成されています。

Data Streamsを確認します。

Detail of Kinesis 1.

Data Streamsも正常に作成されています。
プロビジョニングされたシャード数が「1」であることがわかります。

Data Firehoseを確認します。

Detail of Kinesis 2.
Detail of Kinesis 3.

Data Firehoseも正常に作成されています。
S3バケットへの送信(バッファ)に関する設定(サイズ、間隔)は、デフォルトの値です。

S3バケットを確認します。

Detail of S3 1.

バケットが正常に作成されています。
現時点ではバケットは空です。

動作確認

準備が整いましたので、Lambda関数を実行して、テストデータを生成します。

Detail of Lambda 2.

確かにテストデータが10個生成されました。

生成したデータはKinesis Data Streamsに配信されるはずです。

改めてData Streamsを確認します。

Detail of Kinesis 4.

モニタリングの値を確認すると、GetRecordsの値が「10」です。
つまりLambda関数が生成したデータを、確かにData Streamsが受信したということです。

続いてKinesis Data Firehoseを確認します。

Detail of Kinesis 5.

モニタリングの値を確認すると、Records read from Kinesis Data Streams (Sum)の値が「10」です。
つまり確かにData Streamsからデータを受信したということです。

最後にS3バケットを確認します。

Detail of S3 2.

確かにバケットにファイルが配置されています。

中身を確認します。

{"EVENT_TIME": "2023-05-04T08:30:21.351077", "TICKER": "AMZN", "PRICE": 73.84}{"EVENT_TIME": "2023-05-04T08:30:21.596428", "TICKER": "AAPL", "PRICE": 20.01}{"EVENT_TIME": "2023-05-04T08:30:21.630810", "TICKER": "MSFT", "PRICE": 17.05}{"EVENT_TIME": "2023-05-04T08:30:21.670717", "TICKER": "INTC", "PRICE": 58.96}{"EVENT_TIME": "2023-05-04T08:30:21.690740", "TICKER": "MSFT", "PRICE": 25.8}{"EVENT_TIME": "2023-05-04T08:30:21.730801", "TICKER": "AAPL", "PRICE": 85.31}{"EVENT_TIME": "2023-05-04T08:30:21.770721", "TICKER": "MSFT", "PRICE": 88.21}{"EVENT_TIME": "2023-05-04T08:30:21.790719", "TICKER": "INTC", "PRICE": 37.07}{"EVENT_TIME": "2023-05-04T08:30:21.830794", "TICKER": "INTC", "PRICE": 26.3}{"EVENT_TIME": "2023-05-04T08:30:21.850728", "TICKER": "TBV", "PRICE": 96.26}Code language: JSON / JSON with Comments (json)

先ほどLambda関数が生成したデータです。

このようにKinesis Data Streamsで受信したストリームデータを、Firehose経由でS3バケットに保存することができます。

まとめ

Kinesis Data Streamsに配信されるストリームデータを、Kinesis Data Firehoseを用いて、S3バケットに保存する方法を確認しました。

タイトルとURLをコピーしました