Each time a message is added to the SQS queue, it is read by Lambda and written to RDS via RDS Proxy
The following official AWS page introduces a configuration that combines SQS, Lambda, and RDS.
The description of the configuration is as follows
In this tutorial, you use a Lambda function to write data to an Amazon Relational Database Service (Amazon RDS) database through RDS Proxy. Your Lambda function reads records from an Amazon Simple Queue Service (Amazon SQS) queue and writes a new item to a table in your database whenever a message is added. In this example, you use the AWS Management Console to manually add messages to your queue.
Tutorial: Using a Lambda function to access an Amazon RDS database
This page uses CloudFormation to build the above configuration.
Environment
Create a Lambda function outside the VPC.
The function’s work is to send a message to the SQS queue.
In the above tutorial, we manually send messages to the SQS queue, but this configuration uses this function.
SQS queue creates a standard queue.
Create a Lambda function in the VPC.
The function’s work is to poll messages from the SQS queue.
If the message can be retrieved, it is written to the RDS.
Writing to RDS is not done directly to the DB instance, but through RDS Proxy.
The RDS is assumed to be of type MySQL.
The runtime environment for the Lambda function to be created this time is Python 3.12.
CloudFormation template files
The above configuration is built with CloudFormation.
The CloudFormation template is placed at the following URL
Explanation of key points of template files
Lambda function outside of VPC
Function
Resources:
Function1:
Type: AWS::Lambda::Function
Properties:
Architectures:
- !Ref Architecture
Code:
ZipFile: |
import boto3
import datetime
import json
import os
queue_url = os.environ['QUEUE_URL']
region = os.environ['REGION']
sqs_client = boto3.client('sqs', region_name=region)
def lambda_handler(event, context):
message = lambda id: json.dumps({'CustID': id, 'Name': str(id)*2})
entries = [{'Id': datetime.datetime.now().strftime('%Y%m%d%H%M%S%f'), 'MessageBody': message(i)} for i in range(10)]
response = sqs_client.send_message_batch(
QueueUrl=queue_url,
Entries=entries
)
return response
Environment:
Variables:
QUEUE_URL: !Ref QueueUrl
REGION: !Ref AWS::Region
FunctionName: !Sub "${Prefix}-function-01"
Handler: !Ref Handler
Runtime: !Ref Runtime
Role: !GetAtt LambdaRole1.Arn
Code language: YAML (yaml)
Send a message to the SQS queue.
Create 10 test data with a data structure similar to the tutorial.
[
'{"CustID": 0, "Name": "00"}',
'{"CustID": 1, "Name": "11"}',
'{"CustID": 2, "Name": "22"}',
'{"CustID": 3, "Name": "33"}',
'{"CustID": 4, "Name": "44"}',
'{"CustID": 5, "Name": "55"}',
'{"CustID": 6, "Name": "66"}',
'{"CustID": 7, "Name": "77"}',
'{"CustID": 8, "Name": "88"}',
'{"CustID": 9, "Name": "99"}'
]
Code language: JSON / JSON with Comments (json)
After creating a client object for SQS, execute the send_message_batch method to send the above data to the SQS queue.
IAM Role
Resources:
LambdaRole1:
Type: AWS::IAM::Role
DeletionPolicy: Delete
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- lambda.amazonaws.com
Policies:
- PolicyName: GetSSMParameter
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:SendMessage
Resource:
- !Ref QueueArn
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Code language: YAML (yaml)
Authorization to send messages to the SQS queue.
SQS Queue
Resources:
Queue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub "${Prefix}-queue"
Code language: YAML (yaml)
Create a standard queue.
No special settings are made.
Lambda function in VPC
Function
Resources:
Function3:
Type: AWS::Lambda::Function
Properties:
Architectures:
- !Ref Architecture
Code:
ZipFile: |
import sys
import logging
import pymysql
import json
import os
user_name = os.environ['DB_USER']
password = os.environ['DB_PASSWORD']
rds_proxy_host = os.environ['DB_PROXY_ENDPOINT_ADDRESS']
port = int(os.environ['DB_ENDPOINT_PORT'])
db_name = os.environ['DB_NAME']
region = os.environ['REGION']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
try:
conn = pymysql.connect(
host=rds_proxy_host,
port=port,
user=user_name,
passwd=password,
db=db_name,
connect_timeout=5
)
except pymysql.MySQLError as e:
logger.error("ERROR: Unexpected error: Could not connect to MySQL instance.")
logger.error(e)
sys.exit(1)
logger.info("SUCCESS: Connection to RDS for MySQL instance succeeded")
def lambda_handler(event, context):
message = event['Records'][0]['body']
data = json.loads(message)
CustID = data['CustID']
Name = data['Name']
item_count = 0
sql_string = f"insert into Customer (CustID, Name) values({CustID}, '{Name}')"
with conn.cursor() as cur:
cur.execute("create table if not exists Customer ( CustID int NOT NULL, Name varchar(255) NOT NULL, PRIMARY KEY (CustID))")
cur.execute(sql_string)
conn.commit()
cur.execute("select * from Customer")
logger.info("The following items have been added to the database:")
for row in cur:
item_count += 1
logger.info(row)
conn.commit()
return "Added %d items to RDS for MySQL table" %(item_count)
Environment:
Variables:
DB_ENDPOINT_PORT: !Ref DBEndpointPort
DB_NAME: !Ref DBName
DB_PASSWORD: !Ref DBMasterUserPassword
DB_PROXY_ENDPOINT_ADDRESS: !Ref DBProxyEndpointAddress
DB_USER: !Ref DBMasterUsername
REGION: !Ref AWS::Region
FunctionName: !Sub "${Prefix}-function-03"
Handler: !Ref Handler
Layers:
- !Ref LambdaLayer
Runtime: !Ref Runtime
Role: !GetAtt LambdaRole3.Arn
VpcConfig:
SecurityGroupIds:
- !Ref FunctionSecurityGroup
SubnetIds:
- !Ref FunctionSubnet
Code language: YAML (yaml)
The code to be executed is almost the same as described in the following tutorial.
Writes messages received from the SQS queue to RDS.
Specify the RDS Proxy endpoint as the RDS access point, so that the write is via RDS Proxy.
Please see the above page for code details.
Since this function accesses RDS, it must be executed within the VPC.
To do so, the VpcConfig property specifies the subnet to run on and the security group to attach.
IAM Role
Resources:
LambdaRole3:
Type: AWS::IAM::Role
DeletionPolicy: Delete
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- lambda.amazonaws.com
Policies:
- PolicyName: GetSSMParameter
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:ReceiveMessage
- sqs:DeleteMessage
- sqs:GetQueueAttributes
Resource:
- !Ref QueueArn
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
Code language: YAML (yaml)
Authorization to receive messages from the SQS queue in the form of an inline policy.
Attach the AWS management policy AWSLambdaVPCAccessExecutionRole to this IAM role.
This policy grants this function the necessary permissions to execute within the VPC.
Event Source Mapping
Resources:
EventSourceMapping:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: !Ref BatchSize
Enabled: true
EventSourceArn: !Ref QueueArn
FunctionName: !Ref Function3
Code language: YAML (yaml)
By creating event source mappings, Lambda functions can automatically poll SQS to retrieve messages.
For details, please refer to the following page.
RDS Proxy
Resources:
DBProxy:
Type: AWS::RDS::DBProxy
Properties:
Auth:
- IAMAuth: DISABLED
AuthScheme: SECRETS
SecretArn: !Ref Secret
DBProxyName: !Sub "${Prefix}-dbproxy"
EngineFamily: !Ref DBProxyEngineFamily
IdleClientTimeout: 120
RequireTLS: false
RoleArn: !GetAtt DBProxyRole.Arn
VpcSecurityGroupIds:
- !Ref DBProxySecurityGroup
VpcSubnetIds:
- !Ref DBSubnet1
- !Ref DBSubnet2
DBProxyTargetGroup:
Type: AWS::RDS::DBProxyTargetGroup
Properties:
DBProxyName: !Ref DBProxy
DBInstanceIdentifiers:
- !Ref DBInstance
TargetGroupName: default
ConnectionPoolConfigurationInfo:
MaxConnectionsPercent: 100
MaxIdleConnectionsPercent: 50
ConnectionBorrowTimeout: 120
DBProxyRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- rds.amazonaws.com
Policies:
- PolicyName: !Sub "${Prefix}-DBProxyPolicy"
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: secretsmanager:GetSecretValue
Resource: !Ref Secret
- Effect: Allow
Action: kms:Decrypt
Resource: "*"
Secret:
Type: AWS::SecretsManager::Secret
Properties:
Name: !Sub "${Prefix}-Secret"
SecretString: !Sub '{"username":"${DBMasterUsername}","password":"${DBMasterUserPassword}"}'
Code language: YAML (yaml)
Create an RDS Proxy that will be the access point for the Lambda functions in the VPC.
In addition to the RDS Proxy itself, create a target group to specify the DB instance that will be the final access destination, IAM roles, etc.
For details, please refer to the following page.
(Reference) Lambda layer
The RDS and RDS Proxy in this configuration are of type MySQL.
This time we will use the PyMySQL package to connect to the DB instance.
However, this package is not included in the runtime environment of the Lambda function, so we will prepare it in the form of a Lambda layer.
The key to creating a Lambda layer is how to create a package for the Lambda layer.
In this case, we will use CloudFormation to automatically create the package.
For details, please refer to the following page.
Architecting
Use CloudFormation to build this environment and check its actual behavior.
Create CloudFormation stacks and check the resources in the stacks
Create CloudFormation stacks.
For information on how to create stacks and check each stack, please see the following page.
Check the resources created.
Check the SQS queue.
The SQS queue is successfully created.
If you look at the Lambda trigger item, you will see that a function has been set.
This is a function set to run within VPC.
Check the RDS DB instance and RDS Proxy.
Both are successfully created.
You have indeed specified a DB instance as the target of the RDS Proxy.
Check the Lambda functions executed in the VPC.
Here we can also see that this function is associated with SQS.
This means that the function will automatically poll SQS and process any messages according to the content of the code.
We can also see that a Lambda layer is associated with it.
This means that the CloudFormation custom resource automatically created a package for the Lambda layer, which was then used to create the Lambda layer.
Operation Check
Now that we are ready, we execute the Lambda function outside the VPC.
function was successfully executed.
The execution results show that 10 messages were sent to the SQS queue.
Check the logs of the Lambda functions in the VPC.
The log shows that the function was executed several times.
This means that it polled the SQS queue, received messages, if any, and wrote to the RDS.
The log of the last execution confirms that it did indeed write to the RDS for 10 messages.
Summary
We introduced a configuration in which each time a message is added to the SQS queue, it is read by a Lambda function and written to the RDS via RDS Proxy.