こんにちは、SCSK 株式会社の広野です。
非同期ジョブを実行した後、結果をどう受け取るか?というのは開発者として作り込み甲斐のあるテーマです。今回は React アプリが AWS 上にある非同期ジョブを呼び出した後に、AWS AppSync 経由でジョブ完了のプッシュ通知を受け取る仕組みを紹介します。
プッシュ通知の必要性
そもそも、なぜプッシュ通知が必要なのか。
サーバーレスアプリから、とあるデータ解析ジョブにデータを渡して実行させるアーキテクチャ図を見てみましょう。
アプリはユーザのブラウザにロードされています。
アプリがジョブに渡すデータを Amazon S3 にアップロードするところから話を始めます。アプリはデータをアップロードした後は処理の制御を手放します。データを送ったら終わり、です。以降の後続処理も自分の責任を果たせば処理を手放す、非同期処理が連発します。
この状況では、アプリはジョブの結果どころか終了すら知ることができません。そのため、ジョブの終了時にジョブ側からアプリに終了を通知してあげる、プッシュ通知の必要性が出てきます。
正直、プッシュ通知でなくてもいいのですが。定期的にアプリにジョブ状況を問い合わせる常駐プロセスを作ればそれで済むのかもしれません。でも、プッシュ通知を実現する AWS サービスがありますので、それを利用しない手はないでしょう?と考えました。
ユーザがプッシュ通知を受け取るための AWS サービスはいくつかあるのですが、デバイスの OS や SMS、メール等ではなくアプリが直接受け取れるサービスは AWS AppSync が最も実現し易い選択肢だと思います。
このアーキテクチャでは、プッシュ通知を実現するために Amazon DynamoDB にジョブ状況管理テーブルを作成し、そのデータに特定の更新があったら AWS AppSync がアプリにプッシュ通知を送る、という仕組みにしています。
AWS AppSync がやってくれること
AWS AppSync は、GraphQL というクエリー言語を使用してデータにアクセスするための API を提供してくれます。データは本アーキテクチャでは Amazon DynamoDB にありますが、RDBMS でもよいですし、他のデータ形式でも対応しているものがあります。
GraphQL では、大きく 3つの命令が可能です。
- Query
- Mutation
- Subscription
Query は、データ読取です。
Mutation は、データ書込です。
Query と Mutation については、あらかじめ AppSync にスキーマとリゾルバと呼ばれるデータベースとの連携設定をしておくことで、AWS Lambda なしで、代わりにアプリからの GraphQL クエリーを元にデータベースにクエリーしてくれます。スキーマがデータベース情報みたいなもので、リゾルバが GraphQL クエリーから得たインプットを元にデータベースへのクエリーに変換するマッピング情報みたいなものと考えるとよいと思います。
Subscription が、プッシュ通知受け取りです。Mutation と Subscription は密接に関係していて、あらかじめ指定した Mutation 設定に対して、その Mutation が実行されたときにその Mutation を Subscribe しているアプリ画面にプッシュ通知を送ります。
重要なのは
- あらかじめ AppSync スキーマ設定内で Mutation に紐付ける Subscription の設定が必要
- Mutation されないとプッシュ通知は発動しない
- Mutation を Subscribe しているアプリ画面でないとプッシュ通知は受け取れない
ということです。
以下、スキーマとリゾルバの設定例です。
スキーマ
type Jobstatus @aws_cognito_user_pools @aws_iam { datetime: String jobid: String serviceid: String serviceiduser: String user: String url1: String url2: String status: String memo: String ttl: Int } type JobstatusConnection { items: [Jobstatus] nextToken: String } type Mutation { putJobstatus(input: PutJobstatusInput!): Jobstatus @aws_cognito_user_pools @aws_iam updateJobstatus(input: UpdateJobstatusInput!): Jobstatus @aws_cognito_user_pools @aws_iam } input PutJobstatusInput { datetime: String! jobid: String! serviceid: String! serviceiduser: String! url1: String user: String! status: String! memo: String ttl: Int! } type Query { queryJobstatus(serviceiduser: String!): JobstatusConnection queryJobstatusByServiceidDatetimeIndex(serviceid: String!): JobstatusConnection } type Subscription { onPutJobstatus(serviceiduser: String!): Jobstatus @aws_subscribe(mutations: ["putJobstatus"]) onUpdateJobstatus(serviceiduser: String!): Jobstatus @aws_subscribe(mutations: ["updateJobstatus"]) } input UpdateJobstatusInput { datetime: String! jobid: String serviceid: String serviceiduser: String! url1: String url2: String user: String status: String memo: String ttl: Int } schema { query: Query mutation: Mutation subscription: Subscription }
リゾルバ
上記スキーマ設定のうち、updateJobstatus という Mutation のリゾルバ設定です。serviceiduser と datetime というパーティションキー、ソートキーにマッチするデータに対して、受け取った属性データで更新をかけるクエリーに変換されます。正直、難しすぎて萎えます。AWS Lambda で書いた方が断然楽だ、、、とここに来て後悔し始めます。
{ "version": "2018-05-29", "operation": "UpdateItem", "key": { "serviceiduser": $util.dynamodb.toDynamoDBJson($ctx.args.input.serviceiduser), "datetime": $util.dynamodb.toDynamoDBJson($ctx.args.input.datetime) }, #set( $expNames = {} ) #set( $expValues = {} ) #set( $expSet = {} ) #set( $expAdd = {} ) #set( $expRemove = [] ) #foreach( $entry in $util.map.copyAndRemoveAllKeys($ctx.args.input, ["serviceiduser", "datetime"]).entrySet() ) #if( $util.isNull($entry.value) ) #set( $discard = ${expRemove.add("#${entry.key}")} ) $!{expNames.put("#${entry.key}", "${entry.key}")} #else $!{expSet.put("#${entry.key}", ":${entry.key}")} $!{expNames.put("#${entry.key}", "${entry.key}")} $!{expValues.put(":${entry.key}", $util.dynamodb.toDynamoDB($entry.value))} #end #end #set( $expression = "" ) #if( !${expSet.isEmpty()} ) #set( $expression = "SET" ) #foreach( $entry in $expSet.entrySet() ) #set( $expression = "${expression} ${entry.key} = ${entry.value}" ) #if ( $foreach.hasNext ) #set( $expression = "${expression}," ) #end #end #end #if( !${expAdd.isEmpty()} ) #set( $expression = "${expression} ADD" ) #foreach( $entry in $expAdd.entrySet() ) #set( $expression = "${expression} ${entry.key} ${entry.value}" ) #if ( $foreach.hasNext ) #set( $expression = "${expression}," ) #end #end #end #if( !${expRemove.isEmpty()} ) #set( $expression = "${expression} REMOVE" ) #foreach( $entry in $expRemove ) #set( $expression = "${expression} ${entry}" ) #if ( $foreach.hasNext ) #set( $expression = "${expression}," ) #end #end #end "update": { "expression": "${expression}", #if( !${expNames.isEmpty()} ) "expressionNames": $utils.toJson($expNames), #end #if( !${expValues.isEmpty()} ) "expressionValues": $utils.toJson($expValues), #end }, "condition": { "expression": "attribute_exists(#serviceiduser) AND attribute_exists(#datetime)", "expressionNames": { "#serviceiduser": "serviceiduser", "#datetime": "datetime" } } }
この Mutation に対応している Subscription の設定が、スキーマ設定内の onUpdateJobstatus の部分になります。
type Subscription { onPutJobstatus(serviceiduser: String!): Jobstatus @aws_subscribe(mutations: ["putJobstatus"]) onUpdateJobstatus(serviceiduser: String!): Jobstatus @aws_subscribe(mutations: ["updateJobstatus"]) }
updateJobstatus という Mutation が行われたときに、onUpdateJobstatus という Subscription を Subscribe しているアプリにプッシュ通知を送ります。ただし、プッシュ通知を送る条件も書かれています。Subscribe しているアプリがパラメータとして指定している serviceiduser が、その Mutation 内の serviceiduser と一致するとき、という条件で発動します。
意味的には、サービス ID が「今開いている画面」で、ユーザ名が「自分」のものだけプッシュ通知が来る、とイメージしてください。
ここで、アプリ画面イメージを見てみましょう。
アプリ画面イメージ
以下のスクリーンショットのように、アプリ画面から解析ジョブを開始すると、ジョブ完了後にプッシュ通知を受け取り画面を自動更新します。ユーザからしてみれば当たり前の動作なのですが、裏では開発者の努力が詰まっています。(泣)
既に述べましたが、アプリはジョブを開始したら制御を手放します。そのままでは結果が全く分からないので、バックエンドのジョブから完了後に通知を送ってもらうという寸法です。
React アプリのコード
React アプリではジョブ開始時に何をしているかというと、大きく以下 3つです。
- ジョブ一覧データの取得、表示。Amazon DynamoDB へ AppSync 経由で Query をかける。
- ジョブ登録。Amazon DynamoDB へ AppSync 経由で Mutation をかける。
- Amazon S3 へのファイルアップロード。これをトリガーに以降、AWS Step Functions のジョブが発動する。
Amazon S3 へのファイルアップロードについては、以下のブログで方法を掲載しています。今回は送信するファイルが BLOB なので若干ブログ内容をカスタマイズしないといけないのですが。それについては別記事で紹介したいと思います。
また、React アプリでプッシュ通知を受け取るためのコードがあります。
- updateJobstatus という Mutation が行われたときのプッシュ通知を Subscribe する。
ここでは、React からの Query、Mutation、Subscription のコードを紹介します。
前提
AWS AppSync は認証なしではアクセスできません。本記事では React アプリを Amazon Cognito で認証しているので、透過的に AWS AppSync の認証をさせることができます。React 側の設定は簡単です。以下のコードを App.js に仕込んでいます。それだけです。
- App.js
import React from 'react'; import Amplify from 'aws-amplify'; //中略 //Amplify Cognito, S3, AppSync 連携設定 Amplify.configure({ Auth: { region: process.env.REACT_APP_REGION, userPoolId: process.env.REACT_APP_USERPOOLID, userPoolWebClientId: process.env.REACT_APP_USERPOOLWEBCLIENTID, identityPoolId: process.env.REACT_APP_IDPOOLID }, Storage: { AWSS3: { bucket: process.env.REACT_APP_S3BUCKETNAMEAMPLIFYSTG, region: process.env.REACT_APP_REGION } }, aws_appsync_graphqlEndpoint: process.env.REACT_APP_APPSYNC, aws_appsync_region: process.env.REACT_APP_REGION, aws_appsync_authenticationType: "AMAZON_COGNITO_USER_POOLS", aws_appsync_apiKey: "null" });
Amplify.configure で AWS AppSync 連携関連の設定を環境変数経由で入れています。Amazon S3 にファイルをアップロードするための設定や Amazon Cognito との連携設定もここに入っています。
Query
あらかじめ AWS AppSync にスキーマ設定、リゾルバ設定ができている状態で、それに合わせた GraphQL クエリーを AWS AppSync API に投げます。
import React, { useState, useEffect } from 'react'; import { API } from 'aws-amplify'; import gql from 'graphql-tag'; //中略 const serviceid = window.location.pathname; const [jobdata, setJobdata] = useState(); //中略 //ジョブステータス取得クエリー const query = gql` query queryJobstatusByServiceidDatetimeIndex($serviceid: String!) { queryJobstatusByServiceidDatetimeIndex(serviceid: $serviceid) { items { datetime url1 url2 user status memo } } } `; //ジョブステータス取得関数 const queryJobstatusByServiceidDatetimeIndex = async () => { const res = await API.graphql({ query: query, variables: { serviceid: serviceid }, authMode: "AMAZON_COGNITO_USER_POOLS" }); setJobdata(res.data.queryJobstatusByServiceidDatetimeIndex.items); };
Mutation
Query とほぼ同じです。やはりあらかじめ AWS AppSync 側でスキーマやリゾルバが適切に設定されている必要があります。
import { API } from 'aws-amplify'; import gql from 'graphql-tag'; //中略 const username = props.username; const serviceid = window.location.pathname; const jobid = await uuidv4(); //中略 //ジョブステータス登録クエリー const putmutation = gql` mutation putJobstatus( $serviceiduser: String!, $datetime: String!, $jobid: String!, $serviceid: String!, $url1: String, $user: String!, $status: String!, $memo: String, $ttl: Int! ) { putJobstatus(input: { serviceiduser: $serviceiduser, datetime: $datetime, jobid: $jobid, serviceid: $serviceid, url1: $url1, user: $user, status: $status, memo: $memo, ttl: $ttl }) { serviceiduser } } `; //ジョブステータス登録関数 const putJobstatus = async (jobid) => { const dt = new Date(); await API.graphql({ query: putmutation, variables: { serviceiduser: serviceid + "#" + username, datetime: dt.toISOString(), jobid: jobid, serviceid: serviceid, url1: input_text, user: username, status: "解析中", memo: memo, ttl: Math.floor(dt.setDate(dt.getDate() + 30) / 1000) }, authMode: "AMAZON_COGNITO_USER_POOLS" }); //ステータス画面更新 queryJobstatusByServiceidDatetimeIndex(); };
Subscription
Subscription は、画面表示時に useEffect で実行させています。console.log で見た感じ、AWS AppSync にプッシュ通知を受け取るためのセッションを常時張っている感じに見えました。(正確には違うかもしれません)
Subscription 実行時に、GraphQL クエリーの中でパラメータとして serviceiduser を渡しています。この serviceiduser が同じデータが Mutation されたときにプッシュ通知を受け取ります。
プッシュ通知を受け取ったとき、あらかじめ指定したデータ(ここでは serviceiduser のみ)を受け取ることができます。そのデータ(以下コードでは data)を使った処理を実行させることもできますが、私はジョブ一覧テーブルの表示更新と画面に Snackbar というポップアップ通知を表示させる処理をさせています。本記事ではそれで事足りるので。
import React, { useState, useEffect } from 'react'; import { API } from 'aws-amplify'; import gql from 'graphql-tag'; //中略 //ジョブステータス更新サブスクライブクエリー const subscribeUpdateQuery = gql` subscription onUpdateJobstatus($serviceiduser: String!) { onUpdateJobstatus(serviceiduser: $serviceiduser) { serviceiduser } } `; //ジョブステータス更新サブスクライブ関数 const subscribeUpdateJobstatus = () => { API.graphql({ query: subscribeUpdateQuery, variables: { serviceiduser: serviceid + '#' + username }, authMode: 'AMAZON_COGNITO_USER_POOLS' }).subscribe({ next: (data) => { //サブスクリプション通知時のアクション = ステータス一覧を更新 queryJobstatusByServiceidDatetimeIndex(); //スナックバーで通知 setIsSnackbarOpen(true); }, error: (err) => { console.log(err); } }); }; //画面表示時自動実行 useEffect(() => { //ジョブステータス確認 queryJobstatusByServiceidDatetimeIndex(); //ジョブステータスサブスクリプション実行 subscribeUpdateJobstatus(); }, []);
AWS Lambda からの Mutation コード
上記 React アプリがプッシュ通知を受け取るとき、裏では AWS Step Functions ジョブ内から AWS Lambda 関数が呼び出され、それがジョブ完了ステータスを Amazon DynamoDB テーブルに書き込みます。プッシュ通知を発信するためには AWS AppSync 経由の Mutation でなければならないので、AWS Lambda 関数から Mutation させる必要があります。
これは情報が少なく非常に苦労しました。また、AWS Step Functions のビルトイン API にも Mutation はまだ存在せず、AWS Lambda 関数が呼び出せるデフォルトのモジュールだけでは Mutation を実行させられず、結局自分でモジュールを集めて Lambda Layer として読み込ませました。
以下、AWS Lambda 関数のコードです。Node.js で作成しています。
リージョンはソウルでベタ書きしていますので適宜ご変更ください。AWS Step Functions から invoke される Lambda なので、その仕様で event を引数として受け取っています。そのあたりも適宜ご変更を。
const { Sha256 } = require("@aws-crypto/sha256-js"); const { defaultProvider } = require("@aws-sdk/credential-provider-node"); const { SignatureV4 } = require("@aws-sdk/signature-v4"); const { HttpRequest } = require("@aws-sdk/protocol-http"); const { default: fetch, Request } = require("node-fetch"); const GRAPHQL_ENDPOINT = process.env.GRAPHQLURL; const AWS_REGION = "ap-northeast-2"; const query = ` mutation updateJobstatus( $serviceiduser: String!, $datetime: String!, $url1: String, $url2: String, $status: String ) { updateJobstatus(input: { serviceiduser: $serviceiduser, datetime: $datetime, url1: $url1, url2: $url2, status: $status }) { serviceiduser } } `; exports.handler = async (event) => { console.log({"EVENT": JSON.stringify(event)}); let variables; if (event.result.TranscriptionJob.TranscriptionJobStatus == "COMPLETED") { variables = { serviceiduser: event.serviceiduser, datetime: event.datetime, url1: event.filename, url2: event.outputkey, status: "完了" }; } else { variables = { serviceiduser: event.serviceiduser, datetime: event.datetime, status: "エラー" }; } const endpoint = new URL(GRAPHQL_ENDPOINT); const signer = new SignatureV4({ credentials: defaultProvider(), region: AWS_REGION, service: 'appsync', sha256: Sha256 }); const requestToBeSigned = new HttpRequest({ method: 'POST', headers: { 'Content-Type': 'application/json', host: endpoint.host }, hostname: endpoint.host, body: JSON.stringify({ query, variables }), path: endpoint.pathname }); const signed = await signer.sign(requestToBeSigned); const request = new Request(endpoint, signed); let statusCode = 200; let body; let response; try { response = await fetch(request); body = await response.json(); if (body.errors) statusCode = 400; } catch (error) { statusCode = 500; body = { errors: [ { message: error.message } ] }; } return { statusCode, body: JSON.stringify(body) }; };
Lambda Layer として読み込ませるモジュールは以下のコマンドで作成します。nodejs というディレクトリを作成して、そのディレクトリ内で実行します。
npm init npm install --save @aws-crypto/sha256-js @aws-sdk/credential-provider-node @aws-sdk/protocol-http @aws-sdk/signature-v4 node-fetch@2.6.7
node_modules というフォルダ内にモジュールがインストールされ、package.json、package-lock.json というファイルができあがります。nodejs フォルダもろとも ZIP 圧縮すれば出来上がりです。
注意事項があります。
- node-fetch はバージョン 3 だとこのコードは動かないため、バージョン 2 を指定してインストールする必要があります。
- AWS Lambda 関数から AWS AppSync に Mutation するには IAM ロールの権限を付与するだけではダメで、AWS AppSync 側でも IAM ロールによるアクセスを許可する設定にしなければなりません。
AWS AppSync にはデフォルトの認証と追加の認証を設定でき、本記事のケースではデフォルトが Amazon Cognito で 追加を IAM ロールにしています。その場合、スキーマの設定で明示的に追加の認証で許可する Mutation 等を設定する必要があります。
以下がその例なのですが、@aws_iam で IAM ロールによる認証を許可しています。このように追加認証を設定するときにはデフォルトの認証が無効になってしまうため、デフォルトの認証、ここでは @aws_cognito_user_pools も明示的に設定しています。
type Mutation { putJobstatus(input: PutJobstatusInput!): Jobstatus @aws_cognito_user_pools @aws_iam updateJobstatus(input: UpdateJobstatusInput!): Jobstatus @aws_cognito_user_pools @aws_iam }
AWS Lambda 関数から AWS AppSync に Mutation する機能を構築するのは非常に面倒なので、今後 AWS が標準で簡単に実装できるようにアップデートしてくれないかな、と期待しています。
AWS AppSync の設定
ここまで、AWS AppSync の設定については細かすぎて書ききれなかったので、以下に AWS CloudFormation テンプレートを掲載しておきます。Amazon DynamoDB や AWS Lambda 関数、関連する IAM ロールも込みです。
Amazon Cognito User Pool ID をパラメータとして入力しないと動かないテンプレートになっているため、このままでは使いづらいテンプレートであることは重々承知の上なのですが、AWS AppSync の設定は参考になるかと思います。
めっちゃ大量ですみません。
AWSTemplateFormatVersion: 2010-09-09 Description: The CloudFormation template that creates an AppSync, a DynamoDB table, a Lambda function and relevant IAM roles. # ------------------------------------------------------------# # Input Parameters # ------------------------------------------------------------# Parameters: SubName: Type: String Description: System sub name of EXAMPLE. (e.g. prod or test) Default: test MaxLength: 10 MinLength: 1 S3BucketNameSdk: Type: String Description: S3 bucket name in which you uploaded sdks for Lambda Layers. (e.g. example-temp-bucket) Default: example-temp-bucket MaxLength: 50 MinLength: 1 S3KeyAwsGraphqlSdkNodejs: Type: String Description: S3 key of aws-sdk-graphql-nodejs.zip. Fill the exact key name if you renamed. (e.g. sdk/Node.js16/aws-sdk-graphql-nodejs.zip) Default: sdk/Node.js16/aws-sdk-graphql-nodejs.zip MaxLength: 100 MinLength: 1 CognitoUserPoolID: Type: String Description: Cognito User Pool ID Resources: # ------------------------------------------------------------# # AppSync DynamoDB Invocation Role (IAM) # ------------------------------------------------------------# AppSyncDynamodbInvocationRole: Type: AWS::IAM::Role Properties: RoleName: !Sub EXAMPLE-AppSyncDynamodbInvocationRole-${SubName} Description: This role allows AppSync to invoke DynamoDB. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - appsync.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: !Sub EXAMPLE-AppSyncDynamodbInvocationPolicy-${SubName} PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - dynamodb:GetItem - dynamodb:PutItem - dynamodb:DeleteItem - dynamodb:UpdateItem - dynamodb:Query - dynamodb:Scan - dynamodb:BatchGetItem - dynamodb:BatchWriteItem Resource: - Fn::Join: - "" - - Fn::GetAtt: DynamoJobstatus.Arn - "*" DependsOn: - DynamoJobstatus AppSyncCloudWatchLogsPushRole: Type: AWS::IAM::Role Properties: RoleName: !Sub EXAMPLE-AppSyncCloudWatchLogsPushRole-${SubName} Description: This role allows AppSync to push logs to CloudWatch Logs. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - appsync.amazonaws.com Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSAppSyncPushToCloudWatchLogs # ------------------------------------------------------------# # Lambda AppSync Invocation Role (IAM) # ------------------------------------------------------------# LambdaAppSyncInvocationRole: Type: AWS::IAM::Role Properties: RoleName: !Sub EXAMPLE-LambdaAppSyncInvocationRole-${SubName} Description: This role allows Lambda to invoke AppSync. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess Policies: - PolicyName: !Sub EXAMPLE-LambdaAppSyncInvocationPolicy-${SubName} PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - appsync:GraphQL Resource: - Fn::Join: - "" - - Fn::GetAtt: AppSyncApiEXAMPLE.Arn - "/types/Mutation/*" DependsOn: - AppSyncApiEXAMPLE # ------------------------------------------------------------# # DynamoDB # ------------------------------------------------------------# DynamoJobstatus: Type: AWS::DynamoDB::Table Properties: TableName: !Sub EXAMPLE-jobstatus-${SubName} AttributeDefinitions: - AttributeName: serviceiduser AttributeType: S - AttributeName: datetime AttributeType: S - AttributeName: jobid AttributeType: S - AttributeName: serviceid AttributeType: S BillingMode: PAY_PER_REQUEST KeySchema: - AttributeName: serviceiduser KeyType: HASH - AttributeName: datetime KeyType: RANGE GlobalSecondaryIndexes: - IndexName: jobid-index KeySchema: - AttributeName: jobid KeyType: HASH Projection: ProjectionType: ALL - IndexName: serviceid-datetime-index KeySchema: - AttributeName: serviceid KeyType: HASH - AttributeName: datetime KeyType: RANGE Projection: ProjectionType: ALL PointInTimeRecoverySpecification: PointInTimeRecoveryEnabled: false TimeToLiveSpecification: AttributeName: ttl Enabled: true Tags: - Key: Cost Value: !Sub EXAMPLE-${SubName} # ------------------------------------------------------------# # AppSync # ------------------------------------------------------------# AppSyncApiEXAMPLE: Type: AWS::AppSync::GraphQLApi Description: AppSync API for EXAMPLE Properties: Name: !Sub EXAMPLE-AppSyncApiEXAMPLE-${SubName} AuthenticationType: AMAZON_COGNITO_USER_POOLS UserPoolConfig: UserPoolId: !Ref CognitoUserPoolID AwsRegion: !Sub ${AWS::Region} DefaultAction: "ALLOW" AdditionalAuthenticationProviders: - AuthenticationType: AWS_IAM LogConfig: CloudWatchLogsRoleArn: !GetAtt AppSyncCloudWatchLogsPushRole.Arn ExcludeVerboseContent: false FieldLogLevel: ALL XrayEnabled: false Tags: - Key: Cost Value: !Sub EXAMPLE-${SubName} DependsOn: - AppSyncCloudWatchLogsPushRole AppSyncSchemaJobstatus: Type: AWS::AppSync::GraphQLSchema Properties: ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId Definition: | schema { query: Query mutation: Mutation subscription: Subscription } type Jobstatus @aws_cognito_user_pools @aws_iam { datetime: String jobid: String serviceid: String serviceiduser: String user: String url1: String url2: String status: String memo: String ttl: Int } type JobstatusConnection { items: [Jobstatus] nextToken: String } type Mutation { putJobstatus(input: PutJobstatusInput!): Jobstatus @aws_cognito_user_pools @aws_iam updateJobstatus(input: UpdateJobstatusInput!): Jobstatus @aws_cognito_user_pools @aws_iam } type Query { queryJobstatus(serviceiduser: String!): JobstatusConnection queryJobstatusByServiceidDatetimeIndex(serviceid: String!): JobstatusConnection } type Subscription { onPutJobstatus(serviceiduser: String!): Jobstatus @aws_subscribe(mutations : ["putJobstatus"]) onUpdateJobstatus(serviceiduser: String!): Jobstatus @aws_subscribe(mutations: ["updateJobstatus"]) } input PutJobstatusInput { datetime: String! jobid: String! serviceid: String! serviceiduser: String! url1: String user: String! status: String! memo: String ttl: Int! } input UpdateJobstatusInput { datetime: String! jobid: String serviceid: String serviceiduser: String! url1: String url2: String user: String status: String memo: String ttl: Int } DependsOn: - AppSyncApiEXAMPLE AppSyncDataSourceJobstatus: Type: AWS::AppSync::DataSource Properties: ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId Name: !Sub EXAMPLE${SubName}AppSyncDataSourceJobstatus Description: AppSync DataSource to query Jobstatus DynamoDB. Type: AMAZON_DYNAMODB ServiceRoleArn: !GetAtt AppSyncDynamodbInvocationRole.Arn DynamoDBConfig: TableName: !Ref DynamoJobstatus AwsRegion: !Sub ${AWS::Region} DependsOn: - AppSyncApiEXAMPLE - AppSyncDynamodbInvocationRole AppSyncResolverQueryJobstatus: Type: AWS::AppSync::Resolver Properties: ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId TypeName: Query FieldName: queryJobstatus DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name RequestMappingTemplate: | { "version": "2018-05-29", "operation": "Query", "query": { "expression": "#serviceiduser = :serviceiduser", "expressionNames": { "#serviceiduser": "serviceiduser" }, "expressionValues": { ":serviceiduser": $util.dynamodb.toDynamoDBJson($context.arguments.serviceiduser) } }, "limit": $util.defaultIfNull($context.arguments.first, 30), "nextToken": $util.toJson($util.defaultIfNullOrEmpty($context.arguments.after, null)), "scanIndexForward": false, "consistentRead": false, "select": "ALL_ATTRIBUTES" } ResponseMappingTemplate: | $utils.toJson($context.result) DependsOn: - AppSyncDataSourceJobstatus - AppSyncSchemaJobstatus AppSyncResolverQueryJobstatusByServiceidDatetimeIndex: Type: AWS::AppSync::Resolver Properties: ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId TypeName: Query FieldName: queryJobstatusByServiceidDatetimeIndex DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name RequestMappingTemplate: | { "version": "2018-05-29", "operation": "Query", "query": { "expression": "#serviceid = :serviceid", "expressionNames": { "#serviceid": "serviceid" }, "expressionValues": { ":serviceid": $util.dynamodb.toDynamoDBJson($context.arguments.serviceid) } }, "index": "serviceid-datetime-index", "limit": $util.defaultIfNull($context.arguments.first, 30), "nextToken": $util.toJson($util.defaultIfNullOrEmpty($context.arguments.after, null)), "scanIndexForward": false, "consistentRead": false, "select": "ALL_ATTRIBUTES" } ResponseMappingTemplate: | $utils.toJson($context.result) DependsOn: - AppSyncDataSourceJobstatus - AppSyncSchemaJobstatus AppSyncResolverPutJobstatus: Type: AWS::AppSync::Resolver Properties: ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId TypeName: Mutation FieldName: putJobstatus DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name RequestMappingTemplate: | { "version": "2018-05-29", "operation": "PutItem", "key": { "serviceiduser": $util.dynamodb.toDynamoDBJson($ctx.args.input.serviceiduser), "datetime": $util.dynamodb.toDynamoDBJson($ctx.args.input.datetime) }, "attributeValues": $util.dynamodb.toMapValuesJson($ctx.args.input), "condition": { "expression": "attribute_not_exists(#serviceiduser) AND attribute_not_exists(#datetime)", "expressionNames": { "#serviceiduser": "serviceiduser", "#datetime": "datetime", } } } ResponseMappingTemplate: | $util.toJson($context.result) DependsOn: - AppSyncDataSourceJobstatus - AppSyncSchemaJobstatus AppSyncResolverUpdateJobstatus: Type: AWS::AppSync::Resolver Properties: ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId TypeName: Mutation FieldName: updateJobstatus DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name RequestMappingTemplate: | { "version": "2018-05-29", "operation": "UpdateItem", "key": { "serviceiduser": $util.dynamodb.toDynamoDBJson($ctx.args.input.serviceiduser), "datetime": $util.dynamodb.toDynamoDBJson($ctx.args.input.datetime) }, #set( $expNames = {} ) #set( $expValues = {} ) #set( $expSet = {} ) #set( $expAdd = {} ) #set( $expRemove = [] ) #foreach( $entry in $util.map.copyAndRemoveAllKeys($ctx.args.input, ["serviceiduser", "datetime"]).entrySet() ) #if( $util.isNull($entry.value) ) #set( $discard = ${expRemove.add("#${entry.key}")} ) $!{expNames.put("#${entry.key}", "${entry.key}")} #else $!{expSet.put("#${entry.key}", ":${entry.key}")} $!{expNames.put("#${entry.key}", "${entry.key}")} $!{expValues.put(":${entry.key}", $util.dynamodb.toDynamoDB($entry.value))} #end #end #set( $expression = "" ) #if( !${expSet.isEmpty()} ) #set( $expression = "SET" ) #foreach( $entry in $expSet.entrySet() ) #set( $expression = "${expression} ${entry.key} = ${entry.value}" ) #if ( $foreach.hasNext ) #set( $expression = "${expression}," ) #end #end #end #if( !${expAdd.isEmpty()} ) #set( $expression = "${expression} ADD" ) #foreach( $entry in $expAdd.entrySet() ) #set( $expression = "${expression} ${entry.key} ${entry.value}" ) #if ( $foreach.hasNext ) #set( $expression = "${expression}," ) #end #end #end #if( !${expRemove.isEmpty()} ) #set( $expression = "${expression} REMOVE" ) #foreach( $entry in $expRemove ) #set( $expression = "${expression} ${entry}" ) #if ( $foreach.hasNext ) #set( $expression = "${expression}," ) #end #end #end "update": { "expression": "${expression}", #if( !${expNames.isEmpty()} ) "expressionNames": $utils.toJson($expNames), #end #if( !${expValues.isEmpty()} ) "expressionValues": $utils.toJson($expValues), #end }, "condition": { "expression": "attribute_exists(#serviceiduser) AND attribute_exists(#datetime)", "expressionNames": { "#serviceiduser": "serviceiduser", "#datetime": "datetime" } } } ResponseMappingTemplate: | $util.toJson($context.result) DependsOn: - AppSyncDataSourceJobstatus - AppSyncSchemaJobstatus # ------------------------------------------------------------# # Lambda # ------------------------------------------------------------# LambdaAppSyncMutationUpdateJobstatus: Type: AWS::Lambda::Function Properties: FunctionName: !Sub EXAMPLE-MutationUpdateJobstatus-${SubName} Description: !Sub Lambda Function to mutate update job status in EXAMPLE-${SubName} Runtime: nodejs16.x Timeout: 5 MemorySize: 128 Environment: Variables: GRAPHQLURL: !GetAtt AppSyncApiEXAMPLE.GraphQLUrl Role: !GetAtt LambdaAppSyncInvocationRole.Arn Handler: index.handler Layers: - !Ref LambdaLayerAwsGraphqlSdkNodejs Tags: - Key: Cost Value: !Sub EXAMPLE-${SubName} Code: ZipFile: !Sub | const { Sha256 } = require("@aws-crypto/sha256-js"); const { defaultProvider } = require("@aws-sdk/credential-provider-node"); const { SignatureV4 } = require("@aws-sdk/signature-v4"); const { HttpRequest } = require("@aws-sdk/protocol-http"); const { default: fetch, Request } = require("node-fetch"); const GRAPHQL_ENDPOINT = process.env.GRAPHQLURL; const AWS_REGION = "${AWS::Region}"; const query = ` mutation updateJobstatus( $serviceiduser: String!, $datetime: String!, $url1: String, $url2: String, $status: String ) { updateJobstatus(input: { serviceiduser: $serviceiduser, datetime: $datetime, url1: $url1, url2: $url2, status: $status }) { serviceiduser } } `; exports.handler = async (event) => { console.log({"EVENT": JSON.stringify(event)}); let variables; if (event.result.TranscriptionJob.TranscriptionJobStatus == "COMPLETED") { variables = { serviceiduser: event.serviceiduser, datetime: event.datetime, url1: event.filename, url2: event.outputkey, status: "完了" }; } else { variables = { serviceiduser: event.serviceiduser, datetime: event.datetime, status: "エラー" }; } const endpoint = new URL(GRAPHQL_ENDPOINT); const signer = new SignatureV4({ credentials: defaultProvider(), region: AWS_REGION, service: 'appsync', sha256: Sha256 }); const requestToBeSigned = new HttpRequest({ method: 'POST', headers: { 'Content-Type': 'application/json', host: endpoint.host }, hostname: endpoint.host, body: JSON.stringify({ query, variables }), path: endpoint.pathname }); const signed = await signer.sign(requestToBeSigned); const request = new Request(endpoint, signed); let statusCode = 200; let body; let response; try { response = await fetch(request); body = await response.json(); if (body.errors) statusCode = 400; } catch (error) { statusCode = 500; body = { errors: [ { message: error.message } ] }; } return { statusCode, body: JSON.stringify(body) }; }; DependsOn: - LambdaAppSyncInvocationRole - LambdaLayerAwsGraphqlSdkNodejs # ------------------------------------------------------------# # Lambda Layer # ------------------------------------------------------------# LambdaLayerAwsGraphqlSdkNodejs: Type: AWS::Lambda::LayerVersion Properties: LayerName: !Sub EXAMPLE-${SubName}-aws-sdk-graphql-nodejs Description: !Sub aws-sdk-graphql for Node.js to load in EXAMPLE-${SubName} CompatibleRuntimes: - nodejs16.x Content: S3Bucket: !Sub ${S3BucketNameSdk} S3Key: !Sub ${S3KeyAwsGraphqlSdkNodejs} # ------------------------------------------------------------# # Output Parameters # ------------------------------------------------------------# Outputs: # AppSync AppSyncEndpoint: Value: !GetAtt AppSyncApiEXAMPLE.GraphQLUrl Export: Name: !Sub EXAMPLE-AppSyncEndpoint-${SubName}
まとめ
いかがでしたでしょうか?
プッシュ通知だけのためにここまでしなきゃいけないの?というのが正直な感想です。スキーマやリゾルバ設定の面倒くささ、本記事では説明しませんでしたが GraphQL のわかりにくさが半端ないです。
でも AWS AppSync を使ってみてわかったのですが、レスポンスがめっちゃ速いです。アプリのコードにデータをベタ書きしているんじゃないかって思うぐらい。Amazon API Gateway + AWS Lambda + Amazon DynamoDB の構成と比べると全然違います。なので、レスポンススピードを求めるユースケースでも AWS AppSync は使いどころあるかな、と思います。AppSync というサービス名からもわかるように、アプリとデータを密着させてくれるサービスなんだな、というのはよくわかりました。
本記事が皆様のお役に立てれば幸いです。