Amazon Bedrock Knowledge Bases で構造化データ(CSV)を使用した RAG をつくる -実装編-

こんにちは、広野です。

以下の記事の続編記事です。RAG で CSV データからの検索精度向上を目指してみました。本記事は実装編で、主にバックエンドの設定について記載しています。UI や実際の動作については続編記事の UI 編で紹介します。

 

やりたいこと (再掲)

以下のような架空のヘルプデスク問い合わせ履歴データ (CSV) を用意しました。

ヘルプデスク担当者が新たな問い合わせを受けたときに、似たような過去の対応履歴を引き当てられるようにしたい、というのが目的です。

  • LLM に、今届いた新しい問い合わせに対する回答案を提案させたい。
  • 回答案を生成するために、自然言語で書かれた問い合わせ内容と回答内容から、意味的に近いデータを引き当てたい。
  • カテゴリで検索対象をフィルタしたい。その方が精度が上がるケースがあると考えられる。
  • LLM が回答案を提案するときには、参考にした過去対応履歴がどの問合せ番号のものか、提示させたい。その問合せ番号をキーに、生の対応履歴データを参照できるようにしたい。

以下の前提があります。

  • データソースとなる CSV ファイルは 1つのみ。過去の対応履歴は 1 つの CSV ファイルに収まっているということ。
  • つまり、データの1行が1件の問い合わせであり、その項目間には意味的なつながりがある。

まあ、ごくごく一般的なニーズではないかと思います。

 

関連記事

以前、私が公開した Amazon Bedrock Knowledge Bases や Amazon S3 Vectors を使用した RAG 基盤の記事です。今回はこの基盤のチャンキング戦略をカスタマイズして臨みました。

 

本記事の言及範囲

RAG そのものや、RAG 基盤については本記事では語りません。

以下のアーキテクチャ図の中の、赤枠の部分に着目します。ベクトルデータを格納するまでのデータソースのカスタムチャンキングと、それを実装した Amazon Bedrock Knowledge Bases にどう問い合わせするか、です。

 

アーキテクチャ (再掲)

前回記事で紹介した、カスタムチャンキングを実装するアーキテクチャです。

実装

Amazon Bedrock Knowledge Bases

カスタムチャンキングの一連の処理は、Amazon Bedrock Knowledge Bases で行われます。各種設定の全体像は AWS マネジメントコンソールの画面で一望できます。

  • カスタムチャンキングを AWS Lambda 関数に処理させるので、当然 Lambda 関数が必要です。(内容は後述)
  • Lambda 関数が出力するチャンク分割後のデータ (中間成果物の JSON) を保存する S3 バケットが必要です。これはオリジナルのドキュメント配置用 S3 バケットとは別にする必要があります。
  • チャンキング戦略は NO にします。NO にすると、オリジナルのドキュメントの内容をそのまま Lambda 関数に渡してくれます。別の戦略を選択すると、その戦略によってチャンク分割されたデータごとに Lambda 関数を実行してしまうので、期待するカスタムチャンク分割ができなくなります。
  • 解析戦略は Default にします。
  • チャンキング戦略と解析戦略は、データソース作成後には変更できません。変更したいときは作り直しになります。
  • データ削除ポリシーは DELETE にすることをお勧めします。同期をかけたときに過去のデータを残すかどうかの設定で、残してしまうと古い情報が検索に引っ掛かってしまいます。

AWS Lambda 関数 (カスタムチャンキング)

カスタムチャンキングする Lambda 関数コード (Python) です。

冒頭に紹介した CSV を、Amazon Bedrock Knowledge Bases が理解できるフォーマットの JSON データに変換します。内部的には 1 チャンクごとに自然言語で検索させたいデータとメタデータに分けて出力します。

Lambda レイヤーは不要です。モジュールは Lambda 標準でサポートしているものだけで実装可能でした。 

インプットとなる S3 バケット内の CSV データのメタデータは、Amazon Bedrock Knowledge Bases がこの Lambda 関数を呼び出すときに渡してくれるので、こちらが特に気にすることはありません。受け取ったバケット名、キーから CSV データを取得しに行きます。出力先となる S3 バケットやキー名も Amazon Bedrock Knowledge Bases から渡されますのでこの関数内でベタ書きすることはありません。

データフォーマットの変換処理の内容的には、そんなに難しいことはしていません。大事なのは出力フォーマットです。

import json
import csv
import boto3
from io import StringIO
s3 = boto3.client('s3')
def lambda_handler(event, context):
  try:
    bucket_name = event.get('bucketName')
    input_files = event.get('inputFiles', [])
    output_files = []
    for file_info in input_files:
      original_file_location = file_info.get('originalFileLocation', {})
      s3_location = original_file_location.get('s3Location', {})
      original_uri = s3_location.get('uri', '')
      content_batches = file_info.get('contentBatches', [])
      output_batches = []
      for batch in content_batches:
        input_key = batch.get('key')
        # Read input file from S3
        response = s3.get_object(Bucket=bucket_name, Key=input_key)
        input_content = json.loads(response['Body'].read().decode('utf-8'))
        # Extract CSV content
        csv_content = input_content['fileContents'][0]['contentBody']
        # Remove BOM if present (input may have BOM)
        if csv_content.startswith('\ufeff'):
          csv_content = csv_content[1:]
        csv_reader = csv.DictReader(StringIO(csv_content))
        # Process each row as a chunk
        file_contents = []
        for row in csv_reader:
          content_body = f"問合せ番号: {row.get('問合せ番号', '')}\n商品番号: {row.get('商品番号', '')}\n\n問合せ内容:\n{row.get('問合せ内容', '')}\n\n回答内容:\n{row.get('回答内容', '')}"
          content_metadata = {
            "問合せ番号": row.get('問合せ番号', ''),
            "販売形態": row.get('販売形態', ''),
            "受付日時": row.get('受付日時', ''),
            "完了日時": row.get('完了日時', ''),
            "商品番号": row.get('商品番号', ''),
            "カテゴリ": row.get('カテゴリ', ''),
            "ステータス": row.get('ステータス', '')
          }
          file_contents.append({
            "contentBody": content_body,
            "contentType": "TEXT",
            "contentMetadata": content_metadata
          })
        # Write output file to S3
        output_key = input_key.replace('.json', '_transformed.json')
        output_data = {"fileContents": file_contents}
        s3.put_object(
          Bucket=bucket_name,
          Key=output_key,
          Body=json.dumps(output_data, ensure_ascii=False),
          ContentType='application/json'
        )
        output_batches.append({"key": output_key})
      output_files.append({
        "originalFileLocation": original_file_location,
        "fileMetadata": file_info.get('fileMetadata', {}),
        "contentBatches": output_batches
      })
    return {"outputFiles": output_files}
  except Exception as e:
    print(f"Error: {str(e)}")
    import traceback
    traceback.print_exc()
    raise

チャンク分割された後のデータ構造 (再掲)

Lambda 関数がチャンク分割した後のデータ構造 (上のアーキテクチャ図では 5番の処理によって作成されるもの) は、以下のようになります。

{
  "fileContents": [
    {
      "contentBody": "問合せ番号: AB01234569\n商品番号: SH001-01BL\n\n問合せ内容:\n[問合せ内容の文章]\n\n回答内容:\n[回答内容の文章]",
      "contentType": "TEXT",
      "contentMetadata": {
        "問合せ番号": "AB01234569",
        "販売形態": "代理店",
        "受付日時": "2026/2/23 12:59",
        "完了日時": "2026/2/23 13:39",
        "商品番号": "SH001-01BL",
        "カテゴリ": "家庭用収納棚",
        "ステータス": "完了"
      }
    },
    {
      "contentBody": "問合せ番号: AB01234573\n商品番号: TB19541\n\n問合せ内容:\n[問合せ内容の文章]\n\n回答内容:\n[回答内容の文章]",
      "contentType": "TEXT",
      "contentMetadata": {
        "問合せ番号": "AB01234573",
        "販売形態": "直販",
        "受付日時": "2026/2/24 9:15",
        "完了日時": "2026/2/24 14:30",
        "商品番号": "TB19541",
        "カテゴリ": "家庭用テーブル",
        "ステータス": "完了"
      }
    }
  ]
}
  • fileContents 配列の各要素が 1 チャンク(CSV の 1 行に相当)
  • contentBody がベクトル化・検索対象にできるテキスト
  • contentMetadata が引用表示やフィルタリングに使用されるメタデータ ※contentBody ももちろん引用可能

ここまで実装できると、Amazon Bedrock Knowledge Bases に対して contentBody に書かれた内容に対して自然言語で検索できたり、検索時にメタデータの項目単位でフィルタリングできるようになります。

メタデータフィルタリングについて

Amazon Bedrock Knowledge Bases ができてしまえば、自然言語による問い合わせは RetrieveAndGenerate API を使用して極論プロンプトさえ送ればいいので、難しいことはありません。しかし、メタデータフィルタリング機能を追加すると、設計次第ではコードが複雑になります。

ここで、メタデータフィルタリングについて仕様を説明します。

  • メタデータ条件にマッチするチャンクのみにベクトル検索を行うため、不要な結果を排除することができ、検索精度の向上が期待できる。
  • メタデータ項目に対してかけられる文字列検索条件は、完全一致や、指定した文字列を含む、などいろいろできる。Amazon S3 Vectors でサポートしている条件は以下公式ドキュメントを参照。
  • メタデータ項目は、複数項目を And や Or で組み合わせることが可能。

つまり、かなり細かいフィルタリングができるということです。

以下にメタデータフィルタリングを設定するときの Lambda 関数コードの一部を紹介します。

  • 単一のメタデータ条件
    「販売形態が代理店で完全一致」でフィルタリングしたいとき
retrievalConfiguration={
  "vectorSearchConfiguration": {
    "filter": {
      "equals": { "key": "販売形態", "value": "代理店" }
    }
  }
}
  • 複数のメタデータ条件
    「販売形態が代理店で完全一致」かつ「カテゴリが家庭用コタツで完全一致」でフィルタリングしたいとき
retrievalConfiguration={
  "vectorSearchConfiguration": {
    "filter": {
      "andAll": [
        { "equals": { "key": "販売形態", "value": "代理店" } },
        { "equals": { "key": "カテゴリ", "value": "家庭用コタツ" } }
      ]
    }
  }
}

見てもらえるとわかると思いますが、複数のメタデータ条件では 2 つの equals 条件を andAll で囲んでいると思います。上記はまだシンプルですが、複数の条件が重なれば重なるほど、このような階層構造をさらにコーディングしなければなりません。Or 条件も可能とすると、さらに複雑になりそうです。

今回のブログ記事では、簡略化のため上記のように「販売形態」と「カテゴリ」の 2 つのメタデータのみフィルタリング可能なように設計します。

  • フィルタ対象項目
    • 販売形態(2種類: 直販, 代理店)
    • カテゴリ(10種類: 家庭用コタツ, 家庭用テーブル, 家庭用収納棚, 家庭用チェア, 家庭用デスク, 業務用ラック, 業務用キャビネット, 業務用会議テーブル, 業務用チェア, 業務用デスク)
  • フィルタ条件選択時の動作
    • 両方未選択 → フィルタリングなし(全件検索)
    • 片方のみ選択 → 選択したキーワードに完全一致で単一条件検索
    • 両方選択 → AND 条件検索、それぞれ選択したキーワードに完全一致とする

AWS Lambda 関数 (ナレッジベースへの問い合わせ)

前述のメタデータフィルタリング機能を実装した、Amazon Bedrock Knowledge Bases の RetrieveAndGenerate API をコールする AWS Lambda 関数コードは以下のようになります。

Amazon API Gateway REST API から呼び出され、AWS AppSync Events にストリームレスポンスを返す構成です。コメントで メタデータフィルタリングの組み立て と書いてある部分が先ほど説明した部分の実装です。

インプットとして

"filters": [
  {"販売形態": "代理店"},
  {"カテゴリ": "家庭用コタツ"}
]

のようなメタデータフィルタリングパラメータを受け取る想定です。条件が2つあれば andAll で囲う処理を実装しています。

import os
import json
import boto3
import urllib.request
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
# common objects and valiables
session = boto3.session.Session()
bedrock_agent = boto3.client('bedrock-agent-runtime')
endpoint = os.environ['APPSYNC_API_ENDPOINT']
model_arn = os.environ['MODEL_ARN']
knowledge_base_id = os.environ['KNOWLEDGE_BASE_ID']
region = os.environ['REGION']
service = 'appsync'
headers = {'Content-Type': 'application/json'}
# AppSync publish message function
def publish_appsync_message(sub, appsync_session_id, payload, credentials):
  body = json.dumps({
    "channel": f"rag-stream-response/{sub}/{appsync_session_id}",
    "events": [ json.dumps(payload) ]
  }).encode("utf-8")
  aws_request = AWSRequest(
    method='POST',
    url=endpoint,
    data=body,
    headers=headers
  )
  SigV4Auth(credentials, service, region).add_auth(aws_request)
  req = urllib.request.Request(
    url=endpoint,
    data=aws_request.body,
    method='POST'
  )
  for k, v in aws_request.headers.items():
    req.add_header(k, v)
  with urllib.request.urlopen(req) as res:
    return res.read().decode('utf-8')
# handler
def lambda_handler(event, context):
  try:
    credentials = session.get_credentials().get_frozen_credentials()
    # API Gateway からのインプットを取得
    prompt = event['body']['prompt']
    appsync_session_id = event['body']['appsyncSessionId']
    bedrock_session_id = event['body'].get('bedrockSessionId')
    sub = event['sub']
    # Amazon Bedrock Knowledge Bases への問い合わせパラメータ作成
    request = {
      "input": {
        "text": prompt
      },
      "retrieveAndGenerateConfiguration": {
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
          "knowledgeBaseId": knowledge_base_id,
          "modelArn": model_arn,
          "generationConfiguration": {
            "inferenceConfig": {
              "textInferenceConfig": {
                "maxTokens": 10000,
                "temperature": 0.5,
                "topP": 0.9
              }
            },
            "performanceConfig": {
              "latency": "standard"
            },
            "promptTemplate": {
              "textPromptTemplate": (
                "あなたは優秀なヘルプデスクアシスタントです。ヘルプデスク担当者からの質問に対して、必ず日本語で回答してください。"
                "適切な回答が見つからない場合は、正直に「分かりません」と回答してください。\n\n"
                "検索結果:\n$search_results$\n\n"
                "回答指示: $output_format_instructions$"
              )
            }
          }
        }
      }
    }
    # メタデータフィルタ条件の組み立て
    filters = event['body'].get('filters', [])
    if filters:
      conditions = [{"equals": {"key": k, "value": v}} for f in filters for k, v in f.items()]
      if len(conditions) == 1:
        retrieval_filter = conditions[0]
      else:
        retrieval_filter = {"andAll": conditions}
      request["retrieveAndGenerateConfiguration"]["knowledgeBaseConfiguration"]["retrievalConfiguration"] = {
        "vectorSearchConfiguration": {
          "filter": retrieval_filter
        }
      }
    # Bedrock sessionId は存在するときのみ渡す (継続会話時のみ)
    if bedrock_session_id:
      request["sessionId"] = bedrock_session_id
    # Bedrock Knowledge Bases への問い合わせ
    response = bedrock_agent.retrieve_and_generate_stream(**request)
    # Bedrock sessionId
    if "sessionId" in response:
      publish_appsync_message(
        sub,
        appsync_session_id,
        {
          "type": "bedrock_session",
          "bedrock_session_id": response["sessionId"]
        },
        credentials
      )
    for chunk in response["stream"]:
      payload = None
      # Generated text
      if "output" in chunk and "text" in chunk["output"]:
        payload = {
          "type": "text",
          "message": chunk["output"]["text"]
        }
        print({"t": chunk["output"]["text"]})
      # Citation
      elif "citation" in chunk:
        payload = {
          "type": "citation",
          "citation": chunk['citation']['retrievedReferences']
        }
        print({"c": chunk['citation']['retrievedReferences']})
      # Continue
      if not payload:
        continue
      # Publish AppSync
      publish_appsync_message(sub, appsync_session_id, payload, credentials)
  except Exception as e:
    print(str(e))
    raise

AWS CloudFormation テンプレート

Amazon API Gateway REST API や AWS AppSync Events API など、関連するリソースを一式デプロイするテンプレートを掲載します。これ単体では動かないと思いますので、参考までに。ここまで実装できると、アプリ UI から API をコールすることでチャットボット UI を作れます。

AWSTemplateFormatVersion: 2010-09-09
Description: The CloudFormation template that creates a S3 vector bucket and index as a RAG Knowledge base.
# ------------------------------------------------------------#
# Input Parameters
# ------------------------------------------------------------#
Parameters:
  SystemName:
    Type: String
    Description: System name. use lower case only. (e.g. example)
    Default: example
    MaxLength: 10
    MinLength: 1

  SubName:
    Type: String
    Description: System sub name. use lower case only. (e.g. prod or dev)
    Default: dev
    MaxLength: 10
    MinLength: 1

  DomainName:
    Type: String
    Description: Domain name for URL. xxxxx.xxx (e.g. example.com)
    Default: example.com
    AllowedPattern: "[^\\s@]+\\.[^\\s@]+"

  SubDomainName:
    Type: String
    Description: Sub domain name for URL. (e.g. example-prod or example-dev)
    Default: example-dev
    MaxLength: 20
    MinLength: 1

  Dimension:
    Type: Number
    Description: The dimensions of the vectors to be inserted into the vector index. The value depends on the embedding model.
    Default: 1024
    MaxValue: 4096
    MinValue: 1

  EmbeddingModelId:
    Type: String
    Description: The embedding model ID.
    Default: amazon.titan-embed-text-v2:0
    MaxLength: 100
    MinLength: 1

  LlmModelId:
    Type: String
    Description: The LLM model ID for the Knowledge base.
    Default: global.amazon.nova-2-lite-v1:0
    MaxLength: 100
    MinLength: 1

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "General Configuration"
        Parameters:
          - SystemName
          - SubName
      - Label:
          default: "Domain Configuration"
        Parameters:
          - DomainName
          - SubDomainName
      - Label:
          default: "Embedding Configuration"
        Parameters:
          - Dimension
          - EmbeddingModelId
      - Label:
          default: "Knowledge Base Configuration"
        Parameters:
          - LlmModelId

Resources:
# ------------------------------------------------------------#
# S3
# ------------------------------------------------------------#
  S3BucketKbDatasource:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${SystemName}-${SubName}-kbdatasource
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      CorsConfiguration:
        CorsRules:
          - AllowedHeaders:
              - "*"
            AllowedMethods:
              - "GET"
              - "HEAD"
              - "PUT"
              - "POST"
              - "DELETE"
            AllowedOrigins:
              - !Sub https://${SubDomainName}.${DomainName}
            ExposedHeaders:
              - last-modified
              - content-type
              - content-length
              - etag
              - x-amz-version-id
              - x-amz-request-id
              - x-amz-id-2
              - x-amz-cf-id
              - x-amz-storage-class
              - date
              - access-control-expose-headers
            MaxAge: 3000
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}

  S3BucketPolicyKbDatasource:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref S3BucketKbDatasource
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Deny
            Principal: "*"
            Action: "s3:*"
            Resource:
              - !Sub "arn:aws:s3:::${S3BucketKbDatasource}"
              - !Sub "arn:aws:s3:::${S3BucketKbDatasource}/*"
            Condition:
              Bool:
                "aws:SecureTransport": "false"
    DependsOn:
      - S3BucketKbDatasource

  S3VectorBucket:
    Type: AWS::S3Vectors::VectorBucket
    Properties:
      VectorBucketName: !Sub ${SystemName}-${SubName}-vectordb

  S3BucketKbIntermediate:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${SystemName}-${SubName}-kbintermediate
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}

  S3BucketPolicyKbIntermediate:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref S3BucketKbIntermediate
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Deny
            Principal: "*"
            Action: "s3:*"
            Resource:
              - !Sub "arn:aws:s3:::${S3BucketKbIntermediate}"
              - !Sub "arn:aws:s3:::${S3BucketKbIntermediate}/*"
            Condition:
              Bool:
                "aws:SecureTransport": "false"
    DependsOn:
      - S3BucketKbIntermediate

  S3VectorBucketIndex:
    Type: AWS::S3Vectors::Index
    Properties:
      IndexName: !Sub ${SystemName}-${SubName}-vectordb-index
      DataType: float32
      Dimension: !Ref Dimension
      DistanceMetric: cosine
      VectorBucketArn: !GetAtt S3VectorBucket.VectorBucketArn
      MetadataConfiguration:
        NonFilterableMetadataKeys:
          - AMAZON_BEDROCK_TEXT
          - AMAZON_BEDROCK_METADATA
    DependsOn:
      - S3VectorBucket

# ------------------------------------------------------------#
# Bedrock Knowledge Base
# ------------------------------------------------------------#
  BedrockKnowledgeBase:
    Type: AWS::Bedrock::KnowledgeBase
    Properties:
      Name: !Sub ${SystemName}-${SubName}-kb
      Description: !Sub RAG Knowledge Base for ${SystemName}-${SubName}
      KnowledgeBaseConfiguration:
        Type: VECTOR
        VectorKnowledgeBaseConfiguration:
          EmbeddingModelArn: !Sub arn:aws:bedrock:${AWS::Region}::foundation-model/${EmbeddingModelId}
      RoleArn: !GetAtt IAMRoleBedrockKb.Arn
      StorageConfiguration:
        Type: S3_VECTORS
        S3VectorsConfiguration:
          IndexArn: !GetAtt S3VectorBucketIndex.IndexArn
          VectorBucketArn: !GetAtt S3VectorBucket.VectorBucketArn
      Tags:
        Cost: !Sub ${SystemName}-${SubName}
    DependsOn:
      - IAMRoleBedrockKb

  BedrockKnowledgeBaseDataSource:
    Type: AWS::Bedrock::DataSource
    Properties:
      Name: !Sub ${SystemName}-${SubName}-kb-datasource
      Description: !Sub RAG Knowledge Base Data Source for ${SystemName}-${SubName}
      KnowledgeBaseId: !Ref BedrockKnowledgeBase
      DataDeletionPolicy: DELETE
      DataSourceConfiguration:
        Type: S3
        S3Configuration:
          BucketArn: !GetAtt S3BucketKbDatasource.Arn
      VectorIngestionConfiguration:
        ChunkingConfiguration:
          ChunkingStrategy: NONE
        CustomTransformationConfiguration:
          Transformations:
            - TransformationFunction:
                TransformationLambdaConfiguration:
                  LambdaArn: !GetAtt LambdaCsvChunker.Arn
              StepToApply: POST_CHUNKING
          IntermediateStorage:
            S3Location:
              URI: !Sub s3://${S3BucketKbIntermediate}/
    DependsOn:
      - S3BucketKbDatasource
      - BedrockKnowledgeBase
      - S3BucketKbIntermediate

# ------------------------------------------------------------#
# AppSync Events
# ------------------------------------------------------------#
  AppSyncChannelNamespaceRagSR:
    Type: AWS::AppSync::ChannelNamespace
    Properties:
      Name: rag-stream-response
      ApiId:
        Fn::ImportValue:
          !Sub AppSyncApiId-${SystemName}-${SubName}
      CodeHandlers: |
        import { util } from '@aws-appsync/utils';
        export function onSubscribe(ctx) {
          const requested = ctx.info.channel.path;
          if (!requested.startsWith(`/rag-stream-response/${ctx.identity.sub}`)) {
            util.unauthorized();
          }
        }
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}

# ------------------------------------------------------------#
# API Gateway REST API
# ------------------------------------------------------------#
  RestApiRagSR:
    Type: AWS::ApiGateway::RestApi
    Properties:
      Name: !Sub rag-sr-${SystemName}-${SubName}
      Description: !Sub REST API to call Lambda rag-stream-response-${SystemName}-${SubName}
      EndpointConfiguration:
        Types:
          - REGIONAL
        IpAddressType: dualstack
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}

  RestApiDeploymentRagSR:
    Type: AWS::ApiGateway::Deployment
    Properties:
      RestApiId: !Ref RestApiRagSR
    DependsOn:
      - RestApiMethodRagSRPost
      - RestApiMethodRagSROptions

  RestApiStageRagSR:
    Type: AWS::ApiGateway::Stage
    Properties:
      StageName: prod
      Description: production stage
      RestApiId: !Ref RestApiRagSR
      DeploymentId: !Ref RestApiDeploymentRagSR
      MethodSettings:
        - ResourcePath: "/*"
          HttpMethod: "*"
          LoggingLevel: INFO
          DataTraceEnabled : true
      TracingEnabled: false
      AccessLogSetting:
        DestinationArn: !GetAtt LogGroupRestApiRagSR.Arn
        Format: '{"requestId":"$context.requestId","status":"$context.status","sub":"$context.authorizer.claims.sub","email":"$context.authorizer.claims.email","resourcePath":"$context.resourcePath","requestTime":"$context.requestTime","sourceIp":"$context.identity.sourceIp","userAgent":"$context.identity.userAgent","apigatewayError":"$context.error.message","authorizerError":"$context.authorizer.error","integrationError":"$context.integration.error"}'
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}

  RestApiAuthorizerRagSR:
    Type: AWS::ApiGateway::Authorizer
    Properties:
      Name: !Sub restapi-authorizer-ragsr-${SystemName}-${SubName}
      RestApiId: !Ref RestApiRagSR
      Type: COGNITO_USER_POOLS
      ProviderARNs:
        - Fn::ImportValue:
            !Sub CognitoArn-${SystemName}-${SubName}
      AuthorizerResultTtlInSeconds: 300
      IdentitySource: method.request.header.Authorization

  RestApiResourceRagSR:
    Type: AWS::ApiGateway::Resource
    Properties:
      RestApiId: !Ref RestApiRagSR
      ParentId: !GetAtt RestApiRagSR.RootResourceId
      PathPart: ragsr

  RestApiMethodRagSRPost:
    Type: AWS::ApiGateway::Method
    Properties:
      RestApiId: !Ref RestApiRagSR
      ResourceId: !Ref RestApiResourceRagSR
      HttpMethod: POST
      AuthorizationType: COGNITO_USER_POOLS
      AuthorizerId: !Ref RestApiAuthorizerRagSR
      Integration:
        Type: AWS
        IntegrationHttpMethod: POST
        Credentials:
          Fn::ImportValue:
            !Sub ApigLambdaInvocationRoleArn-${SystemName}-${SubName}
        Uri: !Sub "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaRagSR.Arn}/invocations"
        PassthroughBehavior: NEVER
        RequestTemplates:
          application/json: |
            {
              "body": $input.json('$'),
              "sub": "$context.authorizer.claims.sub"
            }
        RequestParameters:
          integration.request.header.X-Amz-Invocation-Type: "'Event'"
        IntegrationResponses:
          - ResponseParameters:
              method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token,Cache-Control'"
              method.response.header.Access-Control-Allow-Methods: "'POST,OPTIONS'"
              method.response.header.Access-Control-Allow-Origin: !Sub "'https://${SubDomainName}.${DomainName}'"
            ResponseTemplates:
              application/json: ''
            StatusCode: '202'
      MethodResponses:
        - StatusCode: '202'
          ResponseModels:
            application/json: Empty
          ResponseParameters:
            method.response.header.Access-Control-Allow-Origin: true
            method.response.header.Access-Control-Allow-Headers: true
            method.response.header.Access-Control-Allow-Methods: true

  RestApiMethodRagSROptions:
    Type: AWS::ApiGateway::Method
    Properties:
      RestApiId: !Ref RestApiRagSR
      ResourceId: !Ref RestApiResourceRagSR
      HttpMethod: OPTIONS
      AuthorizationType: NONE
      Integration:
        Type: MOCK
        Credentials:
          Fn::ImportValue:
            !Sub ApigLambdaInvocationRoleArn-${SystemName}-${SubName}
        IntegrationResponses:
          - ResponseParameters:
              method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token,Cache-Control'"
              method.response.header.Access-Control-Allow-Methods: "'POST,OPTIONS'"
              method.response.header.Access-Control-Allow-Origin: !Sub "'https://${SubDomainName}.${DomainName}'"
            ResponseTemplates:
              application/json: ''
            StatusCode: '200'
        PassthroughBehavior: WHEN_NO_MATCH
        RequestTemplates:
          application/json: '{"statusCode": 200}'
      MethodResponses:
        - ResponseModels:
            application/json: Empty
          ResponseParameters:
            method.response.header.Access-Control-Allow-Headers: true
            method.response.header.Access-Control-Allow-Methods: true
            method.response.header.Access-Control-Allow-Origin: true
          StatusCode: '200'

# ------------------------------------------------------------#
# API Gateway LogGroup (CloudWatch Logs)
# ------------------------------------------------------------#
  LogGroupRestApiRagSR:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/apigateway/${RestApiRagSR}
      RetentionInDays: 365
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}

# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
  LambdaRagSR:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub rag-sr-${SystemName}-${SubName}
      Description: !Sub Lambda Function to invoke Bedrock Knowledge Bases for ${SystemName}-${SubName}
      Architectures:
        - x86_64
      Runtime: python3.14
      Timeout: 300
      MemorySize: 128
      Environment:
        Variables:
          APPSYNC_API_ENDPOINT:
            Fn::ImportValue:
              !Sub AppSyncEventsEndpointHttp-${SystemName}-${SubName}
          MODEL_ARN: !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:inference-profile/${LlmModelId}"
          KNOWLEDGE_BASE_ID: !Ref BedrockKnowledgeBase
          REGION: !Ref AWS::Region
      Role: !GetAtt LambdaBedrockKbRole.Arn
      Handler: index.lambda_handler
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}
      Code:
        ZipFile: |
          import os
          import json
          import boto3
          import urllib.request
          from botocore.auth import SigV4Auth
          from botocore.awsrequest import AWSRequest
          # common objects and valiables
          session = boto3.session.Session()
          bedrock_agent = boto3.client('bedrock-agent-runtime')
          endpoint = os.environ['APPSYNC_API_ENDPOINT']
          model_arn = os.environ['MODEL_ARN']
          knowledge_base_id = os.environ['KNOWLEDGE_BASE_ID']
          region = os.environ['REGION']
          service = 'appsync'
          headers = {'Content-Type': 'application/json'}
          # AppSync publish message function
          def publish_appsync_message(sub, appsync_session_id, payload, credentials):
            body = json.dumps({
              "channel": f"rag-stream-response/{sub}/{appsync_session_id}",
              "events": [ json.dumps(payload) ]
            }).encode("utf-8")
            aws_request = AWSRequest(
              method='POST',
              url=endpoint,
              data=body,
              headers=headers
            )
            SigV4Auth(credentials, service, region).add_auth(aws_request)
            req = urllib.request.Request(
              url=endpoint,
              data=aws_request.body,
              method='POST'
            )
            for k, v in aws_request.headers.items():
              req.add_header(k, v)
            with urllib.request.urlopen(req) as res:
              return res.read().decode('utf-8')
          # handler
          def lambda_handler(event, context):
            try:
              credentials = session.get_credentials().get_frozen_credentials()
              # API Gateway からのインプットを取得
              prompt = event['body']['prompt']
              appsync_session_id = event['body']['appsyncSessionId']
              bedrock_session_id = event['body'].get('bedrockSessionId')
              sub = event['sub']
              # Amazon Bedrock Knowledge Bases への問い合わせパラメータ作成
              request = {
                "input": {
                  "text": prompt
                },
                "retrieveAndGenerateConfiguration": {
                  "type": "KNOWLEDGE_BASE",
                  "knowledgeBaseConfiguration": {
                    "knowledgeBaseId": knowledge_base_id,
                    "modelArn": model_arn,
                    "generationConfiguration": {
                      "inferenceConfig": {
                        "textInferenceConfig": {
                          "maxTokens": 10000,
                          "temperature": 0.5,
                          "topP": 0.9
                        }
                      },
                      "performanceConfig": {
                        "latency": "standard"
                      },
                      "promptTemplate": {
                        "textPromptTemplate": (
                          "あなたは優秀なヘルプデスクアシスタントです。ヘルプデスク担当者からの質問に対して、必ず日本語で回答してください。"
                          "適切な回答が見つからない場合は、正直に「分かりません」と回答してください。\n\n"
                          "検索結果:\n$search_results$\n\n"
                          "回答指示: $output_format_instructions$"
                        )
                      }
                    }
                  }
                }
              }
              # メタデータフィルタ条件の組み立て
              filters = event['body'].get('filters', [])
              if filters:
                conditions = [{"equals": {"key": k, "value": v}} for f in filters for k, v in f.items()]
                if len(conditions) == 1:
                  retrieval_filter = conditions[0]
                else:
                  retrieval_filter = {"andAll": conditions}
                request["retrieveAndGenerateConfiguration"]["knowledgeBaseConfiguration"]["retrievalConfiguration"] = {
                  "vectorSearchConfiguration": {
                    "filter": retrieval_filter
                  }
                }
              # Bedrock sessionId は存在するときのみ渡す (継続会話時のみ)
              if bedrock_session_id:
                request["sessionId"] = bedrock_session_id
              # Bedrock Knowledge Bases への問い合わせ
              response = bedrock_agent.retrieve_and_generate_stream(**request)
              # Bedrock sessionId
              if "sessionId" in response:
                publish_appsync_message(
                  sub,
                  appsync_session_id,
                  {
                    "type": "bedrock_session",
                    "bedrock_session_id": response["sessionId"]
                  },
                  credentials
                )
              for chunk in response["stream"]:
                payload = None
                # Generated text
                if "output" in chunk and "text" in chunk["output"]:
                  payload = {
                    "type": "text",
                    "message": chunk["output"]["text"]
                  }
                  print({"t": chunk["output"]["text"]})
                # Citation
                elif "citation" in chunk:
                  payload = {
                    "type": "citation",
                    "citation": chunk['citation']['retrievedReferences']
                  }
                  print({"c": chunk['citation']['retrievedReferences']})
                # Continue
                if not payload:
                  continue
                # Publish AppSync
                publish_appsync_message(sub, appsync_session_id, payload, credentials)
            except Exception as e:
              print(str(e))
              raise
    DependsOn:
      - LambdaBedrockKbRole
      - BedrockKnowledgeBase

  LambdaRagSREventInvokeConfig:
    Type: AWS::Lambda::EventInvokeConfig
    Properties:
      FunctionName: !GetAtt LambdaRagSR.Arn
      Qualifier: $LATEST
      MaximumRetryAttempts: 0
      MaximumEventAgeInSeconds: 300

  LambdaCsvChunker:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub csv-chunker-${SystemName}-${SubName}
      Description: !Sub Lambda Function to embed with custom chunk for ${SystemName}-${SubName}
      Architectures:
        - x86_64
      Runtime: python3.14
      Handler: index.lambda_handler
      Timeout: 900
      MemorySize: 512
      Role: !GetAtt LambdaCsvChunkerRole.Arn
      Tags:
        - Key: Cost
          Value: !Sub ${SystemName}-${SubName}
      Code:
        ZipFile: |
          import json
          import csv
          import boto3
          from io import StringIO
          s3 = boto3.client('s3')
          def lambda_handler(event, context):
            try:
              bucket_name = event.get('bucketName')
              input_files = event.get('inputFiles', [])
              output_files = []
              for file_info in input_files:
                original_file_location = file_info.get('originalFileLocation', {})
                s3_location = original_file_location.get('s3Location', {})
                original_uri = s3_location.get('uri', '')
                content_batches = file_info.get('contentBatches', [])
                output_batches = []
                for batch in content_batches:
                  input_key = batch.get('key')
                  # Read input file from S3
                  response = s3.get_object(Bucket=bucket_name, Key=input_key)
                  input_content = json.loads(response['Body'].read().decode('utf-8'))
                  # Extract CSV content
                  csv_content = input_content['fileContents'][0]['contentBody']
                  # Remove BOM if present (input may have BOM)
                  if csv_content.startswith('\ufeff'):
                    csv_content = csv_content[1:]
                  csv_reader = csv.DictReader(StringIO(csv_content))
                  # Process each row as a chunk
                  file_contents = []
                  for row in csv_reader:
                    content_body = f"問合せ番号: {row.get('問合せ番号', '')}\n商品番号: {row.get('商品番号', '')}\n\n問合せ内容:\n{row.get('問合せ内容', '')}\n\n回答内容:\n{row.get('回答内容', '')}"
                    content_metadata = {
                      "問合せ番号": row.get('問合せ番号', ''),
                      "販売形態": row.get('販売形態', ''),
                      "受付日時": row.get('受付日時', ''),
                      "完了日時": row.get('完了日時', ''),
                      "商品番号": row.get('商品番号', ''),
                      "カテゴリ": row.get('カテゴリ', ''),
                      "ステータス": row.get('ステータス', '')
                    }
                    file_contents.append({
                      "contentBody": content_body,
                      "contentType": "TEXT",
                      "contentMetadata": content_metadata
                    })
                  # Write output file to S3
                  output_key = input_key.replace('.json', '_transformed.json')
                  output_data = {"fileContents": file_contents}
                  s3.put_object(
                    Bucket=bucket_name,
                    Key=output_key,
                    Body=json.dumps(output_data, ensure_ascii=False),
                    ContentType='application/json'
                  )
                  output_batches.append({"key": output_key})
                output_files.append({
                  "originalFileLocation": original_file_location,
                  "fileMetadata": file_info.get('fileMetadata', {}),
                  "contentBatches": output_batches
                })
              return {"outputFiles": output_files}
            except Exception as e:
              print(f"Error: {str(e)}")
              import traceback
              traceback.print_exc()
              raise

  LambdaInvokePermissionCsvChunker:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref LambdaCsvChunker
      Action: lambda:InvokeFunction
      Principal: bedrock.amazonaws.com
      SourceAccount: !Ref AWS::AccountId
      # SourceArn: !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:knowledge-base/${BedrockKnowledgeBase}"
    DependsOn:
      - LambdaCsvChunker
      # - BedrockKnowledgeBase

# ------------------------------------------------------------#
# Lambda Role (IAM)
# ------------------------------------------------------------#
  LambdaBedrockKbRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub LambdaBedrockKbRole-${SystemName}-${SubName}
      Description: This role allows Lambda functions to invoke Bedrock Knowledge Bases and AppSync Events API.
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
              - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess
      Policies:
        - PolicyName: !Sub LambdaBedrockKbPolicy-${SystemName}-${SubName}
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "bedrock:InvokeModel"
                  - "bedrock:InvokeModelWithResponseStream"
                  - "bedrock:GetInferenceProfile"
                  - "bedrock:ListInferenceProfiles"
                Resource:
                  - !Sub "arn:aws:bedrock:*::foundation-model/*"
                  - !Sub "arn:aws:bedrock:*:${AWS::AccountId}:inference-profile/*"
              - Effect: Allow
                Action:
                  - "bedrock:RetrieveAndGenerate"
                  - "bedrock:Retrieve"
                Resource:
                  - !GetAtt BedrockKnowledgeBase.KnowledgeBaseArn
              - Effect: Allow
                Action:
                  - "appsync:connect"
                Resource:
                  - Fn::ImportValue:
                      !Sub AppSyncApiArn-${SystemName}-${SubName}
              - Effect: Allow
                Action:
                  - "appsync:publish"
                  - "appsync:EventPublish"
                Resource:
                  - Fn::Join:
                    - ""
                    - - Fn::ImportValue:
                          !Sub AppSyncApiArn-${SystemName}-${SubName}
                      - /channelNamespace/rag-stream-response

  LambdaCsvChunkerRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub LambdaCsvChunkerRole-${SystemName}-${SubName}
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: !Sub LambdaCsvChunkerPolicy-${SystemName}-${SubName}
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "s3:GetObject"
                  - "s3:PutObject"
                  - "s3:ListObject"
                Resource:
                  - !GetAtt S3BucketKbDatasource.Arn
                  - !Sub ${S3BucketKbDatasource.Arn}/*
                  - !GetAtt S3BucketKbIntermediate.Arn
                  - !Sub ${S3BucketKbIntermediate.Arn}/*

# ------------------------------------------------------------#
# IAM Role for Bedrock Knowledge Base
# ------------------------------------------------------------#
  IAMRoleBedrockKb:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub BedrockKbRole-${SystemName}-${SubName}
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - sts:AssumeRole
            Principal:
              Service:
                - bedrock.amazonaws.com
            Condition:
              StringEquals:
                "aws:SourceAccount": !Ref AWS::AccountId
              # ArnLike:
                # "aws:SourceArn": !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:knowledge-base/*"
      Policies:
        - PolicyName: !Sub BedrockKbPolicy-${SystemName}-${SubName}
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "s3:GetObject"
                  - "s3:ListBucket"
                  - "s3:PutObject"
                Resource:
                  - !GetAtt S3BucketKbDatasource.Arn
                  - !Sub ${S3BucketKbDatasource.Arn}/*
                  - !GetAtt S3BucketKbIntermediate.Arn
                  - !Sub ${S3BucketKbIntermediate.Arn}/*
              - Effect: Allow
                Action:
                  - "s3vectors:GetIndex"
                  - "s3vectors:QueryVectors"
                  - "s3vectors:PutVectors"
                  - "s3vectors:GetVectors"
                  - "s3vectors:DeleteVectors"
                Resource:
                  - !GetAtt S3VectorBucketIndex.IndexArn
              - Effect: Allow
                Action:
                  - "bedrock:InvokeModel"
                Resource:
                  - !Sub arn:aws:bedrock:${AWS::Region}::foundation-model/${EmbeddingModelId}
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                Resource:
                  - !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:csv-chunker-${SystemName}-${SubName}*"
    DependsOn:
      - S3BucketKbDatasource
      - S3VectorBucketIndex
      - LambdaCsvChunker
      - S3BucketKbIntermediate

# ------------------------------------------------------------#
# Output Parameters
# ------------------------------------------------------------#
Outputs:
# S3
  S3BucketKbDatasourceName:
    Value: !Ref S3BucketKbDatasource
# API Gateway
  APIGatewayEndpointRagSR:
    Value: !Sub https://${RestApiRagSR}.execute-api.${AWS::Region}.${AWS::URLSuffix}/${RestApiStageRagSR}/ragsr
    Export:
      Name: !Sub RestApiEndpointRagSR-${SystemName}-${SubName}

続編記事

まとめ

いかがでしたでしょうか。

メタデータフィルタリングは設計次第でかなり細かい検索ができそうですが、その分コーディングが大変です。むやみに汎用的なフィルタ設定を実装しようとすると開発負担増やバグの温床になりそうなので、フィルタ対象項目はなるべく厳選した方がよいと思います。

本記事が皆様のお役に立てれば幸いです。

著者について
広野 祐司

AWS サーバーレスアーキテクチャと React を使用して社内向け e-Learning アプリ開発とコンテンツ作成に勤しんでいます。React でアプリを書き始めたら、快適すぎて他の言語には戻れなくなりました。近年は社内外への AWS 技術支援にも従事しています。AWS サービスには AWS が考える IT 設計思想が詰め込まれているので、いつも AWS を通して勉強させて頂いてまます。
取得資格:AWS 認定は15資格、IT サービスマネージャ、ITIL v3 Expert 等
2020 - 2025 Japan AWS Top Engineer 受賞
2022 - 2025 AWS Ambassador 受賞
2023 当社初代フルスタックエンジニア認定
好きなAWSサービス:AWS AppSync Events / AWS Step Functions / AWS CloudFormation

広野 祐司をフォローする

クラウドに強いによるエンジニアブログです。

SCSKクラウドサービス(AWS)は、企業価値の向上につながるAWS 導入を全面支援するオールインワンサービスです。AWS最上位パートナーとして、多種多様な業界のシステム構築実績を持つSCSKが、お客様のDX推進を強力にサポートします。

AI・MLAWSクラウドソリューション
シェアする
タイトルとURLをコピーしました