こんにちは、広野です。
AWS を使ってアプリ開発している人にとって、Amazon Cognito は神のようなサービスです。
- ユーザ認証基盤として必要な機能が十分ある
- AWS サービスとの親和性が高い
- 何より安い(無料課金枠が大きすぎて、大規模利用でなければ実質無料)
ですが、ユーザインポート機能が使いづらいです。
- マネジメントコンソール、CLI、API で実行できるが、1件1件のインポート結果がわからない
- インポートが成功したユーザに対して追加の処理をかけたい(グループ所属など)
- AWS 純正インポート用の CSV は無駄な空白セルが多すぎてわかりにくい。※使用していない項目が多い場合
そんな心のもやもやを1年以上抱えており、あるとき、そろそろ作るかーと腰を上げて独自の Amazon Cognito ユーザインポート画面を React でつくりました。
本記事ではその構成を紹介したいと思いますが、1記事で書くには内容が多すぎるため AWS Step Functions Map ステートによるユーザ登録処理のみ解説します。
やりたいこと
- React で Amazon Cognito ユーザインポート画面をつくる
- インポートしたいユーザ名(メールアドレス)を CSV ファイル内に 1列に羅列する
- ユーザインポート画面に CSV ファイルをロードさせインポートを実行すると、データが Amazon S3 バケットに書き込まれユーザインポートジョブが発動する
- ユーザインポートが完了すると、インポートされたユーザ1件1件の結果を CSV ファイルにまとめて Amazon S3 バケットに保存する
- 結果 CSV ファイルへのアクセス用署名付き URL を発行する
- 実行したユーザ(ここではアプリの管理者ユーザ)のメールアドレスに結果通知メールが送信される
- 結果通知メールには、CSV ファイルへの署名付き URL が記載されており、CSV ファイルをダウンロードすることにより結果を確認できる
アーキテクチャ図
操作イメージ
アプリの管理者がユーザインポート画面を開きます。
インポートする CSV ファイルは以下のフォーマットです。
useremail user04@sample.xxx user05@sample.xxx user06@sample.xxx
画面に CSV ファイルをドラッグアンドドロップすると、以下のようになります。この時点で、メールアドレス書式にエラーがあるとデータを確認する旨エラーメッセージが表示されます。
データに問題がなければ、インポート開始ボタンが押せるようになり、インポートを開始します。
しばらくすると、インポート結果通知メールがインポートを実行した管理者ユーザのメールアドレスに届きます。
URL からは結果 CSV ファイルをダウンロードできます。ユーザごとに status 列に結果が表示されます。失敗した場合は、message 列に Amazon Cognito が出力したエラーメッセージが入ります。ダウンロード有効期限はインポート実行後 36 時間です。
実現構成
本記事の構成は、実は私の過去記事の集合体です。
過去記事に記載されている内容はそちらを参考にして頂けたらと思います。今回は AWS Step Functions ステートマシンの Map ステートの組み方 (下の図の赤枠内) を中心に解説します。
- ローカル CSV を JSON データとしてロードする記事
- JSON データの正規表現チェック記事
- AWS Amplify Storage の記事
- S3 から Step Functions を呼び出す記事
- Map ステートによる並列処理の記事
- pandas モジュールを Lambda 用につくる記事
- S3 署名付き URL を自動生成する記事
Map ステート解説
アプリの管理者が所定の CSV ファイルにユーザ情報を記入し、画面からインポートを実行するとデータは JSON データに変換され、管理者メールアドレスを付加して Amazon S3 の所定のバケット (AWS Amplify Storage) に保存されます。
Amazon S3 バケットに保存されるデータは以下のフォーマットになります。(例)
{ "adminemail": "admin@sample.xxx", "data": [ { "useremail": "user04@sample.xxx" }, { "useremail": "user05@sample.xxx" }, { "useremail": "user06@sample.xxx" } ] }
ユーザインポート画面側は、Amazon S3 バケットに JSON ファイルを保存したら処理は完了になります。結果は待ちません。(非同期処理)
Amazon S3 バケットには、イベント通知を仕掛けています。ファイルが書き込まれると指定した AWS Lambda 関数が発動します。Lambda 関数経由で AWS Step Functions ステートマシンを呼び出し、JSON ファイルのパスを連携します。
ステートマシンは以下のステップを進んでいきます。各ステップをそれぞれ解説します。
GetUsersData
呼び出されたステートマシンは、呼び出し元の AWS Lambda 関数から JSON ファイルに関するバケット名、キー名を受け取ります。
受け取ったデータは “$.bucket” と “$.key” というように表現でき、これを AWS Step Functions ネイティブの S3:GetObject API のパラメータとして埋め込みます。これで、Amazon S3 オブジェクトからデータの中身を取得できます。
Amazon S3 オブジェクトから取得したデータを次のステップに渡すために、以下のように記述します。
取得したデータは $.Body 内にあるのですが、文字列型として取得してしまうので、States.StringToJson という関数を使用して JSON オブジェクトに変換します。その状態で、”body.$” キーの値として格納することで、次のステップに “body”: JSON データ という整形されたフォーマットでデータを渡すことができます。
MapUsersData (1)
MapUsersData ステップは、Map ステートを使用しています。これは、Map ステート内に定義した処理に対してパラメータを変えた同一処理を並列に実行する機能です。
ここでは、Map ステートは前ステップから以下のデータを受け取ります。
{ "name": "MapUsersData", "input": { "body": { "adminemail": "admin@sample.xxx", "data": [ { "useremail": "user04@sample.xxx" }, { "useremail": "user05@sample.xxx" }, { "useremail": "user06@sample.xxx" } ] } }, "inputDetails": { "truncated": false } }
“input” 配下のデータを Map ステート内で使用できます。ここでは、その中の body.data に Amazon Cognito に登録したいユーザのメールアドレスが格納されているので、それを設定内に記述します。
並列処理の最大同時実行数を指定できます。推奨が 40 なのでそのようにしています。
これで、body.data 内にある配列の1要素ごとに Map ステート内の処理に値を渡して並列実行します。
一旦、Map ステート内に定義した処理の解説に移ります。
ImportUser
ここでは、AWS Lambda 関数を実行します。ユーザのメールアドレス1件1件に対し同じ Lambda 関数が使用されます。
Lambda 関数へのインプットは、以下のようになります。
{ "useremail": "user04@sample.xxx" }
ステップの設定は以下の通りです。呼び出す Lambda 関数の ARN を指定しています。
Lambda 関数を実行した結果はごちゃごちゃと多いので、Lambda 関数から戻ってきたデータだけ (“Payload”配下) を後続ステップに渡すよう、出力をフィルタリングしています。
Lambda 関数 (Python) は以下です。
import boto3 import json import datetime import os # Amazon Cognito User Pool Configs REGION = 'ap-northeast-1' USER_POOL_ID = os.environ['CognitoUserPoolID'] # Create boto3 CognitoIdentityProvider client client = boto3.client('cognito-idp', REGION) def datetimeconverter(o): if isinstance(o, datetime.datetime): return str(o) def lambda_handler(event, context): try: USERNAME = event['useremail'] PASSWORD = os.environ['TemporaryPassword'] USEREMAIL = event['useremail'] RESULT = '' # Create a Cognito User res1 = client.admin_create_user( UserPoolId=USER_POOL_ID, Username=USERNAME, TemporaryPassword=PASSWORD, UserAttributes=[ { 'Name': 'email', 'Value': USEREMAIL }, { 'Name': 'email_verified', 'Value': 'true' } ], MessageAction='SUPPRESS', DesiredDeliveryMediums=[ 'EMAIL' ] ) print(json.dumps(res1, indent=4, default=datetimeconverter)) # Add user to group BASIC if res1['User']['UserStatus'] == "FORCE_CHANGE_PASSWORD": res2 = client.admin_add_user_to_group( UserPoolId=USER_POOL_ID, Username=USERNAME, GroupName='BASIC' ) except Exception as e: print(e) return { "useremail": USEREMAIL, "status": "failed", "message": str(e) } else: print(json.dumps(res2, indent=4, default=datetimeconverter)) return { "useremail": USEREMAIL, "status": "succeeded", "message": "-" }
- ユーザ登録対象の Amazon Cognito ユーザープールの ID は環境変数に入れています。
- Map ステート内、ImportUser ステップから渡されたデータは event に格納されます。登録したいユーザのメールアドレスは、event[‘useremail’] というように表現できます。
- Amazon Cognito ユーザープールに登録するユーザ名はメールアドレスにしています。
- ユーザ登録が完了したら、ここでは固定で「BASIC」という Cognito ユーザーグループに所属させるようにしています。これはアプリ内での閲覧権限に関係してきます。
- Lambda 関数がエラーで異常終了しないよう、try、except、else を使ってエラー処理をしています。ここは重要なポイントで、この Lambda 関数 1件 1件の結果を Map ステートで取りまとめさせたいので、エラーが起きても同じフォーマットで結果を戻せるようにしています。
戻す項目は、useremail、status、message です。正常終了時には message は “-” ですが、エラーが起きた際にはエラーメッセージをそのまま “message” に格納して戻すようにしています。これにより、そのメールアドレスの登録がどのようにエラーになったのか管理者が原因を追究しやすくなっています。
データ件数分、この Lambda 関数が実行されます。それぞれの結果は Map ステートで取りまとめされます。再度、Map ステートの解説に移ります。
MapUsersData (2)
Lambda 関数の結果は Map ステートにより最後取りまとめられ、以下のようなデータになります。
[ { "useremail": "user04@sample.xxx", "status": "succeeded", "message": "-" }, { "useremail": "user05@sample.xxx", "status": "succeeded", "message": "-" }, { "useremail": "user06@sample.xxx", "status": "succeeded", "message": "-" } ]
この結果を管理者に伝えたいです。
が、このままではダメです。通知先となる管理者のメールアドレスがどこにもありません。
処理を遡ると、Map ステートに入ったときのインプットデータにありました。(“adminemail”の部分)
{ "name": "MapUsersData", "input": { "body": { "adminemail": "admin@sample.xxx", "data": [ { "useremail": "user04@sample.xxx" }, { "useremail": "user05@sample.xxx" }, { "useremail": "user06@sample.xxx" } ] } }, "inputDetails": { "truncated": false } }
ということで、Lambda 関数の結果データと、元々 Map ステートが受け取っていたインプットデータをまとめて (Combine) 後続ステップに渡したいと思います。
以下のように、Map ステートに設定します。Lambda 関数の結果は result に入れて、インプットデータも追加する設定にします。
SendResult
最後のステップです。前ステップ MapUsersData から以下のデータを受け取り、インポート結果を管理者にメールで通知します。
{ "name": "SendResult", "input": { "body": { "adminemail": "admin@sample.xxx", "data": [ { "useremail": "user04@sample.xxx" }, { "useremail": "user05@sample.xxx" }, { "useremail": "user06@sample.xxx" } ] }, "result": [ { "useremail": "user04@sample.xxx", "status": "succeeded", "message": "-" }, { "useremail": "user05@sample.xxx", "status": "succeeded", "message": "-" }, { "useremail": "user06@sample.xxx", "status": "succeeded", "message": "-" } ] }, "inputDetails": { "truncated": false } }
処理は Lambda 関数を呼び出します。ステートマシン側の設定の仕方は ImportUsers ステップと全く同じです。呼び出す Lambda 関数が違うだけです。
Lambda 関数 (Python) は以下のようになります。
import json import pandas as pd import datetime import boto3 def datetimeconverter(o): if isinstance(o, datetime.datetime): return str(o) def lambda_handler(event, context): try: # Define Variables s3 = boto3.resource('s3') client = boto3.client('s3') ses = boto3.client('ses',region_name='ap-northeast-1') ADMINEMAIL = event['body']['adminemail'] df = pd.DataFrame(event['result']) # Set the duration the presigned URL is valid for expiredinsec = 129600 # 129600 seconds = 36 hours expdelta = datetime.timedelta(seconds=expiredinsec) jstdelta = datetime.timedelta(hours=9) JST = datetime.timezone(jstdelta, 'JST') dtnow = datetime.datetime.now(JST) dtexp = dtnow + expdelta nowdt = dtnow.strftime('%Y/%m/%d %H:%M - %Z') expdt = dtexp.strftime('%Y/%m/%d %H:%M - %Z') fileNameDateTime = dtnow.strftime('%Y%m%d%H%M%S') tempFileName = '/tmp/import_result_' + fileNameDateTime + '.csv' outputBucket = 'example-xxxxx-logs' outputS3Key = 'cognitoImportlog/import_result_' + fileNameDateTime + '.csv' # JSON to CSV df.to_csv(tempFileName) # upload CSV into S3 s3.meta.client.upload_file(tempFileName, outputBucket, outputS3Key) # generate presigned URL presigned_url = client.generate_presigned_url( ClientMethod = 'get_object', Params = { 'Bucket': outputBucket, 'Key': outputS3Key }, ExpiresIn = expiredinsec, HttpMethod = 'GET' ) # send email BODY_TEXT = ('あなたがリクエストしたユーザインポートジョブが完了しました。\n' '結果は以下のURLからダウンロードできます。\n\n' 'ダウンロードURL:\n<' + presigned_url + '>\n\n' 'ダウンロード有効期限:\n' + expdt ) res = ses.send_email( Destination={ 'ToAddresses': [ ADMINEMAIL ] }, Message={ 'Body': { 'Text': { 'Charset': 'UTF-8', 'Data': BODY_TEXT } }, 'Subject': { 'Charset': 'UTF-8', 'Data': 'ユーザインポート結果 [' + nowdt + ']' } }, Source='admin@sample.xxx' ) except Exception as e: print(e) return { "result": str(e) } else: return { "result": res }
- SendResult ステップから受け取ったデータは、event に格納されます。
- 結果を通知したい管理者のメールアドレスは、event[‘body’][‘adminemail’] で表現できます。
- インポート結果は event[‘result’] で表現できます。
ただし、このデータは JSON 形式です。管理者が見やすい形式にしたいので、CSV に変換します。そのために pandas という外部ライブラリを使ってデータ変換しますが、前提として pandas モジュールが Lambda Layer として登録されている必要があります。 - CSV 形式に変換したデータは、一旦 Lambda 関数の一時ストレージ /tmp に CSV ファイルとして保存します。それを指定した Amazon S3 バケットにアップロードします。
- CSV ファイルを Amazon S3 バケットにアップロードしただけでは管理者は見ることができないので、一定期間 (36 時間) だけ見られるように Amazon S3 署名付き URL を生成します。生成した URL を通知メール本文に埋め込んで、Amazon SES 経由で管理者メールアドレスにメール送信します。
これにて、ユーザインポートの処理は終了です。
まとめ
いかがでしたでしょうか?
Amazon Cognito のユーザインポート画面をつくるにあたり、バックエンドの処理で AWS Step Functions の Map ステートを活用してみた例でした。Map ステートはあまりネット上に情報がなかったので、そこにフォーカスして書きました。
AWS CloudFormation テンプレートを貼り付けたかったのですが、あまりにも大量、複雑すぎて汎用化はあきらめました。ごめんなさい。実運用では全て AWS CloudFormation でプロビジョニングしているのですが、使用しているテンプレートが他の AWS リソースともかなり連携するため切り離しづらかったです。
本記事が皆様のお役に立てれば幸いです。