こんにちは、広野です。
以前、Amazon DynamoDB のデータを定期的に Amazon S3 にエクスポートする記事を書きました。
本記事はその続編記事で、Amazon S3 にあるデータを Amazon Athena テーブルにする方法を紹介します。
そうすると、そのデータに対して Amazon Athena からすぐに SQL クエリーをかけられる状態になるということです。Amazon Athena テーブル同士でテーブルの結合やビューの作成もできるので、あらゆるデータソースからのデータを Amazon Athena テーブルにしておけば、データ分析に役立てることができます。
前回記事では、Amazon DynamoDB のデータを1日1回 Amazon S3 にエクスポートしていました。本記事では、そのジョブに後続処理としてエクスポートデータを Amazon Athena テーブルとして設定するステップを追加します。
Amazon Athena テーブルについてはこちら。
やりたいこと
- Amazon DynamoDB から Amazon S3 にエクスポートしたデータに SQL でクエリーをかけられるようにしたい。
実現方法
以下の構成をつくります。
- Amazon S3 へのデータエクスポートが成功したら、Amazon Athena は古い Athena テーブルを削除します。
- その後、Amazon Athena は最新のエクスポートデータから、新しい Athena テーブルを作成します。
- 管理者は、最新の Athena テーブルに対して SQL クエリーをかけられます。
AWS Step Functions ステートマシンの解説
以下のワークフローを作成します。各ステップごとに処理内容を解説します。
※今回追加部分の dropAthenaExternalTable から
dropAthenaExternalTable
指定の Amazon Athena テーブルを削除する命令をかけます。
AWS Step Functions がネイティブにサポートしている API、Athena: StartQueryExecution を使用します。
Amazon Athena に対する DDL コマンド、DROP TABLE をかけているだけです。本記事の設計では同じ名前で Amazon Athena テーブルを毎日作成し続けるため、前処理として古いテーブルを削除しておかなければ後続処理でエラーになってしまうのです。
ここで、重要な設定があります。
ResultPath を使用して元の入力を出力に追加 というオプションを有効にしています。
この dropAthenaExternalTable ステップは、前ステップである checkExportStatus の結果を受け取るのですが、それをそのまま後続ステップに渡すという意味を持ちます。なぜそれをするかというと、後続ステップで checkExportStatus の結果データを使用するからです。
実際に受け渡すデータは以下のようなものになります。
{ "ExportArn": "arn:aws:dynamodb:ap-northeast-2:xxxxxxxxxxxx:table/example-sushiorderdata-test999/export/01656356429030-157ef080", "ExportStatus": "COMPLETED" }
Amazon DynamoDB はデータをエクスポートするとき、1回1回のエクスポート処理に固有のエクスポート ID を持ちます。そのエクスポート ID が、ExportArn の値の一部として格納されています。/export/ 以降の英数字とハイフンの文字列 (UUIDかな?) です。
そして、エクスポート ID をデータエクスポート時の保存フォルダ名として使ってしまうのですが、後続ステップではそれを必要とするため連携してあげないと他にわかる術がないのです。
createAthenaExternalTable
Amazon S3 上のデータから、Amazon Athena テーブルを作成する命令をかけます。
できれば AWS Step Function ネイティブの API を使用したいのですが、前ステップから受け取る入力データを加工する必要があり、仕方なく AWS Lambda 関数を使用しています。
呼び出す AWS Lambda 関数は以下です。
import boto3 client = boto3.client('athena') # Get ExportId from ExportArn def getExportId(strtmp): array = strtmp.split('/') return array[3] # Create Athena external table def lambda_handler(event, context): exportarn = event['ExportArn'] exportid = getExportId(exportarn) athena_ddl = "CREATE EXTERNAL TABLE IF NOT EXISTS example_sushiorderdata_test999 \ (Item struct <tableid:struct,datetime:struct,quantity:struct,menu:struct>) \ ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' \ LOCATION 's3://example-dynamodb-export-test999/export/AWSDynamoDB/" + exportid + "/data/' \ TBLPROPERTIES ('has_encrypted_data'='true');" res = client.start_query_execution( QueryString=athena_ddl, ResultConfiguration={ 'OutputLocation': 's3://example-dynamodb-export-test999/export/AWSDynamoDB/' + exportid + '/ddloutput/' }, WorkGroup='AthenaWG-example-test999' ) return res
Amazon Athena に対する DDL コマンド、CREATE EXTERNAL TABLE をかけます。
パラメータとして Amazon S3 にあるデータ(複数ファイル)が保存されているバケット名、フォルダを指定します。このときに、前ステップから渡されたデータから Amazon DynamoDB のエクスポート ID を抽出します。
event[‘ExportArn’] の中にデータがあり、/ (スラッシュ) で区切られたデータのうち 4つ目、0始まりでカウントすると 3つ目のデータがエクスポート ID になるため、それを抜き取るコードを書いています。ジョブの実行ごとにエクスポート ID が変わってしまうため、エクスポート ID を変数に格納してジョブの実行の都度、DDL コマンドのパラメータに突っ込むコードにしています。
ここで Amazon Athena が読み取る、エクスポートされたデータは以下のフォーマットです。
{ "Item": { "tableid": { "S": "01" }, "datetime": { "S": "2022-06-20T01:48:00.000Z" }, "quantity": { "N": "1" }, "menu": { "S": "いか" } } } { "Item": { "tableid": { "S": "01" }, "datetime": { "S": "2022-06-20T01:49:52.000Z" }, "quantity": { "N": "1" }, "menu": { "S": "まぐろ" } } } { "Item": { "tableid": { "S": "01" }, "datetime": { "S": "2022-06-20T01:49:30.000Z" }, "quantity": { "N": "2" }, "menu": { "S": "たこ" } } }
このデータを読み取って Amazon Athena テーブルにするために、(Item struct から始まる行でデータ項目の定義をしています。
AWS Lambda 関数が無事実行されると、結果は res という変数に格納され、AWS Step Functions に返されます。この res の中に、後続ステップで必要な項目が含まれているのですが、結果の中には正直使用しないデータが大量に含まれているため、必要な情報だけフィルタして後続ステップに渡すようにしています。
AWS Lambda 関数から渡されたデータは、Payload というキーの中に格納されます。その中の QueryExecutionId だけは後続ステップで使用するので、それだけを抽出して渡すように設定しています。
なぜそうするかと言うと、この AWS Lambda 関数は命令を実行して終わりになっています。結果確認は次ステップで QueryExectionId を元に確認します。前回記事でも書きましたが、非同期処理です。こうすることによって、AWS Lambda 関数にかかる課金は極小化され、命令の実行にかかる時間が長くなったとしても AWS Lambda 関数の制約に引っ掛かることもなく、スケーラブルなジョブになっていると言えます。
Wait2
前ステップの createAthenaExternalTable は、命令を出しただけで終了になります。
その処理はバックグラウンドで進んでいるので、一定時間待った後に後続ステップ(結果確認)を実行します。
ここでは、30秒の待ちを入れています。
前ステップで QueryExecutionId を出力し、後続ステップであるこの Wait2 に渡しました。Wait2 はただ待つだけなので QueryExecutionId を使用しませんが、またさらに後続ステップに渡してあげる必要があります。
ただし、何も設定する必要はありません。デフォルトで、待ちのステップは受け取ったデータを後続ステップにそのまま渡してくれます。
checkAthenaExternalTable
createAthenaExternalTable ステップで行った、Amazon Athena テーブル作成命令の結果を Amazon Athena に問い合わせします。
AWS Step Functions がネイティブにサポートしている API、Athena: GetQueryExecution を使用します。
この API は、実行した Amazon Athena へのクエリー結果がどうなったのか、結果を取得してくれます。パラメータとして、前ステップから受け取った QueryExectionId を指定します。
結果は、後続ステップで使用しないデータが大量に含まれているため、フィルタリングして渡します。
checkAthenaExternalTableStatus
前ステップで Amazon Athena テーブル作成結果を取得した後、その値によってこの先の進み方を分岐させます。条件分岐させるには、Choice state を使用します。
結果の値は、受け取った入力データの Status.State に格納されています。
値が SUCCEEDED であれば、正常終了。
値が QUEUED または RUNNING であれば、Wait2 に戻します。これにより、一定時間待った後に再度結果をチェックしに行きます。
値がそれ以外であれば、異常終了としています。
AthenaExternalTableCreateCompleted / AthenaExternalTableCreateFailed / ExportFailed
何もしないステップですが、見た目成功か失敗かがわかるように入れています。
ここに、通知系のステップを入れるとよいと思います。
ポイントまとめ
基本的に前回記事と気を付けているポイントは同じです。少し異なるのは、以下のテクニックを使っている点でしょうか。
実際に動かしてみる
実際に動かすと、以下のように流れます。通ったルートが緑色で表示されます。
Amazon DynamoDB には以下のテストデータを入れていました。
これがエクスポートされ、Amazon Athena テーブルにしてから SQL クエリーをかけるとこうなります。
少々、いやかなり独特の SQL になりますが、無事 SQL をかけられるようになりました。
当然、SQL の WHERE 句等による複雑な条件設定が可能となります。データが Amazon DynamoDB (NoSQL) に格納されているままでは実現できなかった検索条件ができるようになっています。結果を CSV にしてダウンロードすることもできますし、SQL 実行から結果を Amazon S3 に落とす、という処理まで自動化し、WEB アプリから結果データを読み込ませ表示させる、なんてことにも応用できます。
datetime は、元々 Amazon DynamoDB に格納されている状態では ISO8601 フォーマットの string 型でしたが、この SQL では AWS のタイムスタンプ型に変換をかけています。この方が、Amazon Athena でクエリーをかける際には都合が良いです。
めでたし、めでたし。
AWS CloudFormation テンプレート
一連の環境をすぐに構築できる AWS CloudFormation テンプレートを用意しました。詳細なパラメータは、実際に作成したスタックで確認して頂けたらと思います。
AWSTemplateFormatVersion: 2010-09-09 Description: CloudFormation template that creates a DynamoDB table, a State machine, a S3 bucket, an Athena external table, an EventBridge schedule and relevant IAM roles. # ------------------------------------------------------------# # Input Parameters # ------------------------------------------------------------# Parameters: SystemName: Type: String Description: System name. Default: xxxxx MaxLength: 10 MinLength: 1 SFnSystemName: Type: String Description: System name for Step Functions only. (not any special characters) Default: xxxxx MaxLength: 10 MinLength: 1 Resources: # ------------------------------------------------------------# # DynamoDB # ------------------------------------------------------------# DynamoSushiOrderData: Type: AWS::DynamoDB::Table Properties: TableName: !Sub example-sushiorderdata-${SystemName} AttributeDefinitions: - AttributeName: tableid AttributeType: S - AttributeName: datetime AttributeType: S BillingMode: PAY_PER_REQUEST KeySchema: - AttributeName: tableid KeyType: HASH - AttributeName: datetime KeyType: RANGE PointInTimeRecoverySpecification: PointInTimeRecoveryEnabled: true Tags: - Key: Cost Value: !Sub example-${SystemName} # ------------------------------------------------------------# # S3 # ------------------------------------------------------------# S3BucketExport: Type: AWS::S3::Bucket Properties: BucketName: !Sub example-dynamodb-export-${SystemName} PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true Tags: - Key: Cost Value: !Sub example-${SystemName} # ------------------------------------------------------------# # State Machine # ------------------------------------------------------------# StateMachineExportSushiOrderData: Type: AWS::StepFunctions::StateMachine Properties: StateMachineName: !Sub example-ExportSushiOrderData-${SystemName} StateMachineType: STANDARD DefinitionSubstitutions: DSSystemName: !Ref SFnSystemName DSDynamoDBTableArn: !GetAtt DynamoSushiOrderData.Arn DSS3BucketName: !Ref S3BucketExport DSLambdaCreateAthenaExternalTableSushiOrderDataArn: !GetAtt LambdaCreateAthenaExternalTableSushiOrderData.Arn DefinitionString: |- { "Comment": "State machine to export sushi order data from DynamoDB to S3 for example-${DSSystemName}", "StartAt": "callExport", "States": { "callExport": { "Type": "Task", "Next": "Wait1", "Parameters": { "TableArn": "${DSDynamoDBTableArn}", "S3Bucket": "${DSS3BucketName}", "S3Prefix": "export", "ExportFormat": "DYNAMODB_JSON" }, "Resource": "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime", "Comment": "Export sushi order data from DynamoDB", "OutputPath": "$.ExportDescription" }, "Wait1": { "Type": "Wait", "Seconds": 600, "Comment": "Wait for export finished", "Next": "checkExport" }, "checkExport": { "Type": "Task", "Next": "checkExportStatus", "Parameters": { "MaxResults": 1, "TableArn.$": "$.TableArn" }, "Resource": "arn:aws:states:::aws-sdk:dynamodb:listExports", "OutputPath": "$.ExportSummaries[0]", "Comment": "Export result check" }, "checkExportStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.ExportStatus", "StringEquals": "COMPLETED", "Next": "dropAthenaExternalTable", "Comment": "If ExportStatus == COMPLETED then" }, { "Variable": "$.ExportStatus", "StringEquals": "IN_PROGRESS", "Next": "Wait1", "Comment": "If ExportStatus == IN_PROGRESS then" } ], "Default": "ExportFailed", "Comment": "Check export status" }, "dropAthenaExternalTable": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "DROP TABLE IF EXISTS example_sushiorderdata_${DSSystemName}", "ResultConfiguration": { "OutputLocation": "s3://${DSS3BucketName}/export/AWSDynamoDB/deleteTable/ddloutput/" }, "WorkGroup": "AthenaWG-example-${DSSystemName}" }, "Next": "createAthenaExternalTable", "ResultPath": "$.result", "Comment": "Drop the old Athena external table" }, "createAthenaExternalTable": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "Payload.$": "$", "FunctionName": "${DSLambdaCreateAthenaExternalTableSushiOrderDataArn}" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2 } ], "Next": "Wait2", "Comment": "Create a new Athena external table from the exported data", "ResultSelector": { "QueryExecutionId.$": "$.Payload.QueryExecutionId" } }, "Wait2": { "Type": "Wait", "Seconds": 30, "Next": "checkAthenaExternalTable", "Comment": "Wait for creating the Athena external table finished" }, "checkAthenaExternalTable": { "Type": "Task", "Resource": "arn:aws:states:::athena:getQueryExecution", "Parameters": { "QueryExecutionId.$": "$.QueryExecutionId" }, "Next": "checkAthenaExternalTableStatus", "Comment": "Check the Athena external table creation completed", "OutputPath": "$.QueryExecution" }, "checkAthenaExternalTableStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.Status.State", "StringEquals": "SUCCEEDED", "Next": "AthenaExternalTableCreateCompleted", "Comment": "If Status == SUCCEEDED then" }, { "Or": [ { "Variable": "$.Status.State", "StringEquals": "QUEUED" }, { "Variable": "$.Status.State", "StringEquals": "RUNNING" } ], "Next": "Wait2", "Comment": "If State IN ( QUEUED, RUNNING )" } ], "Comment": "Check the Athena external table status", "Default": "AthenaExternalTableCreateFailed" }, "AthenaExternalTableCreateFailed": { "Type": "Fail", "Comment": "Athena external table creation failed" }, "AthenaExternalTableCreateCompleted": { "Type": "Succeed", "Comment": "Athena external table creation completed" }, "ExportFailed": { "Type": "Fail", "Comment": "ExportFailed" } } } LoggingConfiguration: Destinations: - CloudWatchLogsLogGroup: LogGroupArn: !GetAtt LogGroupStateMachineExportSushiOrderData.Arn IncludeExecutionData: true Level: ERROR RoleArn: !GetAtt StateMachineExecutionRoleExportSushiOrderData.Arn TracingConfiguration: Enabled: false Tags: - Key: Cost Value: !Sub example-${SystemName} # ------------------------------------------------------------# # State Machine Execution LogGroup (CloudWatch Logs) # ------------------------------------------------------------# LogGroupStateMachineExportSushiOrderData: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/vendedlogs/states/example-ExportSushiOrderData-${SystemName} RetentionInDays: 365 # ------------------------------------------------------------# # State Machine Execution Role (IAM) # ------------------------------------------------------------# StateMachineExecutionRoleExportSushiOrderData: Type: AWS::IAM::Role Properties: RoleName: !Sub example-StateMachine-ExportSushiOrderData-${SystemName} Description: This role allows State Machines to call specified AWS resources. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: states.amazonaws.com Action: - sts:AssumeRole Path: /service-role/ ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess - arn:aws:iam::aws:policy/service-role/AWSLambdaRole - arn:aws:iam::aws:policy/AmazonAthenaFullAccess # ------------------------------------------------------------# # Lambda Athena Execution Role (IAM) # ------------------------------------------------------------# LambdaAthenaInvocationRole: Type: AWS::IAM::Role Properties: RoleName: !Sub example-AthenaInvocationRole-${SystemName} Description: This role allows Lambda functions to invoke Athena. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonAthenaFullAccess - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole - arn:aws:iam::aws:policy/AmazonS3FullAccess # ------------------------------------------------------------# # Lambda # ------------------------------------------------------------# LambdaCreateAthenaExternalTableSushiOrderData: Type: AWS::Lambda::Function Properties: FunctionName: !Sub example-CreateAthenaExternalTableSushiOrderData-${SystemName} Description: !Sub Lambda Function to create or update the Athena external table to query sushi order data in S3 for example-${SystemName}, called from Step Functions Runtime: python3.9 Timeout: 3 MemorySize: 128 Role: !GetAtt LambdaAthenaInvocationRole.Arn Handler: index.lambda_handler Tags: - Key: Cost Value: !Sub example-${SystemName} Code: ZipFile: !Sub | import boto3 client = boto3.client('athena') # Get ExportId from ExportArn def getExportId(strtmp): array = strtmp.split('/') return array[3] # Create Athena external table def lambda_handler(event, context): exportarn = event['ExportArn'] exportid = getExportId(exportarn) athena_ddl = "CREATE EXTERNAL TABLE IF NOT EXISTS example_sushiorderdata_${SFnSystemName} \ (Item struct <tableid:struct,datetime:struct,quantity:struct,menu:struct>) \ ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' \ LOCATION 's3://${S3BucketExport}/export/AWSDynamoDB/" + exportid + "/data/' \ TBLPROPERTIES ('has_encrypted_data'='true');" res = client.start_query_execution( QueryString=athena_ddl, ResultConfiguration={ 'OutputLocation': 's3://${S3BucketExport}/export/AWSDynamoDB/' + exportid + '/ddloutput/' }, WorkGroup='AthenaWG-example-${SFnSystemName}' ) return res # ------------------------------------------------------------# # Athena # ------------------------------------------------------------# AthenaWorkgroupLogs: Type: AWS::Athena::WorkGroup Properties: Description: !Sub Athena Workgroup for example-${SystemName} Name: !Sub AthenaWG-example-${SFnSystemName} RecursiveDeleteOption: true State: ENABLED Tags: - Key: Cost Value: !Sub example-${SystemName} WorkGroupConfiguration: EnforceWorkGroupConfiguration: false PublishCloudWatchMetricsEnabled: true RequesterPaysEnabled: false ResultConfiguration: OutputLocation: !Sub s3://${S3BucketExport}/ # ------------------------------------------------------------# # EventBridge Step Functions Invocation Role (IAM) # ------------------------------------------------------------# EventBridgeStepFunctionsInvocationRole: Type: AWS::IAM::Role Properties: RoleName: !Sub example-StepFunctionsInvocation-${SystemName} Description: This role allows EventBridge to invoke Step Functions state machines. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - events.amazonaws.com Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess # ------------------------------------------------------------# # EventBridge Scheduled Job # ------------------------------------------------------------# EventsRuleSushiOrderData: Type: AWS::Events::Rule Properties: Name: !Sub example-SushiOrderData-${SystemName} Description: !Sub Events rule to call the state machine example-SushiOrderData-${SystemName} daily. ScheduleExpression: "cron(0 19 * * ? *)" State: ENABLED Targets: - Arn: !GetAtt StateMachineExportSushiOrderData.Arn Id: !Sub example-ExportSushiOrderData-${SystemName} RetryPolicy: MaximumRetryAttempts: 2 RoleArn: !GetAtt EventBridgeStepFunctionsInvocationRole.Arn
まとめ
いかがでしたでしょうか?
リアルタイムなデータ検索ではないですが、Amazon DynamoDB に格納されているデータに対して Amazon Athena から SQL クエリーをかけられるようになったと思います。
細かい部分に着目するともっとスマートな方法、設定はありますが、とりあえず動くサンプルの1つであるというご理解頂けましたら幸いです。