AWS AppSync を使って React アプリからキックした非同期ジョブの結果をプッシュ通知で受け取る

本記事は、Japan AWS Ambassador Advent Calendar 2022 の 2022/12/1 付記事となります。

こんにちは、SCSK 株式会社の広野です。

非同期ジョブを実行した後、結果をどう受け取るか?というのは開発者として作り込み甲斐のあるテーマです。今回は React アプリが AWS 上にある非同期ジョブを呼び出した後に、AWS AppSync 経由でジョブ完了のプッシュ通知を受け取る仕組みを紹介します。

プッシュ通知の必要性

そもそも、なぜプッシュ通知が必要なのか。

サーバーレスアプリから、とあるデータ解析ジョブにデータを渡して実行させるアーキテクチャ図を見てみましょう。

アプリはユーザのブラウザにロードされています。

アプリがジョブに渡すデータを Amazon S3 にアップロードするところから話を始めます。アプリはデータをアップロードした後は処理の制御を手放します。データを送ったら終わり、です。以降の後続処理も自分の責任を果たせば処理を手放す、非同期処理が連発します。

この状況では、アプリはジョブの結果どころか終了すら知ることができません。そのため、ジョブの終了時にジョブ側からアプリに終了を通知してあげる、プッシュ通知の必要性が出てきます。

正直、プッシュ通知でなくてもいいのですが。定期的にアプリにジョブ状況を問い合わせる常駐プロセスを作ればそれで済むのかもしれません。でも、プッシュ通知を実現する AWS サービスがありますので、それを利用しない手はないでしょう?と考えました。

ユーザがプッシュ通知を受け取るための AWS サービスはいくつかあるのですが、デバイスの OS や SMS、メール等ではなくアプリが直接受け取れるサービスは AWS AppSync が最も実現し易い選択肢だと思います。

このアーキテクチャでは、プッシュ通知を実現するために Amazon DynamoDB にジョブ状況管理テーブルを作成し、そのデータに特定の更新があったら AWS AppSync がアプリにプッシュ通知を送る、という仕組みにしています。

AWS AppSync がやってくれること

AWS AppSync は、GraphQL というクエリー言語を使用してデータにアクセスするための API を提供してくれます。データは本アーキテクチャでは Amazon DynamoDB にありますが、RDBMS でもよいですし、他のデータ形式でも対応しているものがあります。

GraphQL では、大きく 3つの命令が可能です。

  1. Query
  2. Mutation
  3. Subscription

Query は、データ読取です。

Mutation は、データ書込です。

Query と Mutation については、あらかじめ AppSync にスキーマとリゾルバと呼ばれるデータベースとの連携設定をしておくことで、AWS Lambda なしで、代わりにアプリからの GraphQL クエリーを元にデータベースにクエリーしてくれます。スキーマがデータベース情報みたいなもので、リゾルバが GraphQL クエリーから得たインプットを元にデータベースへのクエリーに変換するマッピング情報みたいなものと考えるとよいと思います。

Subscription が、プッシュ通知受け取りです。Mutation と Subscription は密接に関係していて、あらかじめ指定した Mutation 設定に対して、その Mutation が実行されたときにその Mutation を Subscribe しているアプリ画面にプッシュ通知を送ります。

重要なのは

  • あらかじめ AppSync スキーマ設定内で Mutation に紐付ける Subscription の設定が必要
  • Mutation されないとプッシュ通知は発動しない
  • Mutation を Subscribe しているアプリ画面でないとプッシュ通知は受け取れない

ということです。

以下、スキーマとリゾルバの設定例です。

スキーマ

type Jobstatus @aws_cognito_user_pools @aws_iam {
	datetime: String
	jobid: String
	serviceid: String
	serviceiduser: String
	user: String
	url1: String
	url2: String
	status: String
	memo: String
	ttl: Int
}

type JobstatusConnection {
	items: [Jobstatus]
	nextToken: String
}

type Mutation {
	putJobstatus(input: PutJobstatusInput!): Jobstatus
		@aws_cognito_user_pools @aws_iam
	updateJobstatus(input: UpdateJobstatusInput!): Jobstatus
		@aws_cognito_user_pools @aws_iam
}

input PutJobstatusInput {
	datetime: String!
	jobid: String!
	serviceid: String!
	serviceiduser: String!
	url1: String
	user: String!
	status: String!
	memo: String
	ttl: Int!
}

type Query {
	queryJobstatus(serviceiduser: String!): JobstatusConnection
	queryJobstatusByServiceidDatetimeIndex(serviceid: String!): JobstatusConnection
}

type Subscription {
	onPutJobstatus(serviceiduser: String!): Jobstatus
		@aws_subscribe(mutations: ["putJobstatus"])
	onUpdateJobstatus(serviceiduser: String!): Jobstatus
		@aws_subscribe(mutations: ["updateJobstatus"])
}

input UpdateJobstatusInput {
	datetime: String!
	jobid: String
	serviceid: String
	serviceiduser: String!
	url1: String
	url2: String
	user: String
	status: String
	memo: String
	ttl: Int
}

schema {
	query: Query
	mutation: Mutation
	subscription: Subscription
}

リゾルバ

上記スキーマ設定のうち、updateJobstatus という Mutation のリゾルバ設定です。serviceiduser と datetime というパーティションキー、ソートキーにマッチするデータに対して、受け取った属性データで更新をかけるクエリーに変換されます。正直、難しすぎて萎えます。AWS Lambda で書いた方が断然楽だ、、、とここに来て後悔し始めます。

{
  "version": "2018-05-29",
  "operation": "UpdateItem",
  "key": {
    "serviceiduser": $util.dynamodb.toDynamoDBJson($ctx.args.input.serviceiduser),
    "datetime": $util.dynamodb.toDynamoDBJson($ctx.args.input.datetime)
  },
  #set( $expNames  = {} )
  #set( $expValues = {} )
  #set( $expSet = {} )
  #set( $expAdd = {} )
  #set( $expRemove = [] )
  #foreach( $entry in $util.map.copyAndRemoveAllKeys($ctx.args.input, ["serviceiduser", "datetime"]).entrySet() )
    #if( $util.isNull($entry.value) )
      #set( $discard = ${expRemove.add("#${entry.key}")} )
      $!{expNames.put("#${entry.key}", "${entry.key}")}
    #else
      $!{expSet.put("#${entry.key}", ":${entry.key}")}
      $!{expNames.put("#${entry.key}", "${entry.key}")}
      $!{expValues.put(":${entry.key}", $util.dynamodb.toDynamoDB($entry.value))}
    #end
  #end
  #set( $expression = "" )
  #if( !${expSet.isEmpty()} )
    #set( $expression = "SET" )
    #foreach( $entry in $expSet.entrySet() )
      #set( $expression = "${expression} ${entry.key} = ${entry.value}" )
      #if ( $foreach.hasNext )
        #set( $expression = "${expression}," )
      #end
    #end
  #end
  #if( !${expAdd.isEmpty()} )
    #set( $expression = "${expression} ADD" )
    #foreach( $entry in $expAdd.entrySet() )
      #set( $expression = "${expression} ${entry.key} ${entry.value}" )
      #if ( $foreach.hasNext )
        #set( $expression = "${expression}," )
      #end
    #end
  #end
  #if( !${expRemove.isEmpty()} )
    #set( $expression = "${expression} REMOVE" )
    #foreach( $entry in $expRemove )
      #set( $expression = "${expression} ${entry}" )
      #if ( $foreach.hasNext )
        #set( $expression = "${expression}," )
      #end
    #end
  #end
  "update": {
    "expression": "${expression}",
    #if( !${expNames.isEmpty()} )
      "expressionNames": $utils.toJson($expNames),
    #end
    #if( !${expValues.isEmpty()} )
      "expressionValues": $utils.toJson($expValues),
    #end
  },
  "condition": {
    "expression": "attribute_exists(#serviceiduser) AND attribute_exists(#datetime)",
    "expressionNames": {
      "#serviceiduser": "serviceiduser",
      "#datetime": "datetime"
    }
  }
}

この Mutation に対応している Subscription の設定が、スキーマ設定内の onUpdateJobstatus の部分になります。

type Subscription {
	onPutJobstatus(serviceiduser: String!): Jobstatus
		@aws_subscribe(mutations: ["putJobstatus"])
	onUpdateJobstatus(serviceiduser: String!): Jobstatus
		@aws_subscribe(mutations: ["updateJobstatus"])
}

updateJobstatus という Mutation が行われたときに、onUpdateJobstatus という Subscription を Subscribe しているアプリにプッシュ通知を送ります。ただし、プッシュ通知を送る条件も書かれています。Subscribe しているアプリがパラメータとして指定している serviceiduser が、その Mutation 内の serviceiduser と一致するとき、という条件で発動します。

意味的には、サービス ID が「今開いている画面」で、ユーザ名が「自分」のものだけプッシュ通知が来る、とイメージしてください。

ここで、アプリ画面イメージを見てみましょう。

アプリ画面イメージ

以下のスクリーンショットのように、アプリ画面から解析ジョブを開始すると、ジョブ完了後にプッシュ通知を受け取り画面を自動更新します。ユーザからしてみれば当たり前の動作なのですが、裏では開発者の努力が詰まっています。(泣)

既に述べましたが、アプリはジョブを開始したら制御を手放します。そのままでは結果が全く分からないので、バックエンドのジョブから完了後に通知を送ってもらうという寸法です。

React アプリのコード

React アプリではジョブ開始時に何をしているかというと、大きく以下 3つです。

  • ジョブ一覧データの取得、表示。Amazon DynamoDB へ AppSync 経由で Query をかける。
  • ジョブ登録。Amazon DynamoDB へ AppSync 経由で Mutation をかける。
  • Amazon S3 へのファイルアップロード。これをトリガーに以降、AWS Step Functions のジョブが発動する。

Amazon S3 へのファイルアップロードについては、以下のブログで方法を掲載しています。今回は送信するファイルが BLOB なので若干ブログ内容をカスタマイズしないといけないのですが。それについては別記事で紹介したいと思います。

また、React アプリでプッシュ通知を受け取るためのコードがあります。

  • updateJobstatus という Mutation が行われたときのプッシュ通知を Subscribe する。

ここでは、React からの Query、Mutation、Subscription のコードを紹介します。

前提

AWS AppSync は認証なしではアクセスできません。本記事では React アプリを Amazon Cognito で認証しているので、透過的に AWS AppSync の認証をさせることができます。React 側の設定は簡単です。以下のコードを App.js に仕込んでいます。それだけです。

  • App.js
import React from 'react';
import Amplify from 'aws-amplify';

//中略

//Amplify Cognito, S3, AppSync 連携設定
Amplify.configure({
  Auth: {
    region: process.env.REACT_APP_REGION,
    userPoolId: process.env.REACT_APP_USERPOOLID,
    userPoolWebClientId: process.env.REACT_APP_USERPOOLWEBCLIENTID,
    identityPoolId: process.env.REACT_APP_IDPOOLID
  },
  Storage: {
    AWSS3: {
      bucket: process.env.REACT_APP_S3BUCKETNAMEAMPLIFYSTG,
      region: process.env.REACT_APP_REGION
    }
  },
  aws_appsync_graphqlEndpoint: process.env.REACT_APP_APPSYNC,
  aws_appsync_region: process.env.REACT_APP_REGION,
  aws_appsync_authenticationType: "AMAZON_COGNITO_USER_POOLS",
  aws_appsync_apiKey: "null"
});

Amplify.configure で AWS AppSync 連携関連の設定を環境変数経由で入れています。Amazon S3 にファイルをアップロードするための設定や Amazon Cognito との連携設定もここに入っています。

Query

あらかじめ AWS AppSync にスキーマ設定、リゾルバ設定ができている状態で、それに合わせた GraphQL クエリーを AWS AppSync API に投げます。

import React, { useState, useEffect } from 'react';
import { API } from 'aws-amplify';
import gql from 'graphql-tag';

  //中略

  const serviceid = window.location.pathname;
  const [jobdata, setJobdata] = useState();

  //中略

  //ジョブステータス取得クエリー
  const query = gql`
    query queryJobstatusByServiceidDatetimeIndex($serviceid: String!) {
      queryJobstatusByServiceidDatetimeIndex(serviceid: $serviceid) {
        items {
          datetime
          url1
          url2
          user
          status
          memo
        }
      }
    }
  `;
  //ジョブステータス取得関数
  const queryJobstatusByServiceidDatetimeIndex = async () => {
    const res = await API.graphql({
      query: query,
      variables: {
        serviceid: serviceid
      },
      authMode: "AMAZON_COGNITO_USER_POOLS"
    });
    setJobdata(res.data.queryJobstatusByServiceidDatetimeIndex.items);
  };

Mutation

Query とほぼ同じです。やはりあらかじめ AWS AppSync 側でスキーマやリゾルバが適切に設定されている必要があります。

import { API } from 'aws-amplify';
import gql from 'graphql-tag';

  //中略

  const username = props.username;
  const serviceid = window.location.pathname;
  const jobid = await uuidv4();

  //中略

  //ジョブステータス登録クエリー
  const putmutation = gql`
    mutation putJobstatus(
      $serviceiduser: String!,
      $datetime: String!,
      $jobid: String!,
      $serviceid: String!,
      $url1: String,
      $user: String!,
      $status: String!,
      $memo: String,
      $ttl: Int!
    ) {
      putJobstatus(input: {
        serviceiduser: $serviceiduser,
        datetime: $datetime,
        jobid: $jobid,
        serviceid: $serviceid,
        url1: $url1,
        user: $user,
        status: $status,
        memo: $memo,
        ttl: $ttl
      }) {
        serviceiduser
      }
    }
  `;
  //ジョブステータス登録関数
  const putJobstatus = async (jobid) => {
    const dt = new Date();
    await API.graphql({
      query: putmutation,
      variables: {
        serviceiduser: serviceid + "#" + username,
        datetime: dt.toISOString(),
        jobid: jobid,
        serviceid: serviceid,
        url1: input_text,
        user: username,
        status: "解析中",
        memo: memo,
        ttl: Math.floor(dt.setDate(dt.getDate() + 30) / 1000)
      },
      authMode: "AMAZON_COGNITO_USER_POOLS"
    });
    //ステータス画面更新
    queryJobstatusByServiceidDatetimeIndex();
  };

Subscription

Subscription は、画面表示時に useEffect で実行させています。console.log で見た感じ、AWS AppSync にプッシュ通知を受け取るためのセッションを常時張っている感じに見えました。(正確には違うかもしれません)

Subscription 実行時に、GraphQL クエリーの中でパラメータとして serviceiduser を渡しています。この serviceiduser が同じデータが Mutation されたときにプッシュ通知を受け取ります。

プッシュ通知を受け取ったとき、あらかじめ指定したデータ(ここでは serviceiduser のみ)を受け取ることができます。そのデータ(以下コードでは data)を使った処理を実行させることもできますが、私はジョブ一覧テーブルの表示更新と画面に Snackbar というポップアップ通知を表示させる処理をさせています。本記事ではそれで事足りるので。

import React, { useState, useEffect } from 'react';
import { API } from 'aws-amplify';
import gql from 'graphql-tag';

//中略

  //ジョブステータス更新サブスクライブクエリー
  const subscribeUpdateQuery = gql`
    subscription onUpdateJobstatus($serviceiduser: String!) {
      onUpdateJobstatus(serviceiduser: $serviceiduser) {
        serviceiduser
      }
    }
  `;
  //ジョブステータス更新サブスクライブ関数
  const subscribeUpdateJobstatus = () => {
    API.graphql({
      query: subscribeUpdateQuery,
      variables: {
        serviceiduser: serviceid + '#' + username
      },
      authMode: 'AMAZON_COGNITO_USER_POOLS'
    }).subscribe({
      next: (data) => {
        //サブスクリプション通知時のアクション = ステータス一覧を更新
        queryJobstatusByServiceidDatetimeIndex();
        //スナックバーで通知
        setIsSnackbarOpen(true);
      },
      error: (err) => {
        console.log(err);
      }
    });
  };

  //画面表示時自動実行
  useEffect(() => {
    //ジョブステータス確認
    queryJobstatusByServiceidDatetimeIndex();
    //ジョブステータスサブスクリプション実行
    subscribeUpdateJobstatus();
  }, []);

AWS Lambda からの Mutation コード

上記 React アプリがプッシュ通知を受け取るとき、裏では AWS Step Functions ジョブ内から AWS Lambda 関数が呼び出され、それがジョブ完了ステータスを Amazon DynamoDB テーブルに書き込みます。プッシュ通知を発信するためには AWS AppSync 経由の Mutation でなければならないので、AWS Lambda 関数から Mutation させる必要があります。

これは情報が少なく非常に苦労しました。また、AWS Step Functions のビルトイン API にも Mutation はまだ存在せず、AWS Lambda 関数が呼び出せるデフォルトのモジュールだけでは Mutation を実行させられず、結局自分でモジュールを集めて Lambda Layer として読み込ませました。

以下、AWS Lambda 関数のコードです。Node.js でしか作れませんでした。

リージョンはソウルでベタ書きしていますので適宜ご変更ください。AWS Step Functions から invoke される Lambda なので、その仕様で event を引数として受け取っています。そのあたりも適宜ご変更を。

const { Sha256 } = require("@aws-crypto/sha256-js");
const { defaultProvider } = require("@aws-sdk/credential-provider-node");
const { SignatureV4 } = require("@aws-sdk/signature-v4");
const { HttpRequest } = require("@aws-sdk/protocol-http");
const { default: fetch, Request } = require("node-fetch");
const GRAPHQL_ENDPOINT = process.env.GRAPHQLURL;
const AWS_REGION = "ap-northeast-2";
const query = `
  mutation updateJobstatus(
    $serviceiduser: String!,
    $datetime: String!,
    $url1: String,
    $url2: String,
    $status: String
  ) {
    updateJobstatus(input: {
      serviceiduser: $serviceiduser,
      datetime: $datetime,
      url1: $url1,
      url2: $url2,
      status: $status
    }) {
      serviceiduser
    }
  }
`;
exports.handler = async (event) => {
  console.log({"EVENT": JSON.stringify(event)});
  let variables;
  if (event.result.TranscriptionJob.TranscriptionJobStatus == "COMPLETED") {
    variables = {
      serviceiduser: event.serviceiduser,
      datetime: event.datetime,
      url1: event.filename,
      url2: event.outputkey,
      status: "完了"
    };
  } else {
    variables = {
      serviceiduser: event.serviceiduser,
      datetime: event.datetime,
      status: "エラー"
    };
  }
  const endpoint = new URL(GRAPHQL_ENDPOINT);
  const signer = new SignatureV4({
    credentials: defaultProvider(),
    region: AWS_REGION,
    service: 'appsync',
    sha256: Sha256
  });
  const requestToBeSigned = new HttpRequest({
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      host: endpoint.host
    },
    hostname: endpoint.host,
    body: JSON.stringify({ query, variables }),
    path: endpoint.pathname
  });
  const signed = await signer.sign(requestToBeSigned);
  const request = new Request(endpoint, signed);
  let statusCode = 200;
  let body;
  let response;
  try {
    response = await fetch(request);
    body = await response.json();
    if (body.errors) statusCode = 400;
  } catch (error) {
    statusCode = 500;
    body = {
      errors: [
        {
          message: error.message
        }
      ]
    };
  }
  return {
    statusCode,
    body: JSON.stringify(body)
  };
};

Lambda Layer として読み込ませるモジュールは以下のコマンドで作成します。nodejs というディレクトリを作成して、そのディレクトリ内で実行します。

npm init
npm install --save @aws-crypto/sha256-js @aws-sdk/credential-provider-node @aws-sdk/protocol-http @aws-sdk/signature-v4 node-fetch@2.6.7

node_modules というフォルダ内にモジュールがインストールされ、package.json、package-lock.json というファイルができあがります。nodejs フォルダもろとも ZIP 圧縮すれば出来上がりです。

注意事項があります。

  • node-fetch はバージョン 3 だとこのコードは動かないため、バージョン 2 を指定してインストールする必要があります。
  • AWS Lambda 関数から AWS AppSync に Mutation するには IAM ロールの権限を付与するだけではダメで、AWS AppSync 側でも IAM ロールによるアクセスを許可する設定にしなければなりません。
    AWS AppSync にはデフォルトの認証と追加の認証を設定でき、本記事のケースではデフォルトが Amazon Cognito で 追加を IAM ロールにしています。その場合、スキーマの設定で明示的に追加の認証で許可する Mutation 等を設定する必要があります。
    以下がその例なのですが、@aws_iam で IAM ロールによる認証を許可しています。このように追加認証を設定するときにはデフォルトの認証が無効になってしまうため、デフォルトの認証、ここでは @aws_cognito_user_pools も明示的に設定しています。
type Mutation {
	putJobstatus(input: PutJobstatusInput!): Jobstatus
		@aws_cognito_user_pools @aws_iam
	updateJobstatus(input: UpdateJobstatusInput!): Jobstatus
		@aws_cognito_user_pools @aws_iam
}

AWS Lambda 関数から AWS AppSync に Mutation する機能を構築するのは非常に面倒なので、今後 AWS が標準で簡単に実装できるようにアップデートしてくれないかな、と期待しています。

AWS AppSync の設定

ここまで、AWS AppSync の設定については細かすぎて書ききれなかったので、以下に AWS CloudFormation テンプレートを掲載しておきます。Amazon DynamoDB や AWS Lambda 関数、関連する IAM ロールも込みです。

Amazon Cognito User Pool ID をパラメータとして入力しないと動かないテンプレートになっているため、このままでは使いづらいテンプレートであることは重々承知の上なのですが、AWS AppSync の設定は参考になるかと思います。

めっちゃ大量ですみません。

AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates an AppSync, a DynamoDB table, a Lambda function and relevant IAM roles.

# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
  SubName:
    Type: String
    Description: System sub name of EXAMPLE. (e.g. prod or test)
    Default: test
    MaxLength: 10
    MinLength: 1

  S3BucketNameSdk:
    Type: String
    Description: S3 bucket name in which you uploaded sdks for Lambda Layers. (e.g. example-temp-bucket)
    Default: example-temp-bucket
    MaxLength: 50
    MinLength: 1

  S3KeyAwsGraphqlSdkNodejs:
    Type: String
    Description: S3 key of aws-sdk-graphql-nodejs.zip. Fill the exact key name if you renamed. (e.g. sdk/Node.js16/aws-sdk-graphql-nodejs.zip)
    Default: sdk/Node.js16/aws-sdk-graphql-nodejs.zip
    MaxLength: 100
    MinLength: 1

  CognitoUserPoolID:
    Type: String
    Description: Cognito User Pool ID

Resources:
# ------------------------------------------------------------#
# AppSync DynamoDB Invocation Role (IAM)
# ------------------------------------------------------------#
  AppSyncDynamodbInvocationRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub EXAMPLE-AppSyncDynamodbInvocationRole-${SubName}
      Description: This role allows AppSync to invoke DynamoDB.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - appsync.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: !Sub EXAMPLE-AppSyncDynamodbInvocationPolicy-${SubName}
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:GetItem
                  - dynamodb:PutItem
                  - dynamodb:DeleteItem
                  - dynamodb:UpdateItem
                  - dynamodb:Query
                  - dynamodb:Scan
                  - dynamodb:BatchGetItem
                  - dynamodb:BatchWriteItem
                Resource:
                  - Fn::Join:
                    - ""
                    - - Fn::GetAtt: DynamoJobstatus.Arn
                      - "*"
    DependsOn:
      - DynamoJobstatus

  AppSyncCloudWatchLogsPushRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub EXAMPLE-AppSyncCloudWatchLogsPushRole-${SubName}
      Description: This role allows AppSync to push logs to CloudWatch Logs.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - appsync.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSAppSyncPushToCloudWatchLogs

# ------------------------------------------------------------#
# Lambda AppSync Invocation Role (IAM)
# ------------------------------------------------------------#
  LambdaAppSyncInvocationRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub EXAMPLE-LambdaAppSyncInvocationRole-${SubName}
      Description: This role allows Lambda to invoke AppSync.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess
      Policies:
        - PolicyName: !Sub EXAMPLE-LambdaAppSyncInvocationPolicy-${SubName}
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - appsync:GraphQL
                Resource:
                  - Fn::Join:
                    - ""
                    - - Fn::GetAtt: AppSyncApiEXAMPLE.Arn
                      - "/types/Mutation/*"
    DependsOn:
      - AppSyncApiEXAMPLE

# ------------------------------------------------------------#
# DynamoDB
# ------------------------------------------------------------#
  DynamoJobstatus:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub EXAMPLE-jobstatus-${SubName}
      AttributeDefinitions:
        - AttributeName: serviceiduser
          AttributeType: S
        - AttributeName: datetime
          AttributeType: S
        - AttributeName: jobid
          AttributeType: S
        - AttributeName: serviceid
          AttributeType: S
      BillingMode: PAY_PER_REQUEST
      KeySchema:
        - AttributeName: serviceiduser
          KeyType: HASH
        - AttributeName: datetime
          KeyType: RANGE
      GlobalSecondaryIndexes:
        - IndexName: jobid-index
          KeySchema:
            - AttributeName: jobid
              KeyType: HASH
          Projection:
            ProjectionType: ALL
        - IndexName: serviceid-datetime-index
          KeySchema:
            - AttributeName: serviceid
              KeyType: HASH
            - AttributeName: datetime
              KeyType: RANGE
          Projection:
            ProjectionType: ALL
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: false
      TimeToLiveSpecification:
        AttributeName: ttl
        Enabled: true
      Tags:
        - Key: Cost
          Value: !Sub EXAMPLE-${SubName}

# ------------------------------------------------------------#
# AppSync
# ------------------------------------------------------------#
  AppSyncApiEXAMPLE:
    Type: AWS::AppSync::GraphQLApi
    Description: AppSync API for EXAMPLE
    Properties:
      Name: !Sub EXAMPLE-AppSyncApiEXAMPLE-${SubName}
      AuthenticationType: AMAZON_COGNITO_USER_POOLS
      UserPoolConfig:
        UserPoolId: !Ref CognitoUserPoolID
        AwsRegion: !Sub ${AWS::Region}
        DefaultAction: "ALLOW"
      AdditionalAuthenticationProviders:
        - AuthenticationType: AWS_IAM
      LogConfig:
        CloudWatchLogsRoleArn: !GetAtt AppSyncCloudWatchLogsPushRole.Arn
        ExcludeVerboseContent: false
        FieldLogLevel: ALL
      XrayEnabled: false
      Tags:
        - Key: Cost
          Value: !Sub EXAMPLE-${SubName}
    DependsOn:
      - AppSyncCloudWatchLogsPushRole

  AppSyncSchemaJobstatus:
    Type: AWS::AppSync::GraphQLSchema
    Properties:
      ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
      Definition: |
        schema {
          query: Query
          mutation: Mutation
          subscription: Subscription
        }
        type Jobstatus @aws_cognito_user_pools @aws_iam {
          datetime: String
          jobid: String
          serviceid: String
          serviceiduser: String
          user: String
          url1: String
          url2: String
          status: String
          memo: String
          ttl: Int
        }
        type JobstatusConnection {
          items: [Jobstatus]
          nextToken: String
        }
        type Mutation {
          putJobstatus(input: PutJobstatusInput!): Jobstatus
            @aws_cognito_user_pools @aws_iam
          updateJobstatus(input: UpdateJobstatusInput!): Jobstatus
            @aws_cognito_user_pools @aws_iam
        }
        type Query {
          queryJobstatus(serviceiduser: String!): JobstatusConnection
          queryJobstatusByServiceidDatetimeIndex(serviceid: String!): JobstatusConnection
        }
        type Subscription {
          onPutJobstatus(serviceiduser: String!): Jobstatus
            @aws_subscribe(mutations : ["putJobstatus"])
          onUpdateJobstatus(serviceiduser: String!): Jobstatus
            @aws_subscribe(mutations: ["updateJobstatus"])
        }
        input PutJobstatusInput {
          datetime: String!
          jobid: String!
          serviceid: String!
          serviceiduser: String!
          url1: String
          user: String!
          status: String!
          memo: String
          ttl: Int!
        }
        input UpdateJobstatusInput {
          datetime: String!
          jobid: String
          serviceid: String
          serviceiduser: String!
          url1: String
          url2: String
          user: String
          status: String
          memo: String
          ttl: Int
        }
    DependsOn:
      - AppSyncApiEXAMPLE

  AppSyncDataSourceJobstatus:
    Type: AWS::AppSync::DataSource
    Properties:
      ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
      Name: !Sub EXAMPLE${SubName}AppSyncDataSourceJobstatus
      Description: AppSync DataSource to query Jobstatus DynamoDB.
      Type: AMAZON_DYNAMODB
      ServiceRoleArn: !GetAtt AppSyncDynamodbInvocationRole.Arn
      DynamoDBConfig:
        TableName: !Ref DynamoJobstatus
        AwsRegion: !Sub ${AWS::Region}
    DependsOn:
      - AppSyncApiEXAMPLE
      - AppSyncDynamodbInvocationRole

  AppSyncResolverQueryJobstatus:
    Type: AWS::AppSync::Resolver
    Properties:
      ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
      TypeName: Query
      FieldName: queryJobstatus
      DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name
      RequestMappingTemplate: |
        {
          "version": "2018-05-29",
          "operation": "Query",
          "query": {
            "expression": "#serviceiduser = :serviceiduser",
            "expressionNames": {
              "#serviceiduser": "serviceiduser"
            },
            "expressionValues": {
              ":serviceiduser": $util.dynamodb.toDynamoDBJson($context.arguments.serviceiduser)
            }
          },
          "limit": $util.defaultIfNull($context.arguments.first, 30),
          "nextToken": $util.toJson($util.defaultIfNullOrEmpty($context.arguments.after, null)),
          "scanIndexForward": false,
          "consistentRead": false,
          "select": "ALL_ATTRIBUTES"
        }
      ResponseMappingTemplate: |
        $utils.toJson($context.result)
    DependsOn:
      - AppSyncDataSourceJobstatus
      - AppSyncSchemaJobstatus

  AppSyncResolverQueryJobstatusByServiceidDatetimeIndex:
    Type: AWS::AppSync::Resolver
    Properties:
      ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
      TypeName: Query
      FieldName: queryJobstatusByServiceidDatetimeIndex
      DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name
      RequestMappingTemplate: |
        {
          "version": "2018-05-29",
          "operation": "Query",
          "query": {
            "expression": "#serviceid = :serviceid",
            "expressionNames": {
              "#serviceid": "serviceid"
            },
            "expressionValues": {
              ":serviceid": $util.dynamodb.toDynamoDBJson($context.arguments.serviceid)
            }
          },
          "index": "serviceid-datetime-index",
          "limit": $util.defaultIfNull($context.arguments.first, 30),
          "nextToken": $util.toJson($util.defaultIfNullOrEmpty($context.arguments.after, null)),
          "scanIndexForward": false,
          "consistentRead": false,
          "select": "ALL_ATTRIBUTES"
        }
      ResponseMappingTemplate: |
        $utils.toJson($context.result)
    DependsOn:
      - AppSyncDataSourceJobstatus
      - AppSyncSchemaJobstatus

  AppSyncResolverPutJobstatus:
    Type: AWS::AppSync::Resolver
    Properties:
      ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
      TypeName: Mutation
      FieldName: putJobstatus
      DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name
      RequestMappingTemplate: |
        {
          "version": "2018-05-29",
          "operation": "PutItem",
          "key": {
            "serviceiduser": $util.dynamodb.toDynamoDBJson($ctx.args.input.serviceiduser),
            "datetime": $util.dynamodb.toDynamoDBJson($ctx.args.input.datetime)
          },
          "attributeValues": $util.dynamodb.toMapValuesJson($ctx.args.input),
          "condition": {
            "expression": "attribute_not_exists(#serviceiduser) AND attribute_not_exists(#datetime)",
            "expressionNames": {
              "#serviceiduser": "serviceiduser",
              "#datetime": "datetime",
            }
          }
        }
      ResponseMappingTemplate: |
        $util.toJson($context.result)
    DependsOn:
      - AppSyncDataSourceJobstatus
      - AppSyncSchemaJobstatus

  AppSyncResolverUpdateJobstatus:
    Type: AWS::AppSync::Resolver
    Properties:
      ApiId: !GetAtt AppSyncApiEXAMPLE.ApiId
      TypeName: Mutation
      FieldName: updateJobstatus
      DataSourceName: !GetAtt AppSyncDataSourceJobstatus.Name
      RequestMappingTemplate: |
        {
          "version": "2018-05-29",
          "operation": "UpdateItem",
          "key": {
            "serviceiduser": $util.dynamodb.toDynamoDBJson($ctx.args.input.serviceiduser),
            "datetime": $util.dynamodb.toDynamoDBJson($ctx.args.input.datetime)
          },
          #set( $expNames  = {} )
          #set( $expValues = {} )
          #set( $expSet = {} )
          #set( $expAdd = {} )
          #set( $expRemove = [] )
          #foreach( $entry in $util.map.copyAndRemoveAllKeys($ctx.args.input, ["serviceiduser", "datetime"]).entrySet() )
            #if( $util.isNull($entry.value) )
              #set( $discard = ${expRemove.add("#${entry.key}")} )
              $!{expNames.put("#${entry.key}", "${entry.key}")}
            #else
              $!{expSet.put("#${entry.key}", ":${entry.key}")}
              $!{expNames.put("#${entry.key}", "${entry.key}")}
              $!{expValues.put(":${entry.key}", $util.dynamodb.toDynamoDB($entry.value))}
            #end
          #end
          #set( $expression = "" )
          #if( !${expSet.isEmpty()} )
            #set( $expression = "SET" )
            #foreach( $entry in $expSet.entrySet() )
              #set( $expression = "${expression} ${entry.key} = ${entry.value}" )
              #if ( $foreach.hasNext )
                #set( $expression = "${expression}," )
              #end
            #end
          #end
          #if( !${expAdd.isEmpty()} )
            #set( $expression = "${expression} ADD" )
            #foreach( $entry in $expAdd.entrySet() )
              #set( $expression = "${expression} ${entry.key} ${entry.value}" )
              #if ( $foreach.hasNext )
                #set( $expression = "${expression}," )
              #end
            #end
          #end
          #if( !${expRemove.isEmpty()} )
            #set( $expression = "${expression} REMOVE" )
            #foreach( $entry in $expRemove )
              #set( $expression = "${expression} ${entry}" )
              #if ( $foreach.hasNext )
                #set( $expression = "${expression}," )
              #end
            #end
          #end
          "update": {
            "expression": "${expression}",
            #if( !${expNames.isEmpty()} )
              "expressionNames": $utils.toJson($expNames),
            #end
            #if( !${expValues.isEmpty()} )
              "expressionValues": $utils.toJson($expValues),
            #end
          },
          "condition": {
            "expression": "attribute_exists(#serviceiduser) AND attribute_exists(#datetime)",
            "expressionNames": {
              "#serviceiduser": "serviceiduser",
              "#datetime": "datetime"
            }
          }
        }
      ResponseMappingTemplate: |
        $util.toJson($context.result)
    DependsOn:
      - AppSyncDataSourceJobstatus
      - AppSyncSchemaJobstatus

# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
  LambdaAppSyncMutationUpdateJobstatus:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub EXAMPLE-MutationUpdateJobstatus-${SubName}
      Description: !Sub Lambda Function to mutate update job status in EXAMPLE-${SubName}
      Runtime: nodejs16.x
      Timeout: 5
      MemorySize: 128
      Environment:
        Variables:
          GRAPHQLURL: !GetAtt AppSyncApiEXAMPLE.GraphQLUrl
      Role: !GetAtt LambdaAppSyncInvocationRole.Arn
      Handler: index.handler
      Layers:
        - !Ref LambdaLayerAwsGraphqlSdkNodejs
      Tags:
        - Key: Cost
          Value: !Sub EXAMPLE-${SubName}
      Code:
        ZipFile: !Sub |
          const { Sha256 } = require("@aws-crypto/sha256-js");
          const { defaultProvider } = require("@aws-sdk/credential-provider-node");
          const { SignatureV4 } = require("@aws-sdk/signature-v4");
          const { HttpRequest } = require("@aws-sdk/protocol-http");
          const { default: fetch, Request } = require("node-fetch");
          const GRAPHQL_ENDPOINT = process.env.GRAPHQLURL;
          const AWS_REGION = "${AWS::Region}";
          const query = `
            mutation updateJobstatus(
              $serviceiduser: String!,
              $datetime: String!,
              $url1: String,
              $url2: String,
              $status: String
            ) {
              updateJobstatus(input: {
                serviceiduser: $serviceiduser,
                datetime: $datetime,
                url1: $url1,
                url2: $url2,
                status: $status
              }) {
                serviceiduser
              }
            }
          `;
          exports.handler = async (event) => {
            console.log({"EVENT": JSON.stringify(event)});
            let variables;
            if (event.result.TranscriptionJob.TranscriptionJobStatus == "COMPLETED") {
              variables = {
                serviceiduser: event.serviceiduser,
                datetime: event.datetime,
                url1: event.filename,
                url2: event.outputkey,
                status: "完了"
              };
            } else {
              variables = {
                serviceiduser: event.serviceiduser,
                datetime: event.datetime,
                status: "エラー"
              };
            }
            const endpoint = new URL(GRAPHQL_ENDPOINT);
            const signer = new SignatureV4({
              credentials: defaultProvider(),
              region: AWS_REGION,
              service: 'appsync',
              sha256: Sha256
            });
            const requestToBeSigned = new HttpRequest({
              method: 'POST',
              headers: {
                'Content-Type': 'application/json',
                host: endpoint.host
              },
              hostname: endpoint.host,
              body: JSON.stringify({ query, variables }),
              path: endpoint.pathname
            });
            const signed = await signer.sign(requestToBeSigned);
            const request = new Request(endpoint, signed);
            let statusCode = 200;
            let body;
            let response;
            try {
              response = await fetch(request);
              body = await response.json();
              if (body.errors) statusCode = 400;
            } catch (error) {
              statusCode = 500;
              body = {
                errors: [
                  {
                    message: error.message
                  }
                ]
              };
            }
            return {
              statusCode,
              body: JSON.stringify(body)
            };
          };
    DependsOn:
      - LambdaAppSyncInvocationRole
      - LambdaLayerAwsGraphqlSdkNodejs

# ------------------------------------------------------------#
# Lambda Layer
# ------------------------------------------------------------#
  LambdaLayerAwsGraphqlSdkNodejs:
    Type: AWS::Lambda::LayerVersion
    Properties:
      LayerName: !Sub EXAMPLE-${SubName}-aws-sdk-graphql-nodejs
      Description: !Sub aws-sdk-graphql for Node.js to load in EXAMPLE-${SubName}
      CompatibleRuntimes:
        - nodejs16.x
      Content:
        S3Bucket: !Sub ${S3BucketNameSdk}
        S3Key: !Sub ${S3KeyAwsGraphqlSdkNodejs}

# ------------------------------------------------------------#
# Output Parameters
# ------------------------------------------------------------#
Outputs:
# AppSync
  AppSyncEndpoint:
    Value: !GetAtt AppSyncApiEXAMPLE.GraphQLUrl
    Export:
      Name: !Sub EXAMPLE-AppSyncEndpoint-${SubName}

まとめ

いかがでしたでしょうか?

プッシュ通知だけのためにここまでしなきゃいけないの?というのが正直な感想です。スキーマやリゾルバ設定の面倒くささ、本記事では説明しませんでしたが GraphQL のわかりにくさが半端ないです。

でも AWS AppSync を使ってみてわかったのですが、レスポンスがめっちゃ速いです。アプリのコードにデータをベタ書きしているんじゃないかって思うぐらい。Amazon API Gateway + AWS Lambda + Amazon DynamoDB の構成と比べると全然違います。なので、レスポンススピードを求めるユースケースでも AWS AppSync は使いどころあるかな、と思います。AppSync というサービス名からもわかるように、アプリとデータを密着させてくれるサービスなんだな、というのはよくわかりました。

本記事が皆様のお役に立てれば幸いです。

タイトルとURLをコピーしました