Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change flush to execute once per host #112

Merged
merged 4 commits into from
Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/faabric/scheduler/ExecutorFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class ExecutorFactory
virtual ~ExecutorFactory(){};

virtual std::shared_ptr<Executor> createExecutor(faabric::Message& msg) = 0;

virtual void flushHost();
};

void setExecutorFactory(std::shared_ptr<ExecutorFactory> fac);
Expand Down
2 changes: 0 additions & 2 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class Executor

void finish();

virtual void flush();

virtual void reset(faabric::Message& msg);

virtual int32_t executeTask(
Expand Down
2 changes: 0 additions & 2 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ int32_t Executor::executeTask(int threadPoolIdx,

void Executor::postFinish() {}

void Executor::flush() {}

void Executor::reset(faabric::Message& msg) {}

faabric::util::SnapshotData Executor::snapshot()
Expand Down
6 changes: 6 additions & 0 deletions src/scheduler/ExecutorFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/util/logging.h>

namespace faabric::scheduler {

static std::shared_ptr<ExecutorFactory> _factory;

void ExecutorFactory::flushHost()
{
SPDLOG_WARN("Using default flush method");
}

void setExecutorFactory(std::shared_ptr<ExecutorFactory> fac)
{
_factory = fac;
Expand Down
5 changes: 1 addition & 4 deletions src/scheduler/FunctionCallServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ void FunctionCallServer::recvFlush(faabric::transport::Message& body)

// Clear the scheduler
scheduler.flushLocally();

// Reset the scheduler
scheduler.reset();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scheduler reset gets called in flushLocally

}

void FunctionCallServer::recvExecuteFunctions(faabric::transport::Message& body)
Expand All @@ -85,7 +82,7 @@ void FunctionCallServer::recvUnregister(faabric::transport::Message& body)
PARSE_MSG(faabric::UnregisterRequest, body.data(), body.size())

std::string funcStr = faabric::util::funcToString(msg.function(), false);
SPDLOG_INFO("Unregistering host {} for {}", msg.host(), funcStr);
SPDLOG_DEBUG("Unregistering host {} for {}", msg.host(), funcStr);

// Remove the host from the warm set
scheduler.removeRegisteredHost(msg.host(), msg.function());
Expand Down
10 changes: 3 additions & 7 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,15 +677,11 @@ void Scheduler::flushLocally()
SPDLOG_INFO("Flushing host {}",
faabric::util::getSystemConfig().endpointHost);

// Call flush on all executors
for (auto& p : executors) {
for (auto& e : p.second) {
e->flush();
}
}

// Reset this scheduler
reset();

// Flush the host
getExecutorFactory()->flushHost();
}

void Scheduler::setFunctionResult(faabric::Message& msg)
Expand Down
16 changes: 12 additions & 4 deletions tests/test/scheduler/test_function_client_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#define TEST_TIMEOUT_MS 500

using namespace scheduler;
using namespace faabric::scheduler;

namespace tests {
class ClientServerFixture
Expand All @@ -30,6 +30,7 @@ class ClientServerFixture
protected:
FunctionCallServer server;
FunctionCallClient cli;
std::shared_ptr<DummyExecutorFactory> executorFactory;

public:
ClientServerFixture()
Expand All @@ -39,15 +40,15 @@ class ClientServerFixture
usleep(1000 * TEST_TIMEOUT_MS);

// Set up executor
std::shared_ptr<faabric::scheduler::ExecutorFactory> fac =
std::make_shared<faabric::scheduler::DummyExecutorFactory>();
faabric::scheduler::setExecutorFactory(fac);
executorFactory = std::make_shared<DummyExecutorFactory>();
setExecutorFactory(executorFactory);
}

~ClientServerFixture()
{
cli.close();
server.stop();
executorFactory->reset();
}
};

Expand Down Expand Up @@ -126,6 +127,9 @@ TEST_CASE_METHOD(ClientServerFixture,
"Test sending flush message",
"[scheduler]")
{
// Check no flushes to begin with
REQUIRE(executorFactory->getFlushCount() == 0);

// Set up some state
faabric::state::State& state = faabric::state::getGlobalState();
state.getKV("demo", "blah", 10);
Expand Down Expand Up @@ -156,6 +160,10 @@ TEST_CASE_METHOD(ClientServerFixture,

// Check state has been cleared
REQUIRE(state.getKVCount() == 0);

// Check the flush hook has been called
int flushCount = executorFactory->getFlushCount();
REQUIRE(flushCount == 1);
}

TEST_CASE_METHOD(ClientServerFixture,
Expand Down
17 changes: 17 additions & 0 deletions tests/utils/DummyExecutorFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
#include "DummyExecutorFactory.h"
#include "DummyExecutor.h"

#include <faabric/util/logging.h>

namespace faabric::scheduler {

std::shared_ptr<Executor> DummyExecutorFactory::createExecutor(
faabric::Message& msg)
{
return std::make_shared<DummyExecutor>(msg);
}

int DummyExecutorFactory::getFlushCount()
{
return flushCount;
}

void DummyExecutorFactory::flushHost()
{
flushCount++;
}

void DummyExecutorFactory::reset()
{
flushCount = 0;
}
}
10 changes: 10 additions & 0 deletions tests/utils/DummyExecutorFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@ namespace faabric::scheduler {

class DummyExecutorFactory : public ExecutorFactory
{
public:
void reset();

int getFlushCount();

protected:
std::shared_ptr<Executor> createExecutor(faabric::Message& msg) override;

void flushHost() override;

private:
int flushCount = 0;
};
}