Amazon S3 に置いたデータを自動的に Amazon DynamoDB にインポートしたい [AWS CloudFormation テンプレート付き]

こんにちは、広野です。
本記事は、以下の記事の続編です。

AWS Cloud9 上から Python スクリプトで Amazon DynamoDB にデータをインポートする方法を紹介しましたが、今回は Amazon S3 にデータを置いたら自動で Amazon DynamoDB にインポートされる仕組みにアレンジしてみます。

特定の Amazon DynamoDB テーブルを何度も更新する運用が必要な場合、データファイルさえ作ってしまえば Amazon S3 バケットに置くだけで更新ができるようになり、楽です。

実装は AWS CloudFormation で行います。サンプルテンプレートを紹介しますのでパラメータ等を適宜変更すればすぐにお使い頂けます。とりあえず動きを試したいだけでしたら、このままでも問題ありません。

やりたいこと

  • Amazon S3 バケットに JSON データを配置したら、自動的にそのデータファイルを読み込んで特定の Amazon DynamoDB テーブルにデータをインポートさせたい。
  • データインポート手段には AWS Lambda (Python) を使用する。
  • インポートしたいデータは JSON 形式とする。

実現方法

  1. Amazon S3 バケットにフォルダを作成しておく。
  2. そのフォルダにファイルが配置されると、Amazon S3 イベント通知により AWS Lambda 関数を呼び出す。
  3. AWS Lambda 関数は引き渡された Amazon S3 オブジェクトの情報からファイルを取得し、データを読み込む。読み込んだデータを Amazon DynamoDB テーブルにインポートする。

サンプルデータ

Amazon DynamoDB テーブル

以下の Amazon DynamoDB テーブルにデータをインポートしたいと思います。

menu
パーティションキー
文字列
id
ソートキー
数値
neta
menu1 1 いか
menu1 2 たこ
menu2 1 海鮮丼

インポート用データ (JSON)

上記のデータを JSON 形式にすると、以下のようになります。

[
  {
    "menu": "menu1",
    "id": 1,
    "neta": "いか"
  },
  {
    "menu": "menu1",
    "id": 2,
    "neta": "たこ"
  },
  {
    "menu": "menu2",
    "id": 1,
    "neta": "海鮮丼"
  }
]

データを Excel や CSV で作成した場合、ネット上のツール等を利用して JSON に変換することができます。セキュリティ上不安な場合は EXCEL/CSV データを JSON に変換するスクリプトを自分で書きましょう。

JSON ファイルの文字コードは UTF-8N (BOMなしUTF-8) で保存しましょう。UTF-8 ですと、AWS Cloud9 や AWS Lambda 等でファイルを扱うときにエラーになります。エディタを使うときは要注意です。

サンプル AWS CloudFormation テンプレート

一連の仕組みを AWS CloudFormation テンプレートに落とし込んでみます。

AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates sample resources for importing data from S3 to DynamoDB with Lambda.

# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
  SystemName:
    Type: String
    Description: The system name.
    Default: example
    MaxLength: 10
    MinLength: 1

Resources:
# ------------------------------------------------------------#
# DynamoDB
# ------------------------------------------------------------#
  DynamoExample:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub menutable-${SystemName}
      AttributeDefinitions:
        - AttributeName: menu
          AttributeType: S
        - AttributeName: id
          AttributeType: N
      BillingMode: PAY_PER_REQUEST
      KeySchema:
        - AttributeName: menu
          KeyType: HASH
        - AttributeName: id
          KeyType: RANGE
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: false
      Tags:
        - Key: Cost
          Value: !Ref SystemName

# ------------------------------------------------------------#
# Lambda Execution Role to access S3 and DynamoDB (IAM)
# ------------------------------------------------------------#
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub LambdaExecutionRoleDynamodbAccess-${SystemName}
      Description: This role allows Lambda functions to access S3 and DynamoDB.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
        - arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole
        - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess

# ------------------------------------------------------------#
# S3 Lambda Invocation Permission
# ------------------------------------------------------------#
  S3LambdaDynamoImportInvocationPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt LambdaDynamoImport.Arn
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      SourceAccount: !Sub ${AWS::AccountId}
      SourceArn: !Sub arn:aws:s3:::dataimport-${SystemName}
    DependsOn:
      - LambdaDynamoImport

# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
  S3BucketDataImport:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub dataimport-${SystemName}
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: 30
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: "s3:ObjectCreated:*"
            Function: !GetAtt LambdaDynamoImport.Arn
      Tags:
        - Key: Cost
          Value: !Ref SystemName
    DependsOn:
      - S3LambdaDynamoImportInvocationPermission

# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
  LambdaDynamoImport:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub LambdaDynamoImport-${SystemName}
      Description: !Sub A Lambda Function to import JSON data in S3 into DynamoDB table, called from a S3 event notification.
      Runtime: python3.9
      Timeout: 60
      MemorySize: 128
      Role: !GetAtt LambdaExecutionRole.Arn
      Handler: index.lambda_handler
      Tags:
        - Key: Cost
          Value: !Ref SystemName
      Code:
        ZipFile: !Sub |
          import boto3
          import json
          import datetime
          from urllib.parse import unquote_plus
          from botocore.exceptions import ClientError
          dynamodb = boto3.resource('dynamodb', region_name='${AWS::Region}')
          table = dynamodb.Table('${DynamoExample}')
          s3 = boto3.client('s3')
          def lambda_handler(event, context):
            # S3イベント通知から連携されたJSONデータのオブジェクト名、キーを取得
            bucket = event['Records'][0]['s3']['bucket']['name']
            key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
            # S3上のJSONデータを取得
            try:
              s3data = s3.get_object(
                Bucket=bucket,
                Key=key,
                ResponseContentType='application/json'
              )
              data = json.loads(s3data['Body'].read().decode('utf-8'))
            except ClientError as e:
              print(e)
            # JSONデータをDynamoDBテーブルにインポート
            with table.batch_writer() as batch:
              for row in data:
                menu = row['menu']
                id = int(row['id'])
                neta = row['neta']
                try:
                  batch.put_item(
                    Item={
                      'menu': menu,
                      'id': id,
                      'neta': neta
                    }
                  )
                except ClientError as e:
                  print(e)
    DependsOn:
      - DynamoExample
      - LambdaExecutionRole
  • IAM ロールの権限はかなり甘めに作っているので、実運用ではより厳格に制御する方が望ましいです。
  • Amazon S3 バケットに配置されたファイルは30日間で自動的に削除されるようライフサイクル設定がされています。
  • Amazon S3 バケットから AWS Lambda にイベント通知を設定するとともに、必要なパーミッションを付与しています。

動作イメージ

実際に JSON データを Amazon S3 バケットに置いてみます。

Amazon DynamoDB テーブルのデータを見てみるとデータが入っていることが確認できました。

今回のインポートするためのコードは、追記型になります。既に同じパーティションキー、ソートキーのデータが入っていた場合は、上書きされます。(アップサートのイメージ)

まとめ

いかがでしたでしょうか?

汎用的な仕組みではないですが、データフォーマットに合わせていくらでも応用は可能です。本記事がみなさまのお役に立てれば幸いです。

著者について
広野 祐司

AWS サーバーレスアーキテクチャを駆使して社内クラウド人材育成アプリとコンテンツづくりに勤しんでいます。React で SPA を書き始めたら快適すぎて、他の言語には戻れなくなりました。サーバーレス & React 仲間を増やしたいです。AWSは好きですが、それよりもAWSすげー!って気持ちの方が強いです。
取得資格:AWS 認定は12資格、ITサービスマネージャ、ITIL v3 Expert 等
2020 - 2023 Japan AWS Top Engineer 受賞
2022 - 2023 Japan AWS Ambassador 受賞
2023 当社初代フルスタックエンジニア認定
好きなAWSサービス:AWS Amplify / AWS AppSync / Amazon Cognito / AWS Step Functions / AWS CloudFormation

広野 祐司をフォローする
クラウドに強いによるエンジニアブログです。
SCSKは専門性と豊富な実績を活かしたクラウドサービス USiZE(ユーサイズ)を提供しています。
USiZEサービスサイトでは、お客様のDX推進をワンストップで支援するサービスの詳細や導入事例を紹介しています。
AWSクラウドソリューションデータベース
シェアする
タイトルとURLをコピーしました