[A-00174]Cloud PubSubを使ってアプリを作成する
google cloudのCloud Pub/Subを使って簡単なアプリを作ってみます。
・JavaでPublisherとSubscriberを作成し、送受信してみる
Mavenプロジェクトを作成します。適当なフォルダを作成し、下記のコマンドを実行
mvn archetype:generate \
> -DarchetypeArtifactId=maven-archetype-quickstart \
> -DinteractiveMode=false \
> -DgroupId=pubsuttest \
> -DartifactId=pubsubtest
次にgcloudコマンドでpubsub topic, subscriptionを作成します。
# create pubsub topic.
gcloud pubsub topics create test-topic
# create pubsub subscription.
gcloud pubsub subscriptions create test-sub --topic test-topic
mavenで作成したpom.xmlを下記の通りに書き換えます。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>pubsuttest</groupId>
<artifactId>pubsubtest</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>pubsubtest</name>
<url>http://maven.apache.org</url>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.27.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
</dependencies>
</project>
mavenビルドを実行します。下記のキャプチャの通り、画面から実行する場合は下記のコマンドを選択して実行します。

ターミナルから実行する場合は下記の通りです。
mvn clean install -DskipTests=true
次にPublisherプログラムを作成します。
package pubsuttest;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
public class PublisherMain {
public static void main(String[] args) throws Exception {
String projectId = "<project-id>";
String topicId = "test-topic";
publishMain(projectId, topicId);
}
public static void publishMain(String projectId, String topicId) throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
publisher = Publisher.newBuilder(topicName).build();
String message = "This is Test.";
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published message ID:" + messageId);
} finally {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
上記を実行します。下記のVSCode上の実行ボタンを押下します。

下記の通り、ログがターミナルに表示されます。
MacBook-Pro pubsubtest % /usr/bin/env /Library/Java/JavaVirtualMachines/jdk-21.jdk/Content
s/Home/bin/java @/var/folders/bz/b6k5z51n4bv4q575_0ynyxqr0000gn/T/cp_dvafrvmyt7i7vc45pps7472yn.argfile pubsuttest
.PublisherMain
Published message ID:9152561376235525
Cloudコンソール上でPub/Subのメトリクスを確認すると下記の通りにメッセージが発行されたことを示すグラフが確認できます。

次にサブスクライバーのプログラムを作成します。
package pubsuttest;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
public class SubscriberAsyncMain {
public static void main(String[] args) throws Exception {
String projectId = "dataflowtest002";
String subscriptionId = "test-sub";
subscribeAsyncMain(projectId, subscriptionId);
}
public static void subscribeAsyncMain(String projectId, String subscriptionId) {
ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver messageReceiver =
(PubsubMessage message, AckReplyConsumer consumer ) -> {
System.out.println("Id:" + message.getMessageId());
System.out.println("Data:" + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(projectSubscriptionName, messageReceiver).build();
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", projectSubscriptionName.toString());
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
subscriber.stopAsync();
}
}
}
上記を作成したらまず、SubscriberAsyncMainを実行します。次にPublisherMainを実行すると下記の通りターミナルに実行ログが表示されます。
MacBook-Pro pubsubtest % /usr/bin/env /Library/Java/JavaVirtualMachines/jdk-21.jdk/Content
s/Home/bin/java @/var/folders/bz/b6k5z51n4bv4q575_0ynyxqr0000gn/T/cp_dvafrvmyt7i7vc45pps7472yn.argfile pubsuttest
.PublisherMain
Published message ID:10130482704624949
MacBook-Pro pubsubtest % /usr/bin/env /Library/Java/JavaVirtualMachines/jdk-21.jdk/Contents/Home/bin/java @
/var/folders/bz/b6k5z51n4bv4q575_0ynyxqr0000gn/T/cp_dvafrvmyt7i7vc45pps7472yn.argfile pubsuttest.SubscriberAsyncMain
Listening for messages on projects/xxxx/subscriptions/test-sub:
Id:10130482704624949
Data:This is Test.
SubscriberAsyncMainは30秒放置すると自動で止まる仕組みですが、強制終了する場合はCtrl+Cでできます。
・Appendix
公式ドキュメントはこちら
https://cloud.google.com/pubsub/docs/publish-receive-messages-client-library?hl=ja#java
コメントを残す