Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix running tasks with default_impersonation config #17229

Merged
merged 1 commit into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,12 @@ def heartbeat_callback(self, session=None):
)
raise AirflowException("Hostname of job runner does not match")
current_pid = self.task_runner.process.pid

same_process = ti.pid == current_pid
if ti.run_as_user:

if ti.run_as_user or self.task_runner.run_as_user:
same_process = psutil.Process(ti.pid).ppid() == current_pid

if ti.pid is not None and not same_process:
self.log.warning("Recorded pid %s does not match " "the current pid %s", ti.pid, current_pid)
raise AirflowException("PID of job runner does not match")
Expand Down
42 changes: 42 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,48 @@ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, dag_maker):
with pytest.raises(AirflowException, match='PID of job runner does not match'):
job1.heartbeat_callback()

@conf_vars({('core', 'default_impersonation'): 'testuser'})
@mock.patch('airflow.jobs.local_task_job.psutil')
def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, dag_maker):
session = settings.Session()
with dag_maker('test_localtaskjob_heartbeat'):
op1 = DummyOperator(task_id='op1')
dr = dag_maker.dag_run
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
ti.state = State.RUNNING
ti.pid = 2
ti.hostname = get_hostname()
session.commit()

job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
ti.task = op1
ti.refresh_from_task(op1)
job1.task_runner = StandardTaskRunner(job1)
job1.task_runner.process = mock.Mock()
job1.task_runner.process.pid = 2
# Here, ti.pid is 2, the parent process of ti.pid is a mock(different).
# And task_runner process is 2. Should fail
with pytest.raises(AirflowException, match='PID of job runner does not match'):
job1.heartbeat_callback()

job1.task_runner.process.pid = 1
# We make the parent process of ti.pid to equal the task_runner process id
psutil_mock.Process.return_value.ppid.return_value = 1
ti.state = State.RUNNING
ti.pid = 2
# The task_runner process id is 1, same as the parent process of ti.pid
# as seen above
assert job1.task_runner.run_as_user == 'testuser'
session.merge(ti)
session.commit()
job1.heartbeat_callback(session=None)

# Here the task_runner process id is changed to 2
# while parent process of ti.pid is kept at 1, which is different
job1.task_runner.process.pid = 2
with pytest.raises(AirflowException, match='PID of job runner does not match'):
job1.heartbeat_callback()

def test_heartbeat_failed_fast(self):
"""
Test that task heartbeat will sleep when it fails fast
Expand Down