こんにちは、広野です。
Amazon S3 バケットにある大量のオブジェクトのデータ変換を実行するのに、AWS Step Functions でバッチジョブを実行することにしました。その過程で得た Map ステート分散モードの実装における注意点を紹介します。
使用するのは以下の AWS ドキュメントに書かれている、大量の並列処理が実行できる機能です。
やりたいこと
- Amazon S3 バケットの特定のフォルダーにある大量のオブジェクトに対して同じ処理を並列実行したい。
- AWS Step Functions の Map ステート分散モードを使用し、そのネイティブな機能で S3 バケット、フォルダー内のオブジェクトリストを取得する。※ Lambda でもできることです。むしろそっちの方が簡単とも言えますが。。。
ここで問題が。
なので、やりたいことを追加。
- 取得するオブジェクトリストから、フォルダーを除外する。
でないと、フォルダーに処理をしてしまうことになりますので・・・。地味ですがここに一番苦労しました。
アーキテクチャ
いろいろ考えましたが、以下のアーキテクチャに落ち着きました。データ変換処理は省略しています。AWS Step Functions の Map ステート分散モード部分のみの紹介です。
- AWS Step Functions ステートマシンを開始すると、予め指定していた Amazon S3 バケットのフォルダー内のオブジェクトをリストアップします。(いわゆる ListBucket ですね)
- 本来はリストアップされたオブジェクトごとにすぐにデータ変換処理に取り掛かりたいのですが、中にはフォルダーが含まれているので、Choice ステートを最初に入れています。
- Choice ステートでは、取得したキー名がフォルダー(末尾が / で終わる) かどうかを判別し、フォルダー名であればスキップに、そうでなければ (オブジェクトであれば) 変換処理に進みます。
ここで Lambda 関数を使用してしまうと、分散モードを使用した意味が無くなるので使用しませんでした。あくまでも AWS Step Functions ネイティブの機能で実現しています。
設定の解説
ここからは、フロー中の各ステップについて少し踏み込んで説明します。
mapS3Objects
基本的にはスクリーンショットの通りです。
この図の設定では、Prefix に指定した input フォルダー内のオブジェクト (キー) のリストを取得してきてくれるのですが、余計なフォルダー名、つまり「input/」まで含まれてしまいます。フォルダー名除外の設定は見つけられず。
CheckFolder
仕方がないので、フォルダー名が含まれたオブジェクトリストから各オブジェクトの処理をする前に、キー名がフォルダー名なのか判断する分岐を入れています。
Rule #1 は、キー名がフォルダーである (true) の場合です。SkipProcessing つまりスキップに進みます。
条件文は JSONata で記述しており、key の末尾が / で終われば true という正規表現マッチングをかけています。
{% $contains($states.input.key, /^.*\/$/) %}
この判定が false になると、Default rule つまりデータ変換をかける方、processObject に進みます。
SkipProcessing と processObject についてはこのサンプルでは中身がないので、省略します。実際にはそれぞれの分岐に応じた処理を当て込むことになります。
実際の動き
2つのファイルを S3 バケットの input フォルダーに保存して、ステートマシンを開始します。
オブジェクトは 2つなのですが、フォルダー名が含まれてしまうため、子ワークフローが 3つになって処理されます。
以下はキー名がオブジェクトだったときのフローです。正しく processObject に誘導されていますね。
以下はキー名がフォルダー名だったときのフローです。正しく SkipProcessing に誘導されています。
IAM ポリシーの注意事項
Map ステート分散モードを使用するとき、ステートマシンには以下のドキュメントにあるように特殊な権限を付与する必要があります。
何かと言うと、親ワークフローが一時的に並列処理用の子ワークフローを作って、親とは切り離して実行する形になるようで、そのための権限が必要っぽいです。詳細は以下の AWS 公式ドキュメントや、この後掲載する AWS CloudFormation テンプレートを参照いただけたらと思います。
AWS CloudFormation テンプレート
一連の設定を AWS CloudFormation テンプレートに落とし込んでいますので、詳細はこちらをご覧ください。IAM ロールは簡易化している部分があるので、適宜最小化する必要があります。
AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates a Step Functions state machine with a distributed mode map state. It processes objects in the specified S3 bucket.
# ------------------------------------------------------------#
# General Information
#
# Last update: 2026-04-25
# Author: SCSK Hirono
# ------------------------------------------------------------#
# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
SystemName:
Type: String
Description: System name. use lower case only. (e.g. example)
Default: example
MaxLength: 10
MinLength: 1
AllowedPattern: "^[a-z0-9]+$"
SubName:
Type: String
Description: System sub name. use lower case only. (e.g. prod or dev)
Default: dev
MaxLength: 10
MinLength: 1
AllowedPattern: "^[a-z0-9]+$"
InputFolder:
Type: String
Description: The folder name of the S3 bucket you upload.
Default: input
MaxLength: 50
MinLength: 1
Metadata:
AWS::CloudFormation::Interface:
ParameterGroups:
- Label:
default: "General Configuration"
Parameters:
- SystemName
- SubName
- Label:
default: "S3 Configuration"
Parameters:
- InputFolder
Resources:
# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
S3BucketPrep:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub ${SystemName}-${SubName}-prep
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: 365
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
Tags:
- Key: Cost
Value: !Sub ${SystemName}-${SubName}
S3BucketPolicyPrep:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref S3BucketPrep
PolicyDocument:
Version: "2012-10-17"
Statement:
- Action: "s3:*"
Effect: Deny
Resource:
- !GetAtt S3BucketPrep.Arn
- !Sub "${S3BucketPrep.Arn}/*"
Condition:
Bool:
"aws:SecureTransport": "false"
Principal: "*"
DependsOn:
- S3BucketPrep
# ------------------------------------------------------------#
# State Machine Execution Role (IAM)
# ------------------------------------------------------------#
StateMachineRolePrep:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub StateMachineRolePrep-${SystemName}-${SubName}
Description: This role allows State Machines to call specified AWS resources.
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
states.amazonaws.com
Action:
- sts:AssumeRole
Path: /service-role/
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaRole
- arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
- arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess
StateMachineRolePolicyPrep:
Type: AWS::IAM::RolePolicy
Properties:
PolicyName: !Sub StateMachinePolicyPrep-${SystemName}-${SubName}
RoleName: !Ref StateMachineRolePrep
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- "states:StartExecution"
Resource:
- !Ref StateMachinePrep
- Effect: Allow
Action:
- "states:DescribeExecution"
- "states:StopExecution"
Resource:
- !Sub "${StateMachinePrep}:*"
- Effect: Allow
Action:
- "states:RedriveExecution"
Resource:
- !Sub "${StateMachinePrep}/S3objectkeys:*"
- Effect: Allow
Action:
- "s3:GetObject"
- "s3:PutObject"
Resource:
- !Sub "${S3BucketPrep.Arn}/*"
- Effect: Allow
Action:
- "s3:ListBucket"
Resource:
- !GetAtt S3BucketPrep.Arn
- !Sub "${S3BucketPrep.Arn}/*"
DependsOn:
- StateMachinePrep
- S3BucketPrep
# ------------------------------------------------------------#
# State Machine Execution LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
LogGroupStateMachinePrep:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/vendedlogs/states/${SystemName}-${SubName}-Prep
RetentionInDays: 365
Tags:
- Key: Cost
Value: !Sub ${SystemName}-${SubName}
# ------------------------------------------------------------#
# Step Functions State Machine
# ------------------------------------------------------------#
StateMachinePrep:
Type: AWS::StepFunctions::StateMachine
Properties:
StateMachineName: !Sub ${SystemName}-${SubName}-Prep
StateMachineType: STANDARD
DefinitionSubstitutions:
DsSubName: !Ref SubName
DsPrepBucketName: !Ref S3BucketPrep
DsInputFolder: !Ref InputFolder
DefinitionString: |-
{
"Comment": "Preprocess S3 objects",
"StartAt": "mapS3Objects",
"States": {
"mapS3Objects": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "CheckIfFolder",
"States": {
"CheckIfFolder": {
"Type": "Choice",
"Choices": [
{
"Condition": "{% $contains($states.input.key, /^.*\\/$/) %}",
"Next": "SkipProcessing"
}
],
"Default": "processObject"
},
"SkipProcessing": {
"Type": "Pass",
"End": true,
"Comment": "Skip processing if the key is a folder."
},
"processObject": {
"Type": "Pass",
"Assign": {
"bucket": "{% $states.input.bucket %}",
"key": "{% $states.input.key %}"
},
"Output": {
"bucket": "{% $states.input.bucket %}",
"key": "{% $states.input.key %}"
},
"End": true,
"Comment": "process object (dummy)"
}
}
},
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"ReaderConfig": {
"Transformation": "NONE"
},
"Arguments": {
"Bucket": "${DsPrepBucketName}",
"Prefix": "${DsInputFolder}/"
}
},
"MaxConcurrency": 10,
"Label": "mapS3Objects",
"End": true,
"ItemSelector": {
"bucket": "${DsPrepBucketName}",
"key": "{% $states.context.Map.Item.Value.Key %}"
}
}
},
"QueryLanguage": "JSONata",
"TimeoutSeconds": 600
}
LoggingConfiguration:
Destinations:
- CloudWatchLogsLogGroup:
LogGroupArn: !GetAtt LogGroupStateMachinePrep.Arn
IncludeExecutionData: true
Level: ERROR
RoleArn: !GetAtt StateMachineRolePrep.Arn
TracingConfiguration:
Enabled: true
Tags:
- Key: Cost
Value: !Sub ${SystemName}-${SubName}
DependsOn:
- StateMachineRolePrep
まとめ
いかがでしたでしょうか。
Lambda 抜きで実現したのですが、Lambda の方が簡単だったなと思いました。ですが脱 Lambda 派としては苦しいけれどもこのアーキテクチャを推したいと思います。また、Map ステート分散モードのシンプルな実装例としても活用して頂ければ。
本記事が皆様のお役に立てれば幸いです。









