Skip to content

Commit

Permalink
#1173: only enable theWorkerGrp when threading is enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
cz4rs committed Feb 1, 2021
1 parent ee906bd commit f166c59
Show file tree
Hide file tree
Showing 15 changed files with 51 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmake/load_threading_package.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ elseif(vt_fcontext_enabled)
else()
message(
STATUS
"\n\n\nThreading disabled\n\n\n"
"Threading disabled"
)
config_no_threading()
endif()
2 changes: 1 addition & 1 deletion cmake/threading_config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ function(config_no_threading)
set(vt_feature_cmake_openmp "0" PARENT_SCOPE)
set(vt_feature_cmake_stdthread "0" PARENT_SCOPE)
set(vt_fcontext_enabled "0" PARENT_SCOPE)
endfunction(config_no_threading)
endfunction(config_no_threading)
4 changes: 4 additions & 0 deletions src/vt/collective/collective_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ void CollectiveAnyOps<instance>::scheduleThenFinalize(
if (workers == no_workers) {
sched_fn();
} else {
#if vt_threading_enabled
theWorkerGrp()->spawnWorkersBlock(sched_fn);
#else
sched_fn();
#endif
}

CollectiveAnyOps<instance>::finalize(has_rt ? std::move(in_rt) : nullptr);
Expand Down
4 changes: 3 additions & 1 deletion src/vt/configs/debug/debug_masterconfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ using VTPrintConfig = Configuration<
!vt_check_enabled(openmp) && !vt_check_enabled(stdthread)

#define vt_threading_enabled \
vt_check_enabled(openmp) && vt_check_enabled(stdthread) && vt_check_enabled(fcontext)
(vt_check_enabled(openmp) \
or vt_check_enabled(stdthread) \
or vt_check_enabled(fcontext))

#endif /*INCLUDED_VT_CONFIGS_DEBUG_DEBUG_MASTERCONFIG_H*/
2 changes: 2 additions & 0 deletions src/vt/context/context_attorney.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ namespace vt { namespace ctx {
* by the runtime and other components
*/
struct ContextAttorney {
#if vt_threading_enabled
/// Allow the worker to modify the contextual worker
friend worker::WorkerGroupType;
/// Allow the worker group to modify the contextual worker
friend worker::WorkerType;
#endif
/// Allow the runtime to set the number of workers
friend runtime::Runtime;

Expand Down
4 changes: 4 additions & 0 deletions src/vt/pool/pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,13 @@ void Pool::dealloc(void* const buf) {
);

if (pool_type != ePoolSize::Malloc && alloc_worker != worker) {
#if vt_threading_enabled
theWorkerGrp()->enqueueForWorker(worker, [buf]{
thePool()->dealloc(buf);
});
#else
thePool()->dealloc(buf);
#endif
return;
}

Expand Down
8 changes: 8 additions & 0 deletions src/vt/runtime/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ void Runtime::initializeComponents() {
>{}
);

#if vt_threading_enabled
p_->registerComponent<worker::WorkerGroupType>(
&theWorkerGrp, Deps<
ctx::Context, // Everything depends on theContext
Expand All @@ -743,6 +744,7 @@ void Runtime::initializeComponents() {
term::TerminationDetector // Depends on TD for idle callbacks
>{}
);
#endif

p_->registerComponent<collective::CollectiveAlg>(
&theCollective, Deps<
Expand Down Expand Up @@ -911,10 +913,12 @@ void Runtime::initializeComponents() {
p_->add<vrt::collection::balance::StatsRestartReader>();
}

#if vt_threading_enabled
bool const has_workers = num_workers_ != no_workers;
if (has_workers) {
p_->add<worker::WorkerGroupType>();
}
#endif

p_->construct();

Expand Down Expand Up @@ -966,6 +970,7 @@ void Runtime::initializeWorkers(WorkerCountType const num_workers) {
bool const has_workers = num_workers != no_workers;

if (has_workers) {
#if vt_threading_enabled
ContextAttorney::setNumWorkers(num_workers);

// Initialize individual memory pool for each worker
Expand All @@ -980,6 +985,7 @@ void Runtime::initializeWorkers(WorkerCountType const num_workers) {
}
};
theWorkerGrp->registerIdleListener(localTermFn);
#endif
} else {
// Without workers running on the node, the termination detector should
// enable/disable the global collective epoch based on the state of the
Expand Down Expand Up @@ -1058,9 +1064,11 @@ void Runtime::printMemoryFootprint() const {
static_cast<vrt::VirtualContextManager*>(base)
);
} else if (name == "WorkerGroupOMP" || name == "WorkerGroup") {
#if vt_threading_enabled
printComponentFootprint(
static_cast<worker::WorkerGroupType*>(base)
);
#endif
} else if (name == "Collective") {
printComponentFootprint(
static_cast<collective::CollectiveAlg*>(base)
Expand Down
2 changes: 2 additions & 0 deletions src/vt/runtime/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,9 @@ struct Runtime {
ComponentPtrType<phase::PhaseManager> thePhase = nullptr;

// Node-level worker-based components for vt (these are optional)
#if vt_threading_enabled
ComponentPtrType<worker::WorkerGroupType> theWorkerGrp = nullptr;
#endif

// Optional components
#if vt_check_enabled(trace_enabled)
Expand Down
2 changes: 2 additions & 0 deletions src/vt/runtime/runtime_get.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ using CollectionManagerType = vrt::collection::CollectionManager;
ctx::Context* theContext() { return CUR_RT_TS->theContext; }
pool::Pool* thePool() { return CUR_RT_TS->thePool; }
vrt::VirtualContextManager* theVirtualManager() { return CUR_RT_TS->theVirtualManager; }
#if vt_threading_enabled
worker::WorkerGroupType* theWorkerGrp() { return CUR_RT_TS->theWorkerGrp; }
#endif

// Non thread-safe runtime components
collective::CollectiveAlg* theCollective() { return CUR_RT->theCollective; }
Expand Down
4 changes: 3 additions & 1 deletion src/vt/standalone/vt_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,13 @@ int vt_main(

auto comm_fn = vrCommThreadWork<VrtContextT>;

if (workers == no_workers) {
if (workers == no_workers or !vt_threading_enabled) {
comm_fn();
} else {
#if vt_threading_enabled
vtAssert(theWorkerGrp() != nullptr, "Must have valid worker group");
theWorkerGrp()->spawnWorkersBlock(comm_fn);
#endif
}

vt_debug_print(gen, node, "vt_main: auto finalize workers={}\n", workers);
Expand Down
11 changes: 9 additions & 2 deletions src/vt/vrt/context/context_vrtinfo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,14 @@ void VirtualInfo::setVirtualContextPtr(VirtualPtrType in_vrt_ptr) {
);

msg_buffer_.attach([this](VirtualMessage* msg){
#if vt_threading_enabled
theWorkerGrp()->enqueueCommThread([this,msg]{
enqueueWorkUnit(msg);
});
#else
(void)this;
enqueueWorkUnit(msg);
#endif
});
}

Expand All @@ -105,15 +110,17 @@ bool VirtualInfo::enqueueWorkUnit(VirtualMessage* raw_msg) {
};

bool const has_workers = theContext()->hasWorkers();
bool const execute_comm = msg->getExecuteCommThread();

if (has_workers) {
if (has_workers and vt_threading_enabled) {
#if vt_threading_enabled
bool const execute_comm = msg->getExecuteCommThread();
if (hasCoreMap() && !execute_comm) {
auto const core = getCore();
theWorkerGrp()->enqueueForWorker(core, work_unit);
} else {
theWorkerGrp()->enqueueCommThread(work_unit);
}
#endif
return true;
} else {
work_unit();
Expand Down
9 changes: 9 additions & 0 deletions src/vt/vrt/context/context_vrtmanager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,15 @@ messaging::PendingSend VirtualContextManager::sendSerialMsg(
} else {
return messaging::PendingSend(
base_msg, msg_sz, [=](MsgPtr<BaseMsgType> mymsg) {
#if vt_threading_enabled
theWorkerGrp()->enqueueCommThread([=]{
auto typed_msg = reinterpret_cast<MsgT*>(mymsg.get());
theVirtualManager()->sendSerialMsg<VcT, MsgT, f>(toProxy, typed_msg);
});
#else
auto typed_msg = reinterpret_cast<MsgT*>(mymsg.get());
theVirtualManager()->sendSerialMsg<VcT, MsgT, f>(toProxy, typed_msg);
#endif
}
);
}
Expand Down Expand Up @@ -295,7 +300,11 @@ VirtualProxyType VirtualContextManager::makeVirtualMap(Args... args) {

// work to defer to the worker thread
auto work_unit = [=]{ cl->make(); delete cl; };
#if vt_threading_enabled
theWorkerGrp()->enqueueForWorker(mapped_core, work_unit);
#else
work_unit();
#endif

return proxy;
}
Expand Down
6 changes: 1 addition & 5 deletions src/vt/worker/worker_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@
#include "vt/worker/worker_stdthread.h"
#elif vt_check_enabled(fcontext)
#include "vt/worker/worker_seq.h"
#else
// #include "vt/worker/worker_dummy.h"
#endif

#include <vector>
Expand Down Expand Up @@ -117,9 +115,7 @@ struct WorkerGroupAny
using WorkerGroupSTD = WorkerGroupAny<StdThreadWorker>;
#elif vt_check_enabled(fcontext)
using WorkerGroupSeq = WorkerGroupAny<WorkerSeq>;
#else
// using WorkerGroupDummy = WorkerGroupAny<WorkerDummy>;
#endif /*vt_check_enabled(stdthread)*/
#endif

}} /* end namespace vt::worker */

Expand Down
4 changes: 0 additions & 4 deletions src/vt/worker/worker_headers.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ namespace vt { namespace worker {
using WorkerGroupType = WorkerGroupSTD;
#elif vt_check_enabled(fcontext)
using WorkerGroupType = WorkerGroupSeq;
#else
// using WorkerGroupType = WorkerGroupDummy;
#endif

#if vt_check_enabled(openmp)
Expand All @@ -75,8 +73,6 @@ namespace vt { namespace worker {
using WorkerType = StdThreadWorker;
#elif vt_check_enabled(fcontext)
using WorkerType = WorkerSeq;
#else
// using WorkerType = WorkerDummy;
#endif

}} /* end namespace vt::worker */
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/runtime/test_memory_footprint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,11 @@ TEST_F(TestMemoryFootprinting, test_async_event) {
printMemoryFootprint(&event);
}

#if vt_threading_enabled
TEST_F(TestMemoryFootprinting, test_worker_group_type) {
printMemoryFootprint(theWorkerGrp());
}
#endif

TEST_F(TestMemoryFootprinting, test_virtual_context_manager) {
printMemoryFootprint(theVirtualManager());
Expand Down

0 comments on commit f166c59

Please sign in to comment.