こんにちは、広野です。
Amazon DynamoDB は NoSQL 特有のレスポンスの速さは魅力ですが、反面、指定可能な検索条件が RDBMS に比べて著しく劣るという欠点があります。
そのため、アプリの処理上一旦は Amazon DynamoDB にデータを格納はするものの、その後の処理のためにデータを他のデータベースに移す、という処理をさせることはよくあります。
ここでは、Amazon DynamoDB のデータを Amazon S3 に 定期的にエクスポートする方法を紹介します。エクスポートしたデータを、Amazon Athena で SQL クエリーをかけられるようにすることが次の目標です。
やりたいこと
- Amazon DynamoDB のデータを定期的に Amazon S3 にエクスポートする
- エクスポートしたデータは、Amazon Athena でクエリーできるようにする想定
- データ量が増えても対処可能な、スケーラブルなジョブ設計とする
実現方法
以下の構成をつくります。
- Amazon EventBridge で、定期実行のスケジュールを設定します。本記事では、毎日 4:00 (JST) に実行します。
- Amazon EventBridge は、AWS Step Functions のステートマシンを呼び出します。
- ステートマシンには、DynamoDB のデータエクスポート API を使用してエクスポート命令を出します。
エクスポート先は、Amazon S3 バケットになります。
エクスポートデータのフォーマットは、JSON 形式とします。※ DYNAMO_JSON という変わった形式のため後ほど解説 - DynamoDB は、エクスポート処理を実行できるように Point In Time Recovery (PITR) を有効にします。
- 今後データが増えても対処できるよう、データエクスポート処理は命令と結果確認を分離します。(非同期処理)
AWS Step Functions ステートマシンの解説
以下のワークフローを作成します。各ステップごとに処理内容を解説します。
callExport
指定の Amazon DynamoDB データをエクスポートする命令をかけます。
AWS Step Functions がネイティブにサポートしている API、DynamoDB: ExportTableToPointInTime を使用します。
Wait1
前ステップの callExport は、命令を出しただけで終了になります。
その後処理はバックグラウンドで進んでいるので、一定時間待った後に次ステップを実行します。
ここでは、600秒(10分)待ちを入れています。
checkExport
一定時間待った後、エクスポート結果を Amazon DynamoDB に問い合わせます。
AWS Step Functions がネイティブにサポートしている API、DynamoDB: ListExports を使用します。
問い合わせ対象の DynamoDB テーブルは、前ステップから引き継がれている入力情報から取得しています。
MaxResults を 1 にすることで、過去のエクスポートジョブのうち最新の 1 件を取得します。
checkExportStatus
前ステップでエクスポート結果を取得した後、その値によってこの先の進み方を分岐させます。
条件分岐させるには、Choice state を使用します。
結果が COMPLETED であれば、正常終了。
結果が IN_PROGRESS であれば、Wait1 に戻します。これにより、一定時間待った後に再度結果をチェックしに行きます。
結果がそれ以外であれば、異常終了としています。
ExportCompleted/ExportFailed
何もしないステップですが、見た目成功か失敗かがわかるように入れています。
ここに、通知系のステップを入れるとよいと思います。
ポイントまとめ
スケーラブルなワークフローにするために、以下のポイントに気を付けていきましょう。
AWS Lambda 関数を使用する場合には、命令と結果確認を分けることをお勧めします。それにより実行時間の制約を受けなくなりますし、データ量が増えても時間と課金を気にすることがなくなります。スケーラブルな設計になります。
実際に動かしてみる
実際に動かすと、以下のように流れます。通ったルートが緑色で表示されます。
Amazon DynamoDB には以下のテストデータを入れました。
これがエクスポートされるとこうなります。
データはいくつかのファイルに分かれます。GZIP 圧縮されていますね。
中身を見てみると、以下のような JSON データになっています。でも、何か違和感がありますね。
{ "Item": { "tableid": { "S": "01" }, "datetime": { "S": "2022-06-20T01:48:00.000Z" }, "quantity": { "N": "1" }, "menu": { "S": "いか" } } } { "Item": { "tableid": { "S": "01" }, "datetime": { "S": "2022-06-20T01:49:52.000Z" }, "quantity": { "N": "1" }, "menu": { "S": "まぐろ" } } } { "Item": { "tableid": { "S": "01" }, "datetime": { "S": "2022-06-20T01:49:30.000Z" }, "quantity": { "N": "2" }, "menu": { "S": "たこ" } } }
このデータひとかたまりでは、JSON オブジェクトとして成り立たないフォーマットです。全体が配列になっていません。
1階層目の { } 囲みごとには JSON になっているのですが、それぞれの間にカンマがないことに違和感があります。
おそらく、これがエクスポート時に指定したオプション、DYNAMO_JSON というフォーマットなのだと思います。
また、「こんな仕様でエクスポートしてしまって、後の処理が大変じゃない?」という疑問もお持ちではないでしょうか。
- ファイルが複数、かつ GZIP 圧縮されている
- データフォーマットが DYNAMO_JSON なので開いても JSON オブジェクトとして認識できない
- エクスポートされるフォルダ名がランダムに生成された名前になっている
ですが、大丈夫です。
Amazon Athena や AWS Glue は、こんなめちゃくちゃなデータ(と言うのは失礼ですが)でも取り込んでくれるのです。人手で処理するわけではないので、機械が読み取れる形式でさえあればよいのです。
AWS CloudFormation テンプレート
一連の環境をすぐに構築できる AWS CloudFormation テンプレートを用意しました。詳細なパラメータは、実際に作成したスタックで確認して頂けたらと思います。
AWSTemplateFormatVersion: 2010-09-09 Description: CloudFormation template that creates a DynamoDB table, a State machine, a S3 bucket, an EventBridge schedule and relevant IAM roles. # ------------------------------------------------------------# # Input Parameters # ------------------------------------------------------------# Parameters: SystemName: Type: String Description: System name. Default: xxxxx MaxLength: 10 MinLength: 1 Resources: # ------------------------------------------------------------# # DynamoDB # ------------------------------------------------------------# DynamoSushiOrderData: Type: AWS::DynamoDB::Table Properties: TableName: !Sub example-sushiorderdata-${SystemName} AttributeDefinitions: - AttributeName: tableid AttributeType: S - AttributeName: datetime AttributeType: S BillingMode: PAY_PER_REQUEST KeySchema: - AttributeName: tableid KeyType: HASH - AttributeName: datetime KeyType: RANGE PointInTimeRecoverySpecification: PointInTimeRecoveryEnabled: true Tags: - Key: Cost Value: !Sub example-${SystemName} # ------------------------------------------------------------# # S3 # ------------------------------------------------------------# S3BucketExport: Type: AWS::S3::Bucket Properties: BucketName: !Sub example-dynamodb-export-${SystemName} PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true Tags: - Key: Cost Value: !Sub example-${SystemName} # ------------------------------------------------------------# # State Machine # ------------------------------------------------------------# StateMachineExportSushiOrderData: Type: AWS::StepFunctions::StateMachine Properties: StateMachineName: !Sub example-ExportSushiOrderData-${SystemName} StateMachineType: STANDARD DefinitionSubstitutions: DSSystemName: !Ref SystemName DSDynamoDBTableArn: !GetAtt DynamoSushiOrderData.Arn DefinitionString: |- { "Comment": "State machine to export sushi order data from DynamoDB to S3 for example-${DSSystemName}", "StartAt": "callExport", "States": { "callExport": { "Type": "Task", "Next": "Wait1", "Parameters": { "TableArn": "${DSDynamoDBTableArn}", "S3Bucket": "example-dynamodb-export-${DSSystemName}", "S3Prefix": "export", "ExportFormat": "DYNAMODB_JSON" }, "Resource": "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime", "Comment": "Export sushi order data from DynamoDB", "OutputPath": "$.ExportDescription" }, "Wait1": { "Type": "Wait", "Seconds": 600, "Comment": "Wait for export finished", "Next": "checkExport" }, "checkExport": { "Type": "Task", "Next": "checkExportStatus", "Parameters": { "MaxResults": 1, "TableArn.$": "$.TableArn" }, "Resource": "arn:aws:states:::aws-sdk:dynamodb:listExports", "OutputPath": "$.ExportSummaries[0]", "Comment": "Export result check" }, "checkExportStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.ExportStatus", "StringEquals": "COMPLETED", "Next": "ExportCompleted", "Comment": "If ExportStatus == COMPLETED then" }, { "Variable": "$.ExportStatus", "StringEquals": "IN_PROGRESS", "Next": "Wait1", "Comment": "If ExportStatus == IN_PROGRESS then" } ], "Default": "ExportFailed", "Comment": "Check export status" }, "ExportCompleted": { "Type": "Succeed", "Comment": "Export completed" }, "ExportFailed": { "Type": "Fail", "Comment": "Export failed" } } } LoggingConfiguration: Destinations: - CloudWatchLogsLogGroup: LogGroupArn: !GetAtt LogGroupStateMachineExportSushiOrderData.Arn IncludeExecutionData: true Level: ERROR RoleArn: !GetAtt StateMachineExecutionRoleExportSushiOrderData.Arn TracingConfiguration: Enabled: false Tags: - Key: Cost Value: !Sub example-${SystemName} # ------------------------------------------------------------# # State Machine Execution LogGroup (CloudWatch Logs) # ------------------------------------------------------------# LogGroupStateMachineExportSushiOrderData: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/vendedlogs/states/example-ExportSushiOrderData-${SystemName} RetentionInDays: 365 # ------------------------------------------------------------# # State Machine Execution Role (IAM) # ------------------------------------------------------------# StateMachineExecutionRoleExportSushiOrderData: Type: AWS::IAM::Role Properties: RoleName: !Sub example-StateMachine-ExportSushiOrderData-${SystemName} Description: This role allows State Machines to call specified AWS resources. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: states.amazonaws.com Action: - sts:AssumeRole Path: /service-role/ ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess # ------------------------------------------------------------# # EventBridge Step Functions Invocation Role (IAM) # ------------------------------------------------------------# EventBridgeStepFunctionsInvocationRole: Type: AWS::IAM::Role Properties: RoleName: !Sub example-StepFunctionsInvocation-${SystemName} Description: This role allows EventBridge to invoke Step Functions state machines. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - events.amazonaws.com Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess # ------------------------------------------------------------# # EventBridge Scheduled Job # ------------------------------------------------------------# EventsRuleSushiOrderData: Type: AWS::Events::Rule Properties: Name: !Sub example-SushiOrderData-${SystemName} Description: !Sub Events rule to call the state machine example-SushiOrderData-${SystemName} daily. ScheduleExpression: "cron(0 19 * * ? *)" State: ENABLED Targets: - Arn: !GetAtt StateMachineExportSushiOrderData.Arn Id: !Sub example-ExportSushiOrderData-${SystemName} RetryPolicy: MaximumRetryAttempts: 2 RoleArn: !GetAtt EventBridgeStepFunctionsInvocationRole.Arn
まとめ
いかがでしたでしょうか?
お題は Amazon DynamoDB のデータエクスポートでしたが、スケーラブルな非同期処理の基本形をご理解頂けたのではないかと思います。
本記事が皆様のお役に立てれば幸いです。