[A-00174]Cloud PubSubを使ってアプリを作成する

google cloudのCloud Pub/Subを使って簡単なアプリを作ってみます。

・JavaでPublisherとSubscriberを作成し、送受信してみる

Mavenプロジェクトを作成します。適当なフォルダを作成し、下記のコマンドを実行

mvn archetype:generate \
>  -DarchetypeArtifactId=maven-archetype-quickstart \
>  -DinteractiveMode=false \
>  -DgroupId=pubsuttest \
>  -DartifactId=pubsubtest

次にgcloudコマンドでpubsub topic, subscriptionを作成します。

# create pubsub topic.
gcloud pubsub topics create test-topic

# create pubsub subscription.
gcloud pubsub subscriptions create test-sub --topic test-topic

mavenで作成したpom.xmlを下記の通りに書き換えます。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>pubsuttest</groupId>
  <artifactId>pubsubtest</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>pubsubtest</name>
  <url>http://maven.apache.org</url>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>libraries-bom</artifactId>
        <version>26.27.0</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-pubsub</artifactId>
    </dependency>
  </dependencies>
</project>

mavenビルドを実行します。下記のキャプチャの通り、画面から実行する場合は下記のコマンドを選択して実行します。

ターミナルから実行する場合は下記の通りです。

mvn clean install -DskipTests=true

次にPublisherプログラムを作成します。

package pubsuttest;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class PublisherMain {
    
    public static void main(String[] args) throws Exception {
        String projectId = "<project-id>";
        String topicId = "test-topic";

        publishMain(projectId, topicId);
    }

    public static void publishMain(String projectId, String topicId) throws IOException, ExecutionException, InterruptedException {
        TopicName topicName = TopicName.of(projectId, topicId);

        Publisher publisher = null;
        try {
            publisher = Publisher.newBuilder(topicName).build();

            String message = "This is Test.";
            ByteString data = ByteString.copyFromUtf8(message);
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

            ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
            String messageId = messageIdFuture.get();
            System.out.println("Published message ID:" + messageId);

        } finally {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}

上記を実行します。下記のVSCode上の実行ボタンを押下します。

下記の通り、ログがターミナルに表示されます。

MacBook-Pro pubsubtest %  /usr/bin/env /Library/Java/JavaVirtualMachines/jdk-21.jdk/Content
s/Home/bin/java @/var/folders/bz/b6k5z51n4bv4q575_0ynyxqr0000gn/T/cp_dvafrvmyt7i7vc45pps7472yn.argfile pubsuttest
.PublisherMain 
Published message ID:9152561376235525

Cloudコンソール上でPub/Subのメトリクスを確認すると下記の通りにメッセージが発行されたことを示すグラフが確認できます。

次にサブスクライバーのプログラムを作成します。

package pubsuttest;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class SubscriberAsyncMain {
    
    public static void main(String[] args) throws Exception {
        String projectId = "dataflowtest002";
        String subscriptionId = "test-sub";

        subscribeAsyncMain(projectId, subscriptionId);
    }

    public static void subscribeAsyncMain(String projectId, String subscriptionId) {

        ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);

        MessageReceiver messageReceiver = 
        (PubsubMessage message, AckReplyConsumer consumer ) -> {
            System.out.println("Id:" + message.getMessageId());
            System.out.println("Data:" + message.getData().toStringUtf8());
            consumer.ack();
        };

        Subscriber subscriber = null;
        try {
            subscriber = Subscriber.newBuilder(projectSubscriptionName, messageReceiver).build();
            subscriber.startAsync().awaitRunning();
            System.out.printf("Listening for messages on %s:\n", projectSubscriptionName.toString());
            subscriber.awaitTerminated(30, TimeUnit.SECONDS);
        } catch (TimeoutException timeoutException) {
            subscriber.stopAsync();
        }
    }


}

上記を作成したらまず、SubscriberAsyncMainを実行します。次にPublisherMainを実行すると下記の通りターミナルに実行ログが表示されます。

MacBook-Pro pubsubtest %  /usr/bin/env /Library/Java/JavaVirtualMachines/jdk-21.jdk/Content
s/Home/bin/java @/var/folders/bz/b6k5z51n4bv4q575_0ynyxqr0000gn/T/cp_dvafrvmyt7i7vc45pps7472yn.argfile pubsuttest
.PublisherMain 
Published message ID:10130482704624949
MacBook-Pro pubsubtest %  /usr/bin/env /Library/Java/JavaVirtualMachines/jdk-21.jdk/Contents/Home/bin/java @
/var/folders/bz/b6k5z51n4bv4q575_0ynyxqr0000gn/T/cp_dvafrvmyt7i7vc45pps7472yn.argfile pubsuttest.SubscriberAsyncMain 
Listening for messages on projects/xxxx/subscriptions/test-sub:
Id:10130482704624949
Data:This is Test.

SubscriberAsyncMainは30秒放置すると自動で止まる仕組みですが、強制終了する場合はCtrl+Cでできます。

・Appendix

公式ドキュメントはこちら

https://cloud.google.com/pubsub/docs/publish-receive-messages-client-library?hl=ja#java

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

*