[A-00171] Running Apache Beam on Dataflow

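This post uses the program created in an earlier article, where it is explained in detail.

This time, I'll run that Apache Beam program as a Dataflow job.

The job ends with an error on Dataflow, so this post only covers getting it submitted and running.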

・Source code

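The project uses the standard Maven directory layout: pom.xml at the project root and App.java under src/main/java/com/example/. Those two files are basically all you need to run it.

It is built and run with Java + Maven, on JDK 20.

pom.xml is as follows.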

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>demo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <beam.version>2.51.0</beam.version>
    <bigquery.version>v2-rev20220924-2.0.0</bigquery.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>20</maven.compiler.source>
    <maven.compiler.target>20</maven.compiler.target>
    <google-api-client.version>2.0.0</google-api-client.version>
    <guava.version>32.0.1-jre</guava.version>
    <pubsub.version>v1-rev20220904-2.0.0</pubsub.version>
    <libraries-bom.version>26.17.0</libraries-bom.version>
  </properties>

  <dependencies>
    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>${google-api-client.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-bigquery</artifactId>
      <version>${bigquery.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client</artifactId>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-pubsub</artifactId>
      <version>${pubsub.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-core -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>1.7.32</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>${guava.version}</version>  <!-- "-jre" for Java 8 or higher -->
      </dependency>
      <!-- GCP libraries BOM sets the version for google http client -->
      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>libraries-bom</artifactId>
        <version>${libraries-bom.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <profiles>
    <profile>
      <id>dataflow-runner</id>
      <!-- Makes the DataflowRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>
</project>

App.java is as follows.

package com.example;

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

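/**
 * Minimal Apache Beam pipeline that creates a few strings and prints each one.
 */
public class App {

    /** Pipeline options; passing --inputText=... overrides the default value. */
    public interface Options extends StreamingOptions {
        @Description("Input text to print.")
        @Default.String("My input text")
        String getInputText();

        void setInputText(String value);
    }

    public static PCollection<String> buildPipeline(Pipeline pipeline, String inputText) {
        return pipeline
            // Create an in-memory PCollection from three fixed strings.
            .apply("Create elements", Create.of(Arrays.asList("Hello", "World!", inputText)))
            // Print each element and pass it through unchanged.
            // On Dataflow this lambda runs on the worker VMs, so the output should
            // end up in the worker logs rather than in the local console.
            .apply("Print elements",
                MapElements.into(TypeDescriptors.strings()).via(x -> {
                    System.out.println(x);
                    return x;
                }));
    }

    public static void main(String[] args) {
        // Parse command-line arguments (e.g. --runner, --project) into Options.
        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        var pipeline = Pipeline.create(options);
        App.buildPipeline(pipeline, options.getInputText());
        // Submit the pipeline and block until the job reaches a terminal state.
        pipeline.run().waitUntilFinish();
    }
}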
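Conceptually, buildPipeline turns three strings into a collection and applies an element-wise function to each one. The sketch below is only a local, Beam-free analogy using java.util.stream. The class PipelineAnalogy and its methods are hypothetical names for illustration. Unlike this sequential version, a Beam runner may process the elements in parallel and in any order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Hypothetical, Beam-free analogy of App.buildPipeline: Create.of(...) followed by MapElements.
public class PipelineAnalogy {

    // Mirrors Create.of(Arrays.asList("Hello", "World!", inputText)).
    public static List<String> createElements(String inputText) {
        return Arrays.asList("Hello", "World!", inputText);
    }

    // Mirrors MapElements.via(fn): apply fn to every element, producing a new collection.
    public static List<String> mapElements(List<String> input, UnaryOperator<String> fn) {
        return input.stream().map(fn).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> output = mapElements(createElements("My input text"), x -> {
            System.out.println(x); // side effect, like the println in App
            return x;              // element passes through unchanged
        });
        System.out.println(output.size() + " elements");
    }
}
```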

Once pom.xml and App.java are in place, run mvn clean install.

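・Set up authentication for Google Cloud

With the Google Cloud SDK (google-cloud-sdk) installed, run the following commands in order.

First, set the project ID you will use in the active gcloud configuration.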

gcloud config set project <your-project-id>

Next, enable the APIs for the services the pipeline uses.

gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com

Next, log in to create local Application Default Credentials.

gcloud auth application-default login
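This command stores Application Default Credentials (ADC) in a well-known file that the Google client libraries look for. If the GOOGLE_APPLICATION_CREDENTIALS environment variable is set, it takes precedence. The sketch below resolves where those two lookup steps would point, so you can check that the login step produced a file. The class AdcPath is hypothetical and not part of any Google library. It also ignores other lookup steps such as the metadata server on Google Cloud.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

// Hypothetical helper: resolves the ADC file location from the environment
// variable or, failing that, gcloud's default location.
public class AdcPath {

    public static Path resolve(Map<String, String> env, String osName, String userHome) {
        // 1. An explicit key file set via the environment variable wins.
        String explicit = env.get("GOOGLE_APPLICATION_CREDENTIALS");
        if (explicit != null && !explicit.isEmpty()) {
            return Paths.get(explicit);
        }
        // 2. Otherwise, the file written by "gcloud auth application-default login".
        if (osName.toLowerCase().startsWith("windows")) {
            return Paths.get(env.getOrDefault("APPDATA", ""), "gcloud", "application_default_credentials.json");
        }
        return Paths.get(userHome, ".config", "gcloud", "application_default_credentials.json");
    }

    public static void main(String[] args) {
        Path p = resolve(System.getenv(), System.getProperty("os.name"), System.getProperty("user.home"));
        System.out.println(p + (Files.exists(p) ? " (found)" : " (not found)"));
    }
}
```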

Next, grant your Google account the Service Account User role (roles/iam.serviceAccountUser).

gcloud projects add-iam-policy-binding <your-project-id> --member="user:<your-account-gmail-address>" --role=roles/iam.serviceAccountUser

Next, create a Cloud Storage bucket. Any name works, as long as it is globally unique.

gsutil mb -c STANDARD -l US gs://<bucket-name>
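Bucket names share a single global namespace, which is why the name must be unique. The sketch below checks a simplified subset of the Cloud Storage naming rules for names without dots:

- 3 to 63 characters
- only lowercase letters, digits, hyphens and underscores
- must start and end with a letter or digit
- must not start with "goog"

The class BucketNameCheck is hypothetical and the check is not exhaustive. For example, it does not reject names containing "google", so the service remains the final authority.

```java
import java.util.regex.Pattern;

// Hypothetical, simplified validator for dot-free Cloud Storage bucket names.
public class BucketNameCheck {

    // Start and end with [a-z0-9]; the middle may also contain '_' and '-'; total length 3-63.
    private static final Pattern DOTLESS_NAME = Pattern.compile("[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]");

    public static boolean looksValid(String name) {
        return DOTLESS_NAME.matcher(name).matches() && !name.startsWith("goog");
    }

    public static void main(String[] args) {
        for (String name : new String[] {"my-beam-temp-bucket", "My_Bucket", "ab", "googbucket"}) {
            System.out.println(name + " -> " + looksValid(name));
        }
    }
}
```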

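Next, grant roles to the service account. The default Compute Engine service account has the email <project-number>-compute@developer.gserviceaccount.com, so replace <service-account-no> below with your project number. You can look it up with gcloud projects describe <your-project-id> --format="value(projectNumber)".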

gcloud projects add-iam-policy-binding <your-project-id> --member="serviceAccount:<service-account-no>-compute@developer.gserviceaccount.com" --role=roles/dataflow.admin

gcloud projects add-iam-policy-binding <your-project-id> --member="serviceAccount:<service-account-no>-compute@developer.gserviceaccount.com" --role=roles/dataflow.worker

gcloud projects add-iam-policy-binding <your-project-id> --member="serviceAccount:<service-account-no>-compute@developer.gserviceaccount.com" --role=roles/storage.objectAdmin
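The three commands above differ only in the role. Because the default Compute Engine service account's email is derived from the project number, the member string can be built once and reused. The sketch below only generates the same gcloud command lines from a project ID and project number; it does not call any Google API. The class RoleBindings and the placeholder values are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that generates the three role-binding commands shown above.
public class RoleBindings {

    static final List<String> ROLES = List.of(
        "roles/dataflow.admin", "roles/dataflow.worker", "roles/storage.objectAdmin");

    // Default Compute Engine service account: <project-number>-compute@developer.gserviceaccount.com
    public static String computeServiceAccount(String projectNumber) {
        return projectNumber + "-compute@developer.gserviceaccount.com";
    }

    public static List<String> commands(String projectId, String projectNumber) {
        String member = "serviceAccount:" + computeServiceAccount(projectNumber);
        return ROLES.stream()
            .map(role -> "gcloud projects add-iam-policy-binding " + projectId
                + " --member=\"" + member + "\" --role=" + role)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own project ID and project number.
        commands("my-project-id", "123456789012").forEach(System.out::println);
    }
}
```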

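・Deploy to Dataflow and run the job

The following Maven command deploys the pipeline and runs the job in a single step. The -Pdataflow-runner option activates the dataflow-runner profile in pom.xml, which adds the DataflowRunner dependency.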

mvn -Pdataflow-runner compile exec:java -Dexec.mainClass=com.example.App -Dexec.args="--project=<your-project-id> --gcpTempLocation=gs://<bucket-name>/temp/ --runner=DataflowRunner --region=asia-northeast1"
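Everything inside -Dexec.args is passed to App.main as the args array. PipelineOptionsFactory.fromArgs then maps each --name=value pair to the matching getter on the options interface. For example, --runner=DataflowRunner selects the runner, and an extra --inputText=... would override the @Default.String value in Options.

The sketch below is a deliberately simplified, Beam-free illustration of that mapping. The class OptionArgs is hypothetical and does not reproduce Beam's actual parser, which also handles types, validation and other argument forms such as bare boolean flags.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified illustration of how "--name=value" arguments
// relate to option getters. This is NOT Beam's PipelineOptionsFactory.
public class OptionArgs {

    public static Map<String, String> parse(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String arg : args) {
            if (!arg.startsWith("--")) {
                throw new IllegalArgumentException("Expected --name=value but got: " + arg);
            }
            int eq = arg.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("Missing '=' in: " + arg);
            }
            // Key is the text between "--" and the first '='; the rest is the value.
            options.put(arg.substring(2, eq), arg.substring(eq + 1));
        }
        return options;
    }

    // Like Options.getInputText(): use the argument if present, else the @Default.String value.
    public static String inputText(Map<String, String> options) {
        return options.getOrDefault("inputText", "My input text");
    }

    public static void main(String[] args) {
        String[] example = {
            "--project=my-project-id",
            "--gcpTempLocation=gs://my-bucket/temp/",
            "--runner=DataflowRunner",
            "--region=asia-northeast1"
        };
        Map<String, String> options = parse(example);
        System.out.println(options);
        System.out.println("inputText = " + inputText(options));
    }
}
```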

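Running it prints the long log below (parts are elided with "...."). The job ultimately fails, so just use the log to get a feel for how execution proceeds. The job is submitted, a worker starts in asia-northeast1-b, the worker pool stops, and the job ends with status FAILED. Note that Maven still reports BUILD SUCCESS: the exec:java run itself completed without an exception, even though the Dataflow job failed.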

[INFO] 
[INFO] --------------------------< com.example:demo >--------------------------
[INFO] Building demo 1.0-SNAPSHOT
[INFO]   from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-google-cloud-dataflow-java/2.51.0/beam-runners-google-cloud-dataflow-java-2.51.0.pom
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-google-cloud-dataflow-java/2.51.0/beam-runners-google-cloud-dataflow-java-2.51.0.pom (28 kB at 40 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-kafka/2.51.0/beam-sdks-java-io-kafka-2.51.0.pom
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-kafka/2.51.0/beam-sdks-java-io-kafka-2.51.0.pom (19 kB at 296 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-google-cloud-dataflow-java/2.51.0/beam-runners-google-cloud-dataflow-java-2.51.0.jar
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-google-cloud-dataflow-java/2.51.0/beam-runners-google-cloud-dataflow-java-2.51.0.jar (486 kB at 2.0 MB/s)
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-kafka/2.51.0/beam-sdks-java-io-kafka-2.51.0.jar
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-kafka/2.51.0/beam-sdks-java-io-kafka-2.51.0.jar (299 kB at 3.2 MB/s)
[INFO] 
[INFO] --- resources:3.0.2:resources (default-resources) @ demo ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/anonymous/Documents/practice/apache-beam/test2/demo/src/main/resources
[INFO] 
[INFO] --- compiler:3.8.0:compile (default-compile) @ demo ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- exec:3.1.0:java (default-cli) @ demo ---
Oct 31, 2023 2:52:50 AM org.apache.beam.runners.dataflow.options.DataflowPipelineOptions$StagingLocationFactory create
INFO: No stagingLocation provided, falling back to gcpTempLocation
Oct 31, 2023 2:52:51 AM org.apache.beam.runners.dataflow.DataflowRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 250 files. Enable logging at DEBUG level to see which files will be staged.
Oct 31, 2023 2:52:51 AM org.apache.beam.runners.core.construction.Environments$JavaVersion forSpecification
WARNING: unsupported Java version: 20, falling back to: 17
Oct 31, 2023 2:52:51 AM org.apache.beam.runners.core.construction.Environments$JavaVersion forSpecification
WARNING: unsupported Java version: 20, falling back to: 17
Oct 31, 2023 2:52:52 AM org.apache.beam.runners.core.construction.Environments$JavaVersion forSpecification
WARNING: unsupported Java version: 20, falling back to: 17
Oct 31, 2023 2:52:52 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
Oct 31, 2023 2:52:52 AM org.apache.beam.runners.core.construction.Environments$JavaVersion forSpecification
WARNING: unsupported Java version: 20, falling back to: 17

....
INFO: 2023-10-30T17:53:11.021Z: Starting 1 workers in asia-northeast1-b...
....
INFO: 2023-10-30T17:54:36.535Z: Workers have started successfully.
....
INFO: 2023-10-30T17:55:27.639Z: Stopping worker pool...
....
INFO: 2023-10-30T17:55:59.149Z: Worker pool stopped.
....
INFO: Job 2023-10-30_10_53_00-4745917885078386353 failed with status FAILED.
[WARNING] thread Thread[#29,pool-2-thread-1,5,com.example.App] was interrupted but is still alive after waiting at least 15000msecs
[WARNING] thread Thread[#29,pool-2-thread-1,5,com.example.App] will linger despite being asked to die via interruption
[WARNING] thread Thread[#30,OpenCensus.Disruptor-0,5,com.example.App] will linger despite being asked to die via interruption
[WARNING] NOTE: 2 thread(s) did not finish despite being asked to via interruption. This is not a problem with exec:java, it is a problem with the running code. Although not serious, it should be remedied.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  03:43 min
[INFO] Finished at: 2023-10-31T02:56:20+09:00
[INFO] ------------------------------------------------------------------------
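The log repeatedly warns "unsupported Java version: 20, falling back to: 17", while pom.xml compiles with maven.compiler.target set to 20. One possible explanation for the failure is that the Dataflow workers run a Java 17 environment, which cannot load classes compiled for Java 20. This is an assumption; the worker error logs are not shown here.

A class file records its target in its header: after the 4-byte magic number 0xCAFEBABE come a 2-byte minor version and a 2-byte major version. For Java 5 and later, the major version is the Java release plus 44 (61 for Java 17, 64 for Java 20). The hypothetical ClassVersion helper below reads that header, so you can check a compiled file such as target/classes/com/example/App.class.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: reports the Java release a .class file was compiled for.
public class ClassVersion {

    private static final int MAGIC = 0xCAFEBABE;

    // Reads the class-file header: u4 magic, u2 minor_version, u2 major_version.
    public static int majorVersion(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        if (data.readInt() != MAGIC) {
            throw new IOException("Not a class file (bad magic number)");
        }
        data.readUnsignedShort();        // minor_version (unused here)
        return data.readUnsignedShort(); // major_version
    }

    // For Java 5 and later: release = major - 44 (e.g. 61 -> 17, 64 -> 20).
    public static int javaRelease(int major) {
        return major - 44;
    }

    public static void main(String[] args) throws IOException {
        Path path = Paths.get(args.length > 0 ? args[0] : "target/classes/com/example/App.class");
        try (InputStream in = Files.newInputStream(path)) {
            int major = majorVersion(in);
            System.out.println(path + ": major " + major + " (Java " + javaRelease(major) + ")");
        }
    }
}
```

If it reports Java 20, compiling for 17 instead (for example, setting maven.compiler.source and maven.compiler.target to 17) would be one way to test this hypothesis.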

In the Google Cloud console, the job looks like this.

That should give you a sense of the overall execution flow.

Next time, I'd like to build a job that completes successfully.

・Appendix

Official documentation:

https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline?hl=ja

https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-java?hl=ja#run-the-pipeline-on-the-dataflow-service

https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-java?hl=ja

References:

https://stackoverflow.com/questions/69771678/how-to-add-apache-beam-direct-runner-to-classpath
