Amazon S3 イベント通知から AWS Step Functions ステートマシンを呼び出す [AWS CloudFormation テンプレート付き]

こんにちは、広野です。

Amazon S3 イベント通知を使用して、特定のオブジェクトへのアクションをトリガーに AWS Step Functions ステートマシンを呼び出したいケースがあると思います。

しかし、Amazon S3 イベント通知から呼び出せる AWS サービスは執筆時点では 4 つに限られており、AWS Step Funcitons は含まれていません。

イベント通知のタイプおよび送信先 - Amazon Simple Storage Service
Amazon S3 では、通知を発行できるいくつかのイベント通知のタイプと送信先がサポートされています。イベント通知を設定するときに、イベントタイプと送信先を指定できます。

また、AWS 公式サイトでは Amazon EventBridge を介したステートマシンの呼出方法が紹介されています。

Starting a State Machine Execution in Response to Amazon S3 Events - AWS Step Functions
Learn how to start an AWS Step Functions state machine execution using Amazon EventBridge.

私としては、馴染みのある AWS Lambda 関数を介して呼び出す方がわかりやすかったため、いつもそのようにしています。本記事ではその方法を紹介します。

やりたいこと

  1. Amazon S3 イベント通知から、AWS Lambda 関数経由で AWS Step Functions ステートマシンを呼び出す。
  2. 呼び出されたステートマシンに、トリガーとなった Amazon S3 オブジェクトのバケット名とキー情報を渡す。
  3. ステートマシンは、Amazon S3 オブジェクト内のデータを読み込む。(ここでは JSON データとする)

解説

詳細パラメータは本記事最下部の AWS CloudFormation テンプレートをご覧頂くか、実際にスタックを作成して頂いて、できたものをご覧頂ければと思います。

Amazon S3

Amazon S3 には、testdata.json という名前のオブジェクトが作成/更新されたときにイベント通知(AWS Lambda 関数の呼出) が発動するようバケットに設定します。

AWS Lambda 関数 (Python)

AWS Step Functions ステートマシンを呼び出す AWS Lambda 関数は、以下のようになります。

Amazon S3 イベント通知から渡されたバケット名、キー名を取得するときに unquote_plus というモジュールを使用しないとうまく受け取れません。そして、その情報をそのままステートマシンを呼び出すコードに渡してあげます。

import boto3
import json
import datetime
from urllib.parse import unquote_plus
def datetimeconverter(o):
  if isinstance(o, datetime.datetime):
    return str(o)
def lambda_handler(event, context):
  client = boto3.client('stepfunctions')
  try:
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    params = {
      "bucket": bucket,
      "key": key
    }
    res = client.start_execution(
      stateMachineArn='ステートマシンのARNを記載',
      input=json.dumps(params)
    )
  except Exception as e:
    print(e)
  else:
    print(json.dumps(res, indent=4, default=datetimeconverter))

AWS Step Functions ステートマシン

呼び出されたステートマシンは、AWS Lambda 関数からバケット名、キー名を受け取ります。

受け取ったデータは “$.bucket” と “$.key” というように表現でき、これを AWS Step Functions ネイティブの S3:GetObject API のパラメータとして埋め込みます。これで、Amazon S3 オブジェクトからデータの中身を取得できます。

Amazon S3 オブジェクトから取得したデータを次のステップに渡すために、以下のように記述します。

取得したデータは $.Body 内にあるのですが、文字列型として取得してしまうので、States.StringToJson という関数を使用して JSON オブジェクトに変換します。その状態で、”body.$” キーの値として格納することで、次のステップに “body”: JSON データ という整形されたフォーマットでデータを渡すことができます。

本記事では省略しますが、以降は次のステップを作成して取得した JSON データを処理していきます。

AWS CloudFormation テンプレート

一連のリソースを作成できる AWS CloudFormation テンプレートを貼っておきます。

AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates sample resources for calling a Step Functions State Machine from a S3 event notification.

# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
  SystemName:
    Type: String
    Description: The system name.
    Default: example
    MaxLength: 10
    MinLength: 1

Resources:
# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
  S3BucketDataimport:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub example-${SystemName}-dataimport
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: 30
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: "s3:ObjectCreated:*"
            Function: !GetAtt LambdaCallStateMachine.Arn
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: testdata.json
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}
    DependsOn:
      - LambdaCallStateMachine
      - S3LambdaCallStateMachineInvocationPermission

# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
  LambdaCallStateMachine:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub example-CallStateMachine-${SystemName}
      Description: !Sub Lambda Function to call the state machine for example-${SystemName}, called from S3 trigger.
      Runtime: python3.9
      Timeout: 3
      MemorySize: 128
      Role: !GetAtt LambdaSfnInvocationRole.Arn
      Handler: index.lambda_handler
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}
      Code:
        ZipFile: !Sub |
          import boto3
          import json
          import datetime
          from urllib.parse import unquote_plus
          def datetimeconverter(o):
            if isinstance(o, datetime.datetime):
              return str(o)
          def lambda_handler(event, context):
            client = boto3.client('stepfunctions')
            try:
              bucket = event['Records'][0]['s3']['bucket']['name']
              key = unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
              params = {
                "bucket": bucket,
                "key": key
              }
              res = client.start_execution(
                stateMachineArn='${StateMachine}',
                input=json.dumps(params)
              )
            except Exception as e:
              print(e)
            else:
              print(json.dumps(res, indent=4, default=datetimeconverter))
    DependsOn:
      - StateMachine
      - LambdaSfnInvocationRole

# ------------------------------------------------------------#
# State Machine
# ------------------------------------------------------------#
  StateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub example-statemachine-${SystemName}
      StateMachineType: STANDARD
      DefinitionSubstitutions:
        DSSystemName: !Sub ${SystemName}
      DefinitionString: |-
        {
          "Comment": "State machine for example-${DSSystemName}",
          "StartAt": "GetData",
          "States": {
            "GetData": {
              "Type": "Task",
              "Parameters": {
                "Bucket.$": "$.bucket",
                "Key.$": "$.key"
              },
              "Resource": "arn:aws:states:::aws-sdk:s3:getObject",
              "ResultSelector": {
                "body.$": "States.StringToJson($.Body)"
              },
              "End": true,
              "Comment": "Get data from JSON in S3"
            }
          },
          "TimeoutSeconds": 3600
        }
      LoggingConfiguration:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt LogGroupStateMachine.Arn
        IncludeExecutionData: true
        Level: ERROR
      RoleArn: !GetAtt StateMachineExecutionRole.Arn
      TracingConfiguration:
        Enabled: false
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}
    DependsOn:
      - LogGroupStateMachine

# ------------------------------------------------------------#
# State Machine Execution LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
  LogGroupStateMachine:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/vendedlogs/states/example-statemachine-${SystemName}
      RetentionInDays: 365
      Tags:
        - Key: Cost
          Value: !Sub example-${SystemName}

# ------------------------------------------------------------#
# S3 Lambda Invocation Permission
# ------------------------------------------------------------#
  S3LambdaCallStateMachineInvocationPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt LambdaCallStateMachine.Arn
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      SourceAccount: !Sub ${AWS::AccountId}
      SourceArn: !Sub arn:aws:s3:::example-${SystemName}-dataimport
    DependsOn:
      - LambdaCallStateMachine

# ------------------------------------------------------------#
# Lambda Step Functions Invocation Role (IAM)
# ------------------------------------------------------------#
  LambdaSfnInvocationRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub example-SfnInvoke-${SystemName}
      Description: This role allows Lambda functions to start state machines of Step Functions.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess
        - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess

# ------------------------------------------------------------#
# State Machine Execution Role (IAM)
# ------------------------------------------------------------#
  StateMachineExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub example-StateMachineExecutionRole-${SystemName}
      Description: This role allows State Machines to call specified AWS resources.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                states.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /service-role/
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaRole
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess

まとめ

いかがでしたでしょうか?

内容的には大したことないのですが、いざやろうとすると各所に設定が必要で面倒くさいです。この手の構成はよく使うので、AWS CloudFormation テンプレートにしておくと似たような構成をつくるときに非常に便利なのでお勧めです。

本記事が皆様のお役に立てれば幸いです。

著者について
広野 祐司

AWSサーバーレスアーキテクチャを駆使して社内クラウド人材育成アプリや教育コンテンツをつくっています。ReactでSPAを書き始めたら、快適すぎて他の開発言語には戻れなくなりました。AWSサーバーレスやReactの仲間を増やしたいです。
取得資格:AWS認定は7つ、ITサービスマネージャ、ITIL v3 Expert、等
2020, 2021 APN AWS Top Engineers 受賞
2022 AWS Partner Ambassador 受賞
好きなAWSサービス:AWS Amplify / Amazon Cognito / AWS Step Functions / AWS CloudFormation

広野 祐司をフォローする
クラウドに強いによるエンジニアブログです。
SCSKは専門性と豊富な実績を活かしたクラウドサービス USiZE(ユーサイズ)を提供しています。
USiZEサービスサイトでは、お客様のDX推進をワンストップで支援するサービスの詳細や導入事例を紹介しています。
AWSサーバレスアーキテクチャ
TechHarmony
タイトルとURLをコピーしました