AWS

Step FunctionsのMapを使用した反復処理

Step FunctionsのMapステートを使用した反復処理

Mapステートを使用すると、反復処理を行うことができます。

Mapステートを使用して、データセット内の各項目に対して一連のワークフローステップを実行します。Mapステートの反復処理はparallel 実行されるため、データセットを迅速に処理できます。

マップ

今回はMapステートを使用したStep Functionsステートマシンを構築します。

構築する環境

Diagram of iteration using Map in Step Functions

Step Functionsステートマシンを作成します。
ステートマシンは大別すると2つのステートで構成されます。

  • 1つ目のステート:組み込み関数States.ArrayRangeで配列を作成する。
  • 2つ目のステート:前ステートで生成した配列内の各数値を処理するMapステート。

Mapステートは以下の2つのサブステートから構成されます。

  • 1つ目のサブステート:Lambda関数を使用して、引数で受け取った数値を2乗して返す。
  • 2つ目のサブステート:Lambda関数を使用して、引数で受け取った数値を2倍して返す。

関数のランタイム環境はPython3.8とします。

CloudFormationテンプレートファイル

上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートを配置しています。

awstut-fa/121 at main · awstut-an-r/awstut-fa
Contribute to awstut-an-r/awstut-fa development by creating an account on GitHub.

テンプレートファイルのポイント解説

本ページは、Step FunctionsのMapステートを中心に取り上げます。

Step Functionsステートマシンの作成方法については、以下のページをご確認ください。

ステートマシン

Resources:
  StateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      Definition:
        Comment: !Sub "${Prefix}-StateMachine"
        StartAt: FirstState
        States:
          FirstState:
            Type: Pass
            Parameters:
              numbers.$: States.ArrayRange(0, 9, 1)
            Next: MapState
          MapState:
            Type: Map
            MaxConcurrency: 5
            InputPath: $.numbers
            ItemSelector:
              number-origin.$: $$.Map.Item.Value
            ItemProcessor:
              ProcessorConfig:
                Mode: INLINE
              StartAt: SecondState
              States:
                SecondState:
                  Type: Task
                  Resource: !Ref Function1Arn
                  Parameters:
                    number.$: $.number-origin
                  ResultPath: $.number-squared
                  Next: LastState
                LastState:
                  Type: Task
                  Resource: !Ref Function2Arn
                  Parameters:
                    number.$: $.number-squared
                  ResultPath: $.number-doubled
                  End: true
            End: true
      LoggingConfiguration:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt LogGroup.Arn
        IncludeExecutionData: true
        Level: ALL
      RoleArn: !GetAtt StateMachineRole.Arn
      StateMachineName: !Ref Prefix
      StateMachineType: STANDARD
Code language: YAML (yaml)

最初のステップ

Mapステートを検証するためのテストデータを生成します。

テストデータの生成には、組み込み関数States.ArrayRangeを使用します。
今回は「States.ArrayRange(0, 9, 1)」とすることで、10個の数値を生成します。

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Code language: plaintext (plaintext)

この関数の使用方法については、以下のページをご確認ください。

組み込み関数 - AWS Step Functions
組み込み関数を使用して、配列操作、データのエンコードとデコード、ハッシュ計算、JSON データ操作、数学関数演算、一意識別子の生成などの、基本的なデータ処理タスクの実行方法を学びます。

Parametersプロパティに「numbers.$: States.ArrayRange(0, 9, 1)」を設定することで、生成した配列をnumbersにセットします。
具体的には、以下のようなデータが生成されます。

{
"numbers": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
}
Code language: plaintext (plaintext)

Mapステート

MaxConcurrencyプロパティで同時実行数を設定できます。
今回は「5」を指定することによって、10個のデータを2回に分けて処理することになります。

InputPathプロパティで、受け取るデータを設定できます。
今回は「$.numbers」を指定することで、以下のデータを反復処理の対象とします。

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Code language: plaintext (plaintext)

ItemSelectorプロパティで、1つの処理におけるデータの形式を指定できます。
「number-origin.$: $$.Map.Item.Value」とすることで、例えば以下のデータが生成されます。

{
"number-origin": 0
}
Code language: plaintext (plaintext)

ItemProcessorプロパティでMapステートの詳細を設定できます。

ProcessorConfigプロパティでMapステートのモードを設定することができます。
詳細については以下のページでご確認いただけますが、今回はインラインモードを指定します。

ERROR: The request could not be satisfied

Statesプロパティで、Mapステート内のサブステートを定義できます。

1つ目のサブステート

Parametersプロパティで「number.$: $.number-origin」とすることで、以下のようにnumberに数値をセットします。

{
"number": 0
}
Code language: plaintext (plaintext)

ResourceプロパティにLambda関数を指定することで、渡された数値の2乗を計算します。

Resources:
  Function1:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |
          def lambda_handler(event, context):
            num = event['number']
            num_squared = num ** 2
            
            return num_squared
      FunctionName: !Sub "${Prefix}-function-01"
      Handler: !Ref Handler
      Runtime: !Ref Runtime
      Role: !GetAtt FunctionRole.Arn
Code language: YAML (yaml)

Lambda関数で実行するコードをインライン形式で記載します。
詳細につきましては、以下のページをご確認ください。

先ほどParametersプロパティで設定した通り、eventオブジェクトのnumberにアクセスして、引数を受け取ります。
引数として受け取った値を2乗して返します。

ResultPathプロパティで「$.number-squared」と設定することによって、以下の通り、先述の関数の結果をnumber-squaredにセットできます。

{
"number-squared": 0
}
Code language: plaintext (plaintext)

2つ目のサブステート

基本的に1つ目のサブステートと同様です。

Resourceプロパティで指定するLambda関数は、渡された数値の2倍を計算します。

Resources:
  Function2:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |
          def lambda_handler(event, context):
            num= event['number']
            num_doubled = num * 2
            
            return num_doubled
      FunctionName: !Sub "${Prefix}-function-02"
      Handler: !Ref Handler
      Runtime: !Ref Runtime
      Role: !GetAtt FunctionRole.Arn
Code language: YAML (yaml)

ResultPathプロパティで「$.number-doubled」と設定することによって、以下の通り、先述の関数の結果をnumber-doubledにセットできます。

{
"number-doubled": 0
}
Code language: plaintext (plaintext)

環境構築

CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。

CloudFormationスタックを作成し、スタック内のリソースを確認する

CloudFormationスタックを作成します。
スタックの作成および各スタックの確認方法については、以下のページをご確認ください。

各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。

  • Step Functionsステートマシン:fa-121
  • Lambda関数1:fa-121-function-01
  • Lambda関数2:fa-122-function-02

AWS Management Consoleから作成されたリソースを確認します。

ステートマシンを確認します。

Detail of Step Functions 1.

正常に作成されています。
Mapステートを含むステートマシンが作成されていることがわかります。
Mapステート内には、2つのサブステートがあり、それぞれLambda関数を呼び出すように設定されています。

Lambda関数を確認します。

Detail of Lambda 1.
Detail of Lambda 2.

確かに2つの関数が作成されています。

動作確認

準備が整いましたので、ステートマシンを実行します。

Detail of Step Functions 2.
Detail of Step Functions 3.

ステートマシンの動作が開始します。

Detail of Step Functions 4.

Mapステートで並列処理が行われています。

しばらく待つと、ステートマシンの実行が正常に完了します。

Detail of Step Functions 5.

実行後の出力を確認します。

Detail of Step Functions 6.

0〜9の数字が2つの関数で処理された結果が確認できます。

このようにMapステートを使用すると、並列処理を実行することができます。

まとめ

Mapステートを使用したStep Functionsステートマシンを構築しました。

タイトルとURLをコピーしました