SQS FIFOキュー入門

SQSのFIFOキュー入門

SQS FIFOキュー入門

SQSには2種類のキューがあり、今回はFIFOキューに触れます。

FIFirst-Out) キューは、標準キューの機能をすべて備えていますが、オペレーションやイベントの順序が重要である場合、または重複不可の場合などにアプリケーション間のメッセージングを強化をするために設計されています。

Amazon SQS FIFO(先入れ先出し) キュー

本ページでは、FIFOキューとスタンダードキューを作成し、両者の設定や挙動を比較します。

構築する環境

Diagram of introduction to SQS FIFO Queues.

2つのLambda関数間にSQSを配置する構成です。

SQSキューはFIFOキューとスタンダードキューを用意します。

Lambda関数1は2つのキューにメッセージを送信します。
Lambda関数2は両キューをポーリングし、キューにメッセージがあれば、それを受け取って処理します。
Lambda関数のランタイム環境は、Python3.8とします。

CloudFormationテンプレートファイル

上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートを配置しています。

https://github.com/awstut-an-r/awstut-dva/tree/main/03/008

テンプレートファイルのポイント解説

本ページでは、FIFOキューとスタンダードキューの比較を中心に取り上げます。

SQSキューでLambda関数をトリガーする方法については、以下のページをご確認ください。

あわせて読みたい
SQSキューでLambda関数をトリガーする 【SQSキューでLambda関数をトリガーする】 以下のページで、SQS入門ということで、SQSキューを使ってLambda関数間をデータ連携する構成をご紹介しました。 https://awst...

SQS

最初にスタンダードキューを確認します。

Resources:
  StandardQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub "${Prefix}-standard"
      ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds
      VisibilityTimeout: !Ref VisibilityTimeout
Code language: YAML (yaml)

特別な設定は行いません。
20秒のロングポーリング(ReceiveMessageWaitTimeSeconds)と、90秒の可視性タイムアウト(VisibilityTimeout)だけ設定します。

次にFIFOキューを確認します。

Resources:
  FifoQueue:
    Type: AWS::SQS::Queue
    Properties:
      ContentBasedDeduplication: true
      FifoQueue: true
      QueueName: !Sub "${Prefix}.fifo"
      ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds
      VisibilityTimeout: !Ref VisibilityTimeout
Code language: YAML (yaml)

ポイントは3つです。

1つ目はキュー名です。
以下の通り、命名規則があります。

The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, must end with .fifo suffix and be 1 to 80 in length.

上記に従い、QueueNameプロパティを設定します。

2つ目は重複除外設定です。
FIFOキューは重複を認めません。

スタンダードキューとは異なり、FIFOキューではメッセージの重複を招きません。FIFOキューを使用すると、重複をキューに送信することを防止するのに役立ちます。5分間の重複除外間隔内にSendMessageアクションを再試行しても、Amazon SQS ではキューに重複を招きません。

1回だけの処理

重複除外の設定方法は2つありますが、コンテンツベースの除外排除を有効化する方法がお手軽です。

コンテンツベースの重複排除を有効にします。これは、Amazon SQSにSHA-256ハッシュを使用して、メッセージ本文を使用したメッセージ重複除外ID(ただしメッセージの属性ではない) を生成するように指示するものです。

1回だけの処理

ContentBasedDeduplicationプロパティにtrueを設定することで、コンテンツベースの重複排除を有効化できます。

3つ目はFifoQueueプロパティです。
本プロパティにtrueを設定することで、本キューはFIFOキューとして構築されます。

Lambda関数

関数1

Resources:
  Function1:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |
          import boto3
          import datetime
          import os

          n = 10

          fifo_queue_url = os.environ['FIFO_QUEUE_URL']
          message_group_id = os.environ['MESSAGE_GROUP_ID']
          standard_queue_url = os.environ['STANDARD_QUEUE_URL']
          region_name = os.environ['REGION_NAME']

          client = boto3.client('sqs', region_name=region_name)

          def lambda_handler(event, context):
            for i in range(n):
              now = datetime.datetime.now()
              now_str = now.strftime('%Y-%m-%d %H:%M:%S:%f')
              body = '{n}: {datetime}'.format(n=i, datetime=now_str)

              response_stndard = client.send_message(
                QueueUrl=standard_queue_url,
                MessageBody=body
              )
              print(response_stndard)

              response_fifo = client.send_message(
                QueueUrl=fifo_queue_url,
                MessageBody=body,
                MessageGroupId=message_group_id
              )
              print(response_fifo)
      Environment:
        Variables:
          FIFO_QUEUE_URL: !Ref FifoQueueUrl
          MESSAGE_GROUP_ID: !Ref Prefix
          REGION_NAME: !Ref AWS::Region
          STANDARD_QUEUE_URL: !Ref StandardQueueUrl
      FunctionName: !Sub "${Prefix}-Function1"
      Handler: !Ref LambdaHandler
      MemorySize: !Ref LambdaMemorySize
      Runtime: !Ref LambdaRuntime
      Role: !GetAtt LambdaRole1.Arn
      Timeout: !Ref LambdaTimeout
Code language: YAML (yaml)

Lambda関数で実行するコードをインライン形式で記載します。
詳細につきましては、以下のページをご確認ください。

あわせて読みたい
CloudFormationでLambdaを作成する3パータン(S3/インライン/コンテナ) 【CloudFormationでLambdaを作成する】 CloudFormationでLambdaを作成する場合、大別すると以下の3パターンあります。 S3バケットにコードをアップロードする インライ...

boto3のSQS用クライアントオブジェクトを作成します。
SQSキューにメッセージを送信するためには、同オブジェクトのsend_messageメソッドを使用します。

本関数では同メソッドを2回呼び出しています。
1回目はスタンダードキュー向け、2回目はFIFOキュー向けです。

スタンダードキューへのメッセージの送信から確認します。
メッセージを送信するためには、キューのURLと本文を指定します。

対してFIFOキューにメッセージを送信する場合は、上記2パラメータに加えて、メッセージグループIDを指定します。

FIFOキューロジックはメッセージグループIDごとにのみ適用されます。各メッセージグループ ID は、 Amazon SQSキュー内の異なる順序付けされたメッセージグループを表します。メッセージグループIDごとに、すべてのメッセージが厳密な順序で送受信されます。

FIFO配信ロジック

今回はメッセージグループIDとして、全てのメッセージに共通した文字列(dva-03-008)を設定します。
つまり本キューに送信されたメッセージは、全て厳密な順序付けがなされるということです。

以下が本関数用のIAMロールです。

Resources:
  LambdaRole1:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: SendSQSMessagePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:GetQueueUrl
                  - sqs:SendMessage
                Resource:
                  - !Ref FifoQueueArn
                  - !Ref StandardQueueArn
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Code language: YAML (yaml)

2つのSQSキューにメッセージを送信する権限を与えます。

(参考)関数2

Resources:
  Function2:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |
          def lambda_handler(event, context):
            print(event)
      FunctionName: !Sub "${Prefix}-Function2"
      Handler: !Ref LambdaHandler
      MemorySize: !Ref LambdaMemorySize
      Runtime: !Ref LambdaRuntime
      Role: !GetAtt LambdaRole2.Arn
      Timeout: !Ref LambdaTimeout
Code language: YAML (yaml)

SQSによってトリガーされるLambda関数です。
eventオブジェクトを出力するシンプルな内容です。

以下が本関数用のIAMロールです。

Resources:
  LambdaRole2:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: GetSQSMEssagePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                Resource:
                  - !Ref StandardQueueArn
                  - !Ref FifoQueueArn
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Code language: YAML (yaml)

2つのSQSキューからメッセージを取得するための権限を与えます。

イベントソースマッピングを作成し、SQSキューとLambda関数を関連付けます。

Resources:
  EventSourceMapping1:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: !Ref BatchSize
      Enabled: true
      EventSourceArn: !Ref StandardQueueArn
      FunctionName: !Ref Function2

  EventSourceMapping2:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: !Ref BatchSize
      Enabled: true
      EventSourceArn: !Ref FifoQueueArn
      FunctionName: !Ref Function2
Code language: YAML (yaml)

1つの関数に2つのSQSキューを関連付けます。

環境構築

CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。

CloudFormationスタックを作成し、スタック内のリソースを確認する

CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。

あわせて読みたい
CloudFormationのネストされたスタックで環境を構築する 【CloudFormationのネストされたスタックで環境を構築する方法】 CloudFormationにおけるネストされたスタックを検証します。 CloudFormationでは、スタックをネストす...

各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。

  • SQSスタンダードキュー:dva-03-008-standard
  • SQS FIFOキュー:dva-03-008.fifo
  • Lambda関数1:dva-03-008-Function1
  • Lambda関数2:dva-03-008-Function2

AWS Management Consoleから各リソースを確認します。

SQSキューを確認します。

Detail of SQS 1.
Detail of SQS 2.

確かに2つのSQSキューが作成されています。

一方はスタンダード、もう一方はFIFOキューです。

Lambdaトリガーとして、後述のLambda関数2が設定されています。

Lambda関数を確認します。

Detail of Lambda 1.
Detail of Lambda 2.

正常に2つの関数が作成されています。

動作確認

準備が整いましたので、関数1を実行します。

以下が実行結果です。

Detail of Lambda 3.

正常に完了しました。

次にLambda関数2の実行ログを確認します。

Detail of Lambda 4.

上記はログの一部ですが、確かにスタンダードキューとFIFOキューからメッセージを受信していることがわかります。

つまり本関数が2つのSQSキューからトリガーされたということです。

参考に、本関数がSQSキューからメッセージを取り出す順序を確認します。
メッセージの本文は「[数字]: [日時]」という構成であり、数字は0~9を順番に取ります。
つまりこの数字を見れば、順序通りにメッセージを受け取れているか判別できます。

今回の検証を10回試行しました。
その結果、スタンダードキューからは、2回順序通りにメッセージを受け取ることができませんでした。
対してFIFOキューでは、全て順序通りにメッセージを受け取ることができました。

メッセージを順序通りに処理する必要がある要件の場合は、FIFOキューの使用を検討しましょう。

まとめ

FIFOキューとスタンダードキューを作成し、両者の設定や挙動を比較しました。