SQSキュー内にメッセージが追加される度に、Lambdaで読み取りつつ、RDS Proxy経由でRDSに書き込む
以下のAWS公式ページで、SQS・Lambda・RDSを組み合わせた構成が紹介されています。
構成の説明は以下の通りです。
このチュートリアルでは、Lambda 関数を使用して、Amazon Relational Database Service (Amazon RDS) データベースに RDS プロキシ経由でデータを書き込みます。Lambda 関数は、メッセージが追加されるたびに Amazon Simple Queue Service (Amazon SQS) キューからレコードを読み取り、データベース内のテーブルに新しい項目を書き込みます。
チュートリアル: Lambda 関数を使用して Amazon RDS にアクセスする
本ページでは、CloudFormationを使用して、上記の構成を構築します。
構築する環境
VPC外にLambda関数を作成します。
この関数の働きは、SQSキューにメッセージを送信することです。
上記のチュートリアルでは、SQSキューに手動でメッセージを送信していますが、本構成では、この関数を使用します。
SQSキューは標準キューを作成します。
VPC内にLambda関数を作成します。
この関数の働きは、SQSキューからメッセージをポーリングすることです。
メッセージを取得できた場合は、RDSに書き込みます。
RDSへの書き込みは直接DBインスタンスにアクセスするのではなく、RDS Proxyを経由します。
RDSはMySQLタイプとします。
また今回作成するLambda関数のランタイム環境はPython3.12とします。
CloudFormationテンプレートファイル
上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートを配置しています。
テンプレートファイルのポイント解説
VPC外のLambda関数
関数本体
Resources:
Function1:
Type: AWS::Lambda::Function
Properties:
Architectures:
- !Ref Architecture
Code:
ZipFile: |
import boto3
import datetime
import json
import os
queue_url = os.environ['QUEUE_URL']
region = os.environ['REGION']
sqs_client = boto3.client('sqs', region_name=region)
def lambda_handler(event, context):
message = lambda id: json.dumps({'CustID': id, 'Name': str(id)*2})
entries = [{'Id': datetime.datetime.now().strftime('%Y%m%d%H%M%S%f'), 'MessageBody': message(i)} for i in range(10)]
response = sqs_client.send_message_batch(
QueueUrl=queue_url,
Entries=entries
)
return response
Environment:
Variables:
QUEUE_URL: !Ref QueueUrl
REGION: !Ref AWS::Region
FunctionName: !Sub "${Prefix}-function-01"
Handler: !Ref Handler
Runtime: !Ref Runtime
Role: !GetAtt LambdaRole1.Arn
Code language: YAML (yaml)
SQSキューにメッセージを送信します。
チュートリアルに準じるデータ構造を持った10個のテストデータを作成します。
[
'{"CustID": 0, "Name": "00"}',
'{"CustID": 1, "Name": "11"}',
'{"CustID": 2, "Name": "22"}',
'{"CustID": 3, "Name": "33"}',
'{"CustID": 4, "Name": "44"}',
'{"CustID": 5, "Name": "55"}',
'{"CustID": 6, "Name": "66"}',
'{"CustID": 7, "Name": "77"}',
'{"CustID": 8, "Name": "88"}',
'{"CustID": 9, "Name": "99"}'
]
Code language: JSON / JSON with Comments (json)
SQS用のクライアントオブジェクトを作成後、send_message_batchメソッドを実行して、上記のデータをSQSキューに送信します。
IAMロール
Resources:
LambdaRole1:
Type: AWS::IAM::Role
DeletionPolicy: Delete
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- lambda.amazonaws.com
Policies:
- PolicyName: GetSSMParameter
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:SendMessage
Resource:
- !Ref QueueArn
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Code language: YAML (yaml)
SQSキューに対してメッセージを送信するための権限を与えます。
SQSキュー
Resources:
Queue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub "${Prefix}-queue"
Code language: YAML (yaml)
標準キューを作成します。
特別な設定は行いません。
VPC内のLambda関数
関数本体
Resources:
Function3:
Type: AWS::Lambda::Function
Properties:
Architectures:
- !Ref Architecture
Code:
ZipFile: |
import sys
import logging
import pymysql
import json
import os
user_name = os.environ['DB_USER']
password = os.environ['DB_PASSWORD']
rds_proxy_host = os.environ['DB_PROXY_ENDPOINT_ADDRESS']
port = int(os.environ['DB_ENDPOINT_PORT'])
db_name = os.environ['DB_NAME']
region = os.environ['REGION']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
try:
conn = pymysql.connect(
host=rds_proxy_host,
port=port,
user=user_name,
passwd=password,
db=db_name,
connect_timeout=5
)
except pymysql.MySQLError as e:
logger.error("ERROR: Unexpected error: Could not connect to MySQL instance.")
logger.error(e)
sys.exit(1)
logger.info("SUCCESS: Connection to RDS for MySQL instance succeeded")
def lambda_handler(event, context):
message = event['Records'][0]['body']
data = json.loads(message)
CustID = data['CustID']
Name = data['Name']
item_count = 0
sql_string = f"insert into Customer (CustID, Name) values({CustID}, '{Name}')"
with conn.cursor() as cur:
cur.execute("create table if not exists Customer ( CustID int NOT NULL, Name varchar(255) NOT NULL, PRIMARY KEY (CustID))")
cur.execute(sql_string)
conn.commit()
cur.execute("select * from Customer")
logger.info("The following items have been added to the database:")
for row in cur:
item_count += 1
logger.info(row)
conn.commit()
return "Added %d items to RDS for MySQL table" %(item_count)
Environment:
Variables:
DB_ENDPOINT_PORT: !Ref DBEndpointPort
DB_NAME: !Ref DBName
DB_PASSWORD: !Ref DBMasterUserPassword
DB_PROXY_ENDPOINT_ADDRESS: !Ref DBProxyEndpointAddress
DB_USER: !Ref DBMasterUsername
REGION: !Ref AWS::Region
FunctionName: !Sub "${Prefix}-function-03"
Handler: !Ref Handler
Layers:
- !Ref LambdaLayer
Runtime: !Ref Runtime
Role: !GetAtt LambdaRole3.Arn
VpcConfig:
SecurityGroupIds:
- !Ref FunctionSecurityGroup
SubnetIds:
- !Ref FunctionSubnet
Code language: YAML (yaml)
実行するコードは以下のチュートリアルに記載されているものとほとんど同様です。
SQSキューから受け取ったメッセージをRDSに書き込みます。
RDSのアクセス先にRDS Proxyエンドポイントを指定することで、書き込みをRDS Proxy経由とします。
コードの詳細については、上記のページをご確認ください。
本関数はRDSにアクセスしますので、VPC内で実行する必要があります。
そのためにVpcConfigプロパティで実行するサブネットやアタッチするセキュリティグループを指定します。
IAMロール
Resources:
LambdaRole3:
Type: AWS::IAM::Role
DeletionPolicy: Delete
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- lambda.amazonaws.com
Policies:
- PolicyName: GetSSMParameter
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:ReceiveMessage
- sqs:DeleteMessage
- sqs:GetQueueAttributes
Resource:
- !Ref QueueArn
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
Code language: YAML (yaml)
インラインポリシーという形で、SQSキューからメッセージを受け取るための権限を与えます。
本IAMロールにAWS管理ポリシーAWSLambdaVPCAccessExecutionRoleをアタッチします。
このポリシーはVPC内で実行するために必要な権限を本関数に与えます。
イベントソースマッピング
Resources:
EventSourceMapping:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: !Ref BatchSize
Enabled: true
EventSourceArn: !Ref QueueArn
FunctionName: !Ref Function3
Code language: YAML (yaml)
イベントソースマッピングを作成することで、Lambda関数がSQSを自動的にポーリングして、メッセージを取得することができるようになります。
詳細につきましては、以下のページをご確認ください。
RDS Proxy
Resources:
DBProxy:
Type: AWS::RDS::DBProxy
Properties:
Auth:
- IAMAuth: DISABLED
AuthScheme: SECRETS
SecretArn: !Ref Secret
DBProxyName: !Sub "${Prefix}-dbproxy"
EngineFamily: !Ref DBProxyEngineFamily
IdleClientTimeout: 120
RequireTLS: false
RoleArn: !GetAtt DBProxyRole.Arn
VpcSecurityGroupIds:
- !Ref DBProxySecurityGroup
VpcSubnetIds:
- !Ref DBSubnet1
- !Ref DBSubnet2
DBProxyTargetGroup:
Type: AWS::RDS::DBProxyTargetGroup
Properties:
DBProxyName: !Ref DBProxy
DBInstanceIdentifiers:
- !Ref DBInstance
TargetGroupName: default
ConnectionPoolConfigurationInfo:
MaxConnectionsPercent: 100
MaxIdleConnectionsPercent: 50
ConnectionBorrowTimeout: 120
DBProxyRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- rds.amazonaws.com
Policies:
- PolicyName: !Sub "${Prefix}-DBProxyPolicy"
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: secretsmanager:GetSecretValue
Resource: !Ref Secret
- Effect: Allow
Action: kms:Decrypt
Resource: "*"
Secret:
Type: AWS::SecretsManager::Secret
Properties:
Name: !Sub "${Prefix}-Secret"
SecretString: !Sub '{"username":"${DBMasterUsername}","password":"${DBMasterUserPassword}"}'
Code language: YAML (yaml)
VPC内Lambda関数のアクセス先となるRDS Proxyを作成します。
RDS Proxy本体に加えて、最終的なアクセス先であるDBインスタンスを指定するためのターゲットグループや、IAMロール等も合わせて作成します。
詳細につきましては、以下のページをご確認ください。
(参考)Lambdaレイヤー
本構成のRDSおよびRDS ProxyはMySQLタイプです。
今回はPyMySQLパッケージを使用してDBインスタンスに接続します。
ただしこのパッケージはLambda関数のランタイム環境には含まれていないため、Lambdaレイヤーという形で用意します。
Lambdaレイヤーを作成する上でのポイントは、Lambdaレイヤー用パッケージの作成方法です。
今回はCloudFormationを使用して、自動的にパッケージを作成します。
詳細につきましては、以下のページをご確認ください。
環境構築
CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。
CloudFormationスタックを作成し、スタック内のリソースを確認する
CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。
作成されたリソースを確認します。
SQSキューを確認します。
正常にSQSキューが作成されています。
Lambdaトリガーの項目を見ると、関数が設定されています。
これはVPC内で実行されるように設定した関数です。
RDSのDBインスタンスおよびRDS Proxyを確認します。
どちらも正常に作成されています。
確かにRDS ProxyのターゲットにDBインスタンスが指定されています。
VPC内で実行されるLambda関数を確認します。
こちらからも、この関数がSQSに関連づいていることがわかります。
つまりこの関数は自動的にSQSをポーリングし、メッセージがあればコードの内容に従って処理を行うということです。
またLambdaレイヤーが関連づいていることもわかります。
つまりCloudFormationカスタムリソースによって、自動的にLambdaレイヤー用パッケージが作成され、これを用いてLambdaレイヤーが作成されたということです。
動作確認
準備が整いましたので、VPC外のLambda関数を実行します。
関数を正常に実行されました。
実行結果から、10個のメッセージがSQSキューに送信されたことがわかります。
VPC内のLambda関数のログを確認します。
ログの内容から、関数が何度か実行されたことがわかります。
つまりSQSキューにポーリングを実行し、メッセージがあった場合は、そのメッセージを受け取り、RDSへの書き込みを行ったということです。
最後に実行された際のログを見ると、確かにメッセージ10個分、RDSに書き込みを行ったことが確認できます。
まとめ
SQSキュー内にメッセージが追加される度に、Lambda関数で読み取りつつ、RDS Proxy経由でRDSに書き込む構成をご紹介しました。