From d6dda2c1de49401c093771bdd92d0e69c94d92b7 Mon Sep 17 00:00:00 2001
From: Carlos
Date: Wed, 9 Jun 2021 09:35:57 +0100
Subject: [PATCH] Test fixtures for MPI (#108)

* adding test fixtures for mpi

* new file for remote mpi testing

* update coll. comm tests
---
 tests/test/scheduler/test_mpi_context.cpp     |  47 +-
 tests/test/scheduler/test_mpi_world.cpp       | 618 ++++--------
 .../test/scheduler/test_remote_mpi_worlds.cpp | 298 +++++++++
 tests/utils/fixtures.h                        |  95 +++
 4 files changed, 529 insertions(+), 529 deletions(-)
 create mode 100644 tests/test/scheduler/test_remote_mpi_worlds.cpp

diff --git a/tests/test/scheduler/test_mpi_context.cpp b/tests/test/scheduler/test_mpi_context.cpp
index d6a547257..d2a64bae2 100644
--- a/tests/test/scheduler/test_mpi_context.cpp
+++ b/tests/test/scheduler/test_mpi_context.cpp
@@ -1,28 +1,16 @@
-#include "faabric_utils.h"
 #include
 #include
 #include
 #include
+#include
 
 using namespace faabric::scheduler;
 
-static void tearDown(MpiWorld& world)
-{
-    world.destroy();
-
-    getScheduler().reset();
-}
-
 namespace tests {
-TEST_CASE("Check world creation", "[mpi]")
+TEST_CASE_METHOD(MpiBaseTestFixture, "Check world creation", "[mpi]")
 {
-    cleanFaabric();
-
-    faabric::Message msg = faabric::util::messageFactory("mpi", "hellompi");
-    msg.set_mpiworldsize(10);
-
     MpiContext c;
     c.createWorld(msg);
 
@@ -39,33 +27,26 @@ TEST_CASE("Check world creation", "[mpi]")
     MpiWorldRegistry& reg = getMpiWorldRegistry();
     MpiWorld& world = reg.getOrInitialiseWorld(msg);
     REQUIRE(world.getId() == worldId);
-    REQUIRE(world.getSize() == 10);
-    REQUIRE(world.getUser() == "mpi");
-    REQUIRE(world.getFunction() == "hellompi");
+    REQUIRE(world.getSize() == worldSize);
+    REQUIRE(world.getUser() == user);
+    REQUIRE(world.getFunction() == func);
 
-    tearDown(world);
+    world.destroy();
 }
 
-TEST_CASE("Check world cannot be created for non-zero rank", "[mpi]")
+TEST_CASE_METHOD(MpiBaseTestFixture,
+                 "Check world cannot be created for non-zero rank",
+                 "[mpi]")
 {
-    cleanFaabric();
-
-    // Create message with non-zero rank
-    faabric::Message msg = faabric::util::messageFactory("mpi", "hellompi");
     msg.set_mpirank(2);
-    msg.set_mpiworldsize(10);
 
     // Try creating world
     MpiContext c;
     REQUIRE_THROWS(c.createWorld(msg));
 }
 
-TEST_CASE("Check default world size is set", "[mpi]")
+TEST_CASE_METHOD(MpiBaseTestFixture, "Check default world size is set", "[mpi]")
 {
-    cleanFaabric();
-
-    // Create message with non-zero rank
-    faabric::Message msg = faabric::util::messageFactory("mpi", "hellompi");
     msg.set_mpirank(0);
 
     // Set a new world size
@@ -94,13 +75,11 @@ TEST_CASE("Check default world size is set", "[mpi]")
     // Reset config
     conf.defaultMpiWorldSize = origSize;
 
-    tearDown(world);
+    world.destroy();
 }
 
-TEST_CASE("Check joining world", "[mpi]")
+TEST_CASE_METHOD(MpiBaseTestFixture, "Check joining world", "[mpi]")
 {
-    cleanFaabric();
-
     const std::string expectedHost =
       faabric::util::getSystemConfig().endpointHost;
 
@@ -136,6 +115,6 @@ TEST_CASE("Check joining world", "[mpi]")
     const std::string actualHost = world.getHostForRank(1);
     REQUIRE(actualHost == expectedHost);
 
-    tearDown(world);
+    world.destroy();
 }
 }
diff --git a/tests/test/scheduler/test_mpi_world.cpp b/tests/test/scheduler/test_mpi_world.cpp
index ccb0f3b92..402ebec8e 100644
--- a/tests/test/scheduler/test_mpi_world.cpp
+++ b/tests/test/scheduler/test_mpi_world.cpp
@@ -1,4 +1,3 @@
-#include "faabric_utils.h"
 #include
 
 #include
@@ -9,34 +8,15 @@
 #include
 #include
 #include
+#include
 
 using namespace faabric::scheduler;
 
-static void tearDown(std::vector<MpiWorld*> worlds)
-{
-
for (auto& world : worlds) { - world->destroy(); - } - - getScheduler().reset(); -} - namespace tests { - -static int worldId = 123; -static int worldSize = 10; -static const char* user = "mpi"; -static const char* func = "hellompi"; - -TEST_CASE("Test world creation", "[mpi]") +TEST_CASE_METHOD(MpiBaseTestFixture, "Test world creation", "[mpi]") { - cleanFaabric(); - - Scheduler& sch = getScheduler(); - // Create the world - const faabric::Message& msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; + MpiWorld world; world.create(msg, worldId, worldSize); REQUIRE(world.getSize() == worldSize); @@ -62,23 +42,13 @@ TEST_CASE("Test world creation", "[mpi]") const std::string actualHost = world.getHostForRank(0); REQUIRE(actualHost == faabric::util::getSystemConfig().endpointHost); - tearDown({ &world }); + world.destroy(); } -TEST_CASE("Test world loading from msg", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test world loading from msg", "[mpi]") { - cleanFaabric(); - - // Create a world - faabric::Message msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld worldA; - worldA.create(msg, worldId, worldSize); - // Create another copy from state scheduler::MpiWorld worldB; - // These would be set by the master rank, when invoking other ranks - msg.set_mpiworldsize(worldSize); - msg.set_mpiworldid(worldId); // Force creating the second world in the _same_ host bool forceLocal = true; worldB.initialiseFromMsg(msg, forceLocal); @@ -88,60 +58,12 @@ TEST_CASE("Test world loading from msg", "[mpi]") REQUIRE(worldB.getUser() == user); REQUIRE(worldB.getFunction() == func); - tearDown({ &worldA, &worldB }); -} - -TEST_CASE("Test rank allocation", "[mpi]") -{ - cleanFaabric(); - - auto& sch = faabric::scheduler::getScheduler(); - - // Force the scheduler to initialise a world in the remote host by setting - // a worldSize bigger than the slots available locally - int worldSize = 2; - faabric::HostResources localResources; - localResources.set_slots(1); - localResources.set_usedslots(1); - faabric::HostResources otherResources; - otherResources.set_slots(1); - - std::string thisHost = faabric::util::getSystemConfig().endpointHost; - std::string otherHost = LOCALHOST; - sch.addHostToGlobalSet(otherHost); - - // Mock everything to make sure the other host has resources as well - faabric::util::setMockMode(true); - sch.setThisHostResources(localResources); - faabric::scheduler::queueResourceResponse(otherHost, otherResources); - - // Create a world - faabric::Message msg = faabric::util::messageFactory(user, func); - msg.set_mpiworldid(worldId); - msg.set_mpiworldsize(worldSize); - - // Create the local world - scheduler::MpiWorld& localWorld = - getMpiWorldRegistry().createWorld(msg, worldId); - - scheduler::MpiWorld remoteWorld; - remoteWorld.overrideHost(otherHost); - remoteWorld.initialiseFromMsg(msg); - - // Now check both world instances report the same mappings - REQUIRE(localWorld.getHostForRank(0) == thisHost); - REQUIRE(localWorld.getHostForRank(1) == otherHost); - - faabric::util::setMockMode(false); - tearDown({ &localWorld, &remoteWorld }); + worldB.destroy(); } -TEST_CASE("Test cartesian communicator", "[mpi]") +TEST_CASE_METHOD(MpiBaseTestFixture, "Test cartesian communicator", "[mpi]") { - cleanFaabric(); - - faabric::Message msg = faabric::util::messageFactory(user, func); - + MpiWorld world; int worldSize; int maxDims = 3; std::vector dims(maxDims); @@ -164,6 +86,7 @@ TEST_CASE("Test cartesian communicator", "[mpi]") { 4, 1, 0, 0, 0, 0 }, { 0, 
2, 1, 1, 1, 1 }, { 1, 3, 2, 2, 2, 2 }, { 2, 4, 3, 3, 3, 3 }, { 3, 0, 4, 4, 4, 4 }, }; + world.create(msg, worldId, worldSize); } SECTION("2 x 2 grid") { @@ -184,11 +107,9 @@ TEST_CASE("Test cartesian communicator", "[mpi]") { 0, 0, 3, 3, 2, 2 }, { 1, 1, 2, 2, 3, 3 }, }; + world.create(msg, worldId, worldSize); } - scheduler::MpiWorld world; - world.create(msg, worldId, worldSize); - // Get coordinates from rank for (int i = 0; i < worldSize; i++) { std::vector coords(3, -1); @@ -231,10 +152,11 @@ TEST_CASE("Test cartesian communicator", "[mpi]") } } - tearDown({ &world }); + world.destroy(); } void checkMessage(faabric::MPIMessage& actualMessage, + int worldId, int senderRank, int destRank, const std::vector& data) @@ -254,15 +176,8 @@ void checkMessage(faabric::MPIMessage& actualMessage, REQUIRE(actualData == data); } -TEST_CASE("Test send and recv on same host", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test send and recv on same host", "[mpi]") { - cleanFaabric(); - - faabric::Message msg = faabric::util::messageFactory(user, func); - msg.set_mpiworldsize(2); - scheduler::MpiWorld world; - world.create(msg, worldId, worldSize); - // Send a message between colocated ranks int rankA1 = 0; int rankA2 = 1; @@ -282,7 +197,7 @@ TEST_CASE("Test send and recv on same host", "[mpi]") const std::shared_ptr& queueA2 = world.getLocalQueue(rankA1, rankA2); faabric::MPIMessage actualMessage = *(queueA2->dequeue()); - checkMessage(actualMessage, rankA1, rankA2, messageData); + checkMessage(actualMessage, worldId, rankA1, rankA2, messageData); } SECTION("Test recv") @@ -300,18 +215,10 @@ TEST_CASE("Test send and recv on same host", "[mpi]") REQUIRE(status.MPI_SOURCE == rankA1); REQUIRE(status.bytesSize == messageData.size() * sizeof(int)); } - - tearDown({ &world }); } -TEST_CASE("Test sendrecv", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test sendrecv", "[mpi]") { - cleanFaabric(); - - auto msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; - world.create(msg, worldId, worldSize); - // Prepare data int rankA = 1; int rankB = 2; @@ -362,18 +269,10 @@ TEST_CASE("Test sendrecv", "[mpi]") // Test integrity of results REQUIRE(recvBufferA == messageDataBA); REQUIRE(recvBufferB == messageDataAB); - - tearDown({ &world }); } -TEST_CASE("Test ring sendrecv", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test ring sendrecv", "[mpi]") { - cleanFaabric(); - - auto msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; - world.create(msg, worldId, worldSize); - // Use five processes std::vector ranks = { 0, 1, 2, 3, 4 }; @@ -410,18 +309,10 @@ TEST_CASE("Test ring sendrecv", "[mpi]") t.join(); } } - - tearDown({ &world }); } -TEST_CASE("Test async send and recv", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test async send and recv", "[mpi]") { - cleanFaabric(); - - const faabric::Message& msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; - world.create(msg, worldId, worldSize); - // Send a couple of async messages (from both to each other) int rankA = 1; int rankB = 2; @@ -448,105 +339,10 @@ TEST_CASE("Test async send and recv", "[mpi]") REQUIRE(actualA == messageDataA); REQUIRE(actualB == messageDataB); - - tearDown({ &world }); } -TEST_CASE("Test send across hosts", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test send/recv message with no data", "[mpi]") { - cleanFaabric(); - - // Start a server on this host - FunctionCallServer server; - server.start(); - usleep(1000 * 100); - - auto& sch = faabric::scheduler::getScheduler(); - - // 
Force the scheduler to initialise a world in the remote host by setting - // a worldSize bigger than the slots available locally - int worldSize = 2; - faabric::HostResources localResources; - localResources.set_slots(1); - localResources.set_usedslots(1); - faabric::HostResources otherResources; - otherResources.set_slots(1); - - // Set up a remote host - std::string otherHost = LOCALHOST; - sch.addHostToGlobalSet(otherHost); - - // Mock everything to make sure the other host has resources as well - faabric::util::setMockMode(true); - sch.setThisHostResources(localResources); - faabric::scheduler::queueResourceResponse(otherHost, otherResources); - - // Set up the world on this host - faabric::Message msg = faabric::util::messageFactory(user, func); - msg.set_mpiworldid(worldId); - msg.set_mpiworldsize(worldSize); - - // Create the local world - scheduler::MpiWorld& localWorld = - getMpiWorldRegistry().createWorld(msg, worldId); - - scheduler::MpiWorld remoteWorld; - remoteWorld.overrideHost(otherHost); - remoteWorld.initialiseFromMsg(msg); - - // Register two ranks (one on each host) - int rankA = 0; - int rankB = 1; - - std::vector messageData = { 0, 1, 2 }; - - // Undo the mocking, so we actually send the MPI message - faabric::util::setMockMode(false); - - // Send a message that should get sent to this host - remoteWorld.send( - rankB, rankA, BYTES(messageData.data()), MPI_INT, messageData.size()); - usleep(1000 * 100); - - SECTION("Check queueing") - { - REQUIRE(localWorld.getLocalQueueSize(rankB, rankA) == 1); - - // Check message content - faabric::MPIMessage actualMessage = - *(localWorld.getLocalQueue(rankB, rankA)->dequeue()); - checkMessage(actualMessage, rankB, rankA, messageData); - } - - SECTION("Check recv") - { - // Receive the message for the given rank - MPI_Status status{}; - auto buffer = new int[messageData.size()]; - localWorld.recv( - rankB, rankA, BYTES(buffer), MPI_INT, messageData.size(), &status); - - std::vector actual(buffer, buffer + messageData.size()); - REQUIRE(actual == messageData); - - REQUIRE(status.MPI_SOURCE == rankB); - REQUIRE(status.MPI_ERROR == MPI_SUCCESS); - REQUIRE(status.bytesSize == messageData.size() * sizeof(int)); - } - - tearDown({ &localWorld, &remoteWorld }); - - server.stop(); -} - -TEST_CASE("Test send/recv message with no data", "[mpi]") -{ - cleanFaabric(); - - const faabric::Message& msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; - world.create(msg, worldId, worldSize); - int rankA1 = 1; int rankA2 = 2; @@ -573,18 +369,10 @@ TEST_CASE("Test send/recv message with no data", "[mpi]") REQUIRE(status.MPI_ERROR == MPI_SUCCESS); REQUIRE(status.bytesSize == 0); } - - tearDown({ &world }); } -TEST_CASE("Test recv with partial data", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test recv with partial data", "[mpi]") { - cleanFaabric(); - - const faabric::Message& msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; - world.create(msg, worldId, worldSize); - // Send a message with size less than the recipient is expecting std::vector messageData = { 0, 1, 2, 3 }; unsigned long actualSize = messageData.size(); @@ -600,18 +388,10 @@ TEST_CASE("Test recv with partial data", "[mpi]") REQUIRE(status.MPI_SOURCE == 1); REQUIRE(status.MPI_ERROR == MPI_SUCCESS); REQUIRE(status.bytesSize == actualSize * sizeof(int)); - - tearDown({ &world }); } -TEST_CASE("Test probe", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test probe", "[mpi]") { - cleanFaabric(); - - const faabric::Message& msg = 
faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; - world.create(msg, worldId, worldSize); - // Send two messages of different sizes std::vector messageData = { 0, 1, 2, 3, 4, 5, 6 }; unsigned long sizeA = 2; @@ -648,183 +428,78 @@ TEST_CASE("Test probe", "[mpi]") // Receive the next message auto bufferB = new int[sizeB]; world.recv(1, 2, BYTES(bufferB), MPI_INT, sizeB * sizeof(int), nullptr); - - tearDown({ &world }); } -TEST_CASE("Check sending to invalid rank", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Check sending to invalid rank", "[mpi]") { - cleanFaabric(); - - const faabric::Message& msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; - world.create(msg, worldId, worldSize); - std::vector input = { 0, 1, 2, 3 }; int invalidRank = worldSize + 2; REQUIRE_THROWS(world.send(0, invalidRank, BYTES(input.data()), MPI_INT, 4)); - - tearDown({ &world }); } -TEST_CASE("Test collective messaging locally and across hosts", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test collective messaging locally", "[mpi]") { - cleanFaabric(); - - FunctionCallServer server; - server.start(); - usleep(1000 * 100); - - auto& sch = faabric::scheduler::getScheduler(); - - // Here we rely on the scheduler running out of resources, and overloading - // the localWorld with ranks 4 and 5 - int thisWorldSize = 6; - faabric::HostResources localResources; - localResources.set_slots(1); - localResources.set_usedslots(1); - faabric::HostResources otherResources; - otherResources.set_slots(3); - - // Set up a remote host - std::string otherHost = LOCALHOST; - sch.addHostToGlobalSet(otherHost); - - // Mock everything to make sure the other host has resources as well - faabric::util::setMockMode(true); - sch.setThisHostResources(localResources); - faabric::scheduler::queueResourceResponse(otherHost, otherResources); - - faabric::Message msg = faabric::util::messageFactory(user, func); - msg.set_mpiworldid(worldId); - msg.set_mpiworldsize(thisWorldSize); - - MpiWorld& localWorld = getMpiWorldRegistry().createWorld(msg, worldId); - - scheduler::MpiWorld remoteWorld; - remoteWorld.overrideHost(otherHost); - remoteWorld.initialiseFromMsg(msg); - - // Unset mock mode to actually send remote MPI messages - faabric::util::setMockMode(false); - - // Register ranks on both hosts - int remoteRankA = 1; - int remoteRankB = 2; - int remoteRankC = 3; - - int localRankA = 4; - int localRankB = 5; - - // Note that ranks are deliberately out of order - std::vector remoteWorldRanks = { remoteRankB, - remoteRankC, - remoteRankA }; - std::vector localWorldRanks = { localRankB, localRankA, 0 }; + int root = 3; SECTION("Broadcast") { - // Broadcast a message + // Broadcast a message from the root std::vector messageData = { 0, 1, 2 }; - remoteWorld.broadcast( - remoteRankB, BYTES(messageData.data()), MPI_INT, messageData.size()); + world.broadcast( + root, BYTES(messageData.data()), MPI_INT, messageData.size()); - // Check the host that the root is on - for (int rank : remoteWorldRanks) { - if (rank == remoteRankB) { + // Recv on all non-root ranks + for (int rank = 0; rank < worldSize; rank++) { + if (rank == root) { continue; } - - std::vector actual(3, -1); - remoteWorld.recv( - remoteRankB, rank, BYTES(actual.data()), MPI_INT, 3, nullptr); - REQUIRE(actual == messageData); - } - - // Check the local host - for (int rank : localWorldRanks) { std::vector actual(3, -1); - localWorld.recv( - remoteRankB, rank, BYTES(actual.data()), MPI_INT, 3, nullptr); + world.recv(root, rank, 
BYTES(actual.data()), MPI_INT, 3, nullptr); REQUIRE(actual == messageData); } } + // TODO - this is not scatter's behaviour, FIX SECTION("Scatter") { // Build the data int nPerRank = 4; + std::vector actual(nPerRank, -1); int dataSize = nPerRank * worldSize; std::vector messageData(dataSize, 0); for (int i = 0; i < dataSize; i++) { messageData[i] = i; } - // Do the scatter - std::vector actual(nPerRank, -1); - remoteWorld.scatter(remoteRankB, - remoteRankB, - BYTES(messageData.data()), - MPI_INT, - nPerRank, - BYTES(actual.data()), - MPI_INT, - nPerRank); - - // Check for root - REQUIRE(actual == std::vector({ 8, 9, 10, 11 })); - - // Check for other remote ranks - remoteWorld.scatter(remoteRankB, - remoteRankA, - nullptr, - MPI_INT, - nPerRank, - BYTES(actual.data()), - MPI_INT, - nPerRank); - REQUIRE(actual == std::vector({ 4, 5, 6, 7 })); - - remoteWorld.scatter(remoteRankB, - remoteRankC, - nullptr, - MPI_INT, - nPerRank, - BYTES(actual.data()), - MPI_INT, - nPerRank); - REQUIRE(actual == std::vector({ 12, 13, 14, 15 })); - - // Check for local ranks - localWorld.scatter(remoteRankB, - 0, - nullptr, - MPI_INT, - nPerRank, - BYTES(actual.data()), - MPI_INT, - nPerRank); - REQUIRE(actual == std::vector({ 0, 1, 2, 3 })); - - localWorld.scatter(remoteRankB, - localRankB, - nullptr, - MPI_INT, - nPerRank, - BYTES(actual.data()), - MPI_INT, - nPerRank); - REQUIRE(actual == std::vector({ 20, 21, 22, 23 })); - - localWorld.scatter(remoteRankB, - localRankA, - nullptr, - MPI_INT, - nPerRank, - BYTES(actual.data()), - MPI_INT, - nPerRank); - REQUIRE(actual == std::vector({ 16, 17, 18, 19 })); + // Do the root first + world.scatter(root, + root, + BYTES(messageData.data()), + MPI_INT, + nPerRank, + BYTES(actual.data()), + MPI_INT, + nPerRank); + + for (int rank = 0; rank < worldSize; rank++) { + // Do the scatter + if (rank == root) { + continue; + } + world.scatter(root, + rank, + BYTES(messageData.data()), + MPI_INT, + nPerRank, + BYTES(actual.data()), + MPI_INT, + nPerRank); + + // Check the results + REQUIRE(actual == std::vector( + messageData.begin() + rank * nPerRank, + messageData.begin() + (rank + 1) * nPerRank)); + } } SECTION("Gather and allgather") @@ -832,7 +507,7 @@ TEST_CASE("Test collective messaging locally and across hosts", "[mpi]") // Build the data for each rank int nPerRank = 4; std::vector> rankData; - for (int i = 0; i < thisWorldSize; i++) { + for (int i = 0; i < worldSize; i++) { std::vector thisRankData; for (int j = 0; j < nPerRank; j++) { thisRankData.push_back((i * nPerRank) + j); @@ -843,59 +518,43 @@ TEST_CASE("Test collective messaging locally and across hosts", "[mpi]") // Build the expectation std::vector expected; - for (int i = 0; i < thisWorldSize * nPerRank; i++) { + for (int i = 0; i < worldSize * nPerRank; i++) { expected.push_back(i); } SECTION("Gather") { - std::vector actual(thisWorldSize * nPerRank, -1); + std::vector actual(worldSize * nPerRank, -1); // Call gather for each rank other than the root (out of order) - int root = localRankA; - for (int rank : remoteWorldRanks) { - remoteWorld.gather(rank, - root, - BYTES(rankData[rank].data()), - MPI_INT, - nPerRank, - nullptr, - MPI_INT, - nPerRank); - } - - for (int rank : localWorldRanks) { + for (int rank = 0; rank < worldSize; rank++) { if (rank == root) { continue; } - localWorld.gather(rank, - root, - BYTES(rankData[rank].data()), - MPI_INT, - nPerRank, - nullptr, - MPI_INT, - nPerRank); + world.gather(rank, + root, + BYTES(rankData[rank].data()), + MPI_INT, + nPerRank, + nullptr, + MPI_INT, + 
nPerRank); } // Call gather for root - localWorld.gather(root, - root, - BYTES(rankData[root].data()), - MPI_INT, - nPerRank, - BYTES(actual.data()), - MPI_INT, - nPerRank); + world.gather(root, + root, + BYTES(rankData[root].data()), + MPI_INT, + nPerRank, + BYTES(actual.data()), + MPI_INT, + nPerRank); // Check data REQUIRE(actual == expected); } } - - tearDown({ &localWorld, &remoteWorld }); - - server.stop(); } template @@ -998,26 +657,18 @@ template void doReduceTest(scheduler::MpiWorld& world, std::vector> rankData, std::vector& expected); -TEST_CASE("Test reduce", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test reduce", "[mpi]") { - cleanFaabric(); - - const faabric::Message& msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; - int thisWorldSize = 5; - world.create(msg, worldId, thisWorldSize); - // Prepare inputs int root = 3; SECTION("Integers") { - std::vector> rankData(thisWorldSize, - std::vector(3)); + std::vector> rankData(worldSize, std::vector(3)); std::vector expected(3, 0); // Prepare rank data - for (int r = 0; r < thisWorldSize; r++) { + for (int r = 0; r < worldSize; r++) { rankData[r][0] = r; rankData[r][1] = r * 10; rankData[r][2] = r * 100; @@ -1025,7 +676,7 @@ TEST_CASE("Test reduce", "[mpi]") SECTION("Sum operator") { - for (int r = 0; r < thisWorldSize; r++) { + for (int r = 0; r < worldSize; r++) { expected[0] += rankData[r][0]; expected[1] += rankData[r][1]; expected[2] += rankData[r][2]; @@ -1037,9 +688,9 @@ TEST_CASE("Test reduce", "[mpi]") SECTION("Max operator") { - expected[0] = (thisWorldSize - 1); - expected[1] = (thisWorldSize - 1) * 10; - expected[2] = (thisWorldSize - 1) * 100; + expected[0] = (worldSize - 1); + expected[1] = (worldSize - 1) * 10; + expected[2] = (worldSize - 1) * 100; doReduceTest( world, root, MPI_MAX, MPI_INT, rankData, expected); @@ -1049,7 +700,7 @@ TEST_CASE("Test reduce", "[mpi]") { // Initialize rankData to non-zero values. 
This catches faulty // reduce implementations that always return zero - for (int r = 0; r < thisWorldSize; r++) { + for (int r = 0; r < worldSize; r++) { rankData[r][0] = (r + 1); rankData[r][1] = (r + 1) * 10; rankData[r][2] = (r + 1) * 100; @@ -1066,12 +717,12 @@ TEST_CASE("Test reduce", "[mpi]") SECTION("Doubles") { - std::vector> rankData(thisWorldSize, + std::vector> rankData(worldSize, std::vector(3)); std::vector expected(3, 0); // Prepare rank data - for (int r = 0; r < thisWorldSize; r++) { + for (int r = 0; r < worldSize; r++) { rankData[r][0] = 2.5 + r; rankData[r][1] = (2.5 + r) * 10; rankData[r][2] = (2.5 + r) * 100; @@ -1079,7 +730,7 @@ TEST_CASE("Test reduce", "[mpi]") SECTION("Sum operator") { - for (int r = 0; r < thisWorldSize; r++) { + for (int r = 0; r < worldSize; r++) { expected[0] += rankData[r][0]; expected[1] += rankData[r][1]; expected[2] += rankData[r][2]; @@ -1091,9 +742,9 @@ TEST_CASE("Test reduce", "[mpi]") SECTION("Max operator") { - expected[0] = (2.5 + thisWorldSize - 1); - expected[1] = (2.5 + thisWorldSize - 1) * 10; - expected[2] = (2.5 + thisWorldSize - 1) * 100; + expected[0] = (2.5 + worldSize - 1); + expected[1] = (2.5 + worldSize - 1) * 10; + expected[2] = (2.5 + worldSize - 1) * 100; doReduceTest( world, root, MPI_MAX, MPI_DOUBLE, rankData, expected); @@ -1112,12 +763,12 @@ TEST_CASE("Test reduce", "[mpi]") SECTION("Long long") { - std::vector> rankData(thisWorldSize, + std::vector> rankData(worldSize, std::vector(3)); std::vector expected(3, 0); // Prepare rank data - for (int r = 0; r < thisWorldSize; r++) { + for (int r = 0; r < worldSize; r++) { rankData[r][0] = (r + 1); rankData[r][1] = (r + 1) * 10; rankData[r][2] = (r + 1) * 100; @@ -1125,7 +776,7 @@ TEST_CASE("Test reduce", "[mpi]") SECTION("Sum operator") { - for (int r = 0; r < thisWorldSize; r++) { + for (int r = 0; r < worldSize; r++) { expected[0] += rankData[r][0]; expected[1] += rankData[r][1]; expected[2] += rankData[r][2]; @@ -1137,9 +788,9 @@ TEST_CASE("Test reduce", "[mpi]") SECTION("Max operator") { - expected[0] = thisWorldSize; - expected[1] = thisWorldSize * 10; - expected[2] = thisWorldSize * 100; + expected[0] = worldSize; + expected[1] = worldSize * 10; + expected[2] = worldSize * 100; doReduceTest( world, root, MPI_MAX, MPI_DOUBLE, rankData, expected); @@ -1155,17 +806,10 @@ TEST_CASE("Test reduce", "[mpi]") world, root, MPI_MIN, MPI_DOUBLE, rankData, expected); } } - - tearDown({ &world }); } -TEST_CASE("Test operator reduce", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test operator reduce", "[mpi]") { - const faabric::Message& msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; - int thisWorldSize = 5; - world.create(msg, worldId, thisWorldSize); - SECTION("Max") { SECTION("Integers") @@ -1336,23 +980,16 @@ TEST_CASE("Test operator reduce", "[mpi]") (uint8_t*)output.data())); } } - - tearDown({ &world }); } -TEST_CASE("Test gather and allgather", "[mpi]") +TEST_CASE_METHOD(MpiTestFixture, "Test gather and allgather", "[mpi]") { - const faabric::Message& msg = faabric::util::messageFactory(user, func); - scheduler::MpiWorld world; - int thisWorldSize = 5; int root = 3; - world.create(msg, worldId, thisWorldSize); - // Build up per-rank data and expectation int nPerRank = 3; - int gatheredSize = nPerRank * thisWorldSize; - std::vector> rankData(thisWorldSize, + int gatheredSize = nPerRank * worldSize; + std::vector> rankData(worldSize, std::vector(nPerRank)); std::vector expected(gatheredSize, 0); for (int i = 0; i < gatheredSize; i++) { @@ 
-1367,7 +1004,7 @@ TEST_CASE("Test gather and allgather", "[mpi]")
     SECTION("Gather")
     {
         // Run gather on all non-root ranks
-        for (int r = 0; r < thisWorldSize; r++) {
+        for (int r = 0; r < worldSize; r++) {
             if (r == root) {
                 continue;
             }
@@ -1426,7 +1063,7 @@ TEST_CASE("Test gather and allgather", "[mpi]")
 
         // Run allgather in threads
         std::vector<std::thread> threads;
-        for (int r = 0; r < thisWorldSize; r++) {
+        for (int r = 0; r < worldSize; r++) {
             threads.emplace_back([&, r, isInPlace] {
                 if (isInPlace) {
                     // Put this rank's data in place in the recv buffer as
@@ -1465,31 +1102,23 @@ TEST_CASE("Test gather and allgather", "[mpi]")
 
         REQUIRE(actual == expected);
     }
-
-    tearDown({ &world });
 }
 
-TEST_CASE("Test scan", "[mpi]")
+TEST_CASE_METHOD(MpiTestFixture, "Test scan", "[mpi]")
 {
-    const faabric::Message& msg = faabric::util::messageFactory(user, func);
-    scheduler::MpiWorld world;
-    int thisWorldSize = 5;
     int count = 3;
-    world.create(msg, worldId, thisWorldSize);
 
     // Prepare input data
-    std::vector<std::vector<int>> rankData(thisWorldSize,
-                                           std::vector<int>(count));
-    for (int r = 0; r < thisWorldSize; r++) {
+    std::vector<std::vector<int>> rankData(worldSize, std::vector<int>(count));
+    for (int r = 0; r < worldSize; r++) {
         for (int i = 0; i < count; i++) {
             rankData[r][i] = r * 10 + i;
         }
     }
 
     // Prepare expected values
-    std::vector<std::vector<int>> expected(thisWorldSize,
-                                           std::vector<int>(count));
-    for (int r = 0; r < thisWorldSize; r++) {
+    std::vector<std::vector<int>> expected(worldSize, std::vector<int>(count));
+    for (int r = 0; r < worldSize; r++) {
         for (int i = 0; i < count; i++) {
             if (r == 0) {
                 expected[r][i] = rankData[r][i];
@@ -1504,9 +1133,8 @@ TEST_CASE("Test scan", "[mpi]")
     SECTION("Not in place") { inPlace = false; }
 
     // Run the scan operation
-    std::vector<std::vector<int>> result(thisWorldSize,
-                                         std::vector<int>(count));
-    for (int r = 0; r < thisWorldSize; r++) {
+    std::vector<std::vector<int>> result(worldSize, std::vector<int>(count));
+    for (int r = 0; r < worldSize; r++) {
         if (inPlace) {
             world.scan(r,
                        BYTES(rankData[r].data()),
@@ -1525,16 +1153,16 @@ TEST_CASE("Test scan", "[mpi]")
             REQUIRE(result[r] == expected[r]);
         }
     }
-
-    tearDown({ &world });
 }
 
-TEST_CASE("Test all-to-all", "[mpi]")
+TEST_CASE_METHOD(MpiBaseTestFixture, "Test all-to-all", "[mpi]")
 {
-    const faabric::Message& msg = faabric::util::messageFactory(user, func);
-    scheduler::MpiWorld world;
-    int thisWorldSize = 4;
-    world.create(msg, worldId, thisWorldSize);
+    // For this test we need a fixed world size of 4, otherwise the built
+    // expectation won't match
+    int worldSize = 4;
+    msg.set_mpiworldsize(worldSize);
+    MpiWorld world;
+    world.create(msg, worldId, worldSize);
 
     // Build inputs and expected
     int inputs[4][8] = {
@@ -1552,7 +1180,7 @@ TEST_CASE("Test all-to-all", "[mpi]")
     };
 
     std::vector<std::thread> threads;
-    for (int r = 0; r < thisWorldSize; r++) {
+    for (int r = 0; r < worldSize; r++) {
         threads.emplace_back([&, r] {
             std::vector<int> actual(8, 0);
             world.allToAll(r,
@@ -1574,6 +1202,6 @@ TEST_CASE("Test all-to-all", "[mpi]")
         }
     }
 
-    tearDown({ &world });
+    world.destroy();
 }
 }
diff --git a/tests/test/scheduler/test_remote_mpi_worlds.cpp b/tests/test/scheduler/test_remote_mpi_worlds.cpp
new file mode 100644
index 000000000..947311eb1
--- /dev/null
+++ b/tests/test/scheduler/test_remote_mpi_worlds.cpp
@@ -0,0 +1,298 @@
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace faabric::scheduler;
+
+namespace tests {
+TEST_CASE_METHOD(RemoteMpiTestFixture, "Test rank allocation", "[mpi]")
+{
+    // Allocate two ranks in total, one rank per host
+    this->setWorldsSizes(2, 1, 1);
+
+    // Init worlds
+    MpiWorld& localWorld = getMpiWorldRegistry().createWorld(msg, worldId);
+    remoteWorld.initialiseFromMsg(msg);
+    faabric::util::setMockMode(false);
+
+    // Now check both world instances report the same mappings
+    REQUIRE(localWorld.getHostForRank(0) == thisHost);
+    REQUIRE(localWorld.getHostForRank(1) == otherHost);
+
+    // Destroy worlds
+    localWorld.destroy();
+    remoteWorld.destroy();
+}
+
+TEST_CASE_METHOD(RemoteMpiTestFixture, "Test send across hosts", "[mpi]")
+{
+    // Start a server on this host
+    FunctionCallServer server;
+    server.start();
+    usleep(1000 * 100);
+
+    // Register two ranks (one on each host)
+    this->setWorldsSizes(2, 1, 1);
+    int rankA = 0;
+    int rankB = 1;
+    std::vector<int> messageData = { 0, 1, 2 };
+
+    // Init worlds
+    MpiWorld& localWorld = getMpiWorldRegistry().createWorld(msg, worldId);
+    remoteWorld.initialiseFromMsg(msg);
+    faabric::util::setMockMode(false);
+
+    // Send a message that should get sent to this host
+    remoteWorld.send(
+      rankB, rankA, BYTES(messageData.data()), MPI_INT, messageData.size());
+    usleep(1000 * 100);
+
+    SECTION("Check queueing")
+    {
+        REQUIRE(localWorld.getLocalQueueSize(rankB, rankA) == 1);
+
+        // Check message content
+        faabric::MPIMessage actualMessage =
+          *(localWorld.getLocalQueue(rankB, rankA)->dequeue());
+        REQUIRE(actualMessage.worldid() == worldId);
+        REQUIRE(actualMessage.count() == messageData.size());
+        REQUIRE(actualMessage.sender() == rankB);
+        REQUIRE(actualMessage.destination() == rankA);
+    }
+
+    SECTION("Check recv")
+    {
+        // Receive the message for the given rank
+        MPI_Status status{};
+        auto buffer = new int[messageData.size()];
+        localWorld.recv(
+          rankB, rankA, BYTES(buffer), MPI_INT, messageData.size(), &status);
+
+        std::vector<int> actual(buffer, buffer + messageData.size());
+        REQUIRE(actual == messageData);
+
+        REQUIRE(status.MPI_SOURCE == rankB);
+        REQUIRE(status.MPI_ERROR == MPI_SUCCESS);
+        REQUIRE(status.bytesSize == messageData.size() * sizeof(int));
+    }
+
+    // Destroy worlds
+    localWorld.destroy();
+    remoteWorld.destroy();
+
+    server.stop();
+}
+
+TEST_CASE_METHOD(RemoteMpiTestFixture,
+                 "Test collective messaging across hosts",
+                 "[mpi]")
+{
+    FunctionCallServer server;
+    server.start();
+    usleep(1000 * 100);
+
+    // Here we rely on the scheduler running out of resources, and overloading
+    // the localWorld with ranks 4 and 5
+    int thisWorldSize = 6;
+    this->setWorldsSizes(thisWorldSize, 1, 3);
+    int remoteRankA = 1;
+    int remoteRankB = 2;
+    int remoteRankC = 3;
+    int localRankA = 4;
+    int localRankB = 5;
+
+    // Init worlds
+    MpiWorld& localWorld = getMpiWorldRegistry().createWorld(msg, worldId);
+    remoteWorld.initialiseFromMsg(msg);
+    faabric::util::setMockMode(false);
+
+    // Note that ranks are deliberately out of order
+    std::vector<int> remoteWorldRanks = { remoteRankB,
+                                          remoteRankC,
+                                          remoteRankA };
+    std::vector<int> localWorldRanks = { localRankB, localRankA, 0 };
+
+    SECTION("Broadcast")
+    {
+        // Broadcast a message
+        std::vector<int> messageData = { 0, 1, 2 };
+        remoteWorld.broadcast(
+          remoteRankB, BYTES(messageData.data()), MPI_INT, messageData.size());
+
+        // Check the host that the root is on
+        for (int rank : remoteWorldRanks) {
+            if (rank == remoteRankB) {
+                continue;
+            }
+
+            std::vector<int> actual(3, -1);
+            remoteWorld.recv(
+              remoteRankB, rank, BYTES(actual.data()), MPI_INT, 3, nullptr);
+            REQUIRE(actual == messageData);
+        }
+
+        // Check the local host
+        for (int rank : localWorldRanks) {
+            std::vector<int> actual(3, -1);
+            localWorld.recv(
+              remoteRankB, rank, BYTES(actual.data()), MPI_INT, 3, nullptr);
+            REQUIRE(actual == messageData);
+        }
+    }
+
+    SECTION("Scatter")
+    {
+        // Build the data
+        int nPerRank = 4;
+        int dataSize = nPerRank * thisWorldSize;
+        std::vector<int> messageData(dataSize, 0);
+        for (int i = 0; i < dataSize; i++) {
+            messageData[i] = i;
+        }
+
+        // Do the scatter
+        std::vector<int> actual(nPerRank, -1);
+        remoteWorld.scatter(remoteRankB,
+                            remoteRankB,
+                            BYTES(messageData.data()),
+                            MPI_INT,
+                            nPerRank,
+                            BYTES(actual.data()),
+                            MPI_INT,
+                            nPerRank);
+
+        // Check for root
+        REQUIRE(actual == std::vector<int>({ 8, 9, 10, 11 }));
+
+        // Check for other remote ranks
+        remoteWorld.scatter(remoteRankB,
+                            remoteRankA,
+                            nullptr,
+                            MPI_INT,
+                            nPerRank,
+                            BYTES(actual.data()),
+                            MPI_INT,
+                            nPerRank);
+        REQUIRE(actual == std::vector<int>({ 4, 5, 6, 7 }));
+
+        remoteWorld.scatter(remoteRankB,
+                            remoteRankC,
+                            nullptr,
+                            MPI_INT,
+                            nPerRank,
+                            BYTES(actual.data()),
+                            MPI_INT,
+                            nPerRank);
+        REQUIRE(actual == std::vector<int>({ 12, 13, 14, 15 }));
+
+        // Check for local ranks
+        localWorld.scatter(remoteRankB,
+                           0,
+                           nullptr,
+                           MPI_INT,
+                           nPerRank,
+                           BYTES(actual.data()),
+                           MPI_INT,
+                           nPerRank);
+        REQUIRE(actual == std::vector<int>({ 0, 1, 2, 3 }));
+
+        localWorld.scatter(remoteRankB,
+                           localRankB,
+                           nullptr,
+                           MPI_INT,
+                           nPerRank,
+                           BYTES(actual.data()),
+                           MPI_INT,
+                           nPerRank);
+        REQUIRE(actual == std::vector<int>({ 20, 21, 22, 23 }));
+
+        localWorld.scatter(remoteRankB,
+                           localRankA,
+                           nullptr,
+                           MPI_INT,
+                           nPerRank,
+                           BYTES(actual.data()),
+                           MPI_INT,
+                           nPerRank);
+        REQUIRE(actual == std::vector<int>({ 16, 17, 18, 19 }));
+    }
+
+    SECTION("Gather and allgather")
+    {
+        // Build the data for each rank
+        int nPerRank = 4;
+        std::vector<std::vector<int>> rankData;
+        for (int i = 0; i < thisWorldSize; i++) {
+            std::vector<int> thisRankData;
+            for (int j = 0; j < nPerRank; j++) {
+                thisRankData.push_back((i * nPerRank) + j);
+            }
+
+            rankData.push_back(thisRankData);
+        }
+
+        // Build the expectation
+        std::vector<int> expected;
+        for (int i = 0; i < thisWorldSize * nPerRank; i++) {
+            expected.push_back(i);
+        }
+
+        SECTION("Gather")
+        {
+            std::vector<int> actual(thisWorldSize * nPerRank, -1);
+
+            // Call gather for each rank other than the root (out of order)
+            int root = localRankA;
+            for (int rank : remoteWorldRanks) {
+                remoteWorld.gather(rank,
+                                   root,
+                                   BYTES(rankData[rank].data()),
+                                   MPI_INT,
+                                   nPerRank,
+                                   nullptr,
+                                   MPI_INT,
+                                   nPerRank);
+            }
+
+            for (int rank : localWorldRanks) {
+                if (rank == root) {
+                    continue;
+                }
+                localWorld.gather(rank,
+                                  root,
+                                  BYTES(rankData[rank].data()),
+                                  MPI_INT,
+                                  nPerRank,
+                                  nullptr,
+                                  MPI_INT,
+                                  nPerRank);
+            }
+
+            // Call gather for root
+            localWorld.gather(root,
+                              root,
+                              BYTES(rankData[root].data()),
+                              MPI_INT,
+                              nPerRank,
+                              BYTES(actual.data()),
+                              MPI_INT,
+                              nPerRank);
+
+            // Check data
+            REQUIRE(actual == expected);
+        }
+    }
+
+    // Destroy worlds
+    localWorld.destroy();
+    remoteWorld.destroy();
+
+    server.stop();
+}
+}
diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h
index c7c783420..1568b3cf4 100644
--- a/tests/utils/fixtures.h
+++ b/tests/utils/fixtures.h
@@ -1,12 +1,18 @@
 #pragma once
 
 #include
+#include
+#include
+#include
 #include
 #include
 #include
 #include
+#include
 #include
 
+#include "DummyExecutorFactory.h"
+
 namespace tests {
 class RedisTestFixture
 {
@@ -110,4 +116,93 @@ class ConfTestFixture
   protected:
     faabric::util::SystemConfig& conf;
 };
+
+class MpiBaseTestFixture : public SchedulerTestFixture
+{
+  public:
+    MpiBaseTestFixture()
+      : user("mpi")
+      , func("hellompi")
+      , worldId(123)
+      , worldSize(5)
+      , msg(faabric::util::messageFactory(user, func))
+    {
+        std::shared_ptr<faabric::scheduler::ExecutorFactory> fac =
+          std::make_shared<faabric::scheduler::DummyExecutorFactory>();
+        faabric::scheduler::setExecutorFactory(fac);
+
+        auto& mpiRegistry = faabric::scheduler::getMpiWorldRegistry();
+        mpiRegistry.clear();
+
+        msg.set_mpiworldid(worldId);
+        msg.set_mpiworldsize(worldSize);
+    }
+
+    ~MpiBaseTestFixture()
+    {
+        auto& mpiRegistry = faabric::scheduler::getMpiWorldRegistry();
+        mpiRegistry.clear();
+    }
+
+  protected:
+    const std::string user;
+    const std::string func;
+    int worldId;
+    int worldSize;
+
+    faabric::Message msg;
+};
+
+class MpiTestFixture : public MpiBaseTestFixture
+{
+  public:
+    MpiTestFixture() { world.create(msg, worldId, worldSize); }
+
+    ~MpiTestFixture() { world.destroy(); }
+
+  protected:
+    faabric::scheduler::MpiWorld world;
+};
+
+class RemoteMpiTestFixture : public MpiBaseTestFixture
+{
+  public:
+    RemoteMpiTestFixture()
+      : thisHost(faabric::util::getSystemConfig().endpointHost)
+      , otherHost(LOCALHOST)
+    {
+        remoteWorld.overrideHost(otherHost);
+    }
+
+    void setWorldsSizes(int worldSize, int ranksWorldOne, int ranksWorldTwo)
+    {
+        // Update message
+        msg.set_mpiworldsize(worldSize);
+
+        // Set local ranks
+        faabric::HostResources localResources;
+        localResources.set_slots(ranksWorldOne);
+        // Account for the master rank that is already running in this world
+        localResources.set_usedslots(1);
+
+        // Set remote ranks. Note that any remaining ranks will be allocated
+        // to the world on the master host
+        faabric::HostResources otherResources;
+        otherResources.set_slots(ranksWorldTwo);
+
+        sch.addHostToGlobalSet(otherHost);
+
+        // Mock everything to make sure the other host has resources as well
+        faabric::util::setMockMode(true);
+        sch.setThisHostResources(localResources);
+        faabric::scheduler::queueResourceResponse(otherHost, otherResources);
+    }
+
+  protected:
+    std::string thisHost;
+    std::string otherHost;
+
+    faabric::scheduler::MpiWorld remoteWorld;
+};
 }
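
Editor's note on usage (illustrative only, not part of the patch): with these fixtures a test inherits `msg`, `worldId` and `worldSize` from `MpiBaseTestFixture`, and `MpiTestFixture` additionally creates the `world` up front and destroys it afterwards, so the test body only exercises the MPI API. A minimal sketch, assuming Catch2's `TEST_CASE_METHOD` as used throughout this patch; the test name and payload are hypothetical, while `world`, `send`/`recv`, and `BYTES` follow the tests above:

    // Hypothetical example: MpiTestFixture creates a world of size
    // `worldSize` in its constructor and tears it down in its destructor,
    // so no explicit create()/destroy() calls are needed here.
    TEST_CASE_METHOD(MpiTestFixture, "Example send and recv", "[mpi]")
    {
        std::vector<int> data = { 1, 2, 3 };

        // Send from rank 0 to rank 1 (both colocated on this host)
        world.send(0, 1, BYTES(data.data()), MPI_INT, data.size());

        // Receive on rank 1 and check the payload arrived intact
        std::vector<int> actual(data.size(), -1);
        world.recv(0, 1, BYTES(actual.data()), MPI_INT, actual.size(), nullptr);
        REQUIRE(actual == data);
    }

This mirrors the pattern used by "Test send and recv on same host" above, minus the manual world setup and teardown that the fixtures now absorb.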