Data received by Kinesis Data Streams and stored in OpenSearch Serverless via Firehose

The following page covers how to store data received by Kinesis Data Streams in an S3 bucket via Firehose.

Related article: Store data in S3 bucket received by Kinesis Data Streams via Firehose

In this article, we will show you how to deliver the data to OpenSearch Serverless instead.

Environment

Diagram of data received by Kinesis Data Streams and stored in OpenSearch Serverless via Firehose

Overall, the configuration is similar to the one in the article linked at the beginning.
This time we specify OpenSearch Serverless as the Firehose destination.

The data to be delivered is generated by a Lambda function.
The runtime environment for the function is Python 3.8.

CloudFormation template files

The above configuration is built with CloudFormation.
The CloudFormation template file is available at the following URL:

GitHub: awstut-dva/03/010 at main · awstut-an-r/awstut-dva

Explanation of key points of template files

Kinesis Data Stream

Resources:
  KinesisDataStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub "${Prefix}-DataStream"
      RetentionPeriodHours: 24
      ShardCount: !Ref ShardCount

This is the Kinesis data stream.
No special configuration is required.
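ShardCount is passed in as a template parameter. As a rough sizing aid, a provisioned shard accepts writes of up to 1 MB per second or 1,000 records per second. The following Python sketch turns those two limits into a minimum shard count; the function name and the KB-based input are illustrative assumptions, and the sketch ignores read-side limits and on-demand capacity mode.

```python
import math

# Per-shard write limits for a provisioned Kinesis Data Streams shard.
WRITE_MB_PER_SEC_PER_SHARD = 1.0
WRITE_RECORDS_PER_SEC_PER_SHARD = 1000


def estimate_shard_count(records_per_sec: float, avg_record_kb: float) -> int:
    """Return the minimum shard count that satisfies both write limits (hypothetical helper)."""
    mb_per_sec = records_per_sec * avg_record_kb / 1024
    by_throughput = math.ceil(mb_per_sec / WRITE_MB_PER_SEC_PER_SHARD)
    by_records = math.ceil(records_per_sec / WRITE_RECORDS_PER_SEC_PER_SHARD)
    # A stream always has at least one shard.
    return max(1, by_throughput, by_records)


# The sample Lambda function in this article writes only 10 small records.
print(estimate_shard_count(records_per_sec=10, avg_record_kb=0.1))  # 1
```

Whichever limit is tighter determines the result, which is why the function takes the maximum of the two estimates.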

Kinesis Firehose

Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      AmazonOpenSearchServerlessDestinationConfiguration: 
        CloudWatchLoggingOptions: 
          Enabled: true
          LogGroupName: !Ref LogGroup
          LogStreamName: !Ref LogStream
        CollectionEndpoint: !Ref CollectionEndpoint
        IndexName: !Ref OpenSearchIndexName
        RoleARN: !Ref KinesisOpenSearchServerlessDestinationRoleArn
        S3BackupMode: FailedDocumentsOnly
        S3Configuration: 
          BucketARN: !Sub "arn:aws:s3:::${BucketName}"
          CompressionFormat: GZIP
          Prefix: firehose/
          RoleARN: !GetAtt KinesisS3DestinationRole.Arn
      DeliveryStreamName: !Sub "${Prefix}-DeliveryStream"
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration: 
        KinesisStreamARN: !GetAtt KinesisDataStream.Arn
        RoleARN: !GetAtt KinesisStreamSourceRole.Arn

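The key point is the AmazonOpenSearchServerlessDestinationConfiguration property.
The CollectionEndpoint property specifies the endpoint URL of the OpenSearch Serverless collection.
The IndexName property specifies the name of the index created in the collection.
Because S3BackupMode is set to FailedDocumentsOnly, only documents that could not be delivered to OpenSearch Serverless are backed up to the S3 bucket.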
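The CollectionEndpoint value is a full HTTPS URL, whereas the index-creation Lambda function later in this article passes a bare host name (`${Collection}.${AWS::Region}.aoss.amazonaws.com`) to opensearch-py. The helpers below are a hypothetical sketch of converting between the two forms, assuming the `https://<collection-id>.<region>.aoss.amazonaws.com` format; the collection ID in the example is made up.

```python
from urllib.parse import urlparse


def collection_endpoint(collection_id: str, region: str) -> str:
    """Build an HTTPS endpoint URL of the assumed aoss form (hypothetical helper)."""
    return f"https://{collection_id}.{region}.aoss.amazonaws.com"


def host_from_endpoint(endpoint: str) -> str:
    """Return the bare host name of an https:// endpoint, as opensearch-py expects."""
    parsed = urlparse(endpoint)
    if parsed.scheme != "https" or not parsed.hostname:
        raise ValueError(f"expected an https:// URL, got {endpoint!r}")
    return parsed.hostname


url = collection_endpoint("abc123example", "ap-northeast-1")  # made-up collection ID
print(url)
print(host_from_endpoint(url))
```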

There are several IAM roles associated with Kinesis Firehose.
The following two IAM roles are particularly important among them.

The first is a role to receive data from the Kinesis Data Stream.

Resources:
  KinesisStreamSourceRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisStreamSourcePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:ListShards
                Resource:
                  - !GetAtt KinesisDataStream.Arn

The Kinesis data stream is specified as the Resource, and the actions required to read data from it are allowed.
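To make the effect of this policy concrete, here is a deliberately simplified Python model of how an Allow statement matches an action and a resource using `*` wildcards. It is not the real IAM evaluation logic: it ignores Deny statements, conditions, NotAction/NotResource, and every other policy. The stream ARN and account ID are hypothetical.

```python
from fnmatch import fnmatchcase


def _as_list(value):
    """IAM allows a single string or a list; normalize to a list."""
    return value if isinstance(value, list) else [value]


def is_allowed(policy: dict, action: str, resource: str) -> bool:
    """True if any Allow statement matches both the action and the resource."""
    for stmt in _as_list(policy.get("Statement", [])):
        if stmt.get("Effect") != "Allow":
            continue  # this simplified model only looks at Allow statements
        actions = _as_list(stmt.get("Action", []))
        resources = _as_list(stmt.get("Resource", []))
        # Action names are compared case-insensitively; resources as-is.
        action_ok = any(fnmatchcase(action.lower(), a.lower()) for a in actions)
        resource_ok = any(fnmatchcase(resource, r) for r in resources)
        if action_ok and resource_ok:
            return True
    return False


# Hypothetical ARN standing in for !GetAtt KinesisDataStream.Arn.
STREAM_ARN = "arn:aws:kinesis:ap-northeast-1:123456789012:stream/example-DataStream"

SOURCE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["kinesis:DescribeStream", "kinesis:GetShardIterator",
                   "kinesis:GetRecords", "kinesis:ListShards"],
        "Resource": [STREAM_ARN],
    }],
}

print(is_allowed(SOURCE_POLICY, "kinesis:GetRecords", STREAM_ARN))  # True
print(is_allowed(SOURCE_POLICY, "kinesis:PutRecord", STREAM_ARN))   # False
```

The second call shows that the source role can only read from the stream; writing is left to the data-generation function's own role.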

The second is an IAM role for submitting data to OpenSearch Serverless.

Resources:
  KinesisOpenSearchServerlessDestinationRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisOpenSearchServerlessDestinationRolePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - aoss:APIAccessAll
                Resource:
                  - !Sub "arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*"
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource: "*"
      RoleName: !Sub "${Prefix}-KinesisOpenSearchServerlessDestinationRole"

This policy allows all API access (aoss:APIAccessAll) to the OpenSearch Serverless collections in the account.

OpenSearch Serverless

Resources:
  Collection:
    Type: AWS::OpenSearchServerless::Collection
    DependsOn:
      - EncryptionSecurityPolicy
    Properties:
      Name: !Ref CollectionName
      StandbyReplicas: DISABLED
      Type: TIMESERIES
      
  DataAccessPolicy1:
    Type: AWS::OpenSearchServerless::AccessPolicy
    Properties:
      Name: !Sub "${Prefix}-data-policy-01"
      Policy: !Sub >-
        [{"Description":"Access for cfn user","Rules":[{"ResourceType":"index","Resource":["index/*/*"],"Permission":["aoss:*"]},
        {"ResourceType":"collection","Resource":["collection/${CollectionName}"],"Permission":["aoss:*"]}],
        "Principal":["${UserArn}"]}]
      Type: data
      
  DataAccessPolicy2:
    Type: AWS::OpenSearchServerless::AccessPolicy
    Properties:
      Name: !Sub "${Prefix}-data-policy-02"
      Policy: !Sub >-
        [{"Description":"Access for Function2","Rules":[{"ResourceType":"index","Resource":["index/${CollectionName}/${OpenSearchIndexName}"],"Permission":["aoss:CreateIndex"]}],
        "Principal":["${FunctionRoleArn2}"]}]
      Type: data
      
  DataAccessPolicy3:
    Type: AWS::OpenSearchServerless::AccessPolicy
    Properties:
      Name: !Sub "${Prefix}-data-policy-03"
      Policy: !Sub >-
        [{"Description":"Access for Kinesis Firehose","Rules":[{"ResourceType":"index","Resource":["index/${CollectionName}/${OpenSearchIndexName}"],"Permission":["aoss:WriteDocument","aoss:UpdateIndex"]}],
        "Principal":["${KinesisOpenSearchServerlessDestinationRoleArn}"]}]
      Type: data
      
  NetworkSecurityPolicy:
    Type: AWS::OpenSearchServerless::SecurityPolicy
    Properties:
      Name: !Sub "${Prefix}-network-policy"
      Policy: !Sub >-
        [{"Rules":[{"ResourceType":"collection","Resource":["collection/${CollectionName}"]},
        {"ResourceType":"dashboard","Resource":["collection/${CollectionName}"]}],"AllowFromPublic":true}]
      Type: network
      
  EncryptionSecurityPolicy:
    Type: AWS::OpenSearchServerless::SecurityPolicy
    Properties:
      Name: !Sub "${Prefix}-encryption-policy"
      Policy: !Sub >-
        {"Rules":[{"ResourceType":"collection","Resource":["collection/${CollectionName}"]}],"AWSOwnedKey":true}
      Type: encryption

In addition to the OpenSearch Serverless collection, several policies are created.

For basic information about OpenSearch Serverless, please refer to the following page.

Related article: Create OpenSearch Serverless using CloudFormation

The key point is the data access policy.

The third policy is what allows Kinesis Firehose to send data to OpenSearch Serverless.
It specifies the IAM role for delivering data to OpenSearch Serverless, described above, as the principal and allows two actions (aoss:WriteDocument and aoss:UpdateIndex) on the index of the target collection.
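Because the data access policy is embedded in the template as a single-line JSON string, its structure can be easier to see when it is built from Python data. The sketch below reproduces the shape of DataAccessPolicy3 with the standard json module; the function name, the collection name, and the role ARN are hypothetical placeholders, while "test-stock-index" is the index name used in this article.

```python
import json


def firehose_data_access_policy(collection: str, index: str, role_arn: str) -> str:
    """Return a data access policy shaped like DataAccessPolicy3 as a JSON string."""
    policy = [{
        "Description": "Access for Kinesis Firehose",
        "Rules": [{
            "ResourceType": "index",
            # Index resources are written as index/<collection>/<index>.
            "Resource": [f"index/{collection}/{index}"],
            "Permission": ["aoss:WriteDocument", "aoss:UpdateIndex"],
        }],
        "Principal": [role_arn],
    }]
    return json.dumps(policy)


print(firehose_data_access_policy(
    "my-collection",                                  # hypothetical collection name
    "test-stock-index",
    "arn:aws:iam::123456789012:role/example-Role",    # hypothetical role ARN
))
```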

The remaining two data access policies should also be mentioned.

The first is for the following IAM user:

Resources:
  User:
    Type: AWS::IAM::User
    Properties:
      LoginProfile: 
        Password: !Ref Password
      Policies: 
        - PolicyName: OpenSearchServerlessUserPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - aoss:*
                Resource: "*"
      UserName: !Sub "${Prefix}-user"

This user is for working with the OpenSearch Serverless dashboard.
Its inline policy allows all OpenSearch Serverless actions.

The second is for a Lambda function.
It allows the aoss:CreateIndex action so that the function, described later in this article, can create the index.

(Reference) Lambda function for data generation

Resources:
  DataSourceLambda:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        ZipFile: |
          import boto3
          import datetime
          import json
          import os
          import random
          
          STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
          LIMIT = 10
          
          def get_data():
            return {
              'EVENT_TIME': datetime.datetime.now().isoformat(),
              'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
              'PRICE': round(random.random() * 100, 2)}
          
          def generate(stream_name, kinesis_client, limit):
            for i in range(limit):
              data = get_data()
              print(data)
              kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data).encode('utf-8'),
                PartitionKey="partitionkey")
        
          def lambda_handler(event, context):
            generate(STREAM_NAME, boto3.client('kinesis'), LIMIT)
      Environment:
        Variables:
          KINESIS_STREAM_NAME: !Ref KinesisDataStreamName
      Handler: !Ref Handler
      Role: !GetAtt DataSourceLambdaRole.Arn
      Runtime: !Ref Runtime
      Timeout: !Ref Timeout

This Lambda function generates the data to be stored in OpenSearch Serverless.
It generates 10 test records and writes them to the Kinesis data stream.
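Note that the function uses the constant partition key "partitionkey" for every record. Kinesis Data Streams hashes each partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value, so a constant key sends all records to the same shard regardless of ShardCount. The sketch below models this, assuming the hash key space is split evenly across shards (roughly the case for a newly created stream, not after resharding) and that the digest is read as a big-endian unsigned integer; the function names are hypothetical.

```python
import hashlib

HASH_KEY_SPACE = 2 ** 128  # partition keys hash into [0, 2**128 - 1]


def hash_key(partition_key: str) -> int:
    """MD5 of the UTF-8 partition key, read as a 128-bit unsigned integer."""
    return int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Index of the shard whose (assumed evenly split) hash range holds the key."""
    return hash_key(partition_key) * shard_count // HASH_KEY_SPACE


# Every record from the sample function lands on the same shard.
print({shard_index("partitionkey", 4) for _ in range(10)})
# Varying the key (e.g. using the ticker) spreads records across shards.
print({t: shard_index(t, 4) for t in ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]})
```

Using a field such as the ticker as the partition key would be one way to spread load if ShardCount is raised.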

The code to be executed by this function is based on the following sample code from the official AWS website.

Related article: Create and run a Managed Service for Apache Flink application - Amazon Kinesis Data Streams

For more information, please see the page mentioned at the beginning of this article.

(Reference) Automatically create indexes in OpenSearch Serverless using CloudFormation custom resources

The index specified in the Kinesis Firehose configuration must exist in the OpenSearch Serverless collection.
Here, a CloudFormation custom resource is used to create the index automatically.
Specifically, the following Lambda function is associated with the custom resource.

Resources:
  Function2:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Environment:
        Variables:
          COLLECTION_ENDPOINT: !Sub "${Collection}.${AWS::Region}.aoss.amazonaws.com"
          OPENSEARCH_INDEX_NAME: !Ref OpenSearchIndexName
          REGION: !Ref AWS::Region
      Code:
        ZipFile: |
          from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
          import boto3
          import cfnresponse
          import os
          
          host = os.environ['COLLECTION_ENDPOINT']
          opensearch_index_name = os.environ['OPENSEARCH_INDEX_NAME']
          region = os.environ['REGION']
          
          service = 'aoss'
          credentials = boto3.Session().get_credentials()
          auth = AWSV4SignerAuth(credentials, region, service)
          
          CREATE = 'Create'
          response_data = {}
          
          client = OpenSearch(
            hosts=[{'host': host, 'port': 443}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            pool_maxsize=20,
            )
          
          def lambda_handler(event, context):
            try:
              if event['RequestType'] == CREATE:
                create_response = client.indices.create(
                  opensearch_index_name
                )
                print(create_response)
                
              cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)
                
            except Exception as e:
              print(e)
              cfnresponse.send(event, context, cfnresponse.FAILED, response_data)
      FunctionName: !Sub "${Prefix}-function-02"
      Handler: !Ref Handler
      Layers:
        - !Ref LambdaLayer
      Runtime: !Ref Runtime
      Role: !Ref FunctionRoleArn2
      Timeout: 60

In this case, the function creates an index named “test-stock-index”, the value passed in through OpenSearchIndexName.
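The function reports its result to CloudFormation through cfnresponse.send, which uploads a JSON body to the pre-signed ResponseURL contained in the event. The sketch below only builds a body of that shape, to show which fields a custom resource response carries; it is modeled on, not copied from, the cfnresponse module, and the event values and log stream name are hypothetical. Because the function above passes no physical resource ID, the log stream name ends up as the default.

```python
import json
from typing import Optional

SUCCESS = "SUCCESS"
FAILED = "FAILED"


def build_response_body(event: dict, status: str, data: dict,
                        log_stream_name: str,
                        physical_resource_id: Optional[str] = None,
                        reason: Optional[str] = None,
                        no_echo: bool = False) -> str:
    """Build (but do not send) a custom resource response body."""
    body = {
        "Status": status,
        "Reason": reason or f"See the details in CloudWatch Log Stream: {log_stream_name}",
        # Without an explicit ID, the log stream name becomes the physical ID.
        "PhysicalResourceId": physical_resource_id or log_stream_name,
        # These three fields are echoed back from the request event.
        "StackId": event["StackId"],
        "RequestId": event["RequestId"],
        "LogicalResourceId": event["LogicalResourceId"],
        "NoEcho": no_echo,
        "Data": data,
    }
    return json.dumps(body)


# Hypothetical Create event containing only the fields used above.
sample_event = {
    "RequestType": "Create",
    "StackId": "arn:aws:cloudformation:ap-northeast-1:123456789012:stack/example/abc",
    "RequestId": "example-request-id",
    "LogicalResourceId": "CustomResource",
    "ResponseURL": "https://example.com/presigned",  # placeholder, never called here
}
print(build_response_body(sample_event, SUCCESS, {}, "example-log-stream"))
```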

The following is the IAM role for this function.

Resources:
  FunctionRole2:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: FunctionRole2Policy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - aoss:APIAccessAll
                Resource:
                  - !Sub "arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*"
      RoleName: !Sub "${Prefix}-FunctionRole2"

This role allows all API access to the OpenSearch Serverless collections.
Note that this IAM role is also specified as the principal of the second data access policy described above.
As a result, the function associated with this IAM role can perform the permitted action (creating the index) on the OpenSearch Serverless collection.
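Access to OpenSearch Serverless is checked at two layers: the IAM identity policy (here aoss:APIAccessAll) and the collection's data access policy. The following Python sketch is a simplified mental model of how the two combine, not the service's actual authorization logic; the function names, ARNs, and collection name are hypothetical.

```python
from fnmatch import fnmatchcase


def data_policy_allows(policies, principal_arn, permission, resource):
    """True if some data access policy grants `permission` on `resource` to the principal."""
    for policy in policies:
        if principal_arn not in policy["Principal"]:
            continue
        for rule in policy["Rules"]:
            resource_ok = any(fnmatchcase(resource, r) for r in rule["Resource"])
            permission_ok = any(fnmatchcase(permission, p) for p in rule["Permission"])
            if resource_ok and permission_ok:
                return True
    return False


def can_access(iam_actions, policies, principal_arn, permission, resource):
    """Both layers must agree: IAM API access and a matching data access policy."""
    return "aoss:APIAccessAll" in iam_actions and data_policy_allows(
        policies, principal_arn, permission, resource)


FUNCTION_ROLE_ARN = "arn:aws:iam::123456789012:role/example-FunctionRole2"  # hypothetical
INDEX = "index/my-collection/test-stock-index"  # hypothetical collection name

# Shaped like DataAccessPolicy2: CreateIndex only, for the function's role.
POLICIES = [{
    "Description": "Access for Function2",
    "Rules": [{"ResourceType": "index", "Resource": [INDEX],
               "Permission": ["aoss:CreateIndex"]}],
    "Principal": [FUNCTION_ROLE_ARN],
}]

print(can_access({"aoss:APIAccessAll"}, POLICIES, FUNCTION_ROLE_ARN, "aoss:CreateIndex", INDEX))    # True
print(can_access({"aoss:APIAccessAll"}, POLICIES, FUNCTION_ROLE_ARN, "aoss:WriteDocument", INDEX))  # False
print(can_access(set(), POLICIES, FUNCTION_ROLE_ARN, "aoss:CreateIndex", INDEX))                    # False
```

The last call illustrates why the IAM role alone is not enough, and vice versa: removing either layer denies the request.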

For details, please refer to the following page.

Related article: Automate OpenSearch Serverless indexing using CFN custom resources

Architecting

Use CloudFormation to build this environment and check its actual behavior.

Create CloudFormation stacks and check the resources in the stacks

Create CloudFormation stacks.
For information on how to create stacks and check each stack, please see the following page.

Related article: CloudFormation’s nested stack

Because this stack creates IAM resources with custom names (e.g., the IAM user), specify the CAPABILITY_NAMED_IAM capability as follows:

$ aws cloudformation create-stack \
--stack-name [stack-name] \
--template-url https://[bucket-name].s3.ap-northeast-1.amazonaws.com/[folder-name]/dva-03-010.yaml \
--capabilities CAPABILITY_NAMED_IAM

Check the created resource from the AWS Management Console.

Check Kinesis Data Streams.

Detail of Kinesis 01.

The data stream has been created successfully.

Check Kinesis Firehose.

Detail of Kinesis 02.

This is also successfully created.
The Kinesis data stream mentioned above is specified as the source, and OpenSearch Serverless as the destination.

Check the OpenSearch Serverless collection.

Detail of OpenSearch 01.

Indeed, a collection has been created.

In particular, check the data access policy for Kinesis Firehose.

Detail of OpenSearch 02.

This policy has also been created successfully.
It allows the IAM role specified as the principal to write data (documents).
This IAM role is what Kinesis Firehose uses to send data to the OpenSearch Serverless collection.

Check the indexes created for the collection at this time.

Detail of OpenSearch 03.

The “test-stock-index” is created.
This means that when the CloudFormation stack was created, the Lambda function associated with the CloudFormation custom resource was executed to create the index.

Operation Check

We are ready.

Run the Lambda function for data generation.

Detail of Lambda 01.

The function executed successfully.
As the log shows, 10 test records were generated.

Check Kinesis Data Streams metrics.

Detail of Kinesis 03.

We can see that the data is indeed flowing.

Next, check Kinesis Firehose.

Detail of Kinesis 04.

You can see that data is also flowing into Firehose.

Finally, access the OpenSearch Serverless dashboard to review the stored data (documents).
The dashboard is accessed using a dedicated user (dva-03-010) created with CloudFormation.

Detail of OpenSearch 03.

Indeed, 10 documents are stored.
The data generated by the Lambda function was stored in OpenSearch Serverless via Kinesis.

Summary

We covered how to store data received by Kinesis Data Streams in OpenSearch Serverless via Firehose.
