Amazon DynamoDB のデータを定期的にエクスポートするジョブをつくる [非同期処理]

こんにちは、広野です。

Amazon DynamoDB は NoSQL 特有のレスポンスの速さは魅力ですが、反面、指定可能な検索条件が RDBMS に比べて著しく劣るという欠点があります。

そのため、アプリの処理上一旦は Amazon DynamoDB にデータを格納はするものの、その後の処理のためにデータを他のデータベースに移す、という処理をさせることはよくあります。

ここでは、Amazon DynamoDB のデータを Amazon S3 に 定期的にエクスポートする方法を紹介します。エクスポートしたデータを、Amazon Athena で SQL クエリーをかけられるようにすることが次の目標です。

やりたいこと

  • Amazon DynamoDB のデータを定期的に Amazon S3 にエクスポートする
  • エクスポートしたデータは、Amazon Athena でクエリーできるようにする想定
  • データ量が増えても対処可能な、スケーラブルなジョブ設計とする

実現方法

以下の構成をつくります。

  • Amazon EventBridge で、定期実行のスケジュールを設定します。本記事では、毎日 4:00 (JST) に実行します。
  • Amazon EventBridge は、AWS Step Functions のステートマシンを呼び出します。
  • ステートマシンには、DynamoDB のデータエクスポート API を使用してエクスポート命令を出します。
    エクスポート先は、Amazon S3 バケットになります。
    エクスポートデータのフォーマットは、JSON 形式とします。※ DYNAMO_JSON という変わった形式のため後ほど解説
  • DynamoDB は、エクスポート処理を実行できるように Point In Time Recovery (PITR) を有効にします。
  • 今後データが増えても対処できるよう、データエクスポート処理は命令と結果確認を分離します。(非同期処理)

AWS Step Functions ステートマシンの解説

ここで肝なのは、スケーラブルな処理にするために、データエクスポート処理を命令と結果確認に分離していることです。そのために AWS Step Functions ステートマシンを使って、ワークフローを作成しています。これがスケーラブルなフローの基本形です。

以下のワークフローを作成します。各ステップごとに処理内容を解説します。

callExport

指定の Amazon DynamoDB データをエクスポートする命令をかけます。
AWS Step Functions がネイティブにサポートしている API、DynamoDB: ExportTableToPointInTime を使用します。

Wait1

前ステップの callExport は、命令を出しただけで終了になります。
その後処理はバックグラウンドで進んでいるので、一定時間待った後に次ステップを実行します。
ここでは、600秒(10分)待ちを入れています。

checkExport

一定時間待った後、エクスポート結果を Amazon DynamoDB に問い合わせます。
AWS Step Functions がネイティブにサポートしている API、DynamoDB: ListExports を使用します。
問い合わせ対象の DynamoDB テーブルは、前ステップから引き継がれている入力情報から取得しています。
MaxResults を 1 にすることで、過去のエクスポートジョブのうち最新の 1 件を取得します。

checkExportStatus

前ステップでエクスポート結果を取得した後、その値によってこの先の進み方を分岐させます。
条件分岐させるには、Choice state を使用します。
結果が COMPLETED であれば、正常終了。
結果が IN_PROGRESS であれば、Wait1 に戻します。これにより、一定時間待った後に再度結果をチェックしに行きます。
結果がそれ以外であれば、異常終了としています。

ExportCompleted/ExportFailed

何もしないステップですが、見た目成功か失敗かがわかるように入れています。
ここに、通知系のステップを入れるとよいと思います。

ポイントまとめ

スケーラブルなワークフローにするために、以下のポイントに気を付けていきましょう。

処理はなるべく命令を出して終了、バックグラウンドで実行させ、後で結果を問い合わせる。【非同期処理】
効率的な処理にするため、待ち時間は様子を見てチューニングする。AWS X-Ray で可視化しておくとよいですね。
お気づきかもしれませんが、このワークフローでは AWS Lambda 関数を一切使用していません。AWS Step Functions は、簡易な処理であれば直接 API をコールすることができます。それによりやたらと AWS Lambda 関数が増えることがなくなりました。もちろん、値の加工や複雑な処理をさせる場合には AWS Lambda 関数を使用します。
AWS Lambda 関数を使用する場合には、命令と結果確認を分けることをお勧めします。それにより実行時間の制約を受けなくなりますし、データ量が増えても時間と課金を気にすることがなくなります。スケーラブルな設計になります。

実際に動かしてみる

実際に動かすと、以下のように流れます。通ったルートが緑色で表示されます。

Amazon DynamoDB には以下のテストデータを入れました。

これがエクスポートされるとこうなります。

データはいくつかのファイルに分かれます。GZIP 圧縮されていますね。
中身を見てみると、以下のような JSON データになっています。でも、何か違和感がありますね。

{
  "Item": {
    "tableid": {
      "S": "01"
    },
    "datetime": {
      "S": "2022-06-20T01:48:00.000Z"
    },
    "quantity": {
      "N": "1"
    },
    "menu": {
      "S": "いか"
    }
  }
}
{
  "Item": {
    "tableid": {
      "S": "01"
    },
    "datetime": {
      "S": "2022-06-20T01:49:52.000Z"
    },
    "quantity": {
      "N": "1"
    },
    "menu": {
      "S": "まぐろ"
    }
  }
}
{
  "Item": {
    "tableid": {
      "S": "01"
    },
    "datetime": {
      "S": "2022-06-20T01:49:30.000Z"
    },
    "quantity": {
      "N": "2"
    },
    "menu": {
      "S": "たこ"
    }
  }
}

このデータひとかたまりでは、JSON オブジェクトとして成り立たないフォーマットです。全体が配列になっていません。
1階層目の { } 囲みごとには JSON になっているのですが、それぞれの間にカンマがないことに違和感があります。

おそらく、これがエクスポート時に指定したオプション、DYNAMO_JSON というフォーマットなのだと思います。

また、「こんな仕様でエクスポートしてしまって、後の処理が大変じゃない?」という疑問もお持ちではないでしょうか。

  • ファイルが複数、かつ GZIP 圧縮されている
  • データフォーマットが DYNAMO_JSON なので開いても JSON オブジェクトとして認識できない
  • エクスポートされるフォルダ名がランダムに生成された名前になっている

ですが、大丈夫です。

Amazon Athena や AWS Glue は、こんなめちゃくちゃなデータ(と言うのは失礼ですが)でも取り込んでくれるのです。人手で処理するわけではないので、機械が読み取れる形式でさえあればよいのです。

AWS CloudFormation テンプレート

一連の環境をすぐに構築できる AWS CloudFormation テンプレートを用意しました。詳細なパラメータは、実際に作成したスタックで確認して頂けたらと思います。

AWSTemplateFormatVersion: 2010-09-09
Description: CloudFormation template that creates a DynamoDB table, a State machine, a S3 bucket, an EventBridge schedule and relevant IAM roles.

# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
  SystemName:
    Type: String
    Description: System name.
    Default: xxxxx
    MaxLength: 10
    MinLength: 1

Resources:
# ------------------------------------------------------------#
# DynamoDB
# ------------------------------------------------------------#
  DynamoSushiOrderData:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub example-sushiorderdata-${SystemName}
      AttributeDefinitions:
        - AttributeName: tableid
          AttributeType: S
        - AttributeName: datetime
          AttributeType: S
      BillingMode: PAY_PER_REQUEST
      KeySchema:
        - AttributeName: tableid
          KeyType: HASH
        - AttributeName: datetime
          KeyType: RANGE
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}

# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
  S3BucketExport:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub example-dynamodb-export-${SystemName}
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}

# ------------------------------------------------------------#
# State Machine
# ------------------------------------------------------------#
  StateMachineExportSushiOrderData:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub example-ExportSushiOrderData-${SystemName}
      StateMachineType: STANDARD
      DefinitionSubstitutions:
        DSSystemName: !Ref SystemName
        DSDynamoDBTableArn: !GetAtt DynamoSushiOrderData.Arn
      DefinitionString: |-
        {
          "Comment": "State machine to export sushi order data from DynamoDB to S3 for example-${DSSystemName}",
          "StartAt": "callExport",
          "States": {
            "callExport": {
              "Type": "Task",
              "Next": "Wait1",
              "Parameters": {
                "TableArn": "${DSDynamoDBTableArn}",
                "S3Bucket": "example-dynamodb-export-${DSSystemName}",
                "S3Prefix": "export",
                "ExportFormat": "DYNAMODB_JSON"
              },
              "Resource": "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime",
              "Comment": "Export sushi order data from DynamoDB",
              "OutputPath": "$.ExportDescription"
            },
            "Wait1": {
              "Type": "Wait",
              "Seconds": 600,
              "Comment": "Wait for export finished",
              "Next": "checkExport"
            },
            "checkExport": {
              "Type": "Task",
              "Next": "checkExportStatus",
              "Parameters": {
                "MaxResults": 1,
                "TableArn.$": "$.TableArn"
              },
              "Resource": "arn:aws:states:::aws-sdk:dynamodb:listExports",
              "OutputPath": "$.ExportSummaries[0]",
              "Comment": "Export result check"
            },
            "checkExportStatus": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.ExportStatus",
                  "StringEquals": "COMPLETED",
                  "Next": "ExportCompleted",
                  "Comment": "If ExportStatus == COMPLETED then"
                },
                {
                  "Variable": "$.ExportStatus",
                  "StringEquals": "IN_PROGRESS",
                  "Next": "Wait1",
                  "Comment": "If ExportStatus == IN_PROGRESS then"
                }
              ],
              "Default": "ExportFailed",
              "Comment": "Check export status"
            },
            "ExportCompleted": {
              "Type": "Succeed",
              "Comment": "Export completed"
            },
            "ExportFailed": {
              "Type": "Fail",
              "Comment": "Export failed"
            }
          }
        }
      LoggingConfiguration:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt LogGroupStateMachineExportSushiOrderData.Arn
        IncludeExecutionData: true
        Level: ERROR
      RoleArn: !GetAtt StateMachineExecutionRoleExportSushiOrderData.Arn
      TracingConfiguration:
        Enabled: false
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}

# ------------------------------------------------------------#
# State Machine Execution LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
  LogGroupStateMachineExportSushiOrderData:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/vendedlogs/states/example-ExportSushiOrderData-${SystemName}
      RetentionInDays: 365

# ------------------------------------------------------------#
# State Machine Execution Role (IAM)
# ------------------------------------------------------------#
  StateMachineExecutionRoleExportSushiOrderData:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub example-StateMachine-ExportSushiOrderData-${SystemName}
      Description: This role allows State Machines to call specified AWS resources.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                states.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /service-role/
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
        - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess

# ------------------------------------------------------------#
# EventBridge Step Functions Invocation Role (IAM)
# ------------------------------------------------------------#
  EventBridgeStepFunctionsInvocationRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub example-StepFunctionsInvocation-${SystemName}
      Description: This role allows EventBridge to invoke Step Functions state machines.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - events.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess

# ------------------------------------------------------------#
# EventBridge Scheduled Job
# ------------------------------------------------------------#
  EventsRuleSushiOrderData:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub example-SushiOrderData-${SystemName}
      Description: !Sub Events rule to call the state machine example-SushiOrderData-${SystemName} daily.
      ScheduleExpression: "cron(0 19 * * ? *)"
      State: ENABLED
      Targets:
        - Arn: !GetAtt StateMachineExportSushiOrderData.Arn
          Id: !Sub example-ExportSushiOrderData-${SystemName}
          RetryPolicy:
            MaximumRetryAttempts: 2
          RoleArn: !GetAtt EventBridgeStepFunctionsInvocationRole.Arn

まとめ

いかがでしたでしょうか?

お題は Amazon DynamoDB のデータエクスポートでしたが、スケーラブルな非同期処理の基本形をご理解頂けたのではないかと思います。

本記事が皆様のお役に立てれば幸いです。

著者について
広野 祐司

AWS サーバーレスアーキテクチャを駆使して社内クラウド人材育成アプリとコンテンツづくりに勤しんでいます。React で SPA を書き始めたら快適すぎて、他の言語には戻れなくなりました。サーバーレス & React 仲間を増やしたいです。AWSは好きですが、それよりもAWSすげー!って気持ちの方が強いです。
取得資格:AWS 認定は12資格、ITサービスマネージャ、ITIL v3 Expert 等
2020 - 2023 Japan AWS Top Engineer 受賞
2022 - 2023 Japan AWS Ambassador 受賞
2023 当社初代フルスタックエンジニア認定
好きなAWSサービス:AWS Amplify / AWS AppSync / Amazon Cognito / AWS Step Functions / AWS CloudFormation

広野 祐司をフォローする
クラウドに強いによるエンジニアブログです。
SCSKは専門性と豊富な実績を活かしたクラウドサービス USiZE(ユーサイズ)を提供しています。
USiZEサービスサイトでは、お客様のDX推進をワンストップで支援するサービスの詳細や導入事例を紹介しています。
AWSクラウドソリューションデータベース
シェアする
タイトルとURLをコピーしました