Store data received by Kinesis Data Streams in an S3 bucket via Firehose


On the following page, we showed how data generated by a Lambda function can be received by Kinesis Data Streams and analyzed by Kinesis Data Analytics.

See also: Introduction to Kinesis Data Analytics – Real-time analysis of streaming data

This time, we will use Kinesis Data Firehose to store that data in an S3 bucket.

Environment

Diagram of storing data received by Kinesis Data Streams in an S3 bucket via Firehose.

The stream data to be stored in the S3 bucket will be generated by a Lambda function.
The runtime environment for the function is Python 3.8.

Capture the data generated by the function with Kinesis Data Streams.

Create a Kinesis Data Firehose delivery stream.
Specify Data Streams as the source of the data.
Specify S3 bucket as the destination for the data.

CloudFormation template files

The above configuration is built with CloudFormation.
The CloudFormation template files are located at the following URL:

https://github.com/awstut-an-r/awstut-saa/tree/main/04/003

Explanation of key points of template files

Lambda function

Resources:
  DataSourceLambda:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        ZipFile: |
          import boto3
          import datetime
          import json
          import os
          import random

          STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
          LIMIT = 10

          def get_data():
            return {
              'EVENT_TIME': datetime.datetime.now().isoformat(),
              'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
              'PRICE': round(random.random() * 100, 2)}

          def generate(stream_name, kinesis_client, limit):
            for i in range(limit):
              data = get_data()
              print(data)
              kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data).encode('utf-8'),
                PartitionKey="partitionkey")

          def lambda_handler(event, context):
            generate(STREAM_NAME, boto3.client('kinesis'), LIMIT)
      Environment:
        Variables:
          KINESIS_STREAM_NAME: !Ref KinesisDataStreamName
      Handler: !Ref Handler
      Role: !GetAtt DataSourceLambdaRole.Arn
      Runtime: !Ref Runtime
      Timeout: !Ref Timeout

Define the code to be executed by the Lambda function in inline notation.
For more information, please see the following page.

See also: 3 patterns to create Lambda with CloudFormation (S3/Inline/Container)

The content of the code to be executed is based on the following official AWS page.

See also: Create and run a Managed Service for Apache Flink application - Amazon Kinesis Data Streams

One change is that we set an upper limit on the amount of data to be generated.
This time we generate 10 records.

The following is the IAM role for this function.

Resources:
  DataSourceLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: DataSourceLambdaPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:PutRecord
                Resource:
                  - !Ref KinesisDataStreamArn

Grant permission to access Kinesis Data Streams from Lambda.
Specifically, allow the kinesis:PutRecord action.

Kinesis Data Streams

Resources:
  KinesisDataStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub "${Prefix}-DataStream"
      ShardCount: !Ref ShardCount

Create Data Streams.

No special settings are required.
Set the number of shards to “1”.
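
Once the stack is deployed, the shard count can also be confirmed with boto3. The following is a minimal sketch, assuming the stream name used later in this article (saa-04-003-DataStream).

import boto3

# Minimal sketch: confirm the provisioned shard count of the Data Stream.
# The stream name assumes the naming used in this article.
kinesis = boto3.client('kinesis')

summary = kinesis.describe_stream_summary(
    StreamName='saa-04-003-DataStream')['StreamDescriptionSummary']
print(summary['OpenShardCount'])  # expected: 1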

Kinesis Data Firehose

Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Ref KinesisFirehoseDeliveryStreamName
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !GetAtt KinesisDataStream.Arn
        RoleARN: !GetAtt KinesisStreamSourceRole.Arn
      S3DestinationConfiguration:
        BucketARN: !Ref BucketArn
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref LogGroup
          LogStreamName: !Ref LogStream
        CompressionFormat: UNCOMPRESSED
        Prefix: firehose/
        RoleARN: !GetAtt KinesisS3DestinationRole.Arn

Create a Data Firehose.

In this case, Data Streams will be used as the data source, so two properties will be set.

The first is the DeliveryStreamType property.
Specify “KinesisStreamAsSource” for this property.

The second is the KinesisStreamSourceConfiguration property.
Specify the Data Stream created above.

Then set the S3DestinationConfiguration property, this time using the S3 bucket as the data destination.
Specify the destination bucket in the BucketARN property.
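
After deployment, these settings can be verified with boto3 as well. The following is a minimal sketch, assuming the delivery stream name used later in this article (saa-04-003-FirehoseDeliveryStream).

import boto3

# Minimal sketch: confirm the source and destination of the delivery stream.
# The delivery stream name assumes the naming used in this article.
firehose = boto3.client('firehose')

desc = firehose.describe_delivery_stream(
    DeliveryStreamName='saa-04-003-FirehoseDeliveryStream')['DeliveryStreamDescription']

print(desc['DeliveryStreamType'])  # KinesisStreamAsSource
print(desc['Source']['KinesisStreamSourceDescription']['KinesisStreamARN'])

# Firehose usually reports the S3 settings as an extended S3 destination description.
dest = desc['Destinations'][0].get('ExtendedS3DestinationDescription') \
    or desc['Destinations'][0]['S3DestinationDescription']
print(dest['BucketARN'])
print(dest['BufferingHints'])  # defaults: 5 MiB / 300 seconds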

The following are the two IAM roles related to Data Firehose.

The policies follow the access control example covered on the following official AWS page.

See also: Controlling access with Amazon Data Firehose - Amazon Data Firehose

First is the IAM role used to receive data from Data Streams.

Resources:
  KinesisStreamSourceRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisStreamSourcePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:ListShards
                Resource:
                  - !GetAtt KinesisDataStream.Arn

Grant the necessary permissions to access Data Streams and retrieve data (records).
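
These are the same read APIs that any Kinesis consumer calls. As an illustration, the following boto3 sketch exercises them directly (ListShards, GetShardIterator, GetRecords) against the stream created above; the stream name is the one used in this article.

import boto3

# Minimal sketch: read records from the stream using the same actions
# that KinesisStreamSourcePolicy grants to Firehose.
kinesis = boto3.client('kinesis')
stream_name = 'saa-04-003-DataStream'  # name used in this article

shard_id = kinesis.list_shards(StreamName=stream_name)['Shards'][0]['ShardId']
iterator = kinesis.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON')['ShardIterator']

for record in kinesis.get_records(ShardIterator=iterator, Limit=10)['Records']:
    print(record['Data'])  # the JSON produced by the Lambda function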

Next is the IAM role used to send data to the S3 bucket.

Resources:
  KinesisS3DestinationRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisS3DestinationPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !Ref BucketArn
                  - !Sub "${BucketArn}/*"
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !GetAtt LogGroup.Arn

Grant the necessary permissions to upload objects to the S3 bucket.

S3

Resources:
  Bucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketName: !Ref Prefix

Create an S3 bucket.

No special settings are required.

Architecting

Use CloudFormation to build this environment and check its actual behavior.

Create CloudFormation stacks and check the resources in the stacks

Create CloudFormation stacks.
For information on how to create stacks and check each stack, please see the following page.

See also: CloudFormation's nested stack

After reviewing the resources in each stack, the main resources created this time are as follows:

  • Lambda function: saa-04-003-LambdaStack-T6C4JQMN9F-DataSourceLambda-zPRZbuXE1gEC
  • Kinesis Data Streams: saa-04-003-DataStream
  • Kinesis Data Firehose: saa-04-003-FirehoseDeliveryStream
  • S3 bucket: saa-04-003

Check each resource from the AWS Management Console.

Check the Lambda function.

Detail of Lambda 1.

The function is successfully created.

Check Data Streams.

Detail of Kinesis 1.

Data Streams are also successfully created.
You can see that the number of shards provisioned is “1”.

Check Data Firehose.

Detail of Kinesis 2.
Detail of Kinesis 3.

Data Firehose is also created successfully.
The buffer settings (size and interval) for delivery to the S3 bucket are the default values (5 MiB or 300 seconds, whichever is reached first).

Check the S3 bucket.

Detail of S3 1.

The bucket has been successfully created.
The bucket is empty at this time.

Operation Check

Now that we are ready, we run the Lambda function to generate test data.
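
The function can be run from the Test tab of the Management Console; a boto3 invocation works just as well. A minimal sketch, using the function name shown above:

import boto3

# Minimal sketch: invoke the data-generating Lambda function synchronously.
# The function name is the physical ID listed earlier in this article.
lambda_client = boto3.client('lambda')

response = lambda_client.invoke(
    FunctionName='saa-04-003-LambdaStack-T6C4JQMN9F-DataSourceLambda-zPRZbuXE1gEC',
    InvocationType='RequestResponse')
print(response['StatusCode'])  # 200 if the invocation succeeded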

Detail of Lambda 2.

Indeed, 10 test records were generated.

The generated data should be delivered to Kinesis Data Streams.

Check Data Streams again.

Detail of Kinesis 4.

Checking the monitoring values, the GetRecords value is “10”.
This means that Data Streams has indeed received the data generated by the Lambda function.

Then check Kinesis Data Firehose.

Detail of Kinesis 5.

Checking the monitoring values, the Records read from Kinesis Data Streams (Sum) has a value of “10”.
This means that we have indeed received data from Data Streams.
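
The same monitoring values can also be pulled from CloudWatch with boto3. A minimal sketch, assuming the resource names above and that the function was run within the last hour:

import boto3
from datetime import datetime, timedelta, timezone

# Minimal sketch: fetch the record counts shown in the monitoring tabs.
cloudwatch = boto3.client('cloudwatch')
end = datetime.now(timezone.utc)
start = end - timedelta(hours=1)

def metric_sum(namespace, metric_name, dim_name, dim_value):
    # Sum of a single CloudWatch metric over the last hour.
    stats = cloudwatch.get_metric_statistics(
        Namespace=namespace,
        MetricName=metric_name,
        Dimensions=[{'Name': dim_name, 'Value': dim_value}],
        StartTime=start,
        EndTime=end,
        Period=3600,
        Statistics=['Sum'])
    return sum(point['Sum'] for point in stats['Datapoints'])

# Records put into / read from the Data Stream.
print(metric_sum('AWS/Kinesis', 'IncomingRecords', 'StreamName', 'saa-04-003-DataStream'))
print(metric_sum('AWS/Kinesis', 'GetRecords.Records', 'StreamName', 'saa-04-003-DataStream'))

# Records the delivery stream read from the Data Stream.
print(metric_sum('AWS/Firehose', 'DataReadFromKinesisStream.Records',
                 'DeliveryStreamName', 'saa-04-003-FirehoseDeliveryStream'))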

Finally, check the S3 bucket.

Detail of S3 2.

Indeed, a file has been placed in the bucket.
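
Instead of the console, the delivered object can also be listed and downloaded with boto3. A minimal sketch, assuming the bucket name and the firehose/ prefix used in this article:

import boto3

# Minimal sketch: list the objects delivered by Firehose and print the newest one.
s3 = boto3.client('s3')
bucket = 'saa-04-003'  # bucket name used in this article

objects = s3.list_objects_v2(Bucket=bucket, Prefix='firehose/')['Contents']
key = max(objects, key=lambda o: o['LastModified'])['Key']

body = s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')
print(key)
print(body)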

Check the contents.

{"EVENT_TIME": "2023-05-04T08:30:21.351077", "TICKER": "AMZN", "PRICE": 73.84}{"EVENT_TIME": "2023-05-04T08:30:21.596428", "TICKER": "AAPL", "PRICE": 20.01}{"EVENT_TIME": "2023-05-04T08:30:21.630810", "TICKER": "MSFT", "PRICE": 17.05}{"EVENT_TIME": "2023-05-04T08:30:21.670717", "TICKER": "INTC", "PRICE": 58.96}{"EVENT_TIME": "2023-05-04T08:30:21.690740", "TICKER": "MSFT", "PRICE": 25.8}{"EVENT_TIME": "2023-05-04T08:30:21.730801", "TICKER": "AAPL", "PRICE": 85.31}{"EVENT_TIME": "2023-05-04T08:30:21.770721", "TICKER": "MSFT", "PRICE": 88.21}{"EVENT_TIME": "2023-05-04T08:30:21.790719", "TICKER": "INTC", "PRICE": 37.07}{"EVENT_TIME": "2023-05-04T08:30:21.830794", "TICKER": "INTC", "PRICE": 26.3}{"EVENT_TIME": "2023-05-04T08:30:21.850728", "TICKER": "TBV", "PRICE": 96.26}Code language: JSON / JSON with Comments (json)

This is the data that was just generated by the Lambda function.
Note that Firehose has concatenated the records with no delimiter, because the producer does not append a newline to each record.
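
Because the objects are back-to-back, json.loads cannot parse the file in one call. One way to split it back into individual records is json.JSONDecoder.raw_decode, as in this sketch (the shortened sample string stands in for the downloaded file body):

import json

def parse_concatenated_json(text):
    # Split a string of back-to-back JSON objects into a list of dicts.
    decoder = json.JSONDecoder()
    records, index = [], 0
    while index < len(text):
        obj, index = decoder.raw_decode(text, index)
        records.append(obj)
    return records

# Shortened stand-in for the file delivered by Firehose.
body = ('{"EVENT_TIME": "2023-05-04T08:30:21.351077", "TICKER": "AMZN", "PRICE": 73.84}'
        '{"EVENT_TIME": "2023-05-04T08:30:21.596428", "TICKER": "AAPL", "PRICE": 20.01}')
print(parse_concatenated_json(body))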

Thus, stream data received by Kinesis Data Streams can be stored in S3 buckets via Firehose.

Summary

We have identified how stream data delivered to Kinesis Data Streams can be stored in S3 buckets using Kinesis Data Firehose.
