Skip to content

Commit

Permalink
Migrate Cncf.Kubernetes example DAGs to new design #22441 (#24132)
Browse files Browse the repository at this point in the history
* Migrate Cncf.Kubernetes example DAGs to new design #22441
  • Loading branch information
chethanuk authored Jun 3, 2022
1 parent 942e126 commit 7ad4e67
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 39 deletions.
17 changes: 0 additions & 17 deletions airflow/providers/cncf/kubernetes/example_dags/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-cncf-kubernetes/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Content
:maxdepth: 1
:caption: Resources

Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/cncf/kubernetes/example_dags>
Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/cncf/kubernetes>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-cncf-kubernetes/>
Installing from sources <installing-providers-from-sources>

Expand Down
6 changes: 3 additions & 3 deletions docs/apache-airflow-providers-cncf-kubernetes/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Using this method will ensure correctness
and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the
:class:`~airflow.kubernetes.secret.Secret` class to simplify the process of generating secret volumes/env variables.

.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py
:language: python
:start-after: [START howto_operator_k8s_cluster_resources]
:end-before: [END howto_operator_k8s_cluster_resources]
Expand Down Expand Up @@ -122,7 +122,7 @@ Create the Secret using ``kubectl``:
Then use it in your pod like so:

.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py
:language: python
:start-after: [START howto_operator_k8s_private_image]
:end-before: [END howto_operator_k8s_private_image]
Expand All @@ -136,7 +136,7 @@ alongside the Pod. The Pod must write the XCom value into this location at the `

See the following example on how this occurs:

.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py
:language: python
:start-after: [START howto_operator_k8s_write_xcom]
:end-before: [END howto_operator_k8s_write_xcom]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
This is an example dag for using the KubernetesPodOperator.
"""

import os
from datetime import datetime

from kubernetes.client import models as k8s
Expand Down Expand Up @@ -97,6 +98,8 @@

# [END howto_operator_k8s_cluster_resources]

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_kubernetes_operator"

with DAG(
dag_id='example_kubernetes_operator',
Expand Down Expand Up @@ -158,6 +161,17 @@
bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
task_id="pod_task_xcom_result",
)
# [END howto_operator_k8s_write_xcom]

write_xcom >> pod_task_xcom_result
# [END howto_operator_k8s_write_xcom]

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
"""

import os
from datetime import datetime, timedelta

# [START import_module]
Expand All @@ -40,27 +41,43 @@

# [START instantiate_dag]

dag = DAG(
'spark_pi',

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "spark_pi"

with DAG(
DAG_ID,
default_args={'max_active_runs': 1},
description='submit spark-pi as sparkApplication on kubernetes',
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
)
) as dag:
# [START SparkKubernetesOperator_DAG]
t1 = SparkKubernetesOperator(
task_id='spark_pi_submit',
namespace="default",
application_file="example_spark_kubernetes_spark_pi.yaml",
do_xcom_push=True,
dag=dag,
)

t2 = SparkKubernetesSensor(
task_id='spark_pi_monitor',
namespace="default",
application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
dag=dag,
)
t1 >> t2

# [END SparkKubernetesOperator_DAG]
from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

t1 = SparkKubernetesOperator(
task_id='spark_pi_submit',
namespace="default",
application_file="example_spark_kubernetes_spark_pi.yaml",
do_xcom_push=True,
dag=dag,
)
from tests.system.utils import get_test_run # noqa: E402

t2 = SparkKubernetesSensor(
task_id='spark_pi_monitor',
namespace="default",
application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
dag=dag,
)
t1 >> t2
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

0 comments on commit 7ad4e67

Please sign in to comment.