From ad8fcd3a82a8bf762677117b3fff5aa3834e4c8f Mon Sep 17 00:00:00 2001
From: Carlos
Date: Fri, 15 Oct 2021 17:42:24 +0100
Subject: [PATCH] Allow generating execution graphs from MPI runs (#154)

* log mpi calls to generate execution graphs

* add test for exec graph generation in mpi

* setting result in scheduler, and deeply comparing mpi messages

* remove extra include

* pr comments

* manually blacklist/whitelist message fields
---
 src/scheduler/MpiWorld.cpp               |  2 +
 tests/test/scheduler/test_exec_graph.cpp | 75 ++++++++++++++++++++++++
 2 files changed, 77 insertions(+)

diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp
index 9d6524e3e..83d5223c3 100644
--- a/src/scheduler/MpiWorld.cpp
+++ b/src/scheduler/MpiWorld.cpp
@@ -192,6 +192,8 @@ void MpiWorld::create(const faabric::Message& call, int newId, int newSize)
         msg.set_mpiworldid(id);
         msg.set_mpirank(i + 1);
         msg.set_mpiworldsize(size);
+        // Log chained functions to generate execution graphs
+        sch.logChainedFunction(call.id(), msg.id());
     }
 
     std::vector<std::string> executedAt;
diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp
index 5154663d1..595c8eb12 100644
--- a/tests/test/scheduler/test_exec_graph.cpp
+++ b/tests/test/scheduler/test_exec_graph.cpp
@@ -3,8 +3,11 @@
 #include "faabric_utils.h"
 
 #include <faabric/scheduler/ExecGraph.h>
+#include <faabric/scheduler/MpiWorld.h>
 #include <faabric/scheduler/Scheduler.h>
+#include <faabric/util/config.h>
 #include <faabric/util/func.h>
+#include <faabric/util/testing.h>
 
 using namespace scheduler;
 
@@ -67,4 +70,76 @@ TEST_CASE("Test execution graph", "[scheduler]")
 
     checkExecGraphEquality(expected, actual);
 }
+
+TEST_CASE_METHOD(MpiBaseTestFixture,
+                 "Test MPI execution graph",
+                 "[mpi][scheduler]")
+{
+    faabric::scheduler::MpiWorld world;
+    msg.set_ismpi(true);
+
+    // Update the result for the master message
+    sch.setFunctionResult(msg);
+
+    // Build the message vector to reconstruct the graph
+    std::vector<faabric::Message> messages(worldSize);
+    for (int rank = 0; rank < worldSize; rank++) {
+        messages.at(rank) = faabric::util::messageFactory("mpi", "hellompi");
+        messages.at(rank).set_id(0);
+        messages.at(rank).set_timestamp(0);
+        messages.at(rank).set_finishtimestamp(0);
+        messages.at(rank).set_resultkey("");
+        messages.at(rank).set_statuskey("");
+        messages.at(rank).set_executedhost(
+          faabric::util::getSystemConfig().endpointHost);
+        messages.at(rank).set_ismpi(true);
+        messages.at(rank).set_mpiworldid(worldId);
+        messages.at(rank).set_mpirank(rank);
+        messages.at(rank).set_mpiworldsize(worldSize);
+    }
+
+    world.create(msg, worldId, worldSize);
+
+    world.destroy();
+
+    // Build expected graph
+    ExecGraphNode nodeB1 = { .msg = messages.at(1) };
+    ExecGraphNode nodeB2 = { .msg = messages.at(2) };
+    ExecGraphNode nodeB3 = { .msg = messages.at(3) };
+    ExecGraphNode nodeB4 = { .msg = messages.at(4) };
+
+    ExecGraphNode nodeA = { .msg = messages.at(0),
+                            .children = { nodeB1, nodeB2, nodeB3, nodeB4 } };
+
+    ExecGraph expected{ .rootNode = nodeA };
+
+    // Wait for the MPI messages to finish
+    sch.getFunctionResult(msg.id(), 500);
+    for (const auto& id : sch.getChainedFunctions(msg.id())) {
+        sch.getFunctionResult(id, 500);
+    }
+    ExecGraph actual = sch.getFunctionExecGraph(msg.id());
+
+    // Unset the fields that we can't recreate
+    actual.rootNode.msg.set_id(0);
+    actual.rootNode.msg.set_finishtimestamp(0);
+    actual.rootNode.msg.set_timestamp(0);
+    actual.rootNode.msg.set_resultkey("");
+    actual.rootNode.msg.set_statuskey("");
+    actual.rootNode.msg.set_outputdata("");
+    for (auto& node : actual.rootNode.children) {
+        node.msg.set_id(0);
+        node.msg.set_finishtimestamp(0);
+        node.msg.set_timestamp(0);
+        node.msg.set_resultkey("");
+        node.msg.set_statuskey("");
+        node.msg.set_outputdata("");
+    }
+
+    // Check the execution graph
+    REQUIRE(countExecGraphNodes(actual) == worldSize);
+    REQUIRE(countExecGraphNodes(expected) == worldSize);
+
+    checkExecGraphEquality(expected, actual);
+}
 }
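
For context on how callers consume the graphs this patch enables: because logChainedFunction() records each rank's message against the master message id, the same getFunctionResult() / getChainedFunctions() / getFunctionExecGraph() calls exercised by the new test now work for MPI runs. The sketch below waits for an MPI run to finish and prints its execution graph. It is illustrative only, not part of the patch: printMpiExecGraph and printNode are hypothetical helper names, and the faabric::scheduler::getScheduler() accessor and the protobuf-generated getters (mpirank(), executedhost()) are assumptions drawn from the faabric codebase rather than from this diff.

// Sketch only: headers as in the diff above; getScheduler() and the
// protobuf getters are assumed from the faabric codebase.
#include <cstdio>

#include <faabric/scheduler/ExecGraph.h>
#include <faabric/scheduler/Scheduler.h>

// Print one node per line, indenting children, so the chained rank
// messages logged by MpiWorld::create() appear nested under the master
void printNode(const faabric::scheduler::ExecGraphNode& node, int depth)
{
    printf("%*srank %i on %s\n",
           2 * depth,
           "",
           node.msg.mpirank(),
           node.msg.executedhost().c_str());

    for (const auto& child : node.children) {
        printNode(child, depth + 1);
    }
}

// Hypothetical helper: wait for the master message and every chained
// rank message to finish, then reconstruct and print the graph, as the
// new test does before comparing graphs
void printMpiExecGraph(const faabric::Message& rootMsg)
{
    auto& sch = faabric::scheduler::getScheduler();

    sch.getFunctionResult(rootMsg.id(), 500);
    for (const auto& chainedId : sch.getChainedFunctions(rootMsg.id())) {
        sch.getFunctionResult(chainedId, 500);
    }

    faabric::scheduler::ExecGraph graph =
      sch.getFunctionExecGraph(rootMsg.id());
    printNode(graph.rootNode, 0);
}

Called with the master message after world.create(), this would print rank 0 once at depth zero and ranks 1 to worldSize - 1 indented beneath it, matching the one-root, four-children shape the test asserts with countExecGraphNodes() and checkExecGraphEquality().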