Amazon Bedrock 基盤モデルに並列に問い合わせるジョブをつくる

こんにちは、広野です。

大量のデータに対して、生成 AI で同じプロンプトを使用する問い合わせをしたかったので、CSV データを読み込んで結果を CSV に追記して返してくれるジョブを AWS 上に作成しました。

簡単なつくりですが、プロンプトを変えることでいろいろな処理に活用できると思います。

つくったもの

以下のフォーマットの CSV ファイルをインプットにします。サンプルデータのソースは AWS ドキュメントです。

結果の CSV ファイルです。本記事ではサンプルとして元データテキスト(英語)を翻訳してサマリーするだけとします。

CSV ファイルを AWS マネジメントコンソールで所定の Amazon S3 バケットに保存すると、自動的にジョブが実行され、結果 CSV ファイルがバケットに保存されるとともに、メールで完了通知が届く仕組みです。

アーキテクチャ

  • Amazon S3 バケット内、input フォルダに CSV ファイルが保存されると、イベント通知が Amazon EventBridge に送信されます。
  • Amazon EventBridge ルールがそれを検知し、AWS Step Functions ステートマシンを開始します。その際、CSV ファイルが配置された Amazon S3 バケットとキーを渡します。
  • ステートマシンは該当の Amazon S3 バケットとキーを元に、CSV ファイルを取得します。CSV ファイルの中身を参照して、行単位で Map ステートによる並列処理を行います。
  • 並列実行される各処理内では、データ 1 件ごとに Amazon Bedrock 基盤モデルに問い合わせます。すべての並列処理が完了後、各処理の結果がとりまとめられ、AWS Lambda 関数に渡します。
  • AWS Lambda 関数内では、結果データ (JSON) を CSV に変換し、Amazon S3 バケットに保存します。最後、Amazon SNS に完了通知をパブリッシュします。

※本記事の構成では、生成 AI のモデルに Claude 3.7 Sonnet を使用したかったので、オレゴンリージョンにデプロイしました。執筆時点では東京リージョンで 3.7 がリリースされておらず。

設定解説

Amazon EventBridge

Amazon S3 の設定は省略します。Amazon S3 バケットに Amazon EventBridge へのイベント通知が有効になっている状態で、Amazon EventBridge ルールを設定します。イベントパターンは以下のように設定しています。バケット名指定で、キーはワイルドカードで input フォルダにある .csv ファイルであれば発動する条件です。

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["bedrockjob-test"]
    },
    "object": {
      "key": [{
        "wildcard": "input/*.csv"
      }]
    }
  }
}

ターゲットは AWS Step Functions ステートマシンです。開始するには ARN さえ設定できればよいのですが、ステートマシンの引数は Amazon S3 バケット名とキーなので、以下のように入力トランスフォーマーで絞り込みました。

  • 入力パス
{
  "bucket": "$.detail.bucket.name",
  "key": "$.detail.object.key"
}
  • 入力テンプレート
{
  "bucket": <bucket>,
  "key": <key>
}

AWS Step Functions

ステートマシンのタスク単位で説明します。設定の記法は JSONata を使用しています。

Map ステート

ステートマシンが開始したとき、Map ステートから始まります。この時点のインプットは以下のようにデータが格納された CSV ファイルの S3 バケット名、キーになります。

{
  "input": {
    "bucket": "bedrockjob-test",
    "key": "input/testdata3.csv"
  },
  "inputDetails": {
    "truncated": false
  },
  "roleArn": "arn:aws:iam::xxxxxxxxxxxx:role/bedrockjob-StateMachineExecutionRole-test"
}

インプットにしたいデータが純粋な JSON データであれば処理モードは「インライン」で良いのですが、今回のように Amazon S3 にあるファイルがインプットの場合は「分散」を選択します。

データが CSV 形式の場合、専用の設定があります。

  • CSV ファイルは先頭行に項目名を入れているため、CSV ヘッダーの場所を「最初の行」に設定します。CSV なのでデリミタは Comma です。
  • CSV データからどのようにデータを取得するか、後工程でわかりやすくするため、ItemSelector を設定します。CSV 内には id, data の列があり、その項目名で JSON データに変換するイメージです。
  • Map ステートでは CSV 各行のデータは $states.context.Map.Item.Value に格納される仕様になっており、その中に id, data が格納されるので上記のような書き方になります。

同時実行数制限は 10 にしました。Amazon Bedrock の基盤モデルを呼び出す類のリクエスト数は分単位での実行数制限があります。しかし、数百から数千単位の数なので、同時実行 10 であればさほど気にすることはないだろう、と考えました。

InvokeModel

Amazon Bedrock InvokeModel ランタイムを使用します。基本的には boto3 で InvokeModel するときと同じ引数 (パラメータ) を指定することになります。以下は例ですので、必要に応じてパラメータは変更しましょう。

注意点ですが、AWS Step Functions から直接 API を呼び出す場合、JSON 的に最初の階層の項目名は Pascal case、つまり先頭を大文字にしないとエラーになります。例えば modelId は ModelId にします。

{
  "ModelId": "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
  "ContentType": "application/json",
  "Accept": "application/json",
  "Trace": "DISABLED",
  "PerformanceConfigLatency": "standard",
  "Body": {
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 2000,
    "top_k": 250,
    "stop_sequences": [],
    "temperature": 1,
    "top_p": 0.999,
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": "{% 'Please translate the following text into Japanese and summarize it in no more than 300 characters.\n' & $states.input.data %}"
          }
        ]
      }
    ]
  }
}

プロンプトは “text”: の部分に書いてある

Please translate the following text into Japanese and summarize it in no more than 300 characters.

です。日本語も問題ないですが、なんとなく英語で書きました。

その続きに、& 区切りで変数を追加しています。$states.input.data が CSV ファイルから読み取った元データです。これら引数は使用する基盤モデルによって変わることがありますので、ご注意ください。

出力は、成功時、エラー時ともに同じフォーマットに統一したかったので、以下のように固定で項目名を設定しています。

{
  "id": "{% $states.input.id %}",
  "data": "{% $states.input.data %}",
  "result": "{% $parse($states.result.Body).content[0].text %}",
  "status": "succeeded"
}

id と data は、インプットに入っていたものをそのまま後続に渡します。

result には、Amazon Bedrock 基盤モデルが返してきたレスポンスから、メッセージ部分だけを抽出して格納します。

status には、成功時は succeeded 固定です。

エラー処理には、キャッチャーを 1 つ設定し以下のように入れています。result にエラーメッセージ ($states.errorOutput.Cause) を、status には failed が格納されるようにしました。

Success/Error

これらのタスクは、ジョブフローの見た目で成功か失敗かをわかりやすくするために入れた Pass ステートです。特に何もしていません。

SendResult

このタスクは AWS Lambda 関数を呼び出しています。AWS Step Functions のデフォルト設定で Lambda 関数を紐づけているだけです。ここでは Lambda 関数のコードを紹介します。

Lambda 関数へのインプットは、以下のような JSON データになります。このデータが Lambda 関数内の event 変数に格納されます。

[
  {
    "id": "1",
    "data": "Amazon Bedrock is a fully managed service that makes high-performing foundation models... 以下略",
    "result": "# 日本語翻訳と要約\n\nAmazon Bedrockは、主要AI企業とAmazonの高性能基盤モデル(FM)を統一...以下略",
    "status": "succeeded"
  },
  {
    "id": "2",
    "data": "With AWS Step Functions, you can create workflows, also called State machines, to build...以下略",
    "result": "# 日本語訳と要約\n\nAWS Step Functionsでは、分散アプリケーション構築、プロセス自動化...以下略",
    "status": "succeeded"
  }
]

ユーザーには CSV ファイルとして結果を返したいので、この Lambda 関数内で pandas を使用して JSON を CSV に変換し、Amazon S3 バケットに保存します。

今回の仕様では、元データの CSV ファイルを保存した Amazon S3 バケットの output フォルダに結果 CSV ファイルを保存します。

  • Lambda 関数
import json
import pandas as pd
import datetime
import boto3
s3 = boto3.resource('s3')
client = boto3.client('s3')
sns = boto3.client('sns',region_name='us-west-2')
def datetimeconverter(o):
  if isinstance(o, datetime.datetime):
    return str(o)
def lambda_handler(event, context):
  try:
    df = pd.DataFrame(event)
    jstdelta = datetime.timedelta(hours=9)
    JST = datetime.timezone(jstdelta, 'JST')
    dtnow = datetime.datetime.now(JST)
    fileNameDateTime = dtnow.strftime('%Y%m%d%H%M%S')
    tempFileName = '/tmp/result_' + fileNameDateTime + '.csv'
    outputBucket = 'bedrockjob-test'
    outputS3Key = 'output/result_' + fileNameDateTime + '.csv'
    # JSON to CSV
    df.to_csv(tempFileName, encoding='utf-8_sig', index=None)
    # upload CSV into S3
    s3.meta.client.upload_file(tempFileName, outputBucket, outputS3Key)
    # publish SNS
    res = sns.publish(TopicArn='arn:aws:sns:us-west-2:xxxxxxxxxxxx:bedrockjob-SNSTopic-xxxxxxxxxxxx',Message='Your Bedrock job has been completed.\nS3Bucket: ' + outputBucket + '\nS3Key: ' + outputS3Key)
  except Exception as e:
    print(e)
    return {
      "result": str(e)
    }
  else:
    return {
      "result": res
    }

最後、Amazon SNS トピックに完了メッセージを送信しています。本記事の主要なテーマではないので詳細は割愛します。

この Lambda 関数は pandas を使用するため、以下のブログ記事で紹介しているように、Lambda レイヤーの設定が必要になります。

ここで行っている JSON から CSV 変換ですが、他のアーキテクチャも考えました。AWS Glue や Amazon Athena を使用して簡単に (極力ノーコードで) できないかと思ったんですが、カタログやテーブル的なものを作成しなければならなかったので、このようなスポット的な変換処理、かつ所詮 CSV レベルのデータ量であれば Lambda でいいか、と落ち着きました。

参考: AWS CloudFormation テンプレート

これらリソース一式を AWS CloudFormation でデプロイできるようにしてあります。詳細な設定はこちらをご確認ください。Amazon SNS トピックはトピックを作成するところまでとなっていますので、サブスクライブは手動で実施ください。

AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates a S3 bucket, a Lambda function, a Step Functions workflow, an EventBridge rule, a SNS topic, and relevant IAM roles.

# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
  SubName:
    Type: String
    Description: System sub name of the Bedrock job. (e.g. prod or test)
    Default: test
    MaxLength: 30
    MinLength: 1

  S3BucketNameSdk:
    Type: String
    Description: S3 bucket name in which you uploaded sdks for Lambda Layers. (e.g. sdkbucket)
    Default: sdkbucket
    MaxLength: 50
    MinLength: 1

  S3KeyPandasSdk:
    Type: String
    Description: S3 key of pandas.zip. Fill the exact key name if you renamed. (e.g. sdk/Python3.13/pandas223.zip)
    Default: sdk/Python3.13/pandas223.zip
    MaxLength: 50
    MinLength: 1

  BedrockClaudeModelId:
    Type: String
    Description: The Bedrock Model ID.
    Default: "us.anthropic.claude-3-7-sonnet-20250219-v1:0"
    MaxLength: 100
    MinLength: 1

  BedrockOptionTopK:
    Type: Number
    Description: Top K parameter. The data type is int. The range is 0 to 500.
    Default: 250
    MaxValue: 500
    MinValue: 0

  BedrockOptionTopP:
    Type: String
    Description: Top P parameter. The data type is float. The range is 0 to 1.
    Default: 0.999
    MaxLength: 5
    MinLength: 1

  BedrockOptionTemperature:
    Type: String
    Description: Temperature parameter. The data type is float. The range is 0 to 1.
    Default: 1
    MaxLength: 5
    MinLength: 1

  BedrockOptionMaxTokens:
    Type: Number
    Description: Max Tokens parameter. The data type is int. The range is 100 to 64000.
    Default: 2000
    MaxValue: 64000
    MinValue: 100

Resources:
# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
  S3BucketBedrockJob:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub bedrockjob-${SubName}
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: 7
      OwnershipControls:
        Rules:
          - ObjectOwnership: BucketOwnerEnforced
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      NotificationConfiguration:
        EventBridgeConfiguration:
          EventBridgeEnabled: true
      Tags:
        - Key: Cost
          Value: !Sub bedrockjob-${SubName}

# ------------------------------------------------------------#
# Lambda S3 and SNS Invocation Role for Sfn (IAM)
# ------------------------------------------------------------#
  LambdaS3SnsInSfnRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub bedrockjob-LambdaS3SnsInSfnRole-${SubName}
      Description: This role allows Lambda functions to invoke S3 and SNS.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: !Sub bedrockjob-LambdaS3SnsInSfnPolicy-${SubName}
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Resource:
                  - !Ref SNSTopic
                Action:
                  - sns:Publish
              - Effect: Allow
                Action:
                  - s3:PutObject
                Resource:
                  - !Sub "${S3BucketBedrockJob.Arn}/*"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess
    DependsOn:
      - S3BucketBedrockJob
      - SNSTopic

# ------------------------------------------------------------#
# Lambda Layer
# ------------------------------------------------------------#
  LambdaLayerPandas223:
    Type: AWS::Lambda::LayerVersion
    Properties:
      LayerName: !Sub bedrockjob-pandas223-${SubName}
      Description: !Sub Pandas 2.2.3 for Python to load in bedrockjob-${SubName}
      CompatibleRuntimes:
        - python3.13
      Content:
        S3Bucket: !Sub ${S3BucketNameSdk}
        S3Key: !Sub ${S3KeyPandasSdk}
      LicenseInfo: BSD-3-Clause

# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
  LambdaSendResult:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub bedrockjob-SendResult-${SubName}
      Description: Lambda Function to send the Step Functions job result.
      Runtime: python3.13
      Timeout: 240
      MemorySize: 256
      Role: !GetAtt LambdaS3SnsInSfnRole.Arn
      Handler: index.lambda_handler
      Layers:
        - !Ref LambdaLayerPandas223
      Tags:
        - Key: Cost
          Value: !Sub bedrockjob-${SubName}
      Code:
        ZipFile: !Sub |
          import json
          import pandas as pd
          import datetime
          import boto3
          s3 = boto3.resource('s3')
          client = boto3.client('s3')
          sns = boto3.client('sns',region_name='${AWS::Region}')
          def datetimeconverter(o):
            if isinstance(o, datetime.datetime):
              return str(o)
          def lambda_handler(event, context):
            try:
              df = pd.DataFrame(event)
              jstdelta = datetime.timedelta(hours=9)
              JST = datetime.timezone(jstdelta, 'JST')
              dtnow = datetime.datetime.now(JST)
              fileNameDateTime = dtnow.strftime('%Y%m%d%H%M%S')
              tempFileName = '/tmp/result_' + fileNameDateTime + '.csv'
              outputBucket = '${S3BucketBedrockJob}'
              outputS3Key = 'output/result_' + fileNameDateTime + '.csv'
              # JSON to CSV
              df.to_csv(tempFileName, encoding='utf-8_sig', index=None)
              # upload CSV into S3
              s3.meta.client.upload_file(tempFileName, outputBucket, outputS3Key)
              # publish SNS
              res = sns.publish(TopicArn='${SNSTopic}',Message='Your Bedrock job has been completed.\nS3Bucket: ' + outputBucket + '\nS3Key: ' + outputS3Key)
            except Exception as e:
              print(e)
              return {
                "result": str(e)
              }
            else:
              return {
                "result": res
              }

# ------------------------------------------------------------#
# State Machine Execution LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
  LogGroupStateMachineBedrockJob:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/vendedlogs/states/bedrockjob-${SubName}
      RetentionInDays: 365
      Tags:
        - Key: Cost
          Value: !Sub bedrockjob-${SubName}

# ------------------------------------------------------------#
# State Machine Execution Role (IAM)
# ------------------------------------------------------------#
  StateMachineExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub bedrockjob-StateMachineExecutionRole-${SubName}
      Description: This role allows State Machines to call specified AWS resources.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                states.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: !Sub bedrockjob-StateMachineExecutionPolicy-${SubName}
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                Resource:
                  - !Sub "${S3BucketBedrockJob.Arn}/*"
              - Effect: Allow
                Action: states:StartExecution
                Resource: "*"
              - Effect: Allow
                Action: states:DescribeExecution
                Resource: "*"
              - Effect: Allow
                Action:
                  - bedrock:InvokeModel*
                  - bedrock:CreateInferenceProfile
                Resource:
                  - arn:aws:bedrock:*::foundation-model/*
                  - arn:aws:bedrock:*:*:inference-profile/*
                  - arn:aws:bedrock:*:*:application-inference-profile/*
              - Effect: Allow
                Action:
                  - bedrock:GetInferenceProfile
                  - bedrock:ListInferenceProfiles
                  - bedrock:DeleteInferenceProfile
                  - bedrock:TagResource
                  - bedrock:UntagResource
                  - bedrock:ListTagsForResource
                Resource:
                  - arn:aws:bedrock:*:*:inference-profile/*
                  - arn:aws:bedrock:*:*:application-inference-profile/*
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaRole
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
    DependsOn:
      - S3BucketBedrockJob

# ------------------------------------------------------------#
# Step Functions
# ------------------------------------------------------------#
  StateMachineBedrockJob:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub bedrockjob-${SubName}
      StateMachineType: STANDARD
      DefinitionSubstitutions:
        DSLambdaSendResultArn: !GetAtt LambdaSendResult.Arn
        DSBedrockClaudeModelId: !Ref BedrockClaudeModelId
        DSBedrockOptionTopK: !Ref BedrockOptionTopK
        DSBedrockOptionTopP: !Ref BedrockOptionTopP
        DSBedrockOptionTemperature: !Ref BedrockOptionTemperature
        DSBedrockOptionMaxTokens: !Ref BedrockOptionMaxTokens
      DefinitionString: |-
        {
          "Comment": "The state machine to invoke Bedrock job.",
          "QueryLanguage": "JSONata",
          "StartAt": "Map",
          "States": {
            "Map": {
              "Type": "Map",
              "ItemProcessor": {
                "ProcessorConfig": {
                  "Mode": "DISTRIBUTED",
                  "ExecutionType": "STANDARD"
                },
                "StartAt": "InvokeModel",
                "States": {
                  "InvokeModel": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::aws-sdk:bedrockruntime:invokeModel",
                    "Next": "Success",
                    "Output": {
                      "id": "{% $states.input.id %}",
                      "data": "{% $states.input.data %}",
                      "result": "{% $parse($states.result.Body).content[0].text %}",
                      "status": "succeeded"
                    },
                    "TimeoutSeconds": 600,
                    "Catch": [
                      {
                        "ErrorEquals": [
                          "States.ALL"
                        ],
                        "Comment": "Error",
                        "Next": "Error",
                        "Output": {
                          "id": "{% $states.input.id %}",
                          "data": "{% $states.input.data %}",
                          "result": "{% $states.errorOutput.Cause %}",
                          "status": "failed"
                        }
                      }
                    ],
                    "QueryLanguage": "JSONata",
                    "Arguments": {
                      "ModelId": "${DSBedrockClaudeModelId}",
                      "ContentType": "application/json",
                      "Accept": "application/json",
                      "Trace": "DISABLED",
                      "PerformanceConfigLatency": "standard",
                      "Body": {
                        "anthropic_version": "bedrock-2023-05-31",
                        "max_tokens": ${DSBedrockOptionMaxTokens},
                        "top_k": ${DSBedrockOptionTopK},
                        "stop_sequences": [],
                        "temperature": ${DSBedrockOptionTemperature},
                        "top_p": ${DSBedrockOptionTopP},
                        "messages": [
                          {
                            "role": "user",
                            "content": [
                              {
                                "type": "text",
                                "text": "{% 'Please translate the following text into Japanese and summarize it in no more than 300 characters.\n' & $states.input.data %}"
                              }
                            ]
                          }
                        ]
                      }
                    }
                  },
                  "Error": {
                    "Type": "Pass",
                    "End": true,
                    "QueryLanguage": "JSONata"
                  },
                  "Success": {
                    "Type": "Pass",
                    "End": true,
                    "QueryLanguage": "JSONata"
                  }
                }
              },
              "Label": "Map",
              "MaxConcurrency": 10,
              "ItemReader": {
                "Resource": "arn:aws:states:::s3:getObject",
                "ReaderConfig": {
                  "InputType": "CSV",
                  "CSVHeaderLocation": "FIRST_ROW",
                  "CSVDelimiter": "COMMA"
                },
                "Arguments": {
                  "Bucket": "{% $states.input.bucket %}",
                  "Key": "{% $states.input.key %}"
                }
              },
              "ItemSelector": {
                "id": "{% $states.context.Map.Item.Value.id %}",
                "data": "{% $states.context.Map.Item.Value.data %}"
              },
              "QueryLanguage": "JSONata",
              "ToleratedFailureCount": 5,
              "Next": "SendResult"
            },
            "SendResult": {
              "QueryLanguage": "JSONata",
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Output": "{% $states.result.Payload %}",
              "Arguments": {
                "FunctionName": "${DSLambdaSendResultArn}",
                "Payload": "{% $states.input %}"
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "Lambda.ServiceException",
                    "Lambda.AWSLambdaException",
                    "Lambda.SdkClientException",
                    "Lambda.TooManyRequestsException"
                  ],
                  "IntervalSeconds": 1,
                  "MaxAttempts": 3,
                  "BackoffRate": 2,
                  "JitterStrategy": "FULL"
                }
              ],
              "End": true,
              "Comment": "Invoke the Lambda function to send the result of this state machine via email.",
              "TimeoutSeconds": 300
            }
          },
          "TimeoutSeconds": 7200
        }
      LoggingConfiguration:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt LogGroupStateMachineBedrockJob.Arn
        IncludeExecutionData: true
        Level: ERROR
      RoleArn: !GetAtt StateMachineExecutionRole.Arn
      TracingConfiguration:
        Enabled: true
      Tags:
        - Key: Cost
          Value: !Sub bedrockjob-${SubName}
    DependsOn:
      - LogGroupStateMachineBedrockJob
      - StateMachineExecutionRole

# ------------------------------------------------------------#
# EventBridge Role (IAM)
# ------------------------------------------------------------#
  EventBridgeRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub bedrockjob-EventBridgeRole-${SubName}
      Description: This role allows EventBridge to call the Step Functions state machine.
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: events.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: !Sub bedrockjob-EventBridgePolicy-${SubName}
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: states:StartExecution
                Resource: !Ref StateMachineBedrockJob
    DependsOn:
      - StateMachineBedrockJob

# ------------------------------------------------------------#
# EventBridge Rule
# ------------------------------------------------------------#
  EventBridgeRule:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub bedrockjob-StartStateMachine-${SubName}
      Description: This rule starts the Step Functions state machine when the CSV data is put in the S3 bucket.
      EventBusName: default
      State: ENABLED
      EventPattern: !Sub |-
        {
          "source": ["aws.s3"],
          "detail-type": ["Object Created"],
          "detail": {
            "bucket": {
              "name": ["${S3BucketBedrockJob}"]
            },
            "object": {
              "key": [{
                "wildcard": "input/*.csv"
              }]
            }
          }
        }
      Targets:
        - Arn: !Ref StateMachineBedrockJob
          RoleArn: !GetAtt EventBridgeRole.Arn
          Id: !Sub bedrockjob-StartStateMachine-${SubName}
          InputTransformer:
            InputPathsMap:
              bucket: "$.detail.bucket.name"
              key: "$.detail.object.key"
            InputTemplate: |
              {
                "bucket": ,
                "key": 
              }
          RetryPolicy:
            MaximumRetryAttempts: 1
            MaximumEventAgeInSeconds: 3600
    DependsOn:
      - S3BucketBedrockJob
      - EventBridgeRole

# ------------------------------------------------------------#
# SNS Topic
# ------------------------------------------------------------#
  SNSTopic:
    Type: AWS::SNS::Topic
    Properties:
      TracingConfig: PassThrough
      DisplayName: !Sub bedrockjob-notification-${SubName}
      FifoTopic: false
      Tags:
        - Key: Cost
          Value: !Sub bedrockjob-${SubName}

まとめ

いかがでしたでしょうか?

プロンプトの調整、インプットデータのサニタイズ、センシティブ情報のマスキングなど課題はありますが、個人利用であれば運用でカバーできると思っています。

本記事が皆様のお役に立てれば幸いです。

著者について
広野 祐司

AWS サーバーレスアーキテクチャを駆使して社内クラウド人材育成アプリとコンテンツづくりに勤しんでいます。React で SPA を書き始めたら快適すぎて、他の言語には戻れなくなりました。サーバーレス & React 仲間を増やしたいです。AWSは好きですが、それよりもフロントエンド開発の方が好きでして、バックエンド構築を簡単にしてくれたAWSには感謝の気持ちの方が強いです。
取得資格:AWS 認定は13資格、ITサービスマネージャ、ITIL v3 Expert 等
2020 - 2024 Japan AWS Top Engineer 受賞
2022 - 2024 AWS Ambassador 受賞
2023 当社初代フルスタックエンジニア認定
好きなAWSサービス:AWS Amplify / AWS AppSync / Amazon Cognito / AWS Step Functions / AWS CloudFormation

広野 祐司をフォローする

クラウドに強いによるエンジニアブログです。

SCSKクラウドサービス(AWS)は、企業価値の向上につながるAWS 導入を全面支援するオールインワンサービスです。AWS最上位パートナーとして、多種多様な業界のシステム構築実績を持つSCSKが、お客様のDX推進を強力にサポートします。

AI・MLAWSクラウドソリューション
シェアする
タイトルとURLをコピーしました