Cloud Run × PythonでタイムトラッキングツールからAPI自動取得バッチを構築してみた

こんにちは、SCSKの齋藤です。

本記事では、TMetric というタイムトラッキングツールからデータを自動で抽出し、BigQueryにロードするまでのプロセスを、Cloud RunとPythonを活用して実装した事例を紹介します。日々の業務で利用するトラッキングツールのデータ分析に活用したい、自動化して効率的にデータ収集を行いたい、という方にとって参考にしていただける内容となっています。TMetric のデータを活用し、プロジェクトの進捗状況の把握、メンバーの稼働状況の分析、リソース配分の最適化などを実現したい方は必見です。

 

この記事でわかること

  • Cloud Run の活用: Cloud Run を利用して、データ抽出処理を自動化するメリットと実装方法
  • Python によるデータ処理: データ抽出、CSV ファイルへの書き込み、GCS へのアップロード、BigQuery へのロードといった一連の処理を Python で行う方法
  • TMetric API の活用: TMetric API を利用して、普段業務で使用しているタイムトラッキングツールからデータを取得する方法

なぜCloud RunとPythonなのか? -それぞれの強み

今回の実装で Cloud Run と Python を選んだ理由は、それぞれの持つ強みがデータ抽出処理に最適だったからです。

Cloud Run のメリット

Cloud Run は、コンテナイメージをデプロイして実行できるサーバーレス実行環境です。以下のメリットがあります。

  • サーバーレス:インフラの管理が不要で、コードの実行に集中できる。サーバーのプロビジョニングやスケーリング、メンテナンスといった運用作業からも解放されます。
  • 自動スケーリング:トラフィックの増減に応じて自動的にスケールするため、リソースを効率的に利用可能です。
  • 従量課金:実際に利用したリソースに対してのみ課金されるため、コスト効率が高いです。
  • CI/CD連携:継続的インテグレーション/継続的デリバリーパイプラインとの連携が容易で、デプロイを自動化できます。

Pythonのメリット

Pythonは、汎用性の高いプログラミング言語であり、データ処理やAPI連携に強みを持っています。

  • 豊富なライブラリrequests(HTTPリクエスト)、csv(csvファイル操作)、google-cloud-storage (GCS 操作)、google-cloud-bigquery (BigQuery 操作) など、データ処理に必要なライブラリが豊富に揃っています。これらのライブラリを活用することで、複雑な処理も比較的簡単に実装できます。
  • 柔軟性:データ抽出、変換、ロードなど、データパイプラインの様々な処理をPythonで柔軟に実装できます。

Cloud Run × Pythonの相乗効果

Cloud Run の持つサーバーレスの利点と、Python の持つデータ処理能力を組み合わせることで、スケーラブルでコスト効率の高いデータ抽出パイプラインを構築できます。Cloud Run がインフラの管理を担い、Python が複雑なデータ処理を担うことで、開発者はビジネスロジックに集中できます。

実装の概要 - Cloud Run と Python の連携

今回の実装では、以下のステップで TMetric のデータを BigQuery に自動的にロードします。Python スクリプトは Cloud Run 上で実行されます。

  1. Cloud Run へのデプロイ: Python スクリプトを Docker イメージとして Cloud Run にデプロイします。Dockerfile には Python のバージョン、必要なライブラリ、スクリプトの実行コマンドなどを記述します。
  2. Python スクリプトの実行: Cloud Run が Docker イメージを実行し、Python スクリプトが TMetric API からデータを抽出します。
  3. データ変換: 抽出したデータを CSV 形式で一時ファイルに書き込みます。この際、タイムゾーンを JST に変換するなど、データ分析に適した形式に変換します。
  4. GCS アップロード: 作成した CSV ファイルを Google Cloud Storage (GCS) にアップロードします。google-cloud-storage ライブラリを使用します。
  5. BigQuery ロード: GCS にアップロードされた CSV ファイルを BigQuery にロードします。google-cloud-bigquery ライブラリを使用します。
  6. 自動実行: Cloud Run にデプロイされた Python スクリプトを、Cloud Scheduler でスケジュール設定し、定期的に自動実行します。

実装のポイント

TMetric API の活用とデータ構造の理解

TMetric API を利用する上で、以下の点に注意しました。

  • API エンドポイントの選定: TMetric API には、様々なエンドポイントが存在します。今回は、時間エントリーデータを取得するために、/accounts/{accountId}/timeentries エンドポイントを使用しました。
  • 認証: TMetric API へのアクセスには、認証トークンが必要です。環境変数を使用してセキュアにトークンを管理し、Authorization ヘッダーに含めてリクエストを送信します。
  • パラメータの設定: TMetric API には、様々なパラメータを設定できます。今回は、startDateendDateuserId パラメータを使用して、特定の期間とユーザーの時間エントリーデータを取得しました。
  • データ構造の理解: TMetric API から返される JSON データは、ネストされた構造を持っています。必要な情報を抽出するために、データの構造を理解し、適切なキーを指定する必要があります。時間エントリーデータには、startTimeendTimetaskuser などの情報が含まれています。

TMetric API レスポンス例

[
  {
    "id": 12345,
    "startTime": "2023-10-26T09:00:00Z",
    "endTime": "2023-10-26T17:00:00Z",
    "task": {
      "id": 67890,
      "name": "開発タスク",
      "externalLink": {
        "caption": "JIRA-123",
        "link": "https://example.com/browse/JIRA-123"
      }
    },
    "user": {
      "id": 11223,
      "name": "tarou"
    }
  },
  ...
]

Python コード – TMetric API からのデータ抽出例

import requests
import json
from datetime import datetime, timezone, timedelta
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    TMetric_WORKSPACE_ID: str
    TMetric_TOKEN: str
    class Config:
        env_prefix = ""

settings = Settings()
def fetch_tmetric_data(user_id, start_date, end_date):
    url = f"https://app.tmetric.com/api/v3/accounts/{settings.TMetric_WORKSPACE_ID}/timeentries"
    headers = {"Authorization": f"Bearer {settings.TMetric_TOKEN}"}
    params = {"startDate": start_date, "endDate": end_date, "userId": user_id}
    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return None

# Example usage
# TMetric_WORKSPACE_ID, TMetric_TOKEN を設定する
settings = Settings()
user_id = 123  # Replace with the actual user ID
start_date = "2025-01-01"
end_date = "2025-01-31"
data = fetch_tmetric_data(user_id, start_date, end_date)

if data:
    print(json.dumps(data, indent=2))  # Print the formatted JSON data
else:
    print("Failed to retrieve data.")

 

 環境変数の設定

TMetric API のトークンなどの機密情報は、環境変数として Cloud Run に設定します。Google Cloud Secret Manager と連携することで、よりセキュアに管理できます。Cloud Run の設定画面から、環境変数を設定します。

サービスアカウントの設定

Cloud Run のサービスアカウントに、GCS への書き込み権限、BigQuery への書き込み権限を付与します。最小限の権限を与えることで、セキュリティリスクを低減します。IAM & 管理画面から、サービスアカウントの権限を設定します。

Python コード – ライブラリ活用

Python コードでは、必要なライブラリをインポートし、それぞれの機能を活用します。

import csv
from datetime import datetime, timedelta, timezone
import requests
from google.cloud import bigquery, storage

# ... (省略)

例えば、GCS にファイルをアップロードするには、以下のコードを使用します。

from google.cloud import storage

def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """GCSにファイルをアップロードする関数"""
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(source_file_name)

        print(f"File {source_file_name} uploaded to gs://{bucket_name}/{destination_blob_name}")
    except Exception as e:
        print(f"GCSへのアップロード中にエラーが発生しました: {e}")

BigQuery にデータをロードするには、以下のコードを使用します。

from google.cloud import bigquery

def load_csv_to_bigquery(bucket_name, blob_name, dataset_id, table_id, project_id, start_date, end_date):
    """GCS上のCSVファイルをBigQueryにロード"""
    try:
        # BigQueryクライアントの初期化
        client = bigquery.Client(project=project_id)
        dataset_ref = client.dataset(dataset_id, project=project_id)
        table_ref = dataset_ref.table(table_id)

        # BigQueryジョブの設定
        job_config = bigquery.LoadJobConfig(
            skip_leading_rows=1, 
            source_format=bigquery.SourceFormat.CSV,
            field_delimiter="\t",  
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        )

        # GCS URI
        gcs_uri = f"gs://{bucket_name}/{blob_name}"

        # ロードジョブの開始
        load_job = client.load_table_from_uri(gcs_uri, table_ref, job_config=job_config)
        load_job.result()  # ジョブの完了を待機
        print(f"GCS上のファイル {gcs_uri} を BigQuery の {dataset_id}.{table_id} にロードしました。")

    except Exception as e:
        print(f"BigQueryへのロード中にエラーが発生しました: {e}")

Cloud Schedulerの設定

Cloud Run にデプロイした Python スクリプトを自動的に実行するため、Cloud Scheduler を設定します。 Cloud Scheduler は、指定したスケジュールに基づいて HTTP リクエストを送信し、Cloud Run Job をトリガーします。

Cloud Scheduler の設定手順

  1. Cloud Run Job のエンドポイントの確認
    Cloud Run にデプロイした Job の詳細画面から、トリガーに使用する URL を確認します。この URL は、Cloud Scheduler から HTTP リクエストを送信する先となります。
    (例: https://asia-northeast1-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/[プロジェクトID]/jobs/[ジョブ名]:run)
  2. サービスアカウントの作成
    Cloud Scheduler が Cloud Run Job を実行するための権限を持つサービスアカウントを作成します。サービスアカウントに、Cloud Run 起動元のロールを付与します。これにより、サービスアカウントは Cloud Run Job を起動できるようになります。

  3. Cloud Scheduler 設定例
    gcloud scheduler jobs create http tmetric-data-schedule \
        --schedule="0 23 * * *" \
        --time-zone="Asia/Tokyo" \
        --location="asia-northeast1" \
        --uri="https://asia-northeast1-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/[プロジェクトID]/jobs/tmetric-data:run" \
        --oauth-service-account-email="[サービスアカウント名]@[プロジェクトID].iam.gserviceaccount.com" \
        --description="TMetric データ抽出スケジュール" \
        --project="[プロジェクトID]"
    • よりセキュアな環境を構築するためには、Cloud Run Job に Identity-Aware Proxy (IAP) を設定し、Cloud Scheduler からのアクセスのみを許可することも可能です。
    • Cloud Logging を活用することで、ジョブの実行履歴やエラーログを詳細に確認できます。
    • Cloud Monitoring と連携することで、ジョブの実行状況を監視し、異常発生時にアラートを送信できます。

工夫 -自動化と運用の効率化

  • 設定ファイルの利用: 環境変数だけでなく、設定ファイル(YAML など)を利用することで、より複雑な設定を管理しやすくしました。
  • ログ出力の強化: 詳細なログの出力による、問題発生時の原因特定容易化。Cloud Loggingとの連携によるログの一元管理。
  • テストの自動化: 単体テストや結合テストの自動化による、コード品質の向上。
  • エラー通知: エラー発生時にSlackなどのチャットツールに通知を送信する設定による、迅速な対応が可能に。
  • Cloud Schedulerの利用: Cloud Run Jobsでのスケジュール実行を目的とした、Cloud Schedulerによるhttpsリクエストの送信。

まとめ

Cloud Run と Python を組み合わせることで、TMetric のデータを自動的に抽出し、BigQuery にロードするパイプラインを構築することができました。Cloud Run のサーバーレス環境がインフラ管理の負担を軽減し、Python の豊富なライブラリがデータ処理を容易にしました。この自動化によって、手作業によるデータ収集の手間を省き、より効率的にデータ分析を行うことができます。

今回の実装で得られた知見は、他のトラッキングツールや API からのデータ抽出にも応用できます。ぜひ、Cloud Run と Python を活用してみてください。

タイトルとURLをコピーしました