SQS FIFOキュー入門
SQSには2種類のキューがあり、今回はFIFOキューに触れます。
FIFirst-Out) キューは、標準キューの機能をすべて備えていますが、オペレーションやイベントの順序が重要である場合、または重複不可の場合などにアプリケーション間のメッセージングを強化をするために設計されています。
Amazon SQS FIFO(先入れ先出し) キュー
本ページでは、FIFOキューとスタンダードキューを作成し、両者の設定や挙動を比較します。
構築する環境
2つのLambda関数間にSQSを配置する構成です。
SQSキューはFIFOキューとスタンダードキューを用意します。
Lambda関数1は2つのキューにメッセージを送信します。
Lambda関数2は両キューをポーリングし、キューにメッセージがあれば、それを受け取って処理します。
Lambda関数のランタイム環境は、Python3.8とします。
CloudFormationテンプレートファイル
上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートを配置しています。
https://github.com/awstut-an-r/awstut-dva/tree/main/03/008
テンプレートファイルのポイント解説
本ページでは、FIFOキューとスタンダードキューの比較を中心に取り上げます。
SQSキューでLambda関数をトリガーする方法については、以下のページをご確認ください。
SQS
最初にスタンダードキューを確認します。
Resources:
StandardQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub "${Prefix}-standard"
ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds
VisibilityTimeout: !Ref VisibilityTimeout
Code language: YAML (yaml)
特別な設定は行いません。
20秒のロングポーリング(ReceiveMessageWaitTimeSeconds)と、90秒の可視性タイムアウト(VisibilityTimeout)だけ設定します。
次にFIFOキューを確認します。
Resources:
FifoQueue:
Type: AWS::SQS::Queue
Properties:
ContentBasedDeduplication: true
FifoQueue: true
QueueName: !Sub "${Prefix}.fifo"
ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds
VisibilityTimeout: !Ref VisibilityTimeout
Code language: YAML (yaml)
ポイントは3つです。
1つ目はキュー名です。
以下の通り、命名規則があります。
The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, must end with .fifo suffix and be 1 to 80 in length.
上記に従い、QueueNameプロパティを設定します。
2つ目は重複除外設定です。
FIFOキューは重複を認めません。
スタンダードキューとは異なり、FIFOキューではメッセージの重複を招きません。FIFOキューを使用すると、重複をキューに送信することを防止するのに役立ちます。5分間の重複除外間隔内にSendMessageアクションを再試行しても、Amazon SQS ではキューに重複を招きません。
1回だけの処理
重複除外の設定方法は2つありますが、コンテンツベースの除外排除を有効化する方法がお手軽です。
コンテンツベースの重複排除を有効にします。これは、Amazon SQSにSHA-256ハッシュを使用して、メッセージ本文を使用したメッセージ重複除外ID(ただしメッセージの属性ではない) を生成するように指示するものです。
1回だけの処理
ContentBasedDeduplicationプロパティにtrueを設定することで、コンテンツベースの重複排除を有効化できます。
3つ目はFifoQueueプロパティです。
本プロパティにtrueを設定することで、本キューはFIFOキューとして構築されます。
Lambda関数
関数1
Resources:
Function1:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |
import boto3
import datetime
import os
n = 10
fifo_queue_url = os.environ['FIFO_QUEUE_URL']
message_group_id = os.environ['MESSAGE_GROUP_ID']
standard_queue_url = os.environ['STANDARD_QUEUE_URL']
region_name = os.environ['REGION_NAME']
client = boto3.client('sqs', region_name=region_name)
def lambda_handler(event, context):
for i in range(n):
now = datetime.datetime.now()
now_str = now.strftime('%Y-%m-%d %H:%M:%S:%f')
body = '{n}: {datetime}'.format(n=i, datetime=now_str)
response_stndard = client.send_message(
QueueUrl=standard_queue_url,
MessageBody=body
)
print(response_stndard)
response_fifo = client.send_message(
QueueUrl=fifo_queue_url,
MessageBody=body,
MessageGroupId=message_group_id
)
print(response_fifo)
Environment:
Variables:
FIFO_QUEUE_URL: !Ref FifoQueueUrl
MESSAGE_GROUP_ID: !Ref Prefix
REGION_NAME: !Ref AWS::Region
STANDARD_QUEUE_URL: !Ref StandardQueueUrl
FunctionName: !Sub "${Prefix}-Function1"
Handler: !Ref LambdaHandler
MemorySize: !Ref LambdaMemorySize
Runtime: !Ref LambdaRuntime
Role: !GetAtt LambdaRole1.Arn
Timeout: !Ref LambdaTimeout
Code language: YAML (yaml)
Lambda関数で実行するコードをインライン形式で記載します。
詳細につきましては、以下のページをご確認ください。
boto3のSQS用クライアントオブジェクトを作成します。
SQSキューにメッセージを送信するためには、同オブジェクトのsend_messageメソッドを使用します。
本関数では同メソッドを2回呼び出しています。
1回目はスタンダードキュー向け、2回目はFIFOキュー向けです。
スタンダードキューへのメッセージの送信から確認します。
メッセージを送信するためには、キューのURLと本文を指定します。
対してFIFOキューにメッセージを送信する場合は、上記2パラメータに加えて、メッセージグループIDを指定します。
FIFOキューロジックはメッセージグループIDごとにのみ適用されます。各メッセージグループ ID は、 Amazon SQSキュー内の異なる順序付けされたメッセージグループを表します。メッセージグループIDごとに、すべてのメッセージが厳密な順序で送受信されます。
FIFO配信ロジック
今回はメッセージグループIDとして、全てのメッセージに共通した文字列(dva-03-008)を設定します。
つまり本キューに送信されたメッセージは、全て厳密な順序付けがなされるということです。
以下が本関数用のIAMロールです。
Resources:
LambdaRole1:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- lambda.amazonaws.com
Policies:
- PolicyName: SendSQSMessagePolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:GetQueueUrl
- sqs:SendMessage
Resource:
- !Ref FifoQueueArn
- !Ref StandardQueueArn
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Code language: YAML (yaml)
2つのSQSキューにメッセージを送信する権限を与えます。
(参考)関数2
Resources:
Function2:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |
def lambda_handler(event, context):
print(event)
FunctionName: !Sub "${Prefix}-Function2"
Handler: !Ref LambdaHandler
MemorySize: !Ref LambdaMemorySize
Runtime: !Ref LambdaRuntime
Role: !GetAtt LambdaRole2.Arn
Timeout: !Ref LambdaTimeout
Code language: YAML (yaml)
SQSによってトリガーされるLambda関数です。
eventオブジェクトを出力するシンプルな内容です。
以下が本関数用のIAMロールです。
Resources:
LambdaRole2:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- lambda.amazonaws.com
Policies:
- PolicyName: GetSQSMEssagePolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:ReceiveMessage
- sqs:DeleteMessage
- sqs:GetQueueAttributes
Resource:
- !Ref StandardQueueArn
- !Ref FifoQueueArn
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Code language: YAML (yaml)
2つのSQSキューからメッセージを取得するための権限を与えます。
イベントソースマッピングを作成し、SQSキューとLambda関数を関連付けます。
Resources:
EventSourceMapping1:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: !Ref BatchSize
Enabled: true
EventSourceArn: !Ref StandardQueueArn
FunctionName: !Ref Function2
EventSourceMapping2:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: !Ref BatchSize
Enabled: true
EventSourceArn: !Ref FifoQueueArn
FunctionName: !Ref Function2
Code language: YAML (yaml)
1つの関数に2つのSQSキューを関連付けます。
環境構築
CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。
CloudFormationスタックを作成し、スタック内のリソースを確認する
CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。
各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。
- SQSスタンダードキュー:dva-03-008-standard
- SQS FIFOキュー:dva-03-008.fifo
- Lambda関数1:dva-03-008-Function1
- Lambda関数2:dva-03-008-Function2
AWS Management Consoleから各リソースを確認します。
SQSキューを確認します。
確かに2つのSQSキューが作成されています。
一方はスタンダード、もう一方はFIFOキューです。
Lambdaトリガーとして、後述のLambda関数2が設定されています。
Lambda関数を確認します。
正常に2つの関数が作成されています。
動作確認
準備が整いましたので、関数1を実行します。
以下が実行結果です。
正常に完了しました。
次にLambda関数2の実行ログを確認します。
上記はログの一部ですが、確かにスタンダードキューとFIFOキューからメッセージを受信していることがわかります。
つまり本関数が2つのSQSキューからトリガーされたということです。
参考に、本関数がSQSキューからメッセージを取り出す順序を確認します。
メッセージの本文は「[数字]: [日時]」という構成であり、数字は0~9を順番に取ります。
つまりこの数字を見れば、順序通りにメッセージを受け取れているか判別できます。
今回の検証を10回試行しました。
その結果、スタンダードキューからは、2回順序通りにメッセージを受け取ることができませんでした。
対してFIFOキューでは、全て順序通りにメッセージを受け取ることができました。
メッセージを順序通りに処理する必要がある要件の場合は、FIFOキューの使用を検討しましょう。
まとめ
FIFOキューとスタンダードキューを作成し、両者の設定や挙動を比較しました。