KMSを使用して、Kinesis Data StreamsとFirehoseをサーバ側暗号化する

KMSを使用して、Kinesis Data StreamsとFirehoseをサーバ側暗号化する

以下のページでKinessis Data Streamsで受信したデータを、Kinesis Data Firehoseを経由してS3バケットに保存する方法をご紹介しました。

あわせて読みたい
Kinesis Data Streamsで受信したデータを、Firehose経由でS3バケットに保存する 【Kinesis Data Streamsで受信したデータを、Firehose経由でS3バケットに保存する】 以下のページで、Lambda関数で生成したデータをKinesis Data Streamsで受け取り、Ki...

今回は上記ページと同様の構成ですが、Kinesis Data StreamsおよびKinesis Data Firehoseで扱うデータを暗号化します。
具体的には、Kinesis Data Streamsに送信されたデータと、Kinesis Data FirehoseによってS3バケットに配信されたデータを暗号化します。
どちらもKMSで作成したCMKを使用して暗号化します。

構築する環境

Diagram of Server-side encryption of Kinesis Data Streams and Firehose using KMS.

基本的には、冒頭でご紹介したページと同様の構成です。

変更点は以下の2点です。
1点目はKinesis Data StreamsでSSEを有効化します。
2点目はKinesis Data FirehoseからS3バケットに配信するデータに関してSSEを有効化します。
どちらもKMS(CMK)を使用して暗号化します。

なおLambda関数のランタイム環境はPython3.12とします。

CloudFormationテンプレートファイル

上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートを配置しています。

GitHub
awstut-dva/02/005 at main · awstut-an-r/awstut-dva Contribute to awstut-an-r/awstut-dva development by creating an account on GitHub.

テンプレートファイルのポイント解説

(参考) Lambda関数

Resources:
  DataSourceLambda:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        # https://docs.aws.amazon.com/ja_jp/streams/latest/dev/get-started-exercise.html
        ZipFile: |
          import boto3
          import datetime
          import json
          import os
          import random
          
          STREAM_NAME = os.environ['KINESIS_STREAM_NAME']
          LIMIT = 10
          
          def get_data():
            return {
              'EVENT_TIME': datetime.datetime.now().isoformat(),
              'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
              'PRICE': round(random.random() * 100, 2)}
          
          def generate(stream_name, kinesis_client, limit):
            for i in range(limit):
              data = get_data()
              print(data)
              kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data).encode('utf-8'),
                PartitionKey="partitionkey")
        
          def lambda_handler(event, context):
            generate(STREAM_NAME, boto3.client('kinesis'), LIMIT)
      Environment:
        Variables:
          KINESIS_STREAM_NAME: !Ref KinesisDataStreamName
      Handler: !Ref Handler
      Role: !GetAtt DataSourceLambdaRole.Arn
      Runtime: !Ref Runtime
      Timeout: !Ref Timeout
Code language: YAML (yaml)

Kinesis経由でS3バケットに配信するテストデータを作成する関数です。

以下のページで取り上げられているコードを使用しています。

あわせて読みたい
ステップ 3: Flink アプリケーション向けの Managed Service for Apache Flink を作成して実行する - Amazo... この演習では、データストリームをソースおよびシンクとして使用して、Flink アプリケーション向けの Managed Service for Apache Flink を作成します。

以下が本関数用IAMロールです。

Resources:
  DataSourceLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: DataSourceLambdaPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kms:GenerateDataKey
                Resource:
                  - !Ref KinesisKeyArn
              - Effect: Allow
                Action:
                  - kinesis:PutRecord
                Resource:
                  - !Ref KinesisDataStreamArn
Code language: YAML (yaml)

ポイントはKinesis Data Steams用KMSキーに関する権限(kms:GenerateDataKey)を与えている点です。
この関数はKinesis Data Streamsから見るとKinesis ストリームプロデューサーという位置付けですが、プロデューサーに与えるべき権限について以下の通り説明されています。

Kinesis ストリームプロデューサーには kms:GenerateDataKey 許可が必要です。

プロデューサーのアクセス許可の例

Kinesis Data Streams

Resources:
  KinesisDataStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub "${Prefix}-DataStream"
      RetentionPeriodHours: 24
      ShardCount: !Ref ShardCount
      StreamEncryption: 
        EncryptionType: KMS
        KeyId: !Ref KinesisKeyArn
Code language: YAML (yaml)

Kinesis Data StreamsにおいてSSEを有効化する上でのポイントはStreamEncryptionプロパティです。
EncryptionTypeプロパティに「KMS」、KeyIdプロパティにKMSのARNを指定します。

今回は以下のKMSキーを指定します。

Resources:
  KinesisKey:
    Type: AWS::KMS::Key
    Properties:
      Enabled: true
      KeyPolicy:
        Version: 2012-10-17
        Id: !Sub "${Prefix}-kinesis"
        Statement:
          - Effect: Allow
            Principal:
              AWS: "*"
            Action:
              - kms:Encrypt
              - kms:Decrypt
              - kms:ReEncrypt*
              - kms:GenerateDataKey*
              - kms:DescribeKey
            Resource: "*"
            Condition:
              StringEquals:
                kms:CallerAccount: !Ref AWS::AccountId
                kms:ViaService: !Sub "kinesis.${AWS::Region}.amazonaws.com"
          - Effect: Allow
            Principal:
              AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
            Action: "*"
            Resource: "*"
      KeySpec: SYMMETRIC_DEFAULT
      KeyUsage: ENCRYPT_DECRYPT
      Origin: AWS_KMS
Code language: YAML (yaml)

キーポリシーは、Kinesis Data Streams用のAWSマネージドキーを参考に設定しました。

以下がKinesis Data Streams用のIAMロールです。

Resources:
  KinesisStreamSourceRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisStreamSourcePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:ListShards
                Resource:
                  - !GetAtt KinesisDataStream.Arn
Code language: YAML (yaml)

IAMロールの方では、KMSに関する設定は行いません。
キーポリシーでKinesis Data Streams用の権限(kms:Decrypt)が許可されているためです。

Kinesis Data Firehose

Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Ref KinesisFirehoseDeliveryStreamName
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration: 
        KinesisStreamARN: !GetAtt KinesisDataStream.Arn
        RoleARN: !GetAtt KinesisStreamSourceRole.Arn
      S3DestinationConfiguration: 
        BucketARN: !Ref BucketArn
        CloudWatchLoggingOptions: 
          Enabled: true
          LogGroupName: !Ref LogGroup
          LogStreamName: !Ref LogStream
        CompressionFormat: UNCOMPRESSED
        EncryptionConfiguration: 
          KMSEncryptionConfig: 
            AWSKMSKeyARN: !Ref S3KeyArn
        Prefix: firehose/
        RoleARN: !GetAtt KinesisS3DestinationRole.Arn
Code language: YAML (yaml)

今回の構成では、Kinesis Data Firehoseの送信先はS3バケットです。
このバケットに配置する際にKMSを用いて暗号化する場合は、EncryptionConfigurationプロパティを設定します。
同プロパティ内のAWSKMSKeyARNプロパティにS3用のKMSキーのARNを指定します。

以下がS3用KMSキーです。

Resources:
  S3Key:
    Type: AWS::KMS::Key
    Properties:
      Enabled: true
      KeyPolicy:
        Version: 2012-10-17
        Id: !Sub "${Prefix}-s3"
        Statement:
          - Effect: Allow
            Principal:
              AWS: "*"
            Action:
              - kms:Encrypt
              - kms:Decrypt
              - kms:ReEncrypt*
              - kms:GenerateDataKey*
              - kms:DescribeKey
            Resource: "*"
            Condition:
              StringEquals:
                kms:CallerAccount: !Ref AWS::AccountId
                kms:ViaService: !Sub "s3.${AWS::Region}.amazonaws.com"
          - Effect: Allow
            Principal:
              AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
            Action: "*"
            Resource: "*"
      KeySpec: SYMMETRIC_DEFAULT
      KeyUsage: ENCRYPT_DECRYPT
      Origin: AWS_KMS
Code language: YAML (yaml)

キーポリシーはKinesis Data Streams用のものとほとんど同様です。

以下がKinesis Data Firehose用のIAMロールです。

Resources:
  KinesisS3DestinationRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - firehose.amazonaws.com
      Policies:
        - PolicyName: KinesisS3DestinationPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !Ref BucketArn
                  - !Sub "${BucketArn}/*"
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !GetAtt LogGroup.Arn
Code language: YAML (yaml)

こちらのIAMロールの方でも、KMSに関する設定は行いません。

(参考) S3バケット

Resources:
  Bucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketName: !Ref Prefix
Code language: YAML (yaml)

SSEに関する設定は行いません。
つまりこのバケットのデフォルトの暗号化方式は、S3マネージドキーを使用したSSE-S3です。

環境構築

CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。

CloudFormationスタックを作成し、スタック内のリソースを確認する

CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。

あわせて読みたい
CloudFormationのネストされたスタックで環境を構築する 【CloudFormationのネストされたスタックで環境を構築する方法】 CloudFormationにおけるネストされたスタックを検証します。 CloudFormationでは、スタックをネストす...

マネージメントコンソールから作成されたリソースを確認します。

Detail of KMS 01.
Detail of KMS 02.

2つのKMSキーが正常に作成されています。

Kinesis Data Streamsを確認します。

Detail of Kinesis 01.

確かにKMSによるSSEが有効化されていることがわかります。
そして使用されるKSMキーはKinesis用に作成されたものであることもわかります。

Kinesis Data Firehoseを確認します。

Detail of Kinesis 02.
Detail of Kinesis 03.

確かにKinesis Data Firehoseが正常に作成されています。
詳細を確認すると、先述のKinesis Data Streamsから受け取ったデータをS3バケットに配信する設定であることがわかります。
そしてS3バケットに配信時に、先述のS3用KMSキーを使用して暗号化する設定がなされていることも確認できます。

動作確認

準備ができました。

Lambda関数を実行して、テストデータを生成します。

Detail of Lambda 01.

関数が正常に実行されて、テストデータが生成されたました。
このデータはKinesis Data Streamsに送信されます。

しばらく待つと、Kinesisのモニタリングでデータが送信されたことが確認できます。

Detail of Kinesis 04.
Detail of Kinesis 05.

Kinesis Data StreamsおよびKinesis Data Firehoseにデータが送られてきたことがわかります。

S3バケットを確認します。

Detail of S3 01.

確かにS3バケットにオブジェクトが作成されました。

Detail of S3 02.

このオブジェクト暗号化の設定を見ると、先述のS3用KMSキーによって暗号化されていることがわかります。

最後にこのファイルをダウンロードして中身を確認します。

% cat dva-02-005-FirehoseDeliveryStream-1-2024-03-27-10-57-58-2f6bd68e-e997-444a-b231-2c9f4e277df7 | jq  
{
  "EVENT_TIME": "2024-03-27T10:57:58.105290",
  "TICKER": "INTC",
  "PRICE": 61.97
}
{
  "EVENT_TIME": "2024-03-27T10:57:58.691875",
  "TICKER": "AAPL",
  "PRICE": 99.22
}
{
  "EVENT_TIME": "2024-03-27T10:57:58.725142",
  "TICKER": "MSFT",
  "PRICE": 30.67
}
{
  "EVENT_TIME": "2024-03-27T10:57:58.745062",
  "TICKER": "INTC",
  "PRICE": 12.41
}
{
  "EVENT_TIME": "2024-03-27T10:57:58.765025",
  "TICKER": "INTC",
  "PRICE": 39.63
}
{
  "EVENT_TIME": "2024-03-27T10:57:58.805054",
  "TICKER": "MSFT",
  "PRICE": 56.14
}
{
  "EVENT_TIME": "2024-03-27T10:57:58.825060",
  "TICKER": "AAPL",
  "PRICE": 4.66
}
{
  "EVENT_TIME": "2024-03-27T10:57:58.845055",
  "TICKER": "AAPL",
  "PRICE": 85.08
}
{
  "EVENT_TIME": "2024-03-27T10:57:58.885011",
  "TICKER": "AMZN",
  "PRICE": 70.01
}
{
  "EVENT_TIME": "2024-03-27T10:57:58.905023",
  "TICKER": "INTC",
  "PRICE": 75.95
}
Code language: Bash (bash)

確かにLambda関数によって生成されたテストデータです。

このようにKMSキーによって、Kinesis Data Streamsを追加するデータ、およびKinesis Data FirehoseによってS3バケットに配置されるオブジェクトを暗号化できることを確認しました。

まとめ

Kinesis Data StreamsおよびKinesis Data Firehoseで扱うデータを、KMSキーを使って暗号化する方法を確認しました。