AWS_EN

AppSync – DataSource: OpenSearch

スポンサーリンク
AppSync - DataSource: OpenSearch AWS_EN
スポンサーリンク
スポンサーリンク

Setting up OpenSearch as DataSource for AppSync

AppSync allows you to select a data source from the following services

  • Lambda
  • DynamoDB
  • OpenSearch
  • None
  • HTTP endpoint
  • RDS

This time, we will check the configuration with Lambda as the data source.
For a basic explanation of AppSync and the configuration with DynamoDB as the data source, please refer to the following page.

Environment

Diagram of AppSync - DataSource: OpenSearch

Create OpenSearch to act as a data source.
Store AWS official movie data in the OpenSearch domain.

Step 2: Upload data to Amazon OpenSearch Service for indexing - Amazon OpenSearch Service
You can upload data to an OpenSearch Service domain using the command line or most programming languages.

Define a schema resolver to operate this OpenSearch domain in AppSync.
Define the following query and mutation based on the sample presented in the AWS official

Tutorial: Amazon OpenSearch Service Resolvers - AWS AppSync
Amazon OpenSearch Service Resolvers tutorial for AWS AppSync.
  • listMovies: retrieve all stored movies data.
  • getMovie: retrieve movie data by specifying ID.
  • getMovieByActor: Obtains movie data by specifying the actor name.
  • addMovie: Adds movie data.

Create two Lambda functions.
The runtime environment for the functions is Python 3.8.

The first function will be associated with a custom resource and set to run when the stack is created.
The function’s function is to upload a JSON file to the OpenSearch domain.
The aforementioned data is stored in an S3 bucket in the form of a JSON file, which is then uploaded to the domain.

The second function is set up as a client to run the GraphQL API.
Enable the Function URL so that the URL query parameter can specify the operation to be performed.

CloudFormation template files

Build the above configuration with CloudFormation.
The CloudFormation templates are located at the following URL

awstut-fa/056 at main · awstut-an-r/awstut-fa
Contribute to awstut-an-r/awstut-fa development by creating an account on GitHub.

Explanation of key points of the template file

DataSource

Resources: DataSource: Type: AWS::AppSync::DataSource Properties: ApiId: !GetAtt GraphQLApi.ApiId Name: DataSource OpenSearchServiceConfig: AwsRegion: !Ref AWS::Region Endpoint: !Sub "https://${DomainEndpoint}" ServiceRoleArn: !Ref DataSourceRoleArn Type: AMAZON_OPENSEARCH_SERVICE
Code language: YAML (yaml)

The key point is the Type property.
When OpenSearch is used as the data source, specify “AMAZON_OPENSEARCH_SERVICE.

Also, set the details in the OpenSearchServiceConfig property.
Set the domain endpoints, etc.

IAM role for DataSource

Resources: DataSourceRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: sts:AssumeRole Principal: Service: appsync.amazonaws.com Policies: - PolicyName: !Sub "${Prefix}-DataSourcePolicy" PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - es:ESHttpDelete - es:ESHttpHead - es:ESHttpGet - es:ESHttpPost - es:ESHttpPut Resource: - !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${DomainName}/*"
Code language: YAML (yaml)

To specify OpenSearch as the data source, authorize AppSync to access OpenSearch.

Schema

Resources: GraphQLSchema: Type: AWS::AppSync::GraphQLSchema Properties: ApiId: !GetAtt GraphQLApi.ApiId Definition: | schema { query: Query mutation: Mutation } type Query { listMovies: [Movie] getMovie(_id: ID!): Movie getMovieByActor(actor: String!): [Movie] } type Mutation { addMovie(director: String, genre: [String], year: Int, actor: [String], title: String): Movie } type Movie { _id: ID! director: String genre: [String] year: Int actor: [String] title: String }
Code language: YAML (yaml)

Define the schemas with reference to the official AWS page discussed at the beginning of this section.

Resolver

Resolver for listMovies query

Resolver for retrieving all data stored in the OpenSearch domain.

Resources: ListMoviesResolver: Type: AWS::AppSync::Resolver DependsOn: - GraphQLSchema Properties: ApiId: !GetAtt GraphQLApi.ApiId DataSourceName: !GetAtt DataSource.Name FieldName: listMovies Kind: UNIT RequestMappingTemplate: !Sub | { "version": "2017-02-28", "operation": "GET", "path": "/${IndexName}/_doc/_search", "params": { "headers": {}, "queryString": { "pretty": "true" }, "body": {} } } ResponseMappingTemplate: | [ #foreach($hit in $context.result.hits.hits) ## print ',' of list. #if( $velocityCount > 1 ) , #end #set ($source = $hit.get("_source")) ## append _id to $hit. $util.quiet($source.put("_id", $hit.get("_id"))) $util.toJson($source) #end ] TypeName: Query
Code language: YAML (yaml)

The key point is the configuration of the resolver’s request mapping and response mapping.
The following page details the mapping settings when AppSync is used as the data source.

Resolver Mapping Template Reference for OpenSearch - AWS AppSync
Resolver Mapping Template Reference for Amazon OpenSearch Service for AWS AppSync.

First, check the request mapping (RequestMappingTemplate property).
The operation field specifies the operation to be performed on the OpenSearch domain. Since this resolver is for queries to retrieve data, “GET” is specified.
The path field specifies the endpoint that performs the operation for the OpenSearch domain. To perform a search, specify the following PATH

/[index name]/_doc/_search

This time, use CloudFormation’s built-in function Fn::Sub to embed the index name.
You can set search criteria in the body field. Since this mapping is for a query that retrieves all data, we do not set anything.

Next, check the response mapping (ResponseMappingTemplate property).
The first line of this property should be “[” and the last line should be “]”. This will return a list object as the return value of the response mapping.
You can access the search results with $context.result.hits. This object is of type list and can be looped through using #foreach.
You can define variables with #set. The search results stored in $hit._source are stored in a variable called source.
Use #put method to add data named _id to source. In doing so, use $util.quiet. This is done so that the return value will not be affected by the action of the put method.
In $util.toJson, the contents of the variable source are converted to JSON and output. This becomes the return value.
As mentioned above, the return value is a list object. Therefore, after the second round of loop processing, a delimiter character “,” is required between the previous value and the return value. Therefore, use #if to check $velocityCount, that is, the value of the loop count, and set it so that “,” is output after the second round.

Resolver for getMovie query

Resolver to retrieve specific movie data by specifying ID.

Resources: GetMovieResolver: Type: AWS::AppSync::Resolver DependsOn: - GraphQLSchema Properties: ApiId: !GetAtt GraphQLApi.ApiId DataSourceName: !GetAtt DataSource.Name FieldName: getMovie Kind: UNIT RequestMappingTemplate: !Sub | #set ($_id = $context.arguments.get("_id")) { "version": "2017-02-28", "operation": "GET", "path": $util.toJson("/${IndexName}/_doc/$_id"), "params": { "headers": {}, "queryString": { "pretty": "true" }, "body": {} } } ResponseMappingTemplate: | #set ($source = $context.result.get("_source")) ## append _id. $util.quiet($source.put("_id", $context.result.get("_id"))) $util.toJson($source) TypeName: Query
Code language: YAML (yaml)

The key points here are also request mapping and response mapping.

First, check the request mapping.
GraphQL arguments are stored in $context.arguments. This time, the argument _id is stored in the variable of the same name.
path field, but if you want to search by ID, specify the following URL in this field.

/[index name]/_doc/[id].

The index name is embedded using the built-in function Fn::Sub.
The ID is the value of the GraphQL argument described above.

Next, check the response mapping.
When a search is performed by specifying an ID, the search result is stored in $context.result._source.
The rest of the flow is the same as the previous resolver, adding the ID data, converting it to JSON, outputting it, and creating the return value.

Resolver for getMovieByActor query

Resolver for retrieving movie data by actor name.

Resources: GetMovieByActorResolver: Type: AWS::AppSync::Resolver DependsOn: - GraphQLSchema Properties: ApiId: !GetAtt GraphQLApi.ApiId DataSourceName: !GetAtt DataSource.Name FieldName: getMovieByActor Kind: UNIT RequestMappingTemplate: !Sub | #set ($actor = $context.arguments.get("actor")) { "version": "2017-02-28", "operation": "GET", "path": "/${IndexName}/_doc/_search", "params": { "headers": {}, "queryString": { "pretty": "true" }, "body": { "query": { "match": { "actor": $util.toJson($actor) } } } } } ResponseMappingTemplate: | [ #foreach($hit in $context.result.hits.hits) ## print ',' of list. #if( $velocityCount > 1 ) , #end #set ($source = $hit.get("_source")) ## append _id to $hit. $util.quiet($source.put("_id", $hit.get("_id"))) $util.toJson($source) #end ] TypeName: Query
Code language: YAML (yaml)

The key point is the request mapping.
Set the search condition in the query in the body field. Since the search is performed by performer name, set the performer name in the match query.

Response mapping is the same as the first resolver.
Loop through the search results and create a return value.

Resolver for addMovie mutation

Resolver for adding movie data.

Resources: AddMovieResolver: Type: AWS::AppSync::Resolver DependsOn: - GraphQLSchema Properties: ApiId: !GetAtt GraphQLApi.ApiId DataSourceName: !GetAtt DataSource.Name FieldName: addMovie Kind: UNIT RequestMappingTemplate: !Sub | #set ($_id = $util.autoId()) #set ($director = $context.arguments.director) #set ($genre = $context.arguments.genre) #set ($year = $context.arguments.year) #set ($actor = $context.arguments.actor) #set ($title = $context.arguments.title) { "version": "2017-02-28", "operation": "PUT", "path": $util.toJson("/${IndexName}/_doc/$_id"), "params": { "headers": {}, "queryString": { "pretty": "true" }, "body": { "director": $util.toJson($director), "genre": $util.toJson($genre), "year": $util.toJson($year), "actor": $util.toJson($actor), "title": $util.toJson($title) } } } ResponseMappingTemplate: | #set ($source = $context.result.get("_source")) ## append _id to $hit. $util.quiet($source.put("_id", $context.result.get("_id"))) $util.toJson($source) TypeName: Mutation
Code language: YAML (yaml)

The request mapping is the key.
Prepare the six values that make up the movie data; five use GraphQL arguments; for the ID, use the ID automatically generated by $util.autoId().
For the operation field, specify “PUT”. This is for adding data.
The path field is the same as for ID-based search.
Specify 5 data in the body field except ID.

The request mapping is the same as for the second resolver.
Add the ID data and create the return value.

(Reference) OpenSearch

For basic information on OpenSearch, please refer to the following page

This page covers the key points in using OpenSearch as a data source for AppSync.

Resources: Domain: Type: AWS::OpenSearchService::Domain Properties: AccessPolicies: Version: 2012-10-17 Statement: - Effect: Allow Principal: AWS: - !Ref DataSourceRoleArn - !Ref FunctionRole2Arn Action: - es:ESHttpDelete - es:ESHttpHead - es:ESHttpGet - es:ESHttpPost - es:ESHttpPut Resource: !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${DomainName}/*" #AdvancedSecurityOptions: # Enabled: true # InternalUserDatabaseEnabled: true # MasterUserOptions: # MasterUserName: !Ref MasterUserName # MasterUserPassword: !Ref MasterUserPassword ClusterConfig: DedicatedMasterEnabled: false InstanceCount: !Ref InstanceCount InstanceType: !Ref InstanceType WarmEnabled: false ZoneAwarenessEnabled: false CognitoOptions: Enabled: false DomainEndpointOptions: CustomEndpointEnabled: false EnforceHTTPS: true TLSSecurityPolicy: Policy-Min-TLS-1-0-2019-07 DomainName: !Ref DomainName EBSOptions: EBSEnabled: true VolumeSize: !Ref VolumeSize VolumeType: gp2 EncryptionAtRestOptions: Enabled: true KmsKeyId: !Ref Key EngineVersion: !Ref EngineVersion NodeToNodeEncryptionOptions: Enabled: true
Code language: YAML (yaml)

There are two points to note related to this property.

The first is the principal setting.
When granting access permission using IAM authentication information, as in this case, a description that specifies all resources using wildcards is not allowed.
As mentioned earlier, specific resources such as IAM roles must be specified.

The second point concerns authentication by a master user using fine-grained access control (FGAC).
In the page introduced at the beginning of this section, we introduced a configuration with master user authentication enabled.
However, please note that when using IAM credentials to grant access permissions as in this case, authentication by master user is not possible, as quoted below.

If a resource-based access policy contains IAM users or roles, clients must send signed requests using AWS Signature Version 4. As such, access policies can conflict with fine-grained access control, especially if you use the internal user database and HTTP basic authentication. You can’t sign a request with a user name and password and IAM credentials.

Fine-grained access control in Amazon OpenSearch Service

(Reference) GraphQL client Lambda function

import json import os import time from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport api_key = os.environ['API_KEY'] graphql_url = os.environ['GRAPHQL_URL'] transport = AIOHTTPTransport( url=graphql_url, headers={ 'x-api-key': api_key }) client = Client(transport=transport, fetch_schema_from_transport=True) LIST = 'List' GET = 'Get' ACTOR = 'Actor' ADD = 'Add' def lambda_handler(event, context): operation = '' document = None result = None if not 'queryStringParameters' in event or ( not 'operation' in event['queryStringParameters']): operation = LIST else: operation = event['queryStringParameters']['operation'] if operation == LIST: document = gql( """ query ListMovies { listMovies { _id title } } """ ) result = client.execute(document) elif operation == GET: document = gql( """ query GetMovie($_id: ID!) { getMovie(_id: $_id) { _id director genre year actor title } } """ ) _id = event['queryStringParameters']['_id'] params = { '_id': _id } result = client.execute(document, variable_values=params) elif operation == ACTOR: document = gql( """ query GetMovieByActor($actor: String!) { getMovieByActor(actor: $actor) { _id actor title } } """ ) actor = event['queryStringParameters']['actor'] params = { 'actor': actor } result = client.execute(document, variable_values=params) elif operation == ADD: document = gql( """ mutation AddMovie($director: String, $genre: [String], $year: Int, $actor: [String], $title: String) { addMovie(director: $director, genre: $genre, year: $year, actor: $actor, title: $title) { _id director genre year actor title } } """ ) director = event['queryStringParameters']['director'] genre = event['queryStringParameters']['genre'].split(',') year = int(event['queryStringParameters']['year']) actor = event['queryStringParameters']['actor'].split(',') title = event['queryStringParameters']['title'] params = { 'director': director, 'genre': genre, 'year': year, 'actor': actor, 'title': title } result = client.execute(document, variable_values=params) return { 'statusCode': 200, 'body': json.dumps(result, indent=2) }
Code language: Python (python)

A Lambda function to execute a GraphQL query.
We will use GQL as the GraphQL client library for Python.

GitHub - graphql-python/gql: A GraphQL client in Python
A GraphQL client in Python. Contribute to graphql-python/gql development by creating an account on GitHub.

We will use GQL to execute the query mutations defined in the schema.

In Python, you can retrieve URL query parameters with event[‘queryStringParameters’].
The URL query parameters are used to pass the necessary parameters.
The operation parameter specifies the GraphQL query to be executed.

(Reference) Lambda function for CloudFormation custom resources

For information on how to use a custom resource to automatically upload and index a JSON file located in an S3 bucket when creating an OpenSearch domain, please see the following page

This page will cover the differences from the above page.

First, check the IAM role for the function.

Resources: FunctionRole2: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: sts:AssumeRole Principal: Service: - lambda.amazonaws.com ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Policies: - PolicyName: !Sub "${Prefix}-CustomResourceFunctionPolicy" PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - s3:GetObject Resource: - !Sub "arn:aws:s3:::${BulkS3Bucket}/*" - Effect: Allow Action: - es:ESHttpDelete - es:ESHttpHead - es:ESHttpGet - es:ESHttpPost - es:ESHttpPut Resource: - !Sub "arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${DomainName}/*"
Code language: YAML (yaml)

The content allows access to the S3 bucket where the JSON file is stored and access to OpenSearch.
The latter in particular is the same as the IAM role for AppSync.

Next, check the code of the Lambda function.

import boto3 import cfnresponse import json import os import requests from requests.auth import HTTPBasicAuth from requests_aws4auth import AWS4Auth BULK_ENDPOINT = os.environ['BULK_ENDPOINT'] BULK_S3_BUCKET = os.environ['BULK_S3_BUCKET'] BULK_S3_KEY = os.environ['BULK_S3_KEY'] REGION = os.environ['REGION'] CREATE = 'Create' response_data = {} s3_client = boto3.client('s3') credentials = boto3.Session().get_credentials() awsauth = AWS4Auth( region=REGION, service='es', refreshable_credentials=credentials ) def lambda_handler(event, context): try: if event['RequestType'] == CREATE: s3_response = s3_client.get_object( Bucket=BULK_S3_BUCKET, Key=BULK_S3_KEY) # binary bulk = s3_response['Body'].read() print(bulk) requests_response = requests.post( BULK_ENDPOINT, data=bulk, auth=awsauth, headers={'Content-Type': 'application/json'} ) print(requests_response.text) cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data) except Exception as e: print(e) cfnresponse.send(event, context, cfnresponse.FAILED, response_data)
Code language: Python (python)

As we saw in the OpenSearch section, access to the domain is done using IAM credentials.
In this case, we will use requests to generate an HTTP request and implement the process of uploading a JSON file to the domain.
This means that this HTTP request must be a signed request using AWS signature version 4.
Therefore, use requests_aws4auth.
For details, please refer to the following page, which uses the authentication information of the aforementioned IAM role to sign and respond to HTTP requests.

requests-aws4auth
AWS4 authentication for Requests

Architecting

Using CloudFormation, we will build this environment and check the actual behavior.

Preliminary Preparation

Refer to the following page for three preparations.

  • Prepare documents to be uploaded to the OpenSearch domain
  • Prepare a deployment package for the Lambda function
  • Prepare a deployment package for the Lambda layer

The command to create a package for the Lambda layer is as follows

sudo pip3 install requests -t python $ sudo pip3 install requests-aws4auth -t python $ sudo pip3 install --pre gql[all] -t python [cfnresponse.py] zip -r layer.zip python
Code language: Bash (bash)

In addition, the cfnresponse module is also included in the Lambda layer.
For more information on cfnresponse, please refer to the following official page

cfn-response module - AWS CloudFormation
When you use the ZipFile property to specify your function's source code and that function interacts with an AWS CloudFormation custom resource, you can load th...

Create CloudFormation stacks and check resources in stacks

Create a CloudFormation stack using AWS CLI.
This configuration consists of multiple template files, which are divided into multiple files, and place them in an arbitrary bucket.

The following is an example of creating a stack by referencing a template file placed in an arbitrary S3 bucket.
The stack name is “fa-056”, the bucket name is “awstut-bucket”, and the folder name where the files are placed is “fa-056”.

$ aws cloudformation create-stack \ --stack-name fa-056 \ --template-url https://awstut-bucket.s3.ap-northeast-1.amazonaws.com/fa-056/fa-056.yaml \ --capabilities CAPABILITY_IAM
Code language: Bash (bash)

After checking the resources for each stack, the following is the information for the main resources created in this case.

  • AppSync API: fa-056-GraphQLApi
  • OpenSearch domain name: fa-056
  • OpenSearch domain endpoint URL: https://search-fa-056-3grntiuisivzu6h6oy7nowjgze.ap-northeast-1.es.amazonaws.com
  • Function URL for GraphQL client Lambda function: https://42rvtqzk4lna3b5xaruvsoiv240wnuwy.lambda-url.ap-northeast-1.on.aws/

Check AppSync from the AWS Management Console.
First is the data source.

The Detail of AppSync DataSource.

Looking at the data source, the Type is “AMAZON_OPENSEARCH_SERVICE”.

Next, check the Schema Resolver.

The Detail of AppSync Schema.
The Detail of AppSync Resolver 1.
The Detail of AppSync Resolver 2.
The Detail of AppSync Resolver 3.
The Detail of AppSync Resolver 3.
The Detail of AppSync Resolver 4.

It is created as defined in the CloudFormation template file.

Next, check OpenSearch.

The Detail of OpenSearch Domain.

This has also been successfully created.

Checking Action

Now that everything is ready, access the Function URL of the GraphQL client Lambda function.

listMovies

First, retrieve all stored data.
Specify “List” as the operation value in the URL query.
This will execute the following GraphQL query

query ListMovies { listMovies { _id title } }
Code language: plaintext (plaintext)
The Result of GraphQL query 1.

The _id and title of all stored data are returned.

getMovie

Next, we retrieve the data by specifying the ID.
Specify “Get” for the operation value in the URL query and “3” for the _id.
This will result in the following GraphQL query

query GetMovie($_id: ID!) { getMovie(_id: $_id) { _id director genre year actor title } }
Code language: plaintext (plaintext)
The Result of GraphQL query 2.

Movie data with ID “3” is returned.

getMovieByActor

Next, the data is obtained by specifying the name of the actor.
Specify “Actor” as the value of operation in the URL query and “Jr.” as the actor.
This will execute the following GraphQL query

query GetMovieByActor($actor: String!) { getMovieByActor(actor: $actor) { _id actor title } }
Code language: plaintext (plaintext)
The Result of GraphQL query 3.

Movie data with “Jr.” in the actor’s name is returned.

AddMovie

Finally, save the new movie data.
Set the URL query as follows

  • operation:Add
  • director:hoge
  • genre:aaa,bbb
  • year:2022
  • actor:XXX,YYYY,ZZZZ
  • title:foo

This will execute the following GraphQL mutation

mutation AddMovie($director: String, $genre: [String], $year: Int, $actor: [String], $title: String) { addMovie(director: $director, genre: $genre, year: $year, actor: $actor, title: $title) { _id director genre year actor title } }
Code language: plaintext (plaintext)
The Result of GraphQL query 4-1.

The video data was saved as specified.
For the _id, an automatically generated value is used.

Specify the ID again and check the saved data.

The Result of GraphQL query 4-2.

The data was returned normally.

Summary

We have introduced a configuration for setting OpenSearch as the AppSync data source.

タイトルとURLをコピーしました