From daff15c20bc692835870a99e266f2c16bb019063 Mon Sep 17 00:00:00 2001
From: tqchen
Date: Mon, 14 Sep 2015 12:57:52 -0700
Subject: [PATCH] Add Per Device Threaded Engine Policy, Explicit use copy

---
 .travis.yml                             |  1 +
 include/mxnet/engine.h                  |  6 +++--
 scripts/travis_script.sh                | 12 ++++++++--
 src/engine/thread_pool.h                | 32 ++++++++++---------------
 src/engine/threaded_engine_perdevice.cc |  6 +++--
 src/engine/threaded_engine_pooled.cc    |  7 ++++--
 src/ndarray/ndarray.cc                  | 14 +++++------
 src/symbol/graph_executor.cc            |  3 ---
 8 files changed, 43 insertions(+), 38 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 5c7a5d2562a6..1d9e5bad4ed3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,6 +11,7 @@ env:
   - TASK=python CXX=g++
   - TASK=python3 CXX=g++
   - TASK=python_naive CXX=g++
+  - TASK=python_perdev CXX=g++
   - TASK=cpp_unittest CXX=g++
 
 # dependent apt packages
diff --git a/include/mxnet/engine.h b/include/mxnet/engine.h
index f185da8215c3..6bcf7b3da3da 100644
--- a/include/mxnet/engine.h
+++ b/include/mxnet/engine.h
@@ -33,8 +33,10 @@ typedef Opr* OprHandle;
 enum class FnProperty {
   /*! \brief Normal operation */
   kNormal,
-  /*! \brief Copy operation between CPU and GPU */
-  kCopy,
+  /*! \brief Copy operation from GPU to other devices */
+  kCopyFromGPU,
+  /*! \brief Copy operation from CPU to other devices */
+  kCopyToGPU,
   /*! \brief Asynchronous function call */
   kAsync
 };  // enum class FnProperty
diff --git a/scripts/travis_script.sh b/scripts/travis_script.sh
index 1b250afdf70b..07abd4881dce 100755
--- a/scripts/travis_script.sh
+++ b/scripts/travis_script.sh
@@ -40,7 +40,7 @@ if [ ${TASK} == "python3" ]; then
     make all || exit -1
     export MXNET_ENGINE_TYPE=ThreadedEngine
     nosetests tests/python/unittest || exit -1
-    nosetests tests/python/train || exit -1
+    nosetests tests/python/train || exit -1
 fi
 
 if [ ${TASK} == "python_naive" ]; then
@@ -48,7 +48,15 @@ if [ ${TASK} == "python_naive" ]; then
     echo "USE_CUDA=0" >> config.mk
     make all || exit -1
     export MXNET_ENGINE_TYPE=NaiveEngine
     nosetests tests/python/unittest || exit -1
-    nosetests tests/python/train || exit -1
+    nosetests tests/python/train || exit -1
+fi
+
+if [ ${TASK} == "python_perdev" ]; then
+    echo "USE_CUDA=0" >> config.mk
+    make all || exit -1
+    export MXNET_ENGINE_TYPE=ThreadedEnginePerDevice
+    nosetests tests/python/unittest || exit -1
+    nosetests tests/python/train || exit -1
 fi
 
 if [ ${TASK} == "cpp_unittest" ]; then
diff --git a/src/engine/thread_pool.h b/src/engine/thread_pool.h
index acbf61896df4..b88cddaa29c5 100644
--- a/src/engine/thread_pool.h
+++ b/src/engine/thread_pool.h
@@ -6,7 +6,7 @@
 #include
 #include
-#include
+#include
 #include
 #include
 #include "mxnet/base.h"
@@ -24,11 +24,17 @@ class ThreadPool {
    * \param size size of the thread pool.
    * \param func the function to run on the thread pool.
   */
-  explicit ThreadPool(size_t size, std::function<void()> func);
-  /*!
-   * \brief Destructor.
-   */
-  ~ThreadPool() noexcept(false);
+  explicit ThreadPool(size_t size, std::function<void()> func)
+      : worker_threads_(size) {
+    for (auto& i : worker_threads_) {
+      i = std::thread(func);
+    }
+  }
+  ~ThreadPool() noexcept(false) {
+    for (auto&& i : worker_threads_) {
+      i.join();
+    }
+  }
 
  private:
   /*!
@@ -44,20 +50,6 @@ class ThreadPool {
   */
   DISALLOW_COPY_AND_ASSIGN(ThreadPool);
 };
-
-ThreadPool::ThreadPool(size_t size, std::function<void()> func)
-    : worker_threads_(size) {
-  for (auto& i : worker_threads_) {
-    i = std::thread(func);
-  }
-}
-
-ThreadPool::~ThreadPool() noexcept(false) {
-  for (auto&& i : worker_threads_) {
-    i.join();
-  }
-}
-
 }  // namespace engine
 }  // namespace mxnet
 #endif  // MXNET_ENGINE_THREAD_POOL_H_
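The thread_pool.h hunks above move the ThreadPool constructor and destructor from out-of-line definitions at the bottom of the header into the class body. Member functions defined inside the class are implicitly inline, which is presumably the point: the header can now be included from more than one translation unit (the new threaded_engine_perdevice.cc below also includes it) without duplicate-symbol link errors. For reference, the pattern boils down to the following standalone sketch; the class and variable names here are illustrative, not part of the patch:

```cpp
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Header-only thread pool in the style of mxnet's ThreadPool: every worker
// runs the same function (typically a loop that drains a concurrent task
// queue), and the destructor joins all workers.
class SimpleThreadPool {
 public:
  SimpleThreadPool(std::size_t size, std::function<void()> func)
      : worker_threads_(size) {
    for (auto& t : worker_threads_) {
      t = std::thread(func);
    }
  }
  ~SimpleThreadPool() {
    // Blocks until every worker returns, so the supplied function must
    // terminate eventually (e.g. when its task queue is shut down).
    for (auto& t : worker_threads_) {
      t.join();
    }
  }
  SimpleThreadPool(const SimpleThreadPool&) = delete;
  SimpleThreadPool& operator=(const SimpleThreadPool&) = delete;

 private:
  std::vector<std::thread> worker_threads_;
};
```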
diff --git a/src/engine/threaded_engine_perdevice.cc b/src/engine/threaded_engine_perdevice.cc
index 09c12da13938..39681130e43d 100644
--- a/src/engine/threaded_engine_perdevice.cc
+++ b/src/engine/threaded_engine_perdevice.cc
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include
 #include "./threaded_engine.h"
 #include "./thread_pool.h"
 #include "./stream_manager.h"
@@ -92,7 +93,8 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
   */
  inline ThreadWorkerBlock *GetGPUWorkerBlock(size_t dev_id,
                                              FnProperty prop) {
-    bool is_copy = (prop == FnProperty::kCopy);
+    bool is_copy = (prop == FnProperty::kCopyFromGPU ||
+                    prop == FnProperty::kCopyToGPU);
     CHECK_LT(dev_id, kMaxNumGPUs)
         << "GPU Device index " << dev_id
         << " exceed bound " << kMaxNumGPUs;
@@ -130,7 +132,7 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
                  dmlc::ConcurrentBlockingQueue<OprBlock*>* task_queue) {
 #if MXNET_USE_CUDA
     // allocate stream
-    mshadow::SetDevice<gpu>(dev_id);
+    mshadow::SetDevice<gpu>(dev_id);
     RunContext run_ctx;
     mshadow::Stream<gpu> *stream;
     if (is_copy_worker) {
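With the old kCopy property split in two, GetGPUWorkerBlock treats both directions as copy traffic when it picks a worker: each GPU gets a dedicated copy worker alongside its compute worker, so device transfers do not queue behind kernels. Below is a self-contained sketch of that routing decision; the enum mirrors engine.h, while the index helper is hypothetical and only illustrates the two-workers-per-device layout:

```cpp
#include <cstddef>

// Mirrors the FnProperty split introduced in include/mxnet/engine.h.
enum class FnProperty { kNormal, kCopyFromGPU, kCopyToGPU, kAsync };

// Both copy directions count as copy traffic, as in GetGPUWorkerBlock above.
inline bool IsCopyOp(FnProperty prop) {
  return prop == FnProperty::kCopyFromGPU ||
         prop == FnProperty::kCopyToGPU;
}

// Hypothetical worker index: each GPU owns a compute worker and a copy
// worker, so a host/device transfer never waits on a running kernel.
inline std::size_t WorkerIndex(std::size_t dev_id, FnProperty prop) {
  return dev_id * 2 + (IsCopyOp(prop) ? 1 : 0);
}
```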
diff --git a/src/engine/threaded_engine_pooled.cc b/src/engine/threaded_engine_pooled.cc
index e8027eeea1f9..0978b32ea8d6 100644
--- a/src/engine/threaded_engine_pooled.cc
+++ b/src/engine/threaded_engine_pooled.cc
@@ -86,7 +86,9 @@ class ThreadedEnginePooled : public ThreadedEngine {
       LOG(FATAL) << "Please compile with CUDA enabled";
 #endif  // MXNET_USE_CUDA
     }
-    auto&& rctx = opr_block->opr->prop == FnProperty::kCopy
+    bool is_copy = (opr_block->opr->prop == FnProperty::kCopyFromGPU ||
+                    opr_block->opr->prop == FnProperty::kCopyToGPU);
+    auto&& rctx = is_copy
         ? streams_.GetIORunContext(opr_block->ctx)
         : streams_.GetRunContext(opr_block->ctx);
     this->ExecuteOprBlock(rctx, opr_block);
@@ -97,7 +99,8 @@ class ThreadedEnginePooled : public ThreadedEngine {
   */
  void DoPushToQueue(OprBlock* opr_block) {
     switch (opr_block->opr->prop) {
-      case FnProperty::kCopy: {
+      case FnProperty::kCopyFromGPU:
+      case FnProperty::kCopyToGPU: {
         io_task_queue_.Push(opr_block);
         break;
       }
diff --git a/src/ndarray/ndarray.cc b/src/ndarray/ndarray.cc
index e9be7e445da6..feb3de61be2d 100644
--- a/src/ndarray/ndarray.cc
+++ b/src/ndarray/ndarray.cc
@@ -169,7 +169,7 @@ void CopyFromTo(const NDArray &from, NDArray *to) {
         ret.ptr_->CheckAndAlloc();
         TBlob tmp = ret.data();
         ndarray::Copy<cpu, cpu>(from.data(), &tmp,
-                                from.ctx(), ret.ctx(), ctx);
+                                from.ctx(), ret.ctx(), ctx);
       }, from.ctx(), const_vars, {ret.ptr_->var});
   } else {
 #if MXNET_USE_CUDA
@@ -178,28 +178,28 @@ void CopyFromTo(const NDArray &from, NDArray *to) {
         ret.ptr_->CheckAndAlloc();
         TBlob tmp = ret.data();
         ndarray::Copy<cpu, gpu>(from.data(), &tmp,
-                                from.ctx(), ret.ctx(), ctx);
+                                from.ctx(), ret.ctx(), ctx);
         // Wait GPU kernel to complete
         ctx.get_stream<gpu>()->Wait();
-      }, ret.ctx(), const_vars, {ret.ptr_->var});
+      }, ret.ctx(), const_vars, {ret.ptr_->var}, FnProperty::kCopyToGPU);
     } else if (a == gpu::kDevMask && b == cpu::kDevMask) {
       Engine::Get()->PushSync([from, ret](RunContext ctx) {
         ret.ptr_->CheckAndAlloc();
         TBlob tmp = ret.data();
         ndarray::Copy<gpu, cpu>(from.data(), &tmp,
-                                from.ctx(), ret.ctx(), ctx);
+                                from.ctx(), ret.ctx(), ctx);
         // Wait GPU kernel to complete
         ctx.get_stream<gpu>()->Wait();
-      }, from.ctx(), const_vars, {ret.ptr_->var});
+      }, from.ctx(), const_vars, {ret.ptr_->var}, FnProperty::kCopyFromGPU);
     } else if (a == gpu::kDevMask && b == gpu::kDevMask) {
       Engine::Get()->PushSync([from, ret](RunContext ctx) {
         ret.ptr_->CheckAndAlloc();
         TBlob tmp = ret.data();
         ndarray::Copy<gpu, gpu>(from.data(), &tmp,
-                                from.ctx(), ret.ctx(), ctx);
+                                from.ctx(), ret.ctx(), ctx);
         // Wait GPU kernel to complete
         ctx.get_stream<gpu>()->Wait();
-      }, from.ctx(), const_vars, {ret.ptr_->var});
+      }, from.ctx(), const_vars, {ret.ptr_->var}, FnProperty::kCopyFromGPU);
     } else {
       LOG(FATAL) << "unknown device mask";
     }
diff --git a/src/symbol/graph_executor.cc b/src/symbol/graph_executor.cc
index 498ec4f942e0..8e08952eb234 100644
--- a/src/symbol/graph_executor.cc
+++ b/src/symbol/graph_executor.cc
@@ -205,9 +205,6 @@ GraphExecutor::GetOpExecEntry(uint32_t nid) {
     }
   }
-  for (const Resource& r : op_node.op_ctx.requested) {
-    exec.mutate_vars.push_back(static_cast<Engine::VarHandle>(r.var));
-  }
   // start setup exec function.
   for (const Resource& r : op_node.op_ctx.requested) {
     exec.mutate_vars.push_back(static_cast<Engine::VarHandle>(r.var));
   }
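Taken together, the CopyFromTo hunks encode one rule: a copy whose source is a GPU (including GPU-to-GPU) is pushed as kCopyFromGPU on the source context, a CPU-to-GPU copy is pushed as kCopyToGPU on the destination context, and CPU-to-CPU copies keep the default property. Reusing the FnProperty sketch above, and with assumed device-mask constants (the real values come from mshadow's cpu/gpu types), the branch structure condenses to:

```cpp
// Assumed mask values, for illustration only.
constexpr int kCPUDevMask = 1 << 0;
constexpr int kGPUDevMask = 1 << 1;

// Condenses the CopyFromTo branches above into one helper.
inline FnProperty CopyProperty(int src_dev_mask, int dst_dev_mask) {
  if (src_dev_mask == kGPUDevMask) return FnProperty::kCopyFromGPU;  // gpu->cpu, gpu->gpu
  if (dst_dev_mask == kGPUDevMask) return FnProperty::kCopyToGPU;    // cpu->gpu
  return FnProperty::kNormal;                                        // cpu->cpu
}
```

The graph_executor.cc hunk is an unrelated cleanup: the loop pushing the requested resource vars into exec.mutate_vars appeared twice, and the duplicate is removed.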