Kinesis Data Streamsで受信したデータを、Firehose経由でOpenSearch Serverlessに保存する
以下のページで、Kinesis Data Streamsで受信したデータを、Firehose経由でS3バケットに保存する方法を取り上げました。

今回はOpenSearch Serverlessに配信する方法をご紹介します。
構築する環境

全体的には冒頭でご紹介したページと同様です。
今回はFirehoseの配信先にOpenSearch Serverlessを指定します。
配信するデータはLambda関数で生成します。
関数のランタイム環境はPython3.8とします。
CloudFormationテンプレートファイル
上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートファイルを設置しています。
テンプレートファイルのポイント解説
Kinesis Data Stream
Resources:
  KinesisDataStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub "${Prefix}-DataStream"
      RetentionPeriodHours: 24
      ShardCount: !Ref ShardCount
Code language: YAML (yaml)Kinesis Data Streamです。
特別な設定は不要です。
Kinesis Firehose
Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      AmazonOpenSearchServerlessDestinationConfiguration: 
        CloudWatchLoggingOptions: 
          Enabled: true
          LogGroupName: !Ref LogGroup
          LogStreamName: !Ref LogStream
        CollectionEndpoint: !Ref CollectionEndpoint
        IndexName: !Ref OpenSearchIndexName
        RoleARN: !Ref KinesisOpenSearchServerlessDestinationRoleArn
        S3BackupMode: FailedDocumentsOnly
        S3Configuration: 
          BucketARN: !Sub "arn:aws:s3:::${BucketName}"
          CompressionFormat: GZIP
          Prefix: firehose/
          RoleARN: !GetAtt KinesisS3DestinationRole.Arn
      DeliveryStreamName: !Sub "${Prefix}-DeliveryStream"
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration: 
        KinesisStreamARN: !GetAtt KinesisDataStream.Arn
        RoleARN: !GetAtt KinesisStreamSourceRole.Arn
Code language: YAML (yaml)ポイントはAmazonOpenSearchServerlessDestinationConfigurationプロパティです。
CollectionEndpointプロパティにOpenSearch ServerlessのエンドポイントURLを指定します。
IndexNameプロパティには、OpenSearch Serverlessコレクションに作成したインデックスの名前を指定します。
Kinesis FirehoseにはいくつかのIAMロールを関連付けます。
その中で特に以下の2つのIAMロールがポイントです。
1つ目はKinesis Data Streamからデータを受け取るためのロールです。
Resources:
  KinesisStreamSourceRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisStreamSourcePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:ListShards
                Resource:
                  - !GetAtt KinesisDataStream.Arn
Code language: YAML (yaml)ResourceにKinesis Data Streamを指定して、データ受信に必要なアクションを許可します。
2つ目はOpenSearch Serverlessにデータを送信するためのIAMロールです。
Resources:
  KinesisOpenSearchServerlessDestinationRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisOpenSearchServerlessDestinationRolePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - aoss:APIAccessAll
                Resource:
                  - !Sub "arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*"
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource: "*"
      RoleName: !Sub "${Prefix}-KinesisOpenSearchServerlessDestinationRole"
Code language: YAML (yaml)OpenSearch Serverlessコレクションへの全APIアクセスを許可します。
OpenSearch Serverless
Resources:
  Collection:
    Type: AWS::OpenSearchServerless::Collection
    DependsOn:
      - EncryptionSecurityPolicy
    Properties:
      Name: !Ref CollectionName
      StandbyReplicas: DISABLED
      Type: TIMESERIES
      
  DataAccessPolicy1:
    Type: AWS::OpenSearchServerless::AccessPolicy
    Properties:
      Name: !Sub "${Prefix}-data-policy-01"
      Policy: !Sub >-
        [{"Description":"Access for cfn user","Rules":[{"ResourceType":"index","Resource":["index/*/*"],"Permission":["aoss:*"]},
        {"ResourceType":"collection","Resource":["collection/${CollectionName}"],"Permission":["aoss:*"]}],
        "Principal":["${UserArn}"]}]
      Type: data
      
  DataAccessPolicy2:
    Type: AWS::OpenSearchServerless::AccessPolicy
    Properties:
      Name: !Sub "${Prefix}-data-policy-02"
      Policy: !Sub >-
        [{"Description":"Access for Function2","Rules":[{"ResourceType":"index","Resource":["index/${CollectionName}/${OpenSearchIndexName}"],"Permission":["aoss:CreateIndex"]}],
        "Principal":["${FunctionRoleArn2}"]}]
      Type: data
      
  DataAccessPolicy3:
    Type: AWS::OpenSearchServerless::AccessPolicy
    Properties:
      Name: !Sub "${Prefix}-data-policy-03"
      Policy: !Sub >-
        [{"Description":"Access for Kinesis Firehose","Rules":[{"ResourceType":"index","Resource":["index/${CollectionName}/${OpenSearchIndexName}"],"Permission":["aoss:WriteDocument","aoss:UpdateIndex"]}],
        "Principal":["${KinesisOpenSearchServerlessDestinationRoleArn}"]}]
      Type: data
      
  NetworkSecurityPolicy:
    Type: AWS::OpenSearchServerless::SecurityPolicy
    Properties:
      Name: !Sub "${Prefix}-network-policy"
      Policy: !Sub >-
        [{"Rules":[{"ResourceType":"collection","Resource":["collection/${CollectionName}"]},
        {"ResourceType":"dashboard","Resource":["collection/${CollectionName}"]}],"AllowFromPublic":true}]
      Type: network
      
  EncryptionSecurityPolicy:
    Type: AWS::OpenSearchServerless::SecurityPolicy
    Properties:
      Name: !Sub "${Prefix}-encryption-policy"
      Policy: !Sub >-
        {"Rules":[{"ResourceType":"collection","Resource":["collection/${CollectionName}"]}],"AWSOwnedKey":true}
      Type: encryption
Code language: YAML (yaml)OpenSearch Serverlessコレクションに加えて、各種ポリシーを作成します。
OpenSearch Serverlessに関する基本的な事項につきましては、以下のページをご確認ください。

ポイントはデータアクセスポリシーです。
Kinesis FirehoseからOpenSearch Serverlessにデータを送信する上で、3つ目のポリシーが重要です。
先述のOpenSearch Serverless用のIAMロールを指定して、対象コレクションのインデックスへの2アクション(aoss:WriteDocument, aoss:UpdateIndex)を許可します。
残りの2つのデータアクセスポリシーについても触れておきます。
1つ目は以下のIAMユーザ用です。
Resources:
  User:
    Type: AWS::IAM::User
    Properties:
      LoginProfile: 
        Password: !Ref Password
      Policies: 
        - PolicyName: OpenSearchServerlessUserPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - aoss:*
                Resource: "*"
      UserName: !Sub "${Prefix}-user"
Code language: YAML (yaml)このユーザはOpenSearch Serverlessダッシュボードを操作するためのものです。
インラインポリシーでOpenSearch Serverlessに関する全アクションを許可します。
2つ目はLambda関数用です。
次項でご説明するLambda関数の働きを満たすアクション(aoss:CreateIndex)を許可します。
(参考)データ生成用のLambda関数
Resources:
  DataSourceLambda:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        ZipFile: |
          import boto3
          import datetime
          import json
          import os
          import random
          
          STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
          LIMIT = 10
          
          def get_data():
            return {
              'EVENT_TIME': datetime.datetime.now().isoformat(),
              'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
              'PRICE': round(random.random() * 100, 2)}
          
          def generate(stream_name, kinesis_client, limit):
            for i in range(limit):
              data = get_data()
              print(data)
              kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data).encode('utf-8'),
                PartitionKey="partitionkey")
        
          def lambda_handler(event, context):
            generate(STREAM_NAME, boto3.client('kinesis'), LIMIT)
      Environment:
        Variables:
          KINESIS_STREAM_NAME: !Ref KinesisDataStreamName
      Handler: !Ref Handler
      Role: !GetAtt DataSourceLambdaRole.Arn
      Runtime: !Ref Runtime
      Timeout: !Ref Timeout
Code language: YAML (yaml)OpenSearch Serverlessに保存するデータを生成するLambda関数です。
テストデータを10個生成します。
この関数で実行するコードは、以下のAWS公式サイトのサンプルコードを参考にしています。
詳細は冒頭でご紹介したページをご確認ください。
(参考)CloudFormationカスタムリソースを使用して、OpenSearch Serverlessにインデックスを自動的に作成する
Kinesis Firehose側で指定したインデックスをOpenSearch Serverlessコレクションに作成する必要があります。
今回はCloudFormationカスタムリソースを使用して、自動的にインデックスを作成します。
具体的には、カスタムリソースに以下のLambda関数を関連付けます。
Resources:
  Function2:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Environment:
        Variables:
          COLLECTION_ENDPOINT: !Sub "${Collection}.${AWS::Region}.aoss.amazonaws.com"
          OPENSEARCH_INDEX_NAME: !Ref OpenSearchIndexName
          REGION: !Ref AWS::Region
      Code:
        ZipFile: |
          from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
          import boto3
          import cfnresponse
          import os
          
          host = os.environ['COLLECTION_ENDPOINT']
          opensearch_index_name = os.environ['OPENSEARCH_INDEX_NAME']
          region = os.environ['REGION']
          
          service = 'aoss'
          credentials = boto3.Session().get_credentials()
          auth = AWSV4SignerAuth(credentials, region, service)
          
          CREATE = 'Create'
          response_data = {}
          
          client = OpenSearch(
            hosts=[{'host': host, 'port': 443}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            pool_maxsize=20,
            )
          
          def lambda_handler(event, context):
            try:
              if event['RequestType'] == CREATE:
                create_response = client.indices.create(
                  opensearch_index_name
                )
                print(create_response)
                
              cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)
                
            except Exception as e:
              print(e)
              cfnresponse.send(event, context, cfnresponse.FAILED, response_data)
      FunctionName: !Sub "${Prefix}-function-02"
      Handler: !Ref Handler
      Layers:
        - !Ref LambdaLayer
      Runtime: !Ref Runtime
      Role: !Ref FunctionRoleArn2
      Timeout: 60
Code language: YAML (yaml)今回は「test-stock-index」という名前のインデックスを作成します。
以下が本関数用のIAMロールです。
Resources:
  FunctionRole2:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: FunctionRole2Policy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - aoss:APIAccessAll
                Resource:
                  - !Sub "arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*"
      RoleName: !Sub "${Prefix}-FunctionRole2"
Code language: YAML (yaml)OpenSearch Serverlessコレクションへの全APIアクセスを許可する内容です。
このIAMロールが先述のOpenSearch Serverless用のデータアクセスポリシー(2つ目)で指定されている点が重要です。
つまりこのIAMロールが関連づけられている本関数は、OpenSearch Serverlessコレクションへのアクション(インデックス作成)が可能になリます。
詳細につきましては以下のページをご確認ください。

環境構築
CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。
CloudFormationスタックを作成し、スタック内のリソースを確認する
CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。

今回は名前をつけるIAMリソース(IAMユーザ等)を作成しますから、以下の通りにオプションを設定します。
$ aws cloudformation create-stack \
--stack-name [stack-name] \
--template-url https://[bucket-name].s3.ap-northeast-1.amazonaws.com/[folder-name]/dva-03-010.yaml \
--capabilities CAPABILITY_NAMED_IAM
Code language: Bash (bash)作成されたリソースをAWS Management Consoleから確認します。
Kinesis Data Streamsを確認します。

正常に作成されています。
Kinesis Fireshoseを確認します。

こちらも正常に作成されています。
Sourceに先述のKinesis Data Streams、DestinationにOpenSearch Serverlessが指定されています。
OpenSearch Serverlessコレクションを確認します。

確かにコレクションが作成されています。
特にKinesis Firehose用のデータアクセスポリシーを確認します。

こちらも正常に作成されています。
IAMロールをプリンシパルとして、データ(ドキュメント)の書き込みを許可する内容です。
このIAMロールはKinesis FirehoseがOpenSearch Serverlessコレクションにデータを送信する際に使用するものです。
現時点でコレクションに作成されているインデックスを確認します。

「test-stock-index」が作成されています。
つまりCloudFormationスタック作成時に、CloudFormationカスタムリソースに紐づくLambda関数が実行されて、インデックスが作成されたということです。
動作確認
準備が整いました。
データ生成用のLambda関数を実行します。

関数が正常に実行されました。
ログの通り、テストデータが10個生成されました。
Kinesis Data Streamsのメトリクスを確認します。

確かにデータが流れてきていることがわかります。
次にKinesis Firehoseを確認します。

Firehoseにもデータが流れてきていることがわかります。
最後にOpenSearch Serverlessダッシュボードにアクセスし、保存されているデータ(ドキュメント)を確認します。
ダッシュボードへのアクセスはCloudFormationで作成した専用ユーザ(dva-03-010)を使用します。

確かに10個のデータが保存されています。
Lambda関数によって生成されたデータをKinesisを経由して、OpenSearch Serverlessに保存できました。
まとめ
Kinesis Data Streamsで受信したデータを、Firehose経由でOpenSearch Serverlessに保存する方法を取り上げました。