From c9ecaa504486d6abca7f3ac0850aeda45b4afd52 Mon Sep 17 00:00:00 2001 From: Adrian Edbert Luman Date: Tue, 19 Aug 2025 16:12:22 +0800 Subject: [PATCH 1/3] Remove KRB5CCNAME on task run with user impersonation --- .../airflow/sdk/execution_time/task_runner.py | 2 + .../execution_time/test_task_runner.py | 43 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 82d5f44c55a13..44f636d0858dd 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -689,6 +689,8 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: log = structlog.get_logger(logger_name="task") if os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1" and os.environ.get("_AIRFLOW__STARTUP_MSG"): + if 'KRB5CCNAME' in os.environ: + del os.environ["KRB5CCNAME"] # entrypoint of re-exec process msg = TypeAdapter(StartupDetails).validate_json(os.environ["_AIRFLOW__STARTUP_MSG"]) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index e28ad12f10cc1..e2e0ff5d80d34 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -745,6 +745,49 @@ def execute(self, context): assert "_AIRFLOW__STARTUP_MSG" not in os.environ +def test_task_run_with_user_impersonation_remove_krb5ccname_on_reexecuted_process( + mocked_parse, make_ti_context, time_machine, mock_supervisor_comms +): + class CustomOperator(BaseOperator): + def execute(self, context): + print("Hi from CustomOperator!") + + task = CustomOperator(task_id="impersonation_task", run_as_user="airflowuser") + instant = timezone.datetime(2024, 12, 3, 10, 0) + + what = StartupDetails( + ti=TaskInstance( + id=uuid7(), + task_id="impersonation_task", + dag_id="basic_dag", + run_id="c", + try_number=1, + dag_version_id=uuid7(), + ), + dag_rel_path="", + bundle_info=FAKE_BUNDLE, + ti_context=make_ti_context(), + start_date=timezone.utcnow(), + ) + + mocked_parse(what, "basic_dag", task) + time_machine.move_to(instant, tick=False) + + mock_supervisor_comms._get_response.return_value = what + + mock_os_env = { + "KRB5CCNAME": "/tmp/airflow_krb5_ccache", + "_AIRFLOW__REEXECUTED_PROCESS": "1", + "_AIRFLOW__STARTUP_MSG": what.model_dump_json() + } + with mock.patch.dict("os.environ", mock_os_env, clear=True): + startup() + + assert os.environ["_AIRFLOW__REEXECUTED_PROCESS"] == "1" + assert "_AIRFLOW__STARTUP_MSG" in os.environ + assert "KRB5CCNAME" not in os.environ + + @pytest.mark.parametrize( ["command", "rendered_command"], [ From f730ac0a1f264d4dc8819898a21ad698f7254ef2 Mon Sep 17 00:00:00 2001 From: Adrian Edbert Luman Date: Tue, 19 Aug 2025 17:46:17 +0800 Subject: [PATCH 2/3] Fix ruff format --- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 2 +- task-sdk/tests/task_sdk/execution_time/test_task_runner.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 44f636d0858dd..6647f86cb8634 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -689,7 +689,7 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: log = structlog.get_logger(logger_name="task") if os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1" and os.environ.get("_AIRFLOW__STARTUP_MSG"): - if 'KRB5CCNAME' in os.environ: + if "KRB5CCNAME" in os.environ: del os.environ["KRB5CCNAME"] # entrypoint of re-exec process msg = TypeAdapter(StartupDetails).validate_json(os.environ["_AIRFLOW__STARTUP_MSG"]) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index e2e0ff5d80d34..a34447f599dc4 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -778,7 +778,7 @@ def execute(self, context): mock_os_env = { "KRB5CCNAME": "/tmp/airflow_krb5_ccache", "_AIRFLOW__REEXECUTED_PROCESS": "1", - "_AIRFLOW__STARTUP_MSG": what.model_dump_json() + "_AIRFLOW__STARTUP_MSG": what.model_dump_json(), } with mock.patch.dict("os.environ", mock_os_env, clear=True): startup() From 1fd532ef08e27c416af73d702fe16387f295ea43 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Tue, 19 Aug 2025 13:58:21 +0100 Subject: [PATCH 3/3] Update task-sdk/src/airflow/sdk/execution_time/task_runner.py --- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 6647f86cb8634..be39f436a09bc 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -689,8 +689,8 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: log = structlog.get_logger(logger_name="task") if os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1" and os.environ.get("_AIRFLOW__STARTUP_MSG"): - if "KRB5CCNAME" in os.environ: - del os.environ["KRB5CCNAME"] + # Clear any Kerberos replace cache if there is one, so new process can't reuse it. + os.environ.pop("KRB5CCNAME", None) # entrypoint of re-exec process msg = TypeAdapter(StartupDetails).validate_json(os.environ["_AIRFLOW__STARTUP_MSG"])