こんにちは、SCSK 株式会社の広野です。
非同期ジョブを実行した後、結果をどう受け取るか?というのは開発者として作り込み甲斐のあるテーマです。今回は React アプリが AWS 上にある非同期ジョブを呼び出した後に、AWS AppSync 経由でジョブ完了のプッシュ通知を受け取る仕組みを紹介します。
プッシュ通知の必要性
そもそも、なぜプッシュ通知が必要なのか。
サーバーレスアプリから、とあるデータ解析ジョブにデータを渡して実行させるアーキテクチャ図を見てみましょう。
アプリはユーザのブラウザにロードされています。
アプリがジョブに渡すデータを Amazon S3 にアップロードするところから話を始めます。アプリはデータをアップロードした後は処理の制御を手放します。データを送ったら終わり、です。以降の後続処理も自分の責任を果たせば処理を手放す、非同期処理が連発します。
この状況では、アプリはジョブの結果どころか終了すら知ることができません。そのため、ジョブの終了時にジョブ側からアプリに終了を通知してあげる、プッシュ通知の必要性が出てきます。
正直、プッシュ通知でなくてもいいのですが。定期的にアプリにジョブ状況を問い合わせる常駐プロセスを作ればそれで済むのかもしれません。でも、プッシュ通知を実現する AWS サービスがありますので、それを利用しない手はないでしょう?と考えました。
ユーザがプッシュ通知を受け取るための AWS サービスはいくつかあるのですが、デバイスの OS や SMS、メール等ではなくアプリが直接受け取れるサービスは AWS AppSync が最も実現し易い選択肢だと思います。
このアーキテクチャでは、プッシュ通知を実現するために Amazon DynamoDB にジョブ状況管理テーブルを作成し、そのデータに特定の更新があったら AWS AppSync がアプリにプッシュ通知を送る、という仕組みにしています。
AWS AppSync がやってくれること
AWS AppSync は、GraphQL というクエリー言語を使用してデータにアクセスするための API を提供してくれます。データは本アーキテクチャでは Amazon DynamoDB にありますが、RDBMS でもよいですし、他のデータ形式でも対応しているものがあります。
GraphQL では、大きく 3つの命令が可能です。
- Query
- Mutation
- Subscription
Query は、データ読取です。
Mutation は、データ書込です。
Query と Mutation については、あらかじめ AppSync にスキーマとリゾルバと呼ばれるデータベースとの連携設定をしておくことで、AWS Lambda なしで、代わりにアプリからの GraphQL クエリーを元にデータベースにクエリーしてくれます。スキーマがデータベース情報みたいなもので、リゾルバが GraphQL クエリーから得たインプットを元にデータベースへのクエリーに変換するマッピング情報みたいなものと考えるとよいと思います。
Subscription が、プッシュ通知受け取りです。Mutation と Subscription は密接に関係していて、あらかじめ指定した Mutation 設定に対して、その Mutation が実行されたときにその Mutation を Subscribe しているアプリ画面にプッシュ通知を送ります。
重要なのは
- あらかじめ AppSync スキーマ設定内で Mutation に紐付ける Subscription の設定が必要
- Mutation されないとプッシュ通知は発動しない
- Mutation を Subscribe しているアプリ画面でないとプッシュ通知は受け取れない
ということです。
以下、スキーマとリゾルバの設定例です。
スキーマ
type Jobstatus @aws_cognito_user_pools @aws_iam {
datetime: String
jobid: String
serviceid: String
serviceiduser: String
user: String
url1: String
url2: String
status: String
memo: String
ttl: Int
}
type JobstatusConnection {
items: [Jobstatus]
nextToken: String
}
type Mutation {
putJobstatus(input: PutJobstatusInput!): Jobstatus
@aws_cognito_user_pools @aws_iam
updateJobstatus(input: UpdateJobstatusInput!): Jobstatus
@aws_cognito_user_pools @aws_iam
}
input PutJobstatusInput {
datetime: String!
jobid: String!
serviceid: String!
serviceiduser: String!
url1: String
user: String!
status: String!
memo: String
ttl: Int!
}
type Query {
queryJobstatus(serviceiduser: String!): JobstatusConnection
queryJobstatusByServiceidDatetimeIndex(serviceid: String!): JobstatusConnection
}
type Subscription {
onPutJobstatus(serviceiduser: String!): Jobstatus
@aws_subscribe(mutations: ["putJobstatus"])
onUpdateJobstatus(serviceiduser: String!): Jobstatus
@aws_subscribe(mutations: ["updateJobstatus"])
}
input UpdateJobstatusInput {
datetime: String!
jobid: String
serviceid: String
serviceiduser: String!
url1: String
url2: String
user: String
status: String
memo: String
ttl: Int
}
schema {
query: Query
mutation: Mutation
subscription: Subscription
}
リゾルバ
上記スキーマ設定のうち、updateJobstatus という Mutation のリゾルバ設定です。serviceiduser と datetime というパーティションキー、ソートキーにマッチするデータに対して、受け取った属性データで更新をかけるクエリーに変換されます。正直、難しすぎて萎えます。AWS Lambda で書いた方が断然楽だ、、、とここに来て後悔し始めます。
{
"version": "2018-05-29",
"operation": "UpdateItem",
"key": {
"serviceiduser": $util.dynamodb.toDynamoDBJson($ctx.args.input.serviceiduser),
"datetime": $util.dynamodb.toDynamoDBJson($ctx.args.input.datetime)
},
#set( $expNames = {} )
#set( $expValues = {} )
#set( $expSet = {} )
#set( $expAdd = {} )
#set( $expRemove = [] )
#foreach( $entry in $util.map.copyAndRemoveAllKeys($ctx.args.input, ["serviceiduser", "datetime"]).entrySet() )
#if( $util.isNull($entry.value) )
#set( $discard = ${expRemove.add("#${entry.key}")} )
$!{expNames.put("#${entry.key}", "${entry.key}")}
#else
$!{expSet.put("#${entry.key}", ":${entry.key}")}
$!{expNames.put("#${entry.key}", "${entry.key}")}
$!{expValues.put(":${entry.key}", $util.dynamodb.toDynamoDB($entry.value))}
#end
#end
#set( $expression = "" )
#if( !${expSet.isEmpty()} )
#set( $expression = "SET" )
#foreach( $entry in $expSet.entrySet() )
#set( $expression = "${expression} ${entry.key} = ${entry.value}" )
#if ( $foreach.hasNext )
#set( $expression = "${expression}," )
#end
#end
#end
#if( !${expAdd.isEmpty()} )
#set( $expression = "${expression} ADD" )
#foreach( $entry in $expAdd.entrySet() )
#set( $expression = "${expression} ${entry.key} ${entry.value}" )
#if ( $foreach.hasNext )
#set( $expression = "${expression}," )
#end
#end
#end
#if( !${expRemove.isEmpty()} )
#set( $expression = "${expression} REMOVE" )
#foreach( $entry in $expRemove )
#set( $expression = "${expression} ${entry}" )
#if ( $foreach.hasNext )
#set( $expression = "${expression}," )
#end
#end
#end
"update": {
"expression": "${expression}",
#if( !${expNames.isEmpty()} )
"expressionNames": $utils.toJson($expNames),
#end
#if( !${expValues.isEmpty()} )
"expressionValues": $utils.toJson($expValues),
#end
},
"condition": {
"expression": "attribute_exists(#serviceiduser) AND attribute_exists(#datetime)",
"expressionNames": {
"#serviceiduser": "serviceiduser",
"#datetime": "datetime"
}
}
}
この Mutation に対応している Subscription の設定が、スキーマ設定内の onUpdateJobstatus の部分になります。
type Subscription {
onPutJobstatus(serviceiduser: String!): Jobstatus
@aws_subscribe(mutations: ["putJobstatus"])
onUpdateJobstatus(serviceiduser: String!): Jobstatus
@aws_subscribe(mutations: ["updateJobstatus"])
}
updateJobstatus という Mutation が行われたときに、onUpdateJobstatus という Subscription を Subscribe しているアプリにプッシュ通知を送ります。ただし、プッシュ通知を送る条件も書かれています。Subscribe しているアプリがパラメータとして指定している serviceiduser が、その Mutation 内の serviceiduser と一致するとき、という条件で発動します。
意味的には、サービス ID が「今開いている画面」で、ユーザ名が「自分」のものだけプッシュ通知が来る、とイメージしてください。
ここで、アプリ画面イメージを見てみましょう。
アプリ画面イメージ
以下のスクリーンショットのように、アプリ画面から解析ジョブを開始すると、ジョブ完了後にプッシュ通知を受け取り画面を自動更新します。ユーザからしてみれば当たり前の動作なのですが、裏では開発者の努力が詰まっています。(泣)
既に述べましたが、アプリはジョブを開始したら制御を手放します。そのままでは結果が全く分からないので、バックエンドのジョブから完了後に通知を送ってもらうという寸法です。
React アプリのコード
React アプリではジョブ開始時に何をしているかというと、大きく以下 3つです。
- ジョブ一覧データの取得、表示。Amazon DynamoDB へ AppSync 経由で Query をかける。
- ジョブ登録。Amazon DynamoDB へ AppSync 経由で Mutation をかける。
- Amazon S3 へのファイルアップロード。これをトリガーに以降、AWS Step Functions のジョブが発動する。
Amazon S3 へのファイルアップロードについては、以下のブログで方法を掲載しています。今回は送信するファイルが BLOB なので若干ブログ内容をカスタマイズしないといけないのですが。それについては別記事で紹介したいと思います。
また、React アプリでプッシュ通知を受け取るためのコードがあります。
- updateJobstatus という Mutation が行われたときのプッシュ通知を Subscribe する。
ここでは、React からの Query、Mutation、Subscription のコードを紹介します。
前提
AWS AppSync は認証なしではアクセスできません。本記事では React アプリを Amazon Cognito で認証しているので、透過的に AWS AppSync の認証をさせることができます。React 側の設定は簡単です。以下のコードを App.js に仕込んでいます。それだけです。
- App.js
import React from 'react';
import Amplify from 'aws-amplify';
//中略
//Amplify Cognito, S3, AppSync 連携設定
Amplify.configure({
Auth: {
region: process.env.REACT_APP_REGION,
userPoolId: process.env.REACT_APP_USERPOOLID,
userPoolWebClientId: process.env.REACT_APP_USERPOOLWEBCLIENTID,
identityPoolId: process.env.REACT_APP_IDPOOLID
},
Storage: {
AWSS3: {
bucket: process.env.REACT_APP_S3BUCKETNAMEAMPLIFYSTG,
region: process.env.REACT_APP_REGION
}
},
aws_appsync_graphqlEndpoint: process.env.REACT_APP_APPSYNC,
aws_appsync_region: process.env.REACT_APP_REGION,
aws_appsync_authenticationType: "AMAZON_COGNITO_USER_POOLS",
aws_appsync_apiKey: "null"
});
Amplify.configure で AWS AppSync 連携関連の設定を環境変数経由で入れています。Amazon S3 にファイルをアップロードするための設定や Amazon Cognito との連携設定もここに入っています。
Query
あらかじめ AWS AppSync にスキーマ設定、リゾルバ設定ができている状態で、それに合わせた GraphQL クエリーを AWS AppSync API に投げます。
import React, { useState, useEffect } from 'react';
import { API } from 'aws-amplify';
import gql from 'graphql-tag';
//中略
const serviceid = window.location.pathname;
const [jobdata, setJobdata] = useState();
//中略
//ジョブステータス取得クエリー
const query = gql`
query queryJobstatusByServiceidDatetimeIndex($serviceid: String!) {
queryJobstatusByServiceidDatetimeIndex(serviceid: $serviceid) {
items {
datetime
url1
url2
user
status
memo
}
}
}
`;
//ジョブステータス取得関数
const queryJobstatusByServiceidDatetimeIndex = async () => {
const res = await API.graphql({
query: query,
variables: {
serviceid: serviceid
},
authMode: "AMAZON_COGNITO_USER_POOLS"
});
setJobdata(res.data.queryJobstatusByServiceidDatetimeIndex.items);
};
Mutation
Query とほぼ同じです。やはりあらかじめ AWS AppSync 側でスキーマやリゾルバが適切に設定されている必要があります。
import { API } from 'aws-amplify';
import gql from 'graphql-tag';
//中略
const username = props.username;
const serviceid = window.location.pathname;
const jobid = await uuidv4();
//中略
//ジョブステータス登録クエリー
const putmutation = gql`
mutation putJobstatus(
$serviceiduser: String!,
$datetime: String!,
$jobid: String!,
$serviceid: String!,
$url1: String,
$user: String!,
$status: String!,
$memo: String,
$ttl: Int!
) {
putJobstatus(input: {
serviceiduser: $serviceiduser,
datetime: $datetime,
jobid: $jobid,
serviceid: $serviceid,
url1: $url1,
user: $user,
status: $status,
memo: $memo,
ttl: $ttl
}) {
serviceiduser
}
}
`;
//ジョブステータス登録関数
const putJobstatus = async (jobid) => {
const dt = new Date();
await API.graphql({
query: putmutation,
variables: {
serviceiduser: serviceid + "#" + username,
datetime: dt.toISOString(),
jobid: jobid,
serviceid: serviceid,
url1: input_text,
user: username,
status: "解析中",
memo: memo,
ttl: Math.floor(dt.setDate(dt.getDate() + 30) / 1000)
},
authMode: "AMAZON_COGNITO_USER_POOLS"
});
//ステータス画面更新
queryJobstatusByServiceidDatetimeIndex();
};
Subscription
Subscription は、画面表示時に useEffect で実行させています。console.log で見た感じ、AWS AppSync にプッシュ通知を受け取るためのセッションを常時張っている感じに見えました。(正確には違うかもしれません)
Subscription 実行時に、GraphQL クエリーの中でパラメータとして serviceiduser を渡しています。この serviceiduser が同じデータが Mutation されたときにプッシュ通知を受け取ります。
プッシュ通知を受け取ったとき、あらかじめ指定したデータ(ここでは serviceiduser のみ)を受け取ることができます。そのデータ(以下コードでは data)を使った処理を実行させることもできますが、私はジョブ一覧テーブルの表示更新と画面に Snackbar というポップアップ通知を表示させる処理をさせています。本記事ではそれで事足りるので。
import React, { useState, useEffect } from 'react';
import { API } from 'aws-amplify';
import gql from 'graphql-tag';
//中略
//ジョブステータス更新サブスクライブクエリー
const subscribeUpdateQuery = gql`
subscription onUpdateJobstatus($serviceiduser: String!) {
onUpdateJobstatus(serviceiduser: $serviceiduser) {
serviceiduser
}
}
`;
//ジョブステータス更新サブスクライブ関数
const subscribeUpdateJobstatus = () => {
API.graphql({
query: subscribeUpdateQuery,
variables: {
serviceiduser: serviceid + '#' + username
},
authMode: 'AMAZON_COGNITO_USER_POOLS'
}).subscribe({
next: (data) => {
//サブスクリプション通知時のアクション = ステータス一覧を更新
queryJobstatusByServiceidDatetimeIndex();
//スナックバーで通知
setIsSnackbarOpen(true);
},
error: (err) => {
console.log(err);
}
});
};
//画面表示時自動実行
useEffect(() => {
//ジョブステータス確認
queryJobstatusByServiceidDatetimeIndex();
//ジョブステータスサブスクリプション実行
subscribeUpdateJobstatus();
}, []);
AWS Lambda からの Mutation コード
上記 React アプリがプッシュ通知を受け取るとき、裏では AWS Step Functions ジョブ内から AWS Lambda 関数が呼び出され、それがジョブ完了ステータスを Amazon DynamoDB テーブルに書き込みます。プッシュ通知を発信するためには AWS AppSync 経由の Mutation でなければならないので、AWS Lambda 関数から Mutation させる必要があります。
これは情報が少なく非常に苦労しました。また、AWS Step Functions のビルトイン API にも Mutation はまだ存在せず、AWS Lambda 関数が呼び出せるデフォルトのモジュールだけでは Mutation を実行させられず、結局自分でモジュールを集めて Lambda Layer として読み込ませました。
以下、AWS Lambda 関数のコードです。Node.js で作成しています。
リージョンはソウルでベタ書きしていますので適宜ご変更ください。AWS Step Functions から invoke される Lambda なので、その仕様で event を引数として受け取っています。そのあたりも適宜ご変更を。
const { Sha256 } = require("@aws-crypto/sha256-js");
const { defaultProvider } = require("@aws-sdk/credential-provider-node");
const { SignatureV4 } = require("@aws-sdk/signature-v4");
const { HttpRequest } = require("@aws-sdk/protocol-http");
const { default: fetch, Request } = require("node-fetch");
const GRAPHQL_ENDPOINT = process.env.GRAPHQLURL;
const AWS_REGION = "ap-northeast-2";
const query = `
mutation updateJobstatus(
$serviceiduser: String!,
$datetime: String!,
$url1: String,
$url2: String,
$status: String
) {
updateJobstatus(input: {
serviceiduser: $serviceiduser,
datetime: $datetime,
url1: $url1,
url2: $url2,
status: $status
}) {
serviceiduser
}
}
`;
exports.handler = async (event) => {
console.log({"EVENT": JSON.stringify(event)});
let variables;
if (event.result.TranscriptionJob.TranscriptionJobStatus == "COMPLETED") {
variables = {
serviceiduser: event.serviceiduser,
datetime: event.datetime,
url1: event.filename,
url2: event.outputkey,
status: "完了"
};
} else {
variables = {
serviceiduser: event.serviceiduser,
datetime: event.datetime,
status: "エラー"
};
}
const endpoint = new URL(GRAPHQL_ENDPOINT);
const signer = new SignatureV4({
credentials: defaultProvider(),
region: AWS_REGION,
service: 'appsync',
sha256: Sha256
});
const requestToBeSigned = new HttpRequest({
method: 'POST',
headers: {
'Content-Type': 'application/json',
host: endpoint.host
},
hostname: endpoint.host,
body: JSON.stringify({ query, variables }),
path: endpoint.pathname
});
const signed = await signer.sign(requestToBeSigned);
const request = new Request(endpoint, signed);
let statusCode = 200;
let body;
let response;
try {
response = await fetch(request);
body = await response.json();
if (body.errors) statusCode = 400;
} catch (error) {
statusCode = 500;
body = {
errors: [
{
message: error.message
}
]
};
}
return {
statusCode,
body: JSON.stringify(body)
};
};
Lambda Layer として読み込ませるモジュールは以下のコマンドで作成します。nodejs というディレクトリを作成して、そのディレクトリ内で実行します。
npm init npm install --save @aws-crypto/sha256-js @aws-sdk/credential-provider-node @aws-sdk/protocol-http @aws-sdk/signature-v4 node-fetch@2.6.7
node_modules というフォルダ内にモジュールがインストールされ、package.json、package-lock.json というファイルができあがります。nodejs フォルダもろとも ZIP 圧縮すれば出来上がりです。
注意事項があります。
- node-fetch はバージョン 3 だとこのコードは動かないため、バージョン 2 を指定してインストールする必要があります。
- AWS Lambda 関数から AWS AppSync に Mutation するには IAM ロールの権限を付与するだけではダメで、AWS AppSync 側でも IAM ロールによるアクセスを許可する設定にしなければなりません。
AWS AppSync にはデフォルトの認証と追加の認証を設定でき、本記事のケースではデフォルトが Amazon Cognito で 追加を IAM ロールにしています。その場合、スキーマの設定で明示的に追加の認証で許可する Mutation 等を設定する必要があります。
以下がその例なのですが、@aws_iam で IAM ロールによる認証を許可しています。このように追加認証を設定するときにはデフォルトの認証が無効になってしまうため、デフォルトの認証、ここでは @aws_cognito_user_pools も明示的に設定しています。
type Mutation {
putJobstatus(input: PutJobstatusInput!): Jobstatus
@aws_cognito_user_pools @aws_iam
updateJobstatus(input: UpdateJobstatusInput!): Jobstatus
@aws_cognito_user_pools @aws_iam
}
AWS Lambda 関数から AWS AppSync に Mutation する機能を構築するのは非常に面倒なので、今後 AWS が標準で簡単に実装できるようにアップデートしてくれないかな、と期待しています。
AWS AppSync の設定
ここまで、AWS AppSync の設定については細かすぎて書ききれなかったので、以下に AWS CloudFormation テンプレートを掲載しておきます。Amazon DynamoDB や AWS Lambda 関数、関連する IAM ロールも込みです。
Amazon Cognito User Pool ID をパラメータとして入力しないと動かないテンプレートになっているため、このままでは使いづらいテンプレートであることは重々承知の上なのですが、AWS AppSync の設定は参考になるかと思います。
めっちゃ大量ですみません。
AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates an AppSync, a DynamoDB table, a Lambda function and relevant IAM roles.
# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
SubName:
Type: String
Description: System sub name of EXAMPLE. (e.g. prod or test)
Default: test
MaxLength: 10
MinLength: 1
S3BucketNameSdk:
Type: String
Description: S3 bucket name in which you uploaded sdks for Lambda Layers. (e.g. example-temp-bucket)
Default: example-temp-bucket
MaxLength: 50
MinLength: 1
S3KeyAwsGraphqlSdkNodejs:
Type: String
Description: S3 key of aws-sdk-graphql-nodejs.zip. Fill the exact key name if you renamed. (e.g. sdk/Node.js16/aws-sdk-graphql-nodejs.zip)
Default: sdk/Node.js16/aws-sdk-graphql-nodejs.zip
MaxLength: 100
MinLength: 1
CognitoUserPoolID:
Type: String
Description: Cognito User Pool ID
Resources:
# ------------------------------------------------------------#
# AppSync DynamoDB Invocation Role (IAM)
# ------------------------------------------------------------#
AppSyncDynamodbInvocationRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub EXAMPLE-AppSyncDynamodbInvocationRole-${SubName}
Description: This role allows AppSync to invoke DynamoDB.
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- appsync.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: !Sub EXAMPLE-AppSyncDynamodbInvocationPolicy-${SubName}
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:DeleteItem
- dynamodb:UpdateItem
- dynamodb:Query
- dynamodb:Scan
- dynamodb:BatchGetItem
- dynamodb:BatchWriteItem
Resource:
- Fn::Join:
- ""
- - Fn::GetAtt: DynamoJobstatus.Arn
- "*"
DependsOn:
- DynamoJobstatus
AppSyncCloudWatchLogsPushRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub EXAMPLE-AppSyncCloudWatchLogsPushRole-${SubName}
Description: This role allows AppSync to push logs to CloudWatch Logs.
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- appsync.amazonaws.com
Action:
- sts:AssumeRole
Path: /
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSAppSyncPushToCloudWatchLogs
# ------------------------------------------------------------#
# Lambda AppSync Invocation Role (IAM)
# ------------------------------------------------------------#
LambdaAppSyncInvocationRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub EXAMPLE-LambdaAppSyncInvocationRole-${SubName}
Description: This role allows Lambda to invoke AppSync.
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess
Policies:
- PolicyName: !Sub EXAMPLE-LambdaAppSyncInvocationPolicy-${SubName}
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- appsync:GraphQL
Resource:
- Fn::Join:
- ""
- - Fn::GetAtt: AppSyncApiEXAMPLE.Arn
- "/types/Mutation/*"
DependsOn:
- AppSyncApiEXAMPLE
# ------------------------------------------------------------#
# DynamoDB
# ------------------------------------------------------------#
DynamoJobstatus:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub EXAMPLE-jobstatus-${SubName}
AttributeDefinitions:
- AttributeName: serviceiduser
AttributeType: S
- AttributeName: datetime
AttributeType: S
- AttributeName: jobid
AttributeType: S
- AttributeName: serviceid
AttributeType: S
BillingMode: PAY_PER_REQUEST
KeySchema:
- AttributeName: serviceiduser
KeyType: HASH
- AttributeName: datetime
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: jobid-index
KeySchema:
- AttributeName: jobid
KeyType: HASH
Projection:
ProjectionType: ALL
- IndexName: serviceid-datetime-index
KeySchema:
- AttributeName: serviceid
KeyType: HASH
- AttributeName: datetime
KeyType: RANGE
Projection:
ProjectionType: ALL
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: false
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
Tags:
- Key: Cost
Value: !Sub EXAMPLE-${SubName}
# ------------------------------------------------------------#
# AppSync
# ------------------------------------------------------------#
AppSyncApiEXAMPLE:
Type: AWS::AppSync::GraphQLApi
Description: AppSync API for EXAMPLE
Properties:
Name: !Sub EXAMPLE-AppSyncApiEXAMPLE-${SubName}
AuthenticationType: AMAZON_COGNITO_USER_POOLS
UserPoolConfig:
UserPoolId: !Ref CognitoUserPoolID
AwsRegion: !Sub ${AWS::Region}
DefaultAction: "ALLOW"
AdditionalAuthenticationProviders:
- AuthenticationType: AWS_IAM
LogConfig:
CloudWatchLogsRoleArn: !GetAtt AppSyncCloudWatchLogsPushRole.Arn
ExcludeVerboseContent: false
FieldLogLevel: ALL
XrayEnabled: false
Tags:
- Key: Cost
Value: !Sub EXAMPLE-${SubName}
DependsOn:
- AppSyncCloudWatchLogsPushRole
AppSyncSchemaJobstatus:
Type: AWS::AppSync::GraphQLSchema
Properties:
ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
Definition: |
schema {
query: Query
mutation: Mutation
subscription: Subscription
}
type Jobstatus @aws_cognito_user_pools @aws_iam {
datetime: String
jobid: String
serviceid: String
serviceiduser: String
user: String
url1: String
url2: String
status: String
memo: String
ttl: Int
}
type JobstatusConnection {
items: [Jobstatus]
nextToken: String
}
type Mutation {
putJobstatus(input: PutJobstatusInput!): Jobstatus
@aws_cognito_user_pools @aws_iam
updateJobstatus(input: UpdateJobstatusInput!): Jobstatus
@aws_cognito_user_pools @aws_iam
}
type Query {
queryJobstatus(serviceiduser: String!): JobstatusConnection
queryJobstatusByServiceidDatetimeIndex(serviceid: String!): JobstatusConnection
}
type Subscription {
onPutJobstatus(serviceiduser: String!): Jobstatus
@aws_subscribe(mutations : ["putJobstatus"])
onUpdateJobstatus(serviceiduser: String!): Jobstatus
@aws_subscribe(mutations: ["updateJobstatus"])
}
input PutJobstatusInput {
datetime: String!
jobid: String!
serviceid: String!
serviceiduser: String!
url1: String
user: String!
status: String!
memo: String
ttl: Int!
}
input UpdateJobstatusInput {
datetime: String!
jobid: String
serviceid: String
serviceiduser: String!
url1: String
url2: String
user: String
status: String
memo: String
ttl: Int
}
DependsOn:
- AppSyncApiEXAMPLE
AppSyncDataSourceJobstatus:
Type: AWS::AppSync::DataSource
Properties:
ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
Name: !Sub EXAMPLE${SubName}AppSyncDataSourceJobstatus
Description: AppSync DataSource to query Jobstatus DynamoDB.
Type: AMAZON_DYNAMODB
ServiceRoleArn: !GetAtt AppSyncDynamodbInvocationRole.Arn
DynamoDBConfig:
TableName: !Ref DynamoJobstatus
AwsRegion: !Sub ${AWS::Region}
DependsOn:
- AppSyncApiEXAMPLE
- AppSyncDynamodbInvocationRole
AppSyncResolverQueryJobstatus:
Type: AWS::AppSync::Resolver
Properties:
ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
TypeName: Query
FieldName: queryJobstatus
DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name
RequestMappingTemplate: |
{
"version": "2018-05-29",
"operation": "Query",
"query": {
"expression": "#serviceiduser = :serviceiduser",
"expressionNames": {
"#serviceiduser": "serviceiduser"
},
"expressionValues": {
":serviceiduser": $util.dynamodb.toDynamoDBJson($context.arguments.serviceiduser)
}
},
"limit": $util.defaultIfNull($context.arguments.first, 30),
"nextToken": $util.toJson($util.defaultIfNullOrEmpty($context.arguments.after, null)),
"scanIndexForward": false,
"consistentRead": false,
"select": "ALL_ATTRIBUTES"
}
ResponseMappingTemplate: |
$utils.toJson($context.result)
DependsOn:
- AppSyncDataSourceJobstatus
- AppSyncSchemaJobstatus
AppSyncResolverQueryJobstatusByServiceidDatetimeIndex:
Type: AWS::AppSync::Resolver
Properties:
ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
TypeName: Query
FieldName: queryJobstatusByServiceidDatetimeIndex
DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name
RequestMappingTemplate: |
{
"version": "2018-05-29",
"operation": "Query",
"query": {
"expression": "#serviceid = :serviceid",
"expressionNames": {
"#serviceid": "serviceid"
},
"expressionValues": {
":serviceid": $util.dynamodb.toDynamoDBJson($context.arguments.serviceid)
}
},
"index": "serviceid-datetime-index",
"limit": $util.defaultIfNull($context.arguments.first, 30),
"nextToken": $util.toJson($util.defaultIfNullOrEmpty($context.arguments.after, null)),
"scanIndexForward": false,
"consistentRead": false,
"select": "ALL_ATTRIBUTES"
}
ResponseMappingTemplate: |
$utils.toJson($context.result)
DependsOn:
- AppSyncDataSourceJobstatus
- AppSyncSchemaJobstatus
AppSyncResolverPutJobstatus:
Type: AWS::AppSync::Resolver
Properties:
ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
TypeName: Mutation
FieldName: putJobstatus
DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name
RequestMappingTemplate: |
{
"version": "2018-05-29",
"operation": "PutItem",
"key": {
"serviceiduser": $util.dynamodb.toDynamoDBJson($ctx.args.input.serviceiduser),
"datetime": $util.dynamodb.toDynamoDBJson($ctx.args.input.datetime)
},
"attributeValues": $util.dynamodb.toMapValuesJson($ctx.args.input),
"condition": {
"expression": "attribute_not_exists(#serviceiduser) AND attribute_not_exists(#datetime)",
"expressionNames": {
"#serviceiduser": "serviceiduser",
"#datetime": "datetime",
}
}
}
ResponseMappingTemplate: |
$util.toJson($context.result)
DependsOn:
- AppSyncDataSourceJobstatus
- AppSyncSchemaJobstatus
AppSyncResolverUpdateJobstatus:
Type: AWS::AppSync::Resolver
Properties:
ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
TypeName: Mutation
FieldName: updateJobstatus
DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name
RequestMappingTemplate: |
{
"version": "2018-05-29",
"operation": "UpdateItem",
"key": {
"serviceiduser": $util.dynamodb.toDynamoDBJson($ctx.args.input.serviceiduser),
"datetime": $util.dynamodb.toDynamoDBJson($ctx.args.input.datetime)
},
#set( $expNames = {} )
#set( $expValues = {} )
#set( $expSet = {} )
#set( $expAdd = {} )
#set( $expRemove = [] )
#foreach( $entry in $util.map.copyAndRemoveAllKeys($ctx.args.input, ["serviceiduser", "datetime"]).entrySet() )
#if( $util.isNull($entry.value) )
#set( $discard = ${expRemove.add("#${entry.key}")} )
$!{expNames.put("#${entry.key}", "${entry.key}")}
#else
$!{expSet.put("#${entry.key}", ":${entry.key}")}
$!{expNames.put("#${entry.key}", "${entry.key}")}
$!{expValues.put(":${entry.key}", $util.dynamodb.toDynamoDB($entry.value))}
#end
#end
#set( $expression = "" )
#if( !${expSet.isEmpty()} )
#set( $expression = "SET" )
#foreach( $entry in $expSet.entrySet() )
#set( $expression = "${expression} ${entry.key} = ${entry.value}" )
#if ( $foreach.hasNext )
#set( $expression = "${expression}," )
#end
#end
#end
#if( !${expAdd.isEmpty()} )
#set( $expression = "${expression} ADD" )
#foreach( $entry in $expAdd.entrySet() )
#set( $expression = "${expression} ${entry.key} ${entry.value}" )
#if ( $foreach.hasNext )
#set( $expression = "${expression}," )
#end
#end
#end
#if( !${expRemove.isEmpty()} )
#set( $expression = "${expression} REMOVE" )
#foreach( $entry in $expRemove )
#set( $expression = "${expression} ${entry}" )
#if ( $foreach.hasNext )
#set( $expression = "${expression}," )
#end
#end
#end
"update": {
"expression": "${expression}",
#if( !${expNames.isEmpty()} )
"expressionNames": $utils.toJson($expNames),
#end
#if( !${expValues.isEmpty()} )
"expressionValues": $utils.toJson($expValues),
#end
},
"condition": {
"expression": "attribute_exists(#serviceiduser) AND attribute_exists(#datetime)",
"expressionNames": {
"#serviceiduser": "serviceiduser",
"#datetime": "datetime"
}
}
}
ResponseMappingTemplate: |
$util.toJson($context.result)
DependsOn:
- AppSyncDataSourceJobstatus
- AppSyncSchemaJobstatus
# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
LambdaAppSyncMutationUpdateJobstatus:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub EXAMPLE-MutationUpdateJobstatus-${SubName}
Description: !Sub Lambda Function to mutate update job status in EXAMPLE-${SubName}
Runtime: nodejs16.x
Timeout: 5
MemorySize: 128
Environment:
Variables:
GRAPHQLURL: !GetAtt AppSyncApiEXAMPLE.GraphQLUrl
Role: !GetAtt LambdaAppSyncInvocationRole.Arn
Handler: index.handler
Layers:
- !Ref LambdaLayerAwsGraphqlSdkNodejs
Tags:
- Key: Cost
Value: !Sub EXAMPLE-${SubName}
Code:
ZipFile: !Sub |
const { Sha256 } = require("@aws-crypto/sha256-js");
const { defaultProvider } = require("@aws-sdk/credential-provider-node");
const { SignatureV4 } = require("@aws-sdk/signature-v4");
const { HttpRequest } = require("@aws-sdk/protocol-http");
const { default: fetch, Request } = require("node-fetch");
const GRAPHQL_ENDPOINT = process.env.GRAPHQLURL;
const AWS_REGION = "${AWS::Region}";
const query = `
mutation updateJobstatus(
$serviceiduser: String!,
$datetime: String!,
$url1: String,
$url2: String,
$status: String
) {
updateJobstatus(input: {
serviceiduser: $serviceiduser,
datetime: $datetime,
url1: $url1,
url2: $url2,
status: $status
}) {
serviceiduser
}
}
`;
exports.handler = async (event) => {
console.log({"EVENT": JSON.stringify(event)});
let variables;
if (event.result.TranscriptionJob.TranscriptionJobStatus == "COMPLETED") {
variables = {
serviceiduser: event.serviceiduser,
datetime: event.datetime,
url1: event.filename,
url2: event.outputkey,
status: "完了"
};
} else {
variables = {
serviceiduser: event.serviceiduser,
datetime: event.datetime,
status: "エラー"
};
}
const endpoint = new URL(GRAPHQL_ENDPOINT);
const signer = new SignatureV4({
credentials: defaultProvider(),
region: AWS_REGION,
service: 'appsync',
sha256: Sha256
});
const requestToBeSigned = new HttpRequest({
method: 'POST',
headers: {
'Content-Type': 'application/json',
host: endpoint.host
},
hostname: endpoint.host,
body: JSON.stringify({ query, variables }),
path: endpoint.pathname
});
const signed = await signer.sign(requestToBeSigned);
const request = new Request(endpoint, signed);
let statusCode = 200;
let body;
let response;
try {
response = await fetch(request);
body = await response.json();
if (body.errors) statusCode = 400;
} catch (error) {
statusCode = 500;
body = {
errors: [
{
message: error.message
}
]
};
}
return {
statusCode,
body: JSON.stringify(body)
};
};
DependsOn:
- LambdaAppSyncInvocationRole
- LambdaLayerAwsGraphqlSdkNodejs
# ------------------------------------------------------------#
# Lambda Layer
# ------------------------------------------------------------#
LambdaLayerAwsGraphqlSdkNodejs:
Type: AWS::Lambda::LayerVersion
Properties:
LayerName: !Sub EXAMPLE-${SubName}-aws-sdk-graphql-nodejs
Description: !Sub aws-sdk-graphql for Node.js to load in EXAMPLE-${SubName}
CompatibleRuntimes:
- nodejs16.x
Content:
S3Bucket: !Sub ${S3BucketNameSdk}
S3Key: !Sub ${S3KeyAwsGraphqlSdkNodejs}
# ------------------------------------------------------------#
# Output Parameters
# ------------------------------------------------------------#
Outputs:
# AppSync
AppSyncEndpoint:
Value: !GetAtt AppSyncApiEXAMPLE.GraphQLUrl
Export:
Name: !Sub EXAMPLE-AppSyncEndpoint-${SubName}
まとめ
いかがでしたでしょうか?
プッシュ通知だけのためにここまでしなきゃいけないの?というのが正直な感想です。スキーマやリゾルバ設定の面倒くささ、本記事では説明しませんでしたが GraphQL のわかりにくさが半端ないです。
でも AWS AppSync を使ってみてわかったのですが、レスポンスがめっちゃ速いです。アプリのコードにデータをベタ書きしているんじゃないかって思うぐらい。Amazon API Gateway + AWS Lambda + Amazon DynamoDB の構成と比べると全然違います。なので、レスポンススピードを求めるユースケースでも AWS AppSync は使いどころあるかな、と思います。AppSync というサービス名からもわかるように、アプリとデータを密着させてくれるサービスなんだな、というのはよくわかりました。
本記事が皆様のお役に立てれば幸いです。



