AWS Step Functions Map ステート分散モードで任意の Amazon S3 バケットのフォルダーにあるオブジェクトを並列処理する

こんにちは、広野です。

Amazon S3 バケットにある大量のオブジェクトのデータ変換を実行するのに、AWS Step Functions でバッチジョブを実行することにしました。その過程で得た Map ステート分散モードの実装における注意点を紹介します。

使用するのは以下の AWS ドキュメントに書かれている、大量の並列処理が実行できる機能です。

 

やりたいこと

  • Amazon S3 バケットの特定のフォルダーにある大量のオブジェクトに対して同じ処理を並列実行したい。
  • AWS Step Functions の Map ステート分散モードを使用し、そのネイティブな機能で S3 バケット、フォルダー内のオブジェクトリストを取得する。※ Lambda でもできることです。むしろそっちの方が簡単とも言えますが。。。

ここで問題が。

AWS Step Functions の Map ステートで分散モードを使用したとき、処理対象の S3 バケットをフォルダーのプレフィックス付きで指定すると、リストアップされたキー群にフォルダー名が含まれてしまいます。

なので、やりたいことを追加。

  • 取得するオブジェクトリストから、フォルダーを除外する。

でないと、フォルダーに処理をしてしまうことになりますので・・・。地味ですがここに一番苦労しました。

 

アーキテクチャ

いろいろ考えましたが、以下のアーキテクチャに落ち着きました。データ変換処理は省略しています。AWS Step Functions の Map ステート分散モード部分のみの紹介です。

  • AWS Step Functions ステートマシンを開始すると、予め指定していた Amazon S3 バケットのフォルダー内のオブジェクトをリストアップします。(いわゆる ListBucket ですね)
  • 本来はリストアップされたオブジェクトごとにすぐにデータ変換処理に取り掛かりたいのですが、中にはフォルダーが含まれているので、Choice ステートを最初に入れています。
  • Choice ステートでは、取得したキー名がフォルダー(末尾が / で終わる) かどうかを判別し、フォルダー名であればスキップに、そうでなければ (オブジェクトであれば) 変換処理に進みます。

ここで Lambda 関数を使用してしまうと、分散モードを使用した意味が無くなるので使用しませんでした。あくまでも AWS Step Functions ネイティブの機能で実現しています。

 

設定の解説

ここからは、フロー中の各ステップについて少し踏み込んで説明します。

mapS3Objects

基本的にはスクリーンショットの通りです。

この図の設定では、Prefix に指定した input フォルダー内のオブジェクト (キー) のリストを取得してきてくれるのですが、余計なフォルダー名、つまり「input/」まで含まれてしまいます。フォルダー名除外の設定は見つけられず。

CheckFolder

仕方がないので、フォルダー名が含まれたオブジェクトリストから各オブジェクトの処理をする前に、キー名がフォルダー名なのか判断する分岐を入れています。

Rule #1 は、キー名がフォルダーである (true) の場合です。SkipProcessing つまりスキップに進みます。

条件文は JSONata で記述しており、key の末尾が / で終われば true という正規表現マッチングをかけています。

{% $contains($states.input.key, /^.*\/$/) %}

この判定が false になると、Default rule つまりデータ変換をかける方、processObject に進みます。

SkipProcessing と processObject についてはこのサンプルでは中身がないので、省略します。実際にはそれぞれの分岐に応じた処理を当て込むことになります。

 

実際の動き

2つのファイルを S3 バケットの input フォルダーに保存して、ステートマシンを開始します。

オブジェクトは 2つなのですが、フォルダー名が含まれてしまうため、子ワークフローが 3つになって処理されます。

以下はキー名がオブジェクトだったときのフローです。正しく processObject に誘導されていますね。

以下はキー名がフォルダー名だったときのフローです。正しく SkipProcessing に誘導されています。

 

IAM ポリシーの注意事項

Map ステート分散モードを使用するとき、ステートマシンには以下のドキュメントにあるように特殊な権限を付与する必要があります。

何かと言うと、親ワークフローが一時的に並列処理用の子ワークフローを作って、親とは切り離して実行する形になるようで、そのための権限が必要っぽいです。詳細は以下の AWS 公式ドキュメントや、この後掲載する AWS CloudFormation テンプレートを参照いただけたらと思います。

 

AWS CloudFormation テンプレート

一連の設定を AWS CloudFormation テンプレートに落とし込んでいますので、詳細はこちらをご覧ください。IAM ロールは簡易化している部分があるので、適宜最小化する必要があります。

AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates a Step Functions state machine with a distributed mode map state. It processes objects in the specified S3 bucket.
# ------------------------------------------------------------#
# General Information
#
# Last update: 2026-04-25
# Author: SCSK Hirono
# ------------------------------------------------------------#

# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
  SystemName:
    Type: String
    Description: System name. use lower case only. (e.g. example)
    Default: example
    MaxLength: 10
    MinLength: 1
    AllowedPattern: "^[a-z0-9]+$"

  SubName:
    Type: String
    Description: System sub name. use lower case only. (e.g. prod or dev)
    Default: dev
    MaxLength: 10
    MinLength: 1
    AllowedPattern: "^[a-z0-9]+$"

  InputFolder:
    Type: String
    Description: The folder name of the S3 bucket you upload.
    Default: input
    MaxLength: 50
    MinLength: 1

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "General Configuration"
        Parameters:
          - SystemName
          - SubName
      - Label:
          default: "S3 Configuration"
        Parameters:
          - InputFolder

Resources:
# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
  S3BucketPrep:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${SystemName}-${SubName}-prep
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: 365
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}

  S3BucketPolicyPrep:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref S3BucketPrep
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action: "s3:*"
            Effect: Deny
            Resource:
              - !GetAtt S3BucketPrep.Arn
              - !Sub "${S3BucketPrep.Arn}/*"
            Condition:
              Bool:
                "aws:SecureTransport": "false"
            Principal: "*"
    DependsOn:
      - S3BucketPrep

# ------------------------------------------------------------#
# State Machine Execution Role (IAM)
# ------------------------------------------------------------#
  StateMachineRolePrep:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub StateMachineRolePrep-${SystemName}-${SubName}
      Description: This role allows State Machines to call specified AWS resources.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                states.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /service-role/
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaRole
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
        - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess

  StateMachineRolePolicyPrep:
    Type: AWS::IAM::RolePolicy
    Properties:
      PolicyName: !Sub StateMachinePolicyPrep-${SystemName}-${SubName}
      RoleName: !Ref StateMachineRolePrep
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - "states:StartExecution"
            Resource:
              - !Ref StateMachinePrep
          - Effect: Allow
            Action:
              - "states:DescribeExecution"
              - "states:StopExecution"
            Resource:
              - !Sub "${StateMachinePrep}:*"
          - Effect: Allow
            Action:
              - "states:RedriveExecution"
            Resource:
              - !Sub "${StateMachinePrep}/S3objectkeys:*"
          - Effect: Allow
            Action:
              - "s3:GetObject"
              - "s3:PutObject"
            Resource:
              - !Sub "${S3BucketPrep.Arn}/*"
          - Effect: Allow
            Action:
              - "s3:ListBucket"
            Resource:
              - !GetAtt S3BucketPrep.Arn
              - !Sub "${S3BucketPrep.Arn}/*"
    DependsOn:
      - StateMachinePrep
      - S3BucketPrep

# ------------------------------------------------------------#
# State Machine Execution LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
  LogGroupStateMachinePrep:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/vendedlogs/states/${SystemName}-${SubName}-Prep
      RetentionInDays: 365
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}

# ------------------------------------------------------------#
# Step Functions State Machine
# ------------------------------------------------------------#
  StateMachinePrep:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub ${SystemName}-${SubName}-Prep
      StateMachineType: STANDARD
      DefinitionSubstitutions:
        DsSubName: !Ref SubName
        DsPrepBucketName: !Ref S3BucketPrep
        DsInputFolder: !Ref InputFolder
      DefinitionString: |-
        {
          "Comment": "Preprocess S3 objects",
          "StartAt": "mapS3Objects",
          "States": {
            "mapS3Objects": {
              "Type": "Map",
              "ItemProcessor": {
                "ProcessorConfig": {
                  "Mode": "DISTRIBUTED",
                  "ExecutionType": "STANDARD"
                },
                "StartAt": "CheckIfFolder",
                "States": {
                  "CheckIfFolder": {
                    "Type": "Choice",
                    "Choices": [
                      {
                        "Condition": "{% $contains($states.input.key, /^.*\\/$/) %}",
                        "Next": "SkipProcessing"
                      }
                    ],
                    "Default": "processObject"
                  },
                  "SkipProcessing": {
                    "Type": "Pass",
                    "End": true,
                    "Comment": "Skip processing if the key is a folder."
                  },
                  "processObject": {
                    "Type": "Pass",
                    "Assign": {
                      "bucket": "{% $states.input.bucket %}",
                      "key": "{% $states.input.key %}"
                    },
                    "Output": {
                      "bucket": "{% $states.input.bucket %}",
                      "key": "{% $states.input.key %}"
                    },
                    "End": true,
                    "Comment": "process object (dummy)"
                  }
                }
              },
              "ItemReader": {
                "Resource": "arn:aws:states:::s3:listObjectsV2",
                "ReaderConfig": {
                  "Transformation": "NONE"
                },
                "Arguments": {
                  "Bucket": "${DsPrepBucketName}",
                  "Prefix": "${DsInputFolder}/"
                }
              },
              "MaxConcurrency": 10,
              "Label": "mapS3Objects",
              "End": true,
              "ItemSelector": {
                "bucket": "${DsPrepBucketName}",
                "key": "{% $states.context.Map.Item.Value.Key %}"
              }
            }
          },
          "QueryLanguage": "JSONata",
          "TimeoutSeconds": 600
        }
      LoggingConfiguration:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt LogGroupStateMachinePrep.Arn
        IncludeExecutionData: true
        Level: ERROR
      RoleArn: !GetAtt StateMachineRolePrep.Arn
      TracingConfiguration:
        Enabled: true
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}
    DependsOn:
      - StateMachineRolePrep

 

まとめ

いかがでしたでしょうか。

Lambda 抜きで実現したのですが、Lambda の方が簡単だったなと思いました。ですが脱 Lambda 派としては苦しいけれどもこのアーキテクチャを推したいと思います。また、Map ステート分散モードのシンプルな実装例としても活用して頂ければ。

本記事が皆様のお役に立てれば幸いです。

著者について
広野 祐司

AWS サーバーレスアーキテクチャと React を使用して社内向け e-Learning アプリ開発とコンテンツ作成に勤しんでいます。React でアプリを書き始めたら、快適すぎて他の言語には戻れなくなりました。近年は社内外への AWS 技術支援にも従事しています。AWS サービスには AWS が考える IT 設計思想が詰め込まれているので、いつも AWS を通して勉強させて頂いてまます。
取得資格:AWS 認定は15資格、IT サービスマネージャ、ITIL v3 Expert 等
2020 - 2025 Japan AWS Top Engineer 受賞
2022 - 2025 AWS Ambassador 受賞
2023 当社初代フルスタックエンジニア認定
好きなAWSサービス:AWS AppSync Events / AWS Step Functions / AWS CloudFormation

広野 祐司をフォローする

クラウドに強いによるエンジニアブログです。

SCSKクラウドサービス(AWS)は、企業価値の向上につながるAWS 導入を全面支援するオールインワンサービスです。AWS最上位パートナーとして、多種多様な業界のシステム構築実績を持つSCSKが、お客様のDX推進を強力にサポートします。

AWSクラウドソリューションデータ分析・活用基盤
シェアする
タイトルとURLをコピーしました