This repository has been archived by the owner on Nov 7, 2024. It is now read-only.

Migrate to docker sdk #96

Merged · 34 commits · May 4, 2017

Commits
593a456
Factor out select loop into separate function
cjh1 Jan 24, 2017
dbea85e
Add docker-py to requirements.txt
cjh1 Jan 30, 2017
4c4b748
select_loop renames
cjh1 Jan 30, 2017
4d3da76
Migrate 'docker run' to docker-py
cjh1 Jan 30, 2017
9716a66
Fix up test
cjh1 Jan 30, 2017
f16f4a7
Migrate 'docker pull' to docker-py
cjh1 Jan 30, 2017
0099dec
Fix formatting
cjh1 Jan 30, 2017
2978208
Fix typo
cjh1 Jan 31, 2017
fa28e2b
Update docker_test to expect docker-py calls
cjh1 Jan 31, 2017
a45eeac
Fix formatting
cjh1 Jan 31, 2017
686c307
Migrate cleanup to SDK
cjh1 Apr 7, 2017
ea3be0b
Fix up tests after rebase
cjh1 Apr 7, 2017
08dfe3b
Support streaming progress via named pipe
zachmullen Apr 10, 2017
2c29351
Flake8 fixes
cjh1 Apr 12, 2017
4c243f0
Docs for new functionality
zachmullen Apr 12, 2017
be91621
Merge remote-tracking branch 'origin/master' into docker-sdk
cjh1 Apr 12, 2017
f88cf07
Bump docker-py version to 2.1.0
cjh1 Apr 12, 2017
dab7b9e
auto_remove is not available until 1.25
cjh1 Apr 12, 2017
933fdc0
Call exit_condition at start of loop
cjh1 Apr 18, 2017
9ddbeac
Add sleep in select_loop to avoid hot loop
cjh1 Apr 18, 2017
ff6d7d0
Remove unecessary p.wait()
cjh1 Apr 18, 2017
475173e
Fix default function for exit_condition
cjh1 Apr 18, 2017
c1aae0a
Fix container exit states and ensure container is reloaded
cjh1 Apr 18, 2017
e299660
Move timeout to select call
cjh1 Apr 18, 2017
e18a024
Evaluate exit_condition before select
cjh1 Apr 18, 2017
acd0af3
Add test for container removal
cjh1 Apr 19, 2017
2199e8e
Add better exception handling and simplify run function
cjh1 Apr 19, 2017
2a7b409
Fix typo
cjh1 Apr 19, 2017
02fd37b
Guard against case where no containers exist
cjh1 Apr 19, 2017
6a8fcd3
Merge remote-tracking branch 'origin/progress-pipe' into docker-sdk
cjh1 Apr 20, 2017
78ce781
Allow extra docker run arguments to be passed
cjh1 Apr 27, 2017
e9ab515
Merge remote-tracking branch 'origin/master' into docker-sdk
cjh1 Apr 28, 2017
4ef390f
Use logger.exception(...) for logging exceptions
cjh1 Apr 28, 2017
e53abb0
Move docker package requirement into plugin
cjh1 Apr 28, 2017
1 change: 1 addition & 0 deletions docs/api-docs.rst
@@ -53,6 +53,7 @@ such as ``pdb``.
(, "entrypoint": <custom override for container entry point>)
(, "inputs": [<TASK_INPUT> (, <TASK_INPUT>, ...)])
(, "outputs": [<TASK_OUTPUT> (, <TASK_OUTPUT>, ...)])
(, "progress_pipe": <set to true to create a channel for progress notifications>)
}

<WORKFLOW_TASK> ::= {
32 changes: 29 additions & 3 deletions docs/plugins.rst
@@ -156,6 +156,33 @@ If you specify an absolute path, it must start with ``/mnt/girder_worker/data/``
will be thrown before the task is run. These conventions apply whether the path
is specified in the ``id`` or ``path`` field.

Progress reporting from docker tasks
************************************

Docker tasks have the option of reporting progress back to Girder via a special named pipe. If
you want to do this, specify ``"progress_pipe": true`` in your docker task specification. This
will create a special named pipe at ``/mnt/girder_worker/data/.girder_progress``. In your container
logic, you may write progress notification messages to this named pipe, one per line.
Progress messages should be JSON objects with the following fields, all of which are optional:

* ``message`` (string): A human-readable message about the current task progress.
* ``current`` (number): A numeric value representing current progress, which should always be
less than or equal to the ``total`` value.
* ``total`` (number): A numeric value representing the maximum progress value, i.e. the value
of ``current`` when the task is complete. Only pass this field if the total is changing or being
initially set.

An example progress notification string: ::

{"message": "Halfway there!", "total": 100, "current": 50}

.. note:: When writing to the named pipe, you should explicitly call ``flush`` on the file
descriptor afterward, otherwise the messages may sit in a buffer and may not reach the
Girder server as you write them.

.. note:: This feature may not work on Docker on non-Linux platforms, and the call to open the
pipe for writing from within the container may block indefinitely.
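
A minimal, illustrative sketch of container-side code that writes progress
messages to the pipe (Python shown for illustration; the work loop is a
placeholder for real logic): ::

    import json
    import time

    pipe_path = '/mnt/girder_worker/data/.girder_progress'
    total = 100

    with open(pipe_path, 'w') as pipe:
        for current in range(1, total + 1):
            time.sleep(0.1)  # placeholder for a real unit of work
            progress = {'message': 'Step %d of %d' % (current, total),
                        'current': current}
            if current == 1:
                progress['total'] = total  # only send total when first set
            pipe.write(json.dumps(progress) + '\n')
            pipe.flush()  # flush so each message reaches Girder promptly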

Management of Docker Containers and Images
******************************************

@@ -164,14 +191,14 @@ never removed. Docker containers are automatically removed when the task is
complete.

As an alternative, a 'garbage collection' process can be used. It can
be enabled by modifying settings in the ``[docker]`` section of the config
file, which can be done using the command:

.. code-block :: none

girder-worker-config set docker gc True

When the ``gc`` config value is set to ``True``, containers are not removed
when the task ends. Instead, periodically, any images not associated with a
container will be removed, and then any stopped containers will be removed.
This will free disk space associated with the images, but may remove images
@@ -189,7 +216,6 @@ Only containers that have been stopped longer than a certain time are removed.
This time defaults to an hour, and can be specified as any number of seconds
via the ``cache_timeout`` setting.
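
For example, to shorten this to ten minutes (assuming ``cache_timeout`` is set
with the same config command used for the ``gc`` setting above):

.. code-block :: none

    girder-worker-config set docker cache_timeout 600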


Girder IO
---------

182 changes: 133 additions & 49 deletions girder_worker/core/utils.py
@@ -2,6 +2,7 @@
import errno
import functools
import imp
import json
import os
import girder_worker
import girder_worker.plugins
@@ -196,7 +197,7 @@ def load_plugin(name, paths):
name, '\n '.join(paths)))


def _close_pipes(rds, wds, input_pipes, output_pipes, stdout, stderr):
def _close_pipes(rds, wds, input_pipes, output_pipes, close_output_pipe):
"""
Helper to close remaining input and output adapters after the subprocess
completes.
@@ -205,7 +206,7 @@ def _close_pipes(rds, wds, input_pipes, output_pipes, stdout, stderr):
for fd in rds:
if fd in output_pipes:
output_pipes[fd].close()
if fd not in (stdout, stderr):
if close_output_pipe(fd):
os.close(fd)

# close any remaining input adapters
@@ -214,7 +215,7 @@ def _setup_input_pipes(input_pipes, stdin):
os.close(fd)


def _setup_input_pipes(input_pipes, stdin):
def _setup_input_pipes(input_pipes):
"""
Given a mapping of input pipes, return a tuple with 2 elements. The first is
a list of file descriptors to pass to ``select`` as writeable descriptors.
@@ -227,10 +228,6 @@ def _setup_input_pipes(input_pipes):
if isinstance(pipe, int):
# This is assumed to be an open system-level file descriptor
wds.append(pipe)
elif pipe == '_stdin':
# Special case for binding to standard input
input_pipes[stdin] = input_pipes['_stdin']
wds.append(stdin)
else:
if not os.path.exists(pipe):
raise Exception('Input pipe does not exist: %s' % pipe)
@@ -260,6 +257,83 @@ def _open_ipipes(wds, fifos, input_pipes):
return wds, fifos, input_pipes


def select_loop(exit_condition=lambda: True, close_output=lambda x: True,
outputs=None, inputs=None):
"""
    Run a select loop for a set of input and output pipes.

    :param exit_condition: A function to evaluate to determine whether the
        select loop should terminate once all pipes are empty.
    :type exit_condition: function
    :param close_output: A function used to test whether an output pipe
        should be closed when EOF is reached. Certain output pipes, such as
        stdout and stderr, should not be closed.
    :type close_output: function
:param outputs: This should be a dictionary mapping pipe descriptors
to instances of ``StreamPushAdapter`` that should handle the data from
the stream. The keys of this dictionary are open file descriptors,
which are integers.
:type outputs: dict
:param inputs: This should be a dictionary mapping pipe descriptors
to instances of ``StreamFetchAdapter`` that should handle sending
input data in chunks. Keys in this dictionary can be either open file
descriptors (integers) or a string representing a path to an existing
fifo on the filesystem. This second case supports the use of named
pipes, since they must be opened for reading before they can be opened
        for writing.
:type inputs: dict
"""

BUF_LEN = 65536
inputs = inputs or {}
outputs = outputs or {}

rds = [fd for fd in outputs.keys() if isinstance(fd, int)]
wds, fifos = _setup_input_pipes(inputs)

try:
while True:
            # We evaluate this first so that we get one last iteration
            # of the loop before breaking out.
exit = exit_condition()

# get ready pipes, timeout of 100 ms
readable, writable, _ = select.select(rds, wds, (), 0.1)

for ready_fd in readable:
buf = os.read(ready_fd, BUF_LEN)

if buf:
outputs[ready_fd].write(buf)
else:
outputs[ready_fd].close()
                    # Should we close this pipe? In the case of stdout or
                    # stderr, bad things happen if the parent closes them.
if close_output(ready_fd):
os.close(ready_fd)
rds.remove(ready_fd)
for ready_fd in writable:
# TODO for now it's OK for the input reads to block since
# input generally happens first, but we should consider how to
# support non-blocking stream inputs in the future.
buf = inputs[ready_fd].read(BUF_LEN)

if buf:
os.write(ready_fd, buf)
else: # end of stream
wds.remove(ready_fd)
os.close(ready_fd)

wds, fifos, inputs = _open_ipipes(wds, fifos, inputs)
# all pipes empty?
empty = (not rds or not readable) and (not wds or not writable)

            if empty and exit:
break

finally:
_close_pipes(rds, wds, inputs, outputs, close_output)

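# A minimal usage sketch: drive select_loop with a single in-memory pipe.
# MinimalAdapter is a hypothetical stand-in for the StreamPushAdapter
# interface (write/close) that select_loop expects in ``outputs``.
class MinimalAdapter(object):
    def write(self, buf):
        print('received %d bytes' % len(buf))

    def close(self):
        print('stream closed')


def _select_loop_example():
    rfd, wfd = os.pipe()
    os.write(wfd, b'hello')
    os.close(wfd)  # writer side is done; the reader sees EOF after draining

    # The default exit_condition returns True, so the loop exits once the
    # pipe has been drained and its descriptor closed.
    select_loop(outputs={rfd: MinimalAdapter()})
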

def run_process(command, output_pipes=None, input_pipes=None):
"""
Run a subprocess, and listen for its outputs on various pipes.
@@ -285,12 +359,13 @@ must be opened for reading before they can be opened for writing
must be opened for reading before they can be opened for writing
:type input_pipes: dict
"""
BUF_LEN = 65536
input_pipes = input_pipes or {}
output_pipes = output_pipes or {}

p = subprocess.Popen(args=command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stdin=subprocess.PIPE)

input_pipes = input_pipes or {}
output_pipes = output_pipes or {}

# we now know subprocess stdout and stderr filenos, so bind the adapters
stdout = p.stdout.fileno()
stderr = p.stderr.fileno()
@@ -300,51 +375,24 @@
output_pipes[stderr] = output_pipes.get(
'_stderr', WritePipeAdapter({}, sys.stderr))

rds = [fd for fd in output_pipes.keys() if isinstance(fd, int)]
wds, fifos = _setup_input_pipes(input_pipes, stdin)

try:
while True:
status = p.poll()
# get ready pipes
readable, writable, _ = select.select(rds, wds, (), 0)

for ready_pipe in readable:
buf = os.read(ready_pipe, BUF_LEN)
# Special case for _stdin
if '_stdin' in input_pipes:
input_pipes[stdin] = input_pipes['_stdin']

if buf:
output_pipes[ready_pipe].write(buf)
else:
output_pipes[ready_pipe].close()
if ready_pipe not in (stdout, stderr):
# bad things happen if parent closes stdout or stderr
os.close(ready_pipe)
rds.remove(ready_pipe)
for ready_pipe in writable:
# TODO for now it's OK for the input reads to block since
# input generally happens first, but we should consider how to
# support non-blocking stream inputs in the future.
buf = input_pipes[ready_pipe].read(BUF_LEN)
def exit_condition():
status = p.poll()
return status is not None

if buf:
os.write(ready_pipe, buf)
else: # end of stream
wds.remove(ready_pipe)
os.close(ready_pipe)
def close_output_pipe(pipe):
return pipe not in (stdout, stderr)

wds, fifos, input_pipes = _open_ipipes(wds, fifos, input_pipes)
empty = (not rds or not readable) and (not wds or not writable)
if empty and status is not None:
# all pipes are empty and the process has returned, we are done
break
elif not rds and not wds and status is None:
# all pipes are closed but the process is still running
p.wait()
try:
select_loop(exit_condition=exit_condition,
close_output=close_output_pipe,
outputs=output_pipes, inputs=input_pipes)
except Exception:
p.kill() # kill child process if something went wrong on our end
raise
finally:
_close_pipes(rds, wds, input_pipes, output_pipes, stdout, stderr)

return p
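
# A minimal usage sketch: run a short-lived command under run_process with
# the default adapters, which forward the child's stdout and stderr to this
# process's own streams. The command here is a hypothetical example.
def _run_process_example():
    p = run_process(['echo', 'hello world'])
    print('exit status: %d' % p.returncode)
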

Expand Down Expand Up @@ -450,3 +498,39 @@ def __init__(self, output_spec, key, dictionary=None):

def write(self, buf):
self.dictionary[self.key] += buf


class JobProgressAdapter(StreamPushAdapter):
def __init__(self, job_manager):
"""
This reads structured JSON documents one line at a time and sends
them as progress events via the JobManager.

:param job_manager: The job manager to use to send the progress events.
:type job_manager: girder_worker.utils.JobManager
"""
super(JobProgressAdapter, self).__init__(None)

self.job_manager = job_manager
self._buf = b''

def write(self, buf):
lines = buf.split(b'\n')
if self._buf:
lines[0] = self._buf + lines[0]
self._buf = lines[-1]

for line in lines[:-1]:
self._parse(line)

def _parse(self, line):
try:
doc = json.loads(line.decode('utf8'))
except ValueError:
return # TODO log?

if not isinstance(doc, dict):
return # TODO log?

self.job_manager.updateProgress(
total=doc.get('total'), current=doc.get('current'), message=doc.get('message'))
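

# A minimal usage sketch: FakeJobManager is a hypothetical stand-in exposing
# the single method JobProgressAdapter calls. Note how the first write is
# buffered until the newline in the second write completes the JSON line.
class FakeJobManager(object):
    def updateProgress(self, total=None, current=None, message=None):
        print('progress: %s/%s %s' % (current, total, message))


def _job_progress_example():
    adapter = JobProgressAdapter(FakeJobManager())
    adapter.write(b'{"message": "Halfway the')              # partial line, buffered
    adapter.write(b're!", "total": 100, "current": 50}\n')  # newline completes it
    # -> progress: 50/100 Halfway there!
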
33 changes: 22 additions & 11 deletions girder_worker/plugins/docker/__init__.py
@@ -3,6 +3,8 @@
import subprocess
import tempfile
import time
import docker
from docker.errors import DockerException
from girder_worker import config, logger

# Minimum interval in seconds at which to run the docker-gc script
@@ -86,17 +88,26 @@ def task_cleanup(e):
from .executor import DATA_VOLUME
if e.info['task']['mode'] == 'docker' and '_tempdir' in e.info['kwargs']:
tmpdir = e.info['kwargs']['_tempdir']
cmd = [
'docker', 'run', '--rm', '-v', '%s:%s' % (tmpdir, DATA_VOLUME),
'busybox', 'chmod', '-R', 'a+rw', DATA_VOLUME
]
p = subprocess.Popen(args=cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if p.returncode:
logger.error(
'Error setting perms on docker tempdir %s.\nSTDOUT: %s\nSTDERR:%s',
tmpdir, out, err)
raise Exception('Docker tempdir chmod returned code %d.' % p.returncode)
        client = docker.from_env()
        # Use a distinct name so this dict does not shadow the module-level
        # ``config`` imported from girder_worker above.
        container_config = {
            'tty': True,
            'volumes': {
                tmpdir: {
                    'bind': DATA_VOLUME,
                    'mode': 'rw'
                }
            },
            'detach': False,
            'remove': True
        }
        args = ['chmod', '-R', 'a+rw', DATA_VOLUME]

        try:
            client.containers.run('busybox', args, **container_config)
        except DockerException:
            # logger.exception logs the message along with the traceback of
            # the active exception.
            logger.exception('Error setting perms on docker tempdir %s.', tmpdir)
            raise


def load(params):
Expand Down