SQSキュー内にメッセージが追加される度に、Lambdaで読み取りつつ、RDS Proxy経由でRDSに書き込む

SQSキュー内にメッセージが追加される度に、Lambdaで読み取りつつ、RDS Proxy経由でRDSに書き込む

以下のAWS公式ページで、SQS・Lambda・RDSを組み合わせた構成が紹介されています。

あわせて読みたい
チュートリアル: Lambda 関数を使用して Amazon RDS にアクセスする - Amazon Relational Database Service Amazon Simple Queue Service のメッセージキューから、お客様の AWS アカウントの Amazon Virtual Private Cloud 内の Amazon RDS データベースに AWS Lambda を使用して...

構成の説明は以下の通りです。

このチュートリアルでは、Lambda 関数を使用して、Amazon Relational Database Service (Amazon RDS) データベースに RDS プロキシ経由でデータを書き込みます。Lambda 関数は、メッセージが追加されるたびに Amazon Simple Queue Service (Amazon SQS) キューからレコードを読み取り、データベース内のテーブルに新しい項目を書き込みます。

チュートリアル: Lambda 関数を使用して Amazon RDS にアクセスする

本ページでは、CloudFormationを使用して、上記の構成を構築します。

構築する環境

Diagram of each time a message is added to the SQS queue, it is read by Lambda and written to RDS via RDS Proxy.

VPC外にLambda関数を作成します。
この関数の働きは、SQSキューにメッセージを送信することです。
上記のチュートリアルでは、SQSキューに手動でメッセージを送信していますが、本構成では、この関数を使用します。

SQSキューは標準キューを作成します。

VPC内にLambda関数を作成します。
この関数の働きは、SQSキューからメッセージをポーリングすることです。

メッセージを取得できた場合は、RDSに書き込みます。
RDSへの書き込みは直接DBインスタンスにアクセスするのではなく、RDS Proxyを経由します。
RDSはMySQLタイプとします。

また今回作成するLambda関数のランタイム環境はPython3.12とします。

CloudFormationテンプレートファイル

上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートを配置しています。

GitHub
awstut-saa/01/008 at main · awstut-an-r/awstut-saa Contribute to awstut-an-r/awstut-saa development by creating an account on GitHub.

テンプレートファイルのポイント解説

VPC外のLambda関数

関数本体

Resources:
  Function1:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        ZipFile: |
          import boto3
          import datetime
          import json
          import os

          queue_url = os.environ['QUEUE_URL']
          region = os.environ['REGION']
          
          sqs_client = boto3.client('sqs', region_name=region)
          
          def lambda_handler(event, context):
            message = lambda id: json.dumps({'CustID': id, 'Name': str(id)*2})  
            entries = [{'Id': datetime.datetime.now().strftime('%Y%m%d%H%M%S%f'), 'MessageBody': message(i)} for i in range(10)]
          
            response = sqs_client.send_message_batch(
              QueueUrl=queue_url,
              Entries=entries
            )
            
            return response
      Environment:
        Variables:
          QUEUE_URL: !Ref QueueUrl
          REGION: !Ref AWS::Region
      FunctionName: !Sub "${Prefix}-function-01"
      Handler: !Ref Handler
      Runtime: !Ref Runtime
      Role: !GetAtt LambdaRole1.Arn
Code language: YAML (yaml)

SQSキューにメッセージを送信します。
チュートリアルに準じるデータ構造を持った10個のテストデータを作成します。

[
 '{"CustID": 0, "Name": "00"}',
 '{"CustID": 1, "Name": "11"}',
 '{"CustID": 2, "Name": "22"}',
 '{"CustID": 3, "Name": "33"}',
 '{"CustID": 4, "Name": "44"}',
 '{"CustID": 5, "Name": "55"}',
 '{"CustID": 6, "Name": "66"}',
 '{"CustID": 7, "Name": "77"}',
 '{"CustID": 8, "Name": "88"}',
 '{"CustID": 9, "Name": "99"}'
]
Code language: JSON / JSON with Comments (json)

SQS用のクライアントオブジェクトを作成後、send_message_batchメソッドを実行して、上記のデータをSQSキューに送信します。

IAMロール

Resources:
  LambdaRole1:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: GetSSMParameter
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource:
                  - !Ref QueueArn
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Code language: YAML (yaml)

SQSキューに対してメッセージを送信するための権限を与えます。

SQSキュー

Resources:
  Queue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub "${Prefix}-queue"
Code language: YAML (yaml)

標準キューを作成します。
特別な設定は行いません。

VPC内のLambda関数

関数本体

Resources:
  Function3:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        ZipFile: |
          import sys
          import logging
          import pymysql
          import json
          import os
          
          user_name = os.environ['DB_USER']
          password = os.environ['DB_PASSWORD']
          rds_proxy_host = os.environ['DB_PROXY_ENDPOINT_ADDRESS']
          port = int(os.environ['DB_ENDPOINT_PORT'])
          db_name = os.environ['DB_NAME']
          region = os.environ['REGION']
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          try:
            conn = pymysql.connect(
              host=rds_proxy_host,
              port=port,
              user=user_name,
              passwd=password,
              db=db_name,
              connect_timeout=5
            )
          except pymysql.MySQLError as e:
            logger.error("ERROR: Unexpected error: Could not connect to MySQL instance.")
            logger.error(e)
            sys.exit(1)
           
          logger.info("SUCCESS: Connection to RDS for MySQL instance succeeded")
          
          def lambda_handler(event, context):
            message = event['Records'][0]['body']
            data = json.loads(message)
            CustID = data['CustID']
            Name = data['Name']
        
            item_count = 0
            sql_string = f"insert into Customer (CustID, Name) values({CustID}, '{Name}')"
        
            with conn.cursor() as cur:
              cur.execute("create table if not exists Customer ( CustID  int NOT NULL, Name varchar(255) NOT NULL, PRIMARY KEY (CustID))")
              cur.execute(sql_string)
              conn.commit()
              cur.execute("select * from Customer")
              logger.info("The following items have been added to the database:")
              for row in cur:
                item_count += 1
                logger.info(row)
            conn.commit()
        
            return "Added %d items to RDS for MySQL table" %(item_count)
      Environment:
        Variables:
          DB_ENDPOINT_PORT: !Ref DBEndpointPort
          DB_NAME: !Ref DBName
          DB_PASSWORD: !Ref DBMasterUserPassword
          DB_PROXY_ENDPOINT_ADDRESS: !Ref DBProxyEndpointAddress
          DB_USER: !Ref DBMasterUsername
          REGION: !Ref AWS::Region
      FunctionName: !Sub "${Prefix}-function-03"
      Handler: !Ref Handler
      Layers:
        - !Ref LambdaLayer
      Runtime: !Ref Runtime
      Role: !GetAtt LambdaRole3.Arn
      VpcConfig:
        SecurityGroupIds:
          - !Ref FunctionSecurityGroup
        SubnetIds:
          - !Ref FunctionSubnet
Code language: YAML (yaml)

実行するコードは以下のチュートリアルに記載されているものとほとんど同様です。

あわせて読みたい
チュートリアル: Lambda 関数を使用して Amazon RDS にアクセスする - Amazon Relational Database Service Amazon Simple Queue Service のメッセージキューから、お客様の AWS アカウントの Amazon Virtual Private Cloud 内の Amazon RDS データベースに AWS Lambda を使用して...

SQSキューから受け取ったメッセージをRDSに書き込みます。
RDSのアクセス先にRDS Proxyエンドポイントを指定することで、書き込みをRDS Proxy経由とします。
コードの詳細については、上記のページをご確認ください。

本関数はRDSにアクセスしますので、VPC内で実行する必要があります。
そのためにVpcConfigプロパティで実行するサブネットやアタッチするセキュリティグループを指定します。

IAMロール

Resources:
  LambdaRole3:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: GetSSMParameter
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                Resource:
                  - !Ref QueueArn
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
Code language: YAML (yaml)

インラインポリシーという形で、SQSキューからメッセージを受け取るための権限を与えます。

本IAMロールにAWS管理ポリシーAWSLambdaVPCAccessExecutionRoleをアタッチします。
このポリシーはVPC内で実行するために必要な権限を本関数に与えます。

イベントソースマッピング

Resources:
  EventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties: 
      BatchSize: !Ref BatchSize
      Enabled: true
      EventSourceArn: !Ref QueueArn
      FunctionName: !Ref Function3
Code language: YAML (yaml)

イベントソースマッピングを作成することで、Lambda関数がSQSを自動的にポーリングして、メッセージを取得することができるようになります。

詳細につきましては、以下のページをご確認ください。

あわせて読みたい
SQSキューでLambda関数をトリガーする 【SQSキューでLambda関数をトリガーする】 以下のページで、SQS入門ということで、SQSキューを使ってLambda関数間をデータ連携する構成をご紹介しました。 https://awst...

RDS Proxy

Resources:
  DBProxy:
    Type: AWS::RDS::DBProxy
    Properties:
      Auth: 
        - IAMAuth: DISABLED
          AuthScheme: SECRETS
          SecretArn: !Ref Secret
      DBProxyName: !Sub "${Prefix}-dbproxy"
      EngineFamily: !Ref DBProxyEngineFamily
      IdleClientTimeout: 120
      RequireTLS: false
      RoleArn: !GetAtt DBProxyRole.Arn
      VpcSecurityGroupIds: 
        - !Ref DBProxySecurityGroup
      VpcSubnetIds: 
        - !Ref DBSubnet1
        - !Ref DBSubnet2
        
  DBProxyTargetGroup:
    Type: AWS::RDS::DBProxyTargetGroup
    Properties:
      DBProxyName: !Ref DBProxy
      DBInstanceIdentifiers:
        - !Ref DBInstance
      TargetGroupName: default
      ConnectionPoolConfigurationInfo:
        MaxConnectionsPercent: 100
        MaxIdleConnectionsPercent: 50
        ConnectionBorrowTimeout: 120
        
  DBProxyRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - rds.amazonaws.com
      Policies:
        - PolicyName: !Sub "${Prefix}-DBProxyPolicy"
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: secretsmanager:GetSecretValue
                Resource: !Ref Secret
              - Effect: Allow
                Action: kms:Decrypt
                Resource: "*"
        
  Secret:
    Type: AWS::SecretsManager::Secret
    Properties: 
      Name: !Sub "${Prefix}-Secret"
      SecretString: !Sub '{"username":"${DBMasterUsername}","password":"${DBMasterUserPassword}"}'
Code language: YAML (yaml)

VPC内Lambda関数のアクセス先となるRDS Proxyを作成します。
RDS Proxy本体に加えて、最終的なアクセス先であるDBインスタンスを指定するためのターゲットグループや、IAMロール等も合わせて作成します。

詳細につきましては、以下のページをご確認ください。

あわせて読みたい
VPC内のLambdaからRDS Proxy経由でRDSに接続する 【VPC内のLambdaからRDS Proxy経由でRDSに接続する】 VPC内にLambdaを配置し、RDSに接続する構成を考えます。LambdaからRDSにアクセスする際は、直接接続するのではなく...

(参考)Lambdaレイヤー

本構成のRDSおよびRDS ProxyはMySQLタイプです。
今回はPyMySQLパッケージを使用してDBインスタンスに接続します。
ただしこのパッケージはLambda関数のランタイム環境には含まれていないため、Lambdaレイヤーという形で用意します。

Lambdaレイヤーを作成する上でのポイントは、Lambdaレイヤー用パッケージの作成方法です。
今回はCloudFormationを使用して、自動的にパッケージを作成します。

詳細につきましては、以下のページをご確認ください。

あわせて読みたい
CFNカスタムリソースでLambdaレイヤーパッケージを準備する – Python版 【CloudFormationカスタムリソースを使って、Python用のLambdaレイヤーパッケージを自動的に作成・配置する】 以下のページでLambdaレイヤーの作成方法について取り上げ...

環境構築

CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。

CloudFormationスタックを作成し、スタック内のリソースを確認する

CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。

あわせて読みたい
CloudFormationのネストされたスタックで環境を構築する 【CloudFormationのネストされたスタックで環境を構築する方法】 CloudFormationにおけるネストされたスタックを検証します。 CloudFormationでは、スタックをネストす...

作成されたリソースを確認します。

SQSキューを確認します。

Detail of SQS 01.

正常にSQSキューが作成されています。
Lambdaトリガーの項目を見ると、関数が設定されています。
これはVPC内で実行されるように設定した関数です。

RDSのDBインスタンスおよびRDS Proxyを確認します。

Detail of RDS 01.
Detail of RDS 02.

どちらも正常に作成されています。
確かにRDS ProxyのターゲットにDBインスタンスが指定されています。

VPC内で実行されるLambda関数を確認します。

Detail of Lambda 01.

こちらからも、この関数がSQSに関連づいていることがわかります。
つまりこの関数は自動的にSQSをポーリングし、メッセージがあればコードの内容に従って処理を行うということです。

またLambdaレイヤーが関連づいていることもわかります。
つまりCloudFormationカスタムリソースによって、自動的にLambdaレイヤー用パッケージが作成され、これを用いてLambdaレイヤーが作成されたということです。

動作確認

準備が整いましたので、VPC外のLambda関数を実行します。

Detail of Lambda 02.

関数を正常に実行されました。
実行結果から、10個のメッセージがSQSキューに送信されたことがわかります。

VPC内のLambda関数のログを確認します。

Detail of Lambda 03.

ログの内容から、関数が何度か実行されたことがわかります。
つまりSQSキューにポーリングを実行し、メッセージがあった場合は、そのメッセージを受け取り、RDSへの書き込みを行ったということです。

最後に実行された際のログを見ると、確かにメッセージ10個分、RDSに書き込みを行ったことが確認できます。

まとめ

SQSキュー内にメッセージが追加される度に、Lambda関数で読み取りつつ、RDS Proxy経由でRDSに書き込む構成をご紹介しました。