こんにちは、広野です。
本記事は、以下の記事の続編です。
AWS Cloud9 上から Python スクリプトで Amazon DynamoDB にデータをインポートする方法を紹介しましたが、今回は Amazon S3 にデータを置いたら自動で Amazon DynamoDB にインポートされる仕組みにアレンジしてみます。
特定の Amazon DynamoDB テーブルを何度も更新する運用が必要な場合、データファイルさえ作ってしまえば Amazon S3 バケットに置くだけで更新ができるようになり、楽です。
実装は AWS CloudFormation で行います。サンプルテンプレートを紹介しますのでパラメータ等を適宜変更すればすぐにお使い頂けます。とりあえず動きを試したいだけでしたら、このままでも問題ありません。
やりたいこと
- Amazon S3 バケットに JSON データを配置したら、自動的にそのデータファイルを読み込んで特定の Amazon DynamoDB テーブルにデータをインポートさせたい。
- データインポート手段には AWS Lambda (Python) を使用する。
- インポートしたいデータは JSON 形式とする。
実現方法
- Amazon S3 バケットにフォルダを作成しておく。
- そのフォルダにファイルが配置されると、Amazon S3 イベント通知により AWS Lambda 関数を呼び出す。
- AWS Lambda 関数は引き渡された Amazon S3 オブジェクトの情報からファイルを取得し、データを読み込む。読み込んだデータを Amazon DynamoDB テーブルにインポートする。
サンプルデータ
Amazon DynamoDB テーブル
以下の Amazon DynamoDB テーブルにデータをインポートしたいと思います。
menu パーティションキー 文字列 |
id ソートキー 数値 |
neta |
menu1 | 1 | いか |
menu1 | 2 | たこ |
menu2 | 1 | 海鮮丼 |
インポート用データ (JSON)
上記のデータを JSON 形式にすると、以下のようになります。
[ { "menu": "menu1", "id": 1, "neta": "いか" }, { "menu": "menu1", "id": 2, "neta": "たこ" }, { "menu": "menu2", "id": 1, "neta": "海鮮丼" } ]
データを Excel や CSV で作成した場合、ネット上のツール等を利用して JSON に変換することができます。セキュリティ上不安な場合は EXCEL/CSV データを JSON に変換するスクリプトを自分で書きましょう。
JSON ファイルの文字コードは UTF-8N (BOMなしUTF-8) で保存しましょう。UTF-8 ですと、AWS Cloud9 や AWS Lambda 等でファイルを扱うときにエラーになります。エディタを使うときは要注意です。
サンプル AWS CloudFormation テンプレート
一連の仕組みを AWS CloudFormation テンプレートに落とし込んでみます。
AWSTemplateFormatVersion: 2010-09-09 Description: The CloudFormation template that creates sample resources for importing data from S3 to DynamoDB with Lambda. # ------------------------------------------------------------# # Input Parameters # ------------------------------------------------------------# Parameters: SystemName: Type: String Description: The system name. Default: example MaxLength: 10 MinLength: 1 Resources: # ------------------------------------------------------------# # DynamoDB # ------------------------------------------------------------# DynamoExample: Type: AWS::DynamoDB::Table Properties: TableName: !Sub menutable-${SystemName} AttributeDefinitions: - AttributeName: menu AttributeType: S - AttributeName: id AttributeType: N BillingMode: PAY_PER_REQUEST KeySchema: - AttributeName: menu KeyType: HASH - AttributeName: id KeyType: RANGE PointInTimeRecoverySpecification: PointInTimeRecoveryEnabled: false Tags: - Key: Cost Value: !Ref SystemName # ------------------------------------------------------------# # Lambda Execution Role to access S3 and DynamoDB (IAM) # ------------------------------------------------------------# LambdaExecutionRole: Type: AWS::IAM::Role Properties: RoleName: !Sub LambdaExecutionRoleDynamodbAccess-${SystemName} Description: This role allows Lambda functions to access S3 and DynamoDB. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess - arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess - arn:aws:iam::aws:policy/AmazonS3FullAccess # ------------------------------------------------------------# # S3 Lambda Invocation Permission # ------------------------------------------------------------# S3LambdaDynamoImportInvocationPermission: Type: AWS::Lambda::Permission Properties: FunctionName: !GetAtt LambdaDynamoImport.Arn Action: lambda:InvokeFunction Principal: s3.amazonaws.com SourceAccount: !Sub ${AWS::AccountId} SourceArn: !Sub arn:aws:s3:::dataimport-${SystemName} DependsOn: - LambdaDynamoImport # ------------------------------------------------------------# # S3 # ------------------------------------------------------------# S3BucketDataImport: Type: AWS::S3::Bucket Properties: BucketName: !Sub dataimport-${SystemName} LifecycleConfiguration: Rules: - Id: AutoDelete Status: Enabled ExpirationInDays: 30 PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true NotificationConfiguration: LambdaConfigurations: - Event: "s3:ObjectCreated:*" Function: !GetAtt LambdaDynamoImport.Arn Tags: - Key: Cost Value: !Ref SystemName DependsOn: - S3LambdaDynamoImportInvocationPermission # ------------------------------------------------------------# # Lambda # ------------------------------------------------------------# LambdaDynamoImport: Type: AWS::Lambda::Function Properties: FunctionName: !Sub LambdaDynamoImport-${SystemName} Description: !Sub A Lambda Function to import JSON data in S3 into DynamoDB table, called from a S3 event notification. Runtime: python3.9 Timeout: 60 MemorySize: 128 Role: !GetAtt LambdaExecutionRole.Arn Handler: index.lambda_handler Tags: - Key: Cost Value: !Ref SystemName Code: ZipFile: !Sub | import boto3 import json import datetime from urllib.parse import unquote_plus from botocore.exceptions import ClientError dynamodb = boto3.resource('dynamodb', region_name='${AWS::Region}') table = dynamodb.Table('${DynamoExample}') s3 = boto3.client('s3') def lambda_handler(event, context): # S3イベント通知から連携されたJSONデータのオブジェクト名、キーを取得 bucket = event['Records'][0]['s3']['bucket']['name'] key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') # S3上のJSONデータを取得 try: s3data = s3.get_object( Bucket=bucket, Key=key, ResponseContentType='application/json' ) data = json.loads(s3data['Body'].read().decode('utf-8')) except ClientError as e: print(e) # JSONデータをDynamoDBテーブルにインポート with table.batch_writer() as batch: for row in data: menu = row['menu'] id = int(row['id']) neta = row['neta'] try: batch.put_item( Item={ 'menu': menu, 'id': id, 'neta': neta } ) except ClientError as e: print(e) DependsOn: - DynamoExample - LambdaExecutionRole
- IAM ロールの権限はかなり甘めに作っているので、実運用ではより厳格に制御する方が望ましいです。
- Amazon S3 バケットに配置されたファイルは30日間で自動的に削除されるようライフサイクル設定がされています。
- Amazon S3 バケットから AWS Lambda にイベント通知を設定するとともに、必要なパーミッションを付与しています。
動作イメージ
実際に JSON データを Amazon S3 バケットに置いてみます。
Amazon DynamoDB テーブルのデータを見てみるとデータが入っていることが確認できました。
今回のインポートするためのコードは、追記型になります。既に同じパーティションキー、ソートキーのデータが入っていた場合は、上書きされます。(アップサートのイメージ)
まとめ
いかがでしたでしょうか?
汎用的な仕組みではないですが、データフォーマットに合わせていくらでも応用は可能です。本記事がみなさまのお役に立てれば幸いです。