[AIRFLOW-2888] Remove shell=True and bash from task launch (apache#3740)
shell=True is a security risk. Bash is not required to launch
tasks, and running it consumes extra resources.
bolkedebruin authored and Alice Berard committed Jan 3, 2019
1 parent e73fa28 commit 9888919
Showing 17 changed files with 48 additions and 40 deletions.
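For context on the risk named in the commit message: with shell=True, Python hands the command string to /bin/sh, so any value interpolated into it can inject extra shell syntax, and every task pays for an extra shell process. A minimal sketch of the difference (the commands are illustrative, not taken from this commit):

```python
import subprocess

# Risky: the string is parsed by a shell. If dag_id were attacker-controlled,
# a value like "x; rm -rf ~" would run as a second command.
dag_id = "example_dag"
subprocess.check_call("airflow run {} task_1 2019-01-01".format(dag_id),
                      shell=True)

# Safe: an argv list is exec'd directly. Arguments are never shell-parsed,
# and no intermediate shell (or bash) process is spawned.
subprocess.check_call(["airflow", "run", dag_id, "task_1", "2019-01-01"])
```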
7 changes: 7 additions & 0 deletions UPDATING.md
@@ -5,6 +5,13 @@ assists users migrating to a new version.

## Airflow Master

+### Rename of BashTaskRunner to StandardTaskRunner
+
+BashTaskRunner has been renamed to StandardTaskRunner. It is the default task runner
+so you might need to update your config.
+
+`task_runner = StandardTaskRunner`
+
## Airflow 1.10

Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or
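For reference, the updated option as it would appear in airflow.cfg (that it lives in the [core] section is confirmed by the conf.get('core', 'TASK_RUNNER') lookup further down in this diff):

```
[core]
task_runner = StandardTaskRunner
```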
2 changes: 1 addition & 1 deletion airflow/config_templates/default_airflow.cfg
@@ -140,7 +140,7 @@ donot_pickle = False
dagbag_import_timeout = 30

# The class to use for running task instances in a subprocess
-task_runner = BashTaskRunner
+task_runner = StandardTaskRunner

# If set, tasks without a `run_as_user` argument will be run with this user
# Can be used to de-elevate a sudo user running Airflow when executing tasks
2 changes: 1 addition & 1 deletion airflow/contrib/executors/mesos_executor.py
@@ -162,7 +162,7 @@ def resourceOffers(self, driver, offers):

command = mesos_pb2.CommandInfo()
command.shell = True
-command.value = cmd
+command.value = " ".join(cmd)
task.command.MergeFrom(command)

# If docker image for airflow is specified in config then pull that
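Note the contrast with the executors changed below: because the Mesos CommandInfo here keeps command.shell = True, the command, now a list, has to be joined back into a single string. A one-line sketch with an illustrative value:

```python
cmd = ["airflow", "run", "example_dag", "task_1", "2019-01-01"]
command_value = " ".join(cmd)  # -> "airflow run example_dag task_1 2019-01-01"
```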
3 changes: 1 addition & 2 deletions airflow/contrib/kubernetes/worker_configuration.py
@@ -203,8 +203,7 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
image=kube_executor_config.image or self.kube_config.kube_image,
image_pull_policy=(kube_executor_config.image_pull_policy or
self.kube_config.kube_image_pull_policy),
-cmds=['bash', '-cx', '--'],
-args=[airflow_command],
+cmds=airflow_command,
labels={
'airflow-worker': worker_uuid,
'dag_id': dag_id,
2 changes: 1 addition & 1 deletion airflow/contrib/task_runner/cgroup_task_runner.py
@@ -117,7 +117,7 @@ def start(self):
"creating another one",
cgroups.get("cpu"), cgroups.get("memory")
)
-self.process = self.run_command(['bash', '-c'], join_args=True)
+self.process = self.run_command()
return

# Create a unique cgroup name
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
@@ -75,7 +75,7 @@ def queue_task_instance(
# cfg_path is needed to propagate the config values if using impersonation
# (run_as_user), given that there are different code paths running tasks.
# For a long term solution we need to address AIRFLOW-1986
-command = task_instance.command(
+command = task_instance.command_as_list(
local=True,
mark_success=mark_success,
ignore_all_deps=ignore_all_deps,
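command_as_list returns the task invocation as an argv list rather than a single string, which is exactly what the shell-less check_call calls below expect. A hypothetical return value (flags modeled on the 1.10-era CLI, paths illustrative):

```python
# Hypothetical result of task_instance.command_as_list(local=True, ...):
command = ['airflow', 'run', 'example_dag', 'example_task',
           '2019-01-01T00:00:00', '--local',
           '-sd', '/path/to/dags/example_dag.py']
```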
4 changes: 2 additions & 2 deletions airflow/executors/celery_executor.py
@@ -56,7 +56,7 @@ def execute_command(command):
log.info("Executing command in Celery: %s", command)
env = os.environ.copy()
try:
-subprocess.check_call(command, shell=True, stderr=subprocess.STDOUT,
+subprocess.check_call(command, stderr=subprocess.STDOUT,
close_fds=True, env=env)
except subprocess.CalledProcessError as e:
log.exception('execute_command encountered a CalledProcessError')
@@ -84,7 +84,7 @@ def execute_async(self, key, command,
self.log.info("[celery] queuing {key} through celery, "
"queue={queue}".format(**locals()))
self.tasks[key] = execute_command.apply_async(
-args=[command], queue=queue)
+args=command, queue=queue)
self.last_state[key] = celery_states.PENDING

def sync(self):
3 changes: 1 addition & 2 deletions airflow/executors/local_executor.py
@@ -82,9 +82,8 @@ def execute_work(self, key, command):
if key is None:
return
self.log.info("%s running %s", self.__class__.__name__, command)
-command = "exec bash -c '{0}'".format(command)
try:
-subprocess.check_call(command, shell=True, close_fds=True)
+subprocess.check_call(command, close_fds=True)
state = State.SUCCESS
except subprocess.CalledProcessError as e:
state = State.FAILED
2 changes: 1 addition & 1 deletion airflow/executors/sequential_executor.py
@@ -45,7 +45,7 @@ def sync(self):
self.log.info("Executing command: %s", command)

try:
-subprocess.check_call(command, shell=True, close_fds=True)
+subprocess.check_call(command, close_fds=True)
self.change_state(key, State.SUCCESS)
except subprocess.CalledProcessError as e:
self.change_state(key, State.FAILED)
4 changes: 2 additions & 2 deletions airflow/jobs.py
@@ -1325,7 +1325,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
# actually enqueue them
for task_instance in task_instances:
simple_dag = simple_dag_bag.get_dag(task_instance.dag_id)
-command = " ".join(TI.generate_command(
+command = TI.generate_command(
task_instance.dag_id,
task_instance.task_id,
task_instance.execution_date,
@@ -1337,7 +1337,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
ignore_ti_state=False,
pool=task_instance.pool,
file_path=simple_dag.full_filepath,
-pickle_id=simple_dag.pickle_id))
+pickle_id=simple_dag.pickle_id)

priority = task_instance.priority_weight
queue = task_instance.queue
6 changes: 3 additions & 3 deletions airflow/task/task_runner/__init__.py
@@ -18,7 +18,7 @@
# under the License.

from airflow import configuration
-from airflow.task.task_runner.bash_task_runner import BashTaskRunner
+from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
from airflow.exceptions import AirflowException

_TASK_RUNNER = configuration.conf.get('core', 'TASK_RUNNER')
@@ -34,8 +34,8 @@ def get_task_runner(local_task_job):
:return: The task runner to use to run the task.
:rtype: airflow.task.task_runner.base_task_runner.BaseTaskRunner
"""
-if _TASK_RUNNER == "BashTaskRunner":
-return BashTaskRunner(local_task_job)
+if _TASK_RUNNER == "StandardTaskRunner":
+return StandardTaskRunner(local_task_job)
elif _TASK_RUNNER == "CgroupTaskRunner":
from airflow.contrib.task_runner.cgroup_task_runner import CgroupTaskRunner
return CgroupTaskRunner(local_task_job)
4 changes: 3 additions & 1 deletion airflow/task/task_runner/base_task_runner.py
@@ -106,7 +106,7 @@ def _read_task_logs(self, stream):
self._task_instance.job_id, self._task_instance.task_id,
line.rstrip('\n'))

-def run_command(self, run_with, join_args=False):
+def run_command(self, run_with=None, join_args=False):
"""
Run the task command
@@ -119,8 +119,10 @@ def run_command(self, run_with, join_args=False):
:return: the process that was run
:rtype: subprocess.Popen
"""
+run_with = run_with or []
cmd = [" ".join(self._command)] if join_args else self._command
full_cmd = run_with + cmd

self.log.info('Running: %s', full_cmd)
proc = subprocess.Popen(
full_cmd,
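With run_with now optional, the default call execs self._command as-is; callers that need a wrapper can still prepend one. A short usage sketch (the runner instance and the sudo prefix are illustrative, not from this commit):

```python
# Default launch: Popen receives the argv list unchanged - no shell involved.
proc = runner.run_command()

# Wrapped launch, e.g. to run the task as another user:
proc = runner.run_command(run_with=['sudo', '-E', '-H', '-u', 'some_user'])
```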
airflow/task/task_runner/bash_task_runner.py → airflow/task/task_runner/standard_task_runner.py
@@ -23,15 +23,15 @@
from airflow.utils.helpers import reap_process_group


-class BashTaskRunner(BaseTaskRunner):
+class StandardTaskRunner(BaseTaskRunner):
"""
Runs the raw Airflow task by invoking through the Bash shell.
"""
def __init__(self, local_task_job):
-super(BashTaskRunner, self).__init__(local_task_job)
+super(StandardTaskRunner, self).__init__(local_task_job)

def start(self):
-self.process = self.run_command(['bash', '-c'], join_args=True)
+self.process = self.run_command()

def return_code(self):
return self.process.poll()
@@ -41,4 +41,4 @@ def terminate(self):
reap_process_group(self.process.pid, self.log)

def on_finish(self):
-super(BashTaskRunner, self).on_finish()
+super(StandardTaskRunner, self).on_finish()
8 changes: 4 additions & 4 deletions tests/executors/dask_executor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -55,8 +55,8 @@ def assert_tasks_on_executor(self, executor):
# start the executor
executor.start()

-success_command = 'echo 1'
-fail_command = 'exit 1'
+success_command = ['true', ]
+fail_command = ['false', ]

executor.execute_async(key='success', command=success_command)
executor.execute_async(key='fail', command=fail_command)
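The test commands switch from shell snippets to argv lists: 'exit' is a shell builtin and cannot be exec'd without a shell, whereas /bin/true and /bin/false are real binaries that exit 0 and 1 respectively. A quick illustration of the behavior the tests rely on:

```python
import subprocess

subprocess.check_call(['true'])       # exits 0: no exception raised
try:
    subprocess.check_call(['false'])  # exits 1
except subprocess.CalledProcessError as e:
    print(e.returncode)               # -> 1
```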
9 changes: 5 additions & 4 deletions tests/executors/test_celery_executor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,14 +27,15 @@
# leave this it is used by the test worker
import celery.contrib.testing.tasks

+
class CeleryExecutorTest(unittest.TestCase):
def test_celery_integration(self):
executor = CeleryExecutor()
executor.start()
with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):

-success_command = 'echo 1'
-fail_command = 'exit 1'
+success_command = ['true', ]
+fail_command = ['false', ]

executor.execute_async(key='success', command=success_command)
# errors are propagated for some reason
10 changes: 5 additions & 5 deletions tests/executors/test_local_executor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,11 +33,11 @@ def execution_parallelism(self, parallelism=0):
executor.start()

success_key = 'success {}'
-success_command = 'echo {}'
-fail_command = 'exit 1'
+success_command = ['true', ]
+fail_command = ['false', ]

for i in range(self.TEST_SUCCESS_COMMANDS):
-key, command = success_key.format(i), success_command.format(i)
+key, command = success_key.format(i), success_command
executor.execute_async(key=key, command=command)
executor.running[key] = True

tests/task/task_runner/test_bash_task_runner.py → tests/task/task_runner/test_standard_task_runner.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,7 @@
from airflow import models, settings
from airflow.jobs import LocalTaskJob
from airflow.models import TaskInstance as TI
-from airflow.task.task_runner import BashTaskRunner
+from airflow.task.task_runner import StandardTaskRunner
from airflow.utils import timezone
from airflow.utils.state import State

@@ -61,7 +61,7 @@
}


-class TestBashTaskRunner(unittest.TestCase):
+class TestStandardTaskRunner(unittest.TestCase):
def setUp(self):
dictConfig(LOGGING_CONFIG)

@@ -71,7 +71,7 @@ def test_start_and_terminate(self):
local_task_job.task_instance.run_as_user = None
local_task_job.task_instance.command_as_list.return_value = ['sleep', '1000']

-runner = BashTaskRunner(local_task_job)
+runner = StandardTaskRunner(local_task_job)
runner.start()

pgid = os.getpgid(runner.process.pid)
@@ -119,7 +119,7 @@ def test_on_kill(self):
ti = TI(task=task, execution_date=DEFAULT_DATE)
job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True)

-runner = BashTaskRunner(job1)
+runner = StandardTaskRunner(job1)
runner.start()

# give the task some time to startup
