こんにちは。SCSKの磯野です。
Google Workspaceの監査ログは、保持期間が6か月程度のものがほとんどです。
例)SAML のログイベント データ:6か月
しかし、社内ルール等で1年以上の監査ログ保管が義務付けられているようなケースは多いと思います。
今回は、Google Workspaceの監査ログの保持期間延長のために、Google Cloudへログを共有する方法をご紹介します。
基本的にはGoogle Workspaceの監査ログはGoogle Cloudと共有される
次のログイベント データが Google Cloud と共有されます。
- Groups Enterprise のログイベント
- 管理ログイベント
- ユーザーのログイベント
Enterprise Standard、Enterprise Plus、Education Standard または Education Plus、Voice Premier、Cloud Identity Premium Edition(※) をご利用の場合は、下記すべてのログイベントデータが Google Cloud と共有されます。
- OAuth のログイベント
- SAML ログイベント
- アクセスの透明性に関するログイベント(Enterprise Plus と Education エディションのみ)
Google CloudとのGoogle Workspace データの共有を有効にすることで、Google Workspace の監査ログがGoogle Cloudへ共有されます。監査ログはログバケットに保管されるため、Google Workspaceの保持期間よりも長くログを保管することが可能です。
なお、監査ログをデフォルトの保持期間より長く保持するには、カスタム保持を構成してください。
課題:Google WorkspaceプランによりGoogle Cloudに共有されないログも
Business StandardプランなどのGoogle Cloud共有対象(※)外のプランを利用している場合、
SMALログイベント等の一部のログイベントはGoogle Cloudと共有されません。
課題へのアプローチ案
- Google Workspaceの監査ログは特権管理者あるいはレポート管理者が「管理者コンソール画面>レポート」という画面より手動でログのダウンロードを行う
- Reports API を使用してGoogleドライブへ転送するスクリプトを開発する
- Reports API を使用してBigQueryへ転送するスクリプトを開発する
運用面や検索のしやすさの観点から、今回は3つ目の「BigQueryへ自動保管するスクリプトを開発する」方法について詳しく説明します。
Reports APIを使用してGoogle Workspaceの監査ログをBigQueryへ転送する
構成
日次で実行し、前日分の監査ログをBigQueryへ格納するCloudFunctionsを作成します。
例として、SAMLログを取得するコードを紹介します。
CloudFunctionsのコード
from googleapiclient.discovery import build from oauth2client.service_account import ServiceAccountCredentials from flask import Flask, jsonify from google.cloud import bigquery import pandas as pd from datetime import datetime, timedelta import pytz app = Flask(__name__) def insert_into_bigquery(dataset_id, table_id, df): client = bigquery.Client() # データを挿入 job = client.load_table_from_dataframe(df, f"{dataset_id}.{table_id}") job.result() # Wait for the job to complete. @app.route('/', methods=['POST']) def get_gws_logs(request): # サービスアカウントの資格情報を取得 SERVICE_ACCOUNT_FILE = '/mnt/secrets/token.json' credentials = ServiceAccountCredentials.from_json_keyfile_name( SERVICE_ACCOUNT_FILE, scopes=['https://www.googleapis.com/auth/admin.reports.audit.readonly']) # Google Admin SDKのサービスオブジェクトを作成し、権限委任 # Google Workspaceの監査ログ閲覧権限のあるユーザー(特権管理者あるいはレポート管理者)のメールアドレスを使用する credentials = credentials.create_delegated('xxx@sample.com') service = build('admin', 'reports_v1',credentials=credentials) # 日本時間のタイムゾーンを設定 JST = pytz.timezone('Asia/Tokyo') now_jst = datetime.now(JST) # 前日の日付範囲を日本時間で設定 start_date_jst = now_jst - timedelta(days=1) start_time_jst = start_date_jst.replace(hour=0, minute=0, second=0, microsecond=0) end_time_jst = start_time_jst + timedelta(days=1) start_time = start_time_jst.isoformat() end_time = end_time_jst.isoformat() # SAMLログの取得 # https://developers.google.com/admin-sdk/reports/reference/rest/v1/activities/list?hl=ja results = service.activities().list( userKey='all', applicationName='saml', startTime=start_time, endTime=end_time ).execute() print(f"{start_time}から{end_time}までのログの取得が完了しました") if 'items' not in results or not results['items']: # 結果が空の場合 return jsonify({'status': 'success', 'message': 'No logs found for the previous day'}) data = [] for activity in results.get('items', []): for event in activity['events']: row = { 'date': activity['id']['time'], 'event_name': event['name'], 'description': event['type'], 'actor': activity['actor']['email'], 'application_name': next((param['value'] for param in event['parameters'] if param['name'] == 'application_name'), None), 'initiated_by': next((param['value'] for param in event['parameters'] if param['name'] == 'initiated_by'), None), 'failure_type': next((param['value'] for param in event['parameters'] if param['name'] == 'failure_type'), None), 'response_status': next((param['value'] for param in event['parameters'] if param['name'] == 'response_status'), None), 'second_level_status': next((param['value'] for param in event['parameters'] if param['name'] == 'second_level_status'), None), 'ip_address': activity.get('ipAddress') } data.append(row) df = pd.DataFrame(data) # 日付カラムをDATETIME型に変換し、日本時間に変換 df['date'] = pd.to_datetime(df['date']) df['date'] = df['date'].dt.tz_convert('Asia/Tokyo') #DATETIME型はタイムゾーンを考慮しないため、日本時間のままタイムゾーン情報を削除 df['date'] = df['date'].apply(lambda x: x.replace(tzinfo=None)) # データセットとテーブルの指定 dataset_id = 'xxx' table_id = 'xxx' insert_into_bigquery(dataset_id, table_id, df) return jsonify({'status': 'success', 'message': 'Logs inserted into BigQuery successfully'}) if __name__ == '__main__': app.run(debug=True)
デプロイ方法
-
サービスアカウントのキーはSecret Managerへ格納しています。Secret Managerの作成方法は以下の通り。
gcloud secrets create {secret名}
gcloud secrets versions add {secret名} --data-file="./token.json"
- CloudFunctionsのデプロイ
gcloud functions deploy {cloud functions名} \
--region=asia-northeast1 \
--runtime python310 \
--memory 512MB \
--gen2 \
--source=. \
--trigger-http \
--entry-point=get_gws_logs \
--service-account {サービスアカウント名} \
--set-secrets '/mnt/secrets/token.json={secret名}:latest'
- Cloud Schedulerのデプロイ(略)
まとめ
いかがだったでしょうか。
今回は、Reports APIを使用してGoogle Workspaceの監査ログをBigQueryへ転送する方法をご紹介しました。
本記事が皆様のお役に立てれば幸いです。