Skip to content

Commit

Permalink
add gpu support
Browse files Browse the repository at this point in the history
  • Loading branch information
windreamer committed Apr 28, 2016
1 parent c504a9f commit 7ee2499
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM debian:jessie
FROM nvidia/cuda:cudnn-runtime
RUN apt-get update && apt-get install --no-install-recommends -y \
g++ \
libopenblas-dev \
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ wheel==0.29.0
wsgiref==0.1.2
zkpython==0.4.2
-e git+https://github.com/douban/mesos.interface.git#egg=mesos.interface
https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.8.0-cp27-none-linux_x86_64.whl
https://storage.googleapis.com/tensorflow/linux/gpu/tensorflow-0.8.0-cp27-none-linux_x86_64.whl
30 changes: 25 additions & 5 deletions tfmesos/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import math
import select
import signal
import socket
Expand All @@ -18,22 +19,24 @@

class Job(object):
    """Resource description for one named group of identical tasks.

    Attributes are stored exactly as passed in:

    name -- job name.
    num  -- number of tasks in the job.
    cpus -- CPUs requested per task (default 1.0).
    mem  -- memory requested per task (default 1024.0).
    gpus -- GPUs requested per task (default 0, i.e. none).
    """

    # ``gpus`` is the last parameter and defaults to 0, so callers that
    # only pass (name, num, cpus, mem) keep working unchanged.
    def __init__(self, name, num, cpus=1.0, mem=1024.0, gpus=0):
        self.name = name
        self.num = num
        self.cpus = cpus
        self.gpus = gpus
        self.mem = mem


class Task(object):

def __init__(self, mesos_task_id, job_name, task_index,
cpus=1.0, mem=1024.0, volumes={}):
cpus=1.0, mem=1024.0, gpus=0, volumes={}):
self.mesos_task_id = mesos_task_id
self.job_name = job_name
self.task_index = task_index

self.cpus = cpus
self.gpus = gpus
self.mem = mem
self.volumes = volumes
self.offered = False
Expand All @@ -49,7 +52,7 @@ def __str__(self):
addr=%s
>''' % (self.mesos_task_id, self.addr))

def to_task_info(self, offer, master_addr):
def to_task_info(self, offer, master_addr, gpu_ids=[]):
ti = mesos_pb2.TaskInfo()
ti.task_id.value = str(self.mesos_task_id)
ti.slave_id.value = offer.slave_id.value
Expand All @@ -60,6 +63,12 @@ def to_task_info(self, offer, master_addr):
cpus.type = mesos_pb2.Value.SCALAR
cpus.scalar.value = self.cpus

if self.gpus:
gpus = ti.resources.add()
gpus.name = 'gpus'
gpus.type = mesos_pb2.Value.SET
gpus.set.item.extend(gpu_ids)

mem = ti.resources.add()
mem.name = 'mem'
mem.type = mesos_pb2.Value.SCALAR
Expand Down Expand Up @@ -136,6 +145,7 @@ def __init__(self, task_spec, master=None, name=None, quiet=False,
task_index,
cpus=job.cpus,
mem=job.mem,
gpus=job.gpus,
volumes=volumes,
)
)
Expand All @@ -155,27 +165,36 @@ def resourceOffers(self, driver, offers):
continue

offered_cpus = offered_mem = 0.0
offered_gpus = []
offered_tasks = []

for resource in offer.resources:
if resource.name == "cpus":
offered_cpus = resource.scalar.value
elif resource.name == "mem":
offered_mem = resource.scalar.value
elif resource.name == "gpus":
offered_gpus = resource.set.item

for task in self.tasks:
    # A task is placed at most once; skip those already matched to an offer.
    if task.offered:
        continue

    # Skip tasks that what is left of this offer cannot satisfy. The
    # offered_* values shrink as earlier tasks in this loop claim a share.
    if not (task.cpus <= offered_cpus and
            task.mem <= offered_mem and
            task.gpus <= len(offered_gpus)):
        continue

    offered_cpus -= task.cpus
    offered_mem -= task.mem

    # The GPU count must come from the task being placed. The previous
    # code read ``self.gpus``, but only Job/Task objects are shown to carry
    # a ``gpus`` attribute. Round up so the value can be used as a slice
    # bound over the offered GPU ids.
    gpus = int(math.ceil(task.gpus))
    gpu_ids = offered_gpus[:gpus]
    offered_gpus = offered_gpus[gpus:]

    task.offered = True
    offered_tasks.append(
        task.to_task_info(
            offer, self.addr, gpu_ids=gpu_ids))

driver.launchTasks(offer.id, offered_tasks, mesos_pb2.Filters())

Expand All @@ -195,6 +214,7 @@ def _start_tf_cluster(self):
"task_index": task.task_index,
"cpus": task.cpus,
"mem": task.mem,
"gpus": task.gpus,
"cluster_def": cluster_def,
}
send(task.connection, response)
Expand Down
3 changes: 2 additions & 1 deletion tfmesos/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def main(argv):
task_index = response["task_index"]
cpus = response["cpus"]
mem = response["mem"]
gpus = response["gpus"]

server_def = tf.train.ServerDef(
cluster=tf.train.ClusterSpec(cluster_def).as_cluster_def(),
Expand All @@ -38,7 +39,7 @@ def main(argv):
)

server_def.default_session_config.device_count["CPU"] = int(cpus)
server_def.default_session_config.device_count["GPU"] = 0
server_def.default_session_config.device_count["GPU"] = int(gpus)
(soft, hard) = resource.getrlimit(resource.RLIMIT_AS)
soft = min(float(mem), soft, hard)
resource.setrlimit(resource.RLIMIT_AS, (soft, hard))
Expand Down

0 comments on commit 7ee2499

Please sign in to comment.