Skip to content

Commit

Permalink
#955 use runInEpochCollective instead of addAction
Browse files Browse the repository at this point in the history
  • Loading branch information
cz4rs committed Aug 11, 2020
1 parent 76d9f74 commit c6e622d
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 119 deletions.
137 changes: 59 additions & 78 deletions tests/unit/group/test_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,27 @@ TEST_F(TestGroup, test_group_range_construct_1) {
auto const& num_nodes = theContext()->getNumNodes();
NodeType const lo = 0;
NodeType const hi = num_nodes / 2;
if (this_node == 0) {
auto list = std::make_unique<region::Range>(lo,hi);
theGroup()->newGroup(
std::move(list), [](GroupType group){
fmt::print("Group is created: group={:x}\n", group);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
}
theTerm()->addAction([=]{
if (this_node >= lo && this_node < hi) {
EXPECT_EQ(num_recv, 1);
} else {
EXPECT_EQ(num_recv, 0);

runInEpochCollective([&]{
if (this_node == 0) {
auto list = std::make_unique<region::Range>(lo,hi);
theGroup()->newGroup(
std::move(list), [](GroupType group){
fmt::print("Group is created: group={:x}\n", group);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
}
num_recv = 0;
});

if (this_node >= lo && this_node < hi) {
EXPECT_EQ(num_recv, 1);
} else {
EXPECT_EQ(num_recv, 0);
}
num_recv = 0;
}

TEST_F(TestGroup, test_group_range_construct_2) {
Expand All @@ -101,76 +103,55 @@ TEST_F(TestGroup, test_group_range_construct_2) {
NodeType const lo = 1;
NodeType const max_val = 5;
NodeType const hi = std::min<NodeType>(num_nodes,max_val);
if (this_node == 0) {
auto list = std::make_unique<region::Range>(lo,hi);
theGroup()->newGroup(
std::move(list), [](GroupType group){
fmt::print("Group is created: group={:x}\n", group);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
}
theTerm()->addAction([=]{
if (this_node >= lo && this_node < hi) {
EXPECT_EQ(num_recv, 1);
} else {
EXPECT_EQ(num_recv, 0);


runInEpochCollective([&]{
if (this_node == 0) {
auto list = std::make_unique<region::Range>(lo,hi);
theGroup()->newGroup(
std::move(list), [](GroupType group){
fmt::print("Group is created: group={:x}\n", group);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
}
num_recv = 0;
});

if (this_node >= lo && this_node < hi) {
EXPECT_EQ(num_recv, 1);
} else {
EXPECT_EQ(num_recv, 0);
}
num_recv = 0;
}

TEST_F(TestGroup, test_group_collective_construct_1) {
auto const& this_node = theContext()->getNode();
auto const& num_nodes = theContext()->getNumNodes();
bool const node_filter = this_node % 2 == 0;
theGroup()->newGroupCollective(
node_filter, [=](GroupType group) {
auto const& in_group = theGroup()->inGroup(group);
auto const& is_default_group = theGroup()->groupDefault(group);
EXPECT_EQ(in_group, node_filter);
EXPECT_EQ(is_default_group, false);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
theTerm()->addAction([=]{
if (node_filter) {
EXPECT_EQ(num_recv, num_nodes);
} else {
EXPECT_EQ(num_recv, 0);
}
num_recv = 0;

runInEpochCollective([&]{
theGroup()->newGroupCollective(
node_filter, [=](GroupType group) {
auto const& in_group = theGroup()->inGroup(group);
auto const& is_default_group = theGroup()->groupDefault(group);
EXPECT_EQ(in_group, node_filter);
EXPECT_EQ(is_default_group, false);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
}
);
});
}

// TEST_F(TestGroup, test_group_collective_construct_2) {
// auto const& this_node = theContext()->getNode();
// auto const& num_nodes = theContext()->getNumNodes();
// auto const& node_filter = this_node % 2 == 1;
// theGroup()->newGroupCollective(
// node_filter, [=](GroupType group) {
// auto const& in_group = theGroup()->inGroup(group);
// auto const& is_default_group = theGroup()->groupDefault(group);
// ::fmt::print("{}: new group collective lambda\n", this_node);
// EXPECT_EQ(in_group, node_filter);
// EXPECT_EQ(is_default_group, false);
// auto msg = makeMessage<TestMsg>();
// envelopeSetGroup(msg->env, group);
// theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
// }
// );
// theTerm()->addAction([=]{
// if (node_filter) {
// EXPECT_EQ(num_recv, num_nodes);
// } else {
// EXPECT_EQ(num_recv, 0);
// }
// num_recv = 0;
// });
// }
if (node_filter) {
EXPECT_EQ(num_recv, num_nodes);
} else {
EXPECT_EQ(num_recv, 0);
}
num_recv = 0;
}

}}} // end namespace vt::tests::unit
21 changes: 3 additions & 18 deletions tests/unit/location/test_hops.extended.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,6 @@ struct TestColl : Collection<TestColl,vt::Index2D> {
std::vector<double> vec;
};

template <typename Callable>
void executeInEpoch(Callable&& fn) {
auto this_node = theContext()->getNode();
auto ep = vt::theTerm()->makeEpochCollective();
vt::theMsg()->pushEpoch(ep);
if (this_node == 0) {
fn();
}
vt::theMsg()->popEpoch(ep);
vt::theTerm()->finishedEpoch(ep);
bool done = false;
vt::theTerm()->addAction(ep, [&done]{ done = true; });
do vt::runScheduler(); while (!done);
}

TEST_F(TestHops, test_hops_1) {
auto num_nodes = theContext()->getNumNodes();
auto this_node = theContext()->getNode();
Expand All @@ -169,23 +154,23 @@ TEST_F(TestHops, test_hops_1) {
if (this_node == 0) {
vt_print(gen, "Doing work stage 1 for iter={}\n", i);
}
executeInEpoch([=]{
runInEpochCollective([&]{
if (this_node == 0) {
proxy.broadcast<TestColl::TestMsg,&TestColl::doWork>(false);
}
});
if (this_node == 0) {
vt_print(gen, "Doing work stage 2 for iter={}\n", i);
}
executeInEpoch([=]{
runInEpochCollective([&]{
if (this_node == 0) {
proxy.broadcast<TestColl::TestMsg,&TestColl::doWork>(true);
}
});
if (this_node == 0) {
vt_print(gen, "Running LB for iter={}\n", i);
}
executeInEpoch([=]{
runInEpochCollective([&]{
if (this_node == 0) {
proxy.broadcast<TestColl::TestMsg,&TestColl::dolb>();
}
Expand Down
33 changes: 10 additions & 23 deletions tests/unit/location/test_location_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,13 @@ void verifyCacheConsistency(
// perform the checks only at the end of the epoch
// to ensure that all entity messages have been
// correctly delivered before.
auto epoch = vt::theTerm()->makeEpochCollective();
runInEpochCollective([&]{
// create an entity message to route
auto msg = vt::makeMessage<MsgT>(entity, my_node);
// check if should be serialized or not
bool serialize = msg->getSerialize();

// create an entity message to route
auto msg = vt::makeMessage<MsgT>(entity, my_node);
// check if should be serialized or not
bool serialize = msg->getSerialize();

bool finished = false;

vt::theTerm()->addAction(epoch, [=,&finished]{
if (my_node not_eq home) {

// check the routing protocol to be used by the manager.
bool is_eager = theLocMan()->virtual_loc->useEagerProtocol(msg);

Expand Down Expand Up @@ -193,20 +188,12 @@ void verifyCacheConsistency(
// regardless of the protocol (eager or not)
EXPECT_TRUE(isCached(entity));
}
finished = true;
});

if (my_node not_eq home) {
// route entity message
vt::theLocMan()->virtual_loc->routeMsg<MsgT>(entity, home, msg, serialize);
}
// wait for all ranks and finish the epoch
vt::theCollective()->barrier();
vt::theTerm()->finishedEpoch(epoch);

while (not finished) {
vt::runScheduler();
}
if (my_node not_eq home) {
// route entity message
vt::theLocMan()->virtual_loc->routeMsg<MsgT>(entity, home, msg, serialize);
}
});
}
}

Expand Down

0 comments on commit c6e622d

Please sign in to comment.