SQSを使用してLambda間でデータ連携する構成
SQSを使用することで、リソース間の結合を疎の状態に保ちつつ、連携することができます。今回はSQSを使用して、3つのLambda関数間でデータを連携させて、SQSの挙動を確認します。
構築する環境
3つのLambda関数間にSQSを配置する構成です。Lambda1はSQSにメッセージを送信し、Lambda2と3はSQSからメッセージを受信します。3つのLambda関数のランタイム環境は、Python3.8とします。
環境構築用のCloudFormationテンプレートファイル
上記の構成をCloudFormationで構築します。以下のURLにCloudFormationテンプレートを配置してます。
https://github.com/awstut-an-r/awstut-fa/tree/main/021
テンプレートファイルのポイント解説
今回のアーキテクチャを構成するための、各テンプレートファイルのポイントを取り上げます。
SQSの基本はポーリング設定と可視性タイムアウト
まずSQSキューを確認します。キュー作成時のポイントは、ポーリング設定と可視性タイムアウトです。
Resources:
Queue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub ${Prefix}-queue
ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds
VisibilityTimeout: !Ref VisibilityTimeout
Code language: YAML (yaml)
ポイントの1つ目はポーリング設定です。メッセージを取得するためにキューにアクセス(ポーリング)した際の挙動を指定することができます。今回はReceiveMessageWaitTimeSecondsプロパティで20(秒)を指定することで、最大20秒待機するロングポーリングで動作するように指定します。
ロングポーリングとすると、ReceiveMessage要求は、すべてのサーバーにメッセージを照会します。リクエストで指定された最大メッセージ数まで、使用可能なメッセージを収集した後、Amazon SQS からレスポンスが送信されます。Amazon SQS は、ポーリング待機時間が経過した場合にのみ空のレスポンスを送信します。
Amazon SQS ショートポーリングとロングポーリング
ロングポーリングとすることで、SQSキューにアクセスする回数を抑制し、SQSのコストを下げることにつながります。
ポイントの2つ目は可視性タイムアウトです。可視性タイムアウトは、1度ポーリングされた後に、削除されるまでの間に、キューに留まっているメッセージの扱いに関する設定です。
メッセージが受信された直後は、メッセージはキューに残ったままです。他の消費者が再度メッセージを処理しないようにするため、Amazon SQS では可視性タイムアウトで、Amazon SQS によって他の消費者がそのメッセージを受信および処理できなくなる期間です。
Amazon SQS 可視性タイムアウト
可視性タイムアウトを設定しない場合、1つのデータを複数のLambda関数が受信してしまい、処理が重複して実行される恐れがあります。そのためReceiveMessageWaitTimeSecondsプロパティで90(秒)と設定することで、可視性タイムアウトを有効化します。これによって、例えば、Lambda2がデータを受信してから90秒間は、Lambda3が同データを受信することはなくなります。
PythonでSQSにメッセージを送信する
PythonコードからSQSにメッセージを送信する方法を確認します。
まずLambda関数そのものを確認します。
Resources:
Function1:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |
...
Environment:
Variables:
queue_name: !Ref QueueName
region_name: !Ref AWS::Region
FunctionName: !Sub ${Prefix}-function1
Handler: !Ref Handler
MemorySize: !Ref MemorySize
Runtime: !Ref Runtime
Role: !GetAtt LambdaRole1.Arn
Code language: YAML (yaml)
Lambda関数の基本については、以下のページをご確認ください。
Lambda1は実行する関数をインラインで記載しています。以下のコードを実行します。
import boto3
import datetime
import json
import os
def lambda_handler(event, context):
queue_name = os.environ['queue_name']
region_name = os.environ['region_name']
sqs = boto3.resource('sqs', region_name=region_name)
queue = sqs.get_queue_by_name(QueueName=queue_name)
now = datetime.datetime.now()
now_str = now.strftime('%Y%m%d%H%M%S%f')
messages = [{
'Id': now_str,
'MessageBody': now_str
}]
response = queue.send_messages(Entries=messages)
return {
'statusCode': 200,
'body': json.dumps(response)
}
Code language: Python (python)
get_queue_by_nameメソッドで先述のSQSキューを指定して、send_messagesメソッドで同キューにメッセージを送信する。メッセージとして送信するデータは関数を実行した際の現在時刻とします。
この関数を実行するためのIAMロールは以下の通りです。
Resources:
LambdaRole1:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- lambda.amazonaws.com
Policies:
- PolicyName: GetSSMParameter
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:GetQueueUrl
- sqs:SendMessage
Resource:
- !Ref QueueArn
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Code language: YAML (yaml)
ポイントは作成したキューに対して、Actionを2つ許可している点です。これらは先述のPythonスクリプトで呼び出した2つのメソッドを実行するために必要になる権限です。
PythonでSQSからメッセージを受信する
関数の定義自体はLambda1とほとんど同じです。ポイントは実行するPythonコードです。
import boto3
import datetime
import json
import os
def lambda_handler(event, context):
max_number_of_messages = os.environ['max_number_of_messages']
queue_name = os.environ['queue_name']
region_name = os.environ['region_name']
sqs = boto3.resource('sqs', region_name=region_name)
queue = sqs.get_queue_by_name(QueueName=queue_name)
messages = queue.receive_messages(
MaxNumberOfMessages=int(max_number_of_messages))
results = []
for msg in messages:
results.append(msg.body)
return {
'statusCode': 200,
'body': json.dumps(results, indent=2)
}
Code language: Python (python)
SQSキューからメッセージを受信する場合は、receive_messagesメソッドを使用することができます。取得したメッセージを順々に処理します。各メッセージのbodyアトリビュートを参照して、Lambda1から連携されてきたデータを取得して返します。
PythonでSQSからメッセージを受信して削除する
Lambda2とほとんど同じです。異なる点にフォーカスして確認します。
def lambda_handler(event, context):
...
results = []
for msg in messages:
results.append(msg.body)
msg.delete()
...
Code language: Python (python)
deleteメソッドを呼び出して、取得したメッセージを削除します。先述の通り、キューに送信されたメッセージは、保持期間(デフォルトは4日間)を過ぎるまでは、削除しなければ残り続けるため、能動的に削除する必要があります。
環境構築
CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。
CloudFormationスタックを作成し、スタック内のリソースを確認する
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。
各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。
- SQSキュー名:fa-021-queue
- Lambda1の名前:fa-021-function1
- Lambda2の名前:fa-021-function2
- Lambda3の名前:fa-021-function3
AWS Management Consoleから作成されたリソースを確認します。まずSQSキューです。
正常にキューが作成されていることがわかります。次に3つのLambda関数です。
いずれも問題なく作成されています。
Lambda関数1実行
準備が整いましたので、順次Lambda関数を実行します。AWS Management Consoleから関数を実行する方法については、以下のページをご確認ください。
まずLambda1です。
上図がLambda1の実行結果です。正常に関数が実行されたことがわかります。つまりLambda関数を通じて、SQSキューにメッセージが溜まったことを意味します。次の関数の検証のために、複数回、Lambda1を実行しておきます。
Lambda関数2実行
次にLambda2を実行します。
2つの日時データが返ってきました。このことは、SQSキューに溜まっていたメッセージを取得できたことを意味しています。
Lambda関数3実行
Lambda2実行の直後に、Lambda3を実行します。
関数の実行自体は正常に完了しましたが、取得できたデータがありません。これが可視性タイムアウトの効果です。前項でLambda2がメッセージを受け取ったため、可視性タイムアウトが動作し、Lambda3がメッセージを取得することができなかったということです。少し時間を置いてから、改めて関数を実行してみます。
今度はメッセージを取得することができました。Lambda3ではメッセージの取得とともに、メッセージの削除を実施することになっています。これでキューが空になったはずです。
Lambda関数2再実行
最後にLambda2を実行します。
やはりメッセージが空です。キューに溜まっているメッセージが全て削除されたためです。
Durationの値にも注目してください。関数の実行時間を意味する値ですが、約20秒です。これはSQSキューのロングポーリング設定が原因です。今回の最大待機時間である20秒間、ロングポーリングを実施し、期間内にメッセージの送信がなかったため、空の結果が返ってきたという挙動です。そのため関数実行中に待機時間が発生し、全体として約20秒ほどの実行時間になったということです。
まとめ
SQSの入門として、複数のLambda間でデータ連携を行いました。
ロングポーリングの設定を行うことで、ポーリング回数が減るため、経済的です。
可視性タイムアウトの設定を行うことで、1つのメッセージが重複して受信されることを防ぐことができます。