diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 10043b5baad4f..e366136f69924 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -689,6 +689,8 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: log = structlog.get_logger(logger_name="task") if os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1" and os.environ.get("_AIRFLOW__STARTUP_MSG"): + # Unset the Kerberos credential cache location (KRB5CCNAME), if any, so the new process can't reuse it. + os.environ.pop("KRB5CCNAME", None) # entrypoint of re-exec process msg = TypeAdapter(StartupDetails).validate_json(os.environ["_AIRFLOW__STARTUP_MSG"]) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index fc64d02fbc72a..48550de9fc6e2 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -733,6 +733,49 @@ def execute(self, context): assert "_AIRFLOW__STARTUP_MSG" not in os.environ +def test_task_run_with_user_impersonation_remove_krb5ccname_on_reexecuted_process( + mocked_parse, make_ti_context, time_machine, mock_supervisor_comms +): + class CustomOperator(BaseOperator): + def execute(self, context): + print("Hi from CustomOperator!") + + task = CustomOperator(task_id="impersonation_task", run_as_user="airflowuser") + instant = timezone.datetime(2024, 12, 3, 10, 0) + + what = StartupDetails( + ti=TaskInstance( + id=uuid7(), + task_id="impersonation_task", + dag_id="basic_dag", + run_id="c", + try_number=1, + dag_version_id=uuid7(), + ), + dag_rel_path="", + bundle_info=FAKE_BUNDLE, + ti_context=make_ti_context(), + start_date=timezone.utcnow(), + ) + + mocked_parse(what, "basic_dag", task) + time_machine.move_to(instant, tick=False) + + mock_supervisor_comms._get_response.return_value = what + 
mock_os_env = { + "KRB5CCNAME": "/tmp/airflow_krb5_ccache", + "_AIRFLOW__REEXECUTED_PROCESS": "1", + "_AIRFLOW__STARTUP_MSG": what.model_dump_json(), + } + with mock.patch.dict("os.environ", mock_os_env, clear=True): + startup() + + assert os.environ["_AIRFLOW__REEXECUTED_PROCESS"] == "1" + assert "_AIRFLOW__STARTUP_MSG" in os.environ + assert "KRB5CCNAME" not in os.environ + + @pytest.mark.parametrize( ["command", "rendered_command"], [