こんにちは、広野です。
Amazon S3 イベント通知を使用して、特定のオブジェクトへのアクションをトリガーに AWS Step Functions ステートマシンを呼び出したいケースがあると思います。
しかし、Amazon S3 イベント通知から呼び出せる AWS サービスは執筆時点では 4 つに限られており、AWS Step Funcitons は含まれていません。
また、AWS 公式サイトでは Amazon EventBridge を介したステートマシンの呼出方法が紹介されています。
私としては、馴染みのある AWS Lambda 関数を介して呼び出す方がわかりやすかったため、いつもそのようにしています。本記事ではその方法を紹介します。
やりたいこと
- Amazon S3 イベント通知から、AWS Lambda 関数経由で AWS Step Functions ステートマシンを呼び出す。
- 呼び出されたステートマシンに、トリガーとなった Amazon S3 オブジェクトのバケット名とキー情報を渡す。
- ステートマシンは、Amazon S3 オブジェクト内のデータを読み込む。(ここでは JSON データとする)

解説
詳細パラメータは本記事最下部の AWS CloudFormation テンプレートをご覧頂くか、実際にスタックを作成して頂いて、できたものをご覧頂ければと思います。
Amazon S3
Amazon S3 には、testdata.json という名前のオブジェクトが作成/更新されたときにイベント通知(AWS Lambda 関数の呼出) が発動するようバケットに設定します。
AWS Lambda 関数 (Python)
AWS Step Functions ステートマシンを呼び出す AWS Lambda 関数は、以下のようになります。
Amazon S3 イベント通知から渡されたバケット名、キー名を取得するときに unquote_plus というモジュールを使用しないとうまく受け取れません。そして、その情報をそのままステートマシンを呼び出すコードに渡してあげます。
import boto3
import json
import datetime
from urllib.parse import unquote_plus
def datetimeconverter(o):
if isinstance(o, datetime.datetime):
return str(o)
def lambda_handler(event, context):
client = boto3.client('stepfunctions')
try:
bucket = event['Records'][0]['s3']['bucket']['name']
key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
params = {
"bucket": bucket,
"key": key
}
res = client.start_execution(
stateMachineArn='ステートマシンのARNを記載',
input=json.dumps(params)
)
except Exception as e:
print(e)
else:
print(json.dumps(res, indent=4, default=datetimeconverter))
AWS Step Functions ステートマシン
呼び出されたステートマシンは、AWS Lambda 関数からバケット名、キー名を受け取ります。
受け取ったデータは “$.bucket” と “$.key” というように表現でき、これを AWS Step Functions ネイティブの S3:GetObject API のパラメータとして埋め込みます。これで、Amazon S3 オブジェクトからデータの中身を取得できます。

Amazon S3 オブジェクトから取得したデータを次のステップに渡すために、以下のように記述します。

取得したデータは $.Body 内にあるのですが、文字列型として取得してしまうので、States.StringToJson という関数を使用して JSON オブジェクトに変換します。その状態で、”body.$” キーの値として格納することで、次のステップに “body”: JSON データ という整形されたフォーマットでデータを渡すことができます。
本記事では省略しますが、以降は次のステップを作成して取得した JSON データを処理していきます。
AWS CloudFormation テンプレート
一連のリソースを作成できる AWS CloudFormation テンプレートを貼っておきます。
AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates sample resources for calling a Step Functions State Machine from a S3 event notification.
# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
SystemName:
Type: String
Description: The system name.
Default: example
MaxLength: 10
MinLength: 1
Resources:
# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
S3BucketDataimport:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub example-${SystemName}-dataimport
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: 30
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
NotificationConfiguration:
LambdaConfigurations:
- Event: "s3:ObjectCreated:*"
Function: !GetAtt LambdaCallStateMachine.Arn
Filter:
S3Key:
Rules:
- Name: suffix
Value: testdata.json
Tags:
- Key: Cost
Value: !Sub example-${SystemName}
DependsOn:
- LambdaCallStateMachine
- S3LambdaCallStateMachineInvocationPermission
# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
LambdaCallStateMachine:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub example-CallStateMachine-${SystemName}
Description: !Sub Lambda Function to call the state machine for example-${SystemName}, called from S3 trigger.
Runtime: python3.9
Timeout: 3
MemorySize: 128
Role: !GetAtt LambdaSfnInvocationRole.Arn
Handler: index.lambda_handler
Tags:
- Key: Cost
Value: !Sub example-${SystemName}
Code:
ZipFile: !Sub |
import boto3
import json
import datetime
from urllib.parse import unquote_plus
def datetimeconverter(o):
if isinstance(o, datetime.datetime):
return str(o)
def lambda_handler(event, context):
client = boto3.client('stepfunctions')
try:
bucket = event['Records'][0]['s3']['bucket']['name']
key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
params = {
"bucket": bucket,
"key": key
}
res = client.start_execution(
stateMachineArn='${StateMachine}',
input=json.dumps(params)
)
except Exception as e:
print(e)
else:
print(json.dumps(res, indent=4, default=datetimeconverter))
DependsOn:
- StateMachine
- LambdaSfnInvocationRole
# ------------------------------------------------------------#
# State Machine
# ------------------------------------------------------------#
StateMachine:
Type: AWS::StepFunctions::StateMachine
Properties:
StateMachineName: !Sub example-statemachine-${SystemName}
StateMachineType: STANDARD
DefinitionSubstitutions:
DSSystemName: !Sub ${SystemName}
DefinitionString: |-
{
"Comment": "State machine for example-${DSSystemName}",
"StartAt": "GetData",
"States": {
"GetData": {
"Type": "Task",
"Parameters": {
"Bucket.$": "$.bucket",
"Key.$": "$.key"
},
"Resource": "arn:aws:states:::aws-sdk:s3:getObject",
"ResultSelector": {
"body.$": "States.StringToJson($.Body)"
},
"End": true,
"Comment": "Get data from JSON in S3"
}
},
"TimeoutSeconds": 3600
}
LoggingConfiguration:
Destinations:
- CloudWatchLogsLogGroup:
LogGroupArn: !GetAtt LogGroupStateMachine.Arn
IncludeExecutionData: true
Level: ERROR
RoleArn: !GetAtt StateMachineExecutionRole.Arn
TracingConfiguration:
Enabled: false
Tags:
- Key: Cost
Value: !Sub example-${SystemName}
DependsOn:
- LogGroupStateMachine
# ------------------------------------------------------------#
# State Machine Execution LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
LogGroupStateMachine:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/vendedlogs/states/example-statemachine-${SystemName}
RetentionInDays: 365
Tags:
- Key: Cost
Value: !Sub example-${SystemName}
# ------------------------------------------------------------#
# S3 Lambda Invocation Permission
# ------------------------------------------------------------#
S3LambdaCallStateMachineInvocationPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !GetAtt LambdaCallStateMachine.Arn
Action: lambda:InvokeFunction
Principal: s3.amazonaws.com
SourceAccount: !Sub ${AWS::AccountId}
SourceArn: !Sub arn:aws:s3:::example-${SystemName}-dataimport
DependsOn:
- LambdaCallStateMachine
# ------------------------------------------------------------#
# Lambda Step Functions Invocation Role (IAM)
# ------------------------------------------------------------#
LambdaSfnInvocationRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub example-SfnInvoke-${SystemName}
Description: This role allows Lambda functions to start state machines of Step Functions.
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess
- arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess
# ------------------------------------------------------------#
# State Machine Execution Role (IAM)
# ------------------------------------------------------------#
StateMachineExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub example-StateMachineExecutionRole-${SystemName}
Description: This role allows State Machines to call specified AWS resources.
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
states.amazonaws.com
Action:
- sts:AssumeRole
Path: /service-role/
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaRole
- arn:aws:iam::aws:policy/AmazonS3FullAccess
- arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
- arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
まとめ
いかがでしたでしょうか?
内容的には大したことないのですが、いざやろうとすると各所に設定が必要で面倒くさいです。この手の構成はよく使うので、AWS CloudFormation テンプレートにしておくと似たような構成をつくるときに非常に便利なのでお勧めです。
本記事が皆様のお役に立てれば幸いです。
