[A-00159]GCPで分析基盤を作成してみる①
Google Cloud上に分析基盤(Cloud Pub/Sub, Dataflow, BigQuery)を作成してみるの第一弾。
とりあえずこの回ではベースとなる基盤を作成してみるところだけやりたいと思います。
・Architecture

上記の通り、ローカルPCからデータ(JSON)を送信してPub/Sub→Dataflow→BigQueryという順にデータを連携していきます。
・手順① PubSub Topicを作成する
まずGoogleのクラウドコンソールよりPubSubトピックを作成します。

適当なTopic IDを入力して[作成]ボタンを押下します。

これでトピックは作成されました。下記の通り、PubSubの一覧画面に表示されます。

・手順② BigQueryにテーブルを作成する
次にBigQueryにデータセットとテーブルを作成します。すでに適当なデータセットがある場合はそれを使ってください。
下記のようなスキーマのテーブルを一つ作成します。

・手順③ サービスアカウントを作成する
次にDataflowでしようするサービスアカウントを一つ作成します。

上記のアカウントには次のロールが付いています。[Cloud Dataflow サービスエージェント]になります

・手順④ Dataflowを作成する
次にDataflowを作成します。[テンプレートから作成]を選択します。

下記の赤枠の中に値を入力します。[ジョブ名]は任意のIDを入力します。
[リージョンエンドポイント]は東京を選択します。
[Dataflow テンプレート]では[Pub/Sub Topic to BigQuery]を選択します。

次に[必須パラメータ]の欄を埋めていきます。PubSubTopicには先ほど作成したTopicを選択します。

次に[BigQuery output table]には先ほど作成したテーブルを選択します。

次にGoogle Storageを選択します。Google Storageはこの選択欄の中で作成をできますので適当なストレージを作成します。


次にオプションパラメータの欄をクリックして隠れている欄を表示します。

その中にサービスアカウントを選択する欄があるので先ほど作成したサービスアカウントを選択します。

上記が選択できたら画面最下部にある[ジョブを実行]を押下します。

ボタンを押下したら下記のようにグラフが表示されます(グラフが表示されるまで一定の時間が必要です)

・動作確認
PubSubにてメッセージをパブリッシュしてBigQueryにデータが入るかを確認します。
先ほど作成したTopicを選択します。

[メッセージ]タブを押下します。

[メッセージをパブリッシュ]ボタンを押下します。

ここにJSONテキストを入力します。
{"id": "10000", "datetime": "2023-10-03 18:15:30", "message": "owaf"}

テキストを入力して公開ボタンを押下したらDataflowを見てみます。ログが出力されているはずです。

次にBigQueryのテーブルを確認するとデータが入っているはずです。

・手順⑤ 外部プログラムよりトピックにメッセージを送信してみる
ローカルpcでターミナルを開き下記のコマンドを実行します。gcloudで実行してみます。
gcloud pubsub topics publish projects/xxxx/topics/df-topic --message '{"id": "30002", "datetime": "2023-09-22 18:15:30", "message": "このコマンドがエラーになりませんように"}'
MacBook-Pro:test1$ gcloud pubsub topics publish projects/xxxxx/topics/df-topic --message '{"id": "30002", "datetime": "2023-09-22 18:15:30", "message": "このコマンドがエラーになりませんように"}'
messageIds:
- '8313010155340224'
上記の通り、実行がうまくいくとメッセージIDが発行されます。
BigQueryを確認すると下記のようにレコードが追加されています。

次にpythonで実行してみます。
pubsubのライブラリをインストールします。
pip install --upgrade google-cloud-pubsub
ソースは下記になります。
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("xxxx", "df-topic")
sample_dict = b'{"id": "10000", "datetime": "2020-10-03 18:15:30", "message": "Good Luck!"}'
future = publisher.publish(topic_path, sample_dict)
print(future.result())
上記を実行してみます。
python3 pubsub.py
実行が完了すると下記のようにレコードが追加されます。

・Appendix
公式ドキュメントはこちら
※ちなみにGoogleのTutorialはまんまやると動きません。
https://cloud.google.com/dataflow/docs/tutorials/dataflow-stream-to-bigquery?hl=ja
https://cloud.google.com/pubsub/docs/publish-receive-messages-client-library?hl=ja
参考文献はこちら
https://qiita.com/XPT60/items/61e16fd980e5ea87519b
コメントを残す