CFNカスタムリソースでOpenSearchインデックス作成を自動化する

CloudFormationカスタムリソースを使って、OpenSearchインデックス作成用のドキュメントアップロードを自動化する

CloudFormationカスタムリソースはスタック操作(作成、更新、削除)時に、任意のアクションを実行できるというものです。

今回はカスタムリソースを使って、OpenSearchドメインを作成時に、自動的にS3バケットに設置されているJSONファイルをアップロードして、インデックスを作成します。

構築する環境

Diagram of automate OpenSearch indexing with CFN Custom Resources.

CloudFormationスタックを作成し、その内部に2つのリソースを定義します。

1つ目はOpenSearchです。
今回はマスターユーザーを作成し、ユーザー情報で認証を行います。

2つ目はLambda関数です。
関数のランタイムはPython3.8とします。
この関数をカスタムリソースに関連付けて、スタック作成時に実行されるように設定します。
この関数の働きは、S3バケットに保存されているJSONファイルを取得し、OpenSearchにアップロードすることです。

AWS公式では、curlコマンドを使用してJSONファイルをアップロードする方法が詳細されています。

https://docs.aws.amazon.com/ja_jp/opensearch-service/latest/developerguide/gsgupload-data.html

今回はPythonモジュールrequestsを使用して、ファイルアップロードを実装します。

CloudFormationテンプレートファイル

上記の構成をCloudFormationで構築します。
以下のURLにCloudFormationテンプレートを配置しています。

https://github.com/awstut-an-r/awstut-fa/tree/main/057

テンプレートファイルのポイント解説

今回はカスタムリソースを使用して、OpenSearchにファイルをアップロードする方法を中心に取り上げます。

OpenSearch作成に関する基本的な事項については、以下のページをご確認ください。

あわせて読みたい
CFNでOpenSearch入門 【CloudFormationでOpenSearchに入門する構成】 OpenSearchはElasticsearchからフォークして作成された検索および分析スイートです。今回は入門ということで、CloudForm...

カスタムリソースに関しては、以下のページをご確認ください。

あわせて読みたい
CloudFormationカスタムリソース入門 【CloudFormationカスタムリソースの挙動を確認する構成】 CloudFormationの機能の1つにカスタムリソースがあります。 カスタムリソースを使用すると、テンプレートにカ...

カスタムリソース用Lambda関数

Resources:
  Function:
    Type: AWS::Lambda::Function
    Properties:
      Environment:
        Variables:
          BULK_ENDPOINT: !Sub "https://${DomainEndpoint}/_bulk"
          BULK_S3_BUCKET: !Ref BulkS3Bucket
          BULK_S3_KEY: !Ref BulkS3Key
          MASTER_USERNAME: !Ref MasterUserName
          MASTER_PASSWORD: !Ref MasterUserPassword
      Code:
        S3Bucket: !Ref CodeS3Bucket
        S3Key: !Ref CodeS3Key
      FunctionName: !Sub "${Prefix}-function"
      Handler: !Ref Handler
      Layers:
        - !Ref LambdaLayer
      Runtime: !Ref Runtime
      Role: !GetAtt FunctionRole.Arn
      Timeout: !Ref Timeout
Code language: YAML (yaml)

特別な設定は不要です。
強いてポイントを挙げるとすれば、環境変数の設定(Environmentプロパティ)です。
環境変数を設定することで、CloudFormationテンプレートから関数に変数を渡すことができます。

今回は5つの変数を定義します。
BULK_ENDPOINTはOpenSearchドメインにおけるドキュメントをアップロードするエンドポイントです。
ドメインエンドポイントURLに「/_bulk」を追加することで、このURLがアップロードエンドポイントとなります。

BULK_S3_BUCKETおよびBULK_S3_KEYはアップロードするドキュメントに関する変数です。
前者はドキュメントが設置されているS3バケットの名前、後者はドキュメントの名前(キー)です。

MASTER_USERNAMEおよびMASTER_PASSWORDはOpenSearchのマスターユーザーに関する変数です。
今回作成するOpenSearchドメインはマスターユーザーによる認証を行いますので、そのユーザー情報です。

Lambda関数コード

import boto3
import cfnresponse
import json
import os
import requests
from requests.auth import HTTPBasicAuth

BULK_ENDPOINT = os.environ['BULK_ENDPOINT']
BULK_S3_BUCKET = os.environ['BULK_S3_BUCKET']
BULK_S3_KEY = os.environ['BULK_S3_KEY']

MASTER_USERNAME = os.environ['MASTER_USERNAME']
MASTER_PASSWORD = os.environ['MASTER_PASSWORD']

CREATE = 'Create'
response_data = {}

s3_client = boto3.client('s3')

def lambda_handler(event, context):
  try:
    if event['RequestType'] == CREATE:
      s3_response = s3_client.get_object(
        Bucket=BULK_S3_BUCKET,
        Key=BULK_S3_KEY)

      # binary
      bulk = s3_response['Body'].read()
      print(bulk)

      requests_response = requests.post(
        BULK_ENDPOINT,
        data=bulk,
        auth=HTTPBasicAuth(MASTER_USERNAME, MASTER_PASSWORD),
        headers={'Content-Type': 'application/json'}
        )
      print(requests_response.text)

    cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)

  except Exception as e:
    print(e)
    cfnresponse.send(event, context, cfnresponse.FAILED, response_data)
Code language: Python (python)

ポイントは5点あります。

1点目はcfnresponseモジュールです。
CloudFormationカスタムリソースとLambda関数が通信する場合に、このモジュールを使用することができます。

注意点としては、Lambda関数作成時に、S3バケットからパッケージを読み込む場合、このモジュールはデフォルトのランタイム環境には含まれないという点です。
詳細は以下のページに詳しいのですが、ZipFileプロパティを使用して、CloudFormationテンプレートにインラインでコードを記載する場合は、このパッケージはランタイム環境に含まれているため、ユーザー側が用意しなくともインポートすることができます。

https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/cfn-lambda-function-code-cfnresponsemodule.html

しかしS3バケットに設置されたZIPファイルを参照する方法でLambda関数を作成する場合は、ユーザー側でこのモジュールをZIPファイルに含める等の対応が必要となりますのでご注意ください。

2点目は環境変数の取得です。
先ほど確認した通り、CloudFormationテンプレートで環境変数を定義しました。
Pythonではos.environで環境変数にアクセスすることができます。

3点目はCloudFormationスタックの操作内容です。
スタックの操作内容は「Create」「Update」「Delete」がありますが、これらはevent[‘RequestType’]で取得できます。
今回はこの値を参照して、スタック作成時にOpenSearchドメインにドキュメントをアップロードするように実装します。

4点目はS3バケットからドキュメントを取得する方法です。
boto3.clientでS3用クライアントオブジェクトを作成後、get_objectメソッドでドキュメントを取得します。

5点目はドキュメントをアップロードする方法です。
冒頭でご紹介した通り、AWS公式ではcurlコマンドを使ったアップロード方法が紹介されていますが、今回はrequestsモジュールを使用します。
アップロードはアップロードエンドポイントに対してファイルをPOSTする形を取ります。
POSTする際の注意点が3つあります。
1つ目はアップロードするファイルデータはバイナリ形式である必要があります。幸い、先述のget_objectメソッドでS3バケットからドキュメントを取得しますと、データはバイナリ形式となりますので、これをそのまま指定します。
2つ目はBASIC認証です。先述の通り、今回はマスターユーザー情報による認証を行いますが、これはBASIC認証となります。ですからHTTPBasicAuthにマスターユーザーのユーザー名・パスワードを指定します。
3つ目はヘッダー設定です。Content-Typeヘッダーに「application/json」を指定します。

Lambda関数用IAMロール

Resources:
  FunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: !Sub "${Prefix}-S3GetObjectPolicy"
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource:
                  - !Sub "arn:aws:s3:::${BulkS3Bucket}/*"
Code language: YAML (yaml)

特別な設定は不要です。
S3バケットからドキュメントを取得しますので、必要な権限を与えるように設定します。

環境構築

CloudFormationを使用して、本環境を構築し、実際の挙動を確認します。

OpenSearchドメインにアップロードするドキュメントを用意する

AWS公式で紹介されているサンプルデータをそのまま使用します。

{ "index" : { "_index": "movies", "_id" : "2" } }
{"director": "Frankenheimer, John", "genre": ["Drama", "Mystery", "Thriller", "Crime"], "year": 1962, "actor": ["Lansbury, Angela", "Sinatra, Frank", "Leigh, Janet", "Harvey, Laurence", "Silva, Henry", "Frees, Paul", "Gregory, James", "Bissell, Whit", "McGiver, John", "Parrish, Leslie", "Edwards, James", "Flowers, Bess", "Dhiegh, Khigh", "Payne, Julie", "Kleeb, Helen", "Gray, Joe", "Nalder, Reggie", "Stevens, Bert", "Masters, Michael", "Lowell, Tom"], "title": "The Manchurian Candidate"}
{ "index" : { "_index": "movies", "_id" : "3" } }
{"director": "Baird, Stuart", "genre": ["Action", "Crime", "Thriller"], "year": 1998, "actor": ["Downey Jr., Robert", "Jones, Tommy Lee", "Snipes, Wesley", "Pantoliano, Joe", "Jacob, Ir\u00e8ne", "Nelligan, Kate", "Roebuck, Daniel", "Malahide, Patrick", "Richardson, LaTanya", "Wood, Tom", "Kosik, Thomas", "Stellate, Nick", "Minkoff, Robert", "Brown, Spitfire", "Foster, Reese", "Spielbauer, Bruce", "Mukherji, Kevin", "Cray, Ed", "Fordham, David", "Jett, Charlie"], "title": "U.S. Marshals"}
{ "index" : { "_index": "movies", "_id" : "4" } }
{"director": "Ray, Nicholas", "genre": ["Drama", "Romance"], "year": 1955, "actor": ["Hopper, Dennis", "Wood, Natalie", "Dean, James", "Mineo, Sal", "Backus, Jim", "Platt, Edward", "Ray, Nicholas", "Hopper, William", "Allen, Corey", "Birch, Paul", "Hudson, Rochelle", "Doran, Ann", "Hicks, Chuck", "Leigh, Nelson", "Williams, Robert", "Wessel, Dick", "Bryar, Paul", "Sessions, Almira", "McMahon, David", "Peters Jr., House"], "title": "Rebel Without a Cause"}
Code language: plaintext (plaintext)

このデータをbulk_movies.jsonというファイル名で保存し、所定のS3バケットに保存します。
なお今回は後述のCloudFormationテンプレートファイルと同じバケットに設置します。

Lambda関数用のデプロイパッケージを用意する

Lambda関数を作成する場合、3つの方法があります。
今回はデプロイパッケージをS3バケットにアップロードする方法を選択します。
詳細につきましては、以下のページをご確認ください。

あわせて読みたい
CloudFormationでLambdaを作成する3パータン(S3/インライン/コンテナ) 【CloudFormationでLambdaを作成する】 CloudFormationでLambdaを作成する場合、大別すると以下の3パターンあります。 S3バケットにコードをアップロードする インライ...

Lambdaレイヤー用のデプロイパッケージを用意する

先述のrequestsモジュールをLambdaレイヤーとして用意します。
Lambdaレイヤーに関する詳細は、以下のページをご確認ください。

あわせて読みたい
CFNでLambdaレイヤー作成 【CloudFormationでLambdaレイヤー作成】 本ページでは、CloudFormationでLambdaレイヤーを作成する方法を確認します。 Lambda レイヤーは、Lambda 関数で使用できるラ...

なおLambdaレイヤー用パッケージを作成ためのコマンドは以下となります。

$ sudo pip3 install requests -t python

$ zip -r layer.zip python
Code language: Bash (bash)

また先ほど触れたcfnresponseモジュールもLambdaレイヤーに含めることとします。

CloudFormationスタックを作成し、スタック内のリソースを確認する

AWS CLIを使ってCloudFormationスタックを作成します。
今回の構成は4つのテンプレートファイルに分割して構成されていますが、これらを任意のバケットに設置します。

以下は任意のS3バケットに配置したテンプレートファイルを参照して、スタックを作成する例です。
なおスタック名は「fa-057」、バケット名は「awstut-bucket」、ファイルを設置しているフォルダ名は「fa-057」とします。

$ aws cloudformation create-stack \
--stack-name fa-057 \
--template-url https://awstut-bucket.s3.ap-northeast-1.amazonaws.com/fa-057/fa-057.yaml \
--capabilities CAPABILITY_IAM
Code language: Bash (bash)

CloudFormationのネストされたスタックに関しては、詳細は以下のページをご確認ください。

あわせて読みたい
CloudFormationのネストされたスタックで環境を構築する 【CloudFormationのネストされたスタックで環境を構築する方法】 CloudFormationにおけるネストされたスタックを検証します。 CloudFormationでは、スタックをネストす...

各スタックのリソースを確認した結果、今回作成された主要リソースの情報は以下の通りです。

  • OpenSearchドメイン名:fa-057
  • OpenSearchドメインエンドポイントURL:https://search-fa-057-yzh3wx3anuc2iwplmwaime3ej4.ap-northeast-1.es.amazonaws.com/
  • マスターユーザーの名前:test
  • マスターユーザーのパスワード:P@ssw0rd

AWS Management Consoleからスタックの作成状況を確認します。

CloudFormation root stack.

コマンドで作成したスタックと、このスタックにネストされた3つのスタックが作成されていることがわかります。

ネストされたスタックの内、OpenSearch用スタックから生成されたリソースを確認します。

OpenSearch Domain.

確かにOpenSearchドメインが作成されています。

続いてカスタムリソースとLambda関数を確認します。

The Lambda Function for CloudFormation Custom Resource.
CloudFormation Custom Resource in the CFN Stack.

どちらも作成されています。
これらが正常に動作した場合は、スタック作成時にOpenSearchドメインに向かってドキュメントがアップロードされるはずです。

動作確認

準備が整いましたので、OpenSearchドメインに対して検索をかけてみます。

$ curl -XGET -u 'test:P@ssw0rd' 'https://search-fa-057-yzh3wx3anuc2iwplmwaime3ej4.ap-northeast-1.es.amazonaws.com/movies/_search?q=Bryar&pretty=true'
{
  "took" : 162,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.29058143,
    "hits" : [
      {
        "_index" : "movies",
        "_type" : "_doc",
        "_id" : "4",
        "_score" : 0.29058143,
        "_source" : {
          "director" : "Ray, Nicholas",
          "genre" : [
            "Drama",
            "Romance"
          ],
          "year" : 1955,
          "actor" : [
            "Hopper, Dennis",
            "Wood, Natalie",
            "Dean, James",
            "Mineo, Sal",
            "Backus, Jim",
            "Platt, Edward",
            "Ray, Nicholas",
            "Hopper, William",
            "Allen, Corey",
            "Birch, Paul",
            "Hudson, Rochelle",
            "Doran, Ann",
            "Hicks, Chuck",
            "Leigh, Nelson",
            "Williams, Robert",
            "Wessel, Dick",
            "Bryar, Paul",
            "Sessions, Almira",
            "McMahon, David",
            "Peters Jr., House"
          ],
          "title" : "Rebel Without a Cause"
        }
      }
    ]
  }
}
Code language: Bash (bash)

正常に検索が実行できました。
「Bryar」という単語で検索したところ、「Rebel Without a Cause」という文字列がヒットし、_scoreが「0.29058143」という結果でした。
つまりカスタムリソースによってLambda関数が実行されて、JSONファイルをアップロードされて、OpenSearchドメインにインデックスが作成されたということです。

一応、Lambda関数のCloudWatch Logsを確認します。

The CloudWatch Logs of Lambda Function.

カスタムリソースと連携して関数が実行されてたことが確認できます。

まとめ

カスタムリソースを使って、OpenSearchドメインを作成時に、自動的にS3バケットに設置されているJSONファイルをアップロードして、インデックスを作成する方法を確認しました。