【CSPM】Prisma Cloudの監査ログをAWSへ外部保管する

Prisma Cloudはパブリッククラウドやアプリケーションのセキュリティ強化に役立つツールですが、Prisma Cloud自身のセキュリティも考慮する必要があります。例えば、Prisma Cloudの監査ログは適切に保管し、いざという時に確認できるようにしておく必要があります。
しかしながら、Prisma Cloudの監査ログは最大120日間しか保管されないため、それ以降の日付のログを確認することができません。
そこでPrisma Cloudの監査ログを外部へ転送する機能を利用して、Amazon SQSに転送し、AWS LambdaでAmazon S3にログを保管する仕組みを構築します。この仕組みにより、121日以前のログも確認可能となり、いざという時でもログを確認できるようになります。

本記事では、この仕組みの構築方法を解説するとともに、Lambdaプログラム(Python)も共有します。
Prisma Cloudの監査ログを外部に保管しておきたい方やPrisma Cloudのセキュリティを強化したいという方のお役に立てれば幸いです。

監査ログの転送機能について

まず、Prisma Cloudの監査ログを外部へ転送する機能について説明します。

本機能に関する公式のドキュメントはこちらで、本機能はPrisma Cloudに登録済みのSQSまたはWebhooksに対してログを転送できます。そのため、事前にSQSまたはWebhooksを用意して、統合というところに登録をしておく必要があります。本記事ではSQSに対してログを転送をします。

また、注意点としてPrisma Cloudへのログインに成功したログは転送されないためご注意ください。
他のすべての監査ログは転送されるとのことです。裏付けとなるドキュメントはこちらです。

AWS構成図

AWS側のリソースは以下構成で構築します。

複数のPrisma Cloudテナントを管理している想定のため、各テナントごとの監査ログを一つのS3バケットに、テナントごとにフォルダを分けて保管できるようにしています。処理の流れは、ログがPrisma CloudからSQSへ転送され、SQSがメッセージを受信すると監査ログ保管用のLambdaが起動します。LambdaはSQS名からテナントを判断し、受信したログをS3バケット内のテナント用フォルダに出力します。

SQS名は以下ネーミングルールで構築することで、Lambdaがどのテナントから送られてきたログであるかを判断できるようにします。

<テナント識別子>-PrismaCloudAuditLog-queue.fifo

こうすることで、監査ログ保管用のS3バケットには以下ネーミングでログが保管されます。

/<テナント識別子>/yyyy/mm/dd/prismacloud-auditlog-<テナント識別子>-yyyymmdd-hhmmss.json

 

Pythonソースコード

以下はLambda上で動かすPythonプログラムです。

import json
import os
import logging
import boto3
from datetime import datetime, timedelta

# ロギングの設定を行う - INFOレベルでログを出力する
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def extract_prefix_from_sqs_name(sqs_name):
    # SQS名の先頭から最初の「-」の部分の文字列をプレフィックスとして抽出する
    return sqs_name.split('-')[0]

def lambda_handler(event, context):
    # S3クライアントを初期化する
    s3_client = boto3.client('s3')
    
    # 環境変数からS3のバケット名を取得する
    bucket_name = os.environ['S3_BUCKET_NAME']

    # SQSメッセージの処理開始をログに記録する
    logger.info('Starting the processing of SQS messages')
    
    try:
        # 各SQSメッセージを処理する
        for record in event['Records']:
            message_body = record['body']  # メッセージの本文を取得する
            message_id = record['messageId']  # メッセージIDを取得する
            
            # メッセージの受信時刻を取得し、日付形式に変換する
            timestamp = record['attributes']['ApproximateFirstReceiveTimestamp']
            message_datetime_utc = datetime.utcfromtimestamp(int(timestamp) / 1000)

            # UTCをJSTに変換する
            jst_offset = timedelta(hours=9)
            message_datetime_jst = message_datetime_utc + jst_offset

            # 日付フォルダとファイル名を決定する
            date_folder = message_datetime_jst.strftime('%Y/%m/%d')
            time_stamp = message_datetime_jst.strftime('%Y%m%d-%H%M%S')

            # SQSの名前からプレフィックスを抽出する
            sqs_name = record['eventSourceARN'].split(':')[-1]
            base_prefix = extract_prefix_from_sqs_name(sqs_name)

            # ファイル名を構築する
            file_name = f"prismacloud-auditlog-{base_prefix}-{time_stamp}.json"

            # S3キーを組み立てる
            s3_key = f"{base_prefix}/{date_folder}/{file_name}"

            # メッセージをS3に保存する旨をログに記録する
            logger.info(f"Saving message {message_id} to S3 bucket: {bucket_name}, key: {s3_key}")
            
            # メッセージをS3バケットに保存する処理
            s3_client.put_object(
                Bucket=bucket_name,
                Key=s3_key,
                Body=message_body
            )
            
            # メッセージをS3に保存したことをログに記録する
            logger.info(f"Successfully saved message {message_id} to S3")
            
    except Exception as e:
        # メッセージ処理中にエラーが発生した場合、エラーログを記録し、ステータスコード500でレスポンスを返す
        logger.error("Error occurred while processing the messages: %s", str(e))
        return {
            'statusCode': 500,
            'body': json.dumps('Error occurred while processing the messages')
        }
    
    # すべてのメッセージの処理が成功したことをログに記録する
    logger.info('All messages have been processed successfully')
    return {
        'statusCode': 200,
        'body': json.dumps('Messages have been saved to S3!')
    }
    
 

AWS側リソースの構築

以下のようにSQS、S3、IAMロール・ポリシー、Lambdaを構築します。

SQS

SQSのコンソール画面を表示し、「キューを作成」を押下します。

タイプを「FIFO」で設定し、名前を入力したら、他の項目は必要に応じて変更ください。今回、他の項目は全てデフォルト設定とし、最下部の「キューを作成」を押下して、SQSを構築しました。

SQSの構築が完了したら、そのSQSを開き、以下URL部分をコピーして、メモしておきます。
このURLは後ほどPrisma CloudへSQSを登録する時に利用します。

S3

S3はデフォルト設定で構築しました。必要に応じて設定は変更ください。

IAMロール・ポリシー

Lambdaに付与するIAMロールと、そのロールに付与するIAMポリシーを作成します。

以下のカスタムポリシーを作成し、そのポリシーを持つロールを作成しました。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": [
                "sqs:*",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Sid": "Statement2",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": [
                "arn:aws:s3:::<監査ログ保管用のS3バケット名>/*"
            ]
        }
    ]
}

ロールの信頼関係タブでは以下のようにLambdaがこのロールを引き受けられるように許可します。

上記以外はすべてデフォルト設定としました。

 

Lambda

Lambdaのコンソール画面を表示し、「関数を作成」を押下します。

関数名に関数の名前を入力し、ランタイムは最新バージョンのPythonを選択します。実行ロールで「既存のロールを使用する」を選択して、先ほど作成したIAMロールを選択し、「関数の作成」を押下します。

Lambda関数が作成されたら、デフォルトコードを先ほどご紹介したソースコードに置き換えて、「Deploy」を押下します。
上部にて、関数が正常に更新された旨の緑のメッセージを確認した後、「トリガーを追加」を押下します。

トリガーの設定に「SQS」を選択し、SQSキューには先ほど構築したSQSキューを選択します。
「トリガーをアクティブ化」にチェックがついていることを確認し、最下部の「追加」を押下します。

最後に環境変数タブにて、以下環境変数を作成します。値は先ほど作成したS3バケット名を指定します。

キー
S3_BUCKET_NAME 監査ログ保管用のS3バケット名

Lambdaも紹介した設定以外はすべてデフォルト設定としました。

 

Prisma Cloud側の設定

次にPrisma Cloud側で、統合へのSQS登録と監査ログの転送設定を行います。

統合へのSQS登録

まず、Prisma CloudがSQSへログを転送するときに利用するIAMロールとポリシーを作成します。ドキュメントの手順に則って作業をします。

これから作成するIAMロールにはランダムな文字列ではなく128ビット形式のUUIDで外部IDを設定する必要があります。手作業で作成しても良いのですが、Prisma Cloudコンソールにて外部IDを生成することができますので、コンソールにて作成します。

監査ログの転送をしたいPrisma Cloudテナントにログインし、SystemAdmin ロールにスイッチします。
「Settings」>「Integrations & Notifications」画面を表示し、「Add Integration」を押下します。

「Amazon SQS」を選択した後、次の画面で「More Options」にチェックをいれます。
External Idにある、「Generate」を押下すると、128ビット形式のUUIDが表示されますので、コピーしておきます。

IAMロールのコンソールへ移動し、IAMロールを作成します。

信頼されたエンティティタイプは「AWSアカウント」を選択し、AWSアカウントは「別のAWSアカウント」を選択して、Prisma CloudのAWSアカウントIDである、188619942792を入力します。
「外部IDを要求する」にチェックを入れて、先ほどコピーしていたUUIDを入力して、「次へ」を押下します。

ここではポリシーを付与せずに、他はデフォルト設定としてロールを作成します。

作成後、ロールに以下カスタムポリシーを付与して、ロールは完成です。
作成したロールのARNをコピーしておきます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "sqs:GetQueueAttributes",
                "sqs:ListQueues",
                "sqs:SendMessage",
                "tag:GetResources",
                "iam:GetRole"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

Prisma Cloudのコンソールへ戻り、「Role ARN」にコピーしておいたIAMロールのARNを入力します。次に「Queue URL」にはSQS構築時にメモしておいたSQS URLを入力し、「Next」を押下します。

「Test Integration」を押下し、以下Prisma CloudとSQSの疎通成功の緑のメッセージが表示されることを確認し、「Save Integration」を押下します。

監査ログの転送設定

最後に監査ログの転送設定を行います。ドキュメントの手順に則って作業をします。

「Settings」>「Enterprise Settings」画面を表示し、下の方にある「Send Audit Logs to integration」を有効化します。
その後、統合に登録した先ほどのSQSを選択して、画面右上の「Save Settings」を押下します。

以上で監査ログの転送設定が完了となります。

実際に監査ログがS3に保管されているか

これにてPrisma Cloudの監査ログがAWSのS3バケットに保管されるようになります。実際にログが保管されているか確認したいと思います。監査ログ保管用のS3バケットに以下ネーミングでログが保管されているはずです。

/<テナント識別子>/yyyy/mm/dd/prismacloud-auditlog-<テナント識別子>-yyyymmdd-hhmmss.json

まず、Prisma Cloudで監査ログに記録されるような行動をとります。

その後、S3バケットにアクセスすると、想定通りのパスにログが保管されていました。

 

ログの中身も先ほどの行動が記載されており、想定通りです。

{
    "result": "Successful",
    "actionType": "CREATE",
    "ipAddress": "XXX.XXX.XXX.XXX",
    "action": "'YYYY'(with role 'System Admin':'System Admin') created the account group. The group wasn't given access to any cloud accounts.",
    "resourceName": "TEST",
    "user": "YYYY",
    "timestamp": 1750991540913,
    "resourceType": "Account Group"
}

 

さいごに

当社ではPrisma Cloud利用して、複数クラウド環境の設定状況を自動でチェックし、
設定ミスやコンプライアンス違反、異常行動などのリスクを診断するCSPMソリューションを販売しております。
興味のある方は是非、お気軽にお問い合わせください。

タイトルとURLをコピーしました