Amazon DynamoDB のデータを定期的に Amazon Athena テーブルにする [AWS CloudFormation テンプレート付き]

こんにちは、広野です。

以前、Amazon DynamoDB のデータを定期的に Amazon S3 にエクスポートする記事を書きました。

本記事はその続編記事で、Amazon S3 にあるデータを Amazon Athena テーブルにする方法を紹介します。

そうすると、そのデータに対して Amazon Athena からすぐに SQL クエリーをかけられる状態になるということです。Amazon Athena テーブル同士でテーブルの結合やビューの作成もできるので、あらゆるデータソースからのデータを Amazon Athena テーブルにしておけば、データ分析に役立てることができます。

前回記事では、Amazon DynamoDB のデータを1日1回 Amazon S3 にエクスポートしていました。本記事では、そのジョブに後続処理としてエクスポートデータを Amazon Athena テーブルとして設定するステップを追加します。

Amazon Athena テーブルについてはこちら。

テーブル、データベース、およびデータカタログの理解 - Amazon Athena
Athena では、カタログ、データベース、およびテーブルは、基盤となるソースデータのスキーマを定義するメタデータ定義のコンテナです。

やりたいこと

  • Amazon DynamoDB から Amazon S3 にエクスポートしたデータに SQL でクエリーをかけられるようにしたい。

実現方法

以下の構成をつくります。

  • Amazon S3 へのデータエクスポートが成功したら、Amazon Athena は古い Athena テーブルを削除します。
  • その後、Amazon Athena は最新のエクスポートデータから、新しい Athena テーブルを作成します。
  • 管理者は、最新の Athena テーブルに対して SQL クエリーをかけられます。

AWS Step Functions ステートマシンの解説

以下のワークフローを作成します。各ステップごとに処理内容を解説します。
※今回追加部分の dropAthenaExternalTable から

dropAthenaExternalTable

指定の Amazon Athena テーブルを削除する命令をかけます。
AWS Step Functions がネイティブにサポートしている API、Athena: StartQueryExecution を使用します。
Amazon Athena に対する DDL コマンド、DROP TABLE をかけているだけです。本記事の設計では同じ名前で Amazon Athena テーブルを毎日作成し続けるため、前処理として古いテーブルを削除しておかなければ後続処理でエラーになってしまうのです。

ここで、重要な設定があります。
ResultPath を使用して元の入力を出力に追加 というオプションを有効にしています。

この dropAthenaExternalTable ステップは、前ステップである checkExportStatus の結果を受け取るのですが、それをそのまま後続ステップに渡すという意味を持ちます。なぜそれをするかというと、後続ステップで checkExportStatus の結果データを使用するからです。
実際に受け渡すデータは以下のようなものになります。

{
  "ExportArn": "arn:aws:dynamodb:ap-northeast-2:xxxxxxxxxxxx:table/example-sushiorderdata-test999/export/01656356429030-157ef080",
  "ExportStatus": "COMPLETED"
}

Amazon DynamoDB はデータをエクスポートするとき、1回1回のエクスポート処理に固有のエクスポート ID を持ちます。そのエクスポート ID が、ExportArn の値の一部として格納されています。/export/ 以降の英数字とハイフンの文字列 (UUIDかな?) です。
そして、エクスポート ID をデータエクスポート時の保存フォルダ名として使ってしまうのですが、後続ステップではそれを必要とするため連携してあげないと他にわかる術がないのです。

createAthenaExternalTable

Amazon S3 上のデータから、Amazon Athena テーブルを作成する命令をかけます。
できれば AWS Step Function ネイティブの API を使用したいのですが、前ステップから受け取る入力データを加工する必要があり、仕方なく AWS Lambda 関数を使用しています。

呼び出す AWS Lambda 関数は以下です。

import boto3
client = boto3.client('athena')
# Get ExportId from ExportArn
def getExportId(strtmp):
  array = strtmp.split('/')
  return array[3]
# Create Athena external table
def lambda_handler(event, context):
  exportarn = event['ExportArn']
  exportid = getExportId(exportarn)
  athena_ddl = "CREATE EXTERNAL TABLE IF NOT EXISTS example_sushiorderdata_test999 \
  (Item struct <tableid:struct,datetime:struct,quantity:struct,menu:struct>) \
  ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' \
  LOCATION 's3://example-dynamodb-export-test999/export/AWSDynamoDB/" + exportid + "/data/' \
  TBLPROPERTIES ('has_encrypted_data'='true');"
  res = client.start_query_execution(
    QueryString=athena_ddl,
    ResultConfiguration={
      'OutputLocation': 's3://example-dynamodb-export-test999/export/AWSDynamoDB/' + exportid + '/ddloutput/'
    },
    WorkGroup='AthenaWG-example-test999'
  )
  return res

Amazon Athena に対する DDL コマンド、CREATE EXTERNAL TABLE をかけます。
パラメータとして Amazon S3 にあるデータ(複数ファイル)が保存されているバケット名、フォルダを指定します。このときに、前ステップから渡されたデータから Amazon DynamoDB のエクスポート ID を抽出します。

event[‘ExportArn’] の中にデータがあり、/ (スラッシュ) で区切られたデータのうち 4つ目、0始まりでカウントすると 3つ目のデータがエクスポート ID になるため、それを抜き取るコードを書いています。ジョブの実行ごとにエクスポート ID が変わってしまうため、エクスポート ID を変数に格納してジョブの実行の都度、DDL コマンドのパラメータに突っ込むコードにしています。

ここで Amazon Athena が読み取る、エクスポートされたデータは以下のフォーマットです。

{
  "Item": {
    "tableid": {
      "S": "01"
    },
    "datetime": {
      "S": "2022-06-20T01:48:00.000Z"
    },
    "quantity": {
      "N": "1"
    },
    "menu": {
      "S": "いか"
    }
  }
}
{
  "Item": {
    "tableid": {
      "S": "01"
    },
    "datetime": {
      "S": "2022-06-20T01:49:52.000Z"
    },
    "quantity": {
      "N": "1"
    },
    "menu": {
      "S": "まぐろ"
    }
  }
}
{
  "Item": {
    "tableid": {
      "S": "01"
    },
    "datetime": {
      "S": "2022-06-20T01:49:30.000Z"
    },
    "quantity": {
      "N": "2"
    },
    "menu": {
      "S": "たこ"
    }
  }
}

このデータを読み取って Amazon Athena テーブルにするために、(Item struct から始まる行でデータ項目の定義をしています。

AWS Lambda 関数が無事実行されると、結果は res という変数に格納され、AWS Step Functions に返されます。この res の中に、後続ステップで必要な項目が含まれているのですが、結果の中には正直使用しないデータが大量に含まれているため、必要な情報だけフィルタして後続ステップに渡すようにしています。

AWS Lambda 関数から渡されたデータは、Payload というキーの中に格納されます。その中の QueryExecutionId だけは後続ステップで使用するので、それだけを抽出して渡すように設定しています。

なぜそうするかと言うと、この AWS Lambda 関数は命令を実行して終わりになっています。結果確認は次ステップで QueryExectionId を元に確認します。前回記事でも書きましたが、非同期処理です。こうすることによって、AWS Lambda 関数にかかる課金は極小化され、命令の実行にかかる時間が長くなったとしても AWS Lambda 関数の制約に引っ掛かることもなく、スケーラブルなジョブになっていると言えます。

Wait2

前ステップの createAthenaExternalTable は、命令を出しただけで終了になります。
その処理はバックグラウンドで進んでいるので、一定時間待った後に後続ステップ(結果確認)を実行します。
ここでは、30秒の待ちを入れています。

前ステップで QueryExecutionId を出力し、後続ステップであるこの Wait2 に渡しました。Wait2 はただ待つだけなので QueryExecutionId を使用しませんが、またさらに後続ステップに渡してあげる必要があります。

ただし、何も設定する必要はありません。デフォルトで、待ちのステップは受け取ったデータを後続ステップにそのまま渡してくれます。

checkAthenaExternalTable

createAthenaExternalTable ステップで行った、Amazon Athena テーブル作成命令の結果を Amazon Athena に問い合わせします。
AWS Step Functions がネイティブにサポートしている API、Athena: GetQueryExecution を使用します。

この API は、実行した Amazon Athena へのクエリー結果がどうなったのか、結果を取得してくれます。パラメータとして、前ステップから受け取った QueryExectionId を指定します。

結果は、後続ステップで使用しないデータが大量に含まれているため、フィルタリングして渡します。

checkAthenaExternalTableStatus

前ステップで Amazon Athena テーブル作成結果を取得した後、その値によってこの先の進み方を分岐させます。条件分岐させるには、Choice state を使用します。
結果の値は、受け取った入力データの Status.State に格納されています。
値が SUCCEEDED であれば、正常終了。
値が QUEUED または RUNNING であれば、Wait2 に戻します。これにより、一定時間待った後に再度結果をチェックしに行きます。
値がそれ以外であれば、異常終了としています。

AthenaExternalTableCreateCompleted / AthenaExternalTableCreateFailed / ExportFailed

何もしないステップですが、見た目成功か失敗かがわかるように入れています。
ここに、通知系のステップを入れるとよいと思います。

ポイントまとめ

基本的に前回記事と気を付けているポイントは同じです。少し異なるのは、以下のテクニックを使っている点でしょうか。

当該ステップで使用しない前ステップの入力データを、後続ステップ以降で使用するために明示的に後続ステップに引き渡す。
効率的な処理にするため、当該ステップの結果データをフィルタリングして後続ステップに渡す。
AWS Step Functions がネイティブにサポートしている API が使用できない処理を実行するため、代わりに AWS Lambda 関数を使用している。
前回記事からの繰り返しになりますが、AWS Lambda 関数を使用する場合には、命令と結果確認を分けることをお勧めします。それにより実行時間の制約を受けなくなりますし、データ量が増えても時間と課金を気にすることがなくなります。スケーラブルな設計になります。

実際に動かしてみる

実際に動かすと、以下のように流れます。通ったルートが緑色で表示されます。

Amazon DynamoDB には以下のテストデータを入れていました。

これがエクスポートされ、Amazon Athena テーブルにしてから SQL クエリーをかけるとこうなります。

少々、いやかなり独特の SQL になりますが、無事 SQL をかけられるようになりました。
当然、SQL の WHERE 句等による複雑な条件設定が可能となります。データが Amazon DynamoDB (NoSQL) に格納されているままでは実現できなかった検索条件ができるようになっています。結果を CSV にしてダウンロードすることもできますし、SQL 実行から結果を Amazon S3 に落とす、という処理まで自動化し、WEB アプリから結果データを読み込ませ表示させる、なんてことにも応用できます。

datetime は、元々 Amazon DynamoDB に格納されている状態では ISO8601 フォーマットの string 型でしたが、この SQL では AWS のタイムスタンプ型に変換をかけています。この方が、Amazon Athena でクエリーをかける際には都合が良いです。 

めでたし、めでたし。

AWS CloudFormation テンプレート

一連の環境をすぐに構築できる AWS CloudFormation テンプレートを用意しました。詳細なパラメータは、実際に作成したスタックで確認して頂けたらと思います。

AWSTemplateFormatVersion: 2010-09-09
Description: CloudFormation template that creates a DynamoDB table, a State machine, a S3 bucket, an Athena external table, an EventBridge schedule and relevant IAM roles.

# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
  SystemName:
    Type: String
    Description: System name.
    Default: xxxxx
    MaxLength: 10
    MinLength: 1

  SFnSystemName:
    Type: String
    Description: System name for Step Functions only. (not any special characters)
    Default: xxxxx
    MaxLength: 10
    MinLength: 1

Resources:
# ------------------------------------------------------------#
# DynamoDB
# ------------------------------------------------------------#
  DynamoSushiOrderData:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub example-sushiorderdata-${SystemName}
      AttributeDefinitions:
        - AttributeName: tableid
          AttributeType: S
        - AttributeName: datetime
          AttributeType: S
      BillingMode: PAY_PER_REQUEST
      KeySchema:
        - AttributeName: tableid
          KeyType: HASH
        - AttributeName: datetime
          KeyType: RANGE
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}

# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
  S3BucketExport:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub example-dynamodb-export-${SystemName}
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}

# ------------------------------------------------------------#
# State Machine
# ------------------------------------------------------------#
  StateMachineExportSushiOrderData:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub example-ExportSushiOrderData-${SystemName}
      StateMachineType: STANDARD
      DefinitionSubstitutions:
        DSSystemName: !Ref SFnSystemName
        DSDynamoDBTableArn: !GetAtt DynamoSushiOrderData.Arn
        DSS3BucketName: !Ref S3BucketExport
        DSLambdaCreateAthenaExternalTableSushiOrderDataArn: !GetAtt LambdaCreateAthenaExternalTableSushiOrderData.Arn
      DefinitionString: |-
        {
          "Comment": "State machine to export sushi order data from DynamoDB to S3 for example-${DSSystemName}",
          "StartAt": "callExport",
          "States": {
            "callExport": {
              "Type": "Task",
              "Next": "Wait1",
              "Parameters": {
                "TableArn": "${DSDynamoDBTableArn}",
                "S3Bucket": "${DSS3BucketName}",
                "S3Prefix": "export",
                "ExportFormat": "DYNAMODB_JSON"
              },
              "Resource": "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime",
              "Comment": "Export sushi order data from DynamoDB",
              "OutputPath": "$.ExportDescription"
            },
            "Wait1": {
              "Type": "Wait",
              "Seconds": 600,
              "Comment": "Wait for export finished",
              "Next": "checkExport"
            },
            "checkExport": {
              "Type": "Task",
              "Next": "checkExportStatus",
              "Parameters": {
                "MaxResults": 1,
                "TableArn.$": "$.TableArn"
              },
              "Resource": "arn:aws:states:::aws-sdk:dynamodb:listExports",
              "OutputPath": "$.ExportSummaries[0]",
              "Comment": "Export result check"
            },
            "checkExportStatus": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.ExportStatus",
                  "StringEquals": "COMPLETED",
                  "Next": "dropAthenaExternalTable",
                  "Comment": "If ExportStatus == COMPLETED then"
                },
                {
                  "Variable": "$.ExportStatus",
                  "StringEquals": "IN_PROGRESS",
                  "Next": "Wait1",
                  "Comment": "If ExportStatus == IN_PROGRESS then"
                }
              ],
              "Default": "ExportFailed",
              "Comment": "Check export status"
            },
            "dropAthenaExternalTable": {
              "Type": "Task",
              "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
              "Parameters": {
                "QueryString": "DROP TABLE IF EXISTS example_sushiorderdata_${DSSystemName}",
                "ResultConfiguration": {
                  "OutputLocation": "s3://${DSS3BucketName}/export/AWSDynamoDB/deleteTable/ddloutput/"
                },
                "WorkGroup": "AthenaWG-example-${DSSystemName}"
              },
              "Next": "createAthenaExternalTable",
              "ResultPath": "$.result",
              "Comment": "Drop the old Athena external table"
            },
            "createAthenaExternalTable": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "Payload.$": "$",
                "FunctionName": "${DSLambdaCreateAthenaExternalTableSushiOrderDataArn}"
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "Lambda.ServiceException",
                    "Lambda.AWSLambdaException",
                    "Lambda.SdkClientException"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 3,
                  "BackoffRate": 2
                }
              ],
              "Next": "Wait2",
              "Comment": "Create a new Athena external table from the exported data",
              "ResultSelector": {
                "QueryExecutionId.$": "$.Payload.QueryExecutionId"
              }
            },
            "Wait2": {
              "Type": "Wait",
              "Seconds": 30,
              "Next": "checkAthenaExternalTable",
              "Comment": "Wait for creating the Athena external table finished"
            },
            "checkAthenaExternalTable": {
              "Type": "Task",
              "Resource": "arn:aws:states:::athena:getQueryExecution",
              "Parameters": {
                "QueryExecutionId.$": "$.QueryExecutionId"
              },
              "Next": "checkAthenaExternalTableStatus",
              "Comment": "Check the Athena external table creation completed",
              "OutputPath": "$.QueryExecution"
            },
            "checkAthenaExternalTableStatus": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.Status.State",
                  "StringEquals": "SUCCEEDED",
                  "Next": "AthenaExternalTableCreateCompleted",
                  "Comment": "If Status == SUCCEEDED then"
                },
                {
                  "Or": [
                    {
                      "Variable": "$.Status.State",
                      "StringEquals": "QUEUED"
                    },
                    {
                      "Variable": "$.Status.State",
                      "StringEquals": "RUNNING"
                    }
                  ],
                  "Next": "Wait2",
                  "Comment": "If State IN ( QUEUED, RUNNING )"
                }
              ],
              "Comment": "Check the Athena external table status",
              "Default": "AthenaExternalTableCreateFailed"
            },
            "AthenaExternalTableCreateFailed": {
              "Type": "Fail",
              "Comment": "Athena external table creation failed"
            },
            "AthenaExternalTableCreateCompleted": {
              "Type": "Succeed",
              "Comment": "Athena external table creation completed"
            },
            "ExportFailed": {
              "Type": "Fail",
              "Comment": "ExportFailed"
            }
          }
        }
      LoggingConfiguration:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt LogGroupStateMachineExportSushiOrderData.Arn
        IncludeExecutionData: true
        Level: ERROR
      RoleArn: !GetAtt StateMachineExecutionRoleExportSushiOrderData.Arn
      TracingConfiguration:
        Enabled: false
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}

# ------------------------------------------------------------#
# State Machine Execution LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
  LogGroupStateMachineExportSushiOrderData:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/vendedlogs/states/example-ExportSushiOrderData-${SystemName}
      RetentionInDays: 365

# ------------------------------------------------------------#
# State Machine Execution Role (IAM)
# ------------------------------------------------------------#
  StateMachineExecutionRoleExportSushiOrderData:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub example-StateMachine-ExportSushiOrderData-${SystemName}
      Description: This role allows State Machines to call specified AWS resources.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                states.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /service-role/
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
        - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
        - arn:aws:iam::aws:policy/service-role/AWSLambdaRole
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess

# ------------------------------------------------------------#
# Lambda Athena Execution Role (IAM)
# ------------------------------------------------------------#
  LambdaAthenaInvocationRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub example-AthenaInvocationRole-${SystemName}
      Description: This role allows Lambda functions to invoke Athena.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AmazonS3FullAccess

# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
  LambdaCreateAthenaExternalTableSushiOrderData:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub example-CreateAthenaExternalTableSushiOrderData-${SystemName}
      Description: !Sub Lambda Function to create or update the Athena external table to query sushi order data in S3 for example-${SystemName}, called from Step Functions
      Runtime: python3.9
      Timeout: 3
      MemorySize: 128
      Role: !GetAtt LambdaAthenaInvocationRole.Arn
      Handler: index.lambda_handler
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}
      Code:
        ZipFile: !Sub |
          import boto3
          client = boto3.client('athena')
          # Get ExportId from ExportArn
          def getExportId(strtmp):
            array = strtmp.split('/')
            return array[3]
          # Create Athena external table
          def lambda_handler(event, context):
            exportarn = event['ExportArn']
            exportid = getExportId(exportarn)
            athena_ddl = "CREATE EXTERNAL TABLE IF NOT EXISTS example_sushiorderdata_${SFnSystemName} \
            (Item struct <tableid:struct,datetime:struct,quantity:struct,menu:struct>) \
            ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' \
            LOCATION 's3://${S3BucketExport}/export/AWSDynamoDB/" + exportid + "/data/' \
            TBLPROPERTIES ('has_encrypted_data'='true');"
            res = client.start_query_execution(
              QueryString=athena_ddl,
              ResultConfiguration={
                'OutputLocation': 's3://${S3BucketExport}/export/AWSDynamoDB/' + exportid + '/ddloutput/'
              },
              WorkGroup='AthenaWG-example-${SFnSystemName}'
            )
            return res

# ------------------------------------------------------------#
# Athena
# ------------------------------------------------------------#
  AthenaWorkgroupLogs:
    Type: AWS::Athena::WorkGroup
    Properties:
      Description: !Sub Athena Workgroup for example-${SystemName}
      Name: !Sub AthenaWG-example-${SFnSystemName}
      RecursiveDeleteOption: true
      State: ENABLED
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}
      WorkGroupConfiguration:
        EnforceWorkGroupConfiguration: false
        PublishCloudWatchMetricsEnabled: true
        RequesterPaysEnabled: false
        ResultConfiguration:
          OutputLocation: !Sub s3://${S3BucketExport}/

# ------------------------------------------------------------#
# EventBridge Step Functions Invocation Role (IAM)
# ------------------------------------------------------------#
  EventBridgeStepFunctionsInvocationRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub example-StepFunctionsInvocation-${SystemName}
      Description: This role allows EventBridge to invoke Step Functions state machines.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - events.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess

# ------------------------------------------------------------#
# EventBridge Scheduled Job
# ------------------------------------------------------------#
  EventsRuleSushiOrderData:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub example-SushiOrderData-${SystemName}
      Description: !Sub Events rule to call the state machine example-SushiOrderData-${SystemName} daily.
      ScheduleExpression: "cron(0 19 * * ? *)"
      State: ENABLED
      Targets:
        - Arn: !GetAtt StateMachineExportSushiOrderData.Arn
          Id: !Sub example-ExportSushiOrderData-${SystemName}
          RetryPolicy:
            MaximumRetryAttempts: 2
          RoleArn: !GetAtt EventBridgeStepFunctionsInvocationRole.Arn

まとめ

いかがでしたでしょうか?

リアルタイムなデータ検索ではないですが、Amazon DynamoDB に格納されているデータに対して Amazon Athena から SQL クエリーをかけられるようになったと思います。

細かい部分に着目するともっとスマートな方法、設定はありますが、とりあえず動くサンプルの1つであるというご理解頂けましたら幸いです。

タイトルとURLをコピーしました