Google Workspaceの監査ログ 保持期間延長方法 ~BigQueryへ転送~

こんにちは。SCSKの磯野です。

Google Workspaceの監査ログは、保持期間が6か月程度のものがほとんどです。
例)SAML のログイベント データ:6か月

データの保持期間とタイムラグ - Google Workspace 管理者 ヘルプ
レポート、セキュリティ調査ツール、監査と調査ページタイムラグにより、セキュリティ調査ツールと ページのレポートや検索結果で最新のデータが表示されない場合があります。また、ログイベント データは、利用できなくなる前に一定期間保持されます。 管...

しかし、社内ルール等で1年以上の監査ログ保管が義務付けられているようなケースは多いと思います。
今回は、Google Workspaceの監査ログの保持期間延長のために、Google Cloudへログを共有する方法をご紹介します。

基本的にはGoogle Workspaceの監査ログはGoogle Cloudと共有される

次のログイベント データが Google Cloud と共有されます。

  • Groups Enterprise のログイベント
  • 管理ログイベント
  • ユーザーのログイベント

Enterprise Standard、Enterprise Plus、Education Standard または Education Plus、Voice Premier、Cloud Identity Premium Edition(※) をご利用の場合は、下記すべてのログイベントデータが Google Cloud と共有されます。

  • OAuth のログイベント
  • SAML ログイベント
  • アクセスの透明性に関するログイベント(Enterprise Plus と Education エディションのみ)
Google Cloud サービスとデータを共有する - Google Workspace 管理者 ヘルプ
管理コンソールで、Google Workspace、Cloud Identity、Essentials アカウントのデータを組織の Google Cloud アカウントのサービスと共有できます。共有データには Google Cloud Au...

Google CloudとのGoogle Workspace データの共有を有効にすることで、Google Workspace の監査ログがGoogle Cloudへ共有されます。監査ログはログバケットに保管されるため、Google Workspaceの保持期間よりも長くログを保管することが可能です。

なお、監査ログをデフォルトの保持期間より長く保持するには、カスタム保持を構成してください。

課題:Google WorkspaceプランによりGoogle Cloudに共有されないログも

Business StandardプランなどのGoogle Cloud共有対象(※)外のプランを利用している場合、
SMALログイベント等の一部のログイベントはGoogle Cloudと共有されません。

課題へのアプローチ案

Business StandardプランなどのGoogle Cloud共有対象(※)外のプランを利用しており、SMALログイベントの保持期間を延長したい、といった場合はどのように保持期間を延長したらよいのでしょうか。
以下のようなアプローチが考えられます。
  • Google Workspaceの監査ログは特権管理者あるいはレポート管理者が「管理者コンソール画面>レポート」という画面より手動でログのダウンロードを行う
  • Reports API を使用してGoogleドライブへ転送するスクリプトを開発する
  • Reports API を使用してBigQueryへ転送するスクリプトを開発する

運用面や検索のしやすさの観点から、今回は3つ目の「BigQueryへ自動保管するスクリプトを開発する」方法について詳しく説明します。

Google Cloud共有対象のプラン(※)の場合
冒頭に記載した通り、Google Cloud との Google Workspace データの共有を有効にすることでGoogle Cloudへログの転送が可能のため、個別のスクリプト開発は不要です。
なお、

 

Reports APIを使用してGoogle Workspaceの監査ログをBigQueryへ転送する

構成

日次で実行し、前日分の監査ログをBigQueryへ格納するCloudFunctionsを作成します。
例として、SAMLログを取得するコードを紹介します。

CloudFunctionsのコード

  • 1度に取得するログは1000件以下を想定しています。
    1000件以上のログが想定される場合はpageTokenの処理が必要です。
  • 冪等性は加味していません。再実行すると重複してデータが格納されます。
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
from flask import Flask, jsonify
from google.cloud import bigquery
import pandas as pd
from datetime import datetime, timedelta
import pytz

app = Flask(__name__)

def insert_into_bigquery(dataset_id, table_id, df):

client = bigquery.Client()

# データを挿入
job = client.load_table_from_dataframe(df, f"{dataset_id}.{table_id}")
job.result() # Wait for the job to complete.


@app.route('/', methods=['POST'])
def get_gws_logs(request):

# サービスアカウントの資格情報を取得
SERVICE_ACCOUNT_FILE = '/mnt/secrets/token.json'

credentials = ServiceAccountCredentials.from_json_keyfile_name(
SERVICE_ACCOUNT_FILE,
scopes=['https://www.googleapis.com/auth/admin.reports.audit.readonly'])

# Google Admin SDKのサービスオブジェクトを作成し、権限委任
# Google Workspaceの監査ログ閲覧権限のあるユーザー(特権管理者あるいはレポート管理者)のメールアドレスを使用する
credentials = credentials.create_delegated('xxx@sample.com')
service = build('admin', 'reports_v1',credentials=credentials)

# 日本時間のタイムゾーンを設定
JST = pytz.timezone('Asia/Tokyo')
now_jst = datetime.now(JST)

# 前日の日付範囲を日本時間で設定
start_date_jst = now_jst - timedelta(days=1)
start_time_jst = start_date_jst.replace(hour=0, minute=0, second=0, microsecond=0)
end_time_jst = start_time_jst + timedelta(days=1)

start_time = start_time_jst.isoformat()
end_time = end_time_jst.isoformat()

# SAMLログの取得
# https://developers.google.com/admin-sdk/reports/reference/rest/v1/activities/list?hl=ja
results = service.activities().list(
userKey='all',
applicationName='saml', 
startTime=start_time,
endTime=end_time
).execute()
print(f"{start_time}から{end_time}までのログの取得が完了しました")

if 'items' not in results or not results['items']:
# 結果が空の場合
return jsonify({'status': 'success', 'message': 'No logs found for the previous day'})


data = []
for activity in results.get('items', []):
for event in activity['events']:
row = {
'date': activity['id']['time'],
'event_name': event['name'],
'description': event['type'],
'actor': activity['actor']['email'],
'application_name': next((param['value'] for param in event['parameters'] if param['name'] == 'application_name'), None),
'initiated_by': next((param['value'] for param in event['parameters'] if param['name'] == 'initiated_by'), None),
'failure_type': next((param['value'] for param in event['parameters'] if param['name'] == 'failure_type'), None),
'response_status': next((param['value'] for param in event['parameters'] if param['name'] == 'response_status'), None),
'second_level_status': next((param['value'] for param in event['parameters'] if param['name'] == 'second_level_status'), None),
'ip_address': activity.get('ipAddress')
}
data.append(row)

df = pd.DataFrame(data)

# 日付カラムをDATETIME型に変換し、日本時間に変換
df['date'] = pd.to_datetime(df['date'])
df['date'] = df['date'].dt.tz_convert('Asia/Tokyo')

#DATETIME型はタイムゾーンを考慮しないため、日本時間のままタイムゾーン情報を削除
df['date'] = df['date'].apply(lambda x: x.replace(tzinfo=None))


# データセットとテーブルの指定
dataset_id = 'xxx'
table_id = 'xxx'

insert_into_bigquery(dataset_id, table_id, df)

return jsonify({'status': 'success', 'message': 'Logs inserted into BigQuery successfully'})


if __name__ == '__main__':
app.run(debug=True)

 

デプロイ方法

  • サービスアカウントのキーはSecret Managerへ格納しています。Secret Managerの作成方法は以下の通り。

    • gcloud secrets create {secret名}
    • gcloud secrets versions add {secret名} --data-file="./token.json"
  • CloudFunctionsのデプロイ
gcloud functions deploy {cloud functions名} \
--region=asia-northeast1 \
--runtime python310 \
--memory 512MB \
--gen2  \
--source=. \
--trigger-http \
--entry-point=get_gws_logs \
--service-account {サービスアカウント名} \
--set-secrets '/mnt/secrets/token.json={secret名}:latest'
  • Cloud Schedulerのデプロイ(略)

まとめ

いかがだったでしょうか。

今回は、Reports APIを使用してGoogle Workspaceの監査ログをBigQueryへ転送する方法をご紹介しました。

本記事が皆様のお役に立てれば幸いです。

 

タイトルとURLをコピーしました