Skip to content

Commit

Permalink
Add resource management to scheduler for multi-GPU
Browse files Browse the repository at this point in the history
See NVIDIA#104, now you can select n specific GPUs, or the next n available
GPUs.
For now, the plan is just to assume that multi-GPU is enabled since
caffe-nv version 0.12 will be required.
This hasn't actually been tested with a multi-GPU caffe build yet
  • Loading branch information
lukeyeager committed May 16, 2015
1 parent a55642b commit f53902c
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 104 deletions.
12 changes: 11 additions & 1 deletion digits/dataset/tasks/create_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,17 @@ def html_id(self):
return super(CreateDbTask, self).html_id()

@override
def task_arguments(self, **kwargs):
def offer_resources(self, resources):
key = 'create_db_task_pool'
if key not in resources:
return None
for resource in resources[key]:
if resource.remaining() >= 1:
return {key: [(resource.identifier, 1)]}
return None

@override
def task_arguments(self, resources):
args = [sys.executable, os.path.join(os.path.dirname(os.path.dirname(digits.__file__)), 'tools', 'create_db.py'),
self.path(self.input_file),
self.path(self.db_name),
Expand Down
12 changes: 11 additions & 1 deletion digits/dataset/tasks/parse_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,17 @@ def html_id(self):
return 'task-parse-folder-%s' % ('-'.join(sets))

@override
def task_arguments(self, **kwargs):
def offer_resources(self, resources):
key = 'parse_folder_task_pool'
if key not in resources:
return None
for resource in resources[key]:
if resource.remaining() >= 1:
return {key: [(resource.identifier, 1)]}
return None

@override
def task_arguments(self, resources):
args = [sys.executable, os.path.join(os.path.dirname(os.path.dirname(digits.__file__)), 'tools', 'parse_folder.py'),
self.folder,
self.path(utils.constants.LABELS_FILE),
Expand Down
14 changes: 9 additions & 5 deletions digits/model/tasks/caffe_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,7 @@ def iteration_to_epoch(self, it):
return float(it * self.train_epochs) / self.solver.max_iter

@override
def task_arguments(self, **kwargs):
gpu_id = kwargs.pop('gpu_id', None)

def task_arguments(self, resources):
if config_option('caffe_root') == '<PATHS>':
caffe_bin = 'caffe'
else:
Expand All @@ -381,8 +379,14 @@ def task_arguments(self, **kwargs):
'--solver=%s' % self.path(self.solver_file),
]

if gpu_id:
args.append('--gpu=%d' % gpu_id)
if 'gpus' in resources:
identifiers = []
for identifier, value in resources['gpus']:
identifiers.append(identifier)
if len(identifiers) == 1:
args.append('--gpu=%s' % identifiers[0])
elif len(identifiers) > 1:
args.append('--gpus=%s' % ','.join(identifiers))
if self.pretrained_model:
args.append('--weights=%s' % self.path(self.pretrained_model))

Expand Down
35 changes: 35 additions & 0 deletions digits/model/tasks/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from digits import utils
from digits.task import Task
from digits.utils import override

# NOTE: Increment this everytime the picked object changes
PICKLE_VERSION = 2
Expand Down Expand Up @@ -98,6 +99,40 @@ def __setstate__(self, state):
self.snapshots = []
self.dataset = None

@override
def offer_resources(self, resources):
if 'gpus' not in resources:
return None
if not resources['gpus']:
return {} # don't use a GPU at all
if self.gpu_count is not None:
identifiers = []
for resource in resources['gpus']:
if resource.remaining() >= 1:
identifiers.append(resource.identifier)
if len(identifiers) == self.gpu_count:
break
if len(identifiers) == self.gpu_count:
return {'gpus': [(i, 1) for i in identifiers]}
else:
return None
elif self.selected_gpus is not None:
found_all = True
for i in self.selected_gpus:
found = False
for gpu in resources['gpus']:
if i == gpu.identifier:
found = True
break
if not found:
found_all = False
break
if found_all:
return {'gpus': [(i, 1) for i in self.selected_gpus]}
else:
return None
return None

def send_progress_update(self, epoch):
"""
Sends socketio message about the current progress
Expand Down
228 changes: 137 additions & 91 deletions digits/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,86 @@
from digits.utils import errors
from log import logger

class Resource(object):
"""
Stores information about which tasks are using a resource
"""

class ResourceAllocation(object):
"""
Marks that a task is using [part of] a resource
"""
def __init__(self, task, value):
"""
Arguments:
task -- which task is using the resource
value -- how much of the resource is being used
"""
self.task = task
self.value = value

def __init__(self, identifier=None, max_value=1):
"""
Keyword arguments:
identifier -- some way to identify this resource
max_value -- a numeric representation of the capacity of this resource
"""
if identifier is None:
self.identifier = id(self)
else:
self.identifier = identifier
self.max_value = max_value
self.allocations = []

def remaining(self):
"""
Returns the amount of this resource that is not being used
"""
return self.max_value - sum(a.value for a in self.allocations)

def allocate(self, task, value):
"""
A task is requesting to use this resource
"""
if self.remaining() - value < 0:
raise RuntimeError('Resource is already maxed out at %s/%s' % (
self.remaining(),
self.max_value)
)
self.allocations.append(self.ResourceAllocation(task, value))

def deallocate(self, task):
"""
The task has finished using this resource
"""
for i, a in enumerate(self.allocations):
if id(task) == id(a.task):
self.allocations.pop(i)
return True
return False

class Scheduler:
"""
Coordinates execution of Jobs
"""
# How many concurrent tasks will run
NUM_SPLIT_THREADS = 1
NUM_CREATE_THREADS = 2

def __init__(self, gpu_list, verbose=False):
def __init__(self, gpu_list=None, verbose=False):
"""
Arguments:
gpu_list -- a list of GPUs to be distributed for training tasks
Keyword arguments:
gpu_list -- a comma-separated string which is a list of GPU id's
verbose -- if True, print more errors
"""
self.jobs = []

# Keeps track of which GPUs are in use
self.gpu_list = []
if gpu_list and gpu_list != 'NONE':
if isinstance(gpu_list, str):
gpu_list = [int(x) for x in gpu_list.split(',')]
elif isinstance(gpu_list, list):
pass
else:
raise ValueError('invalid gpu_list: %s' % gpu_list)
for index in gpu_list:
self.gpu_list.append({'index': int(index), 'active': False})

self.verbose = verbose

self.split_queue = gevent.queue.Queue()
self.create_queue = gevent.queue.Queue()
self.train_queue = gevent.queue.Queue()
# Keeps track of resource usage
self.resources = {
# TODO: break this into CPU cores, memory usage, IO usage, etc.
'parse_folder_task_pool': [Resource()],
'create_db_task_pool': [Resource(max_value=2)],
'gpus': [Resource(identifier=index)
for index in gpu_list.split(',')] if gpu_list else [],
}

self.running = False
self.shutdown = gevent.event.Event()
Expand Down Expand Up @@ -238,19 +283,6 @@ def start(self):

gevent.spawn(self.main_thread)

for x in xrange(self.NUM_SPLIT_THREADS):
gevent.spawn(self.task_thread, self.split_queue)

for x in xrange(self.NUM_CREATE_THREADS):
gevent.spawn(self.task_thread, self.create_queue)

if len(self.gpu_list):
for x in xrange(len(self.gpu_list)):
gevent.spawn(self.task_thread, self.train_queue)
else:
for x in xrange(1): # Only start 1 thread if running in CPU mode
gevent.spawn(self.task_thread, self.train_queue)

self.running = True
return True

Expand Down Expand Up @@ -308,28 +340,27 @@ def start_this_job(job):
if job.status == Status.RUN:
alldone = True
for task in job.tasks:
if task.status == Status.INIT:
if task.status in [Status.INIT, Status.WAIT]:
alldone = False
# try to start the task
if task.ready_to_queue():
logger.debug('%s task queued.' % task.name(), job_id=job.id())
task.status = Status.WAIT
if isinstance(task, dataset_tasks.ParseFolderTask):
self.split_queue.put( (job, task) )
elif isinstance(task, dataset_tasks.CreateDbTask):
self.create_queue.put( (job, task) )
elif isinstance(task, model_tasks.TrainTask):
self.train_queue.put( (job, task) )
requested_resources = task.offer_resources(self.resources)
if requested_resources is None:
task.status = Status.WAIT
else:
logger.error('Task type %s not recognized' % type(task).__name__, job_id=job.id())
task.exception = Exception('Task type not recognized')
task.status = Status.ERROR
elif task.status == Status.WAIT or task.status == Status.RUN:
if self.reserve_resources(task, requested_resources):
logger.debug('%s task started.' % task.name(),
job_id=job.id())
gevent.spawn(self.run_task,
task, requested_resources)
elif task.status == Status.RUN:
# job is not done
alldone = False
elif task.status == Status.DONE:
pass
elif task.status == Status.ABORT:
elif task.status in [Status.DONE, Status.ABORT]:
# job is done
pass
elif task.status == Status.ERROR:
# propogate error status up to job
job.status = Status.ERROR
alldone = False
break
Expand Down Expand Up @@ -363,48 +394,63 @@ def sigterm_handler(self, signal, frame):
"""
self.shutdown.set()

def task_thread(self, queue):
def task_error(self, task, error):
"""
Executes tasks in queue
Handle an error while executing a task
"""
while not self.shutdown.is_set():
if queue.empty() is False:
(job, task) = queue.get_nowait()
logger.error('%s: %s' % (type(e).__name__, e), task.job_id)
task.exception = e
task.traceback = traceback.format_exc()
task.status = Status.ERROR

# Don't run the task if the job is done
if job.status in [Status.ERROR, Status.ABORT]:
task.status = Status.ABORT
else:
options = {}
gpu_id = -1
try:
if isinstance(task, model_tasks.TrainTask):
### Select GPU
if len(self.gpu_list):
for gpu in self.gpu_list:
if not gpu['active']:
gpu_id = gpu['index']
gpu['active'] = True
break
assert gpu_id != -1, 'no available GPU'
else:
gpu_id = None
options['gpu_id'] = gpu_id
def reserve_resources(self, task, resources):
"""
Reserve resources for a task
"""
try:
# reserve resources
for resource_type, requests in resources.iteritems():
for identifier, value in requests:
found = False
for resource in self.resources[resource_type]:
if resource.identifier == identifier:
resource.allocate(task, value)
found = True
break
if not found:
raise RuntimeError('Resource "%s" with identifier="%s" not found' % (
resource_type, identifier))
return True
except Exception as e:
self.task_error(task, e)
self.release_resources(task, resources)
return False

task.run(**options)
def release_resources(self, task, resources):
"""
Release resources previously reserved for a task
"""
# release resources
for resource_type, requests in resources.iteritems():
for identifier, value in requests:
for resource in self.resources[resource_type]:
if resource.identifier == identifier:
resource.deallocate(task)

except Exception as e:
logger.error('%s: %s' % (type(e).__name__, e), job_id=job.id())
task.exception = e
task.traceback = traceback.format_exc()
task.status = Status.ERROR
finally:
### Release GPU
if gpu_id != -1 and gpu_id is not None:
for gpu in self.gpu_list:
if gpu['index'] == gpu_id:
gpu['active'] = False
else:
# Wait before checking again for a task
time.sleep(utils.wait_time())

def run_task(self, task, resources):
"""
Executes a task
Arguments:
task -- the task to run
resources -- the resources allocated for this task
a dict mapping resource_type to lists of (identifier, value) tuples
"""
try:
task.run(resources)
except Exception as e:
self.task_error(task, e)
finally:
self.release_resources(task, resources)

Loading

0 comments on commit f53902c

Please sign in to comment.