Kinesis Data Streamsで受信したデータを、Firehose経由でOpenSearch Serverlessに保存する
以下のページで、Kinesis Data Streamsで受信したデータを、Firehose経由でS3バケットに保存する方法を取り上げました。
今回はOpenSearch Serverlessに配信する方法をご紹介します。
構築する環境
全体的には冒頭でご紹介したページと同様です。
今回はFirehoseの配信先にOpenSearch Serverlessを指定します。
配信するデータはLambda関数で生成します。
関数のランタイム環境はPython3.8とします。
CloudFormationテンプレートファイル
上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートファイルを設置しています。
テンプレートファイルのポイント解説
Kinesis Data Stream
Resources:
KinesisDataStream:
Type: AWS::Kinesis::Stream
Properties:
Name: !Sub "${Prefix}-DataStream"
RetentionPeriodHours: 24
ShardCount: !Ref ShardCount
Code language: YAML (yaml)
Kinesis Data Streamです。
特別な設定は不要です。
Kinesis Firehose
Resources:
KinesisFirehoseDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
AmazonOpenSearchServerlessDestinationConfiguration:
CloudWatchLoggingOptions:
Enabled: true
LogGroupName: !Ref LogGroup
LogStreamName: !Ref LogStream
CollectionEndpoint: !Ref CollectionEndpoint
IndexName: !Ref OpenSearchIndexName
RoleARN: !Ref KinesisOpenSearchServerlessDestinationRoleArn
S3BackupMode: FailedDocumentsOnly
S3Configuration:
BucketARN: !Sub "arn:aws:s3:::${BucketName}"
CompressionFormat: GZIP
Prefix: firehose/
RoleARN: !GetAtt KinesisS3DestinationRole.Arn
DeliveryStreamName: !Sub "${Prefix}-DeliveryStream"
DeliveryStreamType: KinesisStreamAsSource
KinesisStreamSourceConfiguration:
KinesisStreamARN: !GetAtt KinesisDataStream.Arn
RoleARN: !GetAtt KinesisStreamSourceRole.Arn
Code language: YAML (yaml)
ポイントはAmazonOpenSearchServerlessDestinationConfigurationプロパティです。
CollectionEndpointプロパティにOpenSearch ServerlessのエンドポイントURLを指定します。
IndexNameプロパティには、OpenSearch Serverlessコレクションに作成したインデックスの名前を指定します。
Kinesis FirehoseにはいくつかのIAMロールを関連付けます。
その中で特に以下の2つのIAMロールがポイントです。
1つ目はKinesis Data Streamからデータを受け取るためのロールです。
Resources:
KinesisStreamSourceRole:
Type: AWS::IAM::Role
DeletionPolicy: Delete
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- firehose.amazonaws.com
Policies:
- PolicyName: KinesisStreamSourcePolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- kinesis:DescribeStream
- kinesis:GetShardIterator
- kinesis:GetRecords
- kinesis:ListShards
Resource:
- !GetAtt KinesisDataStream.Arn
Code language: YAML (yaml)
ResourceにKinesis Data Streamを指定して、データ受信に必要なアクションを許可します。
2つ目はOpenSearch Serverlessにデータを送信するためのIAMロールです。
Resources:
KinesisOpenSearchServerlessDestinationRole:
Type: AWS::IAM::Role
DeletionPolicy: Delete
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- firehose.amazonaws.com
Policies:
- PolicyName: KinesisOpenSearchServerlessDestinationRolePolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- aoss:APIAccessAll
Resource:
- !Sub "arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*"
- Effect: Allow
Action:
- logs:PutLogEvents
Resource: "*"
RoleName: !Sub "${Prefix}-KinesisOpenSearchServerlessDestinationRole"
Code language: YAML (yaml)
OpenSearch Serverlessコレクションへの全APIアクセスを許可します。
OpenSearch Serverless
Resources:
Collection:
Type: AWS::OpenSearchServerless::Collection
DependsOn:
- EncryptionSecurityPolicy
Properties:
Name: !Ref CollectionName
StandbyReplicas: DISABLED
Type: TIMESERIES
DataAccessPolicy1:
Type: AWS::OpenSearchServerless::AccessPolicy
Properties:
Name: !Sub "${Prefix}-data-policy-01"
Policy: !Sub >-
[{"Description":"Access for cfn user","Rules":[{"ResourceType":"index","Resource":["index/*/*"],"Permission":["aoss:*"]},
{"ResourceType":"collection","Resource":["collection/${CollectionName}"],"Permission":["aoss:*"]}],
"Principal":["${UserArn}"]}]
Type: data
DataAccessPolicy2:
Type: AWS::OpenSearchServerless::AccessPolicy
Properties:
Name: !Sub "${Prefix}-data-policy-02"
Policy: !Sub >-
[{"Description":"Access for Function2","Rules":[{"ResourceType":"index","Resource":["index/${CollectionName}/${OpenSearchIndexName}"],"Permission":["aoss:CreateIndex"]}],
"Principal":["${FunctionRoleArn2}"]}]
Type: data
DataAccessPolicy3:
Type: AWS::OpenSearchServerless::AccessPolicy
Properties:
Name: !Sub "${Prefix}-data-policy-03"
Policy: !Sub >-
[{"Description":"Access for Kinesis Firehose","Rules":[{"ResourceType":"index","Resource":["index/${CollectionName}/${OpenSearchIndexName}"],"Permission":["aoss:WriteDocument","aoss:UpdateIndex"]}],
"Principal":["${KinesisOpenSearchServerlessDestinationRoleArn}"]}]
Type: data
NetworkSecurityPolicy:
Type: AWS::OpenSearchServerless::SecurityPolicy
Properties:
Name: !Sub "${Prefix}-network-policy"
Policy: !Sub >-
[{"Rules":[{"ResourceType":"collection","Resource":["collection/${CollectionName}"]},
{"ResourceType":"dashboard","Resource":["collection/${CollectionName}"]}],"AllowFromPublic":true}]
Type: network
EncryptionSecurityPolicy:
Type: AWS::OpenSearchServerless::SecurityPolicy
Properties:
Name: !Sub "${Prefix}-encryption-policy"
Policy: !Sub >-
{"Rules":[{"ResourceType":"collection","Resource":["collection/${CollectionName}"]}],"AWSOwnedKey":true}
Type: encryption
Code language: YAML (yaml)
OpenSearch Serverlessコレクションに加えて、各種ポリシーを作成します。
OpenSearch Serverlessに関する基本的な事項につきましては、以下のページをご確認ください。
ポイントはデータアクセスポリシーです。
Kinesis FirehoseからOpenSearch Serverlessにデータを送信する上で、3つ目のポリシーが重要です。
先述のOpenSearch Serverless用のIAMロールを指定して、対象コレクションのインデックスへの2アクション(aoss:WriteDocument, aoss:UpdateIndex)を許可します。
残りの2つのデータアクセスポリシーについても触れておきます。
1つ目は以下のIAMユーザ用です。
Resources:
User:
Type: AWS::IAM::User
Properties:
LoginProfile:
Password: !Ref Password
Policies:
- PolicyName: OpenSearchServerlessUserPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- aoss:*
Resource: "*"
UserName: !Sub "${Prefix}-user"
Code language: YAML (yaml)
このユーザはOpenSearch Serverlessダッシュボードを操作するためのものです。
インラインポリシーでOpenSearch Serverlessに関する全アクションを許可します。
2つ目はLambda関数用です。
次項でご説明するLambda関数の働きを満たすアクション(aoss:CreateIndex)を許可します。
(参考)データ生成用のLambda関数
Resources:
DataSourceLambda:
Type: AWS::Lambda::Function
Properties:
Architectures:
- !Ref Architecture
Code:
ZipFile: |
import boto3
import datetime
import json
import os
import random
STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
LIMIT = 10
def get_data():
return {
'EVENT_TIME': datetime.datetime.now().isoformat(),
'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
'PRICE': round(random.random() * 100, 2)}
def generate(stream_name, kinesis_client, limit):
for i in range(limit):
data = get_data()
print(data)
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data).encode('utf-8'),
PartitionKey="partitionkey")
def lambda_handler(event, context):
generate(STREAM_NAME, boto3.client('kinesis'), LIMIT)
Environment:
Variables:
KINESIS_STREAM_NAME: !Ref KinesisDataStreamName
Handler: !Ref Handler
Role: !GetAtt DataSourceLambdaRole.Arn
Runtime: !Ref Runtime
Timeout: !Ref Timeout
Code language: YAML (yaml)
OpenSearch Serverlessに保存するデータを生成するLambda関数です。
テストデータを10個生成します。
この関数で実行するコードは、以下のAWS公式サイトのサンプルコードを参考にしています。
詳細は冒頭でご紹介したページをご確認ください。
(参考)CloudFormationカスタムリソースを使用して、OpenSearch Serverlessにインデックスを自動的に作成する
Kinesis Firehose側で指定したインデックスをOpenSearch Serverlessコレクションに作成する必要があります。
今回はCloudFormationカスタムリソースを使用して、自動的にインデックスを作成します。
具体的には、カスタムリソースに以下のLambda関数を関連付けます。
Resources:
Function2:
Type: AWS::Lambda::Function
Properties:
Architectures:
- !Ref Architecture
Environment:
Variables:
COLLECTION_ENDPOINT: !Sub "${Collection}.${AWS::Region}.aoss.amazonaws.com"
OPENSEARCH_INDEX_NAME: !Ref OpenSearchIndexName
REGION: !Ref AWS::Region
Code:
ZipFile: |
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3
import cfnresponse
import os
host = os.environ['COLLECTION_ENDPOINT']
opensearch_index_name = os.environ['OPENSEARCH_INDEX_NAME']
region = os.environ['REGION']
service = 'aoss'
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region, service)
CREATE = 'Create'
response_data = {}
client = OpenSearch(
hosts=[{'host': host, 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
pool_maxsize=20,
)
def lambda_handler(event, context):
try:
if event['RequestType'] == CREATE:
create_response = client.indices.create(
opensearch_index_name
)
print(create_response)
cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)
except Exception as e:
print(e)
cfnresponse.send(event, context, cfnresponse.FAILED, response_data)
FunctionName: !Sub "${Prefix}-function-02"
Handler: !Ref Handler
Layers:
- !Ref LambdaLayer
Runtime: !Ref Runtime
Role: !Ref FunctionRoleArn2
Timeout: 60
Code language: YAML (yaml)
今回は「test-stock-index」という名前のインデックスを作成します。
以下が本関数用のIAMロールです。
Resources:
FunctionRole2:
Type: AWS::IAM::Role
DeletionPolicy: Delete
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- lambda.amazonaws.com
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: FunctionRole2Policy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- aoss:APIAccessAll
Resource:
- !Sub "arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*"
RoleName: !Sub "${Prefix}-FunctionRole2"
Code language: YAML (yaml)
OpenSearch Serverlessコレクションへの全APIアクセスを許可する内容です。
このIAMロールが先述のOpenSearch Serverless用のデータアクセスポリシー(2つ目)で指定されている点が重要です。
つまりこのIAMロールが関連づけられている本関数は、OpenSearch Serverlessコレクションへのアクション(インデックス作成)が可能になリます。
詳細につきましては以下のページをご確認ください。
環境構築
CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。
CloudFormationスタックを作成し、スタック内のリソースを確認する
CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。
今回は名前をつけるIAMリソース(IAMユーザ等)を作成しますから、以下の通りにオプションを設定します。
$ aws cloudformation create-stack \
--stack-name [stack-name] \
--template-url https://[bucket-name].s3.ap-northeast-1.amazonaws.com/[folder-name]/dva-03-010.yaml \
--capabilities CAPABILITY_NAMED_IAM
Code language: Bash (bash)
作成されたリソースをAWS Management Consoleから確認します。
Kinesis Data Streamsを確認します。
正常に作成されています。
Kinesis Fireshoseを確認します。
こちらも正常に作成されています。
Sourceに先述のKinesis Data Streams、DestinationにOpenSearch Serverlessが指定されています。
OpenSearch Serverlessコレクションを確認します。
確かにコレクションが作成されています。
特にKinesis Firehose用のデータアクセスポリシーを確認します。
こちらも正常に作成されています。
IAMロールをプリンシパルとして、データ(ドキュメント)の書き込みを許可する内容です。
このIAMロールはKinesis FirehoseがOpenSearch Serverlessコレクションにデータを送信する際に使用するものです。
現時点でコレクションに作成されているインデックスを確認します。
「test-stock-index」が作成されています。
つまりCloudFormationスタック作成時に、CloudFormationカスタムリソースに紐づくLambda関数が実行されて、インデックスが作成されたということです。
動作確認
準備が整いました。
データ生成用のLambda関数を実行します。
関数が正常に実行されました。
ログの通り、テストデータが10個生成されました。
Kinesis Data Streamsのメトリクスを確認します。
確かにデータが流れてきていることがわかります。
次にKinesis Firehoseを確認します。
Firehoseにもデータが流れてきていることがわかります。
最後にOpenSearch Serverlessダッシュボードにアクセスし、保存されているデータ(ドキュメント)を確認します。
ダッシュボードへのアクセスはCloudFormationで作成した専用ユーザ(dva-03-010)を使用します。
確かに10個のデータが保存されています。
Lambda関数によって生成されたデータをKinesisを経由して、OpenSearch Serverlessに保存できました。
まとめ
Kinesis Data Streamsで受信したデータを、Firehose経由でOpenSearch Serverlessに保存する方法を取り上げました。