diff --git a/providers/google/docs/operators/cloud/dataflow.rst b/providers/google/docs/operators/cloud/dataflow.rst
index f4b4408f59118..f72f724879687 100644
--- a/providers/google/docs/operators/cloud/dataflow.rst
+++ b/providers/google/docs/operators/cloud/dataflow.rst
@@ -138,6 +138,14 @@ Here is an example of creating and running a pipeline in Java with jar stored on
     :start-after: [START howto_operator_start_java_job_local_jar]
     :end-before: [END howto_operator_start_java_job_local_jar]
 
+Here is an example of creating and running a streaming pipeline in Java with jar stored on GCS:
+
+.. exampleinclude:: /../../providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_java_streaming]
+    :end-before: [END howto_operator_start_java_streaming]
+
 .. _howto/operator:PythonSDKPipelines:
 
 Python SDK pipelines
diff --git a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
new file mode 100644
index 0000000000000..cf6c3228fc71e
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for testing the Google Dataflow Beam Pipeline Operator with Java (streaming).
+
+Important Note:
+    This test downloads the Java JAR file from a public bucket. In case the JAR file cannot be downloaded
+    or is not compatible with the Java version used in the test, you can rebuild it from source: since
+    there is no streaming pipeline example for the Apache Beam Java SDK, the source code and build
+    instructions are located in
+    `providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/`.
+
+    You can follow the instructions on how to pack a self-executing jar here:
+    https://beam.apache.org/documentation/runners/dataflow/
+
+Requirements:
+    These operators require the gcloud command and Java's JRE to run.
+""" + +from __future__ import annotations + +import os +from datetime import datetime + +from airflow.models.dag import DAG +from airflow.providers.apache.beam.hooks.beam import BeamRunnerType +from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator +from airflow.providers.google.cloud.operators.dataflow import DataflowStopJobOperator +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.providers.google.cloud.operators.pubsub import ( + PubSubCreateTopicOperator, + PubSubDeleteTopicOperator, +) +from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") +DAG_ID = "dataflow_java_streaming" +LOCATION = "europe-west3" +BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}" +GCS_TMP = f"gs://{BUCKET_NAME}/temp" +GCS_OUTPUT = f"gs://{BUCKET_NAME}/DF_OUT" +RESOURCE_BUCKET = "airflow-system-tests-resources" +JAR_FILE_NAME = "stream-pubsub-example-bundled-v-0.1.jar" +GCS_JAR_PATH = f"gs://{RESOURCE_BUCKET}/dataflow/java/{JAR_FILE_NAME}" +# For the distributed system, we need to store the JAR file in a folder that can be accessed by multiple +# worker. +# For example in Composer the correct path is gcs/data/word-count-beam-bundled-0.1.jar. +# Because gcs/data/ is shared folder for Airflow's workers. +IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", "")) +LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME +REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}" + +OUTPUT_TOPIC_ID = f"tp-{ENV_ID}-out" +INPUT_TOPIC = "projects/pubsub-public-data/topics/taxirides-realtime" +OUTPUT_TOPIC = f"projects/{PROJECT_ID}/topics/{OUTPUT_TOPIC_ID}" + + +with DAG( + DAG_ID, + schedule="@once", + start_date=datetime(2025, 2, 1), + catchup=False, + tags=["example", "dataflow", "java", "streaming"], +) as dag: + create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME) + download_file = GCSToLocalFilesystemOperator( + task_id="download_file", + object_name=f"dataflow/java/{JAR_FILE_NAME}", + bucket=RESOURCE_BUCKET, + filename=LOCAL_JAR, + ) + create_output_pub_sub_topic = PubSubCreateTopicOperator( + task_id="create_topic", topic=OUTPUT_TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False + ) + # [START howto_operator_start_java_streaming] + + start_java_streaming_job_dataflow = BeamRunJavaPipelineOperator( + runner=BeamRunnerType.DataflowRunner, + task_id="start_java_streaming_dataflow_job", + jar=LOCAL_JAR, + pipeline_options={ + "tempLocation": GCS_TMP, + "input_topic": INPUT_TOPIC, + "output_topic": OUTPUT_TOPIC, + "streaming": True, + }, + dataflow_config={ + "job_name": f"java-streaming-job-{ENV_ID}", + "location": LOCATION, + }, + ) + # [END howto_operator_start_java_streaming] + stop_dataflow_job = DataflowStopJobOperator( + task_id="stop_dataflow_job", + location=LOCATION, + job_id="{{ task_instance.xcom_pull(task_ids='start_java_streaming_dataflow_job')['dataflow_job_id'] }}", + ) + delete_topic = PubSubDeleteTopicOperator( + task_id="delete_topic", topic=OUTPUT_TOPIC_ID, project_id=PROJECT_ID + ) + delete_topic.trigger_rule = TriggerRule.ALL_DONE + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + + ( + # TEST SETUP + create_bucket + >> download_file + >> 
+    stop_dataflow_job = DataflowStopJobOperator(
+        task_id="stop_dataflow_job",
+        location=LOCATION,
+        job_id="{{ task_instance.xcom_pull(task_ids='start_java_streaming_dataflow_job')['dataflow_job_id'] }}",
+    )
+    delete_topic = PubSubDeleteTopicOperator(
+        task_id="delete_topic", topic=OUTPUT_TOPIC_ID, project_id=PROJECT_ID
+    )
+    delete_topic.trigger_rule = TriggerRule.ALL_DONE
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> download_file
+        >> create_output_pub_sub_topic
+        # TEST BODY
+        >> start_java_streaming_job_dataflow
+        # TEST TEARDOWN
+        >> stop_dataflow_job
+        >> delete_topic
+        >> delete_bucket
+    )
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/README.MD b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/README.MD
new file mode 100644
index 0000000000000..f8be0c9191ec8
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/README.MD
@@ -0,0 +1,58 @@
+# Dataflow Java Streaming
+
+This project is a streaming example built with the latest Apache Beam Java SDK (v2.63),
+as there is no "official" streaming example yet (see https://beam.apache.org/get-started/wordcount-example/).
+
+The sample logic directs streaming data from an input Pub/Sub topic to an output Pub/Sub topic,
+demonstrating work with an unbounded data source and destination.
+
+It is used by the Java Dataflow streaming system tests.
+
+
+## Requirements
+
+- **Java 17**: Ensure you have Java 17 installed and configured on your system.
+- **Maven**: Make sure Maven is installed and configured on your system;
+  it is used for dependency management and building the project.
+
+## Project Structure
+
+The project's structure is as follows:
+
+```plaintext
+├── src
+│   ├── main
+│   │   ├── java
+│   │   │   └── org
+│   │   │       └── example
+│   │   │           └── pubsub
+│   │   │               └── StreamingExample.java
+├── pom.xml
+└── README.md
+```
+
+## Build
+
+The build was verified inside the Breeze container with the dependencies from the [requirements](#requirements) installed.
+
+The output artifact is the executable `target/stream-pubsub-example-bundled-v-0.1.jar`.
+
+
+## Run
+
+To run the pipeline on Dataflow, use:
+
+```bash
+java -jar target/stream-pubsub-example-bundled-v-0.1.jar \
+--runner=DataflowRunner \
+--project=<PROJECT_ID> \
+--region=<REGION> \
+--jobName=<JOB_NAME> \
+--input_topic=<INPUT_TOPIC> \
+--output_topic=<OUTPUT_TOPIC>
+```
+
+Optionally, you can add `--labels=<LABELS>`, `--tempLocation=<GCS_TEMP_PATH>` and `--stagingLocation=<GCS_STAGING_PATH>`,
+or other Dataflow pipeline options, if needed.
+
+
+## Runners
+The `DataflowRunner` and `DirectRunner` are supported; a local `DirectRunner` invocation is sketched below.
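+
+As a quick local check (a sketch: `<PROJECT_ID>` and `<OUTPUT_TOPIC>` are placeholders, and the
+`DirectRunner` still needs valid Google Cloud credentials to reach Pub/Sub), you can build the jar
+and run it without Dataflow:
+
+```bash
+# Build the self-executing jar (written to target/ per the shade plugin's finalName).
+mvn clean package
+
+# Run locally with the DirectRunner against the public taxi-rides topic used by the system test.
+java -jar target/stream-pubsub-example-bundled-v-0.1.jar \
+--runner=DirectRunner \
+--project=<PROJECT_ID> \
+--input_topic=projects/pubsub-public-data/topics/taxirides-realtime \
+--output_topic=<OUTPUT_TOPIC>
+```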
diff --git a/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/pom.xml b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/pom.xml
new file mode 100644
index 0000000000000..85548bf7ab17e
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/pom.xml
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.example</groupId>
+  <artifactId>stream-pubsub-example</artifactId>
+  <version>v-0.1</version>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <beam.version>2.63.0</beam.version>
+    <slf4j.version>2.0.16</slf4j.version>
+    <google.cloud.version>1.113.0</google.cloud.version>
+    <!-- NOTE: the names of the next two properties are assumptions; only their values are certain. -->
+    <gax.version>2.45.0</gax.version>
+    <google.cloud.core.version>2.45.0</google.cloud.core.version>
+    <maven.compiler.source>17</maven.compiler.source>
+    <maven.compiler.target>17</maven.compiler.target>
+
+    <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
+    <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-pubsub</artifactId>
+      <version>${google.cloud.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${maven-compiler-plugin.version}</version>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>${maven-shade-plugin.version}</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/LICENSE</exclude>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>org.example.pubsub.StreamingExample</mainClass>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/src/main/java/org/example/pubsub/StreamingExample.java b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/src/main/java/org/example/pubsub/StreamingExample.java
new file mode 100644
index 0000000000000..2616fccad9253
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/src/main/java/org/example/pubsub/StreamingExample.java
@@ -0,0 +1,42 @@
+package org.example.pubsub;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.Validation;
+
+
+public class StreamingExample {
+
+    public interface StreamingExampleOptions extends PipelineOptions, StreamingOptions {
+        @Description("Input Pub/Sub Topic")
+        @Validation.Required
+        String getInput_topic();
+
+        void setInput_topic(String inputTopic);
+
+        @Description("Output Pub/Sub Topic")
+        @Validation.Required
+        String getOutput_topic();
+
+        void setOutput_topic(String outputTopic);
+    }
+
+    public static void main(String[] args) {
+        StreamingExampleOptions options = PipelineOptionsFactory
+                .fromArgs(args)
+                .withValidation()
+                .as(StreamingExampleOptions.class);
+        // Force streaming mode: the pipeline reads from an unbounded Pub/Sub source.
+        options.setStreaming(true);
+
+        Pipeline pipeline = Pipeline.create(options);
+        // Forward each message from the input topic to the output topic unchanged.
+        pipeline
+                .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInput_topic()))
+                .apply("WriteToPubSub", PubsubIO.writeStrings().to(options.getOutput_topic()));
+
+        pipeline.run();
+    }
+}
diff --git a/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py b/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py
index 7139170aff723..7c486526dca14 100755
--- a/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py
+++ b/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py
@@ -33,6 +33,7 @@
     "static",
     "dist",
     "node_modules",
+    "non_python_src",
 ]
 
 PATH_EXTENSION_STRING = '__path__ = __import__("pkgutil").extend_path(__path__, __name__)  # type: ignore'