Each time a message is added to the SQS queue, it is read by Lambda and written to RDS via RDS Proxy


The following official AWS page introduces a configuration that combines SQS, Lambda, and RDS.

Related reading
Tutorial: Using a Lambda function to access an Amazon RDS database (Amazon Relational Database Service documentation)

The tutorial describes the configuration as follows:

In this tutorial, you use a Lambda function to write data to an Amazon Relational Database Service (Amazon RDS) database through RDS Proxy. Your Lambda function reads records from an Amazon Simple Queue Service (Amazon SQS) queue and writes a new item to a table in your database whenever a message is added. In this example, you use the AWS Management Console to manually add messages to your queue.

Tutorial: Using a Lambda function to access an Amazon RDS database

This page uses CloudFormation to build the above configuration.

Environment

Figure: Each time a message is added to the SQS queue, it is read by Lambda and written to RDS via RDS Proxy.

The first Lambda function is created outside the VPC.
Its job is to send messages to the SQS queue.
In the tutorial above, messages are sent to the SQS queue manually; in this configuration, this function sends them instead.

The SQS queue is a standard queue.

The second Lambda function is created inside the VPC.
It is invoked with the messages that Lambda polls from the SQS queue.

When a message is received, the function writes it to RDS.
The write does not go directly to the DB instance, but through RDS Proxy.
The RDS DB instance uses the MySQL engine.

The Lambda functions created here use the Python 3.12 runtime.

CloudFormation template files

The above configuration is built with CloudFormation.
The CloudFormation template is available at the following URL:

GitHub
awstut-saa/01/008 at main · awstut-an-r/awstut-saa

Explanation of key points of template files

Lambda function outside of VPC

Function

Resources:
  Function1:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        ZipFile: |
          import boto3
          import datetime
          import json
          import os

          queue_url = os.environ['QUEUE_URL']
          region = os.environ['REGION']
          
          sqs_client = boto3.client('sqs', region_name=region)
          
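          def lambda_handler(event, context):
            # Build 10 test messages shaped like the tutorial's data.
            message = lambda cust_id: json.dumps({'CustID': cust_id, 'Name': str(cust_id)*2})
            # Entry Ids must be unique within one batch request, so append the
            # index to the timestamp instead of relying on the timestamp alone.
            timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')
            entries = [{'Id': f'{timestamp}-{i}', 'MessageBody': message(i)} for i in range(10)]

            # send_message_batch accepts at most 10 entries per call.
            response = sqs_client.send_message_batch(
              QueueUrl=queue_url,
              Entries=entries
            )

            return response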
      Environment:
        Variables:
          QUEUE_URL: !Ref QueueUrl
          REGION: !Ref AWS::Region
      FunctionName: !Sub "${Prefix}-function-01"
      Handler: !Ref Handler
      Runtime: !Ref Runtime
      Role: !GetAtt LambdaRole1.Arn

This function sends messages to the SQS queue.
It creates 10 test records with a data structure similar to the one in the tutorial.

[
 '{"CustID": 0, "Name": "00"}',
 '{"CustID": 1, "Name": "11"}',
 '{"CustID": 2, "Name": "22"}',
 '{"CustID": 3, "Name": "33"}',
 '{"CustID": 4, "Name": "44"}',
 '{"CustID": 5, "Name": "55"}',
 '{"CustID": 6, "Name": "66"}',
 '{"CustID": 7, "Name": "77"}',
 '{"CustID": 8, "Name": "88"}',
 '{"CustID": 9, "Name": "99"}'
]

After creating an SQS client object, the function calls the send_message_batch method to send the above data to the SQS queue in a single request.
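The entry-building step can be sketched on its own, without calling AWS. The helper name `build_entries` and the `msg-<index>` Id format are hypothetical and not part of the template; they are assumptions made here to keep the example small and the Ids unique within one request.

```python
import json


def build_entries(count=10):
    """Build SendMessageBatch entries shaped like the test data above.

    `build_entries` is a hypothetical helper for illustration only.
    """
    entries = []
    for i in range(count):
        body = json.dumps({'CustID': i, 'Name': str(i) * 2})
        # Each entry needs an Id that is unique within the batch request.
        entries.append({'Id': f'msg-{i}', 'MessageBody': body})
    return entries


entries = build_entries()
print(entries[3]['MessageBody'])  # {"CustID": 3, "Name": "33"}
```

The resulting list could then be passed as the `Entries` argument of `send_message_batch`, together with the queue URL.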

IAM Role

Resources:
  LambdaRole1:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: SendSQSMessage
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource:
                  - !Ref QueueArn
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

The inline policy grants permission to send messages to the SQS queue.

SQS Queue

Resources:
  Queue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub "${Prefix}-queue"

This creates a standard queue.
No special settings are configured.

Lambda function in VPC

Function

Resources:
  Function3:
    Type: AWS::Lambda::Function
    Properties:
      Architectures:
        - !Ref Architecture
      Code:
        ZipFile: |
          import sys
          import logging
          import pymysql
          import json
          import os
          
          user_name = os.environ['DB_USER']
          password = os.environ['DB_PASSWORD']
          rds_proxy_host = os.environ['DB_PROXY_ENDPOINT_ADDRESS']
          port = int(os.environ['DB_ENDPOINT_PORT'])
          db_name = os.environ['DB_NAME']
          region = os.environ['REGION']
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          try:
            conn = pymysql.connect(
              host=rds_proxy_host,
              port=port,
              user=user_name,
              passwd=password,
              db=db_name,
              connect_timeout=5
            )
          except pymysql.MySQLError as e:
            logger.error("ERROR: Unexpected error: Could not connect to MySQL instance.")
            logger.error(e)
            sys.exit(1)
           
          logger.info("SUCCESS: Connection to RDS for MySQL instance succeeded")
          
          def lambda_handler(event, context):
            item_count = 0

            with conn.cursor() as cur:
              cur.execute("create table if not exists Customer ( CustID  int NOT NULL, Name varchar(255) NOT NULL, PRIMARY KEY (CustID))")
              # Process every record in the batch, not only the first one,
              # so no message is dropped when BatchSize is greater than 1.
              for record in event['Records']:
                data = json.loads(record['body'])
                # Pass the values as query parameters instead of formatting
                # them into the SQL string, which avoids SQL injection.
                cur.execute(
                  "insert into Customer (CustID, Name) values (%s, %s)",
                  (data['CustID'], data['Name'])
                )
              conn.commit()
              cur.execute("select * from Customer")
              logger.info("The following items have been added to the database:")
              for row in cur:
                item_count += 1
                logger.info(row)
            conn.commit()

            return "Added %d items to RDS for MySQL table" %(item_count)
      Environment:
        Variables:
          DB_ENDPOINT_PORT: !Ref DBEndpointPort
          DB_NAME: !Ref DBName
          DB_PASSWORD: !Ref DBMasterUserPassword
          DB_PROXY_ENDPOINT_ADDRESS: !Ref DBProxyEndpointAddress
          DB_USER: !Ref DBMasterUsername
          REGION: !Ref AWS::Region
      FunctionName: !Sub "${Prefix}-function-03"
      Handler: !Ref Handler
      Layers:
        - !Ref LambdaLayer
      Runtime: !Ref Runtime
      Role: !GetAtt LambdaRole3.Arn
      VpcConfig:
        SecurityGroupIds:
          - !Ref FunctionSecurityGroup
        SubnetIds:
          - !Ref FunctionSubnet

The code is almost the same as the code in the following tutorial.

Related reading
Tutorial: Using a Lambda function to access an Amazon RDS database - Amazon Relational Database Service

The function writes messages received from the SQS queue to RDS.
It connects to the RDS Proxy endpoint rather than the DB instance endpoint, so the writes go through RDS Proxy.
See the page above for details of the code.
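As a rough sketch of the message-handling step, the following shows how the bodies in an SQS event can be turned into rows for a parameterized insert. The function name `extract_rows` and the sample event are hypothetical; only the `Records` and `body` keys come from the event that Lambda passes for SQS, and the `%s` placeholders are the parameter style PyMySQL uses.

```python
import json

# Parameterized statement: the driver fills in the %s placeholders safely.
INSERT_SQL = "insert into Customer (CustID, Name) values (%s, %s)"


def extract_rows(event):
    """Return one (CustID, Name) tuple per SQS record in the event."""
    rows = []
    for record in event['Records']:
        data = json.loads(record['body'])
        rows.append((data['CustID'], data['Name']))
    return rows


# Hypothetical event carrying two messages, trimmed to the keys used here.
sample_event = {
    'Records': [
        {'body': '{"CustID": 0, "Name": "00"}'},
        {'body': '{"CustID": 1, "Name": "11"}'},
    ]
}

rows = extract_rows(sample_event)
print(rows)  # [(0, '00'), (1, '11')]
```

Inside a handler, each tuple could be passed to `cur.execute(INSERT_SQL, row)`, or the whole list to `cur.executemany(INSERT_SQL, rows)`.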

Since this function accesses RDS, it must run within the VPC.
To do so, the VpcConfig property specifies the subnet to run in and the security group to attach.

IAM Role

Resources:
  LambdaRole3:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: ReceiveSQSMessage
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                Resource:
                  - !Ref QueueArn
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole

The inline policy grants permission to receive messages from the SQS queue, delete them, and read the queue attributes.

The AWS managed policy AWSLambdaVPCAccessExecutionRole is also attached to this IAM role.
This policy grants the permissions the function needs to run within the VPC, such as creating and deleting network interfaces.

Event Source Mapping

Resources:
  EventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties: 
      BatchSize: !Ref BatchSize
      Enabled: true
      EventSourceArn: !Ref QueueArn
      FunctionName: !Ref Function3

With an event source mapping, Lambda automatically polls the SQS queue and invokes the function with the messages it retrieves.

For details, please refer to the following page.

Related reading
Triggering Lambda function from SQS queue
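To get a feel for how BatchSize relates to the number of invocations, here is a small arithmetic sketch. BatchSize is an upper bound on the records per invocation, so the result is only a minimum; Lambda may deliver smaller batches and therefore invoke the function more often. The function name `min_invocations` is hypothetical.

```python
import math


def min_invocations(message_count, batch_size):
    """Lower bound on invocations needed to drain `message_count` messages."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return math.ceil(message_count / batch_size)


print(min_invocations(10, 1))   # 10
print(min_invocations(10, 4))   # 3
print(min_invocations(10, 10))  # 1
```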

RDS Proxy

Resources:
  DBProxy:
    Type: AWS::RDS::DBProxy
    Properties:
      Auth: 
        - IAMAuth: DISABLED
          AuthScheme: SECRETS
          SecretArn: !Ref Secret
      DBProxyName: !Sub "${Prefix}-dbproxy"
      EngineFamily: !Ref DBProxyEngineFamily
      IdleClientTimeout: 120
      RequireTLS: false
      RoleArn: !GetAtt DBProxyRole.Arn
      VpcSecurityGroupIds: 
        - !Ref DBProxySecurityGroup
      VpcSubnetIds: 
        - !Ref DBSubnet1
        - !Ref DBSubnet2
        
  DBProxyTargetGroup:
    Type: AWS::RDS::DBProxyTargetGroup
    Properties:
      DBProxyName: !Ref DBProxy
      DBInstanceIdentifiers:
        - !Ref DBInstance
      TargetGroupName: default
      ConnectionPoolConfigurationInfo:
        MaxConnectionsPercent: 100
        MaxIdleConnectionsPercent: 50
        ConnectionBorrowTimeout: 120
        
  DBProxyRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - rds.amazonaws.com
      Policies:
        - PolicyName: !Sub "${Prefix}-DBProxyPolicy"
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: secretsmanager:GetSecretValue
                Resource: !Ref Secret
              - Effect: Allow
                Action: kms:Decrypt
                Resource: "*"
        
  Secret:
    Type: AWS::SecretsManager::Secret
    Properties: 
      Name: !Sub "${Prefix}-Secret"
      SecretString: !Sub '{"username":"${DBMasterUsername}","password":"${DBMasterUserPassword}"}'

This creates the RDS Proxy that serves as the access point for the Lambda function in the VPC.
In addition to the RDS Proxy itself, the template creates a target group that specifies the DB instance as the final destination, an IAM role that allows the proxy to read the secret, and a Secrets Manager secret that stores the DB credentials.

For details, please refer to the following page.

Related reading
Connect to RDS from Lambda in VPC via RDS Proxy
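The SecretString in the template is a JSON document with `username` and `password` keys. As an illustration of that shape, the following sketch builds the same string with `json.dumps`, which also escapes characters such as double quotes that would break a hand-formatted string. The function name and the sample credentials are hypothetical.

```python
import json


def build_secret_string(username, password):
    """Serialize DB credentials in the JSON shape used by the secret."""
    # separators=(',', ':') produces the compact form without spaces.
    return json.dumps(
        {'username': username, 'password': password},
        separators=(',', ':'),
    )


secret = build_secret_string('admin', 'Passw0rd')
print(secret)  # {"username":"admin","password":"Passw0rd"}

# A password containing a double quote still round-trips correctly.
tricky = build_secret_string('admin', 'pa"ss')
print(json.loads(tricky)['password'])  # pa"ss
```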

(Reference) Lambda layer

The RDS DB instance and RDS Proxy in this configuration use the MySQL engine.
The function uses the PyMySQL package to connect to the database.
However, this package is not included in the Lambda runtime environment, so it is provided as a Lambda layer.

The key step in creating a Lambda layer is building the package for it.
In this case, CloudFormation creates the package automatically.

For details, please refer to the following page.

Related reading
Preparing Lambda Layer Package with CFN Custom Resources – Python Version

Architecting

Use CloudFormation to build this environment and check its actual behavior.

Create CloudFormation stacks and check the resources in the stacks

Create the CloudFormation stacks.
For information on how to create stacks and check each stack, please see the following page.

Related reading
CloudFormation’s nested stack

Check the resources created.

Check the SQS queue.

Detail of SQS 01.

The SQS queue has been created successfully.
The Lambda trigger item shows that a function has been configured.
This is the function that runs within the VPC.

Check the RDS DB instance and RDS Proxy.

Detail of RDS 01.
Detail of RDS 02.

Both have been created successfully.
The DB instance is registered as the target of the RDS Proxy.

Check the Lambda function that runs in the VPC.

Detail of Lambda 01.

Here we can also see that this function is associated with the SQS queue.
This means that Lambda will automatically poll the queue and the function will process any messages according to its code.

We can also see that a Lambda layer is associated with the function.
This means that the CloudFormation custom resource automatically created the package, which was then used to create the Lambda layer.

Operation Check

Now that everything is ready, execute the Lambda function outside the VPC.

Detail of Lambda 02.

The function executed successfully.
The execution results show that 10 messages were sent to the SQS queue.

Check the logs of the Lambda function in the VPC.

Detail of Lambda 03.

The log shows that the function was executed several times.
This means that Lambda polled the SQS queue and, whenever messages were available, invoked the function to write them to RDS.

The log of the last execution confirms that all 10 messages were written to RDS.

Summary

We introduced a configuration in which each time a message is added to the SQS queue, it is read by a Lambda function and written to the RDS via RDS Proxy.
