こんにちは、SCSKの齋藤です。
本記事では、TMetric というタイムトラッキングツールからデータを自動で抽出し、BigQueryにロードするまでのプロセスを、Cloud RunとPythonを活用して実装した事例を紹介します。日々の業務で利用するトラッキングツールのデータ分析に活用したい、自動化して効率的にデータ収集を行いたい、という方にとって参考にしていただける内容となっています。TMetric のデータを活用し、プロジェクトの進捗状況の把握、メンバーの稼働状況の分析、リソース配分の最適化などを実現したい方は必見です。
この記事でわかること
- Cloud Run の活用: Cloud Run を利用して、データ抽出処理を自動化するメリットと実装方法
- Python によるデータ処理: データ抽出、CSV ファイルへの書き込み、GCS へのアップロード、BigQuery へのロードといった一連の処理を Python で行う方法
- TMetric API の活用: TMetric API を利用して、普段業務で使用しているタイムトラッキングツールからデータを取得する方法
なぜCloud RunとPythonなのか? -それぞれの強み
今回の実装で Cloud Run と Python を選んだ理由は、それぞれの持つ強みがデータ抽出処理に最適だったからです。
Cloud Run のメリット
Cloud Run は、コンテナイメージをデプロイして実行できるサーバーレス実行環境です。以下のメリットがあります。
- サーバーレス:インフラの管理が不要で、コードの実行に集中できる。サーバーのプロビジョニングやスケーリング、メンテナンスといった運用作業からも解放されます。
- 自動スケーリング:トラフィックの増減に応じて自動的にスケールするため、リソースを効率的に利用可能です。
- 従量課金:実際に利用したリソースに対してのみ課金されるため、コスト効率が高いです。
- CI/CD連携:継続的インテグレーション/継続的デリバリーパイプラインとの連携が容易で、デプロイを自動化できます。
Pythonのメリット
Pythonは、汎用性の高いプログラミング言語であり、データ処理やAPI連携に強みを持っています。
- 豊富なライブラリ:
requests
(HTTPリクエスト)、csv
(csvファイル操作)、google-cloud-storage
(GCS 操作)、google-cloud-bigquery
(BigQuery 操作) など、データ処理に必要なライブラリが豊富に揃っています。これらのライブラリを活用することで、複雑な処理も比較的簡単に実装できます。 - 柔軟性:データ抽出、変換、ロードなど、データパイプラインの様々な処理をPythonで柔軟に実装できます。
Cloud Run × Pythonの相乗効果
Cloud Run の持つサーバーレスの利点と、Python の持つデータ処理能力を組み合わせることで、スケーラブルでコスト効率の高いデータ抽出パイプラインを構築できます。Cloud Run がインフラの管理を担い、Python が複雑なデータ処理を担うことで、開発者はビジネスロジックに集中できます。
実装の概要 - Cloud Run と Python の連携
今回の実装では、以下のステップで TMetric のデータを BigQuery に自動的にロードします。Python スクリプトは Cloud Run 上で実行されます。
- Cloud Run へのデプロイ: Python スクリプトを Docker イメージとして Cloud Run にデプロイします。Dockerfile には Python のバージョン、必要なライブラリ、スクリプトの実行コマンドなどを記述します。
- Python スクリプトの実行: Cloud Run が Docker イメージを実行し、Python スクリプトが TMetric API からデータを抽出します。
- データ変換: 抽出したデータを CSV 形式で一時ファイルに書き込みます。この際、タイムゾーンを JST に変換するなど、データ分析に適した形式に変換します。
- GCS アップロード: 作成した CSV ファイルを Google Cloud Storage (GCS) にアップロードします。
google-cloud-storage
ライブラリを使用します。 - BigQuery ロード: GCS にアップロードされた CSV ファイルを BigQuery にロードします。
google-cloud-bigquery
ライブラリを使用します。 - 自動実行: Cloud Run にデプロイされた Python スクリプトを、Cloud Scheduler でスケジュール設定し、定期的に自動実行します。
実装のポイント
TMetric API の活用とデータ構造の理解
TMetric API を利用する上で、以下の点に注意しました。
- API エンドポイントの選定: TMetric API には、様々なエンドポイントが存在します。今回は、時間エントリーデータを取得するために、
/accounts/{accountId}/timeentries
エンドポイントを使用しました。 - 認証: TMetric API へのアクセスには、認証トークンが必要です。環境変数を使用してセキュアにトークンを管理し、
Authorization
ヘッダーに含めてリクエストを送信します。 - パラメータの設定: TMetric API には、様々なパラメータを設定できます。今回は、
startDate
、endDate
、userId
パラメータを使用して、特定の期間とユーザーの時間エントリーデータを取得しました。 - データ構造の理解: TMetric API から返される JSON データは、ネストされた構造を持っています。必要な情報を抽出するために、データの構造を理解し、適切なキーを指定する必要があります。時間エントリーデータには、
startTime
、endTime
、task
、user
などの情報が含まれています。
TMetric API レスポンス例
[ { "id": 12345, "startTime": "2023-10-26T09:00:00Z", "endTime": "2023-10-26T17:00:00Z", "task": { "id": 67890, "name": "開発タスク", "externalLink": { "caption": "JIRA-123", "link": "https://example.com/browse/JIRA-123" } }, "user": { "id": 11223, "name": "tarou" } }, ... ]
Python コード – TMetric API からのデータ抽出例
import requests import json from datetime import datetime, timezone, timedelta from pydantic_settings import BaseSettings class Settings(BaseSettings): TMetric_WORKSPACE_ID: str TMetric_TOKEN: str class Config: env_prefix = "" settings = Settings() def fetch_tmetric_data(user_id, start_date, end_date): url = f"https://app.tmetric.com/api/v3/accounts/{settings.TMetric_WORKSPACE_ID}/timeentries" headers = {"Authorization": f"Bearer {settings.TMetric_TOKEN}"} params = {"startDate": start_date, "endDate": end_date, "userId": user_id} try: response = requests.get(url, headers=headers, params=params) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"API request failed: {e}") return None # Example usage # TMetric_WORKSPACE_ID, TMetric_TOKEN を設定する settings = Settings() user_id = 123 # Replace with the actual user ID start_date = "2025-01-01" end_date = "2025-01-31" data = fetch_tmetric_data(user_id, start_date, end_date) if data: print(json.dumps(data, indent=2)) # Print the formatted JSON data else: print("Failed to retrieve data.")
環境変数の設定
TMetric API のトークンなどの機密情報は、環境変数として Cloud Run に設定します。Google Cloud Secret Manager と連携することで、よりセキュアに管理できます。Cloud Run の設定画面から、環境変数を設定します。
サービスアカウントの設定
Cloud Run のサービスアカウントに、GCS への書き込み権限、BigQuery への書き込み権限を付与します。最小限の権限を与えることで、セキュリティリスクを低減します。IAM & 管理画面から、サービスアカウントの権限を設定します。
Python コード – ライブラリ活用
Python コードでは、必要なライブラリをインポートし、それぞれの機能を活用します。
import csv from datetime import datetime, timedelta, timezone import requests from google.cloud import bigquery, storage # ... (省略)
例えば、GCS にファイルをアップロードするには、以下のコードを使用します。
from google.cloud import storage def upload_to_gcs(bucket_name, source_file_name, destination_blob_name): """GCSにファイルをアップロードする関数""" try: storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(destination_blob_name) blob.upload_from_filename(source_file_name) print(f"File {source_file_name} uploaded to gs://{bucket_name}/{destination_blob_name}") except Exception as e: print(f"GCSへのアップロード中にエラーが発生しました: {e}")
BigQuery にデータをロードするには、以下のコードを使用します。
from google.cloud import bigquery def load_csv_to_bigquery(bucket_name, blob_name, dataset_id, table_id, project_id, start_date, end_date): """GCS上のCSVファイルをBigQueryにロード""" try: # BigQueryクライアントの初期化 client = bigquery.Client(project=project_id) dataset_ref = client.dataset(dataset_id, project=project_id) table_ref = dataset_ref.table(table_id) # BigQueryジョブの設定 job_config = bigquery.LoadJobConfig( skip_leading_rows=1, source_format=bigquery.SourceFormat.CSV, field_delimiter="\t", write_disposition=bigquery.WriteDisposition.WRITE_APPEND, ) # GCS URI gcs_uri = f"gs://{bucket_name}/{blob_name}" # ロードジョブの開始 load_job = client.load_table_from_uri(gcs_uri, table_ref, job_config=job_config) load_job.result() # ジョブの完了を待機 print(f"GCS上のファイル {gcs_uri} を BigQuery の {dataset_id}.{table_id} にロードしました。") except Exception as e: print(f"BigQueryへのロード中にエラーが発生しました: {e}")
Cloud Schedulerの設定
Cloud Run にデプロイした Python スクリプトを自動的に実行するため、Cloud Scheduler を設定します。 Cloud Scheduler は、指定したスケジュールに基づいて HTTP リクエストを送信し、Cloud Run Job をトリガーします。
Cloud Scheduler の設定手順
- Cloud Run Job のエンドポイントの確認
Cloud Run にデプロイした Job の詳細画面から、トリガーに使用する URL を確認します。この URL は、Cloud Scheduler から HTTP リクエストを送信する先となります。
(例: https://asia-northeast1-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/[プロジェクトID]/jobs/[ジョブ名]:run)
- サービスアカウントの作成
Cloud Scheduler が Cloud Run Job を実行するための権限を持つサービスアカウントを作成します。サービスアカウントに、Cloud Run 起動元のロールを付与します。これにより、サービスアカウントは Cloud Run Job を起動できるようになります。
- Cloud Scheduler 設定例
gcloud scheduler jobs create http tmetric-data-schedule \ --schedule="0 23 * * *" \ --time-zone="Asia/Tokyo" \ --location="asia-northeast1" \ --uri="https://asia-northeast1-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/[プロジェクトID]/jobs/tmetric-data:run" \ --oauth-service-account-email="[サービスアカウント名]@[プロジェクトID].iam.gserviceaccount.com" \ --description="TMetric データ抽出スケジュール" \ --project="[プロジェクトID]"
- よりセキュアな環境を構築するためには、Cloud Run Job に Identity-Aware Proxy (IAP) を設定し、Cloud Scheduler からのアクセスのみを許可することも可能です。
- Cloud Logging を活用することで、ジョブの実行履歴やエラーログを詳細に確認できます。
- Cloud Monitoring と連携することで、ジョブの実行状況を監視し、異常発生時にアラートを送信できます。
工夫 -自動化と運用の効率化
- 設定ファイルの利用: 環境変数だけでなく、設定ファイル(YAML など)を利用することで、より複雑な設定を管理しやすくしました。
- ログ出力の強化: 詳細なログの出力による、問題発生時の原因特定容易化。Cloud Loggingとの連携によるログの一元管理。
- テストの自動化: 単体テストや結合テストの自動化による、コード品質の向上。
- エラー通知: エラー発生時にSlackなどのチャットツールに通知を送信する設定による、迅速な対応が可能に。
- Cloud Schedulerの利用: Cloud Run Jobsでのスケジュール実行を目的とした、Cloud Schedulerによるhttpsリクエストの送信。
まとめ
Cloud Run と Python を組み合わせることで、TMetric のデータを自動的に抽出し、BigQuery にロードするパイプラインを構築することができました。Cloud Run のサーバーレス環境がインフラ管理の負担を軽減し、Python の豊富なライブラリがデータ処理を容易にしました。この自動化によって、手作業によるデータ収集の手間を省き、より効率的にデータ分析を行うことができます。
今回の実装で得られた知見は、他のトラッキングツールや API からのデータ抽出にも応用できます。ぜひ、Cloud Run と Python を活用してみてください。