AppSync – DataSource: OpenSearch

Setting up OpenSearch as DataSource for AppSync

AppSync allows you to select a data source from the following services:

  • Lambda
  • DynamoDB
  • OpenSearch
  • None
  • HTTP endpoint
  • RDS

This time, we will check the configuration with OpenSearch as the data source.
For a basic explanation of AppSync and the configuration with DynamoDB as the data source, please refer to the following page.


Diagram of AppSync - DataSource: OpenSearch

Create an OpenSearch domain to act as the data source.
Store the official AWS sample movie data in the OpenSearch domain.

Amazon OpenSearch Service

Define a schema and resolvers in AppSync to operate this OpenSearch domain.
Define the following queries and mutation based on the sample presented in the official AWS tutorial.

Tutorial: Amazon OpenSearch Service Resolvers - AWS AppSync
  • listMovies: retrieve all stored movies data.
  • getMovie: retrieve movie data by specifying ID.
  • getMovieByActor: Obtains movie data by specifying the actor name.
  • addMovie: Adds movie data.

Create two Lambda functions.
The runtime environment for the functions is Python 3.8.

The first function is associated with a custom resource and runs when the stack is created.
Its job is to upload a JSON file to the OpenSearch domain.
The movie data mentioned above is stored in an S3 bucket as a JSON file, which the function uploads to the domain.

The second function acts as a client that calls the GraphQL API.
Enable the Function URL so that URL query parameters can specify the operation to perform.

CloudFormation template files

Build the above configuration with CloudFormation.
The CloudFormation templates are located at the following URL

awstut-fa/056 at main · awstut-an-r/awstut-fa

Explanation of key points of the template file


  DataSource:
    Type: AWS::AppSync::DataSource
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: DataSource
      OpenSearchServiceConfig:
        AwsRegion: !Ref AWS::Region
        Endpoint: !Sub "https://${DomainEndpoint}"
      ServiceRoleArn: !Ref DataSourceRoleArn
      Type: AMAZON_OPENSEARCH_SERVICE
Code language: YAML (yaml)

The key point is the Type property.
When OpenSearch is used as the data source, specify “AMAZON_OPENSEARCH_SERVICE”.

Also, set the details in the OpenSearchServiceConfig property.
Set the domain endpoints, etc.

IAM role for DataSource

    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - appsync.amazonaws.com
      Policies:
        - PolicyName: !Sub "${Prefix}-DataSourcePolicy"
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - es:ESHttpDelete
                  - es:ESHttpHead
                  - es:ESHttpGet
                  - es:ESHttpPost
                  - es:ESHttpPut
                Resource:
                  - !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${DomainName}/*"
Code language: YAML (yaml)

To specify OpenSearch as the data source, authorize AppSync to access OpenSearch.


  GraphQLSchema:
    Type: AWS::AppSync::GraphQLSchema
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Definition: |
        schema {
          query: Query
          mutation: Mutation
        }
        type Query {
          listMovies: [Movie]
          getMovie(_id: ID!): Movie
          getMovieByActor(actor: String!): [Movie]
        }
        type Mutation {
          addMovie(director: String, genre: [String], year: Int, actor: [String], title: String): Movie
        }
        type Movie {
          _id: ID!
          director: String
          genre: [String]
          year: Int
          actor: [String]
          title: String
        }
Code language: YAML (yaml)

Define the schemas with reference to the official AWS page discussed at the beginning of this section.


Resolver for listMovies query

Resolver for retrieving all data stored in the OpenSearch domain.

    Type: AWS::AppSync::Resolver
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      DataSourceName: !GetAtt DataSource.Name
      FieldName: listMovies
      Kind: UNIT
      RequestMappingTemplate: !Sub |
        {
          "version": "2017-02-28",
          "operation": "GET",
          "path": "/${IndexName}/_doc/_search",
          "params": {
            "headers": {},
            "queryString": {
              "pretty": "true"
            },
            "body": {}
          }
        }
      ResponseMappingTemplate: |
        [
          #foreach($hit in $context.result.hits.hits)
            ## print ',' of list.
            #if( $velocityCount > 1 )
              ,
            #end
            #set ($source = $hit.get("_source"))
            ## append _id to $hit.
            $util.quiet($source.put("_id", $hit.get("_id")))
            $util.toJson($source)
          #end
        ]
      TypeName: Query
Code language: YAML (yaml)

The key point is the configuration of the resolver’s request mapping and response mapping.
The following page details the mapping settings when OpenSearch is used as the data source.

Resolver Mapping Template Reference for OpenSearch - AWS AppSync

First, check the request mapping (RequestMappingTemplate property).
The operation field specifies the operation to be performed on the OpenSearch domain. Since this resolver is for queries to retrieve data, “GET” is specified.
The path field specifies the endpoint path on the OpenSearch domain that performs the operation. To perform a search, specify the following path:

/[index name]/_doc/_search

This time, use CloudFormation’s built-in function Fn::Sub to embed the index name.
You can set search criteria in the body field. Since this mapping is for a query that retrieves all data, we do not set anything.
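The request mapping described above can be sketched in Python to make its shape easier to see. This is only an illustration of the JSON that the template renders, not code that AppSync runs; the function name build_search_request and the index name "movies" are placeholders introduced here.

```python
import json


def build_search_request(index_name, body=None):
    """Return the request document that the mapping template renders.

    index_name plays the role of ${IndexName} embedded by Fn::Sub.
    An empty body means "no search criteria", i.e. retrieve everything.
    """
    return {
        "version": "2017-02-28",
        "operation": "GET",
        "path": f"/{index_name}/_doc/_search",
        "params": {
            "headers": {},
            "queryString": {"pretty": "true"},
            "body": body if body is not None else {},
        },
    }


# "movies" is a hypothetical index name used only for this example.
request = build_search_request("movies")
print(json.dumps(request, indent=2))
```

Passing a dictionary as body would correspond to setting search criteria in the body field.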

Next, check the response mapping (ResponseMappingTemplate property).
The first line of this property should be “[” and the last line should be “]”. This will return a list object as the return value of the response mapping.
You can access the search hits with $context.result.hits.hits. This object is a list and can be looped through using #foreach.
You can define variables with #set. The document stored in each hit's _source is assigned to a variable called source.
Use the put method to add the _id value to source. Wrap the call in $util.quiet so that the return value of put is not written to the output.
$util.toJson converts the contents of source to JSON and outputs it. This becomes the return value.
Because the return value is a list, a “,” delimiter is required between elements from the second iteration onward. Use #if to check $velocityCount, the loop counter, and output “,” when it is greater than 1.
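For readers less familiar with VTL, the same transformation can be sketched in Python. This is an equivalent written for illustration, assuming the usual shape of an OpenSearch search response; the function name flatten_hits and the sample documents are made up for this example.

```python
import json


def flatten_hits(search_result):
    """Mimic the response mapping: one dict per hit, with _id merged in."""
    movies = []
    for hit in search_result["hits"]["hits"]:
        source = dict(hit["_source"])  # copy so the input is left untouched
        source["_id"] = hit["_id"]     # same role as $source.put("_id", ...)
        movies.append(source)
    return movies


# Hypothetical search response with two hits.
sample = {
    "hits": {
        "hits": [
            {"_id": "1", "_source": {"title": "Movie A"}},
            {"_id": "2", "_source": {"title": "Movie B"}},
        ]
    }
}

# json.dumps inserts the "," delimiters that the template prints by hand.
print(json.dumps(flatten_hits(sample)))
```

In Python the list serializer takes care of the commas, which is why the #if check on $velocityCount has no counterpart here.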

Resolver for getMovie query

Resolver to retrieve specific movie data by specifying ID.

    Type: AWS::AppSync::Resolver
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      DataSourceName: !GetAtt DataSource.Name
      FieldName: getMovie
      Kind: UNIT
      RequestMappingTemplate: !Sub |
        #set ($_id = $context.arguments.get("_id"))
        {
          "version": "2017-02-28",
          "operation": "GET",
          "path": $util.toJson("/${IndexName}/_doc/$_id"),
          "params": {
            "headers": {},
            "queryString": {
              "pretty": "true"
            },
            "body": {}
          }
        }
      ResponseMappingTemplate: |
        #set ($source = $context.result.get("_source"))
        ## append _id.
        $util.quiet($source.put("_id", $context.result.get("_id")))
        $util.toJson($source)
      TypeName: Query
Code language: YAML (yaml)

The key points here are also request mapping and response mapping.

First, check the request mapping.
GraphQL arguments are stored in $context.arguments. This time, the argument _id is stored in the variable of the same name.
As for the path field, to retrieve a document by ID, specify the following path in this field.

/[index name]/_doc/[id]

The index name is embedded using the built-in function Fn::Sub.
The ID is the value of the GraphQL argument described above.

Next, check the response mapping.
When a search is performed by specifying an ID, the search result is stored in $context.result._source.
The rest of the flow is the same as the previous resolver, adding the ID data, converting it to JSON, outputting it, and creating the return value.

Resolver for getMovieByActor query

Resolver for retrieving movie data by actor name.

    Type: AWS::AppSync::Resolver
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      DataSourceName: !GetAtt DataSource.Name
      FieldName: getMovieByActor
      Kind: UNIT
      RequestMappingTemplate: !Sub |
        #set ($actor = $context.arguments.get("actor"))
        {
          "version": "2017-02-28",
          "operation": "GET",
          "path": "/${IndexName}/_doc/_search",
          "params": {
            "headers": {},
            "queryString": {
              "pretty": "true"
            },
            "body": {
              "query": {
                "match": {
                  "actor": $util.toJson($actor)
                }
              }
            }
          }
        }
      ResponseMappingTemplate: |
        [
          #foreach($hit in $context.result.hits.hits)
            ## print ',' of list.
            #if( $velocityCount > 1 )
              ,
            #end
            #set ($source = $hit.get("_source"))
            ## append _id to $hit.
            $util.quiet($source.put("_id", $hit.get("_id")))
            $util.toJson($source)
          #end
        ]
      TypeName: Query
Code language: YAML (yaml)

The key point is the request mapping.
Set the search condition in the query inside the body field. Since the search is by actor name, pass the actor name to a match query on the actor field.

Response mapping is the same as the first resolver.
Loop through the search results and create a return value.
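The search body built by this request mapping can be sketched as follows. The function name build_actor_query is a placeholder introduced here; json.dumps stands in for $util.toJson, which is what keeps quotes and other special characters in the argument from breaking the JSON document.

```python
import json


def build_actor_query(actor):
    """Return the search body for a match query on the actor field."""
    return {"query": {"match": {"actor": actor}}}


# A plain name, as used later in this article.
print(json.dumps(build_actor_query("Jr.")))

# A name containing a double quote still yields valid JSON because
# the serializer escapes it, just as $util.toJson does in the template.
tricky = json.dumps(build_actor_query('Dwayne "The Rock" Johnson'))
print(tricky)
```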

Resolver for addMovie mutation

Resolver for adding movie data.

    Type: AWS::AppSync::Resolver
    DependsOn:
      - GraphQLSchema
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      DataSourceName: !GetAtt DataSource.Name
      FieldName: addMovie
      Kind: UNIT
      RequestMappingTemplate: !Sub |
        #set ($_id = $util.autoId())
        #set ($director = $context.arguments.director)
        #set ($genre = $context.arguments.genre)
        #set ($year = $context.arguments.year)
        #set ($actor = $context.arguments.actor)
        #set ($title = $context.arguments.title)
        {
          "version": "2017-02-28",
          "operation": "PUT",
          "path": $util.toJson("/${IndexName}/_doc/$_id"),
          "params": {
            "headers": {},
            "queryString": {
              "pretty": "true"
            },
            "body": {
              "director": $util.toJson($director),
              "genre": $util.toJson($genre),
              "year": $util.toJson($year),
              "actor": $util.toJson($actor),
              "title": $util.toJson($title)
            }
          }
        }
      ResponseMappingTemplate: |
        #set ($source = $context.result.get("_source"))
        ## append _id to $hit.
        $util.quiet($source.put("_id", $context.result.get("_id")))
        $util.toJson($source)
      TypeName: Mutation
Code language: YAML (yaml)

The request mapping is the key.
Prepare the six values that make up the movie data. Five come from the GraphQL arguments; for the ID, use the value automatically generated by $util.autoId().
For the operation field, specify “PUT”. This adds the document.
The path field is the same as for the ID-based lookup.
In the body field, specify the five values other than the ID.

The response mapping is the same as for the second resolver.
Add the ID data and create the return value.
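As a rough Python counterpart of this request mapping, the sketch below builds the PUT request for a new document. uuid.uuid4() stands in for $util.autoId(), which also produces a randomly generated UUID; the function name build_add_movie_request and the index name "movies" are placeholders introduced for this example.

```python
import uuid


def build_add_movie_request(index_name, director, genre, year, actor, title):
    """Return the request document for indexing one movie."""
    doc_id = str(uuid.uuid4())  # stand-in for $util.autoId()
    return {
        "version": "2017-02-28",
        "operation": "PUT",
        "path": f"/{index_name}/_doc/{doc_id}",
        "params": {
            "headers": {},
            "queryString": {"pretty": "true"},
            # The ID travels in the path, so the body holds only five values.
            "body": {
                "director": director,
                "genre": genre,
                "year": year,
                "actor": actor,
                "title": title,
            },
        },
    }


request = build_add_movie_request(
    "movies", "hoge", ["aaa", "bbb"], 2022, ["XXX", "YYYY", "ZZZZ"], "foo"
)
print(request["path"])
```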

(Reference) OpenSearch

For basic information on OpenSearch, please refer to the following page

This page covers the key points in using OpenSearch as a data source for AppSync.

    Type: AWS::OpenSearchService::Domain
    Properties:
      AccessPolicies:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              AWS:
                - !Ref DataSourceRoleArn
                - !Ref FunctionRole2Arn
            Action:
              - es:ESHttpDelete
              - es:ESHttpHead
              - es:ESHttpGet
              - es:ESHttpPost
              - es:ESHttpPut
            Resource: !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${DomainName}/*"
      #AdvancedSecurityOptions:
      #  Enabled: true
      #  InternalUserDatabaseEnabled: true
      #  MasterUserOptions:
      #    MasterUserName: !Ref MasterUserName
      #    MasterUserPassword: !Ref MasterUserPassword
      ClusterConfig:
        DedicatedMasterEnabled: false
        InstanceCount: !Ref InstanceCount
        InstanceType: !Ref InstanceType
        WarmEnabled: false
        ZoneAwarenessEnabled: false
      CognitoOptions:
        Enabled: false
      DomainEndpointOptions:
        CustomEndpointEnabled: false
        EnforceHTTPS: true
        TLSSecurityPolicy: Policy-Min-TLS-1-0-2019-07
      DomainName: !Ref DomainName
      EBSOptions:
        EBSEnabled: true
        VolumeSize: !Ref VolumeSize
        VolumeType: gp2
      EncryptionAtRestOptions:
        Enabled: true
        KmsKeyId: !Ref Key
      EngineVersion: !Ref EngineVersion
      NodeToNodeEncryptionOptions:
        Enabled: true
Code language: YAML (yaml)

There are two points to note about the domain's access policy.

The first is the principal setting.
When granting access permission to IAM identities, as in this case, the principal cannot be a wildcard that matches everyone.
Specific principals, such as IAM roles, must be specified.

The second point concerns authentication by a master user using fine-grained access control (FGAC).
In the page introduced at the beginning of this section, we introduced a configuration with master user authentication enabled.
However, please note that when using IAM credentials to grant access permissions as in this case, authentication by master user is not possible, as quoted below.

If a resource-based access policy contains IAM users or roles, clients must send signed requests using AWS Signature Version 4. As such, access policies can conflict with fine-grained access control, especially if you use the internal user database and HTTP basic authentication. You can’t sign a request with a user name and password and IAM credentials.

Fine-grained access control in Amazon OpenSearch Service

(Reference) GraphQL client Lambda function

import json
import os
import time
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

api_key = os.environ['API_KEY']
graphql_url = os.environ['GRAPHQL_URL']

transport = AIOHTTPTransport(
  url=graphql_url,
  headers={
    'x-api-key': api_key
  })
client = Client(transport=transport, fetch_schema_from_transport=True)

LIST = 'List'
GET = 'Get'
ACTOR = 'Actor'
ADD = 'Add'

def lambda_handler(event, context):
  operation = ''
  document = None
  result = None
  if 'queryStringParameters' not in event or (
      'operation' not in event['queryStringParameters']):
    operation = LIST
  else:
    operation = event['queryStringParameters']['operation']
  if operation == LIST:
    document = gql(
      query ListMovies {
        listMovies {
    result = client.execute(document)
  elif operation == GET:
    document = gql(
      query GetMovie($_id: ID!) {
        getMovie(_id: $_id) {
    _id = event['queryStringParameters']['_id']
    params = {
      '_id': _id
    result = client.execute(document, variable_values=params)
  elif operation == ACTOR:
    document = gql(
      query GetMovieByActor($actor: String!) {
        getMovieByActor(actor: $actor) {
    actor = event['queryStringParameters']['actor']
    params = {
      'actor': actor
    result = client.execute(document, variable_values=params)
  elif operation == ADD:
    document = gql(
      mutation AddMovie($director: String, $genre: [String], $year: Int, $actor: [String], $title: String) {
        addMovie(director: $director, genre: $genre, year: $year, actor: $actor, title: $title) {
    director = event['queryStringParameters']['director']
    genre = event['queryStringParameters']['genre'].split(',')
    year = int(event['queryStringParameters']['year'])
    actor = event['queryStringParameters']['actor'].split(',')
    title = event['queryStringParameters']['title']
    params = {
      'director': director,
      'genre': genre,
      'year': year,
      'actor': actor,
      'title': title
    result = client.execute(document, variable_values=params)
  return {
    'statusCode': 200,
    'body': json.dumps(result, indent=2)
Code language: Python (python)

A Lambda function to execute a GraphQL query.
We will use GQL as the GraphQL client library for Python.

GitHub - graphql-python/gql: A GraphQL client in Python

We will use GQL to execute the queries and the mutation defined in the schema.

In Python, you can retrieve URL query parameters with event['queryStringParameters'].
The URL query parameters are used to pass the necessary parameters.
The operation parameter specifies the GraphQL query to be executed.
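The parameter handling can be isolated from the GraphQL calls, which makes it easy to try out locally. The following is a minimal sketch under that assumption; the helper name parse_request is introduced here and is not part of the original function.

```python
LIST, GET, ACTOR, ADD = "List", "Get", "Actor", "Add"


def parse_request(event):
    """Return (operation, GraphQL variables) from a Function URL event."""
    # The key is absent when the URL carries no query string.
    params = event.get("queryStringParameters") or {}
    operation = params.get("operation", LIST)

    if operation == GET:
        variables = {"_id": params["_id"]}
    elif operation == ACTOR:
        variables = {"actor": params["actor"]}
    elif operation == ADD:
        variables = {
            "director": params["director"],
            # Lists arrive as comma-separated strings.
            "genre": params["genre"].split(","),
            "year": int(params["year"]),
            "actor": params["actor"].split(","),
            "title": params["title"],
        }
    else:
        variables = {}
    return operation, variables


event = {"queryStringParameters": {"operation": "Get", "_id": "3"}}
print(parse_request(event))
```

Query parameters always arrive as strings, so the year is converted with int() and the list-valued fields are split on commas before being passed as GraphQL variables.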

(Reference) Lambda function for CloudFormation custom resources

For information on how to use a custom resource to automatically upload and index a JSON file located in an S3 bucket when creating an OpenSearch domain, please see the following page

This page will cover the differences from the above page.

First, check the IAM role for the function.

    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: !Sub "${Prefix}-CustomResourceFunctionPolicy"
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource:
                  - !Sub "arn:aws:s3:::${BulkS3Bucket}/*"
              - Effect: Allow
                Action:
                  - es:ESHttpDelete
                  - es:ESHttpHead
                  - es:ESHttpGet
                  - es:ESHttpPost
                  - es:ESHttpPut
                Resource:
                  - !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${DomainName}/*"
Code language: YAML (yaml)

The content allows access to the S3 bucket where the JSON file is stored and access to OpenSearch.
The latter in particular is the same as the IAM role for AppSync.

Next, check the code of the Lambda function.

import boto3
import cfnresponse
import json
import os
import requests
from requests.auth import HTTPBasicAuth
from requests_aws4auth import AWS4Auth

BULK_S3_BUCKET = os.environ['BULK_S3_BUCKET']
BULK_S3_KEY = os.environ['BULK_S3_KEY']

REGION = os.environ['REGION']

CREATE = 'Create'
response_data = {}

s3_client = boto3.client('s3')

credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(

def lambda_handler(event, context):
    if event['RequestType'] == CREATE:
      s3_response = s3_client.get_object(
      # binary
      bulk = s3_response['Body'].read()
      requests_response =
        headers={'Content-Type': 'application/json'}
    cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)
  except Exception as e:
    cfnresponse.send(event, context, cfnresponse.FAILED, response_data)
Code language: Python (python)

As we saw in the OpenSearch section, access to the domain is authorized using IAM credentials.
In this case, we use requests to generate an HTTP request that uploads the JSON file to the domain.
This means that the HTTP request must be signed using AWS Signature Version 4.
Therefore, use requests_aws4auth.
For details, please refer to the following page. The function uses the credentials of the aforementioned IAM role to sign its HTTP requests.
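To give an idea of what a Signature Version 4 library does internally, here is a small standard-library sketch of the signing-key derivation step only. It is not a full signer and is not how requests_aws4auth is called; the function names and the dummy secret, date, and Region below are placeholders. The service name used when signing requests to an OpenSearch Service domain is "es".

```python
import hashlib
import hmac


def _hmac_sha256(key, message):
    """HMAC-SHA256 of a text message with a bytes key."""
    return hmac.new(key, message.encode("utf-8"), hashlib.sha256).digest()


def derive_signing_key(secret_key, date_stamp, region, service="es"):
    """Derive the SigV4 signing key by chaining four HMAC operations."""
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


# Dummy values for illustration only; never hard-code real credentials.
key = derive_signing_key("dummy-secret-key", "20220101", "ap-northeast-1")
print(key.hex())
```

Because the key depends on the date, Region, and service, a signature is only valid for the scope it was created for. A library such as requests_aws4auth performs this step and the rest of the signing process for each request.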

AWS4 authentication for Requests


Using CloudFormation, we will build this environment and check the actual behavior.

Preliminary Preparation

Refer to the following page for three preparations.

  • Prepare documents to be uploaded to the OpenSearch domain
  • Prepare a deployment package for the Lambda function
  • Prepare a deployment package for the Lambda layer

The command to create a package for the Lambda layer is as follows

sudo pip3 install requests -t python

sudo pip3 install requests-aws4auth -t python

sudo pip3 install --pre gql[all] -t python


zip -r python
Code language: Bash (bash)

In addition, the cfnresponse module is also included in the Lambda layer.
For more information on cfnresponse, please refer to the following official page

cfn-response module - AWS CloudFormation

Create CloudFormation stacks and check resources in stacks

Create a CloudFormation stack using AWS CLI.
This configuration consists of multiple template files. Place them in an S3 bucket of your choice.

The following is an example of creating a stack by referencing a template file placed in an arbitrary S3 bucket.
The stack name is “fa-056”, the bucket name is “awstut-bucket”, and the folder name where the files are placed is “fa-056”.

$ aws cloudformation create-stack \
--stack-name fa-056 \
--template-url \
--capabilities CAPABILITY_IAM
Code language: Bash (bash)

After checking the resources in each stack, the main resources created this time are as follows.

  • AppSync API: fa-056-GraphQLApi
  • OpenSearch domain name: fa-056
  • OpenSearch domain endpoint URL:
  • Function URL for GraphQL client Lambda function:

Check AppSync from the AWS Management Console.
First is the data source.

The Detail of AppSync DataSource.

Looking at the data source, the Type is “AMAZON_OPENSEARCH_SERVICE”.

Next, check the Schema Resolver.

The Detail of AppSync Schema.
The Detail of AppSync Resolver 1.
The Detail of AppSync Resolver 2.
The Detail of AppSync Resolver 3.
The Detail of AppSync Resolver 4.

It is created as defined in the CloudFormation template file.

Next, check OpenSearch.

The Detail of OpenSearch Domain.

This has also been successfully created.

Checking the Operation

Now that everything is ready, access the Function URL of the GraphQL client Lambda function.


First, retrieve all stored data.
Specify “List” as the operation value in the URL query.
This will execute the following GraphQL query

query ListMovies {
  listMovies {
Code language: plaintext (plaintext)
The Result of GraphQL query 1.

The _id and title of all stored data are returned.


Next, we retrieve the data by specifying the ID.
Specify “Get” for the operation value in the URL query and “3” for the _id.
This will result in the following GraphQL query

query GetMovie($_id: ID!) {
  getMovie(_id: $_id) {
Code language: plaintext (plaintext)
The Result of GraphQL query 2.

Movie data with ID “3” is returned.


Next, the data is obtained by specifying the name of the actor.
Specify “Actor” as the value of operation in the URL query and “Jr.” as the actor.
This will execute the following GraphQL query

query GetMovieByActor($actor: String!) {
  getMovieByActor(actor: $actor) {
Code language: plaintext (plaintext)
The Result of GraphQL query 3.

Movie data with “Jr.” in the actor’s name is returned.


Finally, save the new movie data.
Set the URL query as follows

  • operation:Add
  • director:hoge
  • genre:aaa,bbb
  • year:2022
  • actor:XXX,YYYY,ZZZZ
  • title:foo
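The parameters above can be turned into a request URL with the standard library, which also takes care of percent-encoding the commas. This is a sketch only: FUNCTION_URL is a made-up placeholder, so substitute the Function URL of your own stack.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

# Hypothetical placeholder; use the Function URL created by your stack.
FUNCTION_URL = "https://example.lambda-url.ap-northeast-1.on.aws/"

params = {
    "operation": "Add",
    "director": "hoge",
    "genre": "aaa,bbb",
    "year": "2022",
    "actor": "XXX,YYYY,ZZZZ",
    "title": "foo",
}

url = FUNCTION_URL + "?" + urlencode(params)
print(url)

# Decoding the query string gives back the original values.
decoded = parse_qs(urlsplit(url).query)
print(decoded["genre"])
```

Opening the resulting URL in a browser or with an HTTP client sends the same request as typing the query string by hand.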

This will execute the following GraphQL mutation

mutation AddMovie($director: String, $genre: [String], $year: Int, $actor: [String], $title: String) {
  addMovie(director: $director, genre: $genre, year: $year, actor: $actor, title: $title) {
Code language: plaintext (plaintext)
The Result of GraphQL query 4-1.

The movie data was saved as specified.
For the _id, an automatically generated value is used.

Specify the ID again and check the saved data.

The Result of GraphQL query 4-2.

The data was returned normally.


We have introduced a configuration for setting OpenSearch as the AppSync data source.
