Skip to content

Conversation

@eitanme
Copy link
Contributor

@eitanme eitanme commented Oct 21, 2022

At @o-nikolas request, I'm creating a new PR to attempt to fix #16204 where the ExternalTaskSensor would hang indefinitely when an execution_date_fn is used, failed_states/allowed_states are set, and external DAGs have mixed states upstream.

Copy link
Contributor

@o-nikolas o-nikolas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add or update a unit test case to cover this edge? So that we don't regress again in the future. Somewhere here-ish:

def test_external_task_sensor_fn_multiple_execution_dates(self):
bash_command_code = """
{% set s=logical_date.time().second %}
echo "second is {{ s }}"
if [[ $(( {{ s }} % 60 )) == 1 ]]
then
exit 1
fi
exit 0
"""
dag_external_id = TEST_DAG_ID + '_external'
dag_external = DAG(dag_external_id, default_args=self.args, schedule=timedelta(seconds=1))
task_external_with_failure = BashOperator(
task_id="task_external_with_failure", bash_command=bash_command_code, retries=0, dag=dag_external
)
task_external_without_failure = EmptyOperator(
task_id="task_external_without_failure", retries=0, dag=dag_external
)
task_external_without_failure.run(
start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timedelta(seconds=1), ignore_ti_state=True
)
session = settings.Session()
TI = TaskInstance
try:
task_external_with_failure.run(
start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timedelta(seconds=1), ignore_ti_state=True
)
# The test_with_failure task is excepted to fail
# once per minute (the run on the first second of
# each minute).
except Exception as e:
failed_tis = (
session.query(TI)
.filter(
TI.dag_id == dag_external_id,
TI.state == State.FAILED,
TI.execution_date == DEFAULT_DATE + timedelta(seconds=1),
)
.all()
)
if len(failed_tis) == 1 and failed_tis[0].task_id == 'task_external_with_failure':
pass
else:
raise e
dag_id = TEST_DAG_ID
dag = DAG(dag_id, default_args=self.args, schedule=timedelta(minutes=1))
task_without_failure = ExternalTaskSensor(
task_id='task_without_failure',
external_dag_id=dag_external_id,
external_task_id='task_external_without_failure',
execution_date_fn=lambda dt: [dt + timedelta(seconds=i) for i in range(2)],
allowed_states=['success'],
retries=0,
timeout=1,
poke_interval=1,
dag=dag,
)
task_with_failure = ExternalTaskSensor(
task_id='task_with_failure',
external_dag_id=dag_external_id,
external_task_id='task_external_with_failure',
execution_date_fn=lambda dt: [dt + timedelta(seconds=i) for i in range(2)],
allowed_states=['success'],
retries=0,
timeout=1,
poke_interval=1,
dag=dag,
)
task_without_failure.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
with pytest.raises(AirflowSensorTimeout):
task_with_failure.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
def test_external_task_sensor_delta(self):
self.add_time_sensor()
op = ExternalTaskSensor(
task_id='test_external_task_sensor_check_delta',
external_dag_id=TEST_DAG_ID,
external_task_id=TEST_TASK_ID,
execution_delta=timedelta(0),
allowed_states=['success'],
dag=self.dag,
)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
def test_external_task_sensor_fn(self):
self.add_time_sensor()
# check that the execution_fn works
op1 = ExternalTaskSensor(
task_id='test_external_task_sensor_check_delta_1',
external_dag_id=TEST_DAG_ID,
external_task_id=TEST_TASK_ID,
execution_date_fn=lambda dt: dt + timedelta(0),
allowed_states=['success'],
dag=self.dag,
)
op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
# double check that the execution is being called by failing the test
op2 = ExternalTaskSensor(
task_id='test_external_task_sensor_check_delta_2',
external_dag_id=TEST_DAG_ID,
external_task_id=TEST_TASK_ID,
execution_date_fn=lambda dt: dt + timedelta(days=1),
allowed_states=['success'],
timeout=1,
poke_interval=1,
dag=self.dag,
)
with pytest.raises(exceptions.AirflowSensorTimeout):
op2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
def test_external_task_sensor_fn_multiple_args(self):
"""Check this task sensor passes multiple args with full context. If no failure, means clean run."""
self.add_time_sensor()
def my_func(dt, context):
assert context['logical_date'] == dt
return dt + timedelta(0)
op1 = ExternalTaskSensor(
task_id='test_external_task_sensor_multiple_arg_fn',
external_dag_id=TEST_DAG_ID,
external_task_id=TEST_TASK_ID,
execution_date_fn=my_func,
allowed_states=['success'],
dag=self.dag,
)
op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
def test_external_task_sensor_fn_kwargs(self):
"""Check this task sensor passes multiple args with full context. If no failure, means clean run."""
self.add_time_sensor()
def my_func(dt, ds_nodash, tomorrow_ds_nodash):
assert ds_nodash == dt.strftime("%Y%m%d")
assert tomorrow_ds_nodash == (dt + timedelta(days=1)).strftime("%Y%m%d")
return dt + timedelta(0)
op1 = ExternalTaskSensor(
task_id='test_external_task_sensor_fn_kwargs',
external_dag_id=TEST_DAG_ID,
external_task_id=TEST_TASK_ID,
execution_date_fn=my_func,
allowed_states=['success'],
dag=self.dag,
)
op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@eitanme
Copy link
Contributor Author

eitanme commented Oct 21, 2022

Good point @o-nikolas I was being lazy and didn't want to figure out how to run the test suite. You've pushed me to do the right thing and I think I've added a commit/test that addresses it and fails with the old code while passing with the new. Let me know what you think.

@eitanme eitanme requested a review from o-nikolas October 21, 2022 23:08
@o-nikolas
Copy link
Contributor

Good point @o-nikolas I was being lazy and didn't want to figure out how to run the test suite. You've pushed me to do the right thing and I think I've added a commit/test that addresses it and fails with the old code while passing with the new. Let me know what you think.

Thanks for adding a test case! A committer will be required to run the workflow for a first time committer (and ultimately merge if everything passes and they are happy with the code). Unfortunately I am not a committer, I'll CC a few who may have time to have a look: @eladkal @potiuk @kaxil

@o-nikolas
Copy link
Contributor

There was a change merged recently to standardize quoting in Airflow, you'll need to rebase this PR and run static checks locally to patch those up.
You can enable pre-commit to easily run static checks on your code (readme).

@o-nikolas o-nikolas requested review from eladkal and potiuk October 25, 2022 17:56
@eitanme
Copy link
Contributor Author

eitanme commented Oct 26, 2022

@o-nikolas I merged in main enabled pre-commit and updated my quoting style so hopefully that's the last of the linting though we'll see for sure after PR checks run. Thanks for the help and heads up that it had changed.

Copy link
Member

@uranusjr uranusjr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me

@eladkal eladkal added this to the Airflow 2.4.3 milestone Oct 27, 2022
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Oct 27, 2022
@potiuk
Copy link
Member

potiuk commented Oct 27, 2022

It's fine but I think we need a newsfragment describing this change in the behaviours. This is borderline breaking change/bugfix and while I would lean more on the bugfix side, but people could have relied on it and it should be described in more detail in changelog.

@eitanme
Copy link
Contributor Author

eitanme commented Oct 27, 2022

@potiuk because of this bug, to use the ExternalTaskSensor currently you must explicitly set a timeout on the sensor or your DAG will hang forever. To your point on reliance on old behavior, to workaround the bug, folks may have set that timeout to avoid an infinite hang.

In those cases, fixing this bug will cause a change in the exception they receive from AirflowSensorTimeout to the generic AirflowException. If they are relying on catching the AirflowSensorTimeout exception subclass they may have issues though if they catch the base class they'd still be OK.

Does that sound about right? What would you propose we do?

I'm happy to update a changelog if I'm pointed in the right direction?

Also, there are some failing checks on this PR that I don't understand. Specifically, in the Sqlite Py3.7: API Always CLI Core Integration Other Providers WWW check a test fails that I'm pretty sure I don't go anywhere near:

FAILED tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_heartbeat_failed_fast

Any ideas on that front? The logs are long and I didn't see much useful in them while looking through so I wanted to ask before trying to dig deeper as I'm not super familiar with this code-base and the checks on it.

@eladkal
Copy link
Contributor

eladkal commented Oct 28, 2022

I'm happy to update a changelog if I'm pointed in the right direction?

In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@eitanme
Copy link
Contributor Author

eitanme commented Oct 29, 2022

@eladkal I've added a newsfragment as suggested.

For what it's worth, I think this should break very few if any people because most folks would want their DAGs to fail in the case of an upstream external task failing whether by timeout or because a task in a chain fails. I doubt there are many people who are trying to catch an exception and act on it in this case, but it's at least documented now that you might have to change the exception type in case anyone falls into that bucket.

@potiuk
Copy link
Member

potiuk commented Oct 31, 2022

Can you please rebase/solve conflicts?

@eitanme eitanme force-pushed the external-task-sensor-fail-fix branch from c470ba2 to 96b0039 Compare October 31, 2022 17:08
@eitanme
Copy link
Contributor Author

eitanme commented Oct 31, 2022

@potiuk sorry about that, I had been merging in main instead of rebasing on it, but I've changed that.

Let me know if you need anything else on my end.

@eitanme
Copy link
Contributor Author

eitanme commented Nov 4, 2022

@eladkal I noticed that the following test failed for the "Tests / Sqlite Py3.7: API Always CLI Core Integration Other Providers WWW (pull_request)" pre-submit job but I'm pretty sure this test has nothing to do with my code. Does that seem right? Is there anything I should be doing? Or do these tests fail intermittently or something?

Thanks for the help!

=================================== FAILURES ===================================
_________________ TestLocalTaskJob.test_heartbeat_failed_fast __________________

self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at 0x7f1378e24390>

    def test_heartbeat_failed_fast(self):
        """
        Test that task heartbeat will sleep when it fails fast
        """
        self.mock_base_job_sleep.side_effect = time.sleep
        dag_id = "test_heartbeat_failed_fast"
        task_id = "test_heartbeat_failed_fast_op"
        with create_session() as session:
    
            dag_id = "test_heartbeat_failed_fast"
            task_id = "test_heartbeat_failed_fast_op"
            dag = self.dagbag.get_dag(dag_id)
            task = dag.get_task(task_id)
    
            dr = dag.create_dagrun(
                run_id="test_heartbeat_failed_fast_run",
                state=State.RUNNING,
                execution_date=DEFAULT_DATE,
                start_date=DEFAULT_DATE,
                session=session,
            )
    
            ti = dr.task_instances[0]
            ti.refresh_from_task(task)
            ti.state = State.QUEUED
            ti.hostname = get_hostname()
            ti.pid = 1
            session.commit()
    
            job = LocalTaskJob(task_instance=ti, executor=MockExecutor(do_update=False))
            job.heartrate = 2
            heartbeat_records = []
            job.heartbeat_callback = lambda session: heartbeat_records.append(job.latest_heartbeat)
            job._execute()
            assert len(heartbeat_records) > 2
            for i in range(1, len(heartbeat_records)):
                time1 = heartbeat_records[i - 1]
                time2 = heartbeat_records[i]
                # Assert that difference small enough
                delta = (time2 - time1).total_seconds()
>               assert abs(delta - job.heartrate) < 0.8
E               assert 1.0317020000000001 < 0.8
E                +  where 1.0317020000000001 = abs((3.031702 - 2))
E                +    where 2 = <airflow.jobs.local_task_job.LocalTaskJob object at 0x7f1378e35cd0>.heartrate

tests/jobs/test_local_task_job.py:312: AssertionError

@eladkal
Copy link
Contributor

eladkal commented Nov 5, 2022

I reran the test. lets see what happens

@eitanme
Copy link
Contributor Author

eitanme commented Nov 6, 2022

@eladkal that seems to have done the trick, thanks for the help!

Anything else you need on my end to get this merged?

@ephraimbuddy
Copy link
Contributor

Are we getting this into 2.4.3? @eladkal @potiuk @uranusjr

@ephraimbuddy ephraimbuddy modified the milestones: Airflow 2.4.3, Airflow 2.4.4 Nov 9, 2022
@uranusjr uranusjr merged commit 34e21ea into apache:main Nov 10, 2022
@boring-cyborg
Copy link

boring-cyborg bot commented Nov 10, 2022

Awesome work, congrats on your first merged pull request!

Adityamalik123 pushed a commit to Adityamalik123/airflow that referenced this pull request Nov 12, 2022
@ephraimbuddy ephraimbuddy modified the milestones: Airflow 2.4.4, Airflow 2.5.0 Nov 23, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

type:bug-fix Changelog: Bug Fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ExternalTaskSensor does not fail when failed_states is set along with a execution_date_fn

6 participants