Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions providers/docker/tests/unit/docker/decorators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@

from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker

pytestmark = pytest.mark.db_test


DEFAULT_DATE = timezone.datetime(2021, 9, 1)
DILL_INSTALLED = find_spec("dill") is not None
DILL_MARKER = pytest.mark.skipif(not DILL_INSTALLED, reason="`dill` is not installed")
Expand All @@ -47,6 +44,7 @@


class TestDockerDecorator:
@pytest.mark.db_test
def test_basic_docker_operator(self, dag_maker, session):
@task.docker(image="python:3.9-slim", auto_remove="force")
def f():
Expand All @@ -63,6 +61,7 @@ def f():
ti = dr.get_task_instances(session=session)[0]
assert len(ti.xcom_pull()) == 100

@pytest.mark.db_test
def test_basic_docker_operator_with_param(self, dag_maker, session):
@task.docker(image="python:3.9-slim", auto_remove="force")
def f(num_results):
Expand All @@ -81,6 +80,7 @@ def f(num_results):
assert isinstance(result, list)
assert len(result) == 50

@pytest.mark.db_test
def test_basic_docker_operator_with_template_fields(self, dag_maker):
@task.docker(image="python:3.9-slim", container_name="python_{{dag_run.dag_id}}", auto_remove="force")
def f():
Expand All @@ -94,6 +94,7 @@ def f():
rendered = ti.render_templates()
assert rendered.container_name == f"python_{dr.dag_id}"

@pytest.mark.db_test
def test_basic_docker_operator_multiple_output(self, dag_maker, session):
@task.docker(image="python:3.9-slim", multiple_outputs=True, auto_remove="force")
def return_dict(number: int):
Expand All @@ -113,6 +114,7 @@ def return_dict(number: int):
assert ti.xcom_pull(key="43", session=session) == 43
assert ti.xcom_pull(session=session) == {"number": test_number + 1, "43": 43}

@pytest.mark.db_test
def test_no_return(self, dag_maker, session):
@task.docker(image="python:3.9-slim", auto_remove="force")
def f():
Expand Down Expand Up @@ -143,6 +145,7 @@ def do_run():
assert len(dag.task_ids) == 21
assert dag.task_ids[-1] == "do_run__20"

@pytest.mark.db_test
@pytest.mark.parametrize(
"kwargs, actual_exit_code, expected_state",
[
Expand Down Expand Up @@ -183,6 +186,7 @@ def f(exit_code):
ti = dr.get_task_instances(session=session)[0]
assert ti.state == expected_state

@pytest.mark.db_test
def test_setup_decorator_with_decorated_docker_task(self, dag_maker):
@setup
@task.docker(image="python:3.9-slim", auto_remove="force")
Expand All @@ -196,6 +200,7 @@ def f():
setup_task = dag.task_group.children["f"]
assert setup_task.is_setup

@pytest.mark.db_test
def test_teardown_decorator_with_decorated_docker_task(self, dag_maker):
@teardown
@task.docker(image="python:3.9-slim", auto_remove="force")
Expand All @@ -209,6 +214,7 @@ def f():
teardown_task = dag.task_group.children["f"]
assert teardown_task.is_teardown

@pytest.mark.db_test
@pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
def test_teardown_decorator_with_decorated_docker_task_and_on_failure_fail_arg(
self, dag_maker, on_failure_fail_dagrun
Expand All @@ -226,6 +232,7 @@ def f():
assert teardown_task.is_teardown
assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun

@pytest.mark.db_test
@pytest.mark.parametrize(
"serializer",
[
Expand Down Expand Up @@ -277,6 +284,7 @@ def g():
assert some_task.serializer == clone_of_docker_operator.serializer
assert some_task.pickling_library is clone_of_docker_operator.pickling_library

@pytest.mark.db_test
def test_respect_docker_host_env(self, monkeypatch, dag_maker):
monkeypatch.setenv("DOCKER_HOST", "tcp://docker-host-from-env:2375")

Expand All @@ -289,6 +297,7 @@ def f():

assert ret.operator.docker_url == "tcp://docker-host-from-env:2375"

@pytest.mark.db_test
def test_docker_host_env_empty(self, monkeypatch, dag_maker):
monkeypatch.setenv("DOCKER_HOST", "")

Expand All @@ -303,6 +312,7 @@ def f():
# We want to ensure the same behavior.
assert ret.operator.docker_url == "unix://var/run/docker.sock"

@pytest.mark.db_test
def test_docker_host_env_unset(self, monkeypatch, dag_maker):
monkeypatch.delenv("DOCKER_HOST", raising=False)

Expand All @@ -315,6 +325,7 @@ def f():

assert ret.operator.docker_url == "unix://var/run/docker.sock"

@pytest.mark.db_test
def test_failing_task(self, dag_maker, session):
"""Test regression #39319

Expand Down Expand Up @@ -347,6 +358,7 @@ def f():
last_line_of_docker_operator_log = log_content.splitlines()[-1]
assert "ValueError: This task is expected to fail" in last_line_of_docker_operator_log

@pytest.mark.db_test
def test_invalid_serializer(self, dag_maker):
@task.docker(image="python:3.9-slim", auto_remove="force", serializer="airflow")
def f():
Expand Down Expand Up @@ -388,6 +400,7 @@ def f(): ...
f()
assert f"Unable to import `{serializer}` module." in caplog.text

@pytest.mark.db_test
@CLOUDPICKLE_MARKER
def test_add_cloudpickle(self, dag_maker):
@task.docker(image="python:3.9-slim", auto_remove="force", serializer="cloudpickle")
Expand All @@ -398,6 +411,7 @@ def f():
with dag_maker():
f()

@pytest.mark.db_test
@DILL_MARKER
def test_add_dill(self, dag_maker):
@task.docker(image="python:3.9-slim", auto_remove="force", serializer="dill")
Expand Down