diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py index 4e55ab7ff15a0..8f371ee24278c 100644 --- a/airflow/example_dags/tutorial.py +++ b/airflow/example_dags/tutorial.py @@ -45,9 +45,6 @@ # You can override them on a per-task basis during operator initialization default_args={ "depends_on_past": False, - "email": ["airflow@example.com"], - "email_on_failure": False, - "email_on_retry": False, "retries": 1, "retry_delay": timedelta(minutes=5), # 'queue': 'bash_queue', diff --git a/providers/apache/iceberg/tests/system/apache/iceberg/example_iceberg.py b/providers/apache/iceberg/tests/system/apache/iceberg/example_iceberg.py index 3c9f59f1ea14d..a13e0164262a8 100644 --- a/providers/apache/iceberg/tests/system/apache/iceberg/example_iceberg.py +++ b/providers/apache/iceberg/tests/system/apache/iceberg/example_iceberg.py @@ -33,9 +33,6 @@ default_args={ "owner": "airflow", "depends_on_past": False, - "email": ["airflow@airflow.com"], - "email_on_failure": False, - "email_on_retry": False, }, start_date=datetime(2021, 1, 1), schedule=timedelta(1), diff --git a/providers/apache/kafka/tests/system/apache/kafka/example_dag_hello_kafka.py b/providers/apache/kafka/tests/system/apache/kafka/example_dag_hello_kafka.py index 0759a45652772..697b2a9d75418 100644 --- a/providers/apache/kafka/tests/system/apache/kafka/example_dag_hello_kafka.py +++ b/providers/apache/kafka/tests/system/apache/kafka/example_dag_hello_kafka.py @@ -33,8 +33,6 @@ default_args = { "owner": "airflow", "depend_on_past": False, - "email_on_failure": False, - "email_on_retry": False, "retries": 1, "retry_delay": timedelta(minutes=5), } diff --git a/providers/google/tests/system/google/cloud/azure/example_azure_fileshare_to_gcs.py b/providers/google/tests/system/google/cloud/azure/example_azure_fileshare_to_gcs.py index ee38cf5e761c5..8e1495f08154c 100644 --- a/providers/google/tests/system/google/cloud/azure/example_azure_fileshare_to_gcs.py +++ 
b/providers/google/tests/system/google/cloud/azure/example_azure_fileshare_to_gcs.py @@ -38,9 +38,6 @@ default_args={ "owner": "airflow", "depends_on_past": False, - "email": ["airflow@example.com"], - "email_on_failure": False, - "email_on_retry": False, "retries": 1, "retry_delay": timedelta(minutes=5), }, diff --git a/providers/opensearch/tests/system/opensearch/example_opensearch.py b/providers/opensearch/tests/system/opensearch/example_opensearch.py index 904f17a47be7a..f0dbc03bb851f 100644 --- a/providers/opensearch/tests/system/opensearch/example_opensearch.py +++ b/providers/opensearch/tests/system/opensearch/example_opensearch.py @@ -41,8 +41,6 @@ default_args = { "owner": "airflow", "depend_on_past": False, - "email_on_failure": False, - "email_on_retry": False, "retries": 1, "retry_delay": timedelta(minutes=5), } diff --git a/pyproject.toml b/pyproject.toml index 4414996e7ae61..948041b5beb26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -541,6 +541,7 @@ filterwarnings = [ # We cannot add warnings from the airflow package into `filterwarnings`, # because it invokes import airflow before we set up test environment which breaks the tests. # Instead of that, we use a separate parameter and dynamically add it into `filterwarnings` marker. 
+# Add airflow.exceptions.RemovedInAirflow4Warning when the minimum Airflow version for providers is 3.0 forbidden_warnings = [ "airflow.exceptions.RemovedInAirflow3Warning", "airflow.exceptions.AirflowProviderDeprecationWarning", diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py b/task_sdk/src/airflow/sdk/definitions/baseoperator.py index 3efbd1ad0b6e2..072da2a6ae55d 100644 --- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py @@ -34,6 +34,7 @@ import attrs +from airflow.exceptions import RemovedInAirflow4Warning from airflow.sdk.definitions._internal.abstractoperator import ( DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, DEFAULT_OWNER, @@ -532,11 +533,11 @@ class derived from this one results in the creation of a task object, (e.g. user/person/team/role name) to clarify ownership is recommended. :param email: the 'to' email address(es) used in email alerts. This can be a single email or multiple ones. Multiple addresses can be specified as a - comma or semicolon separated string or by passing a list of strings. + comma or semicolon separated string or by passing a list of strings. 
(deprecated) :param email_on_retry: Indicates whether email alerts should be sent when a - task is retried + task is retried (deprecated) :param email_on_failure: Indicates whether email alerts should be sent when - a task failed + a task failed (deprecated) :param retries: the number of retries that should be performed before failing the task :param retry_delay: delay between retries, can be set as ``timedelta`` or @@ -956,6 +957,25 @@ def __init__( self.email_on_retry = email_on_retry self.email_on_failure = email_on_failure + if email is not None: + warnings.warn( + "email is deprecated, please migrate to `SmtpNotifier`.", + RemovedInAirflow4Warning, + stacklevel=2, + ) + if email and email_on_retry is not None: + warnings.warn( + "email_on_retry is deprecated, please migrate to `SmtpNotifier`.", + RemovedInAirflow4Warning, + stacklevel=2, + ) + if email and email_on_failure is not None: + warnings.warn( + "email_on_failure is deprecated, please migrate to `SmtpNotifier`.", + RemovedInAirflow4Warning, + stacklevel=2, + ) + if execution_timeout is not None and not isinstance(execution_timeout, timedelta): raise ValueError( f"execution_timeout must be timedelta object but passed as type: {type(execution_timeout)}" diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index acabdcf06d909..da8b440423ce0 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -2228,8 +2228,8 @@ def test_not_templateable_fields_in_serialized_dag(self): class TestOperator(BaseOperator): template_fields = ( - "email", # templateable "execution_timeout", # not templateable + "run_as_user", # templateable ) def execute(self, context: Context): @@ -2240,18 +2240,18 @@ def execute(self, context: Context): with dag: task = TestOperator( task_id="test_task", - email="{{ ','.join(test_email_list) }}", + run_as_user="{{ test_run_as_user }}", execution_timeout=timedelta(seconds=10), ) - 
task.render_template_fields(context={"test_email_list": ["foo@test.com", "bar@test.com"]}) - assert task.email == "foo@test.com,bar@test.com" + task.render_template_fields(context={"test_run_as_user": "foo"}) + assert task.run_as_user == "foo" with pytest.raises( AirflowException, match=re.escape( dedent( """Failed to serialize DAG 'test_dag': Cannot template BaseOperator field: - 'execution_timeout' op.__class__.__name__='TestOperator' op.template_fields=('email', 'execution_timeout')""" + 'execution_timeout' op.__class__.__name__='TestOperator' op.template_fields=('execution_timeout', 'run_as_user')""" ) ), ):