Skip to content

Commit

Permalink
Fix running tasks with default_impersonation config (#17229)
Browse files Browse the repository at this point in the history
When default_impersonation is set in the configuration, Airflow fails
 to run tasks because of a mismatch between the recorded PID and the current PID.

 This change fixes it by also checking whether task_runner.run_as_user is set and, if so,
 validating the PID the same way we already do when ti.run_as_user is set (via the parent PID).

(cherry picked from commit 40419dd)
  • Loading branch information
ephraimbuddy authored and kaxil committed Aug 17, 2021
1 parent 4122ede commit 262e087
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
5 changes: 4 additions & 1 deletion airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,12 @@ def heartbeat_callback(self, session=None):
)
raise AirflowException("Hostname of job runner does not match")
current_pid = self.task_runner.process.pid

same_process = ti.pid == current_pid
if ti.run_as_user:

if ti.run_as_user or self.task_runner.run_as_user:
same_process = psutil.Process(ti.pid).ppid() == current_pid

if ti.pid is not None and not same_process:
self.log.warning("Recorded pid %s does not match " "the current pid %s", ti.pid, current_pid)
raise AirflowException("PID of job runner does not match")
Expand Down
42 changes: 42 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,48 @@ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock):
with pytest.raises(AirflowException, match='PID of job runner does not match'):
job1.heartbeat_callback()

@conf_vars({('core', 'default_impersonation'): 'testuser'})
@mock.patch('airflow.jobs.local_task_job.psutil')
def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, dag_maker):
    """Exercise ``heartbeat_callback`` PID validation with ``core.default_impersonation`` set.

    ``psutil`` is patched inside ``airflow.jobs.local_task_job``, so the parent
    PID reported for ``ti.pid`` is whatever this test configures on the mock.
    """
    pid_mismatch = 'PID of job runner does not match'

    session = settings.Session()
    with dag_maker('test_localtaskjob_heartbeat'):
        dummy_op = DummyOperator(task_id='op1')
    dag_run = dag_maker.dag_run
    ti = dag_run.get_task_instance(task_id=dummy_op.task_id, session=session)
    ti.state = State.RUNNING
    ti.pid = 2
    ti.hostname = get_hostname()
    session.commit()

    job = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
    ti.task = dummy_op
    ti.refresh_from_task(dummy_op)
    job.task_runner = StandardTaskRunner(job)
    job.task_runner.process = mock.Mock()
    job.task_runner.process.pid = 2

    # Case 1: ti.pid == 2 and the runner pid == 2, but the parent pid of ti.pid
    # is still an unconfigured mock, so it cannot equal the runner pid -> error.
    with pytest.raises(AirflowException, match=pid_mismatch):
        job.heartbeat_callback()

    # Case 2: report 1 as the parent pid of ti.pid and give the runner process
    # pid 1 as well, so the two agree and the callback must not raise.
    psutil_mock.Process.return_value.ppid.return_value = 1
    job.task_runner.process.pid = 1
    ti.state = State.RUNNING
    ti.pid = 2
    assert job.task_runner.run_as_user == 'testuser'
    session.merge(ti)
    session.commit()
    job.heartbeat_callback(session=None)

    # Case 3: move the runner pid back to 2 while the parent pid of ti.pid
    # stays at 1; they differ again -> error.
    job.task_runner.process.pid = 2
    with pytest.raises(AirflowException, match=pid_mismatch):
        job.heartbeat_callback()

def test_heartbeat_failed_fast(self):
"""
Test that task heartbeat will sleep when it fails fast
Expand Down

0 comments on commit 262e087

Please sign in to comment.