今回はPrisma Cloud APIの解説をしていこうと思います。
その後、実際にPrisma CloudのAPIとPythonを用いてPrisma Cloudで検知した1か月分のアラート件数を取得するプログラムを作成します。
Prisma Cloud APIとは
Prisma Cloud APIはプログラムを介してPrisma Cloudの機能にアクセスできるインターフェースです。APIを使用することで、Webコンソールで行っている操作をプログラムで実行することが可能になります。
Prisma CloudではAPIを実行するときにAPIキーを必要とします。まずはそのAPIキーを取得するために事前準備を行い、ログインAPIでAPIキーを取得してから、APIを実行する流れとなります。
事前準備
事前準備として以下のものが必要になります。
・Prisma CloudのACCESS_KEY
・Prisma CloudのSECRET_KEY
・Prisma CloudのAPI URL
API URLはドキュメントを参照:Cloud Security API | Develop with Palo Alto Networks (pan.dev)
アクセスキーの発行方法
Prisma CloudのACCESS_KEYとSECRET_KEYを発行していない場合発行する必要があります。
Prisma Cloudコンソールに行き、右上の「Settings」> 左ペイン「Access Control」> 右上の「Add」> 「Access Key」を押下します。
押下後「Name」と「Enable Expiration」の設定値が求められますが、任意で入力してください。
※Enable Expirationはアクセスキーの有効期限を決めるかの設定値になります。オフであれば有効期限はありません。
作成後以下の画像のようにAccess Key IDとSecret Access Keyがでてくるので必ず保存してください。
※「Download.csv file」を押下するとAccess Key IDとSecret Access Keyの入ったファイルが保存されます。
ログイン処理
ここからはログイン処理のコードを書いていきます。
まず必要なライブラリをインポートします。
import requests from time import sleep import json
リクエストするための情報を書いていきます。
url変数には事前準備で準備したURLを入力し、末尾にはAPIの/loginを入れてください。
passwordには事前準備で用意したSECRET_KEYを入力します。
wasResponseErrorはレスポンスのステータスコードが200以外の時に再リクエストするための変数です。
url = "https://api.anz.prismacloud.io/login" username = "ACCESS_KEYを入れる" password = "SECRET_KEYを入れる" wasResponseError = False payload = '{\"username\":\"' + username + \ '\",\"password\":\"' + password + '\"}' headers = { "Accept": "application/json; charset=UTF-8", "Content-Type": "application/json; charset=UTF-8" }
リクエストの中身にはPOST、先ほど変数で定義したurl、data=には変数で定義したpayload、headers=には同じく定義したheadersを入れます。
while True: response = requests.request( "POST", url, data=payload, headers=headers)
この時ステータスコードが200であれば wasResponseErrorをFalseにしてwhileを終わらせます。
ステータスコードが200ではない場合(elseの場合)ステータスコードを表示して、過去にレスポンスのステータスコード200以外が返却されたことがあれば、プログラムを終了し、ステータスコード200以外が返却されるのが初めてであれば一定時間経過後、再度リクエストします
if response.status_code == 200: token = json.loads(response.text)['token'] wasResponseError = False break else: print(f"You couldn't be authenticated [status_code:{response.status_code}]") if wasResponseError: exit() else: sleep(32) wasResponseError = True
{ "message": "login_successful", "token": "TokenID", "customerNames": [ { "customerName": "SCSK", "prismaId": "12345678910", "tosAccepted": true } ] }
import requests from time import sleep import json url = "https://api.anz.prismacloud.io/login" username = "ACCESS_KEYを入れる" password = "SECRET_KEYを入れる" wasResponseError = False payload = '{\"username\":\"' + username + \ '\",\"password\":\"' + password + '\"}' headers = { "Accept": "application/json; charset=UTF-8", "Content-Type": "application/json; charset=UTF-8" } while True: response = requests.request( "POST", url, data=payload, headers=headers) if response.status_code == 200: token = json.loads(response.text)['token'] wasResponseError = False break else: print(f"You couldn't be authenticated [status_code:{response.status_code}]") if wasResponseError: exit() else: sleep(32) wasResponseError = True
アラート件数の取得
ログイン処理を作成したので次はアラート件数を取得していきます。
querystring辞書のdetailedキーの値はFalseにしておきます。
payloadはドキュメントを参照し、作成しました。
timeRange内のstartTimeとendTimeは4/1 ~ 4/31までのミリ秒のUnix時間になっています。
headersはx-redlock-authを追加し、そこにログイン処理で変数を作成したtokenを格納します。
Alert APIドキュメント: List Alerts V2 – POST | Develop with Palo Alto Networks (pan.dev)
wasResponseError = False alert_url = "https://api.anz.prismacloud.io/v2/alert" querystring = {"detailed": False} payload = { "detailed": False, "fields": [], "limit": 5000, "offset": 0, "pageToken": "", "sortBy": ["alertTime:asc"], "timeRange": { "type": "absolute", "value": { "startTime": 1711897200000, "endTime": 1714575599000 } } } headers = { "content-type": "application/json; charset=UTF-8", "x-redlock-auth": token }
変わっているところとすれば、requestメソッドにjson、params引数が追加されたことです。
while True: response = requests.request( "POST", alert_url, json=payload, headers=headers, params=querystring) if response.status_code == 200: alertdata = json.loads(response.text) wasResponseError = False break else: print(f"Bad Request(/v2/alert)[status_code:{response.status_code}]") if wasResponseError: exit() else: sleep(32) wasResponseError = True
alert_ids = [item['id'] for item in alertdata['items']] print(f"4/1 ~ 4/31までのアラート数は{len(alert_ids)} です。")
wasResponseError = False alert_url = "https://api.anz.prismacloud.io/v2/alert" querystring = {"detailed": False} payload = { "detailed": False, "fields": [], "limit": 5000, "offset": 0, "pageToken": "", "sortBy": ["alertTime:asc"], "timeRange": { "type": "absolute", "value": { "startTime": 1711897200000, "endTime": 1714575599000 } } } headers = { "content-type": "application/json; charset=UTF-8", "x-redlock-auth": token } while True: response = requests.request( "POST", alert_url, json=payload, headers=headers, params=querystring) if response.status_code == 200: alertdata = json.loads(response.text) wasResponseError = False break else: print(f"Bad Request(/v2/alert)[status_code:{response.status_code}]") if wasResponseError: exit() else: sleep(32) wasResponseError = True alert_ids = [item['id'] for item in alertdata['items']] print(f"4/1 ~ 4/31までのアラート数は{len(alert_ids)} です。")
※一部を切り取っていますので詳細はドキュメントを参照してください。
"items": [ { "id": "P-12345", "status": "resolved", "reason": "RESOURCE_DELETED", "firstSeen": 1711898107130, "lastSeen": 1712157098580, "alertTime": 1711898107130, "lastUpdated": 1712157098580, "policyId": "abcdef-12ab-1234-abcdef-123abcdef", "saveSearchId": "ab12345n-123f-1234-abcd-abbbcs", "metadata": { "saveSearchId": "abcdefg-123456-abcd" }, ]
プログラム全体
今回書いたプログラムの全体になります。
import requests from time import sleep import json # ログイン処理 url = "https://api.anz.prismacloud.io/login" username = "ACCESS_KEY" password = "SECRET_KEY" wasResponseError = False payload = '{\"username\":\"' + username + \ '\",\"password\":\"' + password + '\"}' headers = { "Accept": "application/json; charset=UTF-8", "Content-Type": "application/json; charset=UTF-8" } while True: response = requests.request( "POST", url, data=payload, headers=headers) if response.status_code == 200: token = json.loads(response.text)['token'] wasResponseError = False break else: print(f"You couldn't be authenticated [status_code:{response.status_code}]") if wasResponseError: exit() else: sleep(32) wasResponseError = True # アラート取得 wasResponseError = False alert_url = "https://api.anz.prismacloud.io/v2/alert" querystring = {"detailed": False} payload = { "detailed": False, "fields": [], "limit": 5000, "offset": 0, "pageToken": "", "sortBy": ["alertTime:asc"], "timeRange": { "type": "absolute", "value": { "startTime": 1711897200000, "endTime": 1714575599000 } } } headers = { "content-type": "application/json; charset=UTF-8", "x-redlock-auth": token } while True: response = requests.request( "POST", alert_url, json=payload, headers=headers, params=querystring) if response.status_code == 200: alertdata = json.loads(response.text) wasResponseError = False break else: print(f"Bad Request(/v2/alert)[status_code:{response.status_code}]") if wasResponseError: exit() else: sleep(32) wasResponseError = True alert_ids = [item['id'] for item in alertdata['items']] print(f"4/1 ~ 4/31までのアラート数は{len(alert_ids)} です。")
実行結果
以下画像が実行結果です。
おわりに
当社では、複数クラウド環境の設定状況を自動でチェックし、設定ミスやコンプライアンス違反、異常行動などのリスクを診断するCSPMソリューションを販売しております。
マルチクラウド設定診断サービス with CSPM| SCSK株式会社
ご興味のある方は是非、お気軽にお問い合わせください。