Add Per Device Threaded Engine Policy, Explicit use copy
tqchen committed Sep 14, 2015
1 parent 253cd79 commit daff15c
Showing 8 changed files with 43 additions and 38 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -11,6 +11,7 @@ env:
- TASK=python CXX=g++
- TASK=python3 CXX=g++
- TASK=python_naive CXX=g++
+  - TASK=python_perdev CXX=g++
- TASK=cpp_unittest CXX=g++

# dependent apt packages
6 changes: 4 additions & 2 deletions include/mxnet/engine.h
@@ -33,8 +33,10 @@ typedef Opr* OprHandle;
enum class FnProperty {
/*! \brief Normal operation */
kNormal,
-  /*! \brief Copy operation between CPU and GPU */
-  kCopy,
+  /*! \brief Copy operation from GPU to other devices */
+  kCopyFromGPU,
+  /*! \brief Copy operation from CPU to other devices */
+  kCopyToGPU,
/*! \brief Asynchronous function call */
kAsync
}; // enum class FnProperty
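Splitting kCopy by direction makes a copy's orientation part of its scheduling hint, so engines can route device-bound and device-originating transfers to different workers. A minimal standalone sketch of how a caller might derive the property from a copy's endpoints; DeviceType and CopyProperty are illustrative stand-ins, not MXNet API, and the GPU-to-GPU case mirrors the ndarray.cc hunk below:

#include <cstdio>

// Simplified stand-ins for the real types; illustration only.
enum class FnProperty { kNormal, kCopyFromGPU, kCopyToGPU, kAsync };
enum class DeviceType { kCPU, kGPU };

// Anything leaving a GPU is kCopyFromGPU, anything entering one is
// kCopyToGPU; GPU-to-GPU transfers count as kCopyFromGPU.
FnProperty CopyProperty(DeviceType from, DeviceType to) {
  if (from == DeviceType::kGPU) return FnProperty::kCopyFromGPU;
  if (to == DeviceType::kGPU) return FnProperty::kCopyToGPU;
  return FnProperty::kNormal;  // plain CPU-to-CPU copy
}

int main() {
  std::printf("cpu->gpu: %d\n",
              static_cast<int>(CopyProperty(DeviceType::kCPU, DeviceType::kGPU)));
  std::printf("gpu->cpu: %d\n",
              static_cast<int>(CopyProperty(DeviceType::kGPU, DeviceType::kCPU)));
  return 0;
}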
12 changes: 10 additions & 2 deletions scripts/travis_script.sh
@@ -40,15 +40,23 @@ if [ ${TASK} == "python3" ]; then
make all || exit -1
export MXNET_ENGINE_TYPE=ThreadedEngine
nosetests tests/python/unittest || exit -1
nosetests tests/python/train || exit -1
fi

if [ ${TASK} == "python_naive" ]; then
echo "USE_CUDA=0" >> config.mk
make all || exit -1
export MXNET_ENGINE_TYPE=NaiveEngine
nosetests tests/python/unittest || exit -1
nosetests tests/python/train || exit -1
fi

if [ ${TASK} == "python_perdev" ]; then
echo "USE_CUDA=0" >> config.mk
make all || exit -1
export MXNET_ENGINE_TYPE=ThreadedEnginePerDevice
nosetests tests/python/unittest || exit -1
nosetests tests/python/train || exit -1
fi

if [ ${TASK} == "cpp_unittest" ]; then
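The new python_perdev task drives the same test suite through a third engine implementation, chosen by the MXNET_ENGINE_TYPE environment variable. A hypothetical sketch of that selection step; the real factory lives inside MXNet and its default may differ:

#include <cstdio>
#include <cstdlib>
#include <cstring>

// Map the environment variable to an engine name; unknown or unset values
// fall back to ThreadedEngine (an assumption for this sketch).
const char* SelectEngine() {
  const char* type = std::getenv("MXNET_ENGINE_TYPE");
  if (type == nullptr) return "ThreadedEngine";
  if (std::strcmp(type, "NaiveEngine") == 0) return "NaiveEngine";
  if (std::strcmp(type, "ThreadedEnginePerDevice") == 0)
    return "ThreadedEnginePerDevice";
  return "ThreadedEngine";
}

int main() {
  std::printf("engine: %s\n", SelectEngine());
  return 0;
}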
32 changes: 12 additions & 20 deletions src/engine/thread_pool.h
@@ -6,7 +6,7 @@

#include <dmlc/base.h>
#include <cstddef>
-#include <array>
+#include <vector>
#include <thread>
#include <utility>
#include "mxnet/base.h"
@@ -24,11 +24,17 @@ class ThreadPool {
* \param size size of the thread pool.
* \param func the function to run on the thread pool.
*/
-  explicit ThreadPool(size_t size, std::function<void()> func);
-  /*!
-   * \brief Destructor.
-   */
-  ~ThreadPool() noexcept(false);
+  explicit ThreadPool(size_t size, std::function<void()> func)
+      : worker_threads_(size) {
+    for (auto& i : worker_threads_) {
+      i = std::thread(func);
+    }
+  }
+  ~ThreadPool() noexcept(false) {
+    for (auto&& i : worker_threads_) {
+      i.join();
+    }
+  }

private:
/*!
@@ -44,20 +50,6 @@
*/
DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};

-ThreadPool::ThreadPool(size_t size, std::function<void()> func)
-    : worker_threads_(size) {
-  for (auto& i : worker_threads_) {
-    i = std::thread(func);
-  }
-}
-
-ThreadPool::~ThreadPool() noexcept(false) {
-  for (auto&& i : worker_threads_) {
-    i.join();
-  }
-}

} // namespace engine
} // namespace mxnet
#endif // MXNET_ENGINE_THREAD_POOL_H_
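Moving the definitions inline keeps ThreadPool a thin header-only wrapper: every worker runs the same function, and the destructor joins them all. A standalone imitation of the intended usage, with a mutex-guarded std::queue standing in for the dmlc::ConcurrentBlockingQueue the engine pairs it with:

#include <cstddef>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Same shape as the ThreadPool above, minus the MXNet macros.
class ThreadPool {
 public:
  explicit ThreadPool(size_t size, std::function<void()> func)
      : worker_threads_(size) {
    for (auto& t : worker_threads_) t = std::thread(func);
  }
  ~ThreadPool() {
    for (auto& t : worker_threads_) t.join();
  }

 private:
  std::vector<std::thread> worker_threads_;
};

int main() {
  std::mutex m;
  std::queue<int> tasks;
  for (int i = 0; i < 8; ++i) tasks.push(i);
  {
    // Each worker drains the shared queue and returns once it is empty,
    // so the pool's destructor can join all three threads.
    ThreadPool pool(3, [&]() {
      for (;;) {
        std::lock_guard<std::mutex> lock(m);
        if (tasks.empty()) return;
        std::printf("task %d\n", tasks.front());
        tasks.pop();
      }
    });
  }
  return 0;
}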
6 changes: 4 additions & 2 deletions src/engine/threaded_engine_perdevice.cc
@@ -7,6 +7,7 @@
#include <dmlc/logging.h>
#include <dmlc/parameter.h>
#include <dmlc/concurrency.h>
+#include <array>
#include "./threaded_engine.h"
#include "./thread_pool.h"
#include "./stream_manager.h"
@@ -92,7 +93,8 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
*/
inline ThreadWorkerBlock *GetGPUWorkerBlock(size_t dev_id,
FnProperty prop) {
-    bool is_copy = (prop == FnProperty::kCopy);
+    bool is_copy = (prop == FnProperty::kCopyFromGPU ||
+                    prop == FnProperty::kCopyToGPU);
CHECK_LT(dev_id, kMaxNumGPUs)
<< "GPU Device index " << dev_id
<< " exceed bound " << kMaxNumGPUs;
@@ -130,7 +132,7 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
dmlc::ConcurrentBlockingQueue<OprBlock*>* task_queue) {
#if MXNET_USE_CUDA
// allocate stream
-    mshadow::SetDevice(dev_id);
+    mshadow::SetDevice<gpu>(dev_id);
RunContext run_ctx;
mshadow::Stream<gpu> *stream;
if (is_copy_worker) {
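A standalone sketch of the routing this file implements: each GPU device id owns separate worker blocks for copy and compute work, and the widened two-value check decides between them. Simplified; the real blocks are allocated lazily and each owns a task queue and thread pool:

#include <array>
#include <cstddef>
#include <cstdio>

enum class FnProperty { kNormal, kCopyFromGPU, kCopyToGPU, kAsync };
constexpr size_t kMaxNumGPUs = 16;

// One slot per device for copy work and one for compute work.
struct ThreadWorkerBlock { int n_tasks = 0; };
std::array<ThreadWorkerBlock, kMaxNumGPUs> gpu_copy_workers;
std::array<ThreadWorkerBlock, kMaxNumGPUs> gpu_normal_workers;

ThreadWorkerBlock* GetGPUWorkerBlock(size_t dev_id, FnProperty prop) {
  bool is_copy = (prop == FnProperty::kCopyFromGPU ||
                  prop == FnProperty::kCopyToGPU);
  if (dev_id >= kMaxNumGPUs) return nullptr;  // the real code CHECKs here
  return is_copy ? &gpu_copy_workers[dev_id] : &gpu_normal_workers[dev_id];
}

int main() {
  GetGPUWorkerBlock(0, FnProperty::kCopyToGPU)->n_tasks++;
  std::printf("copy slot tasks: %d\n", gpu_copy_workers[0].n_tasks);
  return 0;
}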
7 changes: 5 additions & 2 deletions src/engine/threaded_engine_pooled.cc
@@ -86,7 +86,9 @@ class ThreadedEnginePooled : public ThreadedEngine {
LOG(FATAL) << "Please compile with CUDA enabled";
#endif // MXNET_USE_CUDA
}
-    auto&& rctx = opr_block->opr->prop == FnProperty::kCopy
+    bool is_copy = (opr_block->opr->prop == FnProperty::kCopyFromGPU ||
+                    opr_block->opr->prop == FnProperty::kCopyToGPU);
+    auto&& rctx = is_copy
? streams_.GetIORunContext(opr_block->ctx)
: streams_.GetRunContext(opr_block->ctx);
this->ExecuteOprBlock(rctx, opr_block);
@@ -97,7 +99,8 @@
*/
void DoPushToQueue(OprBlock* opr_block) {
switch (opr_block->opr->prop) {
-      case FnProperty::kCopy: {
+      case FnProperty::kCopyFromGPU:
+      case FnProperty::kCopyToGPU: {
io_task_queue_.Push(opr_block);
break;
}
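The pooled engine's dispatch rule, extracted into a compilable sketch: both copy directions share the dedicated I/O queue, and everything else goes to the compute queue (the returned names follow the members used above):

#include <cstdio>

enum class FnProperty { kNormal, kCopyFromGPU, kCopyToGPU, kAsync };

// Mirror of DoPushToQueue's switch: copies of either direction are I/O work.
const char* QueueFor(FnProperty prop) {
  switch (prop) {
    case FnProperty::kCopyFromGPU:
    case FnProperty::kCopyToGPU:
      return "io_task_queue_";
    default:
      return "task_queue_";
  }
}

int main() {
  std::printf("kCopyToGPU -> %s\n", QueueFor(FnProperty::kCopyToGPU));
  std::printf("kNormal    -> %s\n", QueueFor(FnProperty::kNormal));
  return 0;
}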
14 changes: 7 additions & 7 deletions src/ndarray/ndarray.cc
@@ -169,7 +169,7 @@ void CopyFromTo(const NDArray &from, NDArray *to) {
ret.ptr_->CheckAndAlloc();
TBlob tmp = ret.data();
ndarray::Copy<cpu, cpu>(from.data(), &tmp,
from.ctx(), ret.ctx(), ctx);
}, from.ctx(), const_vars, {ret.ptr_->var});
} else {
#if MXNET_USE_CUDA
@@ -178,28 +178,28 @@
ret.ptr_->CheckAndAlloc();
TBlob tmp = ret.data();
ndarray::Copy<cpu, gpu>(from.data(), &tmp,
from.ctx(), ret.ctx(), ctx);
// Wait GPU kernel to complete
ctx.get_stream<gpu>()->Wait();
-    }, ret.ctx(), const_vars, {ret.ptr_->var});
+    }, ret.ctx(), const_vars, {ret.ptr_->var}, FnProperty::kCopyToGPU);
} else if (a == gpu::kDevMask && b == cpu::kDevMask) {
Engine::Get()->PushSync([from, ret](RunContext ctx) {
ret.ptr_->CheckAndAlloc();
TBlob tmp = ret.data();
ndarray::Copy<gpu, cpu>(from.data(), &tmp,
from.ctx(), ret.ctx(), ctx);
// Wait GPU kernel to complete
ctx.get_stream<gpu>()->Wait();
-    }, from.ctx(), const_vars, {ret.ptr_->var});
+    }, from.ctx(), const_vars, {ret.ptr_->var}, FnProperty::kCopyFromGPU);
} else if (a == gpu::kDevMask && b == gpu::kDevMask) {
Engine::Get()->PushSync([from, ret](RunContext ctx) {
ret.ptr_->CheckAndAlloc();
TBlob tmp = ret.data();
ndarray::Copy<gpu, gpu>(from.data(), &tmp,
from.ctx(), ret.ctx(), ctx);
// Wait GPU kernel to complete
ctx.get_stream<gpu>()->Wait();
-    }, from.ctx(), const_vars, {ret.ptr_->var});
+    }, from.ctx(), const_vars, {ret.ptr_->var}, FnProperty::kCopyFromGPU);
} else {
LOG(FATAL) << "unknown device mask";
}
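Each cross-device branch now passes an explicit FnProperty to PushSync, which is what lets the engines above route copies to I/O workers. A toy sketch of that call shape; ToyEngine is hypothetical, not MXNet's Engine interface, and the dependency-variable lists are omitted:

#include <cstdio>
#include <functional>
#include <vector>

enum class FnProperty { kNormal, kCopyFromGPU, kCopyToGPU, kAsync };

// Accepts the work plus its scheduling property, like the real PushSync.
struct ToyEngine {
  void PushSync(std::function<void()> fn, FnProperty prop) {
    std::printf("scheduling with prop=%d\n", static_cast<int>(prop));
    fn();  // the real engine hands this to a worker thread instead
  }
};

int main() {
  ToyEngine engine;
  std::vector<float> host(16, 1.0f), device(16, 0.0f);
  // A CPU-to-GPU copy is tagged kCopyToGPU; plain host memory stands in here.
  engine.PushSync([&]() { device = host; }, FnProperty::kCopyToGPU);
  std::printf("copied %zu floats\n", device.size());
  return 0;
}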
3 changes: 0 additions & 3 deletions src/symbol/graph_executor.cc
@@ -205,9 +205,6 @@ GraphExecutor::GetOpExecEntry(uint32_t nid) {
}
}

-  for (const Resource& r : op_node.op_ctx.requested) {
-    exec.mutate_vars.push_back(static_cast<DAGEngine::Variable>(r.var));
-  }
// start setup exec function.
for (const Resource& r : op_node.op_ctx.requested) {
exec.mutate_vars.push_back(static_cast<Engine::VarHandle>(r.var));
