Skip to content

Commit

Permalink
Don't bake ENV and _cmd into tmp config for non-sudo
Browse files Browse the repository at this point in the history
If we are running tasks via sudo then AIRFLOW__ config env vars won't be
visible anymore (without them showing up in `ps`) and we likely might
not have permission to run the _cmd's specified to find the passwords.

But if we are running as the same user then there is no need to "bake"
those options in to the temporary config file -- if the operator decided
they didn't want those values appearing in a config file on disk, then
lets do our best to respect that.

Note: this commit originally appears in 2019 but a critical piece was
missing, meaning that the secrets/envs were still actually appearing.
  • Loading branch information
ashb committed Oct 13, 2021
1 parent cb9d142 commit 6dc8988
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 4 deletions.
4 changes: 2 additions & 2 deletions airflow/task/task_runner/base_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(self, local_task_job):
# want to have to specify them in the sudo call - they would show
# up in `ps` that way! And run commands now, as the other user
# might not be able to run the cmds to get credentials
cfg_path = tmp_configuration_copy(chmod=0o600)
cfg_path = tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True)

# Give ownership of file to user; only they can read and write
subprocess.call(['sudo', 'chown', self.run_as_user, cfg_path], close_fds=True)
Expand All @@ -83,7 +83,7 @@ def __init__(self, local_task_job):
# we are running as the same user, and can pass through environment
# variables then we don't need to include those in the config copy
# - the runner can read/execute those values as it needs
cfg_path = tmp_configuration_copy(chmod=0o600)
cfg_path = tmp_configuration_copy(chmod=0o600, include_env=False, include_cmds=False)

self._error_file = NamedTemporaryFile(delete=True)
if self.run_as_user:
Expand Down
14 changes: 12 additions & 2 deletions airflow/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,23 @@
from airflow.configuration import conf


def tmp_configuration_copy(chmod=0o600):
def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
"""
Returns a path for a temporary file including a full copy of the configuration
settings.
:param include_env: Should the value of configuration from ``AIRFLOW__``
environment variables be included or not
:type include_env: bool
:param include_cmds: Should the result of calling any *_cmd config be
set (True, default), or should the _cmd options be left as the
command to run (False)
:type include_cmds: bool
:return: a path to a temporary file
"""
cfg_dict = conf.as_dict(display_sensitive=True, raw=True)
cfg_dict = conf.as_dict(
display_sensitive=True, raw=True, include_cmds=include_cmds, include_env=include_env
)
temp_fd, cfg_path = mkstemp()

with os.fdopen(temp_fd, 'w') as temp_file:
Expand Down
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ def dag_maker(request):
from airflow.utils.log.logging_mixin import LoggingMixin

class DagFactory(LoggingMixin):
_own_session = False

def __init__(self):
from airflow.models import DagBag

Expand Down Expand Up @@ -577,6 +579,7 @@ def __call__(
from airflow.utils import timezone

if session is None:
self._own_session = True
session = settings.Session()

self.kwargs = kwargs
Expand Down Expand Up @@ -629,6 +632,8 @@ def cleanup(self):
synchronize_session=False
)
self.session.commit()
if self._own_session:
self.session.expunge_all()

factory = DagFactory()

Expand Down
54 changes: 54 additions & 0 deletions tests/task/task_runner/test_base_task_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest import mock

import pytest

from airflow.jobs.local_task_job import LocalTaskJob
from airflow.models.baseoperator import BaseOperator
from airflow.task.task_runner.base_task_runner import BaseTaskRunner


@pytest.mark.parametrize(["impersonation"], (("nobody",), (None,)))
@mock.patch('subprocess.call')
@mock.patch('os.chown')
@mock.patch('airflow.task.task_runner.base_task_runner.tmp_configuration_copy')
def test_config_copy_mode(tmp_configuration_copy, chown, subprocess_call, dag_maker, impersonation):
with dag_maker("test"):
BaseOperator(task_id="task_1", run_as_user=impersonation)

dr = dag_maker.create_dagrun()

ti = dr.task_instances[0]
job = LocalTaskJob(ti)
runner = BaseTaskRunner(job)
# So we don't try to delete it -- cos the file wont exist
del runner._cfg_path

includes = bool(impersonation)

tmp_configuration_copy.assert_called_with(chmod=0o600, include_env=includes, include_cmds=includes)

if impersonation:
chown.assert_called()
subprocess_call.assert_called_with(
['sudo', 'chown', impersonation, tmp_configuration_copy.return_value], close_fds=True
)
else:
chown.assert_not_called()
subprocess_call.not_assert_called()

0 comments on commit 6dc8988

Please sign in to comment.