Configuring Data Linkage between Lambda Functions Using SQS
By using SQS, you can link resources while keeping them loosely coupled. In this article, we will use SQS to pass data between three Lambda functions and check the behavior of SQS.
Environment
We will configure an SQS queue between three Lambda functions: Lambda1 will send messages to the queue, and Lambda2 and Lambda3 will receive messages from it. The runtime environment for the three Lambda functions will be Python 3.8.
CloudFormation template files
We will build the above configuration using CloudFormation. The CloudFormation templates are available at the following URL:
https://github.com/awstut-an-r/awstut-fa/tree/main/021
Explanation of key points of template file
Let’s take a look at the key points of each template file to configure this architecture.
Basics of SQS are polling settings and visibility timeout
First, we need to check the SQS queue. The key points when creating a queue are polling settings and visibility timeout.
Resources:
  Queue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub ${Prefix}-queue
      ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds
      VisibilityTimeout: !Ref VisibilityTimeout
The first point is the polling setting, which determines how the queue behaves when it is accessed (polled) to retrieve messages. In this case, we specify 20 (seconds) for the ReceiveMessageWaitTimeSeconds property so that the queue uses long polling, which waits for up to 20 seconds.
"With long polling, the ReceiveMessage request queries all of the servers for messages. Amazon SQS sends a response after it collects at least one available message, up to the maximum number of messages specified in the request. Amazon SQS sends an empty response only if the polling wait time expires."
(Source: Amazon SQS short and long polling)
Long polling reduces the number of requests that return no messages. Because SQS is billed per request, fewer requests means a lower SQS cost.
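To make the saving concrete, the following sketch counts how many ReceiveMessage requests a consumer would issue while the queue stays empty. The function name and the one-second short-polling interval are assumptions chosen for illustration; they do not come from the template.

```python
import math


def requests_while_idle(idle_seconds, seconds_per_request):
    """Count ReceiveMessage calls made while the queue stays empty.

    Each call is assumed to occupy `seconds_per_request` seconds: the
    pause between short polls, or the full wait time of a long poll.
    """
    if seconds_per_request <= 0:
        raise ValueError("seconds_per_request must be positive")
    return math.ceil(idle_seconds / seconds_per_request)


one_hour = 60 * 60
short_polling = requests_while_idle(one_hour, 1)   # hypothetical 1-second loop
long_polling = requests_while_idle(one_hour, 20)   # ReceiveMessageWaitTimeSeconds = 20
print(short_polling, long_polling)  # 3600 180
```

Under these assumptions, long polling issues one twentieth of the requests for the same idle hour.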
The second point is visibility timeout. Visibility timeout is a setting for handling messages that remain in the queue after they have been polled and before they are deleted.
"Immediately after a message is received, it remains in the queue. To prevent other consumers from processing the message again, Amazon SQS sets a visibility timeout, a period of time during which Amazon SQS prevents other consumers from receiving and processing the message."
(Source: Amazon SQS visibility timeout)
If the visibility timeout is too short for the consumer to finish its work (the SQS default is 30 seconds), a single message may be received by multiple Lambda functions, resulting in duplicate processing. Therefore, we set the VisibilityTimeout property to 90 (seconds). This way, for example, for 90 seconds after Lambda2 receives a message, Lambda3 will not receive the same message.
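The behavior described above can be sketched with a small in-memory model. This is a deliberately simplified toy, not how SQS is implemented: the ToyQueue class, its method names, and the explicit `now` argument are all assumptions made for illustration.

```python
class ToyQueue:
    """Simplified in-memory model of the visibility timeout (not real SQS)."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        # Maps each body to the time at which it becomes visible again.
        # Bodies are assumed to be unique in this toy model.
        self._visible_at = {}

    def send(self, body, now):
        self._visible_at[body] = now  # visible immediately

    def receive(self, now):
        visible = [b for b, t in self._visible_at.items() if t <= now]
        for body in visible:
            # The message is hidden from other consumers, NOT removed.
            self._visible_at[body] = now + self.visibility_timeout
        return visible

    def delete(self, body):
        self._visible_at.pop(body, None)


queue = ToyQueue(visibility_timeout=90)
queue.send("20220101000000", now=0)

first = queue.receive(now=1)     # Lambda2 receives the message
second = queue.receive(now=10)   # Lambda3 polls inside the 90-second window
third = queue.receive(now=100)   # the window has expired; the message is back
queue.delete(third[0])           # Lambda3 deletes the message
fourth = queue.receive(now=200)  # nothing is left in the queue
```

Here `second` and `fourth` are empty lists, while `first` and `third` both contain the same message, which mirrors the sequence verified later in this article.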
Sending messages to SQS in Python
Let’s check how to send a message to SQS from Python code.
First, let’s check the Lambda function itself.
Resources:
  Function1:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |
          ...
      Environment:
        Variables:
          queue_name: !Ref QueueName
          region_name: !Ref AWS::Region
      FunctionName: !Sub ${Prefix}-function1
      Handler: !Ref Handler
      MemorySize: !Ref MemorySize
      Runtime: !Ref Runtime
      Role: !GetAtt LambdaRole1.Arn
For more information on the basics of Lambda functions, please refer to the following page.
The code for Lambda1 is written inline in the template. The following code will be executed.
import boto3
import datetime
import json
import os


def lambda_handler(event, context):
    # The queue name and region are passed in as environment variables.
    queue_name = os.environ['queue_name']
    region_name = os.environ['region_name']

    sqs = boto3.resource('sqs', region_name=region_name)
    queue = sqs.get_queue_by_name(QueueName=queue_name)

    # Use the current time as both the batch entry Id and the message body.
    now = datetime.datetime.now()
    now_str = now.strftime('%Y%m%d%H%M%S%f')

    messages = [{
        'Id': now_str,
        'MessageBody': now_str
    }]

    response = queue.send_messages(Entries=messages)

    return {
        'statusCode': 200,
        'body': json.dumps(response)
    }
The get_queue_by_name method looks up the SQS queue described above by name, and the send_messages method sends a batch of messages to the queue. The data sent as the message is the current time when the function is executed.
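Because send_messages is a batch call, SQS accepts at most 10 entries per request. The following sketch shows one way to send a longer list of bodies in batches; the helper name send_in_batches is hypothetical, and `queue` is assumed to behave like the boto3 Queue resource used above.

```python
def send_in_batches(queue, bodies, batch_size=10):
    """Send `bodies` with queue.send_messages, at most 10 entries per call.

    `queue` is expected to behave like a boto3 SQS Queue resource.
    Returns the bodies that SQS reported as failed.
    """
    if not 1 <= batch_size <= 10:
        raise ValueError("SQS accepts 1 to 10 entries per batch")

    failed_bodies = []
    for start in range(0, len(bodies), batch_size):
        chunk = bodies[start:start + batch_size]
        # Ids only need to be unique within a single batch request.
        entries = [
            {'Id': str(index), 'MessageBody': body}
            for index, body in enumerate(chunk)
        ]
        response = queue.send_messages(Entries=entries)
        # Map each failed Id back to the body that was not sent.
        for item in response.get('Failed', []):
            failed_bodies.append(chunk[int(item['Id'])])
    return failed_bodies
```

Checking the Failed list matters because a batch request can succeed as a whole while individual entries are rejected.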
The IAM role for executing this function is as follows:
Resources:
  LambdaRole1:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      Policies:
        - PolicyName: GetSSMParameter
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:GetQueueUrl
                  - sqs:SendMessage
                Resource:
                  - !Ref QueueArn
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
The point is that we are allowing two Actions for the queue we created. These are the permissions needed to execute the two methods called in the Python script described above.
Receiving messages from SQS in Python
The definition of Lambda2 itself is almost the same as that of Lambda1. The point is the Python code to be executed.
import boto3
import json
import os


def lambda_handler(event, context):
    # Environment variables are strings; the count is converted with int() below.
    max_number_of_messages = os.environ['max_number_of_messages']
    queue_name = os.environ['queue_name']
    region_name = os.environ['region_name']

    sqs = boto3.resource('sqs', region_name=region_name)
    queue = sqs.get_queue_by_name(QueueName=queue_name)

    # Received messages are hidden from other consumers but not deleted.
    messages = queue.receive_messages(
        MaxNumberOfMessages=int(max_number_of_messages))

    results = []
    for msg in messages:
        results.append(msg.body)

    return {
        'statusCode': 200,
        'body': json.dumps(results, indent=2)
    }
To receive messages from the SQS queue, use the receive_messages method. The function then processes the retrieved messages in order, reading the body attribute of each message to retrieve and return the data sent by Lambda1.
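The receive step can also be wrapped in a small helper that validates the message count and sets the wait time per request. This is a minimal sketch: the name receive_bodies and the default of 20 seconds are assumptions, and `queue` is assumed to behave like the boto3 Queue resource used above.

```python
def receive_bodies(queue, max_messages, wait_seconds=20):
    """Return the bodies of up to `max_messages` messages from `queue`.

    `queue` is expected to behave like a boto3 SQS Queue resource.
    """
    # Environment variables arrive as strings, so convert before validating.
    max_messages = int(max_messages)
    if not 1 <= max_messages <= 10:
        raise ValueError("MaxNumberOfMessages must be between 1 and 10")

    messages = queue.receive_messages(
        MaxNumberOfMessages=max_messages,
        # Overrides the queue-level ReceiveMessageWaitTimeSeconds for this call.
        WaitTimeSeconds=wait_seconds,
    )
    return [msg.body for msg in messages]
```

Validating the count up front gives a clearer error than letting the request fail, since SQS returns at most 10 messages per receive call.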
Receiving and deleting messages from SQS in Python
Lambda3 is almost the same as Lambda2, so we will focus on the differences.
def lambda_handler(event, context):
    ...
    results = []
    for msg in messages:
        results.append(msg.body)
        msg.delete()
    ...
Call the delete method to delete the retrieved message. Messages sent to the queue remain there until the retention period expires (the default is 4 days) unless they are deleted, so the consumer needs to delete them explicitly.
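In practice, a consumer usually deletes a message only after it has been processed successfully, so that a failed message reappears once the visibility timeout expires. The following sketch illustrates that pattern; process_and_delete and the `handler` callable are hypothetical names, and each message is assumed to behave like a boto3 SQS Message resource.

```python
def process_and_delete(messages, handler):
    """Run `handler` on each message body and delete the message on success.

    `messages` are expected to behave like boto3 SQS Message resources
    (a `body` attribute and a `delete()` method). `handler` is a
    hypothetical callable supplied by the caller.
    """
    processed, failed = [], []
    for msg in messages:
        try:
            handler(msg.body)
        except Exception:
            # Leave the message in the queue; it becomes visible again
            # once the visibility timeout expires and can be retried.
            failed.append(msg.body)
            continue
        msg.delete()
        processed.append(msg.body)
    return processed, failed
```

Lambda3 in this article deletes every message unconditionally, which is fine for a demonstration where the only processing is reading the body.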
Architecting
We will use CloudFormation to build this environment and check its actual behavior.
Create CloudFormation stacks and check resources in stacks
For information on how to create stacks and check each stack, please refer to the following page.
After checking the resources of each stack, the main resources created this time are as follows:
- SQS queue name: fa-021-queue
- Name of Lambda1: fa-021-function1
- Name of Lambda2: fa-021-function2
- Name of Lambda3: fa-021-function3
Check the created resources from the AWS Management Console. First is the SQS queue.
You can see that the queue has been created successfully. Next are the three Lambda functions.
All of them have been created without any problems.
Lambda Function 1 Execution
Now that everything is ready, let’s execute the Lambda functions one by one. See the following page for information on how to execute functions from the AWS Management Console.
Let’s start with Lambda1.
The above figure shows the execution result of Lambda1. You can see that the function has been executed successfully. This means that a message has been sent to the SQS queue through the Lambda function. To prepare for the verification of the next function, we will execute Lambda1 multiple times.
Lambda Function 2 Execution
Next, we will run Lambda2.
Two date/time values were returned. This means that we were able to retrieve the messages that were stored in the SQS queue.
Lambda Function 3 Execution
Immediately after executing Lambda2, we will execute Lambda3.
The execution of the function itself has completed successfully, but there is no data that could be retrieved. This is the effect of the visibility timeout. In the previous section, Lambda2 received a message, which means that the visibility timeout was activated and Lambda3 was not able to retrieve the message.
Let’s give it some time and try to run the function again.
Now we are able to retrieve the message, and Lambda3 is supposed to delete the message as well as retrieve it. The queue should now be empty.
Lambda Function 2 Re-Execution
Finally, let’s run Lambda2 again.
This time the result is empty. This is because all the messages in the queue have been deleted.
Notice the Duration value. It shows the execution time of the function, which is about 20 seconds. This is due to the long polling setting of the SQS queue: because the queue was empty, the receive request waited for the maximum wait time of 20 seconds before returning, so the overall execution time was about 20 seconds.
Summary
As an introduction to SQS, we have linked data between multiple Lambda functions.
Setting up long polling is economical because it reduces the number of polling requests.
By setting the visibility timeout, you can prevent a single message from being received by multiple consumers in duplicate.