SQS入門 Lambda同士をデータ連携する

目次

SQSを使用してLambda間でデータ連携する構成

SQSを使用することで、リソース間の結合を疎の状態に保ちつつ、連携することができます。今回はSQSを使用して、3つのLambda関数間でデータを連携させて、SQSの挙動を確認します。

構築する環境

Diagram of Introduction to SQS.

3つのLambda関数間にSQSを配置する構成です。Lambda1はSQSにメッセージを送信し、Lambda2と3はSQSからメッセージを受信します。3つのLambda関数のランタイム環境は、Python3.8とします。

環境構築用のCloudFormationテンプレートファイル

上記の構成をCloudFormationで構築します。以下のURLにCloudFormationテンプレートを配置してます。

https://github.com/awstut-an-r/awstut-fa/tree/main/021

テンプレートファイルのポイント解説

今回のアーキテクチャを構成するための、各テンプレートファイルのポイントを取り上げます。

SQSの基本はポーリング設定と可視性タイムアウト

まずSQSキューを確認します。キュー作成時のポイントは、ポーリング設定と可視性タイムアウトです。

Resources:
  Queue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub ${Prefix}-queue
      ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds
      VisibilityTimeout: !Ref VisibilityTimeout
Code language: YAML (yaml)

ポイントの1つ目はポーリング設定です。メッセージを取得するためにキューにアクセス(ポーリング)した際の挙動を指定することができます。今回はReceiveMessageWaitTimeSecondsプロパティで20(秒)を指定することで、最大20秒待機するロングポーリングで動作するように指定します。

ロングポーリングとすると、ReceiveMessage要求は、すべてのサーバーにメッセージを照会します。リクエストで指定された最大メッセージ数まで、使用可能なメッセージを収集した後、Amazon SQS からレスポンスが送信されます。Amazon SQS は、ポーリング待機時間が経過した場合にのみ空のレスポンスを送信します。

Amazon SQS ショートポーリングとロングポーリング

ロングポーリングとすることで、SQSキューにアクセスする回数を抑制し、SQSのコストを下げることにつながります。

ポイントの2つ目は可視性タイムアウトです。可視性タイムアウトは、1度ポーリングされた後に、削除されるまでの間に、キューに留まっているメッセージの扱いに関する設定です。

メッセージが受信された直後は、メッセージはキューに残ったままです。他の消費者が再度メッセージを処理しないようにするため、Amazon SQS では可視性タイムアウトで、Amazon SQS によって他の消費者がそのメッセージを受信および処理できなくなる期間です。

Amazon SQS 可視性タイムアウト

可視性タイムアウトを設定しない場合、1つのデータを複数のLambda関数が受信してしまい、処理が重複して実行される恐れがあります。そのためReceiveMessageWaitTimeSecondsプロパティで90(秒)と設定することで、可視性タイムアウトを有効化します。これによって、例えば、Lambda2がデータを受信してから90秒間は、Lambda3が同データを受信することはなくなります。

PythonでSQSにメッセージを送信する

PythonコードからSQSにメッセージを送信する方法を確認します。
まずLambda関数そのものを確認します。

Resources:
  Function1:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |
          ...
      Environment:
        Variables:
          queue_name: !Ref QueueName
          region_name: !Ref AWS::Region
      FunctionName: !Sub ${Prefix}-function1
      Handler: !Ref Handler
      MemorySize: !Ref MemorySize
      Runtime: !Ref Runtime
      Role: !GetAtt LambdaRole1.Arn
Code language: YAML (yaml)

Lambda関数の基本については、以下のページをご確認ください。

あわせて読みたい
CloudFormationでLambdaを作成する3パータン(S3/インライン/コンテナ) 【CloudFormationでLambdaを作成する】 CloudFormationでLambdaを作成する場合、大別すると以下の3パターンあります。 S3バケットにコードをアップロードする インライ...

Lambda1は実行する関数をインラインで記載しています。以下のコードを実行します。

import boto3
import datetime
import json
import os

def lambda_handler(event, context):
  queue_name = os.environ['queue_name']
  region_name = os.environ['region_name']

  sqs = boto3.resource('sqs', region_name=region_name)
  queue = sqs.get_queue_by_name(QueueName=queue_name)

  now = datetime.datetime.now()
  now_str = now.strftime('%Y%m%d%H%M%S%f')

  messages = [{
    'Id': now_str,
    'MessageBody': now_str
  }]

  response = queue.send_messages(Entries=messages)

  return {
    'statusCode': 200,
    'body': json.dumps(response)
  }
Code language: Python (python)

get_queue_by_nameメソッドで先述のSQSキューを指定して、send_messagesメソッドで同キューにメッセージを送信する。メッセージとして送信するデータは関数を実行した際の現在時刻とします。

この関数を実行するためのIAMロールは以下の通りです。

Resources:
  LambdaRole1:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: GetSSMParameter
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:GetQueueUrl
                  - sqs:SendMessage
                Resource:
                  - !Ref QueueArn
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Code language: YAML (yaml)

ポイントは作成したキューに対して、Actionを2つ許可している点です。これらは先述のPythonスクリプトで呼び出した2つのメソッドを実行するために必要になる権限です。

PythonでSQSからメッセージを受信する

関数の定義自体はLambda1とほとんど同じです。ポイントは実行するPythonコードです。

import boto3
import datetime
import json
import os

def lambda_handler(event, context):
  max_number_of_messages = os.environ['max_number_of_messages']
  queue_name = os.environ['queue_name']
  region_name = os.environ['region_name']

  sqs = boto3.resource('sqs', region_name=region_name)
  queue = sqs.get_queue_by_name(QueueName=queue_name)
  messages = queue.receive_messages(
    MaxNumberOfMessages=int(max_number_of_messages))

  results = []
  for msg in messages:
    results.append(msg.body)

  return {
    'statusCode': 200,
    'body': json.dumps(results, indent=2)
  }
Code language: Python (python)

SQSキューからメッセージを受信する場合は、receive_messagesメソッドを使用することができます。取得したメッセージを順々に処理します。各メッセージのbodyアトリビュートを参照して、Lambda1から連携されてきたデータを取得して返します。

PythonでSQSからメッセージを受信して削除する

Lambda2とほとんど同じです。異なる点にフォーカスして確認します。

def lambda_handler(event, context):
  ...

  results = []
  for msg in messages:
    results.append(msg.body)
    msg.delete()

  ...
Code language: Python (python)

deleteメソッドを呼び出して、取得したメッセージを削除します。先述の通り、キューに送信されたメッセージは、保持期間(デフォルトは4日間)を過ぎるまでは、削除しなければ残り続けるため、能動的に削除する必要があります。

環境構築

CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。

CloudFormationスタックを作成し、スタック内のリソースを確認する

スタックの作成および各スタックの確認方法については、以下のページをご確認ください。

あわせて読みたい
CloudFormationのネストされたスタックで環境を構築する 【CloudFormationのネストされたスタックで環境を構築する方法】 CloudFormationにおけるネストされたスタックを検証します。 CloudFormationでは、スタックをネストす...

各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。

  • SQSキュー名:fa-021-queue
  • Lambda1の名前:fa-021-function1
  • Lambda2の名前:fa-021-function2
  • Lambda3の名前:fa-021-function3

AWS Management Consoleから作成されたリソースを確認します。まずSQSキューです。

SQS Queue Overview

正常にキューが作成されていることがわかります。次に3つのLambda関数です。

Lambda1 Overview.
Lambda2 Overview.
Lambda3 Overview

いずれも問題なく作成されています。

Lambda関数1実行

準備が整いましたので、順次Lambda関数を実行します。AWS Management Consoleから関数を実行する方法については、以下のページをご確認ください。

あわせて読みたい
CloudFormationでLambdaを作成する3パータン(S3/インライン/コンテナ) 【CloudFormationでLambdaを作成する】 CloudFormationでLambdaを作成する場合、大別すると以下の3パターンあります。 S3バケットにコードをアップロードする インライ...

まずLambda1です。

The Lambda function can send a message to SQS queue.

上図がLambda1の実行結果です。正常に関数が実行されたことがわかります。つまりLambda関数を通じて、SQSキューにメッセージが溜まったことを意味します。次の関数の検証のために、複数回、Lambda1を実行しておきます。

Lambda関数2実行

次にLambda2を実行します。

Lambda function can receive messages from the SQS queue.

2つの日時データが返ってきました。このことは、SQSキューに溜まっていたメッセージを取得できたことを意味しています。

Lambda関数3実行

Lambda2実行の直後に、Lambda3を実行します。

Cannot receive messages from SQS queue.

関数の実行自体は正常に完了しましたが、取得できたデータがありません。これが可視性タイムアウトの効果です。前項でLambda2がメッセージを受け取ったため、可視性タイムアウトが動作し、Lambda3がメッセージを取得することができなかったということです。少し時間を置いてから、改めて関数を実行してみます。

Recieve messages from the SQS queue and delete them.

今度はメッセージを取得することができました。Lambda3ではメッセージの取得とともに、メッセージの削除を実施することになっています。これでキューが空になったはずです。

Lambda関数2再実行

最後にLambda2を実行します。

SQS queue is empty and no messages can be received.

やはりメッセージが空です。キューに溜まっているメッセージが全て削除されたためです。

Durationの値にも注目してください。関数の実行時間を意味する値ですが、約20秒です。これはSQSキューのロングポーリング設定が原因です。今回の最大待機時間である20秒間、ロングポーリングを実施し、期間内にメッセージの送信がなかったため、空の結果が返ってきたという挙動です。そのため関数実行中に待機時間が発生し、全体として約20秒ほどの実行時間になったということです。

まとめ

SQSの入門として、複数のLambda間でデータ連携を行いました。

ロングポーリングの設定を行うことで、ポーリング回数が減るため、経済的です。

可視性タイムアウトの設定を行うことで、1つのメッセージが重複して受信されることを防ぐことができます。

目次