こんにちは、広野です。
Amazon S3 イベント通知を使用して、特定のオブジェクトへのアクションをトリガーに AWS Step Functions ステートマシンを呼び出したいケースがあると思います。
しかし、Amazon S3 イベント通知から呼び出せる AWS サービスは執筆時点では 4 つに限られており、AWS Step Funcitons は含まれていません。
また、AWS 公式サイトでは Amazon EventBridge を介したステートマシンの呼出方法が紹介されています。
私としては、馴染みのある AWS Lambda 関数を介して呼び出す方がわかりやすかったため、いつもそのようにしています。本記事ではその方法を紹介します。
やりたいこと
- Amazon S3 イベント通知から、AWS Lambda 関数経由で AWS Step Functions ステートマシンを呼び出す。
- 呼び出されたステートマシンに、トリガーとなった Amazon S3 オブジェクトのバケット名とキー情報を渡す。
- ステートマシンは、Amazon S3 オブジェクト内のデータを読み込む。(ここでは JSON データとする)
解説
詳細パラメータは本記事最下部の AWS CloudFormation テンプレートをご覧頂くか、実際にスタックを作成して頂いて、できたものをご覧頂ければと思います。
Amazon S3
Amazon S3 には、testdata.json という名前のオブジェクトが作成/更新されたときにイベント通知(AWS Lambda 関数の呼出) が発動するようバケットに設定します。
AWS Lambda 関数 (Python)
AWS Step Functions ステートマシンを呼び出す AWS Lambda 関数は、以下のようになります。
Amazon S3 イベント通知から渡されたバケット名、キー名を取得するときに unquote_plus というモジュールを使用しないとうまく受け取れません。そして、その情報をそのままステートマシンを呼び出すコードに渡してあげます。
import boto3 import json import datetime from urllib.parse import unquote_plus def datetimeconverter(o): if isinstance(o, datetime.datetime): return str(o) def lambda_handler(event, context): client = boto3.client('stepfunctions') try: bucket = event['Records'][0]['s3']['bucket']['name'] key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') params = { "bucket": bucket, "key": key } res = client.start_execution( stateMachineArn='ステートマシンのARNを記載', input=json.dumps(params) ) except Exception as e: print(e) else: print(json.dumps(res, indent=4, default=datetimeconverter))
AWS Step Functions ステートマシン
呼び出されたステートマシンは、AWS Lambda 関数からバケット名、キー名を受け取ります。
受け取ったデータは “$.bucket” と “$.key” というように表現でき、これを AWS Step Functions ネイティブの S3:GetObject API のパラメータとして埋め込みます。これで、Amazon S3 オブジェクトからデータの中身を取得できます。
Amazon S3 オブジェクトから取得したデータを次のステップに渡すために、以下のように記述します。
取得したデータは $.Body 内にあるのですが、文字列型として取得してしまうので、States.StringToJson という関数を使用して JSON オブジェクトに変換します。その状態で、”body.$” キーの値として格納することで、次のステップに “body”: JSON データ という整形されたフォーマットでデータを渡すことができます。
本記事では省略しますが、以降は次のステップを作成して取得した JSON データを処理していきます。
AWS CloudFormation テンプレート
一連のリソースを作成できる AWS CloudFormation テンプレートを貼っておきます。
AWSTemplateFormatVersion: 2010-09-09 Description: The CloudFormation template that creates sample resources for calling a Step Functions State Machine from a S3 event notification. # ------------------------------------------------------------# # Input Parameters # ------------------------------------------------------------# Parameters: SystemName: Type: String Description: The system name. Default: example MaxLength: 10 MinLength: 1 Resources: # ------------------------------------------------------------# # S3 # ------------------------------------------------------------# S3BucketDataimport: Type: AWS::S3::Bucket Properties: BucketName: !Sub example-${SystemName}-dataimport LifecycleConfiguration: Rules: - Id: AutoDelete Status: Enabled ExpirationInDays: 30 PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true NotificationConfiguration: LambdaConfigurations: - Event: "s3:ObjectCreated:*" Function: !GetAtt LambdaCallStateMachine.Arn Filter: S3Key: Rules: - Name: suffix Value: testdata.json Tags: - Key: Cost Value: !Sub example-${SystemName} DependsOn: - LambdaCallStateMachine - S3LambdaCallStateMachineInvocationPermission # ------------------------------------------------------------# # Lambda # ------------------------------------------------------------# LambdaCallStateMachine: Type: AWS::Lambda::Function Properties: FunctionName: !Sub example-CallStateMachine-${SystemName} Description: !Sub Lambda Function to call the state machine for example-${SystemName}, called from S3 trigger. Runtime: python3.9 Timeout: 3 MemorySize: 128 Role: !GetAtt LambdaSfnInvocationRole.Arn Handler: index.lambda_handler Tags: - Key: Cost Value: !Sub example-${SystemName} Code: ZipFile: !Sub | import boto3 import json import datetime from urllib.parse import unquote_plus def datetimeconverter(o): if isinstance(o, datetime.datetime): return str(o) def lambda_handler(event, context): client = boto3.client('stepfunctions') try: bucket = event['Records'][0]['s3']['bucket']['name'] key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') params = { "bucket": bucket, "key": key } res = client.start_execution( stateMachineArn='${StateMachine}', input=json.dumps(params) ) except Exception as e: print(e) else: print(json.dumps(res, indent=4, default=datetimeconverter)) DependsOn: - StateMachine - LambdaSfnInvocationRole # ------------------------------------------------------------# # State Machine # ------------------------------------------------------------# StateMachine: Type: AWS::StepFunctions::StateMachine Properties: StateMachineName: !Sub example-statemachine-${SystemName} StateMachineType: STANDARD DefinitionSubstitutions: DSSystemName: !Sub ${SystemName} DefinitionString: |- { "Comment": "State machine for example-${DSSystemName}", "StartAt": "GetData", "States": { "GetData": { "Type": "Task", "Parameters": { "Bucket.$": "$.bucket", "Key.$": "$.key" }, "Resource": "arn:aws:states:::aws-sdk:s3:getObject", "ResultSelector": { "body.$": "States.StringToJson($.Body)" }, "End": true, "Comment": "Get data from JSON in S3" } }, "TimeoutSeconds": 3600 } LoggingConfiguration: Destinations: - CloudWatchLogsLogGroup: LogGroupArn: !GetAtt LogGroupStateMachine.Arn IncludeExecutionData: true Level: ERROR RoleArn: !GetAtt StateMachineExecutionRole.Arn TracingConfiguration: Enabled: false Tags: - Key: Cost Value: !Sub example-${SystemName} DependsOn: - LogGroupStateMachine # ------------------------------------------------------------# # State Machine Execution LogGroup (CloudWatch Logs) # ------------------------------------------------------------# LogGroupStateMachine: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/vendedlogs/states/example-statemachine-${SystemName} RetentionInDays: 365 Tags: - Key: Cost Value: !Sub example-${SystemName} # ------------------------------------------------------------# # S3 Lambda Invocation Permission # ------------------------------------------------------------# S3LambdaCallStateMachineInvocationPermission: Type: AWS::Lambda::Permission Properties: FunctionName: !GetAtt LambdaCallStateMachine.Arn Action: lambda:InvokeFunction Principal: s3.amazonaws.com SourceAccount: !Sub ${AWS::AccountId} SourceArn: !Sub arn:aws:s3:::example-${SystemName}-dataimport DependsOn: - LambdaCallStateMachine # ------------------------------------------------------------# # Lambda Step Functions Invocation Role (IAM) # ------------------------------------------------------------# LambdaSfnInvocationRole: Type: AWS::IAM::Role Properties: RoleName: !Sub example-SfnInvoke-${SystemName} Description: This role allows Lambda functions to start state machines of Step Functions. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess # ------------------------------------------------------------# # State Machine Execution Role (IAM) # ------------------------------------------------------------# StateMachineExecutionRole: Type: AWS::IAM::Role Properties: RoleName: !Sub example-StateMachineExecutionRole-${SystemName} Description: This role allows State Machines to call specified AWS resources. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: states.amazonaws.com Action: - sts:AssumeRole Path: /service-role/ ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaRole - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
まとめ
いかがでしたでしょうか?
内容的には大したことないのですが、いざやろうとすると各所に設定が必要で面倒くさいです。この手の構成はよく使うので、AWS CloudFormation テンプレートにしておくと似たような構成をつくるときに非常に便利なのでお勧めです。
本記事が皆様のお役に立てれば幸いです。