SQSを使用してFargateコンテナ間でデータ連携する構成
AWS SAAの出題範囲の1つでもある、高弾力性に関連する内容です。
SQSを使用することで、Fargateコンテナ間の結合を疎の状態に保ちつつ、連携することができます。
今回は2つのFargateタイプのサービス上で動作するコンテナ間を、SQSを介して連携させます。
またSQSキューの長さに応じて、Fargateタスク数をスケールアウトさせる方法も確認します。
構築する環境
ECSクラスター上に、2つのFargateタイプのECSサービスおよびタスクを構築します。1つ目のサービス上で動作するコンテナは、SQSにメッセージを送信します。2つ目のサービス上で動作するコンテナは、SQSからメッセージを受信し、処理を行った後、削除します。またサービス2はスケーリングの設定を行います。SQSキューの長さに応じてタスク数を増減するようにします。
今回構築する各コンテナの働きを以下にまとめます。
- サービス1で実行するコンテナ(①):ランダム(10~60秒ごと)に現在時刻のUNIX時間をSQSに送信する。
- サービス2で実行するコンテナ(②):定期的にSQSにポーリングしてデータを取得後、60秒かけてUNIX時間が偶数か奇数かを判定し、その結果をCloudWatch Logsに書き込む。
ポイントは、①のデータ送信ペース(10~60秒)に比べて、②の処理ペース(60秒)は遅いという点です。そのため次第にSQSキューに未処理のメッセージが溜まります。そこで②のタスク数をスケーリングさせて対応します。SQSキューの長さをターゲットとします。今回は3を閾値とします。②のタスクは、デフォルトでは1つのコンテナが動作するものとし、最大3つまでスケールアウトさせることとします。なお上記のロジックは、Pythonにて実装します。
CloudFormationテンプレートファイル
上記の構成をCloudFormationで構築します。以下のURLにCloudFormationテンプレートを配置してます。
https://github.com/awstut-an-r/awstut-saa/tree/main/01/002
テンプレートファイルのポイント解説
Fargateの基本に関しては、以下のページをご確認ください。
プライベートサブネットにFargateを作成する方法については、以下のページをご確認ください。
FargateコンテナのログをCloudWatch Logsに配信する方法ついては、以下のページをご確認ください。
ECSタスクのスケーリングの基本については、以下のページをご確認ください。
SQSの基本については、以下のページもご確認ください。
本ページでは、上記のページに含まれない、または異なる点のみ取り上げます。
Environmentプロパティで環境変数をコンテナに渡す
タスク定義を作成します。
Resources:
TaskDefinition1:
Type: AWS::ECS::TaskDefinition
Properties:
RequiresCompatibilities:
- FARGATE
Cpu: !Ref ServiceCpu
Memory: !Ref ServiceMemory
NetworkMode: awsvpc
ExecutionRoleArn: !Ref FargateTaskExecutionRole
TaskRoleArn: !Ref TaskRole1
ContainerDefinitions:
- Name: !Sub "${ServiceName}-container"
Image: !Sub "${AWS::AccountId}.dkr.ecr.${AWS::Region}.amazonaws.com/${Repository}:latest"
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-group: !Ref LogGroup
awslogs-region: !Ref AWS::Region
awslogs-stream-prefix: !Ref ServiceName
Environment:
- Name: QueueName
Value: !Ref QueueName
- Name: RegionName
Value: !Ref AWS::Region
- Name: SQSEndpointUrl
Value: !Ref SQSEndpointUrl
Code language: YAML (yaml)
ポイントは環境変数に関する設定です。Environmentプロパティで、タスク上で実行するコンテナに渡す環境変数を設定することができます。今回はSQSキューの名前や、リージョン名、SQSエンドポイントのURLを変数として設定します。
タスク1上で実行するコンテナイメージですが、以下のDockerfileで作成しました。
ROM python:3
RUN pip install boto3
COPY main.py ./
CMD ["python3", "main.py"]
Code language: Dockerfile (dockerfile)
Python3の公式イメージをベースとして、boto3をインストール後、実行するスクリプトをコピーし、同スクリプトを実行するという内容です。以下がmain.pyの中身です。
import boto3
import datetime
import logging
import os
import random
import time
logging.basicConfig(level=logging.INFO)
queue_name = os.environ['QueueName']
region_name = os.environ['RegionName']
sqs_endpoint_url = os.environ['SQSEndpointUrl']
sqs = boto3.resource('sqs',
region_name=region_name,
endpoint_url=sqs_endpoint_url)
queue = sqs.get_queue_by_name(QueueName=queue_name)
def main():
while True:
now = datetime.datetime.now()
now_str = now.strftime('%Y%m%d%H%M%S%f')
epoch_time = int(time.mktime(now.timetuple()))
logging.info(now_str)
logging.info(epoch_time)
messages = [{
'Id': str(now_str),
'MessageBody': str(epoch_time)
}]
response = queue.send_messages(Entries=messages)
time.sleep(random.randint(10, 60))
if __name__ == '__main__':
main()
Code language: Python (python)
os.environオブジェクトを参照して、先ほどタスク定義で指定した環境変数を取得します。boto3.resourceオブジェクト作成時の引数がポイントです。endpoint_urlを指定する必要があります。今回の構成では、VPCエンドポイント経由でSQSにアクセスするためです。SQS用のエンドポイントURL(https://sqs.ap-northeast-1.amazonaws.com)を渡します。
現在日時を取得後、time.mktimeでUNIX時間を取得します。取得したUNIX時間をログに出力後、send_messagesメソッドでメッセージを送信します。データを送信後、time.sleepでランダムに10〜60秒待機してから、最初の処理に戻ります。
SQSキューの長さでECSタスク数をスケーリングする
ECSタスクをスケーリングするためのポイントは、スケーリングポリシーです。
Resources:
Service2ScalingPolicy:
Type: AWS::ApplicationAutoScaling::ScalingPolicy
DependsOn:
- Service2
- Service2ScalableTarget
Properties:
PolicyName: Service2ScalingPolicy
PolicyType: TargetTrackingScaling
ScalingTargetId: !Ref Service2ScalableTarget
TargetTrackingScalingPolicyConfiguration:
TargetValue: 3
CustomizedMetricSpecification:
Dimensions:
- Name: QueueName
Value: !Ref QueueName
MetricName: ApproximateNumberOfMessagesVisible
Namespace: AWS/SQS
Statistic: Average
Unit: Count
DisableScaleIn: false
ScaleInCooldown: 0
ScaleOutCooldown: 0
Code language: YAML (yaml)
キューの長さをスケーリングの条件としますので、PolicyTypeプロパティにターゲット追跡スケーリングポリシーを意味する「TargetTrackingScaling」を指定します。TargetTrackingScalingPolicyConfigurationプロパティにて、ターゲットや追跡するメトリクスを定義します。Namespaceプロパティに「AWS/SQS」、MetricNameプロパティに「ApproximateNumberOfMessagesVisible」、Dimensionsプロパティに定義済みのキューを指定することで、SQSキューの未処理データの数をターゲットとします。Statisticプロパティに「Average」、Unitプロパティに「Count」、TargetValueプロパティを「3」とすることで、特定の期間に測定した未処理データ数の平均値を取得し、3を基準として、スケールアウト/インを実行します。
サービス2で実行するコンテナを構築するためのDockerfileは、先述のものと同様です。以下がサービス2のタスクで実行するスクリプト(main.py)の中身です。
import boto3
import logging
import os
import time
logging.basicConfig(level=logging.INFO)
interval = int(os.environ['Interval'])
queue_name = os.environ['QueueName']
region_name = os.environ['RegionName']
sqs_endpoint_url = os.environ['SQSEndpointUrl']
sqs = boto3.resource('sqs',
region_name=region_name,
endpoint_url=sqs_endpoint_url)
queue = sqs.get_queue_by_name(QueueName=queue_name)
def is_even(num):
if num % 2 == 0:
return True
return False
def main():
while True:
messages = queue.receive_messages(MaxNumberOfMessages=1)
for msg in messages:
logging.info(msg.body)
num = int(msg.body)
if is_even(num):
logging.info('{num} is even.'.format(num=num))
else:
logging.info('{num} is odd.'.format(num=num))
time.sleep(interval)
msg.delete()
if __name__ == '__main__':
main()
Code language: Python (python)
receive_messagesメソッドでキューからデータを取得します。偶数かどうかを判定する自作関数(is_even)の結果に応じて、出力するログを変更します。最後にtime.sleep関数で60秒待機した後、キューからデータを削除し、最初の処理に戻ります。
環境構築
CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。今回は2回に分けてCloudFormationスタックを作成します。
ECRリポジトリを作成する
まずECRリポジトリを作成します。詳細は別ページをご確認ください。
残りのCloudFormationスタックを作成する
CloudFormationスタックを作成します。スタックの作成および各スタックの確認方法については、以下のページをご確認ください。
各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。
- SQSキュー:saa-01-002-queue
- ECSクラスター:saa-01-002-cluster
- ECSサービス1:saa-01-002-service1
- ECSサービス2:saa-01-002-service2
- ECSタスク1用のCloudWatch Logsロググループ名:saa-01-002-LogGroup1
- ECSタスク2用のCloudWatch Logsロググループ名:saa-01-002-LogGroup2
作成されたリソースをAWS Management Consoleから確認します。まずSQSキューを確認します。
正常にキューが作成されていることがわかります。次にECSクラスターです。
クラスター上に、2つのECSサービスが作成されていることが確認できます。
動作確認:サービス1
準備が整いましたので、実際に挙動を確認します。まずサービス1です。
1つのタスクが動作していることがわかります。次にCloudWatch Logsから、動作状況を確認します。
現在日時とUNIX時間が交互に出力されています。このことから、サービス1上のコンテナは正常に動作していると言えます。
動作確認:サービス2
次にサービス2側の状況を確認します。
こちらも1つのタスクが動作していることが確認できます。次にCloudWatch Logsから、動作状況を確認します。
サービス1側で生成されたUNIX時間の偶数・奇数判定結果が出力されています。このことから、SQSを介して、2つのECSサービス上のコンテナがデータ連携できているということがわかります。
動作確認:SQSキュー – メッセージ数増加
しばらく待機した後、SQSの詳細を確認します。
スケーリングの追跡ターゲットであるApproximate Number Of Messages Visibleの値が、閾値の3を超えてきました。サービス2が処理するスピードを上まるペースで、サービス1がメッセージを送信しているためです。
動作確認:サービス2 – スケールアウト
先ほど確認した通り、スケールアウトの条件を満たしましたので、スケールアウトが開始されます。
スケーリングのログが示す通り、段階的にタスク数が増加し、上限の3つまで増えました。これで安定的にキュー内のメッセージを消化することができます。
動作確認:SQSキュー – メッセージ数減少
タスク数が3つに増えたことで、今度はサービス1がメッセージを送信するスピードを上回るペースで、サービス2が処理を行います。すると次第にSQSキューの長さが短くなります。
Approximate Number Of Messages Visibleの値が、閾値の3を下回りました。
動作確認:サービス2 – スケールイン
先ほど確認した通り、スケールインの条件を満たしましたので、スケールインが開始されます。
スケーリングのログが示す通り、段階的にタスク数が減少し、下限の1つに戻りました。このようにSQSキューの長さに応じて、自動的にタスク数を増減できることができます。
まとめ
SQSを使用して、Fargateコンテナ間でデータ連携を行いました。
SQSキューの長さに応じて、ECSタスク数をスケーリングすることによって、コスト効率・弾力性の高いデータ連携が可能となります。