Skip to content

Commit

Permalink
Fine-grained snapshot diffs (#129)
Browse files Browse the repository at this point in the history
* Return dirty page numbers, add function for low-level change diffs

* Refactor snapshot tests

* Add change diffs test and hook into executor

* Add per-batch executor counts

* Add more detailed test for snapshot diff pushing

* Move snapshot push flag into executor task

* Formatting

* Review comments
  • Loading branch information
Shillaker authored Jul 15, 2021
1 parent 837db87 commit 7872298
Show file tree
Hide file tree
Showing 16 changed files with 358 additions and 129 deletions.
22 changes: 15 additions & 7 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ class Scheduler;

Scheduler& getScheduler();

class ExecutorTask
{
public:
ExecutorTask(int messageIndexIn,
std::shared_ptr<faabric::BatchExecuteRequest> reqIn,
std::shared_ptr<std::atomic<int>> batchCounterIn,
bool needsSnapshotPushIn);

int messageIndex = 0;
std::shared_ptr<faabric::BatchExecuteRequest> req;
std::shared_ptr<std::atomic<int>> batchCounter;
bool needsSnapshotPush = false;
};

class Executor
{
public:
Expand Down Expand Up @@ -65,17 +79,11 @@ class Executor

std::atomic<bool> claimed = false;

std::atomic<bool> pendingSnapshotPush = false;

std::atomic<int> executingTaskCount = 0;

std::mutex threadsMutex;
std::vector<std::shared_ptr<std::thread>> threadPoolThreads;
std::vector<std::shared_ptr<std::thread>> deadThreads;

std::vector<faabric::util::Queue<
std::pair<int, std::shared_ptr<faabric::BatchExecuteRequest>>>>
threadQueues;
std::vector<faabric::util::Queue<ExecutorTask>> threadTaskQueues;

void threadPoolThread(int threadPoolIdx);
};
Expand Down
2 changes: 1 addition & 1 deletion include/faabric/util/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ AlignedChunk getPageAlignedChunk(long offset, long length);
// -------------------------
void resetDirtyTracking();

std::vector<bool> getDirtyPages(const uint8_t* ptr, int nPages);
std::vector<int> getDirtyPageNumbers(const uint8_t* ptr, int nPages);
}
6 changes: 5 additions & 1 deletion include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class SnapshotData
int fd = 0;

std::vector<SnapshotDiff> getDirtyPages();
};

std::vector<SnapshotDiff> getChangeDiffs(const uint8_t* updated,
size_t updatedSize);

void applyDiff(size_t diffOffset, const uint8_t* diffData, size_t diffLen);
};
}
67 changes: 47 additions & 20 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/state/State.h>
#include <faabric/util/clock.h>
#include <faabric/util/config.h>
Expand All @@ -15,12 +16,22 @@

namespace faabric::scheduler {

ExecutorTask::ExecutorTask(int messageIndexIn,
std::shared_ptr<faabric::BatchExecuteRequest> reqIn,
std::shared_ptr<std::atomic<int>> batchCounterIn,
bool needsSnapshotPushIn)
: messageIndex(messageIndexIn)
, req(reqIn)
, batchCounter(batchCounterIn)
, needsSnapshotPush(needsSnapshotPushIn)
{}

// TODO - avoid the copy of the message here?
Executor::Executor(faabric::Message& msg)
: boundMessage(msg)
, threadPoolSize(faabric::util::getUsableCores())
, threadPoolThreads(threadPoolSize)
, threadQueues(threadPoolSize)
, threadTaskQueues(threadPoolSize)
{
faabric::util::SystemConfig& conf = faabric::util::getSystemConfig();

Expand All @@ -46,7 +57,8 @@ void Executor::finish()

// Send a kill message
SPDLOG_TRACE("Executor {} killing thread pool {}", id, i);
threadQueues.at(i).enqueue(std::make_pair(POOL_SHUTDOWN, nullptr));
threadTaskQueues[i].enqueue(
ExecutorTask(POOL_SHUTDOWN, nullptr, nullptr, false));

// Await the thread
if (threadPoolThreads.at(i)->joinable()) {
Expand All @@ -66,14 +78,13 @@ void Executor::finish()

// Reset variables
boundMessage.Clear();
executingTaskCount = 0;

lastSnapshot = "";

claimed = false;

threadPoolThreads.clear();
threadQueues.clear();
threadTaskQueues.clear();
}

void Executor::executeTasks(std::vector<int> msgIdxs,
Expand Down Expand Up @@ -121,13 +132,15 @@ void Executor::executeTasks(std::vector<int> msgIdxs,

// Reset dirty page tracking if we're executing threads.
// Note this must be done after the restore has happened
if (isThreads && isSnapshot) {
bool needsSnapshotPush = false;
if (isThreads && isSnapshot && !isMaster) {
faabric::util::resetDirtyTracking();
pendingSnapshotPush = true;
needsSnapshotPush = true;
}

// Set executing task count
executingTaskCount += msgIdxs.size();
// Set up shared counter for this batch of tasks
auto batchCounter =
std::make_shared<std::atomic<int>>(req->messages_size());

// Iterate through and invoke tasks
for (int msgIdx : msgIdxs) {
Expand All @@ -146,7 +159,8 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
// Enqueue the task
SPDLOG_TRACE(
"Assigning app index {} to thread {}", msg.appindex(), threadPoolIdx);
threadQueues[threadPoolIdx].enqueue(std::make_pair(msgIdx, req));
threadTaskQueues[threadPoolIdx].enqueue(
ExecutorTask(msgIdx, req, batchCounter, needsSnapshotPush));

// Lazily create the thread
if (threadPoolThreads.at(threadPoolIdx) == nullptr) {
Expand All @@ -167,10 +181,19 @@ void Executor::threadPoolThread(int threadPoolIdx)

for (;;) {
SPDLOG_TRACE("Thread starting loop {}:{}", id, threadPoolIdx);
std::pair<int, std::shared_ptr<faabric::BatchExecuteRequest>> task;

int msgIdx;
std::shared_ptr<faabric::BatchExecuteRequest> req;
std::shared_ptr<std::atomic<int>> batchCounter;
bool needsSnapshotPush = false;

try {
task = threadQueues[threadPoolIdx].dequeue(conf.boundTimeout);
ExecutorTask task =
threadTaskQueues[threadPoolIdx].dequeue(conf.boundTimeout);
msgIdx = task.messageIndex;
req = task.req;
batchCounter = task.batchCounter;
needsSnapshotPush = task.needsSnapshotPush;
} catch (faabric::util::QueueTimeoutException& ex) {
// If the thread has had no messages, it needs to
// remove itself
Expand All @@ -182,8 +205,6 @@ void Executor::threadPoolThread(int threadPoolIdx)
break;
}

int msgIdx = task.first;

// If the thread is being killed, the executor itself
// will handle the clean-up
if (msgIdx == POOL_SHUTDOWN) {
Expand All @@ -192,7 +213,6 @@ void Executor::threadPoolThread(int threadPoolIdx)
break;
}

std::shared_ptr<faabric::BatchExecuteRequest> req = task.second;
assert(req->messages_size() >= msgIdx + 1);
faabric::Message& msg = req->mutable_messages()->at(msgIdx);

Expand Down Expand Up @@ -220,7 +240,7 @@ void Executor::threadPoolThread(int threadPoolIdx)
msg.set_returnvalue(returnValue);

// Decrement the task count
int oldTaskCount = executingTaskCount.fetch_sub(1);
int oldTaskCount = batchCounter->fetch_sub(1);
assert(oldTaskCount >= 0);
bool isLastTask = oldTaskCount == 1;

Expand All @@ -231,15 +251,22 @@ void Executor::threadPoolThread(int threadPoolIdx)
oldTaskCount - 1);

// Handle snapshot diffs _before_ we reset the executor
if (isLastTask && pendingSnapshotPush) {
// Get diffs
faabric::util::SnapshotData d = snapshot();
std::vector<faabric::util::SnapshotDiff> diffs = d.getDirtyPages();
if (isLastTask && needsSnapshotPush) {
// Get diffs between original snapshot and after execution
faabric::util::SnapshotData snapshotPostExecution = snapshot();

faabric::util::SnapshotData snapshotPreExecution =
faabric::snapshot::getSnapshotRegistry().getSnapshot(
msg.snapshotkey());

std::vector<faabric::util::SnapshotDiff> diffs =
snapshotPreExecution.getChangeDiffs(snapshotPostExecution.data,
snapshotPostExecution.size);

sch.pushSnapshotDiffs(msg, diffs);

// Reset dirty page tracking now that we've pushed the diffs
faabric::util::resetDirtyTracking();
pendingSnapshotPush = false;
}

// If this batch is finished, reset the executor and release its claim.
Expand Down
5 changes: 5 additions & 0 deletions src/snapshot/SnapshotRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ SnapshotRegistry::SnapshotRegistry() {}
faabric::util::SnapshotData& SnapshotRegistry::getSnapshot(
const std::string& key)
{
if (key.empty()) {
SPDLOG_ERROR("Attempting to get snapshot with empty key");
throw std::runtime_error("Getting snapshot with empty key");
}

if (snapshotMap.count(key) == 0) {
SPDLOG_ERROR("Snapshot for {} does not exist", key);
throw std::runtime_error("Snapshot doesn't exist");
Expand Down
5 changes: 2 additions & 3 deletions src/snapshot/SnapshotServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize)

// Copy diffs to snapshot
for (const auto* r : *r->chunks()) {
const uint8_t* chunkPtr = r->data()->data();
uint8_t* dest = snap.data + r->offset();
std::memcpy(dest, chunkPtr, r->data()->size());
snap.applyDiff(r->offset(), r->data()->data(), r->data()->size());
}

// Send response
return std::make_unique<faabric::EmptyResponse>();
}
Expand Down
8 changes: 4 additions & 4 deletions src/util/memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,21 @@ std::vector<uint64_t> readPagemapEntries(uintptr_t ptr, int nEntries)
return entries;
}

std::vector<bool> getDirtyPages(const uint8_t* ptr, int nPages)
std::vector<int> getDirtyPageNumbers(const uint8_t* ptr, int nPages)
{
uintptr_t vptr = (uintptr_t)ptr;

// Get the pagemap entries
std::vector<uint64_t> entries = readPagemapEntries(vptr, nPages);

// Iterate through to get boolean flags
std::vector<bool> flags(nPages, false);
std::vector<int> pageNumbers;
for (int i = 0; i < nPages; i++) {
if (entries.at(i) & PAGEMAP_SOFT_DIRTY) {
flags.at(i) = true;
pageNumbers.emplace_back(i);
}
}

return flags;
return pageNumbers;
}
}
73 changes: 66 additions & 7 deletions src/util/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,86 @@ namespace faabric::util {
std::vector<SnapshotDiff> SnapshotData::getDirtyPages()
{
if (data == nullptr || size == 0) {
std::vector<faabric::util::SnapshotDiff> empty;
std::vector<SnapshotDiff> empty;
return empty;
}

// Get dirty pages
int nPages = getRequiredHostPages(size);
std::vector<bool> dirtyFlags = faabric::util::getDirtyPages(data, nPages);
std::vector<int> dirtyPageNumbers = getDirtyPageNumbers(data, nPages);

// Convert to snapshot diffs
// TODO - reduce number of diffs by merging adjacent dirty pages
std::vector<SnapshotDiff> diffs;
for (int i = 0; i < nPages; i++) {
if (dirtyFlags.at(i)) {
uint32_t offset = i * faabric::util::HOST_PAGE_SIZE;
for (int i : dirtyPageNumbers) {
uint32_t offset = i * HOST_PAGE_SIZE;
diffs.emplace_back(offset, data + offset, HOST_PAGE_SIZE);
}

SPDLOG_DEBUG("Snapshot has {}/{} dirty pages", diffs.size(), nPages);

return diffs;
}

std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
size_t updatedSize)
{
// Work out which pages have changed in the comparison
size_t nThisPages = getRequiredHostPages(size);
std::vector<int> dirtyPageNumbers =
getDirtyPageNumbers(updated, nThisPages);

// Get byte-wise diffs _within_ the dirty pages
// NOTE - this will cause diffs to be split across pages if they hit a page
// boundary, but we can be relatively confident that variables will be
// page-aligned so this shouldn't be a problem
std::vector<SnapshotDiff> diffs;
for (int i : dirtyPageNumbers) {
int pageOffset = i * HOST_PAGE_SIZE;

// Iterate through each byte of the page
bool diffInProgress = false;
int diffStart = 0;
int offset = pageOffset;
for (int b = 0; b < HOST_PAGE_SIZE; b++) {
offset = pageOffset + b;
bool isDirtyByte = *(data + offset) != *(updated + offset);
if (isDirtyByte && !diffInProgress) {
// Diff starts here if it's different and diff not in progress
diffInProgress = true;
diffStart = offset;
} else if (!isDirtyByte && diffInProgress) {
// Diff ends if it's not different and diff is in progress
diffInProgress = false;
diffs.emplace_back(
diffStart, updated + diffStart, offset - diffStart);
}
}

// If we've reached the end with a diff in progress, we need to close it
// off
if (diffInProgress) {
offset++;
diffs.emplace_back(
offset, data + offset, faabric::util::HOST_PAGE_SIZE);
diffStart, updated + diffStart, offset - diffStart);
}
}

SPDLOG_DEBUG("Snapshot has {}/{} dirty pages", diffs.size(), nPages);
// If comparison has more pages than the original, add another diff
// containing all the new pages
if (updatedSize > size) {
diffs.emplace_back(size, updated + size, updatedSize - size);
}

return diffs;
}

void SnapshotData::applyDiff(size_t diffOffset,
const uint8_t* diffData,
size_t diffLen)
{
uint8_t* dest = data + diffOffset;
std::memcpy(dest, diffData, diffLen);
}

}
Loading

0 comments on commit 7872298

Please sign in to comment.