Amazon S3 イベント通知から AWS Step Functions ステートマシンを呼び出す [AWS CloudFormation テンプレート付き]

こんにちは、広野です。

Amazon S3 イベント通知を使用して、特定のオブジェクトへのアクションをトリガーに AWS Step Functions ステートマシンを呼び出したいケースがあると思います。

しかし、Amazon S3 イベント通知から呼び出せる AWS サービスは執筆時点では 4 つに限られており、AWS Step Funcitons は含まれていません。

ERROR: The request could not be satisfied

また、AWS 公式サイトでは Amazon EventBridge を介したステートマシンの呼出方法が紹介されています。

ERROR: The request could not be satisfied

私としては、馴染みのある AWS Lambda 関数を介して呼び出す方がわかりやすかったため、いつもそのようにしています。本記事ではその方法を紹介します。

やりたいこと

  1. Amazon S3 イベント通知から、AWS Lambda 関数経由で AWS Step Functions ステートマシンを呼び出す。
  2. 呼び出されたステートマシンに、トリガーとなった Amazon S3 オブジェクトのバケット名とキー情報を渡す。
  3. ステートマシンは、Amazon S3 オブジェクト内のデータを読み込む。(ここでは JSON データとする)

解説

詳細パラメータは本記事最下部の AWS CloudFormation テンプレートをご覧頂くか、実際にスタックを作成して頂いて、できたものをご覧頂ければと思います。

Amazon S3

Amazon S3 には、testdata.json という名前のオブジェクトが作成/更新されたときにイベント通知(AWS Lambda 関数の呼出) が発動するようバケットに設定します。

AWS Lambda 関数 (Python)

AWS Step Functions ステートマシンを呼び出す AWS Lambda 関数は、以下のようになります。

Amazon S3 イベント通知から渡されたバケット名、キー名を取得するときに unquote_plus というモジュールを使用しないとうまく受け取れません。そして、その情報をそのままステートマシンを呼び出すコードに渡してあげます。

import boto3
import json
import datetime
from urllib.parse import unquote_plus
def datetimeconverter(o):
  if isinstance(o, datetime.datetime):
    return str(o)
def lambda_handler(event, context):
  client = boto3.client('stepfunctions')
  try:
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    params = {
      "bucket": bucket,
      "key": key
    }
    res = client.start_execution(
      stateMachineArn='ステートマシンのARNを記載',
      input=json.dumps(params)
    )
  except Exception as e:
    print(e)
  else:
    print(json.dumps(res, indent=4, default=datetimeconverter))

AWS Step Functions ステートマシン

呼び出されたステートマシンは、AWS Lambda 関数からバケット名、キー名を受け取ります。

受け取ったデータは “$.bucket” と “$.key” というように表現でき、これを AWS Step Functions ネイティブの S3:GetObject API のパラメータとして埋め込みます。これで、Amazon S3 オブジェクトからデータの中身を取得できます。

Amazon S3 オブジェクトから取得したデータを次のステップに渡すために、以下のように記述します。

取得したデータは $.Body 内にあるのですが、文字列型として取得してしまうので、States.StringToJson という関数を使用して JSON オブジェクトに変換します。その状態で、”body.$” キーの値として格納することで、次のステップに “body”: JSON データ という整形されたフォーマットでデータを渡すことができます。

本記事では省略しますが、以降は次のステップを作成して取得した JSON データを処理していきます。

AWS CloudFormation テンプレート

一連のリソースを作成できる AWS CloudFormation テンプレートを貼っておきます。

AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates sample resources for calling a Step Functions State Machine from a S3 event notification.

# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
  SystemName:
    Type: String
    Description: The system name.
    Default: example
    MaxLength: 10
    MinLength: 1

Resources:
# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
  S3BucketDataimport:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub example-${SystemName}-dataimport
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: 30
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: "s3:ObjectCreated:*"
            Function: !GetAtt LambdaCallStateMachine.Arn
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: testdata.json
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}
    DependsOn:
      - LambdaCallStateMachine
      - S3LambdaCallStateMachineInvocationPermission

# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
  LambdaCallStateMachine:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub example-CallStateMachine-${SystemName}
      Description: !Sub Lambda Function to call the state machine for example-${SystemName}, called from S3 trigger.
      Runtime: python3.9
      Timeout: 3
      MemorySize: 128
      Role: !GetAtt LambdaSfnInvocationRole.Arn
      Handler: index.lambda_handler
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}
      Code:
        ZipFile: !Sub |
          import boto3
          import json
          import datetime
          from urllib.parse import unquote_plus
          def datetimeconverter(o):
            if isinstance(o, datetime.datetime):
              return str(o)
          def lambda_handler(event, context):
            client = boto3.client('stepfunctions')
            try:
              bucket = event['Records'][0]['s3']['bucket']['name']
              key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
              params = {
                "bucket": bucket,
                "key": key
              }
              res = client.start_execution(
                stateMachineArn='${StateMachine}',
                input=json.dumps(params)
              )
            except Exception as e:
              print(e)
            else:
              print(json.dumps(res, indent=4, default=datetimeconverter))
    DependsOn:
      - StateMachine
      - LambdaSfnInvocationRole

# ------------------------------------------------------------#
# State Machine
# ------------------------------------------------------------#
  StateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub example-statemachine-${SystemName}
      StateMachineType: STANDARD
      DefinitionSubstitutions:
        DSSystemName: !Sub ${SystemName}
      DefinitionString: |-
        {
          "Comment": "State machine for example-${DSSystemName}",
          "StartAt": "GetData",
          "States": {
            "GetData": {
              "Type": "Task",
              "Parameters": {
                "Bucket.$": "$.bucket",
                "Key.$": "$.key"
              },
              "Resource": "arn:aws:states:::aws-sdk:s3:getObject",
              "ResultSelector": {
                "body.$": "States.StringToJson($.Body)"
              },
              "End": true,
              "Comment": "Get data from JSON in S3"
            }
          },
          "TimeoutSeconds": 3600
        }
      LoggingConfiguration:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt LogGroupStateMachine.Arn
        IncludeExecutionData: true
        Level: ERROR
      RoleArn: !GetAtt StateMachineExecutionRole.Arn
      TracingConfiguration:
        Enabled: false
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}
    DependsOn:
      - LogGroupStateMachine

# ------------------------------------------------------------#
# State Machine Execution LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
  LogGroupStateMachine:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/vendedlogs/states/example-statemachine-${SystemName}
      RetentionInDays: 365
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}

# ------------------------------------------------------------#
# S3 Lambda Invocation Permission
# ------------------------------------------------------------#
  S3LambdaCallStateMachineInvocationPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt LambdaCallStateMachine.Arn
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      SourceAccount: !Sub ${AWS::AccountId}
      SourceArn: !Sub arn:aws:s3:::example-${SystemName}-dataimport
    DependsOn:
      - LambdaCallStateMachine

# ------------------------------------------------------------#
# Lambda Step Functions Invocation Role (IAM)
# ------------------------------------------------------------#
  LambdaSfnInvocationRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub example-SfnInvoke-${SystemName}
      Description: This role allows Lambda functions to start state machines of Step Functions.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess
        - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess

# ------------------------------------------------------------#
# State Machine Execution Role (IAM)
# ------------------------------------------------------------#
  StateMachineExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub example-StateMachineExecutionRole-${SystemName}
      Description: This role allows State Machines to call specified AWS resources.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                states.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /service-role/
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaRole
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess

まとめ

いかがでしたでしょうか?

内容的には大したことないのですが、いざやろうとすると各所に設定が必要で面倒くさいです。この手の構成はよく使うので、AWS CloudFormation テンプレートにしておくと似たような構成をつくるときに非常に便利なのでお勧めです。

本記事が皆様のお役に立てれば幸いです。

著者について
広野 祐司

AWS サーバーレスアーキテクチャを駆使して社内クラウド人材育成アプリとコンテンツづくりに勤しんでいます。React で SPA を書き始めたら快適すぎて、他の言語には戻れなくなりました。サーバーレス & React 仲間を増やしたいです。AWSは好きですが、それよりもAWSすげー!って気持ちの方が強いです。
取得資格:AWS 認定は12資格、ITサービスマネージャ、ITIL v3 Expert 等
2020 - 2023 Japan AWS Top Engineer 受賞
2022 - 2023 Japan AWS Ambassador 受賞
2023 当社初代フルスタックエンジニア認定
好きなAWSサービス:AWS Amplify / AWS AppSync / Amazon Cognito / AWS Step Functions / AWS CloudFormation

広野 祐司をフォローする
クラウドに強いによるエンジニアブログです。
SCSKは専門性と豊富な実績を活かしたクラウドサービス USiZE(ユーサイズ)を提供しています。
USiZEサービスサイトでは、お客様のDX推進をワンストップで支援するサービスの詳細や導入事例を紹介しています。
AWSクラウド
シェアする
タイトルとURLをコピーしました