diff --git a/providers/google/docs/operators/cloud/dataflow.rst b/providers/google/docs/operators/cloud/dataflow.rst
index 6483912381e75..393fb614c22e1 100644
--- a/providers/google/docs/operators/cloud/dataflow.rst
+++ b/providers/google/docs/operators/cloud/dataflow.rst
@@ -144,14 +144,6 @@ Here is an example of creating and running a pipeline in Java with jar stored on
     :start-after: [START howto_operator_start_java_job_jar_on_gcs_deferrable]
     :end-before: [END howto_operator_start_java_job_jar_on_gcs_deferrable]
 
-Here is an example of creating and running a pipeline in Java with jar stored on local file system:
-
-.. exampleinclude:: /../../google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_start_java_job_local_jar]
-    :end-before: [END howto_operator_start_java_job_local_jar]
-
 Here is an example of creating and running a streaming pipeline in Java with jar stored on GCS:
 
 .. exampleinclude:: /../../google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
diff --git a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py
index ed74ede79d472..a88366bd3eb9b 100644
--- a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py
+++ b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py
@@ -42,7 +42,6 @@
 from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
 from airflow.providers.google.cloud.operators.dataflow import CheckJobRunning
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
-from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 
 try:
     from airflow.sdk import TriggerRule
@@ -57,13 +56,6 @@
 PUBLIC_BUCKET = "airflow-system-tests-resources"
 
 JAR_FILE_NAME = "word-count-beam-bundled-0.1.jar"
-# For the distributed system, we need to store the JAR file in a folder that can be accessed by multiple
-# worker.
-# For example in Composer the correct path is gcs/data/word-count-beam-bundled-0.1.jar.
-# Because gcs/data/ is shared folder for Airflow's workers.
-IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
-LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME
-REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}"
 GCS_JAR = f"gs://{PUBLIC_BUCKET}/dataflow/java/{JAR_FILE_NAME}"
 GCS_OUTPUT = f"gs://{BUCKET_NAME}"
 LOCATION = "europe-west3"
@@ -77,44 +69,6 @@
 ) as dag:
     create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
 
-    download_file = GCSToLocalFilesystemOperator(
-        task_id="download_file",
-        object_name=REMOTE_JAR_FILE_PATH,
-        bucket=PUBLIC_BUCKET,
-        filename=LOCAL_JAR,
-    )
-
-    # [START howto_operator_start_java_job_local_jar]
-    start_java_job_direct = BeamRunJavaPipelineOperator(
-        task_id="start_java_job_direct",
-        jar=GCS_JAR,
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        job_class="org.apache.beam.examples.WordCount",
-        dataflow_config={
-            "check_if_running": CheckJobRunning.WaitForRun,
-            "location": LOCATION,
-            "poll_sleep": 10,
-        },
-    )
-    # [END howto_operator_start_java_job_local_jar]
-
-    start_java_job_direct_deferrable = BeamRunJavaPipelineOperator(
-        task_id="start_java_job_direct_deferrable",
-        jar=LOCAL_JAR,
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        job_class="org.apache.beam.examples.WordCount",
-        dataflow_config={
-            "check_if_running": CheckJobRunning.WaitForRun,
-            "location": LOCATION,
-            "poll_sleep": 10,
-        },
-        deferrable=True,
-    )
-
     # [START howto_operator_start_java_job_jar_on_gcs]
     start_java_job_dataflow = BeamRunJavaPipelineOperator(
         runner=BeamRunnerType.DataflowRunner,
@@ -161,11 +115,8 @@
     (
         # TEST SETUP
         create_bucket
-        >> download_file
         # TEST BODY
         >> [
-            start_java_job_direct,
-            start_java_job_direct_deferrable,
             start_java_job_dataflow,
             start_java_job_dataflow_deferrable,
         ]
diff --git a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python.py b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python.py
index 05d4511663a74..93980bf9ff74b 100644
--- a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python.py
+++ b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python.py
@@ -80,31 +80,6 @@
     )
     # [END howto_operator_start_python_job]
 
-    start_python_job_direct = BeamRunPythonPipelineOperator(
-        task_id="start_python_job_direct",
-        py_file="apache_beam.examples.wordcount",
-        py_options=["-m"],
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        py_requirements=["apache-beam[gcp]==2.67.0"],
-        py_interpreter="python3",
-        py_system_site_packages=False,
-    )
-
-    start_python_job_direct_deferrable = BeamRunPythonPipelineOperator(
-        task_id="start_python_job_direct_deferrable",
-        py_file="apache_beam.examples.wordcount",
-        py_options=["-m"],
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        py_requirements=["apache-beam[gcp]==2.67.0"],
-        py_interpreter="python3",
-        py_system_site_packages=False,
-        deferrable=True,
-    )
-
     start_python_job_dataflow_deferrable = BeamRunPythonPipelineOperator(
         runner=BeamRunnerType.DataflowRunner,
         task_id="start_python_job_dataflow_deferrable",
@@ -138,8 +113,6 @@
         # TEST BODY
         >> [
             start_python_job_dataflow,
-            start_python_job_direct,
-            start_python_job_direct_deferrable,
             start_python_job_dataflow_deferrable,
         ]
         >> stop_dataflow_job
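
For context, here is a minimal standalone sketch of the jar-on-GCS pattern that remains the sole documented Java example after this change. The operator arguments mirror the kept `start_java_job_dataflow` task; the `dag_id`, `start_date`, and output bucket below are illustrative placeholders, since the real system test derives its values from environment-driven constants and keeps the operator between the `howto_operator_start_java_job_jar_on_gcs` doc markers.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import CheckJobRunning

# GCS_JAR matches the constants in the system test; the output bucket is an
# assumed placeholder (the test builds it from its own BUCKET_NAME).
GCS_JAR = "gs://airflow-system-tests-resources/dataflow/java/word-count-beam-bundled-0.1.jar"
GCS_OUTPUT = "gs://example-output-bucket"
LOCATION = "europe-west3"

# Hypothetical dag_id/start_date for illustration only.
with DAG(dag_id="example_dataflow_java_gcs_jar", start_date=datetime(2024, 1, 1), schedule=None):
    # The jar is fetched from GCS by the operator itself, so no shared local
    # filesystem path (the removed gcs/data/ Composer workaround) is required.
    start_java_job_dataflow = BeamRunJavaPipelineOperator(
        runner=BeamRunnerType.DataflowRunner,  # submit to Dataflow, not the local direct runner
        task_id="start_java_job_dataflow",
        jar=GCS_JAR,
        pipeline_options={"output": GCS_OUTPUT},
        job_class="org.apache.beam.examples.WordCount",
        dataflow_config={
            "check_if_running": CheckJobRunning.WaitForRun,  # wait out a same-named running job
            "location": LOCATION,
            "poll_sleep": 10,
        },
    )
```

Keeping only this pattern sidesteps the Composer-specific `gcs/data/` shared-folder workaround that this PR deletes: with the jar on GCS, workers no longer need a common local path to it.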