こんにちは、広野です。
本記事は、以下の記事の続編です。
AWS Cloud9 上から Python スクリプトで Amazon DynamoDB にデータをインポートする方法を紹介しましたが、今回は Amazon S3 にデータを置いたら自動で Amazon DynamoDB にインポートされる仕組みにアレンジしてみます。
特定の Amazon DynamoDB テーブルを何度も更新する運用が必要な場合、データファイルさえ作ってしまえば Amazon S3 バケットに置くだけで更新ができるようになり、楽です。
実装は AWS CloudFormation で行います。サンプルテンプレートを紹介しますのでパラメータ等を適宜変更すればすぐにお使い頂けます。とりあえず動きを試したいだけでしたら、このままでも問題ありません。
やりたいこと
- Amazon S3 バケットに JSON データを配置したら、自動的にそのデータファイルを読み込んで特定の Amazon DynamoDB テーブルにデータをインポートさせたい。
- データインポート手段には AWS Lambda (Python) を使用する。
- インポートしたいデータは JSON 形式とする。
実現方法
- Amazon S3 バケットにフォルダを作成しておく。
- そのフォルダにファイルが配置されると、Amazon S3 イベント通知により AWS Lambda 関数を呼び出す。
- AWS Lambda 関数は引き渡された Amazon S3 オブジェクトの情報からファイルを取得し、データを読み込む。読み込んだデータを Amazon DynamoDB テーブルにインポートする。

サンプルデータ
Amazon DynamoDB テーブル
以下の Amazon DynamoDB テーブルにデータをインポートしたいと思います。
| menu パーティションキー 文字列 |
id ソートキー 数値 |
neta |
| menu1 | 1 | いか |
| menu1 | 2 | たこ |
| menu2 | 1 | 海鮮丼 |
インポート用データ (JSON)
上記のデータを JSON 形式にすると、以下のようになります。
[
{
"menu": "menu1",
"id": 1,
"neta": "いか"
},
{
"menu": "menu1",
"id": 2,
"neta": "たこ"
},
{
"menu": "menu2",
"id": 1,
"neta": "海鮮丼"
}
]
データを Excel や CSV で作成した場合、ネット上のツール等を利用して JSON に変換することができます。セキュリティ上不安な場合は EXCEL/CSV データを JSON に変換するスクリプトを自分で書きましょう。
JSON ファイルの文字コードは UTF-8N (BOMなしUTF-8) で保存しましょう。UTF-8 ですと、AWS Cloud9 や AWS Lambda 等でファイルを扱うときにエラーになります。エディタを使うときは要注意です。
サンプル AWS CloudFormation テンプレート
一連の仕組みを AWS CloudFormation テンプレートに落とし込んでみます。
AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates sample resources for importing data from S3 to DynamoDB with Lambda.
# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
SystemName:
Type: String
Description: The system name.
Default: example
MaxLength: 10
MinLength: 1
Resources:
# ------------------------------------------------------------#
# DynamoDB
# ------------------------------------------------------------#
DynamoExample:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub menutable-${SystemName}
AttributeDefinitions:
- AttributeName: menu
AttributeType: S
- AttributeName: id
AttributeType: N
BillingMode: PAY_PER_REQUEST
KeySchema:
- AttributeName: menu
KeyType: HASH
- AttributeName: id
KeyType: RANGE
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: false
Tags:
- Key: Cost
Value: !Ref SystemName
# ------------------------------------------------------------#
# Lambda Execution Role to access S3 and DynamoDB (IAM)
# ------------------------------------------------------------#
LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub LambdaExecutionRoleDynamodbAccess-${SystemName}
Description: This role allows Lambda functions to access S3 and DynamoDB.
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
- arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole
- arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess
- arn:aws:iam::aws:policy/AmazonS3FullAccess
# ------------------------------------------------------------#
# S3 Lambda Invocation Permission
# ------------------------------------------------------------#
S3LambdaDynamoImportInvocationPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !GetAtt LambdaDynamoImport.Arn
Action: lambda:InvokeFunction
Principal: s3.amazonaws.com
SourceAccount: !Sub ${AWS::AccountId}
SourceArn: !Sub arn:aws:s3:::dataimport-${SystemName}
DependsOn:
- LambdaDynamoImport
# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
S3BucketDataImport:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub dataimport-${SystemName}
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: 30
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
NotificationConfiguration:
LambdaConfigurations:
- Event: "s3:ObjectCreated:*"
Function: !GetAtt LambdaDynamoImport.Arn
Tags:
- Key: Cost
Value: !Ref SystemName
DependsOn:
- S3LambdaDynamoImportInvocationPermission
# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
LambdaDynamoImport:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub LambdaDynamoImport-${SystemName}
Description: !Sub A Lambda Function to import JSON data in S3 into DynamoDB table, called from a S3 event notification.
Runtime: python3.9
Timeout: 60
MemorySize: 128
Role: !GetAtt LambdaExecutionRole.Arn
Handler: index.lambda_handler
Tags:
- Key: Cost
Value: !Ref SystemName
Code:
ZipFile: !Sub |
import boto3
import json
import datetime
from urllib.parse import unquote_plus
from botocore.exceptions import ClientError
dynamodb = boto3.resource('dynamodb', region_name='${AWS::Region}')
table = dynamodb.Table('${DynamoExample}')
s3 = boto3.client('s3')
def lambda_handler(event, context):
# S3イベント通知から連携されたJSONデータのオブジェクト名、キーを取得
bucket = event['Records'][0]['s3']['bucket']['name']
key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
# S3上のJSONデータを取得
try:
s3data = s3.get_object(
Bucket=bucket,
Key=key,
ResponseContentType='application/json'
)
data = json.loads(s3data['Body'].read().decode('utf-8'))
except ClientError as e:
print(e)
# JSONデータをDynamoDBテーブルにインポート
with table.batch_writer() as batch:
for row in data:
menu = row['menu']
id = int(row['id'])
neta = row['neta']
try:
batch.put_item(
Item={
'menu': menu,
'id': id,
'neta': neta
}
)
except ClientError as e:
print(e)
DependsOn:
- DynamoExample
- LambdaExecutionRole
- IAM ロールの権限はかなり甘めに作っているので、実運用ではより厳格に制御する方が望ましいです。
- Amazon S3 バケットに配置されたファイルは30日間で自動的に削除されるようライフサイクル設定がされています。
- Amazon S3 バケットから AWS Lambda にイベント通知を設定するとともに、必要なパーミッションを付与しています。
動作イメージ
実際に JSON データを Amazon S3 バケットに置いてみます。

Amazon DynamoDB テーブルのデータを見てみるとデータが入っていることが確認できました。

今回のインポートするためのコードは、追記型になります。既に同じパーティションキー、ソートキーのデータが入っていた場合は、上書きされます。(アップサートのイメージ)
まとめ
いかがでしたでしょうか?
汎用的な仕組みではないですが、データフォーマットに合わせていくらでも応用は可能です。本記事がみなさまのお役に立てれば幸いです。

