Merged
providers/google/docs/operators/cloud/dataflow.rst (0 additions, 8 deletions)
@@ -144,14 +144,6 @@ Here is an example of creating and running a pipeline in Java with jar stored on
    :start-after: [START howto_operator_start_java_job_jar_on_gcs_deferrable]
    :end-before: [END howto_operator_start_java_job_jar_on_gcs_deferrable]

-Here is an example of creating and running a pipeline in Java with jar stored on local file system:
-
-.. exampleinclude:: /../../google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_start_java_job_local_jar]
-    :end-before: [END howto_operator_start_java_job_local_jar]
-
Here is an example of creating and running a streaming pipeline in Java with jar stored on GCS:

.. exampleinclude:: /../../google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
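
For readers without the included files at hand, here is a minimal sketch of what the surviving GCS-jar example boils down to, reconstructed from the operator calls visible in the system-test diff below (the gs:// paths are placeholders, not the real test values):

from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import CheckJobRunning

# Submit the Java WordCount pipeline to Dataflow, loading the bundled jar from GCS.
start_java_job_dataflow = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_java_job_dataflow",
    jar="gs://example-bucket/dataflow/java/word-count-beam-bundled-0.1.jar",  # placeholder
    pipeline_options={"output": "gs://example-output-bucket"},  # placeholder
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "check_if_running": CheckJobRunning.WaitForRun,  # wait for a same-named job to finish first
        "location": "europe-west3",
        "poll_sleep": 10,
    },
)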

providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py

@@ -42,7 +42,6 @@
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import CheckJobRunning
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
-from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

try:
    from airflow.sdk import TriggerRule
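
The except branch of this compatibility import is collapsed by the diff viewer; presumably it falls back to the Airflow 2 location of TriggerRule, along these lines (the fallback path is an assumption, not visible in the hunk):

try:
    from airflow.sdk import TriggerRule  # Airflow 3 Task SDK export
except ImportError:
    # Assumed fallback for Airflow 2 installations
    from airflow.utils.trigger_rule import TriggerRule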
@@ -57,13 +56,6 @@
PUBLIC_BUCKET = "airflow-system-tests-resources"

JAR_FILE_NAME = "word-count-beam-bundled-0.1.jar"
-# For the distributed system, we need to store the JAR file in a folder that can be accessed by multiple
-# worker.
-# For example in Composer the correct path is gcs/data/word-count-beam-bundled-0.1.jar.
-# Because gcs/data/ is shared folder for Airflow's workers.
-IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
-LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME
-REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}"
GCS_JAR = f"gs://{PUBLIC_BUCKET}/dataflow/java/{JAR_FILE_NAME}"
GCS_OUTPUT = f"gs://{BUCKET_NAME}"
LOCATION = "europe-west3"
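
The deleted comment above explains the constraint: with more than one worker, the jar must sit on a filesystem every worker can see, which on Composer is the environment bucket mounted at gcs/. The same resolution logic as a small standalone sketch (for illustration only, same behavior as the deleted constants):

import os

JAR_FILE_NAME = "word-count-beam-bundled-0.1.jar"


def resolve_local_jar(jar_name: str) -> str:
    """Pick a jar path that every Airflow worker can reach."""
    if os.environ.get("COMPOSER_ENVIRONMENT"):
        # Composer mounts the environment's GCS bucket at gcs/; data/ is
        # shared across workers, so any worker can read the downloaded jar.
        return f"gcs/data/{jar_name}"
    # Single-machine setups: a path relative to the worker's cwd is enough.
    return jar_name


print(resolve_local_jar(JAR_FILE_NAME))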
@@ -77,44 +69,6 @@
) as dag:
    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)

-    download_file = GCSToLocalFilesystemOperator(
-        task_id="download_file",
-        object_name=REMOTE_JAR_FILE_PATH,
-        bucket=PUBLIC_BUCKET,
-        filename=LOCAL_JAR,
-    )
-
-    # [START howto_operator_start_java_job_local_jar]
-    start_java_job_direct = BeamRunJavaPipelineOperator(
-        task_id="start_java_job_direct",
-        jar=LOCAL_JAR,
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        job_class="org.apache.beam.examples.WordCount",
-        dataflow_config={
-            "check_if_running": CheckJobRunning.WaitForRun,
-            "location": LOCATION,
-            "poll_sleep": 10,
-        },
-    )
-    # [END howto_operator_start_java_job_local_jar]
-
-    start_java_job_direct_deferrable = BeamRunJavaPipelineOperator(
-        task_id="start_java_job_direct_deferrable",
-        jar=LOCAL_JAR,
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        job_class="org.apache.beam.examples.WordCount",
-        dataflow_config={
-            "check_if_running": CheckJobRunning.WaitForRun,
-            "location": LOCATION,
-            "poll_sleep": 10,
-        },
-        deferrable=True,
-    )
-
    # [START howto_operator_start_java_job_jar_on_gcs]
    start_java_job_dataflow = BeamRunJavaPipelineOperator(
        runner=BeamRunnerType.DataflowRunner,
@@ -161,11 +115,8 @@
    (
        # TEST SETUP
        create_bucket
-        >> download_file
        # TEST BODY
        >> [
-            start_java_job_direct,
-            start_java_job_direct_deferrable,
            start_java_job_dataflow,
            start_java_job_dataflow_deferrable,
        ]
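
Only the Dataflow-runner tasks remain in the test chain above. The surviving deferrable variant is not fully shown in these hunks; a hedged sketch of what it presumably looks like, combining the visible GCS-jar arguments with the deferrable flag carried over from the deleted direct-runner task (exact kwargs are an assumption):

start_java_job_dataflow_deferrable = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_java_job_dataflow_deferrable",
    jar=GCS_JAR,
    pipeline_options={"output": GCS_OUTPUT},
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
    },
    deferrable=True,  # poll Dataflow from the triggerer instead of holding a worker slot
)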

providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python.py

@@ -80,31 +80,6 @@
    )
    # [END howto_operator_start_python_job]

-    start_python_job_direct = BeamRunPythonPipelineOperator(
-        task_id="start_python_job_direct",
-        py_file="apache_beam.examples.wordcount",
-        py_options=["-m"],
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        py_requirements=["apache-beam[gcp]==2.67.0"],
-        py_interpreter="python3",
-        py_system_site_packages=False,
-    )
-
-    start_python_job_direct_deferrable = BeamRunPythonPipelineOperator(
-        task_id="start_python_job_direct_deferrable",
-        py_file="apache_beam.examples.wordcount",
-        py_options=["-m"],
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        py_requirements=["apache-beam[gcp]==2.67.0"],
-        py_interpreter="python3",
-        py_system_site_packages=False,
-        deferrable=True,
-    )
-
    start_python_job_dataflow_deferrable = BeamRunPythonPipelineOperator(
        runner=BeamRunnerType.DataflowRunner,
        task_id="start_python_job_dataflow_deferrable",
@@ -138,8 +113,6 @@
        # TEST BODY
        >> [
            start_python_job_dataflow,
-            start_python_job_direct,
-            start_python_job_direct_deferrable,
            start_python_job_dataflow_deferrable,
        ]
        >> stop_dataflow_job