【CSPM】Prisma Cloud の API を呼び出してみた

今回はPrisma Cloud APIの解説をしていこうと思います。
その後、実際にPrisma CloudのAPIとPythonを用いてPrisma Cloudで検知した1か月分のアラート件数を取得するプログラムを作成します。

Prisma Cloud APIとは

Prisma Cloud APIはプログラムを介してPrisma Cloudの機能にアクセスできるインターフェースです。APIを使用することで、Webコンソールで行っている操作をプログラムで実行することが可能になります。

Prisma CloudではAPIを実行するときにAPIキーを必要とします。まずはそのAPIキーを取得するために事前準備を行い、ログインAPIでAPIキーを取得してから、APIを実行する流れとなります。

事前準備

事前準備として以下のものが必要になります。
・Prisma CloudのACCESS_KEY
・Prisma CloudのSECRET_KEY
・Prisma CloudのAPI URL
API URLはドキュメントを参照:Cloud Security API | Develop with Palo Alto Networks (pan.dev)

アクセスキーの発行方法

Prisma CloudのACCESS_KEYとSECRET_KEYを発行していない場合発行する必要があります。
Prisma Cloudコンソールに行き、右上の「Settings」> 左ペイン「Access Control」> 右上の「Add」> 「Access Key」を押下します。

押下後「Name」と「Enable Expiration」の設定値が求められますが、任意で入力してください。
※Enable Expirationはアクセスキーの有効期限を決めるかの設定値になります。オフであれば有効期限はありません。

作成後以下の画像のようにAccess Key IDとSecret Access Keyがでてくるので必ず保存してください。
※「Download.csv file」を押下するとAccess Key IDとSecret Access Keyの入ったファイルが保存されます。

ログイン処理

ここからはログイン処理のコードを書いていきます。

まず必要なライブラリをインポートします。

import requests
from time import sleep
import json

リクエストするための情報を書いていきます。

url変数には事前準備で準備したURLを入力し、末尾にはAPIの/loginを入れてください。

usernameには事前準備で用意したACCESS_KEY
passwordには事前準備で用意したSECRET_KEYを入力します。
wasResponseErrorはレスポンスのステータスコードが200以外の時に再リクエストするための変数です。
変数のpayload, headersには以下のドキュメントを参考に作成しました。
url = "https://api.anz.prismacloud.io/login"

username = "ACCESS_KEYを入れる" 
password = "SECRET_KEYを入れる"

wasResponseError = False
payload = '{\"username\":\"' + username + \
    '\",\"password\":\"' + password + '\"}'
headers = {
    "Accept": "application/json; charset=UTF-8",
    "Content-Type": "application/json; charset=UTF-8"
}
Prisma Cloudにリクエストを送信していきます。
最初にresponse変数を作成してその変数の中身にリクエストするコードを書いていきます。
リクエストの中身にはPOST、先ほど変数で定義したurl、data=には変数で定義したpayload、headers=には同じく定義したheadersを入れます。
while True:
    response = requests.request(
        "POST", url, data=payload, headers=headers)
次に ステータスコードが200の場合にレスポンスからtokenだけを取り出す処理を書きます。このtokenがAPIキーとなります。
この時ステータスコードが200であれば wasResponseErrorをFalseにしてwhileを終わらせます。                            

ステータスコードが200ではない場合(elseの場合)ステータスコードを表示して、過去にレスポンスのステータスコード200以外が返却されたことがあれば、プログラムを終了し、ステータスコード200以外が返却されるのが初めてであれば一定時間経過後、再度リクエストします

if response.status_code == 200:
        token = json.loads(response.text)['token']
        wasResponseError = False
        break
    else:
       print(f"You couldn't be authenticated [status_code:{response.status_code}]")


        if wasResponseError:
            exit()
        else:
            sleep(32)
            wasResponseError = True
ステータスコード200のレスポンスは以下の感じで返ってきます。
{
    "message": "login_successful",
    "token": "TokenID",
    "customerNames": [
        {
            "customerName": "SCSK",
            "prismaId": "12345678910",
            "tosAccepted": true
        }
    ]
}
ログイン処理の全体コードが以下になります。
import requests

from time import sleep
import json

url = "https://api.anz.prismacloud.io/login"
username = "ACCESS_KEYを入れる" 
password = "SECRET_KEYを入れる"

wasResponseError = False

payload = '{\"username\":\"' + username + \
    '\",\"password\":\"' + password + '\"}'
headers = {
    "Accept": "application/json; charset=UTF-8",
    "Content-Type": "application/json; charset=UTF-8"

}

while True:
    response = requests.request(
        "POST", url, data=payload, headers=headers)
    if response.status_code == 200:
        token = json.loads(response.text)['token']
        wasResponseError = False
        break
    else:
        print(f"You couldn't be authenticated [status_code:{response.status_code}]")


        if wasResponseError:
            exit()
        else:
            sleep(32)
            wasResponseError = True

アラート件数の取得

ログイン処理を作成したので次はアラート件数を取得していきます。

querystring辞書のdetailedキーの値はFalseにしておきます。

payloadはドキュメントを参照し、作成しました。

timeRange内のstartTimeとendTimeは4/1 ~ 4/31までのミリ秒のUnix時間になっています。

headersはx-redlock-authを追加し、そこにログイン処理で変数を作成したtokenを格納します。

Alert APIドキュメント: List Alerts V2 – POST | Develop with Palo Alto Networks (pan.dev)

wasResponseError = False
alert_url = "https://api.anz.prismacloud.io/v2/alert"
querystring = {"detailed": False}
payload = {
    "detailed": False,
    "fields": [],
    "limit": 5000,
    "offset": 0,
    "pageToken": "",
    "sortBy": ["alertTime:asc"],
    "timeRange": {
        "type": "absolute",
        "value": {
            "startTime": 1711897200000,
            "endTime": 1714575599000                
        }
    }
}


headers = {
    "content-type": "application/json; charset=UTF-8",
    "x-redlock-auth": token
}
リクエスト処理です。
ここはログイン処理と同様になっています。
変わっているところとすれば、requestメソッドにjson、params引数が追加されたことです。
while True:
    response = requests.request(
        "POST", alert_url, json=payload, headers=headers, params=querystring)
    if response.status_code == 200:
        alertdata = json.loads(response.text)
        wasResponseError = False
        break
    else:
        print(f"Bad Request(/v2/alert)[status_code:{response.status_code}]")
        if wasResponseError:
            exit()
        else:
            sleep(32)
            wasResponseError = True
取得したアラート一覧からアラートIDのみを抽出する処理です。
アラートIDを抽出して、リストに格納し、アラート件数を表示します。
alert_ids = [item['id'] for item in alertdata['items']]
print(f"4/1 ~ 4/31までのアラート数は{len(alert_ids)} です。")
全体のコードです。
wasResponseError = False
alert_url = "https://api.anz.prismacloud.io/v2/alert"
querystring = {"detailed": False}
payload = {
    "detailed": False,
    "fields": [],
    "limit": 5000,
    "offset": 0,
    "pageToken": "",
    "sortBy": ["alertTime:asc"],
    "timeRange": {
        "type": "absolute",
        "value": {
            "startTime": 1711897200000,
            "endTime": 1714575599000                
        }
    }
}
headers = {
    "content-type": "application/json; charset=UTF-8",
    "x-redlock-auth": token
}
while True:
    response = requests.request(
        "POST", alert_url, json=payload, headers=headers, params=querystring)
    if response.status_code == 200:
        alertdata = json.loads(response.text)
        wasResponseError = False
        break
    else:
        print(f"Bad Request(/v2/alert)[status_code:{response.status_code}]")
        if wasResponseError:
            exit()
        else:
            sleep(32)
            wasResponseError = True
alert_ids = [item['id'] for item in alertdata['items']]
print(f"4/1 ~ 4/31までのアラート数は{len(alert_ids)} です。")
レスポンス時の中身です。
※一部を切り取っていますので詳細はドキュメントを参照してください。
"items": [
        {
            "id": "P-12345",
            "status": "resolved",
            "reason": "RESOURCE_DELETED",
            "firstSeen": 1711898107130,
            "lastSeen": 1712157098580,
            "alertTime": 1711898107130,
            "lastUpdated": 1712157098580,
            "policyId": "abcdef-12ab-1234-abcdef-123abcdef",
            "saveSearchId": "ab12345n-123f-1234-abcd-abbbcs",
            "metadata": {
                "saveSearchId": "abcdefg-123456-abcd"
            },
    ]

プログラム全体

今回書いたプログラムの全体になります。

import requests
from time import sleep
import json
# ログイン処理
url = "https://api.anz.prismacloud.io/login"
username = "ACCESS_KEY"
password = "SECRET_KEY"
wasResponseError = False
payload = '{\"username\":\"' + username + \
    '\",\"password\":\"' + password + '\"}'
headers = {
    "Accept": "application/json; charset=UTF-8",
    "Content-Type": "application/json; charset=UTF-8"
}
while True:
     response = requests.request(
         "POST", url, data=payload, headers=headers)
     if response.status_code == 200:
         token = json.loads(response.text)['token']
         wasResponseError = False
         break
     else:
         print(f"You couldn't be authenticated [status_code:{response.status_code}]")
         if wasResponseError:
            exit()
         else:
            sleep(32)
            wasResponseError = True
# アラート取得
wasResponseError = False
alert_url = "https://api.anz.prismacloud.io/v2/alert"
querystring = {"detailed": False}
payload = {
    "detailed": False,
    "fields": [],
    "limit": 5000,
    "offset": 0,
    "pageToken": "",
    "sortBy": ["alertTime:asc"],
    "timeRange": {
        "type": "absolute",
        "value": {
            "startTime": 1711897200000,
            "endTime": 1714575599000                
        }
    }
}
headers = {
    "content-type": "application/json; charset=UTF-8",
    "x-redlock-auth": token
}
while True:
    response = requests.request(
        "POST", alert_url, json=payload, headers=headers, params=querystring)
    if response.status_code == 200:
        alertdata = json.loads(response.text)
        wasResponseError = False
        break
    else:
        print(f"Bad Request(/v2/alert)[status_code:{response.status_code}]")
        if wasResponseError:
            exit()
        else:
            sleep(32)
            wasResponseError = True
alert_ids = [item['id'] for item in alertdata['items']]
print(f"4/1 ~ 4/31までのアラート数は{len(alert_ids)} です。")

実行結果

以下画像が実行結果です。

おわりに

当社では、複数クラウド環境の設定状況を自動でチェックし、設定ミスやコンプライアンス違反、異常行動などのリスクを診断するCSPMソリューションを販売しております。

マルチクラウド設定診断サービス with CSPM| SCSK株式会社

ご興味のある方は是非、お気軽にお問い合わせください。

タイトルとURLをコピーしました