SQSを使用してFargateコンテナ間でデータ連携する

目次

SQSを使用してFargateコンテナ間でデータ連携する構成

AWS SAAの出題範囲の1つでもある、高弾力性に関連する内容です。
SQSを使用することで、Fargateコンテナ間の結合を疎の状態に保ちつつ、連携することができます。

今回は2つのFargateタイプのサービス上で動作するコンテナ間を、SQSを介して連携させます。
またSQSキューの長さに応じて、Fargateタスク数をスケールアウトさせる方法も確認します。

構築する環境

Diagram of using SQS Data Linkage between Fargate Containers.

ECSクラスター上に、2つのFargateタイプのECSサービスおよびタスクを構築します。1つ目のサービス上で動作するコンテナは、SQSにメッセージを送信します。2つ目のサービス上で動作するコンテナは、SQSからメッセージを受信し、処理を行った後、削除します。またサービス2はスケーリングの設定を行います。SQSキューの長さに応じてタスク数を増減するようにします。

今回構築する各コンテナの働きを以下にまとめます。

  • サービス1で実行するコンテナ(①):ランダム(10~60秒ごと)に現在時刻のUNIX時間をSQSに送信する。
  • サービス2で実行するコンテナ(②):定期的にSQSにポーリングしてデータを取得後、60秒かけてUNIX時間が偶数か奇数かを判定し、その結果をCloudWatch Logsに書き込む。

ポイントは、①のデータ送信ペース(10~60秒)に比べて、②の処理ペース(60秒)は遅いという点です。そのため次第にSQSキューに未処理のメッセージが溜まります。そこで②のタスク数をスケーリングさせて対応します。SQSキューの長さをターゲットとします。今回は3を閾値とします。②のタスクは、デフォルトでは1つのコンテナが動作するものとし、最大3つまでスケールアウトさせることとします。なお上記のロジックは、Pythonにて実装します。

CloudFormationテンプレートファイル

上記の構成をCloudFormationで構築します。以下のURLにCloudFormationテンプレートを配置してます。

https://github.com/awstut-an-r/awstut-saa/tree/main/01/002

テンプレートファイルのポイント解説

Fargateの基本に関しては、以下のページをご確認ください。

あわせて読みたい
CloudFormationでFargate入門 【CloudFormationでFargateに入門するための構成】 AWS FargateはサーバーレスでDockerコンテナを実行することができるサービスです。今回はFargate入門ということで、C...

プライベートサブネットにFargateを作成する方法については、以下のページをご確認ください。

あわせて読みたい
プライベートサブネットにECS(Fargate)を作成する 【プライベートサブネットにECS(Fargate)を作成する】 以下のページでFargateタイプのECSコンテナを作成する方法をご紹介しました。 https://awstut.com/2022/01/25/int...

FargateコンテナのログをCloudWatch Logsに配信する方法ついては、以下のページをご確認ください。

あわせて読みたい
プライベートサブネットのFargateコンテナのログをCloudWatch Logsに配信する 【プライベートサブネットのECS(Fargate)コンテナのログをCloudWatch Logsに配信する】 ECSタスク内で実行されているコンテナのログを収集する方法の1つに、CloudWatch ...

ECSタスクのスケーリングの基本については、以下のページをご確認ください。

あわせて読みたい
プライベートサブネット内のFargateコンテナをALBにアタッチ 【プライベートサブネット内のFargateコンテナをALBにアタッチするための構成】 以下のページで、ALBの3種類のターゲットタイプを確認しました。 https://awstut.com/20...

SQSの基本については、以下のページもご確認ください。

あわせて読みたい
SQS入門 Lambda同士をデータ連携する 【SQSを使用してLambda間でデータ連携する構成】 SQSを使用することで、リソース間の結合を疎の状態に保ちつつ、連携することができます。今回はSQSを使用して、3つのLa...

本ページでは、上記のページに含まれない、または異なる点のみ取り上げます。

Environmentプロパティで環境変数をコンテナに渡す

タスク定義を作成します。

Resources:
  TaskDefinition1:
    Type: AWS::ECS::TaskDefinition
    Properties:
      RequiresCompatibilities:
        - FARGATE
      Cpu: !Ref ServiceCpu
      Memory: !Ref ServiceMemory
      NetworkMode: awsvpc
      ExecutionRoleArn: !Ref FargateTaskExecutionRole
      TaskRoleArn: !Ref TaskRole1
      ContainerDefinitions:
        - Name: !Sub "${ServiceName}-container"
          Image: !Sub "${AWS::AccountId}.dkr.ecr.${AWS::Region}.amazonaws.com/${Repository}:latest"
          LogConfiguration:
            LogDriver: awslogs
            Options:
              awslogs-group: !Ref LogGroup
              awslogs-region: !Ref AWS::Region
              awslogs-stream-prefix: !Ref ServiceName
          Environment:
            - Name: QueueName
              Value: !Ref QueueName
            - Name: RegionName
              Value: !Ref AWS::Region
            - Name: SQSEndpointUrl
              Value: !Ref SQSEndpointUrl
Code language: YAML (yaml)

ポイントは環境変数に関する設定です。Environmentプロパティで、タスク上で実行するコンテナに渡す環境変数を設定することができます。今回はSQSキューの名前や、リージョン名、SQSエンドポイントのURLを変数として設定します。

タスク1上で実行するコンテナイメージですが、以下のDockerfileで作成しました。

ROM python:3

RUN pip install boto3

COPY main.py ./

CMD ["python3", "main.py"]
Code language: Dockerfile (dockerfile)

Python3の公式イメージをベースとして、boto3をインストール後、実行するスクリプトをコピーし、同スクリプトを実行するという内容です。以下がmain.pyの中身です。

import boto3
import datetime
import logging
import os
import random
import time

logging.basicConfig(level=logging.INFO)

queue_name = os.environ['QueueName']
region_name = os.environ['RegionName']
sqs_endpoint_url = os.environ['SQSEndpointUrl']

sqs = boto3.resource('sqs',
    region_name=region_name,
    endpoint_url=sqs_endpoint_url)
queue = sqs.get_queue_by_name(QueueName=queue_name)

def main():
    while True:
        now = datetime.datetime.now()
        now_str = now.strftime('%Y%m%d%H%M%S%f')
        epoch_time = int(time.mktime(now.timetuple()))

        logging.info(now_str)
        logging.info(epoch_time)

        messages = [{
            'Id': str(now_str),
            'MessageBody': str(epoch_time)
        }]
        response = queue.send_messages(Entries=messages)

        time.sleep(random.randint(10, 60))


if __name__ == '__main__':
    main()
Code language: Python (python)

os.environオブジェクトを参照して、先ほどタスク定義で指定した環境変数を取得します。boto3.resourceオブジェクト作成時の引数がポイントです。endpoint_urlを指定する必要があります。今回の構成では、VPCエンドポイント経由でSQSにアクセスするためです。SQS用のエンドポイントURL(https://sqs.ap-northeast-1.amazonaws.com)を渡します。
現在日時を取得後、time.mktimeでUNIX時間を取得します。取得したUNIX時間をログに出力後、send_messagesメソッドでメッセージを送信します。データを送信後、time.sleepでランダムに10〜60秒待機してから、最初の処理に戻ります。

SQSキューの長さでECSタスク数をスケーリングする

ECSタスクをスケーリングするためのポイントは、スケーリングポリシーです。

Resources:
  Service2ScalingPolicy:
    Type: AWS::ApplicationAutoScaling::ScalingPolicy
    DependsOn:
      - Service2
      - Service2ScalableTarget
    Properties:
      PolicyName: Service2ScalingPolicy
      PolicyType: TargetTrackingScaling
      ScalingTargetId: !Ref Service2ScalableTarget
      TargetTrackingScalingPolicyConfiguration:
        TargetValue: 3
        CustomizedMetricSpecification:
          Dimensions:
            - Name: QueueName
              Value: !Ref QueueName
          MetricName: ApproximateNumberOfMessagesVisible
          Namespace: AWS/SQS
          Statistic: Average
          Unit: Count
        DisableScaleIn: false
        ScaleInCooldown: 0
        ScaleOutCooldown: 0
Code language: YAML (yaml)

キューの長さをスケーリングの条件としますので、PolicyTypeプロパティにターゲット追跡スケーリングポリシーを意味する「TargetTrackingScaling」を指定します。TargetTrackingScalingPolicyConfigurationプロパティにて、ターゲットや追跡するメトリクスを定義します。Namespaceプロパティに「AWS/SQS」、MetricNameプロパティに「ApproximateNumberOfMessagesVisible」、Dimensionsプロパティに定義済みのキューを指定することで、SQSキューの未処理データの数をターゲットとします。Statisticプロパティに「Average」、Unitプロパティに「Count」、TargetValueプロパティを「3」とすることで、特定の期間に測定した未処理データ数の平均値を取得し、3を基準として、スケールアウト/インを実行します。

サービス2で実行するコンテナを構築するためのDockerfileは、先述のものと同様です。以下がサービス2のタスクで実行するスクリプト(main.py)の中身です。

import boto3
import logging
import os
import time


logging.basicConfig(level=logging.INFO)

interval = int(os.environ['Interval'])
queue_name = os.environ['QueueName']
region_name = os.environ['RegionName']
sqs_endpoint_url = os.environ['SQSEndpointUrl']

sqs = boto3.resource('sqs',
    region_name=region_name,
    endpoint_url=sqs_endpoint_url)
queue = sqs.get_queue_by_name(QueueName=queue_name)

def is_even(num):
    if num % 2 == 0:
        return True
    return False

def main():
    while True:
        messages = queue.receive_messages(MaxNumberOfMessages=1)

        for msg in messages:
            logging.info(msg.body)
            num = int(msg.body)

            if is_even(num):
                logging.info('{num} is even.'.format(num=num))
            else:
                logging.info('{num} is odd.'.format(num=num))

            time.sleep(interval)

            msg.delete()


if __name__ == '__main__':
    main()
Code language: Python (python)

receive_messagesメソッドでキューからデータを取得します。偶数かどうかを判定する自作関数(is_even)の結果に応じて、出力するログを変更します。最後にtime.sleep関数で60秒待機した後、キューからデータを削除し、最初の処理に戻ります。

環境構築

CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。今回は2回に分けてCloudFormationスタックを作成します。

ECRリポジトリを作成する

まずECRリポジトリを作成します。詳細は別ページをご確認ください。

残りのCloudFormationスタックを作成する

CloudFormationスタックを作成します。スタックの作成および各スタックの確認方法については、以下のページをご確認ください。

あわせて読みたい
CloudFormationのネストされたスタックで環境を構築する 【CloudFormationのネストされたスタックで環境を構築する方法】 CloudFormationにおけるネストされたスタックを検証します。 CloudFormationでは、スタックをネストす...

各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。

  • SQSキュー:saa-01-002-queue
  • ECSクラスター:saa-01-002-cluster
  • ECSサービス1:saa-01-002-service1
  • ECSサービス2:saa-01-002-service2
  • ECSタスク1用のCloudWatch Logsロググループ名:saa-01-002-LogGroup1
  • ECSタスク2用のCloudWatch Logsロググループ名:saa-01-002-LogGroup2

作成されたリソースをAWS Management Consoleから確認します。まずSQSキューを確認します。

SQS Queue Overview

正常にキューが作成されていることがわかります。次にECSクラスターです。

Two services have been created in the cluster.

クラスター上に、2つのECSサービスが作成されていることが確認できます。

動作確認:サービス1

準備が整いましたので、実際に挙動を確認します。まずサービス1です。

A single task is running on service1.

1つのタスクが動作していることがわかります。次にCloudWatch Logsから、動作状況を確認します。

The container for service1 is working fine.

現在日時とUNIX時間が交互に出力されています。このことから、サービス1上のコンテナは正常に動作していると言えます。

動作確認:サービス2

次にサービス2側の状況を確認します。

A single task is running on servic2.

こちらも1つのタスクが動作していることが確認できます。次にCloudWatch Logsから、動作状況を確認します。

The container for service2 is working fine.

サービス1側で生成されたUNIX時間の偶数・奇数判定結果が出力されています。このことから、SQSを介して、2つのECSサービス上のコンテナがデータ連携できているということがわかります。

動作確認:SQSキュー – メッセージ数増加

しばらく待機した後、SQSの詳細を確認します。

SQS queue length increased

スケーリングの追跡ターゲットであるApproximate Number Of Messages Visibleの値が、閾値の3を超えてきました。サービス2が処理するスピードを上まるペースで、サービス1がメッセージを送信しているためです。

動作確認:サービス2 – スケールアウト

先ほど確認した通り、スケールアウトの条件を満たしましたので、スケールアウトが開始されます。

Start scale-out
The number of tasks has been scaled out to three.

スケーリングのログが示す通り、段階的にタスク数が増加し、上限の3つまで増えました。これで安定的にキュー内のメッセージを消化することができます。

動作確認:SQSキュー – メッセージ数減少

タスク数が3つに増えたことで、今度はサービス1がメッセージを送信するスピードを上回るペースで、サービス2が処理を行います。すると次第にSQSキューの長さが短くなります。

Decrease the length of the SQS queue

Approximate Number Of Messages Visibleの値が、閾値の3を下回りました。

動作確認:サービス2 – スケールイン

先ほど確認した通り、スケールインの条件を満たしましたので、スケールインが開始されます。

Start scale-in
The number of tasks has been scaled in to one.

スケーリングのログが示す通り、段階的にタスク数が減少し、下限の1つに戻りました。このようにSQSキューの長さに応じて、自動的にタスク数を増減できることができます。

まとめ

SQSを使用して、Fargateコンテナ間でデータ連携を行いました。

SQSキューの長さに応じて、ECSタスク数をスケーリングすることによって、コスト効率・弾力性の高いデータ連携が可能となります。

目次