SCSKの畑です。4回目の投稿です。
今回は、現在作成中の WEB アプリケーションにおいて、どのように排他制御を実装したのかについてまとめました。ただ、内容が長くなりそうなので、何回かに分けて記載していこうと思います。
排他制御が必要となる機能要件について
初回のエントリに記載通り、現在作成中の WEB アプリケーションの主機能はデータベース/ DWH 上のテーブルデータのメンテナンスとなっています。どのようにテーブルデータのメンテナンスを行いたいか(=どのような機能要件か)というのは案件ごとに差異はありますが、そのような機能を実装する上で何らかの排他制御の仕組みを考慮する必要がある、というのはイメージできるところだと思います。
- テーブルデータ編集時に、テーブル単位での排他制御(ロック)を行う ※いわゆる表レベルロック
- あるユーザがデータ編集中のテーブルを、他ユーザが同時に編集することはできない。
 
 - 外部キーを持つテーブルのデータ編集時には、外部キーの参照先テーブルについても合わせて排他制御(ロック)を行う
- 例えばテーブル A の外部キー参照先が テーブル B だった場合、テーブル A の編集を開始した時点(テーブル A をロックした時点)で テーブル B もロックされ、他ユーザが編集できなくなる。
 - 逆に テーブル B がすでにロック中だった場合、テーブル A は編集できない。
 
 
総じて、(複数テーブル間も考慮した)データ整合性を重視していますが、本案件で注力したのが正に「メンテナンス時のデータ整合性の担保」だったこともあり、方向性通りの内容ではあります。逆に表レベルロックという仕組みの都合上ユーザの利便性は下がりますが、想定ユーザ数やデータメンテナンスの頻度などをお客さんと会話の上、この機能要件で問題ないという整理になりました。
アーキテクチャ概要、及び排他制御に使用したサービス / コンポーネント
そろそろ毎度のことになりつつありますが、初回のエントリからアーキテクチャ図を再掲します。
排他制御関連の機能は、以下のサービス / コンポーネントで実現しています。本投稿では主に DynamoDB と Lambda の部分について記載したいと思います。少しだけ Amplify のスキーマ定義が入ってきますが、排他制御という観点だとこの2つがメインとなります。
- Amazon DynamoDB
- テーブルのステータス(編集状態)管理
 
 - AWS Lambda
- 排他制御を考慮したテーブルのステータス更新ロジックの実装
 
 - AWS AppSync(AWS Amplify)
- 上記 Lambda をアプリケーション上から実行するためのスキーマ定義
 
 - アプリケーション(Nuxt.js)
- テーブルのステータスに応じた画面制御
 
 
DynamoDB における設計・実装
ということで上記の通り、テーブルのステータス(編集状態)を管理するために DynamoDB を使用しています。
- 対象テーブルのステータス(編集状態)を永続化して管理する必要があること
 - ステータスは複数のユーザから参照・更新できること
 - 排他制御(ロック)して複数テーブルのステータスを同時に更新できること
 
の3点より、何らかのデータベースを使用するのが妥当と判断しました。最も「判断した」と記載できるほどではなく、当たり前によくある話だとは思いますが、、ただこの3点目については重要なので補足します。
テーブル単位の排他制御(ロック)である以上、先述したような「同一のテーブルを複数ユーザが同時に編集する」ケース自体は考慮する必要がありません。編集中のテーブルについては他のユーザが編集できないようにアプリケーション側で実装すれば良いからです。ただ「同一のテーブルを複数ユーザが同時に編集しようとする(=同一テーブルのステータスを複数ユーザが同時に更新する)」ケースについては考慮しておかないと、タイミング次第で複数ユーザが同時に同一テーブルの編集を更新できる状態となってしまう可能性があります。よって、データベース側で「排他制御(ロック)」の仕組みが必要となります。
また、機能要件のセクションで記載した通り、外部キーの存在するテーブルを編集する場合は、対象だけでなく外部参照先のテーブルについても同時にステータスを更新できる必要があります。つまり、データベース側で複数の更新処理(≒SQL文の実行)を単一の処理として扱える「トランザクション」の仕組みについても必要となります。
データベースと名前の付いた製品なりサービスでは基本的にこれらの機能は備えており、当然ながら DynamoDB でも可能です。どのように実現したのかについて、以下に記載していきます。
DynamoDB における排他制御(ロック)
さて、データベースによってどのような方式で排他制御(ロック)ができるかどうかは異なります。悲観ロック・楽観ロックの2種類がありますが、DynamoDB で取得できるロックは後者の楽観ロックとなります。具体的には以下ドキュメントの「条件付き書き込み」を使用します。
また、ステータスの更新に条件付き書き込みを使用する上でステータスの遷移条件を定めておく必要があります。例えば、ステータスを通常状態から編集状態に遷移(更新)できれば、正常なステータス遷移と見なし更新を確定する、という流れです。(もちろんアプリケーション自体の設計としても定めておく必要がありますが・・)
DynamoDB におけるトランザクション
上記の通り、複数の更新処理(≒SQL文の実行)を単一の処理として行うために、以下ドキュメントの「TransactWriteItems API」を使用します。
設計・実装例
テーブルのステータス(編集状態)を管理するための、amplify を使用したスキーマ定義例です。各項目の意味はそれぞれ以下の通りです。
- name:テーブル名
 - status:テーブルのステータス
 - editor:テーブルが編集状態における、編集しているユーザ名(通常状態の場合は null)
 - locked_by:テーブルがロック状態における、外部キー参照元のテーブル名(通常状態の場合は null)
 
enum TableStatus {
    normal
    editing
    locked
}
type TableInfo
@model
@auth(rules: [
    {allow: public, provider: apiKey},
    {allow: private, provider: iam},
])
{
    name: String! @primaryKey
    status: TableStatus!
    editor: String
    locked_by: String
}
ステータスについては、enum で特定の値のみが入力されるように定義しています。実案件では他の要件もあったことからもう少しステータスの種類が多かったのですが、説明のために簡略化しています。
- normal:通常状態
 - editing:編集状態
 - locked:ロック状態(=外部キー参照元のテーブルが編集中)
 
また、ステータスの遷移については、以下の通り通常状態をベースに編集状態またはロック状態に遷移するような定義とします。(ちゃんとした状態遷移図でないのはご了承ください・・) 
Lambda における設計・実装
DynamoDB のテーブルに対する汎用オペレーション(get, list, create など)については Amplify が自動生成してくれる graphql の query / mutation を使用していますが、残念ながら「条件付き書き込み」や「TransactWriteItems」を実現してくれるようなスキーマ定義の方法はありません。よって、テーブルステータスの更新については、Lambda で実装したものを AppSync 経由で実行するような棲み分けとしています。
Amplify(schema.graphql)における mutation のスキーマ定義例
type を input として使い回せないのが唯一イマイチな仕様だなと思っているのですが、それ以外は特に違和感なく使えています。機能・記法としてこういうものが欲しいみたいな話はまた別にありますが。。
input UpdateTableStatusWithLockInput {
    table_name: String!
    current_status: TableStatus!
    target_status: TableStatus!
    fk_table_name: [String!]
    fk_current_status: TableStatus
    fk_target_status: TableStatus
    editor: String
}
type FKResultTableStatus {
    table_name: String!
    current_status: TableStatus!
}
type ResultTableStatus {
    lock_result: Boolean!
    current_status: TableStatus
    fk_current_status: [FKResultTableStatus!]
}
type Mutation {
    UpdateTableStatusWithLock(input: UpdateTableStatusWithLockInput!): ResultTableStatus @function(name: "<Lambda関数名>")
    @aws_api_key
    @aws_iam
}
入力における各項目の意味は以下の通りです。外部キー(FK)参照先のテーブルのステータス更新に対応するため「fk_」から始まる引数を設けています。外部キーの有無はテーブルによって異なるため、それらの引数は必須とはしていません。また、外部キーが複数あるケースを想定して、fk_table_name については list で指定するようにしています。
- table_name:ステータス更新対象のテーブル名
 - current_status:更新前の想定ステータス
 - target_status:更新後の想定ステータス
 - fk_table_name:ステータス更新対象の FK 参照先テーブル名
 - fk_current_status:FK 参照先テーブルの更新前の想定ステータス
 - fk_target_status:FK 参照先テーブルの更新後の想定ステータス
 - editor:ステータス更新対象テーブルの編集ユーザ名
 - locked_by:FK 参照先テーブルをロックしている、参照元のテーブル名
 
(fk_)current_status 及び (fk_)target_status を DynamoDB の条件付き書き込みで使用します。具体的には、ステータス更新試行時に「対象テーブルのステータスが current_status と同一であれば、target_status にステータスを更新する」という動作となります。よって、複数ユーザが同時にステータスを更新しようとしても、ステータス更新が成功するのは(=対象テーブルのロックを取得することができるのは)いずれか単一のユーザのみ、という結果となります。
出力についても一応記載しておきます。このあたりはアプリケーション側でどのような使い方を想定するかにもよると思いますが。。
- lock_result:ステータス更新の成否
 - current_status:ステータス更新試行後の対象テーブルのステータス
 - fk_current_status:ステータス更新試行後の FK 参照先テーブルのステータス
 
Lambda 関数の実装例
次に、上記定義に合わせた Lambda 関数の実装例です。ランタイムは Python です。
入出力ともほぼ上記スキーマ定義通りに実装すれば良いのが直感的で良いですね。反面、「条件付き書き込み」や「TransactWriteItems」を使用する場合は boto3.resource が使用できず、 boto3.client を使用しないといけない関係で初期化部分のコードがイカつい見た目になってしまってますが。。(特に DynamoDB の型情報部分)
import os
import boto3
import traceback
from datetime import datetime, timezone, timedelta
from botocore.exceptions import ClientError
client = boto3.client('dynamodb')
def lambda_handler(event, context):
    try:
        # 引数、返り値の初期化
        result_json = {}
        result_json['lock_result'] = False
        table_name = {'S': event['arguments']['input']['table_name']}
        current_status = {'S': event['arguments']['input']['current_status']}
        target_status = {'S': event['arguments']['input']['target_status']}
        fk_table_name = [{'S': x} for x in event['arguments']['input']['fk_table_name']] if 'fk_table_name' in event['arguments']['input'] else []
        fk_current_status = {'S': event['arguments']['input']['fk_current_status']} if 'fk_current_status' in event['arguments']['input'] else None
        fk_target_status = {'S': event['arguments']['input']['fk_target_status']} if 'fk_target_status' in event['arguments']['input'] else None
        editor = {'S': event['arguments']['input']['editor']} if 'editor' in event['arguments']['input'] else {'NULL': True}
        locked_by = {'S': event['arguments']['input']['table_name']} if 'locked_by' in event['arguments']['input'] else {'NULL': True}
        current_time = {'S': datetime.now(timezone.utc).isoformat(timespec='milliseconds').replace('+00:00','Z')}
        # FK参照先テーブルロック解除時、editorを削除するように設定
        if event['arguments']['input']['target_status'] == 'normal':
            fk_editor = {'NULL': True}
        else:
            fk_editor = editor
        # 対象テーブルの状態遷移
        target_items = [{
            'Update': {
                'TableName' : <MASTER_TABLE_NAME>,
                'Key' : {
                    'name': table_name,
                },
                'UpdateExpression' : "SET #st = :upd_val1, updatedAt = :upd_val2, editor = :upd_val3",
                'ConditionExpression' : "#st = :cond_val1",
                'ExpressionAttributeNames' : {'#st' : 'status'},
                'ExpressionAttributeValues' : {
                    ':upd_val1': target_status,
                    ':upd_val2': current_time,
                    ':upd_val3': editor,
                    ':cond_val1': current_status
                }
            }
        }]
        # 対象テーブルのFK参照先テーブルの状態遷移
        for lock_table in fk_table_name:
            target_items.append({
                'Update': {
                    'TableName': <MASTER_TABLE_NAME>,
                    'Key': {
                        'name': lock_table,
                    },
                    'UpdateExpression': "SET #st = :upd_val1, updatedAt = :upd_val2, editor = :upd_val3, locked_by = :upd_val4",
                    'ConditionExpression': "#st = :cond_val1",
                    'ExpressionAttributeNames': {'#st' : 'status'},
                    'ExpressionAttributeValues': {
                        ':upd_val1': fk_target_status,
                        ':upd_val2': current_time,
                        ':upd_val3': fk_editor,
                        ':upd_val4': locked_by,
                        ':cond_val1': fk_current_status
                    }
                }
            })
        # 条件付き書き込みの実施
        update_response = client.transact_write_items(
            TransactItems=target_items
        )
        # 返り値の設定
        result_json['lock_result'] = True
        result_json['current_status'] = event['arguments']['input']['target_status']
        result_json['fk_current_status'] = []
        for lock_table in fk_table_name:
            result_json['fk_current_status'].append({'table_name': lock_table['S'], 'current_status': fk_target_status['S']})
    # 条件付き書き込み失敗時の処理
    except ClientError as e:
        print(traceback.format_exc())
        try:
            # 返り値の設定
            response = client.get_item(
                TableName = <MASTER_TABLE_NAME>,
                Key={'name': {'S': event['arguments']['input']['table_name']}}
            )
            result_json['current_status'] = response['Item']['status']['S']
            result_json['fk_current_status'] = []
            for lock_table in fk_table_name:
                response = client.get_item(
                    TableName = <MASTER_TABLE_NAME>,
                    Key={'name': lock_table}
                )
                result_json['fk_current_status'].append({'table_name': lock_table['S'], 'current_status': response['Item']['status']['S']})
        except Exception as e:
            print(traceback.format_exc())
   
    finally:
        return result_json
実装例の内容について全て説明すると長くなってしまうので、要点のみ箇条書きでまとめます。
- 冒頭で、AppSync から渡される引数と、AppSync に返す返り値の初期化をしています。
 - TransactWriteItems により、単一のトランザクション内で実行したいクエリを target_items リストに格納しています。
- 外部キー(FK)のないテーブルは、TransactWriteItems で実行するクエリ数は1つです。
 - 外部キー(FK)のあるテーブルは、外部キー参照先テーブルの個数分クエリ数が増加します。
 
 - 各クエリはステータス更新を伴うため、全て条件付き書き込みを使用しています。
- ‘ConditionExpression’ で条件付き書き込みの「条件」を指定しています。今回の条件は、ステータスが想定通りの値となっているかとなります。
 - ‘ExpressionAttributeNames’ は、DynamoDB スキーマ定義で使用している「status」という単語が予約語にあたるようで、そのまま使用するとエラーになってしまうため、クエリ内で一時的に別の単語に置き換える目的で使用しています。
- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ReservedWords.html
 
 - つまり、TransactWriteItems で実行される各クエリの内1つでも失敗した場合は、トランザクション内の他のクエリについても実行されず、実質的に対象テーブルのロックを取得できなかったことになります。
 
 - 条件付き書き込みが失敗した場合(≒対象テーブルのロックを取得できなかった場合)は ClientError が発生するので、それを except 句で拾って例外処理を実装しています。
 
後は、この Lambda 関数を AppSync API 経由で実行するような画面を作れば、ユーザがアプリケーション上でテーブルのステータスを変更する際に排他制御御(ロック)が実現できるということになります。
まとめ
第二回はここまでで作成した仕組みをアプリケーション上からどのように使用しているかについて記載する予定です。最も、ステータス遷移時における排他制御の仕組みそのものはほぼ書き切ってしまったため、分量のバランスとしては悪くなってしまうかもしれませんが・・
本記事の内容がどなたかの役に立てば幸いです。

  
  
  
  