Skip to content

Commit

Permalink
Allow merge regions to cross pages (#158)
Browse files Browse the repository at this point in the history
* Add cross-page regions and update test

* Change to stand-alone test

* Logging for diffs

* Fix up tests

* Fixed snapshot tests

* Add test for skipping diffs

* Correct sum values in test
  • Loading branch information
Shillaker authored Oct 19, 2021
1 parent ad8fcd3 commit 0b251c1
Show file tree
Hide file tree
Showing 3 changed files with 459 additions and 54 deletions.
23 changes: 21 additions & 2 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <string>
#include <vector>

#include <faabric/util/logging.h>

namespace faabric::util {

enum SnapshotDataType
Expand Down Expand Up @@ -36,14 +38,27 @@ struct SnapshotMergeRegion
class SnapshotDiff
{
public:
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
uint32_t offset = 0;
size_t size = 0;
const uint8_t* data = nullptr;
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;

SnapshotDiff() = default;

SnapshotDiff(SnapshotDataType dataTypeIn,
SnapshotMergeOperation operationIn,
uint32_t offsetIn,
const uint8_t* dataIn,
size_t sizeIn)
{
dataType = dataTypeIn;
operation = operationIn;
offset = offsetIn;
data = dataIn;
size = sizeIn;
}

SnapshotDiff(uint32_t offsetIn, const uint8_t* dataIn, size_t sizeIn)
{
offset = offsetIn;
Expand Down Expand Up @@ -76,4 +91,8 @@ class SnapshotData
// order of offsets
std::map<uint32_t, SnapshotMergeRegion> mergeRegions;
};

std::string snapshotDataTypeStr(SnapshotDataType dt);

std::string snapshotMergeOpStr(SnapshotMergeOperation op);
}
174 changes: 160 additions & 14 deletions src/util/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,32 @@ std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
std::vector<int> dirtyPageNumbers =
getDirtyPageNumbers(updated, nThisPages);

SPDLOG_TRACE("Diffing {} pages with {} changed pages and {} merge regions",
nThisPages,
dirtyPageNumbers.size(),
mergeRegions.size());

for (auto& m : mergeRegions) {
SPDLOG_TRACE("{} {} merge region at {} {}-{}",
snapshotDataTypeStr(m.second.dataType),
snapshotMergeOpStr(m.second.operation),
m.first,
m.second.offset,
m.second.offset + m.second.length);
}

// Get iterator over merge regions
std::map<uint32_t, SnapshotMergeRegion>::iterator mergeIt =
mergeRegions.begin();

// Get byte-wise diffs _within_ the dirty pages
//
// NOTE - this will cause diffs to be split across pages if they hit a page
// boundary, but we can be relatively confident that variables will be
// page-aligned so this shouldn't be a problem
// NOTE - if raw diffs cover page boundaries, they will be split into
// multiple diffs, each of which is page-aligned.
// We can be relatively confident that variables will be page-aligned so
// this shouldn't be a problem.
//
// Merge regions support crossing page boundaries.
//
// For each byte we encounter have the following possible scenarios:
//
Expand All @@ -75,12 +92,44 @@ std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
offset = pageOffset + b;
bool isDirtyByte = *(data + offset) != *(updated + offset);

// Skip any merge regions we've passed
while (mergeIt != mergeRegions.end() &&
offset >=
(mergeIt->second.offset + mergeIt->second.length)) {
SnapshotMergeRegion region = mergeIt->second;
SPDLOG_TRACE("At offset {}, past region {} {} {}-{}",
offset,
snapshotDataTypeStr(region.dataType),
snapshotMergeOpStr(region.operation),
region.offset,
region.offset + region.length);

++mergeIt;
}

// Check if we're in a merge region
bool isInMergeRegion =
mergeIt != mergeRegions.end() &&
offset >= mergeIt->second.offset &&
offset < (mergeIt->second.offset + mergeIt->second.length);

if (isDirtyByte && isInMergeRegion) {
// If we've entered a merge region with a diff in progress, we
// need to close it off
if (diffInProgress) {
diffs.emplace_back(
diffStart, updated + diffStart, offset - diffStart);

SPDLOG_TRACE(
"Finished {} {} diff between {}-{} before merge region",
snapshotDataTypeStr(diffs.back().dataType),
snapshotMergeOpStr(diffs.back().operation),
diffs.back().offset,
diffs.back().offset + diffs.back().size);

diffInProgress = false;
}

SnapshotMergeRegion region = mergeIt->second;

// Set up the diff
Expand All @@ -91,8 +140,6 @@ std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
diff.dataType = region.dataType;
diff.operation = region.operation;

bool isIgnore = false;

// Modify diff data for certain operations
switch (region.dataType) {
case (SnapshotDataType::Int): {
Expand Down Expand Up @@ -144,7 +191,10 @@ std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
case (SnapshotDataType::Raw): {
switch (region.operation) {
case (SnapshotMergeOperation::Ignore): {
isIgnore = true;
break;
}
case (SnapshotMergeOperation::Overwrite): {
// Default behaviour
break;
}
default: {
Expand All @@ -155,6 +205,7 @@ std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
"Unhandled raw merge operation");
}
}

break;
}
default: {
Expand All @@ -165,35 +216,83 @@ std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
}
}

SPDLOG_TRACE("Diff at {} falls in {} {} merge region {}-{}",
pageOffset + b,
snapshotDataTypeStr(region.dataType),
snapshotMergeOpStr(region.operation),
region.offset,
region.offset + region.length);

// Add the diff to the list
if (!isIgnore) {
if (diff.operation != SnapshotMergeOperation::Ignore) {
diffs.emplace_back(diff);
}

// Bump the loop variable to the end of this region (note that
// the loop itself will increment onto the next)
b = (region.offset - pageOffset) + (region.length - 1);
// Work out the offset where this region ends
int regionEndOffset =
(region.offset - pageOffset) + region.length;

// Move onto the next merge region
++mergeIt;
if (regionEndOffset < HOST_PAGE_SIZE) {
// Skip over this region, still more offsets left in this
// page
SPDLOG_TRACE(
"{} {} merge region {}-{} finished. Skipping to {}",
snapshotDataTypeStr(region.dataType),
snapshotMergeOpStr(region.operation),
region.offset,
region.offset + region.length,
pageOffset + regionEndOffset);

// Bump the loop variable to the end of this region (note
// that the loop itself will increment onto the next).
b = regionEndOffset - 1;
} else {
// Merge region extends over this page, move onto next
SPDLOG_TRACE(
"{} {} merge region {}-{} over page boundary {} ({}-{})",
snapshotDataTypeStr(region.dataType),
snapshotMergeOpStr(region.operation),
region.offset,
region.offset + region.length,
i,
pageOffset,
pageOffset + HOST_PAGE_SIZE);

break;
}
} else if (isDirtyByte && !diffInProgress) {
// Diff starts here if it's different and diff not in progress
diffInProgress = true;
diffStart = offset;

SPDLOG_TRACE("Started Raw Overwrite diff at {}", diffStart);
} else if (!isDirtyByte && diffInProgress) {
// Diff ends if it's not different and diff is in progress
diffInProgress = false;
diffs.emplace_back(
diffStart, updated + diffStart, offset - diffStart);

SPDLOG_TRACE("Finished {} {} diff between {}-{}",
snapshotDataTypeStr(diffs.back().dataType),
snapshotMergeOpStr(diffs.back().operation),
diffs.back().offset,
diffs.back().offset + diffs.back().size);
}
}

// If we've reached the end with a diff in progress, we need to close it
// off
// If we've reached the end of this page with a diff in progress, we
// need to close it off
if (diffInProgress) {
offset++;

diffs.emplace_back(
diffStart, updated + diffStart, offset - diffStart);

SPDLOG_TRACE("Found {} {} diff between {}-{} at end of page",
snapshotDataTypeStr(diffs.back().dataType),
snapshotMergeOpStr(diffs.back().operation),
diffs.back().offset,
diffs.back().offset + diffs.back().size);
}
}

Expand All @@ -219,4 +318,51 @@ void SnapshotData::addMergeRegion(uint32_t offset,
faabric::util::UniqueLock lock(snapMx);
mergeRegions[offset] = region;
}

std::string snapshotDataTypeStr(SnapshotDataType dt)
{
switch (dt) {
case (SnapshotDataType::Raw): {
return "Raw";
}
case (SnapshotDataType::Int): {
return "Int";
}
default: {
SPDLOG_ERROR("Cannot convert snapshot data type to string: {}", dt);
throw std::runtime_error("Cannot convert data type to string");
}
}
}

std::string snapshotMergeOpStr(SnapshotMergeOperation op)
{
switch (op) {
case (SnapshotMergeOperation::Ignore): {
return "Ignore";
}
case (SnapshotMergeOperation::Max): {
return "Max";
}
case (SnapshotMergeOperation::Min): {
return "Min";
}
case (SnapshotMergeOperation::Overwrite): {
return "Overwrite";
}
case (SnapshotMergeOperation::Product): {
return "Product";
}
case (SnapshotMergeOperation::Subtract): {
return "Subtract";
}
case (SnapshotMergeOperation::Sum): {
return "Sum";
}
default: {
SPDLOG_ERROR("Cannot convert snapshot merge op to string: {}", op);
throw std::runtime_error("Cannot convert merge op to string");
}
}
}
}
Loading

0 comments on commit 0b251c1

Please sign in to comment.