Step FunctionsのMapステートを使用した反復処理
Mapステートを使用すると、反復処理を行うことができます。
Mapステートを使用して、データセット内の各項目に対して一連のワークフローステップを実行します。Mapステートの反復処理はparallel 実行されるため、データセットを迅速に処理できます。
マップ
今回はMapステートを使用したStep Functionsステートマシンを構築します。
構築する環境
Step Functionsステートマシンを作成します。
ステートマシンは大別すると2つのステートで構成されます。
- 1つ目のステート:組み込み関数States.ArrayRangeで配列を作成する。
- 2つ目のステート:前ステートで生成した配列内の各数値を処理するMapステート。
Mapステートは以下の2つのサブステートから構成されます。
- 1つ目のサブステート:Lambda関数を使用して、引数で受け取った数値を2乗して返す。
- 2つ目のサブステート:Lambda関数を使用して、引数で受け取った数値を2倍して返す。
関数のランタイム環境はPython3.8とします。
CloudFormationテンプレートファイル
上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートを配置しています。
https://github.com/awstut-an-r/awstut-fa/tree/main/121
テンプレートファイルのポイント解説
本ページは、Step FunctionsのMapステートを中心に取り上げます。
Step Functionsステートマシンの作成方法については、以下のページをご確認ください。
ステートマシン
Resources:
StateMachine:
Type: AWS::StepFunctions::StateMachine
Properties:
Definition:
Comment: !Sub "${Prefix}-StateMachine"
StartAt: FirstState
States:
FirstState:
Type: Pass
Parameters:
numbers.$: States.ArrayRange(0, 9, 1)
Next: MapState
MapState:
Type: Map
MaxConcurrency: 5
InputPath: $.numbers
ItemSelector:
number-origin.$: $$.Map.Item.Value
ItemProcessor:
ProcessorConfig:
Mode: INLINE
StartAt: SecondState
States:
SecondState:
Type: Task
Resource: !Ref Function1Arn
Parameters:
number.$: $.number-origin
ResultPath: $.number-squared
Next: LastState
LastState:
Type: Task
Resource: !Ref Function2Arn
Parameters:
number.$: $.number-squared
ResultPath: $.number-doubled
End: true
End: true
LoggingConfiguration:
Destinations:
- CloudWatchLogsLogGroup:
LogGroupArn: !GetAtt LogGroup.Arn
IncludeExecutionData: true
Level: ALL
RoleArn: !GetAtt StateMachineRole.Arn
StateMachineName: !Ref Prefix
StateMachineType: STANDARD
Code language: YAML (yaml)
最初のステップ
Mapステートを検証するためのテストデータを生成します。
テストデータの生成には、組み込み関数States.ArrayRangeを使用します。
今回は「States.ArrayRange(0, 9, 1)」とすることで、10個の数値を生成します。
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Code language: plaintext (plaintext)
この関数の使用方法については、以下のページをご確認ください。
Parametersプロパティに「numbers.$: States.ArrayRange(0, 9, 1)」を設定することで、生成した配列をnumbersにセットします。
具体的には、以下のようなデータが生成されます。
{
"numbers": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
}
Code language: plaintext (plaintext)
Mapステート
MaxConcurrencyプロパティで同時実行数を設定できます。
今回は「5」を指定することによって、10個のデータを2回に分けて処理することになります。
InputPathプロパティで、受け取るデータを設定できます。
今回は「$.numbers」を指定することで、以下のデータを反復処理の対象とします。
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Code language: plaintext (plaintext)
ItemSelectorプロパティで、1つの処理におけるデータの形式を指定できます。
「number-origin.$: $$.Map.Item.Value」とすることで、例えば以下のデータが生成されます。
{
"number-origin": 0
}
Code language: plaintext (plaintext)
ItemProcessorプロパティでMapステートの詳細を設定できます。
ProcessorConfigプロパティでMapステートのモードを設定することができます。
詳細については以下のページでご確認いただけますが、今回はインラインモードを指定します。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/concepts-inline-vs-distributed-map.html
Statesプロパティで、Mapステート内のサブステートを定義できます。
1つ目のサブステート
Parametersプロパティで「number.$: $.number-origin」とすることで、以下のようにnumberに数値をセットします。
{
"number": 0
}
Code language: plaintext (plaintext)
ResourceプロパティにLambda関数を指定することで、渡された数値の2乗を計算します。
Resources:
Function1:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |
def lambda_handler(event, context):
num = event['number']
num_squared = num ** 2
return num_squared
FunctionName: !Sub "${Prefix}-function-01"
Handler: !Ref Handler
Runtime: !Ref Runtime
Role: !GetAtt FunctionRole.Arn
Code language: YAML (yaml)
Lambda関数で実行するコードをインライン形式で記載します。
詳細につきましては、以下のページをご確認ください。
先ほどParametersプロパティで設定した通り、eventオブジェクトのnumberにアクセスして、引数を受け取ります。
引数として受け取った値を2乗して返します。
ResultPathプロパティで「$.number-squared」と設定することによって、以下の通り、先述の関数の結果をnumber-squaredにセットできます。
{
"number-squared": 0
}
Code language: plaintext (plaintext)
2つ目のサブステート
基本的に1つ目のサブステートと同様です。
Resourceプロパティで指定するLambda関数は、渡された数値の2倍を計算します。
Resources:
Function2:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |
def lambda_handler(event, context):
num= event['number']
num_doubled = num * 2
return num_doubled
FunctionName: !Sub "${Prefix}-function-02"
Handler: !Ref Handler
Runtime: !Ref Runtime
Role: !GetAtt FunctionRole.Arn
Code language: YAML (yaml)
ResultPathプロパティで「$.number-doubled」と設定することによって、以下の通り、先述の関数の結果をnumber-doubledにセットできます。
{
"number-doubled": 0
}
Code language: plaintext (plaintext)
環境構築
CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。
CloudFormationスタックを作成し、スタック内のリソースを確認する
CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。
各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。
- Step Functionsステートマシン:fa-121
- Lambda関数1:fa-121-function-01
- Lambda関数2:fa-122-function-02
AWS Management Consoleから作成されたリソースを確認します。
ステートマシンを確認します。
正常に作成されています。
Mapステートを含むステートマシンが作成されていることがわかります。
Mapステート内には、2つのサブステートがあり、それぞれLambda関数を呼び出すように設定されています。
Lambda関数を確認します。
確かに2つの関数が作成されています。
動作確認
準備が整いましたので、ステートマシンを実行します。
ステートマシンの動作が開始します。
Mapステートで並列処理が行われています。
しばらく待つと、ステートマシンの実行が正常に完了します。
実行後の出力を確認します。
0〜9の数字が2つの関数で処理された結果が確認できます。
このようにMapステートを使用すると、並列処理を実行することができます。
まとめ
Mapステートを使用したStep Functionsステートマシンを構築しました。