Introduction to SQS Data Linkage between Lambdas

Configuring Data Linkage between Lambda Functions Using SQS

By using SQS, you can link resources while keeping them loosely coupled. In this article, we will use SQS to pass data between three Lambda functions and check the behavior of SQS.

Environment

Diagram of Introduction to SQS.

We will place an SQS queue between three Lambda functions: Lambda1 will send messages to the queue, and Lambda2 and Lambda3 will receive messages from it. The runtime environment for all three Lambda functions is Python 3.8.

CloudFormation template files

We will build the above configuration using CloudFormation. The CloudFormation templates are available at the following URL:

https://github.com/awstut-an-r/awstut-fa/tree/main/021

Explanation of key points of template file

Let’s take a look at the key points of each template file to configure this architecture.

Basics of SQS are polling settings and visibility timeout

First, we need to check the SQS queue. The key points when creating a queue are polling settings and visibility timeout.

Resources:
  Queue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub ${Prefix}-queue
      ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds
      VisibilityTimeout: !Ref VisibilityTimeout

The first point is the polling setting, which determines how the queue behaves when it is accessed (polled) to retrieve messages. In this case, we specify 20 (seconds) in the ReceiveMessageWaitTimeSeconds property to enable long polling, which waits for up to 20 seconds for a message to arrive before returning.

With long polling, the ReceiveMessage request queries all of the servers for messages. Amazon SQS sends a response after it collects at least one available message, up to the maximum number of messages specified in the request. Amazon SQS sends an empty response only if the polling wait time expires.

Amazon SQS short and long polling

Long polling will reduce the number of times the SQS queue is accessed, which will lower the cost of SQS.
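To make the cost argument concrete, here is a rough back-of-the-envelope sketch. It assumes an idle consumer that polls an empty queue in a loop, and that each empty request cycle takes about `cycle_seconds`. The function name and the one-second short-polling cycle are illustrative assumptions, not measured values.

```python
import math


def empty_requests_per_hour(cycle_seconds: float) -> int:
    """Estimate how many empty receive requests an idle consumer makes
    per hour when each request cycle takes `cycle_seconds`."""
    if cycle_seconds <= 0:
        raise ValueError("cycle_seconds must be positive")
    return math.ceil(3600 / cycle_seconds)


# Assumption: a short-polling loop that retries about once per second.
short_polling = empty_requests_per_hour(1)
# Long polling with ReceiveMessageWaitTimeSeconds = 20, as in this article.
long_polling = empty_requests_per_hour(20)

print(short_polling, long_polling)  # prints: 3600 180
```

Under these assumptions, long polling cuts the number of empty requests by a factor of 20. The actual saving depends on how often messages arrive and how the consumer is scheduled.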

The second point is visibility timeout. Visibility timeout is a setting for handling messages that remain in the queue after they have been polled and before they are deleted.

Immediately after a message is received, it remains in the queue. To prevent other consumers from processing the message again, Amazon SQS sets a visibility timeout, a period of time during which Amazon SQS prevents other consumers from receiving and processing the message.

Amazon SQS visibility timeout

Without a visibility timeout, a single message could be received by multiple Lambda functions, resulting in duplicate processing. SQS applies a default visibility timeout of 30 seconds; in this case, we set the VisibilityTimeout property to 90 (seconds). This way, for example, for 90 seconds after Lambda2 receives a message, Lambda3 will not receive the same message.
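The following toy model may help in picturing this behavior. It is an in-memory simplification written for this explanation, not how SQS is actually implemented: the `ToyQueue` class and its methods are hypothetical, and time is passed in explicitly as seconds so the result is deterministic.

```python
class ToyQueue:
    """In-memory toy model of a visibility timeout (not the real SQS logic)."""

    def __init__(self, visibility_timeout: int):
        self.visibility_timeout = visibility_timeout
        # Maps message body -> time (seconds) at which it is visible again.
        self._visible_at = {}

    def send(self, body: str, now: int) -> None:
        self._visible_at[body] = now  # visible immediately

    def receive(self, now: int) -> list:
        """Return visible messages and hide them for the timeout period."""
        visible = [b for b, t in self._visible_at.items() if t <= now]
        for body in visible:
            self._visible_at[body] = now + self.visibility_timeout
        return visible

    def delete(self, body: str) -> None:
        self._visible_at.pop(body, None)


queue = ToyQueue(visibility_timeout=90)
queue.send("20220101120000000000", now=0)

first = queue.receive(now=0)    # first consumer gets the message
second = queue.receive(now=10)  # second consumer, 10 s later: hidden
third = queue.receive(now=90)   # timeout elapsed: visible again
```

Here `first` and `third` each contain the message, while `second` is empty. Note that the message reappears after the timeout because nobody deleted it.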

Sending messages to SQS in Python

Let’s check how to send a message to SQS from Python code.
First, let’s check the Lambda function itself.

Resources:
  Function1:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |
          ...
      Environment:
        Variables:
          queue_name: !Ref QueueName
          region_name: !Ref AWS::Region
      FunctionName: !Sub ${Prefix}-function1
      Handler: !Ref Handler
      MemorySize: !Ref MemorySize
      Runtime: !Ref Runtime
      Role: !GetAtt LambdaRole1.Arn

For more information on the basics of Lambda functions, please refer to the following page.

The code for Lambda1 is written inline in the template. The following code will be executed.

import boto3
import datetime
import json
import os

def lambda_handler(event, context):
  queue_name = os.environ['queue_name']
  region_name = os.environ['region_name']
  
  sqs = boto3.resource('sqs', region_name=region_name)
  queue = sqs.get_queue_by_name(QueueName=queue_name)
  
  now = datetime.datetime.now()
  now_str = now.strftime('%Y%m%d%H%M%S%f')
  
  messages = [{
    'Id': now_str,
    'MessageBody': now_str
  }]
  
  response = queue.send_messages(Entries=messages)

  return {
    'statusCode': 200,
    'body': json.dumps(response)
  }

The get_queue_by_name method retrieves the SQS queue described above by name, and the send_messages method sends messages to the queue as a batch. The message body is the current time at which the function is executed, which is also used as the entry Id.
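Because send_messages is a batch call, two details are worth keeping in mind when sending more than one message: a batch holds at most 10 entries, and a batch request can succeed overall while individual entries fail. The sketch below illustrates both points. The helper names `chunk_entries` and `failed_ids` are hypothetical and are not part of the article's template or of boto3.

```python
def chunk_entries(bodies, batch_size=10):
    """Split message bodies into batches of send_messages entries.

    Entry Ids only need to be unique within one batch, so the position
    inside the batch is used as the Id.
    """
    batches = []
    for start in range(0, len(bodies), batch_size):
        chunk = bodies[start:start + batch_size]
        batches.append(
            [{'Id': str(i), 'MessageBody': body}
             for i, body in enumerate(chunk)]
        )
    return batches


def failed_ids(response):
    """Return the Ids of entries reported as failed in a batch response."""
    return [entry['Id'] for entry in response.get('Failed', [])]


# Hypothetical usage inside a handler, where `queue` is the boto3 Queue:
#   for entries in chunk_entries(bodies):
#       response = queue.send_messages(Entries=entries)
#       if failed_ids(response):
#           ...  # retry or log the failed entries

batches = chunk_entries([f'message-{n}' for n in range(12)])
```

With 12 bodies, `batches` contains one batch of 10 entries and one batch of 2.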

The IAM role for executing this function is as follows:

Resources:
  LambdaRole1:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: GetSSMParameter
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:GetQueueUrl
                  - sqs:SendMessage
                Resource:
                  - !Ref QueueArn
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

The key point is that two actions are allowed on the queue we created: sqs:GetQueueUrl for the get_queue_by_name method and sqs:SendMessage for the send_messages method. These are the permissions needed to execute the two methods called in the Python code described above.

Receiving messages from SQS in Python

Next is Lambda2. The definition of the function itself is almost the same as that of Lambda1, so the key point is the Python code to be executed.

import boto3
import datetime
import json
import os

def lambda_handler(event, context):
  max_number_of_messages = os.environ['max_number_of_messages']
  queue_name = os.environ['queue_name']
  region_name = os.environ['region_name']
  
  sqs = boto3.resource('sqs', region_name=region_name)
  queue = sqs.get_queue_by_name(QueueName=queue_name)
  messages = queue.receive_messages(
    MaxNumberOfMessages=int(max_number_of_messages))
    
  results = []
  for msg in messages:
    results.append(msg.body)

  return {
    'statusCode': 200,
    'body': json.dumps(results, indent=2)
  }

To receive messages from the SQS queue, use the receive_messages method. The code processes the retrieved messages in order, reading the body attribute of each message to obtain the data sent by Lambda1, and returns the results. Note that Lambda2 does not delete the messages, so they remain in the queue.
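As a variation on the code above, the following sketch wraps the receive call in a small helper. It assumes `queue` behaves like the object returned by get_queue_by_name; the `receive_bodies` name and its parameters are hypothetical. It clamps the requested count to the 1 to 10 range that SQS accepts per request, and optionally overrides the queue's wait time for a single call via WaitTimeSeconds.

```python
def receive_bodies(queue, max_messages=10, wait_seconds=None):
    """Receive up to `max_messages` message bodies from an SQS queue.

    `queue` is expected to behave like a boto3 SQS Queue resource.
    If `wait_seconds` is None, the queue's own
    ReceiveMessageWaitTimeSeconds setting applies.
    """
    # Environment variables are strings, so convert before clamping.
    count = max(1, min(10, int(max_messages)))
    kwargs = {'MaxNumberOfMessages': count}
    if wait_seconds is not None:
        kwargs['WaitTimeSeconds'] = wait_seconds
    return [msg.body for msg in queue.receive_messages(**kwargs)]
```

Inside the handler this could be called as `receive_bodies(queue, os.environ['max_number_of_messages'])`, which keeps the type conversion and range check in one place.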

Receiving and deleting messages from SQS in Python

Lambda3 is almost the same as Lambda2, so we will focus on the differences.

def lambda_handler(event, context):
  ...
    
  results = []
  for msg in messages:
    results.append(msg.body)
    msg.delete()

  ...

Call the delete method to delete the retrieved message. Receiving a message does not remove it from the queue: unless it is deleted, it remains until the retention period (4 days by default) expires, so you need to delete it explicitly.
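In practice, a common pattern is to delete a message only after it has been processed successfully, so that a failed message becomes visible again once the visibility timeout expires. The following is a minimal sketch of that idea, not the code used in this article's template. The `process_and_delete` helper and its `handler` argument are hypothetical, and `messages` is assumed to be the list returned by receive_messages.

```python
def process_and_delete(messages, handler):
    """Run `handler` on each message body and delete only on success.

    Messages whose handler raises are left in the queue, so they become
    visible again once the visibility timeout expires.
    """
    processed, failed = [], []
    for msg in messages:
        try:
            handler(msg.body)
        except Exception:
            failed.append(msg.body)   # not deleted: will be redelivered
        else:
            msg.delete()              # done: remove it from the queue
            processed.append(msg.body)
    return processed, failed
```

The function returns the bodies that were processed and deleted, and those that failed and were left in the queue.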

Architecting

We will use CloudFormation to build this environment and check its actual behavior.

Create CloudFormation stacks and check resources in stacks

For information on how to create stacks and check each stack, please refer to the following page

Checking the resources in each stack shows that the main resources created this time are as follows:

  • SQS queue name: fa-021-queue
  • Name of Lambda1: fa-021-function1
  • Name of Lambda2: fa-021-function2
  • Name of Lambda3: fa-021-function3

Check the created resources from the AWS Management Console. First is the SQS queue.

SQS Queue Overview

You can see that the queue has been created successfully. Next are the three Lambda functions.

Lambda1 Overview.
Lambda2 Overview.
Lambda3 Overview

All of them have been created without any problems.

Lambda Function 1 Execution

Now that everything is ready, let’s execute the Lambda functions one by one. See the following page for information on how to execute functions from the AWS Management Console.

Let’s start with Lambda1.

The Lambda function can send a message to SQS queue.

The above figure shows the execution result of Lambda1. You can see that the function has been executed successfully. This means that a message has been stored in the SQS queue through the Lambda function. To prepare for verifying the next function, we will execute Lambda1 multiple times.

Lambda function 2 Execution

Next, we will run Lambda2.

Lambda function can receive messages from the SQS queue.

Two timestamps were returned. This means that we were able to retrieve the messages that were stored in the SQS queue.

Lambda Function 3 Execution

Immediately after executing Lambda2, we will execute Lambda3.

Cannot receive messages from SQS queue.

The execution of the function itself has completed successfully, but there is no data that could be retrieved. This is the effect of the visibility timeout. In the previous section, Lambda2 received a message, which means that the visibility timeout was activated and Lambda3 was not able to retrieve the message.
Let’s give it some time and try to run the function again.

Receive messages from the SQS queue and delete them.

Now we are able to retrieve the message, and Lambda3 is supposed to delete the message as well as retrieve it. The queue should now be empty.

Lambda function 2 Re-Execution

Finally, let’s run Lambda2 again.

SQS queue is empty and no messages can be received.

The result is empty. This is because all the messages in the queue have been deleted.
Notice the Duration value. It shows the execution time of the function, which is about 20 seconds. This is due to the long polling setting of the SQS queue: because the queue was empty, the receive request waited for the maximum wait time of 20 seconds before returning an empty response, so the overall execution time was about 20 seconds.

Summary

As an introduction to SQS, we have performed data linkage between multiple Lambdas.
Setting up long polling is economical because it reduces the number of polling requests.
By setting the visibility timeout, you can prevent a single message from being received by multiple consumers in duplicate.
