こんにちは、広野です。
以前、以下の記事で Amazon Cognito ユーザーのインポートジョブを作成していましたが、大量のインポートではエラーが発生する問題を抱えていました。本記事では、少し改善したバージョンを紹介したいと思います。
前のバージョンで抱えていた課題
AWS サービスに何か命令を実行する場合、それが AWS マネジメントコンソールからにしろ、boto3 からにしろ、必ず AWS が用意した API をコールします。AWS サービスにより、一定時間の間にどれだけの数の API コールを許容するか、制限 (クォータ) があります。
Amazon Cognito ユーザープールにも当然そのクォータがあり、ユーザーの作成は 1秒間あたり 50 回 (RPS) という制限になっています。これを、考えなしに同時に API をコールしてしまうとクォータ超過のエラーが発生します。
前のバージョンでは、処理が速すぎてクォータを超過してしまうほど API コールをしてしまうので、待ち時間を入れるよう改善する必要がありました。新しいバージョンでは、2024 年の re:Invent で追加された JSONata を使用する AWS Step Functions 構成に変更します。
作り直した構成
全体像は以下です。React アプリに 管理者用の Amazon Cognito ユーザーインポート画面を作成していますが、本記事で着目するのは AWS Step Functions ステートマシンの部分です。
AWS Step Functions ステートマシンを AWS AppSync から呼び出す部分は、以下の記事で紹介しています。
ステートマシンの部分を拡大します。
ステップごとに順を追って解説します。
Map State (入力)
インプットは、以下の例のような JSON データです。
{ "data": [ { "useremail": "user01@scsktest.com" }, { "useremail": "user02@scsktest.com" }, { "useremail": "user03@scsktest.com" } ], "adminemail": "xxxxxxxx@scsktest.com" }
Amazon Cognito ユーザープールに登録したいユーザーのメールアドレスが複数あり、このジョブを実行した管理者のメールアドレスが adminemail として格納されています。管理者メールアドレスはインポート完了後に結果報告メールを送信するのに使用します。
Amazon Cognito ユーザープールの設定は、ユーザー名をメールアドレスにする設定のため、ユーザー名情報はありません。
このデータが Map State に渡されます。これらデータはそのときに input という JSON 項目に格納されます。従って、input.data に格納されている配列単位で、Amazon Cognito ユーザー作成を並列処理させます。JSONata では、以下のように記述します。
Amazon Cognito ユーザー作成の API (AdminCreateUser) の RPS は 50 なのですが、本記事では、作成したユーザーを特定の Cognito グループに所属させる設定を実施します。グループに追加する API (AdminAddUserToGroup) の RPS は 25 のため、安全を取って Map による同時実行数を 20 に設定しています。
Map State 完了時の出力については、章を分離して後ほど説明します。
AdminCreateUser
Amazon Cognito ユーザーを 1 件作成する API を AWS Step Functions からダイレクトにコールします。インプットは、以下のような JSON データになります。
{ "input": { "useremail": "user01@scsktest.com" }, "inputDetails": { "truncated": false }, "name": "AdminCreateUser" }
作成したいユーザー名は input.useremail になります。以下のように、引数の設定に入力します。作成時のパラメータは要件に応じて適宜変更する必要があります。
エラー処理の設定を追加します。それを入れないと、何らかのエラーが発生した時点でステートマシン全体がエラーで失敗してしまいます。Map 処理が途中で終了してしまうと、どのインプットデータをどこまで処理したのかがわかりにくくなってしまうので、本記事の設計では Map State 内でエラーが発生してもすべてのインプットデータを必ず処理するようにし、それぞれの結果を集計して確認できるようにします。
キャッチャーを 1 つ作成し、Errors には States.ALL を選択します。Fallback state には Error と設定しています。これはエラー発生時に進む次のステップです。後ほど説明します。
出力は、最終的に処理が成功、失敗に関わらず同じフォーマットで集計したいので、以下の考えで統一したフォーマットにします。
- useremail: 作成しようとしたユーザーのメールアドレス
- status: 処理結果 (エラー時は failed 固定)
- message: エラーメッセージ
useremail にはインプットから取得した値を格納します。message には、API が返してきた結果から取得します。多くの API は、$states.errorOutput.Cause にエラーメッセージを残すので、それを取得します。API によってはデータ構造が異なるかもしれないので都度確認は必要です。
AdminAddUserToGroup
AdminAddUserToGroup は、AdminCreateUser が成功し、そのユーザーが作成された後の後続処理です。インプットは、以下のように input に AdminCreateUser の結果を無加工で受け取るようにしています。
{ "input": { "User": { "Attributes": [ { "Name": "email", "Value": "user01@scsktest.com" }, { "Name": "email_verified", "Value": "true" }, { "Name": "sub", "Value": "97a4ca78-1061-70ba-1b7f-xxxxxxxxxxxx" } ], "Enabled": true, "UserCreateDate": "2025-04-14T14:54:07.135Z", "UserLastModifiedDate": "2025-04-14T14:54:07.135Z", "UserStatus": "FORCE_CHANGE_PASSWORD", "Username": "97a4ca78-1061-70ba-1b7f-xxxxxxxxxxxx" } }, "inputDetails": { "truncated": false }, "name": "AdminAddUserToGroup" }
このインプットから、API の引数を以下のように指定します。グループ名、Amazon Cognito ユーザープール ID は固定で指定しています。Username は、インプットから取得していますが、ここでは自動生成された sub と呼ばれる UUID 形式のユーザー名を指定していますが、おそらくメールアドレスでも問題ないです。
成功時の出力は、前述エラー処理のところで決めたフォーマットと合わせたいので、以下のように明示的に設定します。useremail はインプットから取得するようにしようとすると、以下のような記述になります。status は、成功なので succeeded 固定にし、messege はエラーがないので “-” 固定にしました。
さて、このステップでも何らかのエラーが発生したときのためにエラー処理を入れておきます。この記述は、AdminCreateUser のときと全く同じ考え方ですが、useremail がこのステップのインプットから取得する場合はデータ構造が異なりますので、その部分だけが異なっています。
Success/Error
これらのステップは、1 秒待ちのステップです。先行ステップで実行した API に RPS という秒単位の実行数制限があるため、後続ステップに待ち時間をはさんでいます。細かい話ですが、これを AWS Lambda 関数内に入れてしまうと待ち時間も課金対象になってしまうため、良くないプラクティスになります。AWS Step Functions では待ち時間には課金されません。
先行ステップの成功時の Success、失敗時の Error は設定が同じです。ただ、名前を変えてジョブフローの見た目で成功か失敗かをわかりやすくしたいために明示的に分けました。統合してもかまいません。
設定はシンプルに 1 秒待ちを入れているだけです。
Map State (出力)
ここまでで、Map State の中身は説明しました。全てのインプットを処理した後、結果が集計されます。
$states.result に結果が格納されるのですが、そこには管理者のメールアドレスがありません。本記事の設計では、ステートマシンのインプットに入っていた adminemail のメールアドレスに結果通知メールを送りたいので、後続ステップにインプットから引用して adminemail が入るように設定します。また、本記事ではまったく主要な話ではないのですが jobtype というフラグをここで追加して後続に渡しています。
後続でどのようなインプットになるかは、次の章で説明します。
SendResult
ここでは、先行ステップ (Map State) から受け取った結果 (JSON) を CSV に変換し、Amazon S3 バケットに保存、署名付き URL を生成して送付先管理者メールアドレスに送信します。CSV 変換をサードパーティの Python モジュール pandas を使用しているので、AWS Lambda 関数で作成しました。
インプットの例は以下のデータになります。
{ "input": { "adminemail": "xxxxxxxx@scsktest.com", "result": [ { "useremail": "user01@scsktest.com", "status": "succeeded", "message": "-" }, { "useremail": "user02@scsktest.com", "status": "succeeded", "message": "-" }, { "useremail": "user03@scsktest.com", "status": "failed", "message": "User account already exists (Service: CognitoIdentityProvider, Status Code: 400, Request ID: cd112c36-4a54-4706-b2b0-ac144791c2f9) (SDK Attempt Count: 1)" } ], "jobtype": "import" }, "inputDetails": { "truncated": false }, "name": "SendResult" }
AWS Lambda 関数には、input 配下のデータを渡します。以下のように引数を設定します。Lambda 関数の関連付けは ARN で行います。出力はデフォルトです。
AWS Lambda 関数は以下です。jobtype という引数が import か否かによって、メール送信文章が変わるようになっています。※本記事では重要ではありません。
import json import pandas as pd import datetime import boto3 s3 = boto3.resource('s3') client = boto3.client('s3') ses = boto3.client('ses',region_name='ap-northeast-1') def datetimeconverter(o): if isinstance(o, datetime.datetime): return str(o) def lambda_handler(event, context): try: ADMINEMAIL = event['adminemail'] jobtype = event['jobtype'] jobname = 'インポート' if event['jobtype'] == 'import' else '一括削除' df = pd.DataFrame(event['result']) # Set the duration the presigned URL is valid for expiredinsec = 129600 # 129600 seconds = 36 hours expdelta = datetime.timedelta(seconds=expiredinsec) jstdelta = datetime.timedelta(hours=9) JST = datetime.timezone(jstdelta, 'JST') dtnow = datetime.datetime.now(JST) dtexp = dtnow + expdelta nowdt = dtnow.strftime('%Y/%m/%d %H:%M - %Z') expdt = dtexp.strftime('%Y/%m/%d %H:%M - %Z') fileNameDateTime = dtnow.strftime('%Y%m%d%H%M%S') tempFileName = '/tmp/' + jobtype + '_result_' + fileNameDateTime + '.csv' outputBucket = 'output_bucket' outputS3Key = 'cognito' + jobtype + 'log/' + jobtype + '_result_' + fileNameDateTime + '.csv' # JSON to CSV df.to_csv(tempFileName) # upload CSV into S3 s3.meta.client.upload_file(tempFileName, outputBucket, outputS3Key) # generate presigned URL presigned_url = client.generate_presigned_url( ClientMethod = 'get_object', Params = { 'Bucket': outputBucket, 'Key': outputS3Key }, ExpiresIn = expiredinsec, HttpMethod = 'GET' ) # send email BODY_TEXT = ('あなたがリクエストしたユーザ' + jobname + 'ジョブが完了しました。\n' '結果は以下のURLからダウンロードできます。\n\n' 'ダウンロードURL:\n<' + presigned_url + '>\n\n' 'ダウンロード有効期限:\n' + expdt +'\n\n' ) res = ses.send_email( Destination={ 'ToAddresses': [ ADMINEMAIL ] }, Message={ 'Body': { 'Text': { 'Charset': 'UTF-8', 'Data': BODY_TEXT } }, 'Subject': { 'Charset': 'UTF-8', 'Data': 'ユーザ' + jobname + '結果 [' + nowdt + ']' } }, Source='xxxxxxx@xxxxxx.xxx' ) except Exception as e: print(e) return { "result": str(e) } else: return { "result": res }
- SendResult ステップから受け取ったデータは、event に格納されます。
- 結果を通知したい管理者のメールアドレスは、event[’adminemail’] で表現できます。
- インポート結果は event[‘result’] で表現できます。
ただし、このデータは JSON 形式です。管理者が見やすい形式にしたいので、CSV に変換します。そのために pandas という外部ライブラリを使ってデータ変換しますが、前提として pandas モジュールが Lambda Layer として登録されている必要があります。 - CSV 形式に変換したデータは、一旦 Lambda 関数の一時ストレージ /tmp に CSV ファイルとして保存します。それを指定した Amazon S3 バケットにアップロードします。
- CSV ファイルを Amazon S3 バケットにアップロードしただけでは管理者は見ることができないので、一定期間 (36 時間) だけ見られるように Amazon S3 署名付き URL を生成します。生成した URL を通知メール本文に埋め込んで、Amazon SES 経由で管理者メールアドレスにメール送信します。
- pandas を使用するための Lambda レイヤー作成は以下の記事が参考になります。
この Lambda 関数が実行されると、以下のようなメールが送信されます。
ダウンロードした CSV ファイルは以下のようになります。
参考: AWS CloudFormation テンプレート
紹介した構成一式をデプロイする AWS CloudFormation テンプレートを貼り付けておきます。IAM ロールなど細かい設定まで説明できていないので、詳細はこちらを見て頂けたらと思います。ただし、IAM ロールは雑にマネージドのものを多用しているので、必要に応じて最小権限化する必要があります。ご注意ください。
AWSTemplateFormatVersion: 2010-09-09 Description: The CloudFormation template that creates Lambda functions, Step Functions workflows, and relevant IAM roles. # ------------------------------------------------------------# # Input Parameters # ------------------------------------------------------------# Parameters: SubName: Type: String Description: System sub name of example. (e.g. prod or test) Default: test MaxLength: 10 MinLength: 1 FeedbackSenderEmail1: Type: String Description: E-mail address that sends emails. (e.g. xxx@xxx.xx) Default: xxx@xxx.xx MaxLength: 100 MinLength: 5 AllowedPattern: "[^\\s@]+@[^\\s@]+\\.[^\\s@]+" S3BucketNameSdk: Type: String Description: S3 bucket name in which you uploaded sdks for Lambda Layers. (e.g. example-bucket) Default: example-bucket MaxLength: 50 MinLength: 1 S3KeyPandasSdk: Type: String Description: S3 key of pandas.zip. Fill the exact key name if you renamed. (e.g. sdk/Python3.13/pandas223.zip) Default: sdk/Python3.13/pandas223.zip MaxLength: 50 MinLength: 1 Resources: # ------------------------------------------------------------# # Lambda S3 and SES Invocation Role for Sfn (IAM) # ------------------------------------------------------------# LambdaS3SesInSfnRole: Type: AWS::IAM::Role Properties: RoleName: !Sub example-LambdaS3SesInSfnRole-${SubName} Description: This role allows Lambda functions to invoke S3 and SES. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: !Sub example-LambdaS3SesInSfnPolicy-${SubName} PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Resource: - "*" Action: - "ses:SendEmail" ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess # ------------------------------------------------------------# # State Machine Execution Role (IAM) # ------------------------------------------------------------# StateMachineExecutionRole: Type: AWS::IAM::Role Properties: RoleName: !Sub example-StateMachineExecutionRole-${SubName} Description: This role allows State Machines to call specified AWS resources. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: states.amazonaws.com Action: - sts:AssumeRole Path: /service-role/ ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaRole - arn:aws:iam::aws:policy/AmazonCognitoPowerUser - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess # ------------------------------------------------------------# # State Machine Execution LogGroup (CloudWatch Logs) # ------------------------------------------------------------# LogGroupStateMachineImportCognitoUsers: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/vendedlogs/states/example-ImportCognitoUsers-${SubName} RetentionInDays: 365 Tags: - Key: Cost Value: !Sub example-${SubName} # ------------------------------------------------------------# # S3 # ------------------------------------------------------------# S3BucketLogs: Type: AWS::S3::Bucket Properties: BucketName: !Sub example-${SubName}-logs OwnershipControls: Rules: - ObjectOwnership: BucketOwnerPreferred PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true Tags: - Key: Cost Value: !Sub example-${SubName} # ------------------------------------------------------------# # Lambda # ------------------------------------------------------------# LambdaAdminSendResultCognitoUsersSfn: Type: AWS::Lambda::Function Properties: FunctionName: !Sub example-Admin-SendResultCognitoUsersSfn-${SubName} Description: Lambda Function to send the Step Functions job result to maintain Cognito users. Runtime: python3.13 Timeout: 180 MemorySize: 256 Role: !GetAtt LambdaS3SesInSfnRole.Arn Handler: index.lambda_handler Layers: - !Ref LambdaLayerPandas223 Tags: - Key: Cost Value: !Sub example-${SubName} Code: ZipFile: !Sub | import json import pandas as pd import datetime import boto3 s3 = boto3.resource('s3') client = boto3.client('s3') ses = boto3.client('ses',region_name='${AWS::Region}') def datetimeconverter(o): if isinstance(o, datetime.datetime): return str(o) def lambda_handler(event, context): try: ADMINEMAIL = event['adminemail'] jobtype = event['jobtype'] jobname = 'インポート' if event['jobtype'] == 'import' else '一括削除' df = pd.DataFrame(event['result']) # Set the duration the presigned URL is valid for expiredinsec = 129600 # 129600 seconds = 36 hours expdelta = datetime.timedelta(seconds=expiredinsec) jstdelta = datetime.timedelta(hours=9) JST = datetime.timezone(jstdelta, 'JST') dtnow = datetime.datetime.now(JST) dtexp = dtnow + expdelta nowdt = dtnow.strftime('%Y/%m/%d %H:%M - %Z') expdt = dtexp.strftime('%Y/%m/%d %H:%M - %Z') fileNameDateTime = dtnow.strftime('%Y%m%d%H%M%S') tempFileName = '/tmp/' + jobtype + '_result_' + fileNameDateTime + '.csv' outputBucket = 'example-${SubName}-logs' outputS3Key = 'cognito' + jobtype + 'log/' + jobtype + '_result_' + fileNameDateTime + '.csv' # JSON to CSV df.to_csv(tempFileName) # upload CSV into S3 s3.meta.client.upload_file(tempFileName, outputBucket, outputS3Key) # generate presigned URL presigned_url = client.generate_presigned_url( ClientMethod = 'get_object', Params = { 'Bucket': outputBucket, 'Key': outputS3Key }, ExpiresIn = expiredinsec, HttpMethod = 'GET' ) # send email BODY_TEXT = ('あなたがリクエストしたユーザ' + jobname + 'ジョブが完了しました。\n' '結果は以下のURLからダウンロードできます。\n\n' 'ダウンロードURL:\n<' + presigned_url + '>\n\n' 'ダウンロード有効期限:\n' + expdt +'\n\n' ) res = ses.send_email( Destination={ 'ToAddresses': [ ADMINEMAIL ] }, Message={ 'Body': { 'Text': { 'Charset': 'UTF-8', 'Data': BODY_TEXT } }, 'Subject': { 'Charset': 'UTF-8', 'Data': 'ユーザ' + jobname + '結果 [' + nowdt + ']' } }, Source='${FeedbackSenderEmail1}' ) except Exception as e: print(e) return { "result": str(e) } else: return { "result": res } # ------------------------------------------------------------# # Lambda Layer # ------------------------------------------------------------# LambdaLayerPandas223: Type: AWS::Lambda::LayerVersion Properties: LayerName: !Sub example-${SubName}-pandas223 Description: !Sub Pandas 2.2.3 for Python to load in example-${SubName} CompatibleRuntimes: - python3.13 Content: S3Bucket: !Sub ${S3BucketNameSdk} S3Key: !Sub ${S3KeyPandasSdk} LicenseInfo: BSD-3-Clause # ------------------------------------------------------------# # Step Functions # ------------------------------------------------------------# StateMachineImportCognitoUsers: Type: AWS::StepFunctions::StateMachine Properties: StateMachineName: !Sub example-ImportCognitoUsers-${SubName} StateMachineType: STANDARD DefinitionSubstitutions: DSSubName: !Sub ${SubName} DSCognitoUserPoolId: Fn::ImportValue: !Sub CognitoUserPoolID-example-${SubName} DSTemporaryPassword: Passw0rd DSLambdaAdminSendResultCognitoUsersSfnArn: !GetAtt LambdaAdminSendResultCognitoUsersSfn.Arn DefinitionString: |- { "QueryLanguage": "JSONata", "Comment": "State machine to import example users into Cognito User Pool for example-${DSSubName}", "StartAt": "Map", "States": { "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "AdminCreateUser", "States": { "AdminCreateUser": { "Type": "Task", "Arguments": { "UserPoolId": "${DSCognitoUserPoolId}", "Username": "{% $states.input.useremail %}", "TemporaryPassword": "${DSTemporaryPassword}", "UserAttributes": [ { "Name": "email", "Value": "{% $states.input.useremail %}" }, { "Name": "email_verified", "Value": "true" } ], "MessageAction": "SUPPRESS", "DesiredDeliveryMediums": [ "EMAIL" ] }, "Resource": "arn:aws:states:::aws-sdk:cognitoidentityprovider:adminCreateUser", "Next": "AdminAddUserToGroup", "TimeoutSeconds": 3, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Error", "Output": { "useremail": "{% $states.input.useremail %}", "status": "failed", "message": "{% $states.errorOutput.Cause %}" } } ] }, "AdminAddUserToGroup": { "Type": "Task", "Arguments": { "GroupName": "BASIC", "UserPoolId": "${DSCognitoUserPoolId}", "Username": "{% $states.input.User.Username %}" }, "Resource": "arn:aws:states:::aws-sdk:cognitoidentityprovider:adminAddUserToGroup", "Next": "Success", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Error", "Output": { "useremail": "{% $states.input.User.Attributes[0].Value %}", "status": "failed", "message": "{% $states.errorOutput.Cause %}" } } ], "TimeoutSeconds": 3, "Output": { "useremail": "{% $states.input.User.Attributes[0].Value %}", "status": "succeeded", "message": "-" } }, "Error": { "Type": "Wait", "Seconds": 1, "End": true, "Comment": "This wait is for avoiding the Cognito API error due to exceeding the rps quota." }, "Success": { "Type": "Wait", "Seconds": 1, "End": true, "Comment": "This wait is for avoiding the Cognito API error due to exceeding the rps quota." } } }, "MaxConcurrency": 20, "Items": "{% $states.input.data %}", "Next": "SendResult", "Output": { "adminemail": "{% $states.input.adminemail %}", "result": "{% $states.result %}", "jobtype": "import" } }, "SendResult": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Output": "{% $states.result.Payload %}", "Arguments": { "FunctionName": "${DSLambdaAdminSendResultCognitoUsersSfnArn}", "Payload": "{% $states.input %}" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 1, "MaxAttempts": 3, "BackoffRate": 2, "JitterStrategy": "FULL" } ], "End": true, "Comment": "Invoke the Lambda function to send the result of this state machine via email.", "TimeoutSeconds": 210 } }, "TimeoutSeconds": 900 } LoggingConfiguration: Destinations: - CloudWatchLogsLogGroup: LogGroupArn: !GetAtt LogGroupStateMachineImportCognitoUsers.Arn IncludeExecutionData: true Level: ERROR RoleArn: !GetAtt StateMachineExecutionRole.Arn TracingConfiguration: Enabled: true Tags: - Key: Cost Value: !Sub example-${SubName}
まとめ
いかがでしたでしょうか?
本記事で紹介した構成はまだまだ不完全でして、このステートマシンを複数同時実行した場合はクォータを超過してしまいます。ですので、複数呼び出す可能性があるジョブを作成するときは SQS も使用してキューイングする必要がありそうです。しかし、それを実装したところで、このステートマシンを使用せずにユーザー作成をする別の作業とタイミングが重なってしまうとキューイングでも防ぎようがありません。完全な制御は難しいのですが、それをわかった上でやるべきことは、今回のように成功、失敗の結果を把握できるようにすることや、リトライ、通知などの仕組みを活用して、完璧ではないけれどより信頼性の高い仕組みを構築することだろう、と考えております。
本記事が皆様のお役に立てれば幸いです。