[A-00171] Running Apache Beam on Dataflow
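This post uses the program created in the following article.
This time, I'll run that Apache Beam program as a Dataflow job.
The program itself is explained in that article. The run on Dataflow ends in an error, so this post only covers getting the job to run.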
・Source code
The directory structure is as follows. Basically, App.java and pom.xml are all you need to run it.

The program is built and run with Java + Maven, on JDK 20. The pom.xml is as follows.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>demo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>
  <properties>
    <beam.version>2.51.0</beam.version>
    <bigquery.version>v2-rev20220924-2.0.0</bigquery.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>20</maven.compiler.source>
    <maven.compiler.target>20</maven.compiler.target>
    <google-api-client.version>2.0.0</google-api-client.version>
    <guava.version>32.0.1-jre</guava.version>
    <pubsub.version>v1-rev20220904-2.0.0</pubsub.version>
    <libraries-bom.version>26.17.0</libraries-bom.version>
  </properties>
  <dependencies>
    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>${google-api-client.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-bigquery</artifactId>
      <version>${bigquery.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- No version here: it is managed by libraries-bom in dependencyManagement. -->
    <dependency>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client</artifactId>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-pubsub</artifactId>
      <version>${pubsub.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-core -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>1.7.32</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>${guava.version}</version> <!-- "-jre" for Java 8 or higher -->
      </dependency>
      <!-- GCP libraries BOM sets the version for google http client -->
      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>libraries-bom</artifactId>
        <version>${libraries-bom.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
  <profiles>
    <profile>
      <id>dataflow-runner</id>
      <!-- Makes the DataflowRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>
</project>
App.java is as follows.
package com.example;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
/**
 * Minimal Beam pipeline: creates three strings and prints each of them.
 */
public class App {
    /** Pipeline options; --inputText=... overrides the third element. */
    public interface Options extends StreamingOptions {
        @Description("Input text to print.")
        @Default.String("My input text")
        String getInputText();
        void setInputText(String value);
    }
    public static PCollection<String> buildPipeline(Pipeline pipeline, String inputText) {
        return pipeline
                .apply("Create elements", Create.of(Arrays.asList("Hello", "World!", inputText)))
                // Prints each element and passes it through unchanged. On Dataflow this
                // runs on the worker VMs, so the output is not printed to your local terminal.
                .apply("Print elements",
                        MapElements.into(TypeDescriptors.strings()).via(x -> {
                            System.out.println(x);
                            return x;
                        }));
    }
    public static void main(String[] args) {
        // Parses options such as --runner, --project and --inputText from the command line.
        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        var pipeline = Pipeline.create(options);
        App.buildPipeline(pipeline, options.getInputText());
        // Submits the pipeline and blocks until the job finishes.
        pipeline.run().waitUntilFinish();
    }
}
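To make what the pipeline computes concrete, here is a rough plain-Java analogue that runs without Beam. It is only an illustration, not how Beam executes: the class PipelineSketch and its simplified --inputText parsing are hypothetical stand-ins for Create.of, MapElements and PipelineOptionsFactory. One real difference: a Beam PCollection is unordered, so on an actual runner the three elements may be printed in any order, whereas this sequential stream keeps the list order.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PipelineSketch {
    // Mirrors @Default.String("My input text") on App.Options.getInputText().
    static final String DEFAULT_INPUT_TEXT = "My input text";

    // Simplified, hypothetical stand-in for PipelineOptionsFactory: only reads --inputText=value.
    static String inputText(String[] args) {
        for (String arg : args) {
            if (arg.startsWith("--inputText=")) {
                return arg.substring("--inputText=".length());
            }
        }
        return DEFAULT_INPUT_TEXT;
    }

    // "Create elements" then "Print elements": print each element and pass it through unchanged.
    static List<String> run(String[] args) {
        return Stream.of("Hello", "World!", inputText(args))
                .map(x -> {
                    System.out.println(x);
                    return x;
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        run(args);
    }
}
```

With no arguments it prints Hello, World! and My input text on separate lines; with --inputText=abc the last line becomes abc.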
After creating the files above, run mvn clean install.
・Set up authentication for Google Cloud
With google-cloud-sdk installed, run the following commands in order.
First, set the ID of the project to use in your gcloud configuration.
gcloud config set project <your-project-id>
Next, enable the APIs for the services you will use.
gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
Next, log in to create local Application Default Credentials.
gcloud auth application-default login
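gcloud auth application-default login saves Application Default Credentials (ADC) to a well-known file, which Google client libraries (used by the Dataflow runner) pick up when no other credentials are configured. The sketch below shows, in simplified form, where that file is looked for: the GOOGLE_APPLICATION_CREDENTIALS environment variable wins if it is set; otherwise it is $HOME/.config/gcloud/application_default_credentials.json on Linux and macOS, or %APPDATA%\gcloud\application_default_credentials.json on Windows. The class AdcLocator is hypothetical, and it deliberately ignores other sources such as the metadata server on Google Cloud VMs.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

final class AdcLocator {
    static final String FILE_NAME = "application_default_credentials.json";

    // Simplified lookup: GOOGLE_APPLICATION_CREDENTIALS first, then the file written by
    // "gcloud auth application-default login". Ignores CLOUDSDK_CONFIG and the metadata server.
    static Optional<Path> adcFile(Map<String, String> env, String osName, String userHome) {
        String explicit = env.get("GOOGLE_APPLICATION_CREDENTIALS");
        if (explicit != null && !explicit.isEmpty()) {
            return Optional.of(Path.of(explicit));
        }
        if (osName.toLowerCase(Locale.ROOT).startsWith("windows")) {
            String appData = env.get("APPDATA"); // Windows: %APPDATA%\gcloud\...
            return appData == null
                    ? Optional.empty()
                    : Optional.of(Path.of(appData, "gcloud", FILE_NAME));
        }
        // Linux and macOS: $HOME/.config/gcloud/...
        return Optional.of(Path.of(userHome, ".config", "gcloud", FILE_NAME));
    }

    public static void main(String[] args) {
        Optional<Path> file = adcFile(System.getenv(),
                System.getProperty("os.name"), System.getProperty("user.home"));
        System.out.println(file
                .map(p -> p + (Files.exists(p) ? " (found)" : " (not found)"))
                .orElse("no candidate path"));
    }
}
```

If the file is reported as not found after logging in, the login step likely did not complete.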
Next, grant a role to your Google account.
gcloud projects add-iam-policy-binding <your-project-id> --member="user:<your-account-gmail-address>" --role=roles/iam.serviceAccountUser
Next, create a Cloud Storage bucket. Any name works as long as it is globally unique.
gsutil mb -c STANDARD -l US gs://<bucket-name>
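Bucket names are shared across all of Cloud Storage, and the bucket creation fails for names that break the naming rules. The hypothetical helper below checks only the basic rules as a quick sanity test before running the command: 3 to 63 characters; lowercase letters, digits, hyphens, underscores and dots only; starting and ending with a letter or digit. It is a simplification: Cloud Storage has further restrictions (for example on names containing "google") and allows longer dotted names, which this sketch does not model.

```java
import java.util.regex.Pattern;

final class BucketNameCheck {
    // First and last character: letter or digit; 1-61 allowed characters in between (3-63 total).
    private static final Pattern BASIC_RULES =
            Pattern.compile("[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]");

    static boolean looksValid(String name) {
        return BASIC_RULES.matcher(name).matches();
    }

    public static void main(String[] args) {
        for (String name : args) {
            System.out.println(name + " -> " + (looksValid(name) ? "ok" : "invalid"));
        }
    }
}
```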
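Next, grant roles to the service account. Replace <service-account-no> with the numeric prefix of the default service account that is created automatically; this number is your project number.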
gcloud projects add-iam-policy-binding <your-project-id> --member="serviceAccount:<service-account-no>-compute@developer.gserviceaccount.com" --role=roles/dataflow.admin
gcloud projects add-iam-policy-binding <your-project-id> --member="serviceAccount:<service-account-no>-compute@developer.gserviceaccount.com" --role=roles/dataflow.worker
gcloud projects add-iam-policy-binding <your-project-id> --member="serviceAccount:<service-account-no>-compute@developer.gserviceaccount.com" --role=roles/storage.objectAdmin
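The <service-account-no> placeholder is the project number, because the default Compute Engine service account is named PROJECT_NUMBER-compute@developer.gserviceaccount.com. You can look the number up with gcloud projects describe <your-project-id> --format="value(projectNumber)". The hypothetical helper below simply assembles the three commands above from a project ID and number, which avoids typos when the same email is pasted three times; the values in main are placeholders.

```java
import java.util.List;
import java.util.stream.Collectors;

final class WorkerRoleCommands {
    // Roles granted to the default Compute Engine service account in the steps above.
    static final List<String> ROLES = List.of(
            "roles/dataflow.admin",
            "roles/dataflow.worker",
            "roles/storage.objectAdmin");

    // The default Compute Engine service account is named after the project number, not the ID.
    static String defaultComputeServiceAccount(String projectNumber) {
        return projectNumber + "-compute@developer.gserviceaccount.com";
    }

    static List<String> commands(String projectId, String projectNumber) {
        String member = "serviceAccount:" + defaultComputeServiceAccount(projectNumber);
        return ROLES.stream()
                .map(role -> "gcloud projects add-iam-policy-binding " + projectId
                        + " --member=\"" + member + "\" --role=" + role)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Placeholder values: replace with your own project ID and project number.
        commands("my-project-id", "123456789012").forEach(System.out::println);
    }
}
```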
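・Deploy to Dataflow and run the job
The following Maven command deploys and runs the job in one step. -Pdataflow-runner activates the dataflow-runner profile in pom.xml, which adds the DataflowRunner dependency.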
mvn -Pdataflow-runner compile exec:java -Dexec.mainClass=com.example.App -Dexec.args="--project=<your-project-id> --gcpTempLocation=gs://<bucket-name>/temp/ --runner=DataflowRunner --region=asia-northeast1"
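In the command above, everything inside -Dexec.args="..." is passed to App.main as args and parsed by PipelineOptionsFactory.fromArgs; the shell quotes keep those space-separated options together as a single -Dexec.args value. The sketch below (class name and placeholder values are hypothetical) builds the same invocation as an argument list, for example for ProcessBuilder, which makes that grouping explicit: the four pipeline options end up in one list element.

```java
import java.util.List;

final class DataflowLaunchArgs {
    // Beam pipeline options in --name=value form, as received by App.main.
    static String pipelineArgs(String projectId, String bucket, String region) {
        return String.join(" ",
                "--project=" + projectId,
                "--gcpTempLocation=gs://" + bucket + "/temp/",
                "--runner=DataflowRunner",
                "--region=" + region);
    }

    // The mvn command line as separate arguments. No shell quoting is needed because the
    // whole -Dexec.args=... value is already a single element of the list.
    static List<String> mvnCommand(String projectId, String bucket, String region) {
        return List.of(
                "mvn", "-Pdataflow-runner", "compile", "exec:java",
                "-Dexec.mainClass=com.example.App",
                "-Dexec.args=" + pipelineArgs(projectId, bucket, region));
    }

    public static void main(String[] args) {
        // Placeholder values; the list could be handed to ProcessBuilder to launch Maven.
        System.out.println(mvnCommand("my-project-id", "my-bucket", "asia-northeast1"));
    }
}
```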
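Running it prints the long log below (lines of .... mark omitted parts). The job ends in an error, so just use it to get a feel for how execution proceeds.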
[INFO]
[INFO] --------------------------< com.example:demo >--------------------------
[INFO] Building demo 1.0-SNAPSHOT
[INFO] from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-google-cloud-dataflow-java/2.51.0/beam-runners-google-cloud-dataflow-java-2.51.0.pom
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-google-cloud-dataflow-java/2.51.0/beam-runners-google-cloud-dataflow-java-2.51.0.pom (28 kB at 40 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-kafka/2.51.0/beam-sdks-java-io-kafka-2.51.0.pom
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-kafka/2.51.0/beam-sdks-java-io-kafka-2.51.0.pom (19 kB at 296 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-google-cloud-dataflow-java/2.51.0/beam-runners-google-cloud-dataflow-java-2.51.0.jar
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-google-cloud-dataflow-java/2.51.0/beam-runners-google-cloud-dataflow-java-2.51.0.jar (486 kB at 2.0 MB/s)
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-kafka/2.51.0/beam-sdks-java-io-kafka-2.51.0.jar
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-kafka/2.51.0/beam-sdks-java-io-kafka-2.51.0.jar (299 kB at 3.2 MB/s)
[INFO]
[INFO] --- resources:3.0.2:resources (default-resources) @ demo ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/anonymous/Documents/practice/apache-beam/test2/demo/src/main/resources
[INFO]
[INFO] --- compiler:3.8.0:compile (default-compile) @ demo ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec:3.1.0:java (default-cli) @ demo ---
Oct 31, 2023 2:52:50 AM org.apache.beam.runners.dataflow.options.DataflowPipelineOptions$StagingLocationFactory create
INFO: No stagingLocation provided, falling back to gcpTempLocation
Oct 31, 2023 2:52:51 AM org.apache.beam.runners.dataflow.DataflowRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 250 files. Enable logging at DEBUG level to see which files will be staged.
Oct 31, 2023 2:52:51 AM org.apache.beam.runners.core.construction.Environments$JavaVersion forSpecification
WARNING: unsupported Java version: 20, falling back to: 17
Oct 31, 2023 2:52:51 AM org.apache.beam.runners.core.construction.Environments$JavaVersion forSpecification
WARNING: unsupported Java version: 20, falling back to: 17
Oct 31, 2023 2:52:52 AM org.apache.beam.runners.core.construction.Environments$JavaVersion forSpecification
WARNING: unsupported Java version: 20, falling back to: 17
Oct 31, 2023 2:52:52 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
Oct 31, 2023 2:52:52 AM org.apache.beam.runners.core.construction.Environments$JavaVersion forSpecification
WARNING: unsupported Java version: 20, falling back to: 17
....
INFO: 2023-10-30T17:53:11.021Z: Starting 1 workers in asia-northeast1-b...
....
INFO: 2023-10-30T17:54:36.535Z: Workers have started successfully.
....
INFO: 2023-10-30T17:55:27.639Z: Stopping worker pool...
....
INFO: 2023-10-30T17:55:59.149Z: Worker pool stopped.
....
INFO: Job 2023-10-30_10_53_00-4745917885078386353 failed with status FAILED.
[WARNING] thread Thread[#29,pool-2-thread-1,5,com.example.App] was interrupted but is still alive after waiting at least 15000msecs
[WARNING] thread Thread[#29,pool-2-thread-1,5,com.example.App] will linger despite being asked to die via interruption
[WARNING] thread Thread[#30,OpenCensus.Disruptor-0,5,com.example.App] will linger despite being asked to die via interruption
[WARNING] NOTE: 2 thread(s) did not finish despite being asked to via interruption. This is not a problem with exec:java, it is a problem with the running code. Although not serious, it should be remedied.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 03:43 min
[INFO] Finished at: 2023-10-31T02:56:20+09:00
[INFO] ------------------------------------------------------------------------
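The WARNING lines in the log show Beam 2.51.0 rejecting Java 20 and falling back to Java 17 when choosing the job's environment, while the pom compiles with maven.compiler.target set to 20. A Java 17 JVM cannot load class files compiled for a newer release, so this mismatch is worth checking when investigating the failure (the log excerpt does not show the actual error). The hypothetical helper below reads the class-file header (magic number 0xCAFEBABE, then minor and major version) and converts the major version to a Java release; the default path target/classes/com/example/App.class assumes the standard Maven output layout.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

final class ClassVersionCheck {
    private static final int MAGIC = 0xCAFEBABE;

    // Class file header: u4 magic, u2 minor_version, u2 major_version (big-endian).
    static int readMajorVersion(byte[] classBytes) {
        if (classBytes.length < 8) {
            throw new IllegalArgumentException("too short to be a class file");
        }
        ByteBuffer header = ByteBuffer.wrap(classBytes); // ByteBuffer is big-endian by default
        if (header.getInt() != MAGIC) {
            throw new IllegalArgumentException("not a class file (bad magic number)");
        }
        header.getShort(); // skip minor_version
        return Short.toUnsignedInt(header.getShort());
    }

    // From Java 5 onward, major version = release + 44 (Java 8 -> 52, 17 -> 61, 20 -> 64).
    static int javaReleaseFor(int majorVersion) {
        return majorVersion - 44;
    }

    public static void main(String[] args) throws IOException {
        Path classFile = Path.of(args.length > 0 ? args[0] : "target/classes/com/example/App.class");
        int release = javaReleaseFor(readMajorVersion(Files.readAllBytes(classFile)));
        System.out.println(classFile + " targets Java " + release);
        if (release > 17) {
            System.out.println("A Java 17 JVM would reject this class (UnsupportedClassVersionError).");
        }
    }
}
```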
In the Google Cloud console, the job looks like this:

That should give you a sense of the overall execution flow.
Next time, I'd like to build a job that actually completes successfully.
・Appendix
Official documentation:
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline?hl=ja
https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-java?hl=ja
References:
https://stackoverflow.com/questions/69771678/how-to-add-apache-beam-direct-runner-to-classpath