【GCP】Cloud FunctionsのCloud StorageトリガーでDataFusionパイプラインを起動

こんにちは。SCSKの山口です。

今回は、Cloud Functions とData Fusionを組み合わせて、下記のパイプラインを構築してみたいと思います。

  • Cloud Functions のEventarc トリガーでGCSバケットへのファイル配置・更新を検知する
  • Cloud Functions からDataFusionパイプラインを起動し、データ加工とBigQuery へのデータ格納を行う

特定のバケットにファイルが置かれた際に、それを検知してData Fusion起動→BigQuery テーブルへデータ格納。といった流れです。

Data Fusion パイプライン作成

今回は、下記のブログで作成したパイプラインを再利用します。

【GCP】Cloud Data Fusion で Wrangler を柔軟に使い倒す
Google CloudのData Fusionでパイプラインを構築する機会が多くあり、その中でも「Wrangler」のプライグインを多く使用したので、その際に便利だった機能をご紹介します。

CSVファイル内のハイフン付きの電話番号からハイフンを取り除き、BigQuery テーブルへ格納するパイプラインです。

Wranglerプラグインを直感的に操作する方法を紹介しています。本ブログのメインではないので詳細は省きます。気になる方はぜひご覧ください。

 

Cloud Functions ファンクション作成

次にファンクションを作成し、デプロイします。

構成

下記構成で作成します。

  • 基本
    • 環境:第2世代
    • 関数名:yamaguchi-test-datafusionkick
    • リージョン:asia-northeast1(東京)
  • トリガー
    • トリガーのタイプ:Cloud Storage
    • イベントタイプ:google.cloud.storage.object.v1.finalized
    • バケット:yamaguchi_test_20240806

イベントタイプの「google.cloud.storage.object.v1.finalized」については、下記ブログの「Cloud Storageトリガーとは?」で紹介してあるのでこちらをご覧ください。

Cloud FunctionsのCloud Storageトリガーをフォルダレベルで指定したい!!
先日、Dialogflow CXのAgentからのテストケース作成を自動化していて、「あるフォルダにファイルがアップロードされたら起動するCloud Functionsを作りたい」とふと思いました。そこで今回はバケットレベルで指定するCloud Storageトリガーをどうにかしてフォルダレベルで指定できないか調べてみました。

今回は、「yamaguchi_test_20240806」バケットを対象とし、バケットレベルで更新を検知する設定とします。

コード

今回作成したコードは下記のとおりです。

main.py
import functions_framework
import os
import requests
from google.cloud import data_fusion_v1
from google.auth import compute_engine
from google.auth.transport.requests import Request

@functions_framework.cloud_event
def call_datafusion(cloud_event):
    # Data Fusionクライアント生成
    DataFusion_client = data_fusion_v1.DataFusionClient()

    # アクセストークン取得 →{access_token}
    credentials = compute_engine.Credentials()
    credentials.refresh(Request())
    access_token = credentials.token

    # Data Fusionパイプライン呼び出し用の認証ヘッダ設定 →{auth_header_fusion}
    auth_header_fusion = {'Authorization': f'Bearer {access_token}'}

    #パラメータ設定
    namespace = 'default'
    app_name = 'yamaguchi_test_20240805'
    workflow_name = 'DataPipelineWorkflow'

    #CDAPのエンドポイントを取得(gcloudコマンド無し) →{cdap_endpoint}
    set_request = data_fusion_v1.GetInstanceRequest(
        name="projects/scsksdv-dev-XXXXXXXXX/locations/asia-northeast1/instances/test-df-02",
    )
    get_prm = DataFusion_client.get_instance(request=set_request)
    cdap_endpoint = get_prm.api_endpoint

    # CDAP APIにリクエストを送信
    url = f'{cdap_endpoint}/v3/namespaces/{namespace}/apps/{app_name}/workflows/{workflow_name}/start'
    response = requests.post(url, headers=auth_header_fusion)

    # レスポンスを確認
    if response.ok:
        print(f'パイプライン呼び出しは正常に処理されました。ステータスコード: {response.content.decode()}')
        print(f'IDトークン: {ID_token}')
        print(f'アクセストークン: {access_token}')
    else:
        print(f'パイプライン呼び出しでエラーが発生しました。ステータスコード: {response.content.decode()}')

    return ("END")
requirements.txt
functions-framework==3.*
google-cloud-storage
google-auth
google-cloud-data-fusion
pandas
requests

以上でファンクションは完成です。

 

ファンクションのコードに関しての備忘(※読み飛ばしても問題ないです)

コードを作成するにあたっていくつかの壁に当たったので備忘として残しておきます。

今回パイプラインを起動するコードの作成にあたって、下記の方法(コマンド実行)でパイプラインが起動できることをはじめに確認しました。

POST -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/namespace-id/apps/pipeline-name/workflows/DataPipelineWorkflow/start"

上記コマンドでは、認証とパイプライン起動のために「AUTH_TOKEN」、「CDAP_ENDPOINT」を必要とします。

この「2つのトークンの取得 → CDAP APIへリクエスト送信」のコードをCloud Functions で実行したところ、下記のエラーにあたりました。

errorとなる部分
error内容
500 Internal Server Error: The server encountered an initial error and was unable to complete your request.

詳細を調べたところ、Cloud Functions内で「gcloud」のコマンドが実行できず「CDAP_ENDPOINT」が正常に取得できていないことが原因でした。

対応策

上記の対応策として、「gcloudコマンドを使わないコードに置き換える」手段を取りました。

下記ライブラリリファレンスから代替ライブラリを探し、「CDAP_ENDPOINT」を正しい形で取得できるようコードを修正しました。

API と Python ライブラリ  |  Google Cloud
→CDAP_ENDPOINTの欲しい形は「https://…」
→怪しいライブラリを発見
→コピペして実行してみると、欲しい形「https://…」でCDAP_ENDPOINTが取得できた

 

実践

ファンクションとパイプラインが準備できたので、ここから実践に入ります。

まず、今回やりたいことをおさらいします。

 

  • Cloud Storageバケットの変更、更新をCloud FunctionsのCloud Storageトリガー出検知
  • Cloud FunctionsからData Fusionのパイプラインをキック
  • Cloud Storage上のファイルをBigQueryテーブルに取り込む

仰々しく図式化しましたが、やりたいことはシンプルです。

 

今回使用する、データ取り込み先のテーブルは下記です。

また、GSCに配置する(テーブルに取り込む)CSVファイルの中身はこんな感じになっています。

 

それではファイルをバケットに配置してみます。

 バケットにファイルを配置すると、

パイプラインが起動されました。(Provisioning状態)

しばらく待つと、

パイプライン処理が完了し、BigQueryテーブルにレコードが追加されました。大成功です。

 

最後に

今回はCloud FunctionsのCloud Storageトリガーでバケットの変更を検知し、Data Fusionパイプラインを起動してみました。

バケット単位での変更検知ができるとなると、次にやりたいのは「フォルダ・ファイル単位」の変更検知ですよね。
実は、フォルダ・ファイル単位の検知トリガーはまだ実装されていません。(なんてことだ・・・)

しかし、Cloud Functions内のコードに処理を寄せることで、疑似的に実現することは可能です。
こちらについては下記ブログで詳細な方法が紹介されているのでぜひご覧ください。(先を越されてた・・・)

Cloud FunctionsのCloud Storageトリガーをフォルダレベルで指定したい!!
先日、Dialogflow CXのAgentからのテストケース作成を自動化していて、「あるフォルダにファイルがアップロードされたら起動するCloud Functionsを作りたい」とふと思いました。そこで今回はバケットレベルで指定するCloud Storageトリガーをどうにかしてフォルダレベルで指定できないか調べてみました。
著者について

Google Cloud歴2年目です。
日々捌ききれないほどのインプットを浴びているので、本ブログをアウトプットの場として活用しています。
-----
好きな(よく触っている)サービス:BigQuery、Cloud Functions、Data Fusion
保有資格:応用情報技術者、Google Cloud 認定資格全冠(11冠)
受賞歴:Google Cloud Partner Top Engineer 2025(カテゴリ:General)受賞

山口翔平をフォローする

クラウドに強いによるエンジニアブログです。

SCSKクラウドサービス(Google Cloud)は、Google Cloudの多彩なAIや各種サービスを活用したワンストップソリューションを提供します。SCSKのノウハウや体制を有効活用し、業務課題の解決に必要な全体検討と組み合わせで、最適な業務実装まで支援します。

Google Cloudデータベースデータ分析・活用基盤
シェアする
タイトルとURLをコピーしました