Store data received by Kinesis Data Streams in OpenSearch Serverless via Firehose
The following page covers how to store data received by Kinesis Data Streams in an S3 bucket via Firehose.
In this article, we show how to deliver that data to OpenSearch Serverless instead.
Environment
The overall configuration is similar to the one on the page introduced at the beginning of this article.
This time we specify OpenSearch Serverless as the Firehose destination.
The data to be delivered is generated by a Lambda function.
The runtime environment for the function is Python 3.8.
CloudFormation template files
The above configuration is built with CloudFormation.
The CloudFormation template file is located at the following URL:
Explanation of key points of template files
Kinesis Data Stream
Resources:
  KinesisDataStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub "${Prefix}-DataStream"
      RetentionPeriodHours: 24
      ShardCount: !Ref ShardCount
This is the Kinesis data stream.
No special configuration is required.
Kinesis Firehose
Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      AmazonOpenSearchServerlessDestinationConfiguration:
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref LogGroup
          LogStreamName: !Ref LogStream
        CollectionEndpoint: !Ref CollectionEndpoint
        IndexName: !Ref OpenSearchIndexName
        RoleARN: !Ref KinesisOpenSearchServerlessDestinationRoleArn
        S3BackupMode: FailedDocumentsOnly
        S3Configuration:
          BucketARN: !Sub "arn:aws:s3:::${BucketName}"
          CompressionFormat: GZIP
          Prefix: firehose/
          RoleARN: !GetAtt KinesisS3DestinationRole.Arn
      DeliveryStreamName: !Sub "${Prefix}-DeliveryStream"
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !GetAtt KinesisDataStream.Arn
        RoleARN: !GetAtt KinesisStreamSourceRole.Arn
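The key point is the AmazonOpenSearchServerlessDestinationConfiguration property.
Specify the OpenSearch Serverless collection endpoint URL in the CollectionEndpoint property.
The IndexName property specifies the name of the index created in the OpenSearch Serverless collection.
Because S3BackupMode is set to FailedDocumentsOnly, only documents that Firehose fails to index are backed up to the S3 bucket specified in S3Configuration.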
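The CollectionEndpoint value is the collection's HTTPS endpoint. As the Function2 definition in the index-creation section shows, its host name follows the pattern <collection-id>.<region>.aoss.amazonaws.com. The following is a minimal sketch that builds both forms from a collection ID and Region; the helper names and the collection ID "abc123example" are hypothetical placeholders.

```python
def collection_host(collection_id: str, region: str) -> str:
    """Host name of an OpenSearch Serverless collection (no scheme)."""
    return f"{collection_id}.{region}.aoss.amazonaws.com"


def collection_endpoint(collection_id: str, region: str) -> str:
    """HTTPS endpoint URL, the form passed to Firehose's CollectionEndpoint."""
    return "https://" + collection_host(collection_id, region)


if __name__ == "__main__":
    # "abc123example" is a made-up collection ID used only for illustration.
    print(collection_endpoint("abc123example", "ap-northeast-1"))
```

The host-only form is what the index-creation function passes to the OpenSearch client, while Firehose takes the full https:// URL.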
There are several IAM roles associated with Kinesis Firehose.
Two of them are particularly important.
The first is a role to receive data from the Kinesis Data Stream.
Resources:
  KinesisStreamSourceRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisStreamSourcePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:ListShards
                Resource:
                  - !GetAtt KinesisDataStream.Arn
Specify the Kinesis data stream as the Resource and allow the actions required to receive data from it.
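For reference, the same inline policy can be expressed as a plain Python dict, which is handy if you want to generate or review it outside CloudFormation. This is a sketch only: the helper name and the example stream ARN (account ID 111122223333, stream name "example-stream") are hypothetical.

```python
import json

# Actions the Firehose source role needs on the data stream (taken from the template above).
KINESIS_SOURCE_ACTIONS = [
    "kinesis:DescribeStream",
    "kinesis:GetShardIterator",
    "kinesis:GetRecords",
    "kinesis:ListShards",
]


def kinesis_source_policy(stream_arn: str) -> dict:
    """Build an inline policy document scoped to a single data stream."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(KINESIS_SOURCE_ACTIONS),
                "Resource": [stream_arn],
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical ARN; !GetAtt KinesisDataStream.Arn returns a value of this shape.
    arn = "arn:aws:kinesis:ap-northeast-1:111122223333:stream/example-stream"
    print(json.dumps(kinesis_source_policy(arn), indent=2))
```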
The second is an IAM role for submitting data to OpenSearch Serverless.
Resources:
  KinesisOpenSearchServerlessDestinationRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisOpenSearchServerlessDestinationRolePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - aoss:APIAccessAll
                Resource:
                  - !Sub "arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*"
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource: "*"
      RoleName: !Sub "${Prefix}-KinesisOpenSearchServerlessDestinationRole"
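This policy allows all API access (aoss:APIAccessAll) to OpenSearch Serverless collections, and also allows logs:PutLogEvents for the CloudWatch Logs logging configured on the delivery stream.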
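Because the Resource ends in collection/*, the statement covers every collection in the stack's account and Region, not only the one this template creates. The sketch below approximates that wildcard check with Python's fnmatchcase; it is an illustration, not IAM's actual evaluator (fnmatch also understands [...] character classes, which IAM does not). The helper names, account IDs and collection IDs are hypothetical. Note that IAM permission alone is not enough for OpenSearch Serverless: the role must also appear in a data access policy, as described in the next section.

```python
from fnmatch import fnmatchcase


def aoss_collection_resource(region: str, account_id: str) -> str:
    """Resource pattern used in the template: every collection in one account/Region."""
    return f"arn:aws:aoss:{region}:{account_id}:collection/*"


def resource_matches(pattern: str, arn: str) -> bool:
    """Rough, case-sensitive approximation of IAM '*' / '?' wildcard matching."""
    return fnmatchcase(arn, pattern)


if __name__ == "__main__":
    pattern = aoss_collection_resource("ap-northeast-1", "111122223333")
    # Hypothetical collection ARNs: same Region matches, other Region does not.
    print(resource_matches(pattern, "arn:aws:aoss:ap-northeast-1:111122223333:collection/abc123"))
    print(resource_matches(pattern, "arn:aws:aoss:us-east-1:111122223333:collection/abc123"))
```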
OpenSearch Serverless
Resources:
  Collection:
    Type: AWS::OpenSearchServerless::Collection
    DependsOn:
      - EncryptionSecurityPolicy
    Properties:
      Name: !Ref CollectionName
      StandbyReplicas: DISABLED
      Type: TIMESERIES
  DataAccessPolicy1:
    Type: AWS::OpenSearchServerless::AccessPolicy
    Properties:
      Name: !Sub "${Prefix}-data-policy-01"
      Policy: !Sub >-
        [{"Description":"Access for cfn user","Rules":[{"ResourceType":"index","Resource":["index/*/*"],"Permission":["aoss:*"]},
        {"ResourceType":"collection","Resource":["collection/${CollectionName}"],"Permission":["aoss:*"]}],
        "Principal":["${UserArn}"]}]
      Type: data
  DataAccessPolicy2:
    Type: AWS::OpenSearchServerless::AccessPolicy
    Properties:
      Name: !Sub "${Prefix}-data-policy-02"
      Policy: !Sub >-
        [{"Description":"Access for Function2","Rules":[{"ResourceType":"index","Resource":["index/${CollectionName}/${OpenSearchIndexName}"],"Permission":["aoss:CreateIndex"]}],
        "Principal":["${FunctionRoleArn2}"]}]
      Type: data
  DataAccessPolicy3:
    Type: AWS::OpenSearchServerless::AccessPolicy
    Properties:
      Name: !Sub "${Prefix}-data-policy-03"
      Policy: !Sub >-
        [{"Description":"Access for Kinesis Firehose","Rules":[{"ResourceType":"index","Resource":["index/${CollectionName}/${OpenSearchIndexName}"],"Permission":["aoss:WriteDocument","aoss:UpdateIndex"]}],
        "Principal":["${KinesisOpenSearchServerlessDestinationRoleArn}"]}]
      Type: data
  NetworkSecurityPolicy:
    Type: AWS::OpenSearchServerless::SecurityPolicy
    Properties:
      Name: !Sub "${Prefix}-network-policy"
      Policy: !Sub >-
        [{"Rules":[{"ResourceType":"collection","Resource":["collection/${CollectionName}"]},
        {"ResourceType":"dashboard","Resource":["collection/${CollectionName}"]}],"AllowFromPublic":true}]
      Type: network
  EncryptionSecurityPolicy:
    Type: AWS::OpenSearchServerless::SecurityPolicy
    Properties:
      Name: !Sub "${Prefix}-encryption-policy"
      Policy: !Sub >-
        {"Rules":[{"ResourceType":"collection","Resource":["collection/${CollectionName}"]}],"AWSOwnedKey":true}
      Type: encryption
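In addition to the OpenSearch Serverless collection itself, the template creates several policies: three data access policies, a network policy that allows public access to the collection and its dashboard, and an encryption policy that uses an AWS-owned key.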
For basic information about OpenSearch Serverless, please refer to the following page.
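The collection declares DependsOn: EncryptionSecurityPolicy because OpenSearch Serverless only creates a collection when an encryption policy matching its name already exists. The sketch below rebuilds a policy equivalent to the template's encryption policy and checks whether a given collection name is covered by its rules. The helper names and the collection name "test-collection" are hypothetical, and the wildcard handling (e.g. collection/log*) is approximated with fnmatchcase.

```python
import json
from fnmatch import fnmatchcase


def encryption_policy(collection_name: str) -> str:
    """JSON policy text equivalent to the template's EncryptionSecurityPolicy."""
    return json.dumps({
        "Rules": [{"ResourceType": "collection",
                   "Resource": [f"collection/{collection_name}"]}],
        "AWSOwnedKey": True,
    })


def covers_collection(policy_text: str, collection_name: str) -> bool:
    """True if any collection rule in the policy matches the collection name."""
    policy = json.loads(policy_text)
    target = f"collection/{collection_name}"
    return any(
        fnmatchcase(target, pattern)
        for rule in policy["Rules"]
        if rule["ResourceType"] == "collection"
        for pattern in rule["Resource"]
    )


if __name__ == "__main__":
    text = encryption_policy("test-collection")  # hypothetical collection name
    print(covers_collection(text, "test-collection"))
    print(covers_collection(text, "other-collection"))
```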
The key point is the data access policy.
The third policy is the one that matters for sending data from Kinesis Firehose to OpenSearch Serverless.
It specifies the IAM role for delivering data to OpenSearch Serverless, described above, as the principal and allows two actions (aoss:WriteDocument and aoss:UpdateIndex) on the target index of the collection.
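To make the structure of this policy concrete, the sketch below builds JSON shaped like DataAccessPolicy3 and computes which permissions it grants a principal on a specific index. It is a simplified illustration: the helper names and the role ARN are hypothetical, and the evaluator only handles exact resource names, not wildcards such as index/*/*. Note that aoss:CreateIndex is not granted to the Firehose role, which is consistent with the index being created separately by a custom resource later in this article.

```python
import json


def firehose_data_access_policy(collection: str, index: str, role_arn: str) -> str:
    """Build JSON text shaped like DataAccessPolicy3 in the template."""
    policy = [{
        "Description": "Access for Kinesis Firehose",
        "Rules": [{
            "ResourceType": "index",
            "Resource": [f"index/{collection}/{index}"],
            "Permission": ["aoss:WriteDocument", "aoss:UpdateIndex"],
        }],
        "Principal": [role_arn],
    }]
    return json.dumps(policy)


def granted(policy_text: str, principal: str, resource: str) -> set:
    """Permissions the policy grants a principal on an exact resource name.

    Wildcard resources are ignored here for brevity.
    """
    perms = set()
    for statement in json.loads(policy_text):
        if principal not in statement["Principal"]:
            continue
        for rule in statement["Rules"]:
            if resource in rule["Resource"]:
                perms.update(rule["Permission"])
    return perms


if __name__ == "__main__":
    role = "arn:aws:iam::111122223333:role/example-firehose-role"  # hypothetical
    text = firehose_data_access_policy("test-collection", "test-stock-index", role)
    print(sorted(granted(text, role, "index/test-collection/test-stock-index")))
```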
The remaining two data access policies are also worth mentioning.
The first is for the following IAM user:
Resources:
  User:
    Type: AWS::IAM::User
    Properties:
      LoginProfile:
        Password: !Ref Password
      Policies:
        - PolicyName: OpenSearchServerlessUserPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - aoss:*
                Resource: "*"
      UserName: !Sub "${Prefix}-user"
This user is for working with the OpenSearch Serverless dashboard.
Its inline policy allows all OpenSearch Serverless actions, and the first data access policy grants it all permissions (aoss:*) on the collection and on every index (index/*/*).
The second is for a Lambda function.
It allows the aoss:CreateIndex action, which the index-creation Lambda function explained in a later section needs.
(Reference) Lambda function for data generation
Resources:
  DataSourceLambda:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        ZipFile: |
          import boto3
          import datetime
          import json
          import os
          import random

          STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
          LIMIT = 10

          def get_data():
              return {
                  'EVENT_TIME': datetime.datetime.now().isoformat(),
                  'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
                  'PRICE': round(random.random() * 100, 2)}

          def generate(stream_name, kinesis_client, limit):
              for i in range(limit):
                  data = get_data()
                  print(data)
                  kinesis_client.put_record(
                      StreamName=stream_name,
                      Data=json.dumps(data).encode('utf-8'),
                      PartitionKey="partitionkey")

          def lambda_handler(event, context):
              generate(STREAM_NAME, boto3.client('kinesis'), LIMIT)
      Environment:
        Variables:
          KINESIS_STREAM_NAME: !Ref KinesisDataStreamName
      Handler: !Ref Handler
      Role: !GetAtt DataSourceLambdaRole.Arn
      Runtime: !Ref Runtime
      Timeout: !Ref Timeout
This Lambda function generates the data to be stored in OpenSearch Serverless.
Each invocation generates 10 test records.
The code executed by this function is based on the following sample code from the official AWS website.
For more information, please see the page mentioned at the beginning of this article.
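To see what the function sends without touching AWS, the generation logic can be run against a stand-in client. The sketch below restates get_data and generate from the function above (without the print call) so it runs on its own, and passes a hypothetical FakeKinesisClient that records put_record calls instead of calling Kinesis. Because every record uses the same partition key ("partitionkey"), all records are routed to the same shard; with more than one shard you would likely want varying keys.

```python
import datetime
import json
import random


def get_data():
    # Same record shape as the Lambda function above.
    return {
        'EVENT_TIME': datetime.datetime.now().isoformat(),
        'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'PRICE': round(random.random() * 100, 2)}


def generate(stream_name, kinesis_client, limit):
    for _ in range(limit):
        data = get_data()
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data).encode('utf-8'),
            PartitionKey="partitionkey")


class FakeKinesisClient:
    """Hypothetical stand-in for boto3's Kinesis client; it only records calls."""

    def __init__(self):
        self.calls = []

    def put_record(self, **kwargs):
        self.calls.append(kwargs)
        return {"ShardId": "shardId-000000000000", "SequenceNumber": str(len(self.calls))}


if __name__ == "__main__":
    fake = FakeKinesisClient()
    generate("example-stream", fake, 10)  # hypothetical stream name
    print(len(fake.calls))
    print(json.loads(fake.calls[0]["Data"]))
```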
(Reference) Automatically create indexes in OpenSearch Serverless using CloudFormation custom resources
The index specified on the Kinesis Firehose side must already exist in the OpenSearch Serverless collection.
This time, we use a CloudFormation custom resource to create the index automatically.
Specifically, the following Lambda function is associated with the custom resource.
Resources:
  Function2:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Environment:
        Variables:
          COLLECTION_ENDPOINT: !Sub "${Collection}.${AWS::Region}.aoss.amazonaws.com"
          OPENSEARCH_INDEX_NAME: !Ref OpenSearchIndexName
          REGION: !Ref AWS::Region
      Code:
        ZipFile: |
          from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
          import boto3
          import cfnresponse
          import os

          host = os.environ['COLLECTION_ENDPOINT']
          opensearch_index_name = os.environ['OPENSEARCH_INDEX_NAME']
          region = os.environ['REGION']
          service = 'aoss'
          credentials = boto3.Session().get_credentials()
          auth = AWSV4SignerAuth(credentials, region, service)

          CREATE = 'Create'
          response_data = {}

          client = OpenSearch(
              hosts=[{'host': host, 'port': 443}],
              http_auth=auth,
              use_ssl=True,
              verify_certs=True,
              connection_class=RequestsHttpConnection,
              pool_maxsize=20,
          )

          def lambda_handler(event, context):
              try:
                  if event['RequestType'] == CREATE:
                      create_response = client.indices.create(
                          opensearch_index_name
                      )
                      print(create_response)
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)
              except Exception as e:
                  print(e)
                  cfnresponse.send(event, context, cfnresponse.FAILED, response_data)
      FunctionName: !Sub "${Prefix}-function-02"
      Handler: !Ref Handler
      Layers:
        - !Ref LambdaLayer
      Runtime: !Ref Runtime
      Role: !Ref FunctionRoleArn2
      Timeout: 60
This time, an index named “test-stock-index” is created.
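In the handler above, only a Create request triggers the OpenSearch call, while a SUCCESS response is sent for every request type that does not raise. This matters because CloudFormation waits for a response on Update and Delete as well; without one, deleting the stack would hang until the custom resource times out. The sketch below isolates that control flow with the OpenSearch client and cfnresponse.send replaced by plain callables so it can be run locally. The names handle_event, create_index and send are hypothetical.

```python
SUCCESS, FAILED = "SUCCESS", "FAILED"  # mirror cfnresponse.SUCCESS / cfnresponse.FAILED


def handle_event(event, create_index, send):
    """Control flow of the custom-resource handler above.

    create_index: callable that creates the index (stands in for client.indices.create).
    send: callable taking a status string (stands in for cfnresponse.send).
    """
    try:
        if event["RequestType"] == "Create":
            create_index()
        # Update and Delete fall through and still report success.
        send(SUCCESS)
    except Exception as e:  # any failure is reported back to CloudFormation
        print(e)
        send(FAILED)


if __name__ == "__main__":
    sent = []
    handle_event({"RequestType": "Delete"}, lambda: None, sent.append)
    print(sent)
```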
The following is the IAM role for this function.
Resources:
  FunctionRole2:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: FunctionRole2Policy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - aoss:APIAccessAll
                Resource:
                  - !Sub "arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*"
      RoleName: !Sub "${Prefix}-FunctionRole2"
This policy allows all API access to OpenSearch Serverless collections.
Note that this IAM role is also specified as the principal in the second OpenSearch Serverless data access policy described above.
As a result, a function associated with this IAM role can perform actions (here, index creation) on the OpenSearch Serverless collection.
For details, please refer to the following page.
Architecting
Use CloudFormation to build this environment and check its actual behavior.
Create CloudFormation stacks and check the resources in the stacks
Create CloudFormation stacks.
For information on how to create stacks and check each stack, please see the following page.
Because this template creates IAM resources with explicit names (e.g., an IAM user), specify the CAPABILITY_NAMED_IAM capability as follows:
$ aws cloudformation create-stack \
--stack-name [stack-name] \
--template-url https://[bucket-name].s3.ap-northeast-1.amazonaws.com/[folder-name]/dva-03-010.yaml \
--capabilities CAPABILITY_NAMED_IAM
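If you prefer Python, the same stack can be created with boto3's CloudFormation client. In the sketch below, the request parameters are built by a pure function so they can be checked without AWS credentials, and boto3 is only imported when create_and_wait is actually called. The helper names are hypothetical, and the stack name and template URL are placeholders, just as in the CLI command.

```python
def create_stack_params(stack_name: str, template_url: str) -> dict:
    """Keyword arguments for CloudFormation CreateStack, mirroring the CLI call."""
    return {
        "StackName": stack_name,
        "TemplateURL": template_url,
        # Required because the template creates IAM resources with explicit names.
        "Capabilities": ["CAPABILITY_NAMED_IAM"],
    }


def create_and_wait(params: dict) -> None:
    """Create the stack and block until creation completes (needs AWS credentials)."""
    import boto3  # imported lazily so create_stack_params works without boto3

    cfn = boto3.client("cloudformation")
    cfn.create_stack(**params)
    cfn.get_waiter("stack_create_complete").wait(StackName=params["StackName"])
```

For example, create_and_wait(create_stack_params("my-stack", "https://my-bucket.s3.ap-northeast-1.amazonaws.com/my-folder/dva-03-010.yaml")) would create the stack and return once CloudFormation reports CREATE_COMPLETE, including the custom resource that creates the index.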
Check the created resources from the AWS Management Console.
Check Kinesis Data Streams.
Created successfully.
Check Kinesis Firehose.
This is also successfully created.
The Kinesis data stream mentioned above is specified as the source, and OpenSearch Serverless is specified as the destination.
Check the OpenSearch Serverless collection.
Indeed, a collection has been created.
In particular, check the data access policy for Kinesis Firehose.
It has also been created successfully.
It allows the IAM role specified as the principal to write data (documents).
This IAM role is what Kinesis Firehose uses to send data to the OpenSearch Serverless collection.
Next, check the indexes in the collection at this point.
The “test-stock-index” is created.
This means that when the CloudFormation stack was created, the Lambda function associated with the CloudFormation custom resource was executed to create the index.
Operation Check
Now we are ready.
Execute the Lambda function for data generation.
The function was executed successfully.
According to the log, 10 test records were generated.
Check the Kinesis Data Streams metrics.
We can see that the data is indeed flowing.
Next, check Kinesis Firehose.
You can see that data is also flowing into Firehose.
Finally, access the OpenSearch Serverless dashboard to review the stored data (documents).
The dashboard is accessed using a dedicated user (dva-03-010) created with CloudFormation.
Indeed, 10 documents are stored.
We confirmed that the data generated by the Lambda function was stored in OpenSearch Serverless via Kinesis.
Summary
We covered how to store data received by Kinesis Data Streams in OpenSearch Serverless via Firehose.