diff --git a/docs/api-docs.rst b/docs/api-docs.rst
index 5f9274dd..9c03a01a 100644
--- a/docs/api-docs.rst
+++ b/docs/api-docs.rst
@@ -53,6 +53,7 @@ such as ``pdb``.
     (, "entrypoint": )
     (, "inputs": [ (, , ...)])
     (, "outputs": [ (, , ...)])
+    (, "progress_pipe": )
 }

 ::= {
diff --git a/docs/plugins.rst b/docs/plugins.rst
index 6b00f082..f0232396 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -156,6 +156,33 @@ If you specify an absolute path, it must start with
 ``/mnt/girder_worker/data/`` will be thrown before the task is run. These
 conventions apply whether the path is specified in the ``id`` or ``path``
 field.

+Progress reporting from docker tasks
+************************************
+
+Docker tasks have the option of reporting progress back to Girder via a special named pipe. If
+you want to do this, specify ``"progress_pipe": true`` in your docker task specification. This
+will create a special named pipe at ``/mnt/girder_worker/data/.girder_progress``. In your container
+logic, you may write progress notification messages to this named pipe, one per line.
+Progress messages should be JSON objects with the following fields, all of which are optional:
+
+  * ``message`` (string): A human-readable message about the current task progress.
+  * ``current`` (number): A numeric value representing the current progress, which should always
+    be less than or equal to the ``total`` value.
+  * ``total`` (number): A numeric value representing the maximum progress value, i.e. the value
+    of ``current`` when the task is complete. Only pass this field if the total is changing or
+    being initially set.
+
+An example progress notification string: ::
+
+    {"message": "Halfway there!", "total": 100, "current": 50}
+
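+For example (an illustrative sketch; ``do_work_step`` is a hypothetical stand-in
+for your container's real work), a Python process inside the container might
+report progress like this:
+
+.. code-block:: python
+
+    import json
+
+    with open('/mnt/girder_worker/data/.girder_progress', 'w') as pipe:
+        for i in range(100):
+            do_work_step()  # hypothetical unit of real work
+            message = {'current': i + 1}
+            if i == 0:
+                message['total'] = 100  # only sent when the total is first set
+            pipe.write(json.dumps(message) + '\n')
+            pipe.flush()  # see the note below about flushing
+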
+.. note:: When writing to the named pipe, you should explicitly call ``flush`` on the file
+   descriptor afterward; otherwise the messages may sit in a buffer and may not reach the
+   Girder server as you write them.
+
+.. note:: This feature may not work with Docker on non-Linux platforms, and the call to open
+   the pipe for writing from within the container may block indefinitely.
+
 Management of Docker Containers and Images
 ******************************************

@@ -164,14 +191,14 @@ never removed.

 Docker containers are automatically removed when the task is complete. As an
 alternative, a 'garbage collection' process can be used instead. It can
-be enabled by modifying settings in the ``[docker]`` section of the config
+be enabled by modifying settings in the ``[docker]`` section of the config
 file, which can be done using the command:

 .. code-block :: none

     girder-worker-config set docker gc True

-When the ``gc`` config value is set to ``True``, containers are not removed
+When the ``gc`` config value is set to ``True``, containers are not removed
 when the task ends. Instead, periodically, any images not associated with a
 container will be removed, and then any stopped containers will be removed.
 This will free disk space associated with the images, but may remove images
@@ -189,7 +216,6 @@ Only containers that have been stopped longer than a certain time are removed.
 This time defaults to an hour, and can be specified as any number of seconds
 via the ``cache_timeout`` setting.

-
 Girder IO
 ---------
diff --git a/girder_worker/core/utils.py b/girder_worker/core/utils.py
index d5bef633..27de18d6 100644
--- a/girder_worker/core/utils.py
+++ b/girder_worker/core/utils.py
@@ -2,6 +2,7 @@
 import errno
 import functools
 import imp
+import json
 import os
 import girder_worker
 import girder_worker.plugins
@@ -196,7 +197,7 @@ def load_plugin(name, paths):
             name, '\n    '.join(paths)))


-def _close_pipes(rds, wds, input_pipes, output_pipes, stdout, stderr):
+def _close_pipes(rds, wds, input_pipes, output_pipes, close_output_pipe):
     """
     Helper to close remaining input and output adapters after the subprocess
     completes.
@@ -205,7 +206,7 @@
     for fd in rds:
         if fd in output_pipes:
             output_pipes[fd].close()
-            if fd not in (stdout, stderr):
+            if close_output_pipe(fd):
                 os.close(fd)

     # close any remaining input adapters
@@ -214,7 +215,7 @@
         os.close(fd)


-def _setup_input_pipes(input_pipes, stdin):
+def _setup_input_pipes(input_pipes):
     """
     Given a mapping of input pipes, return a tuple with 2 elements. The first
     is a list of file descriptors to pass to ``select`` as writeable descriptors.
@@ -227,10 +228,6 @@
         if isinstance(pipe, int):
             # This is assumed to be an open system-level file descriptor
             wds.append(pipe)
-        elif pipe == '_stdin':
-            # Special case for binding to standard input
-            input_pipes[stdin] = input_pipes['_stdin']
-            wds.append(stdin)
         else:
             if not os.path.exists(pipe):
                 raise Exception('Input pipe does not exist: %s' % pipe)
@@ -260,6 +257,83 @@ def _open_ipipes(wds, fifos, input_pipes):
     return wds, fifos, input_pipes


+def select_loop(exit_condition=lambda: True, close_output=lambda x: True,
+                outputs=None, inputs=None):
+    """
+    Run a select loop for a set of input and output pipes.
+
+    :param exit_condition: A function to evaluate to determine whether the
+        select loop should terminate once all pipes are empty.
+    :type exit_condition: function
+    :param close_output: A function used to test whether an output
+        should be closed when EOF is reached. Certain output pipes, such as
+        stdout and stderr, should not be closed.
+    :param outputs: This should be a dictionary mapping pipe descriptors
+        to instances of ``StreamPushAdapter`` that should handle the data from
+        the stream. The keys of this dictionary are open file descriptors,
+        which are integers.
+    :type outputs: dict
+    :param inputs: This should be a dictionary mapping pipe descriptors
+        to instances of ``StreamFetchAdapter`` that should handle sending
+        input data in chunks. Keys in this dictionary can be either open file
+        descriptors (integers) or a string representing a path to an existing
+        fifo on the filesystem. This second case supports the use of named
+        pipes, since they must be opened for reading before they can be opened
+        for writing.
+    :type inputs: dict
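+
+    Example (an illustrative sketch; ``some_push_adapter`` stands in for any
+    ``StreamPushAdapter`` instance)::
+
+        p = subprocess.Popen(args=command, stdout=subprocess.PIPE)
+        select_loop(exit_condition=lambda: p.poll() is not None,
+                    outputs={p.stdout.fileno(): some_push_adapter})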
+    """
+
+    BUF_LEN = 65536
+    inputs = inputs or {}
+    outputs = outputs or {}
+
+    rds = [fd for fd in outputs.keys() if isinstance(fd, int)]
+    wds, fifos = _setup_input_pipes(inputs)
+
+    try:
+        while True:
+            # We evaluate this first so that we get one last iteration
+            # of the loop before breaking out of it.
+            exit = exit_condition()
+
+            # get ready pipes, timeout of 100 ms
+            readable, writable, _ = select.select(rds, wds, (), 0.1)
+
+            for ready_fd in readable:
+                buf = os.read(ready_fd, BUF_LEN)
+
+                if buf:
+                    outputs[ready_fd].write(buf)
+                else:
+                    outputs[ready_fd].close()
+                    # Should we close this pipe? In the case of stdout or
+                    # stderr, bad things happen if the parent closes them.
+                    if close_output(ready_fd):
+                        os.close(ready_fd)
+                    rds.remove(ready_fd)
+            for ready_fd in writable:
+                # TODO for now it's OK for the input reads to block since
+                # input generally happens first, but we should consider how to
+                # support non-blocking stream inputs in the future.
+                buf = inputs[ready_fd].read(BUF_LEN)
+
+                if buf:
+                    os.write(ready_fd, buf)
+                else:  # end of stream
+                    wds.remove(ready_fd)
+                    os.close(ready_fd)
+
+            wds, fifos, inputs = _open_ipipes(wds, fifos, inputs)
+            # all pipes empty?
+            empty = (not rds or not readable) and (not wds or not writable)
+
+            if empty and exit:
+                break
+
+    finally:
+        _close_pipes(rds, wds, inputs, outputs, close_output)
+
+
 def run_process(command, output_pipes=None, input_pipes=None):
     """
     Run a subprocess, and listen for its outputs on various pipes.
@@ -285,12 +359,13 @@
         must be opened for reading before they can be opened for writing
     :type input_pipes: dict
     """
-    BUF_LEN = 65536
-    input_pipes = input_pipes or {}
-    output_pipes = output_pipes or {}
+    p = subprocess.Popen(args=command, stdout=subprocess.PIPE,
+                         stderr=subprocess.PIPE, stdin=subprocess.PIPE)
+
+    input_pipes = input_pipes or {}
+    output_pipes = output_pipes or {}
+
+    # we now know the subprocess stdout and stderr filenos, so bind the adapters
     stdout = p.stdout.fileno()
     stderr = p.stderr.fileno()
@@ -300,51 +375,24 @@
         output_pipes[stderr] = output_pipes.get(
             '_stderr', WritePipeAdapter({}, sys.stderr))

-    rds = [fd for fd in output_pipes.keys() if isinstance(fd, int)]
-    wds, fifos = _setup_input_pipes(input_pipes, stdin)
-
-    try:
-        while True:
-            status = p.poll()
-            # get ready pipes
-            readable, writable, _ = select.select(rds, wds, (), 0)
-
-            for ready_pipe in readable:
-                buf = os.read(ready_pipe, BUF_LEN)
-
-                if buf:
-                    output_pipes[ready_pipe].write(buf)
-                else:
-                    output_pipes[ready_pipe].close()
-                    if ready_pipe not in (stdout, stderr):
-                        # bad things happen if parent closes stdout or stderr
-                        os.close(ready_pipe)
-                    rds.remove(ready_pipe)
-            for ready_pipe in writable:
-                # TODO for now it's OK for the input reads to block since
-                # input generally happens first, but we should consider how to
-                # support non-blocking stream inputs in the future.
-                buf = input_pipes[ready_pipe].read(BUF_LEN)
+    # Special case for _stdin
+    if '_stdin' in input_pipes:
+        input_pipes[stdin] = input_pipes['_stdin']

-                if buf:
-                    os.write(ready_pipe, buf)
-                else:  # end of stream
-                    wds.remove(ready_pipe)
-                    os.close(ready_pipe)
+    def exit_condition():
+        status = p.poll()
+        return status is not None

-            wds, fifos, input_pipes = _open_ipipes(wds, fifos, input_pipes)
-            empty = (not rds or not readable) and (not wds or not writable)
-            if empty and status is not None:
-                # all pipes are empty and the process has returned, we are done
-                break
-            elif not rds and not wds and status is None:
-                # all pipes are closed but the process is still running
-                p.wait()
+    def close_output_pipe(pipe):
+        return pipe not in (stdout, stderr)
+
+    try:
+        select_loop(exit_condition=exit_condition,
+                    close_output=close_output_pipe,
+                    outputs=output_pipes, inputs=input_pipes)
     except Exception:
         p.kill()  # kill child process if something went wrong on our end
         raise
-    finally:
-        _close_pipes(rds, wds, input_pipes, output_pipes, stdout, stderr)

     return p
@@ -450,3 +498,39 @@ def __init__(self, output_spec, key, dictionary=None):

     def write(self, buf):
         self.dictionary[self.key] += buf
+
+
+class JobProgressAdapter(StreamPushAdapter):
+    def __init__(self, job_manager):
+        """
+        This reads structured JSON documents one line at a time and sends
+        them as progress events via the JobManager.
+
+        :param job_manager: The job manager to use to send the progress events.
+        :type job_manager: girder_worker.utils.JobManager
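+
+        Example (an illustrative sketch; ``job_manager`` is any
+        ``girder_worker.utils.JobManager`` instance)::
+
+            adapter = JobProgressAdapter(job_manager)
+            adapter.write(b'{"message": "Halfway there!", "current": 50}\n')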
+        """
+        super(JobProgressAdapter, self).__init__(None)
+
+        self.job_manager = job_manager
+        self._buf = b''
+
+    def write(self, buf):
+        lines = buf.split(b'\n')
+        if self._buf:
+            lines[0] = self._buf + lines[0]
+        self._buf = lines[-1]
+
+        for line in lines[:-1]:
+            self._parse(line)
+
+    def _parse(self, line):
+        try:
+            doc = json.loads(line.decode('utf8'))
+        except ValueError:
+            return  # TODO log?
+
+        if not isinstance(doc, dict):
+            return  # TODO log?
+
+        self.job_manager.updateProgress(
+            total=doc.get('total'), current=doc.get('current'), message=doc.get('message'))
diff --git a/girder_worker/plugins/docker/__init__.py b/girder_worker/plugins/docker/__init__.py
index 3b9fff06..b1bbb980 100644
--- a/girder_worker/plugins/docker/__init__.py
+++ b/girder_worker/plugins/docker/__init__.py
@@ -3,6 +3,8 @@
 import subprocess
 import tempfile
 import time
+import docker
+from docker.errors import DockerException

 from girder_worker import config, logger

 # Minimum interval in seconds at which to run the docker-gc script
@@ -86,17 +88,26 @@ def task_cleanup(e):
     from .executor import DATA_VOLUME
     if e.info['task']['mode'] == 'docker' and '_tempdir' in e.info['kwargs']:
         tmpdir = e.info['kwargs']['_tempdir']
-        cmd = [
-            'docker', 'run', '--rm', '-v', '%s:%s' % (tmpdir, DATA_VOLUME),
-            'busybox', 'chmod', '-R', 'a+rw', DATA_VOLUME
-        ]
-        p = subprocess.Popen(args=cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        out, err = p.communicate()
-        if p.returncode:
-            logger.error(
-                'Error setting perms on docker tempdir %s.\nSTDOUT: %s\nSTDERR:%s',
-                tmpdir, out, err)
-            raise Exception('Docker tempdir chmod returned code %d.' % p.returncode)
+        client = docker.from_env()
+        config = {
+            'tty': True,
+            'volumes': {
+                tmpdir: {
+                    'bind': DATA_VOLUME,
+                    'mode': 'rw'
+                }
+            },
+            'detach': False,
+            'remove': True
+        }
+        args = ['chmod', '-R', 'a+rw', DATA_VOLUME]
+
+        try:
+            client.containers.run('busybox', args, **config)
+        except DockerException as dex:
+            logger.error('Error setting perms on docker tempdir %s.' % tmpdir)
+            logger.exception(dex)
+            raise


 def load(params):
diff --git a/girder_worker/plugins/docker/executor.py b/girder_worker/plugins/docker/executor.py
index da9bd710..72158b33 100644
--- a/girder_worker/plugins/docker/executor.py
+++ b/girder_worker/plugins/docker/executor.py
@@ -1,27 +1,28 @@
 import os
 import re
-import subprocess
+import sys
+import docker
+from docker.errors import DockerException

 from girder_worker import logger
 from girder_worker.core import TaskSpecValidationError, utils
 from girder_worker.core.io import make_stream_fetch_adapter, make_stream_push_adapter

 DATA_VOLUME = '/mnt/girder_worker/data'
+BLACKLISTED_DOCKER_RUN_ARGS = ['tty', 'detach']


 def _pull_image(image):
     """
     Pulls the specified Docker image onto this worker.
     """
-    command = ('docker', 'pull', image)
-    p = subprocess.Popen(args=command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    stdout, stderr = p.communicate()
-
-    if p.returncode != 0:
-        logger.error(
-            'Error pulling Docker image %s:\nSTDOUT: %s\nSTDERR: %s', image, stdout, stderr)
-
-        raise Exception('Docker pull returned code {}.'.format(p.returncode))
+    client = docker.from_env()
+    try:
+        client.images.pull(image)
+    except DockerException as dex:
+        logger.error('Error pulling Docker image %s:' % image)
+        logger.exception(dex)
+        raise


 def _transform_path(inputs, taskInputs, inputId, tmpDir):
@@ -96,7 +97,7 @@ def validate_task_outputs(task_outputs):
                     'filepath-target outputs.')


-def _setup_pipes(task_inputs, inputs, task_outputs, outputs, tempdir):
+def _setup_pipes(task_inputs, inputs, task_outputs, outputs, tempdir, job_mgr, progress_pipe):
     """
     Returns a 2-tuple of input and output pipe mappings. The first element is
     a dict mapping input file descriptors to the corresponding stream adapters,
@@ -114,8 +115,7 @@ def make_pipe(id, spec, bindings):
         given spec is not a streaming spec, returns False. If it is, returns
         the path to the pipe file that was created.
         """
-        if (spec.get('stream') and id in bindings and
-                spec.get('target') == 'filepath'):
+        if spec.get('stream') and id in bindings and spec.get('target') == 'filepath':
             path = spec.get('path', id)
             if path.startswith('/'):
                 raise Exception('Streaming filepaths must be relative.')
@@ -138,25 +138,81 @@ def make_pipe(id, spec, bindings):
             opipes[os.open(pipe, os.O_RDONLY | os.O_NONBLOCK)] = \
                 make_stream_push_adapter(outputs[id])

+    # handle special stream output for job progress
+    if progress_pipe and job_mgr:
+        path = os.path.join(tempdir, '.girder_progress')
+        os.mkfifo(path)
+        opipes[os.open(path, os.O_RDONLY | os.O_NONBLOCK)] = utils.JobProgressAdapter(job_mgr)
+
     # special handling for stdin, stdout, and stderr pipes
     if '_stdin' in task_inputs and '_stdin' in inputs:
         if task_inputs['_stdin'].get('stream'):
             ipipes['_stdin'] = make_stream_fetch_adapter(inputs['_stdin'])
         else:
-            ipipes['_stdin'] = utils.MemoryFetchAdapter(
-                inputs[id], inputs[id]['data'])
+            ipipes['_stdin'] = utils.MemoryFetchAdapter(inputs[id], inputs[id]['data'])

     for id in ('_stdout', '_stderr'):
         if id in task_outputs and id in outputs:
             if task_outputs[id].get('stream'):
                 opipes[id] = make_stream_push_adapter(outputs[id])
             else:
-                opipes[id] = utils.AccumulateDictAdapter(
-                    outputs[id], 'script_data')
+                opipes[id] = utils.AccumulateDictAdapter(outputs[id], 'script_data')

     return ipipes, opipes


+def _run_container(image, args, **kwargs):
+    # TODO we could allow configuration of a non-default socket
+    client = docker.from_env()
+
+    logger.info('Running container: image: %s args: %s kwargs: %s' % (image, args, kwargs))
+    try:
+        return client.containers.run(image, args, **kwargs)
+    except DockerException as dex:
+        logger.error(dex)
+        raise
+
+
+def _run_select_loop(container, opipes, ipipes):
+    stdout = None
+    stderr = None
+    try:
+        # attach to standard streams
+        stdout = container.attach_socket(params={
+            'stdout': True,
+            'logs': True,
+            'stream': True
+        })
+
+        stderr = container.attach_socket(params={
+            'stderr': True,
+            'logs': True,
+            'stream': True
+        })
+
+        def exit_condition():
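+            # container.reload() refreshes our cached view of the container
+            # from the Docker daemon, so each poll sees its current status.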
""" - if (spec.get('stream') and id in bindings and - spec.get('target') == 'filepath'): + if spec.get('stream') and id in bindings and spec.get('target') == 'filepath': path = spec.get('path', id) if path.startswith('/'): raise Exception('Streaming filepaths must be relative.') @@ -138,25 +138,81 @@ def make_pipe(id, spec, bindings): opipes[os.open(pipe, os.O_RDONLY | os.O_NONBLOCK)] = \ make_stream_push_adapter(outputs[id]) + # handle special stream output for job progress + if progress_pipe and job_mgr: + path = os.path.join(tempdir, '.girder_progress') + os.mkfifo(path) + opipes[os.open(path, os.O_RDONLY | os.O_NONBLOCK)] = utils.JobProgressAdapter(job_mgr) + # special handling for stdin, stdout, and stderr pipes if '_stdin' in task_inputs and '_stdin' in inputs: if task_inputs['_stdin'].get('stream'): ipipes['_stdin'] = make_stream_fetch_adapter(inputs['_stdin']) else: - ipipes['_stdin'] = utils.MemoryFetchAdapter( - inputs[id], inputs[id]['data']) + ipipes['_stdin'] = utils.MemoryFetchAdapter(inputs[id], inputs[id]['data']) for id in ('_stdout', '_stderr'): if id in task_outputs and id in outputs: if task_outputs[id].get('stream'): opipes[id] = make_stream_push_adapter(outputs[id]) else: - opipes[id] = utils.AccumulateDictAdapter( - outputs[id], 'script_data') + opipes[id] = utils.AccumulateDictAdapter(outputs[id], 'script_data') return ipipes, opipes +def _run_container(image, args, **kwargs): + # TODO we could allow configuration of non default socket + client = docker.from_env() + + logger.info('Running container: image: %s args: %s kwargs: %s' % (image, args, kwargs)) + try: + return client.containers.run(image, args, **kwargs) + except DockerException as dex: + logger.error(dex) + raise + + +def _run_select_loop(container, opipes, ipipes): + stdout = None + stderr = None + try: + # attach to standard streams + stdout = container.attach_socket(params={ + 'stdout': True, + 'logs': True, + 'stream': True + }) + + stderr = container.attach_socket(params={ + 'stderr': True, + 'logs': True, + 'stream': True + }) + + def exit_condition(): + container.reload() + return container.status in ['exited', 'dead'] + + def close_output(output): + return output not in (stdout.fileno(), stderr.fileno()) + + opipes[stdout.fileno()] = opipes.get( + '_stdout', utils.WritePipeAdapter({}, sys.stdout)) + opipes[stderr.fileno()] = opipes.get( + '_stderr', utils.WritePipeAdapter({}, sys.stderr)) + + # Run select loop + utils.select_loop(exit_condition=exit_condition, close_output=close_output, + outputs=opipes, inputs=ipipes) + finally: + # Close our stdout and stderr sockets + if stdout: + stdout.close() + if stderr: + stderr.close() + + def run(task, inputs, outputs, task_inputs, task_outputs, **kwargs): image = task['docker_image'] @@ -164,35 +220,51 @@ def run(task, inputs, outputs, task_inputs, task_outputs, **kwargs): logger.info('Pulling Docker image: %s', image) _pull_image(image) + progress_pipe = task.get('progress_pipe', False) + tempdir = kwargs.get('_tempdir') + job_mgr = kwargs.get('_job_manager') args = _expand_args(task.get('container_args', []), inputs, task_inputs, tempdir) ipipes, opipes = _setup_pipes( - task_inputs, inputs, task_outputs, outputs, tempdir) + task_inputs, inputs, task_outputs, outputs, tempdir, job_mgr, progress_pipe) if 'entrypoint' in task: if isinstance(task['entrypoint'], (list, tuple)): - ep_args = ['--entrypoint'] + task['entrypoint'] + ep_args = task['entrypoint'] else: - ep_args = ['--entrypoint', task['entrypoint']] + ep_args = [task['entrypoint']] else: 
+    run_kwargs.update(extra_run_kwargs)
+
+    container = _run_container(image, args, **run_kwargs)
+
+    try:
+        _run_select_loop(container, opipes, ipipes)
+    finally:
+        if container and kwargs.get('_rm_container'):
+            container.remove()

     for name, spec in task_outputs.iteritems():
         if spec.get('target') == 'filepath' and not spec.get('stream'):
diff --git a/girder_worker/plugins/docker/requirements.txt b/girder_worker/plugins/docker/requirements.txt
new file mode 100644
index 00000000..f419fb4e
--- /dev/null
+++ b/girder_worker/plugins/docker/requirements.txt
@@ -0,0 +1 @@
+docker==2.1.0
diff --git a/girder_worker/plugins/docker/tests/docker_integration_test.py b/girder_worker/plugins/docker/tests/docker_integration_test.py
index 2ae11a93..0463bbac 100644
--- a/girder_worker/plugins/docker/tests/docker_integration_test.py
+++ b/girder_worker/plugins/docker/tests/docker_integration_test.py
@@ -5,6 +5,7 @@
 import stat
 import sys
 import unittest
+import docker

 import girder_worker
 from girder_worker.core import run, io
@@ -135,7 +136,7 @@ def testDockerModeStdio(self):
             sys.stdout = _old

         lines = stdout_captor.getvalue().splitlines()
-        message = '%s\n' % self._test_message
+        message = '%s\r\n' % self._test_message
         self.assertTrue(message not in lines)
         self.assertEqual(out['_stdout']['data'], message)

@@ -272,3 +273,76 @@ def read(self, buf_len):
         self.assertTrue(os.path.exists(pipe))
         self.assertTrue(stat.S_ISFIFO(os.stat(pipe).st_mode))
         self.assertEqual(output['_stdout']['data'].rstrip(), self._test_message)
+
+    def testDockerModeRemoveContainer(self):
+        """
+        Test automatic container removal
+        """
+        task = {
+            'mode': 'docker',
+            'docker_image': test_image,
+            'pull_image': True,
+            'container_args': ['$input{test_mode}', '$input{message}'],
+            'inputs': [{
+                'id': 'test_mode',
+                'name': '',
+                'format': 'string',
+                'type': 'string'
+            }, {
+                'id': 'message',
+                'name': '',
+                'format': 'string',
+                'type': 'string'
+            }],
+            'outputs': []
+        }
+
+        inputs = {
+            'test_mode': {
+                'format': 'string',
+                'data': 'stdio'
+            },
+            'message': {
+                'format': 'string',
+                'data': self._test_message
+            }
+        }
+
+        docker_client = docker.from_env()
+        containers = docker_client.containers.list(limit=1)
+        last_container_id = containers[0].id if len(containers) > 0 else None
+
+        run(
+            task, inputs=inputs, _tempdir=self._tmp, cleanup=True, validate=False,
+            auto_convert=False)
+
+        def _fetch_new_containers(last_container_id):
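+            # The 'since' filter restricts the listing to containers created
+            # after last_container_id, i.e. those started by the run() above.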
+            if last_container_id:
+                filters = {
+                    'since': last_container_id
+                }
+                new_containers = docker_client.containers.list(all=True, filters=filters)
+            else:
+                new_containers = docker_client.containers.list(all=True)
+
+            return new_containers
+
+        new_containers = _fetch_new_containers(last_container_id)
+        # Now assert that the container was removed
+        self.assertEqual(len(new_containers), 0)
+
+        # Now confirm that the container doesn't get removed if we set
+        # _rm_container = False
+        girder_worker.config.set('docker', 'gc', 'True')
+        # Stop GC removing anything
+        girder_worker.config.set('docker', 'cache_timeout', str(sys.maxint))
+
+        task['_rm_container'] = False
+        run(
+            task, inputs=inputs, _tempdir=self._tmp, cleanup=True, validate=False,
+            auto_convert=False, _rm_containers=False)
+        new_containers = _fetch_new_containers(last_container_id)
+        self.assertEqual(len(new_containers), 1)
+        self.assertEqual(new_containers[0].attrs.get('Config', {})['Image'], test_image)
+        # Clean it up
+        new_containers[0].remove()
diff --git a/girder_worker/plugins/docker/tests/docker_test.py b/girder_worker/plugins/docker/tests/docker_test.py
index 9132a02e..eb77efd2 100644
--- a/girder_worker/plugins/docker/tests/docker_test.py
+++ b/girder_worker/plugins/docker/tests/docker_test.py
@@ -12,13 +12,27 @@
 from girder_worker.core import cleanup, run, io, TaskSpecValidationError
 from girder_worker.plugins.docker.executor import DATA_VOLUME

+
 _tmp = None
 OUT_FD, ERR_FD = 100, 200
 _out = six.StringIO('output message\n')
 _err = six.StringIO('error message\n')

-processMock = mock.Mock()
-processMock.configure_mock(**{
+stdout_socket_mock = mock.Mock()
+stdout_socket_mock.fileno.return_value = OUT_FD
+
+stderr_socket_mock = mock.Mock()
+stderr_socket_mock.fileno.return_value = ERR_FD
+
+docker_container_mock = mock.Mock()
+docker_container_mock.attach_socket.side_effect = [stdout_socket_mock, stderr_socket_mock]
+docker_container_mock.status = 'exited'
+
+docker_client_mock = mock.Mock()
+docker_client_mock.containers.run.return_value = docker_container_mock
+
+process_mock = mock.Mock()
+process_mock.configure_mock(**{
     'communicate.return_value': ('', ''),
     'poll.return_value': 0,
     'stdout.fileno.return_value': OUT_FD,
@@ -27,6 +41,13 @@
 })


+def _reset_mocks():
+    global docker_client_mock, docker_container_mock
+    docker_client_mock.reset_mock()
+    # Need to reset side_effect
+    docker_container_mock.attach_socket.side_effect = [stdout_socket_mock, stderr_socket_mock]
+
+
 # Monkey patch select.select in the docker task module
 def _mockSelect(r, w, x, *args, **kwargs):
     return r, w, x
@@ -67,9 +88,12 @@ def tearDownModule():


 class TestDockerMode(unittest.TestCase):
-    @mock.patch('subprocess.Popen')
-    def testDockerMode(self, mockPopen):
-        mockPopen.return_value = processMock
+    def setUp(self):
+        _reset_mocks()
+
+    @mock.patch('docker.from_env')
+    def testDockerMode(self, from_env):
+        from_env.return_value = docker_client_mock

         task = {
             'mode': 'docker',
@@ -142,29 +166,36 @@ def fetchMock(url, request):
                 }
             })

-        self.assertEqual(mockPopen.call_count, 3)
-        cmd1, cmd2, cmd3 = [x[1]['args'] for x in mockPopen.call_args_list]
+        # We should have one call to images.pull(...)
+        self.assertEqual(docker_client_mock.images.pull.call_count, 1)
+        self.assertEqual(docker_client_mock.images.pull.call_args_list[0][0],
+                         ('test/test:latest', ))
+
+        # We should have two calls to containers.run(...)
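+        # (the task container itself, plus the busybox container used to
+        # chmod the temp dir during cleanup)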
+        self.assertEqual(docker_client_mock.containers.run.call_count, 2)
+        run1, run2 = docker_client_mock.containers.run.call_args_list
+
+        args, kwargs = run1
+        self.assertEqual(args[0], 'test/test:latest')
+        six.assertRegex(self, kwargs['volumes'].keys()[0], _tmp + '/.*')
+        self.assertEqual(kwargs['volumes'].itervalues().next()['bind'],
+                         DATA_VOLUME)
+        self.assertEqual(args[1][0:2], ['-f', '%s/file.txt' % DATA_VOLUME])
+        self.assertEqual(args[1][-2], '--temp-dir=%s' % DATA_VOLUME)
+        self.assertEqual(args[1][-1], '--bar')

-        self.assertEqual(cmd1, ('docker', 'pull', 'test/test:latest'))
-        self.assertEqual(cmd2[:3], ['docker', 'run', '-v'])
-        six.assertRegex(self, cmd2[3], _tmp + '/.*:%s' % DATA_VOLUME)
-        self.assertEqual(cmd2[4:7], [
-            'test/test:latest', '-f', '%s/file.txt' % DATA_VOLUME])
-        self.assertEqual(cmd2[-2], '--temp-dir=%s' % DATA_VOLUME)
-        self.assertEqual(cmd2[-1], '--bar')
-        self.assertEqual(len(cmd2), 9)
+        args, kwargs = run2
+        self.assertEqual(args[0], 'busybox')

-        self.assertEqual(cmd3[:4], ['docker', 'run', '--rm', '-v'])
-        six.assertRegex(self, cmd3[4], _tmp + '/.*:%s' % DATA_VOLUME)
-        self.assertEqual(cmd3[5:], ['busybox', 'chmod', '-R', 'a+rw', DATA_VOLUME])
-        self.assertEqual(len(cmd3), 10)
+        self.assertTrue(kwargs['remove'])
+        six.assertRegex(self, kwargs['volumes'].keys()[0], _tmp + '/.*')
+        self.assertEqual(kwargs['volumes'].itervalues().next()['bind'], DATA_VOLUME)
+        self.assertEqual(args[1], ['chmod', '-R', 'a+rw', DATA_VOLUME])

         # Make sure we can specify a custom entrypoint to the container
-        mockPopen.reset_mock()
-        task['entrypoint'] = '/bin/bash'
+        _reset_mocks()

-        # Make sure additional docker run args work
-        task['docker_run_args'] = ['--net', 'none']
+        task['entrypoint'] = '/bin/bash'

         inputs['foo'] = {
             'mode': 'http',
@@ -175,15 +206,22 @@ def fetchMock(url, request):
             'data': False
         }
         run(task, inputs=inputs, validate=False, auto_convert=False)
-        self.assertEqual(mockPopen.call_count, 3)
-        cmd2 = mockPopen.call_args_list[1][1]['args']
-        self.assertEqual(cmd2[4:9], [
-            '--net', 'none', '--entrypoint', '/bin/bash', 'test/test:latest'])
-        self.assertNotIn('--bar', cmd2)
-        self.assertEqual(cmd2[9:11], ['-f', '%s/file.txt' % DATA_VOLUME])
+
+        self.assertEqual(docker_client_mock.containers.run.call_count, 2)
+        args, kwargs = docker_client_mock.containers.run.call_args_list[0]
+        self.assertEqual(args[0], 'test/test:latest')
+        self.assertEqual(kwargs['entrypoint'], ['/bin/bash'])
+
+        self.assertNotIn('--bar', args)
+        self.assertEqual(args[1][0:2], ['-f', '%s/file.txt' % DATA_VOLUME])
+        _reset_mocks()
+
+        # Make sure custom config settings are respected
+        girder_worker.config.set('docker', 'cache_timeout', '123456')
+        girder_worker.config.set(
+            'docker', 'exclude_images', 'test/test:latest')

         # Make sure we can pass empty values
-        mockPopen.reset_mock()
         task['inputs'].append({
             'id': 'baz',
             'format': 'string',
@@ -197,45 +235,56 @@ def fetchMock(url, request):
             'type': 'string'
         }
         run(task, inputs=inputs, validate=False, auto_convert=False)
-        self.assertEqual(mockPopen.call_count, 3)
-        cmd2 = mockPopen.call_args_list[1][1]['args']
-        self.assertEqual(cmd2[4:9], [
-            '--net', 'none', '--entrypoint', '/bin/bash', 'test/test:latest'])
-        self.assertNotIn('--bar', cmd2)
-        self.assertEqual(cmd2[9:11], ['-f', '%s/file.txt' % DATA_VOLUME])
-        self.assertEqual(cmd2[cmd2.index('--baz'):cmd2.index('--baz')+2],
-                         ['--baz', ''])
+        self.assertEqual(docker_client_mock.containers.run.call_count, 2)
+        args = docker_client_mock.containers.run.call_args_list[0][0]
+        self.assertEqual(args[0], 'test/test:latest')
+        self.assertEqual(args[1], [
+            '-f',
+            '/mnt/girder_worker/data/file.txt',
+            '--temp-dir=/mnt/girder_worker/data',
+            '--baz', ''
+        ])
+        self.assertNotIn('--bar', args)
+
         # And non-empty values
-        mockPopen.reset_mock()
+        _reset_mocks()
         inputs['baz']['data'] = 'parameter1'
         run(task, inputs=inputs, validate=False, auto_convert=False)
-        self.assertEqual(mockPopen.call_count, 3)
-        cmd2 = mockPopen.call_args_list[1][1]['args']
-        self.assertEqual(cmd2[cmd2.index('--baz'):cmd2.index('--baz')+2],
-                         ['--baz', 'parameter1'])
+        self.assertEqual(docker_client_mock.containers.run.call_count, 2)
+        args = docker_client_mock.containers.run.call_args_list[0][0]
+        self.assertEqual(args[0], 'test/test:latest')
+        self.assertEqual(args[1], [
+            '-f',
+            '/mnt/girder_worker/data/file.txt',
+            '--temp-dir=/mnt/girder_worker/data',
+            '--baz', 'parameter1'
+        ])
+
+        # Clean up
         del inputs['baz']
         task['inputs'].pop()
         task['container_args'].pop()
         task['container_args'].pop()
-        mockPopen.reset_mock()

-        # Make sure we can skip pulling the image
+        _reset_mocks()
         task['pull_image'] = False
         inputs['foo'] = {
             'mode': 'http',
             'url': 'https://foo.com/file.txt'
         }
         run(task, inputs=inputs, validate=False, auto_convert=False)
-        self.assertEqual(mockPopen.call_count, 2)
-        cmd1 = [x[1]['args'] for x in mockPopen.call_args_list][0]
-        self.assertEqual(tuple(cmd1[:2]), ('docker', 'run'))
-        self.assertEqual(cmd1[4:6], ['--net', 'none'])
+
+        # Assert no call to images.pull
+        self.assertEqual(docker_client_mock.images.pull.call_count, 0)
+        self.assertEqual(docker_client_mock.containers.run.call_count, 2)

     @mock.patch('subprocess.Popen')
     def testCleanupHook(self, mockPopen):
         os.makedirs(_tmp)
-        mockPopen.return_value = processMock
+        mockPopen.return_value = process_mock
         girder_worker.config.set('docker', 'gc', 'True')
         girder_worker.config.set('docker', 'cache_timeout', '123456')
         girder_worker.config.set('docker', 'exclude_images', 'test/test:latest')
@@ -253,11 +302,11 @@ def testCleanupHook(self, mockPopen):

     @mock.patch('subprocess.Popen')
     def testCleanupHookWithoutOptIn(self, mockPopen):
-        mockPopen.return_value = processMock
+        mockPopen.return_value = process_mock
         cleanup.main()
         self.assertEqual(mockPopen.call_count, 0)

         # Now with explicit settings
-        mockPopen.return_value = processMock
+        mockPopen.return_value = process_mock
         girder_worker.config.set('docker', 'gc', 'False')
         girder_worker.config.set('docker', 'cache_timeout', '123456')
         girder_worker.config.set('docker', 'exclude_images', 'test/test:latest')
@@ -265,9 +314,9 @@ def testCleanupHookWithoutOptIn(self, mockPopen):
         cleanup.main()
         self.assertEqual(mockPopen.call_count, 0)

-    @mock.patch('subprocess.Popen')
-    def testOutputValidation(self, mockPopen):
-        mockPopen.return_value = processMock
+    @mock.patch('docker.from_env')
+    def testOutputValidation(self, from_env):
+        from_env.return_value = docker_client_mock

         task = {
             'mode': 'docker',
@@ -286,6 +335,7 @@
         with self.assertRaisesRegexp(TaskSpecValidationError, msg):
             run(task)

+        _reset_mocks()
         task['outputs'][0]['target'] = 'filepath'
         task['outputs'][0]['path'] = '/tmp/some/invalid/path'
         msg = (r'^Docker filepath output paths must either start with "%s/" '
@@ -299,7 +349,8 @@
         with self.assertRaisesRegexp(Exception, msg):
             run(task)
         # Make sure docker stuff actually got called in this case.
-        self.assertEqual(mockPopen.call_count, 3)
+
+        self.assertEqual(docker_client_mock.containers.run.call_count, 2)

         # Simulate a task that has written into the temp dir
         tmp = os.path.join(_tmp, 'simulated_output')
@@ -308,6 +359,7 @@
         path = os.path.join(tmp, 'valid_path.txt')
         with open(path, 'w') as f:
             f.write('simulated output')
+        _reset_mocks()
         outputs = run(task, _tempdir=tmp)
         self.assertEqual(outputs, {
             'file_output_1': {
@@ -315,7 +367,7 @@
                 'format': 'text'
             }
         })
-
+        _reset_mocks()
         # If no path is specified, we should fall back to the input name
         del task['outputs'][0]['path']
         path = os.path.join(_tmp, '.*', 'file_output_1')
@@ -323,11 +375,9 @@
         with self.assertRaisesRegexp(Exception, msg):
             run(task)

-    @mock.patch('girder_worker.core.utils.run_process')
-    @mock.patch('subprocess.Popen')
-    def testNamedPipes(self, mockPopen, mockRunProcess):
-        mockRunProcess.return_value = processMock
-        mockPopen.return_value = processMock
+    @mock.patch('docker.from_env')
+    def testNamedPipes(self, from_env):
+        from_env.return_value = docker_client_mock

         task = {
             'mode': 'docker',
@@ -366,3 +416,47 @@ def write(self, buf):
         pipe = os.path.join(tmp, 'named_pipe')
         self.assertTrue(os.path.exists(pipe))
         self.assertTrue(stat.S_ISFIFO(os.stat(pipe).st_mode))
+
+    @mock.patch('docker.from_env')
+    def testDockerRunArgs(self, from_env):
+        from_env.return_value = docker_client_mock
+
+        task = {
+            'mode': 'docker',
+            'docker_image': 'test/test:latest',
+            'container_args': [
+                '-f', '$input{foo}', '--temp-dir=$input{_tempdir}',
+                '$flag{bar}'
+            ],
+            'docker_run_args': {
+                'network_disabled': True
+            },
+            'pull_image': True,
+            'inputs': [],
+            'outputs': [{
+                'id': '_stderr',
+                'format': 'string',
+                'type': 'string'
+            }]
+        }
+        run(task, inputs={}, cleanup=False, validate=False,
+            auto_convert=False)
+
+        kwargs = docker_client_mock.containers.run.call_args_list[0][1]
+        self.assertTrue('network_disabled' in kwargs)
+        self.assertTrue(kwargs['network_disabled'])
+
+        # Ensure we can't override detach and tty
+        _reset_mocks()
+        task['docker_run_args'] = {
+            'detach': False,
+            'tty': False
+        }
+        run(task, inputs={}, cleanup=False, validate=False,
+            auto_convert=False)
+
+        kwargs = docker_client_mock.containers.run.call_args_list[0][1]
+        self.assertTrue('detach' in kwargs)
+        self.assertTrue(kwargs['detach'])
+        self.assertTrue('tty' in kwargs)
+        self.assertTrue(kwargs['tty'])