SCSKの畑です。
前回の投稿に引き続き、今回は非同期処理部分のバックエンド実装についてピックアップして説明していきます。
バックエンドにおける非同期処理実装の方針について
前回の投稿で説明した通り、密結合・同期処理前提の実装を、疎結合・非同期処理前提の実装に変更する必要がありました。この内、密結合を疎結合に変更する過程については、非同期処理として分割すべき処理単位を頑張って中身を見ながら分割していく・・くらいしか極論書くことがないので割愛します。
一方、非同期処理への変更については処理ロジックそのものを大きく変更する必要はありません。例えば前回の投稿で取り上げたテーブルの更新差分計算処理についても、その計算処理が非同期で実行されるというだけで計算ロジック自体には手を入れる必要がないためです。あくまで要件としてはそのような特定の処理を非同期実行することにあり、かつ既存の実装(Lambda)が既に存在することから、特定の処理を非同期で実行するための共通インターフェース/ラッパーを実装した上で、非同期処理として実行する Lambda の入出力仕様をそれに合わせる形で修正していく方針が最も効率的と判断しました。
非同期実行のための共通インターフェース/ラッパーの実装
上記方針に基づき、以下のような流れで実装を進めていきました。
- DynamoDB 上に非同期実行ステータス管理用のテーブルを作成
- 非同期実行用の共通インターフェース/ラッパーとしての AppSync クエリの作成
- 2.で作成した AppSync クエリのデータソースとなる Lambda の作成
- 非同期実行対象の Lamdba について、主に入出力仕様を1.及び2.の実装に対応する形で変更
以下、順番に説明します。
1.DynamoDB上に非同期実行ステータス管理用のテーブルを作成
さて、実際に特定の処理を非同期実行するにあたり、その実行ステータスを何かしらのデータストアで管理する必要があります。フロントエンド(画面)側でそのステータスを取得した上で、画面のロジックや通知などに反映する必要があるためです。このあたりの話は、以前の投稿でも取り上げさせて頂いた通り、広野さんの以下エントリに「プッシュ通知」として詳説されていますので適宜ご参照頂ければと思います。

データストア自体は極論何を使用しても良いと思いますが、Amplify 及び AppSync との親和性や画面へのプッシュ通知機能(Subscription)の使用を考慮して素直にDynamoDBのテーブルを使用しました。以下 Amplify によるスキーマ定義例となります。
type AsyncTask @model (
queries: { list: "listAsyncTasks", get: null },
mutations: { create : "createAsyncTask", update: "updateAsyncTask", delete: null },
subscriptions: { onCreate: null, onUpdate: null, onDelete: null },
)
@auth(rules: [
{allow: public, provider: apiKey},
{allow: private, provider: iam},
])
{
id: ID! @primaryKey
status: AsyncTaskStatus!
session_info: AWSJSON
err_msg: String
ttl: AWSTimestamp!
createdAt: AWSDateTime
updatedAt: AWSDateTime
}
enum AsyncTaskStatus {
PENDING
PROCESSING
COMPLETED
FAILED
POSTPROCESSING
}
type Subscription {
onUpdateAsyncTask(id: ID!): AsyncTask
@aws_subscribe(mutations: ["updateAsyncTask"])
@aws_api_key
@aws_iam
}
内容についてもポイントだけかいつまんで説明します。
- 使用する想定のない query/mutation/subscription は明示的にnullを設定して Amplify による自動作成を抑止
- 特に本機能における Subscription は特定の行(=特定のID)のみを対象とできれば良いため、@model の定義には含めずSubscription として個別に定義
- 非同期実行したタスク(Lambda)の実行単位でユニークなIDを発行し、そのステータスを status 列で管理
- 非同期実行したタスク(Lambda)の処理結果は原則 S3 上に出力する前提とするが、タスクの実行中や実行後に同期的に渡す必要のある情報については session_info 列を使用してやり取り、例えば以下のような目的で使用
- 非同期実行しているタスクの処理状況を画面にリアルタイムに通知するための情報を格納
- 非同期実行の呼び出し元で S3 上に出力される処理結果のパスを同定できない場合、そのパスを格納
- 本テーブルの情報はあくまでテンポラリで永続的に保持する必要がないため、DynamoDB の TTL 機能を使用して自動的に削除、そのためのタイムスタンプ情報として ttl 列を使用
- なお、残念ながら Amplify gen1 におけるスキーマ定義に TTL 設定は含められないため、別途 DynamoDB 側で定義する必要あり。Amplify gen2 では別の仕組みでできるらしい?
- createdAt・updatedAt 列は Subscription との兼ね合いで明示的に定義
- Amplify が自動で気を利かせて作成してくれる Subscription を使用する分には明示的な定義は不要なようですが、今回は明示的に ID 列をキーとした Subscription を定義しているため、明示的にスキーマ定義に含める必要がありました
- ちなみに、Amplify でテーブルのスキーマに createdAt・updatedAt 列を定義していなくても、Amplify が自動的に作成した Mutation を使って更新すると同情報が含まれるようになっています
- Amplify が自動で気を利かせて作成してくれる Subscription を使用する分には明示的な定義は不要なようですが、今回は明示的に ID 列をキーとした Subscription を定義しているため、明示的にスキーマ定義に含める必要がありました
2.非同期実行用の共通インターフェース/ラッパーとしての AppSync クエリの作成
平たく言うと「非同期実行対象となる他の Lambda を非同期実行するための AppSync クエリを作成する」という話なので、Amplify による AppSync のスキーマ定義も以下実装例のようにシンプルですが、ちょっとだけ工夫してます。
type ExecuteAsyncTaskResult {
exec_result: Boolean!
id: ID
}
input ExecuteAsyncTaskInput {
type: AsyncTaskType!
args: AWSJSON!
}
enum AsyncTaskType {
<非同期実行対象のタスク名を定義>
}
type Query {
ExecuteAsyncTask(input: ExecuteAsyncTaskInput!): ExecuteAsyncTaskResult
@function(name: "<データソースのLambda関数名>")
@aws_api_key
@aws_iam
}
- 引数
- type 引数において、AsyncTaskType として非同期実行対象のタスク名を enum で定義
- このタスク名と非同期実行したい Lambda 関数名の1:1の対応関係をコンフィグとして AppSync に対応する Lambda 関数に持たせることで、環境ごとの Lambda 関数名の差異を吸収
- 一方、非同期実行対象の各タスク(Lambda)ごとに必要となる引数は異なることから、それは引数 args により AWSJSON 形式で指定の上、対象の Lambda 関数にそのまま渡す
- type 引数において、AsyncTaskType として非同期実行対象のタスク名を enum で定義
- 返り値
- 非同期実行の成否(=本 AppSync クエリの実行成否)自体を exec_result で返却
- 一方、非同期実行されたタスク(Lambda)の実行ステータスを呼び出し元から確認・取得できる必要があるため、そのキーとなる ID を合わせて返却
- この ID が1.で作成した DynamoDB テーブルにおける ID 列の値に対応します
3. 2.で作成した AppSync クエリのデータソースとなる Lambda の作成
同様にデータソースとして定義している Lambda 関数の実装例も載せてみました。
import os
import json
import time
import boto3
import requests
import logging
import traceback
from lambdautility import common
from aws_requests_auth.aws_auth import AWSRequestsAuth
# GraphQL mutations
mutation_createAsyncTask = """
mutation CreateAsyncTask($input: CreateAsyncTaskInput!) {
createAsyncTask(input: $input) {
id
status
ttl
}
}
"""
mutation_updateAsyncTask = """
mutation UpdateAsyncTask($input: UpdateAsyncTaskInput!) {
updateAsyncTask(input: $input) {
id
status
ttl
}
}
"""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
try:
# AppSync経由で渡される引数の取得
task_type = event['arguments']['input']['type']
args = event['arguments']['input']['args']
# 共通レイヤーから環境依存の変数を初期化
config_dict = common.get_env_config(os.environ.get('env_name'))
APPSYNC_HOST = config_dict.get('appsync', 'host')
APPSYNC_ENDPOINT = config_dict.get('appsync', 'endpoint')
HEADERS = {'Content-Type': 'application/json'}
TTL_SECONDS = config_dict.get('dynamodb', 'ttl')
TARGET_LAMBDA_NAME = config_dict.get('async_lambda', task_type)
if not TARGET_LAMBDA_NAME:
raiseException(f'Lambda function not found for task type: {task_type}')
# AppSync接続情報初期化
session = boto3.session.Session()
credentials = session.get_credentials()
auth = AWSRequestsAuth(
aws_access_key=credentials.access_key,
aws_secret_access_key=credentials.secret_key,
aws_token=credentials.token,
aws_host=APPSYNC_HOST,
aws_region='ap-northeast-1',
aws_service='appsync'
)
# TTL初期化
ttl = int(time.time()) + int(TTL_SECONDS)
# AsyncTaskレコード作成
create_task_input = {
'status': 'PENDING',
'ttl': ttl
}
payload = {
'query': mutation_createAsyncTask,
'variables': {'input': create_task_input}
}
result_appsync = requests.post(APPSYNC_ENDPOINT, auth=auth, json=payload, headers=HEADERS).json()
if 'errors' in result_appsync:
raiseException(f'GraphQL error: {result_appsync["errors"]}')
# レスポンスから非同期実行のIDを取得
task_id = result_appsync['data']['createAsyncTask']['id']
# 対象のLambda関数を非同期実行
lambda_client = boto3.client('lambda')
invoke_payload = {
'src_id': task_id,
'async_flag': True,
'args': args
}
lambda_client.invoke(
FunctionName=TARGET_LAMBDA_NAME,
InvocationType='Event', # 非同期実行
Payload=json.dumps(invoke_payload)
)
# Lambda関数実行後、ステータスをPROCESSINGに更新
update_input = {
'id': task_id,
'status': 'PROCESSING',
'ttl': ttl
}
update_payload = {
'query': mutation_updateAsyncTask,
'variables': {'input': update_input}
}
requests.post(APPSYNC_ENDPOINT, auth=auth, json=update_payload, headers=HEADERS)
# ExecuteAsyncTaskの返り値(正常時)
return {
'exec_result': True,
'id': task_id
}
except Exception as e:
logger.error(f'Error: {str(e)}')
logger.error(traceback.format_exc())
# ExecuteAsyncTaskの返り値(異常時)
return {
'exec_result': False,
'id': None
}
こちらは前段(1. 及び 2.)で作成した Amplify・AppSync の定義に沿って作成している以上、他に説明できることがほとんどないのですが、実装における留意点をいくつか挙げてみます。
- type 引数で渡される AsyncType に対応する Lambda 関数名は S3 上に配置したコンフィグファイルで定義の上、Lambda 関数の共通レイヤー内で読み込むような実装としている
- 対象のタスク(Lamdba)を非同期実行する前に、非同期実行ステータス管理用テーブルにレコードを登録して ID を取得
- この ID をフロントエンド/バックエンド双方で使用して対象タスクのステータス管理・更新を実施します
- また、レコード登録時のタイミングで合わせて TTL も設定しておけば、(本ユースケースにおいては)処理失敗時などに対象レコードの掃除(削除)を実施する必要がなくなります
4.非同期実行対象の Lamdba について、主に入出力仕様を1.及び2.の実装に対応する形で変更
このステップにおける必要な対応は変更対象の Lambda の中身次第なので詳細は割愛しますが、変更にあたり共通して留意すべきポイントは以下2点かなと思います。特に2点目は Lambda から普通に DynamoDB を更新することもできてしまう分、ちょっとした落とし穴かなと。
- 非同期実行する Lambda 内でステータス管理用 DynamoDB テーブルを更新する際は、呼び出し時に引数として渡している ID を使用すること
- Subscription でプッシュ通知を受け取るために同テーブルは AppSync の mutation 経由で行う必要があること
まとめ
改めて内容をまとめてみると思ったよりシンプルでした。逆に言うと各論になってしまう部分については相対的により多くの工数がかかってしまったところになりますが・・次回はフロントエンド側の実装変更について説明する予定です。
本記事がどなたかの役に立てば幸いです。

お客さん環境における AWS リソースの追加・変更のための申請等にリードタイムが必要なこともあり、一連の申請に要するリードタイムが最も短い(=既存 AWS リソースの追加・変更が最も少ない)方針にすべきという観点からも有力でした。この方針に従うと、AppSync のデータソースを1つ、Lambda を数個作成する申請を上げるだけで済んだので。
※なお、AppSync の他リソースについては変更可能な IAM 権限を頂けているので大丈夫でした。そのあたりの顛末は以下のエントリを御覧ください。