[A-00159]GCPで分析基盤を作成してみる①

Google Cloud上に分析基盤(Cloud Pub/Sub, Dataflow, BigQuery)を作成してみるの第一弾。

とりあえずこの回ではベースとなる基盤を作成してみるところだけやりたいと思います。

・Architecture

上記の通り、ローカルPCからデータ(JSON)を送信してPub/Sub→Dataflow→BigQueryという順にデータを連携していきます。

・手順① PubSub Topicを作成する

まずGoogleのクラウドコンソールよりPubSubトピックを作成します。

適当なTopic IDを入力して[作成]ボタンを押下します。

これでトピックは作成されました。下記の通り、PubSubの一覧画面に表示されます。

・手順② BigQueryにテーブルを作成する

次にBigQueryにデータセットとテーブルを作成します。すでに適当なデータセットがある場合はそれを使ってください。

下記のようなスキーマのテーブルを一つ作成します。

・手順③ サービスアカウントを作成する

次にDataflowでしようするサービスアカウントを一つ作成します。

上記のアカウントには次のロールが付いています。[Cloud Dataflow サービスエージェント]になります

・手順④ Dataflowを作成する

次にDataflowを作成します。[テンプレートから作成]を選択します。

下記の赤枠の中に値を入力します。[ジョブ名]は任意のIDを入力します。

[リージョンエンドポイント]は東京を選択します。

[Dataflow テンプレート]では[Pub/Sub Topic to BigQuery]を選択します。

次に[必須パラメータ]の欄を埋めていきます。PubSubTopicには先ほど作成したTopicを選択します。

次に[BigQuery output table]には先ほど作成したテーブルを選択します。

次にGoogle Storageを選択します。Google Storageはこの選択欄の中で作成をできますので適当なストレージを作成します。

次にオプションパラメータの欄をクリックして隠れている欄を表示します。

その中にサービスアカウントを選択する欄があるので先ほど作成したサービスアカウントを選択します。

上記が選択できたら画面最下部にある[ジョブを実行]を押下します。

ボタンを押下したら下記のようにグラフが表示されます(グラフが表示されるまで一定の時間が必要です)

・動作確認

PubSubにてメッセージをパブリッシュしてBigQueryにデータが入るかを確認します。

先ほど作成したTopicを選択します。

[メッセージ]タブを押下します。

[メッセージをパブリッシュ]ボタンを押下します。

ここにJSONテキストを入力します。

{"id": "10000", "datetime": "2023-10-03 18:15:30", "message": "owaf"}

テキストを入力して公開ボタンを押下したらDataflowを見てみます。ログが出力されているはずです。

次にBigQueryのテーブルを確認するとデータが入っているはずです。

・手順⑤ 外部プログラムよりトピックにメッセージを送信してみる

ローカルpcでターミナルを開き下記のコマンドを実行します。gcloudで実行してみます。

gcloud pubsub topics publish projects/xxxx/topics/df-topic --message '{"id": "30002", "datetime": "2023-09-22 18:15:30", "message": "このコマンドがエラーになりませんように"}'
MacBook-Pro:test1$ gcloud pubsub topics publish projects/xxxxx/topics/df-topic --message '{"id": "30002", "datetime": "2023-09-22 18:15:30", "message": "このコマンドがエラーになりませんように"}'
messageIds:
- '8313010155340224'

上記の通り、実行がうまくいくとメッセージIDが発行されます。

BigQueryを確認すると下記のようにレコードが追加されています。

次にpythonで実行してみます。

pubsubのライブラリをインストールします。

pip install --upgrade google-cloud-pubsub

ソースは下記になります。

from google.cloud import pubsub_v1


publisher = pubsub_v1.PublisherClient()

topic_path = publisher.topic_path("xxxx", "df-topic")

sample_dict = b'{"id": "10000", "datetime": "2020-10-03 18:15:30", "message": "Good Luck!"}'

future = publisher.publish(topic_path, sample_dict)
print(future.result())

上記を実行してみます。

python3 pubsub.py

実行が完了すると下記のようにレコードが追加されます。

・Appendix

公式ドキュメントはこちら

※ちなみにGoogleのTutorialはまんまやると動きません。

https://cloud.google.com/dataflow/docs/tutorials/dataflow-stream-to-bigquery?hl=ja

https://cloud.google.com/pubsub/docs/publish-receive-messages-client-library?hl=ja

参考文献はこちら

https://qiita.com/XPT60/items/61e16fd980e5ea87519b

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

*