diff --git a/include/faabric/scheduler/ExecutorFactory.h b/include/faabric/scheduler/ExecutorFactory.h index b06f0e36d..bb71d67ce 100644 --- a/include/faabric/scheduler/ExecutorFactory.h +++ b/include/faabric/scheduler/ExecutorFactory.h @@ -10,6 +10,8 @@ class ExecutorFactory virtual ~ExecutorFactory(){}; virtual std::shared_ptr createExecutor(faabric::Message& msg) = 0; + + virtual void flushHost(); }; void setExecutorFactory(std::shared_ptr fac); diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index dcaa7d992..725feb224 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -36,8 +36,6 @@ class Executor void finish(); - virtual void flush(); - virtual void reset(faabric::Message& msg); virtual int32_t executeTask( diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 2eac32f55..e1005c8e1 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -311,8 +311,6 @@ int32_t Executor::executeTask(int threadPoolIdx, void Executor::postFinish() {} -void Executor::flush() {} - void Executor::reset(faabric::Message& msg) {} faabric::util::SnapshotData Executor::snapshot() diff --git a/src/scheduler/ExecutorFactory.cpp b/src/scheduler/ExecutorFactory.cpp index 399e023cd..01d9a2490 100644 --- a/src/scheduler/ExecutorFactory.cpp +++ b/src/scheduler/ExecutorFactory.cpp @@ -1,9 +1,15 @@ #include +#include namespace faabric::scheduler { static std::shared_ptr _factory; +void ExecutorFactory::flushHost() +{ + SPDLOG_WARN("Using default flush method"); +} + void setExecutorFactory(std::shared_ptr fac) { _factory = fac; diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp index 0f2526b89..e4ac1e1ef 100644 --- a/src/scheduler/FunctionCallServer.cpp +++ b/src/scheduler/FunctionCallServer.cpp @@ -66,9 +66,6 @@ void FunctionCallServer::recvFlush(faabric::transport::Message& body) // Clear the scheduler scheduler.flushLocally(); - - // Reset the scheduler - scheduler.reset(); } void FunctionCallServer::recvExecuteFunctions(faabric::transport::Message& body) @@ -85,7 +82,7 @@ void FunctionCallServer::recvUnregister(faabric::transport::Message& body) PARSE_MSG(faabric::UnregisterRequest, body.data(), body.size()) std::string funcStr = faabric::util::funcToString(msg.function(), false); - SPDLOG_INFO("Unregistering host {} for {}", msg.host(), funcStr); + SPDLOG_DEBUG("Unregistering host {} for {}", msg.host(), funcStr); // Remove the host from the warm set scheduler.removeRegisteredHost(msg.host(), msg.function()); diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index ea49b9709..0df99e888 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -677,15 +677,11 @@ void Scheduler::flushLocally() SPDLOG_INFO("Flushing host {}", faabric::util::getSystemConfig().endpointHost); - // Call flush on all executors - for (auto& p : executors) { - for (auto& e : p.second) { - e->flush(); - } - } - // Reset this scheduler reset(); + + // Flush the host + getExecutorFactory()->flushHost(); } void Scheduler::setFunctionResult(faabric::Message& msg) diff --git a/tests/test/scheduler/test_function_client_server.cpp b/tests/test/scheduler/test_function_client_server.cpp index f90684f77..de5a33029 100644 --- a/tests/test/scheduler/test_function_client_server.cpp +++ b/tests/test/scheduler/test_function_client_server.cpp @@ -19,7 +19,7 @@ #define TEST_TIMEOUT_MS 500 -using namespace scheduler; +using namespace faabric::scheduler; namespace tests { class ClientServerFixture @@ -30,6 +30,7 @@ class ClientServerFixture protected: FunctionCallServer server; FunctionCallClient cli; + std::shared_ptr executorFactory; public: ClientServerFixture() @@ -39,15 +40,15 @@ class ClientServerFixture usleep(1000 * TEST_TIMEOUT_MS); // Set up executor - std::shared_ptr fac = - std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); + executorFactory = std::make_shared(); + setExecutorFactory(executorFactory); } ~ClientServerFixture() { cli.close(); server.stop(); + executorFactory->reset(); } }; @@ -126,6 +127,9 @@ TEST_CASE_METHOD(ClientServerFixture, "Test sending flush message", "[scheduler]") { + // Check no flushes to begin with + REQUIRE(executorFactory->getFlushCount() == 0); + // Set up some state faabric::state::State& state = faabric::state::getGlobalState(); state.getKV("demo", "blah", 10); @@ -156,6 +160,10 @@ TEST_CASE_METHOD(ClientServerFixture, // Check state has been cleared REQUIRE(state.getKVCount() == 0); + + // Check the flush hook has been called + int flushCount = executorFactory->getFlushCount(); + REQUIRE(flushCount == 1); } TEST_CASE_METHOD(ClientServerFixture, diff --git a/tests/utils/DummyExecutorFactory.cpp b/tests/utils/DummyExecutorFactory.cpp index f5663fdf8..3d49b513c 100644 --- a/tests/utils/DummyExecutorFactory.cpp +++ b/tests/utils/DummyExecutorFactory.cpp @@ -1,6 +1,8 @@ #include "DummyExecutorFactory.h" #include "DummyExecutor.h" +#include + namespace faabric::scheduler { std::shared_ptr DummyExecutorFactory::createExecutor( @@ -8,4 +10,19 @@ std::shared_ptr DummyExecutorFactory::createExecutor( { return std::make_shared(msg); } + +int DummyExecutorFactory::getFlushCount() +{ + return flushCount; +} + +void DummyExecutorFactory::flushHost() +{ + flushCount++; +} + +void DummyExecutorFactory::reset() +{ + flushCount = 0; +} } diff --git a/tests/utils/DummyExecutorFactory.h b/tests/utils/DummyExecutorFactory.h index b44e50e80..afbd05ea6 100644 --- a/tests/utils/DummyExecutorFactory.h +++ b/tests/utils/DummyExecutorFactory.h @@ -6,7 +6,17 @@ namespace faabric::scheduler { class DummyExecutorFactory : public ExecutorFactory { + public: + void reset(); + + int getFlushCount(); + protected: std::shared_ptr createExecutor(faabric::Message& msg) override; + + void flushHost() override; + + private: + int flushCount = 0; }; }