Skip to content

Commit

Permalink
Use dirty page flags instead of dirty regions (#219)
Browse files Browse the repository at this point in the history
* Started porting changes

* Porting more changes

* Rearrange

* Fix up tests

* Comment on none tracker

* Remove note in comments

* Fix modify-in-loop error

* Fix up signal handling in dist tests

* Tidy up, removing notes

* Review comments

* Avoid address clash on direct message test
  • Loading branch information
Shillaker authored Feb 7, 2022
1 parent 4f4628a commit 51f6a25
Show file tree
Hide file tree
Showing 41 changed files with 1,027 additions and 858 deletions.
4 changes: 2 additions & 2 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ In another terminal, (re)start the server:

```bash
# Start
./dist-tests/dev_server.sh
./dist-test/dev_server.sh

# Restart
./dist-tests/dev_server.sh restart
./dist-test/dev_server.sh restart
```

Back in the CLI, you can then run the tests:
Expand Down
4 changes: 2 additions & 2 deletions include/faabric/mpi/mpi.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ extern "C"
* Behind the scenes structs (some parts of which are defined in the MPI
* specification) Each can be extended with private fields as necessary
*
* NOTE - be careful when passing these structs to and from WebAssembly. Any
* datatypes with *different* sizes in 32-/64-bit space need to be
* WARNING - be careful when passing these structs to and from WebAssembly.
* Any datatypes with *different* sizes in 32-/64-bit space need to be
* translated carefully
*/
struct faabric_status_public_t
Expand Down
2 changes: 1 addition & 1 deletion include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class Executor
std::unordered_map<std::string, int> cachedGroupIds;
std::unordered_map<std::string, std::vector<std::string>>
cachedDecisionHosts;
std::vector<std::pair<uint32_t, uint32_t>> dirtyRegions;
std::vector<char> dirtyRegions;

void deleteMainThreadSnapshot(const faabric::Message& msg);

Expand Down
2 changes: 1 addition & 1 deletion include/faabric/state/StateKeyValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class StateChunk
long offset;
size_t length;

// Note - this pointer will always refer to chunks of the underlying
// This pointer will always refer to chunks of the underlying
// state, so does not need to be deleted
uint8_t* data;
};
Expand Down
14 changes: 0 additions & 14 deletions include/faabric/util/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,6 @@ int safeCopyToBuffer(const uint8_t* dataIn,
uint8_t* buffer,
int bufferLen);

/*
* Returns a list of pairs of <start, length> for any bytes differing between
* the two arrays.
*/
std::vector<std::pair<uint32_t, uint32_t>> diffArrayRegions(
std::span<const uint8_t> a,
std::span<const uint8_t> b);

/*
* Returns a list of flags marking which bytes differ between the two arrays.
*/
std::vector<bool> diffArrays(std::span<const uint8_t> a,
std::span<const uint8_t> b);

template<class T>
T unalignedRead(const uint8_t* bytes)
{
Expand Down
69 changes: 48 additions & 21 deletions include/faabric/util/dirty.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,30 @@
namespace faabric::util {

/*
* Interface to all dirty page tracking. Implementation-specific boilerplate
* held in subclasses.
* Interface to all dirty page tracking. Available types and implementation
* details in classes below.
*/
class DirtyTracker
{
public:
virtual void clearAll() = 0;

virtual void reinitialise() = 0;
virtual std::string getType() = 0;

virtual void startTracking(std::span<uint8_t> region) = 0;

virtual void stopTracking(std::span<uint8_t> region) = 0;

virtual std::vector<std::pair<uint32_t, uint32_t>> getDirtyOffsets(
std::span<uint8_t> region) = 0;
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) = 0;

virtual void startThreadLocalTracking(std::span<uint8_t> region) = 0;

virtual void stopThreadLocalTracking(std::span<uint8_t> region) = 0;

virtual std::vector<std::pair<uint32_t, uint32_t>>
getThreadLocalDirtyOffsets(std::span<uint8_t> region) = 0;

virtual std::vector<std::pair<uint32_t, uint32_t>> getBothDirtyOffsets(
virtual std::vector<char> getThreadLocalDirtyPages(
std::span<uint8_t> region) = 0;

virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) = 0;
};

/*
Expand All @@ -58,24 +56,22 @@ class SoftPTEDirtyTracker final : public DirtyTracker

void clearAll() override;

void reinitialise() override;
std::string getType() override { return "softpte"; }

void startTracking(std::span<uint8_t> region) override;

void stopTracking(std::span<uint8_t> region) override;

std::vector<std::pair<uint32_t, uint32_t>> getDirtyOffsets(
std::span<uint8_t> region) override;
std::vector<char> getDirtyPages(std::span<uint8_t> region) override;

void startThreadLocalTracking(std::span<uint8_t> region) override;

void stopThreadLocalTracking(std::span<uint8_t> region) override;

std::vector<std::pair<uint32_t, uint32_t>> getThreadLocalDirtyOffsets(
std::vector<char> getThreadLocalDirtyPages(
std::span<uint8_t> region) override;

std::vector<std::pair<uint32_t, uint32_t>> getBothDirtyOffsets(
std::span<uint8_t> region) override;
std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override;

private:
FILE* clearRefsFile = nullptr;
Expand All @@ -94,24 +90,22 @@ class SegfaultDirtyTracker final : public DirtyTracker

void clearAll() override;

void reinitialise() override;
std::string getType() override { return "segfault"; }

void startTracking(std::span<uint8_t> region) override;

void stopTracking(std::span<uint8_t> region) override;

std::vector<std::pair<uint32_t, uint32_t>> getDirtyOffsets(
std::span<uint8_t> region) override;
std::vector<char> getDirtyPages(std::span<uint8_t> region) override;

void startThreadLocalTracking(std::span<uint8_t> region) override;

void stopThreadLocalTracking(std::span<uint8_t> region) override;

std::vector<std::pair<uint32_t, uint32_t>> getThreadLocalDirtyOffsets(
std::vector<char> getThreadLocalDirtyPages(
std::span<uint8_t> region) override;

std::vector<std::pair<uint32_t, uint32_t>> getBothDirtyOffsets(
std::span<uint8_t> region) override;
std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override;

// Signal handler for the resulting segfaults
static void handler(int sig, siginfo_t* info, void* ucontext) noexcept;
Expand All @@ -120,5 +114,38 @@ class SegfaultDirtyTracker final : public DirtyTracker
void setUpSignalHandler();
};

/*
 * A tracker that does no real tracking and simply marks all pages as dirty.
 * This may be the best choice for workloads with a small memory where most of
 * that memory will be dirty anyway, i.e. where just diffing every page can be
 * cheaper than paying the cost of dirty tracking.
 */
class NoneDirtyTracker final : public DirtyTracker
{
  public:
    NoneDirtyTracker() = default;

    void clearAll() override;

    // Name identifying this tracker implementation
    std::string getType() override
    {
        return "none";
    }

    // Starting/stopping tracking, for the whole region and per-thread
    void startTracking(std::span<uint8_t> region) override;

    void startThreadLocalTracking(std::span<uint8_t> region) override;

    void stopTracking(std::span<uint8_t> region) override;

    void stopThreadLocalTracking(std::span<uint8_t> region) override;

    // Dirty page flag queries. Per the description above, this tracker marks
    // every page as dirty.
    std::vector<char> getDirtyPages(std::span<uint8_t> region) override;

    std::vector<char> getThreadLocalDirtyPages(
      std::span<uint8_t> region) override;

    std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override;

  private:
    // Page flags held by the tracker (presumably the all-dirty flags handed
    // back by the getters - TODO confirm against the implementation)
    std::vector<char> dirtyPages;
};

DirtyTracker& getDirtyTracker();
}
5 changes: 2 additions & 3 deletions include/faabric/util/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
namespace faabric::util {

/*
* Dedupes a list of dirty regions specified by offset and length
* Merges the dirty page flags from b into a in place
*/
std::vector<std::pair<uint32_t, uint32_t>> dedupeMemoryRegions(
std::vector<std::pair<uint32_t, uint32_t>>& regions);
void mergeDirtyPages(std::vector<char>& a, const std::vector<char>& b);

/*
* Typedef used to enforce RAII on mmapped memory regions
Expand Down
46 changes: 32 additions & 14 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ class SnapshotDiff
std::vector<uint8_t> data;
};

/*
* Appends a list of snapshot diffs for any bytes differing between the two
* arrays.
*
* diffs - output list, new diffs are appended to it
* startOffset, endOffset - presumably the bounds of the range of offsets to
* compare (TODO confirm, including whether endOffset is inclusive)
* a, b - the two byte arrays to compare
*/
void diffArrayRegions(std::vector<SnapshotDiff>& diffs,
uint32_t startOffset,
uint32_t endOffset,
std::span<const uint8_t> a,
std::span<const uint8_t> b);

class SnapshotMergeRegion
{
public:
Expand All @@ -81,13 +91,22 @@ class SnapshotMergeRegion
void addDiffs(std::vector<SnapshotDiff>& diffs,
std::span<const uint8_t> originalData,
std::span<uint8_t> updatedData,
std::pair<uint32_t, uint32_t> dirtyRegion);
const std::vector<char>& dirtyRegions);

/**
* This allows us to sort the merge regions which is important for diffing
* purposes.
*/
bool operator<(const SnapshotMergeRegion& other) const
{
return (offset < other.offset);
}

private:
void addOverwriteDiff(std::vector<SnapshotDiff>& diffs,
std::span<const uint8_t> original,
std::span<const uint8_t> updatedData,
std::pair<uint32_t, uint32_t> dirtyRegion);
bool operator==(const SnapshotMergeRegion& other) const
{
return offset == other.offset && length == other.length &&
dataType == other.dataType && operation == other.operation;
}
};

/*
Expand Down Expand Up @@ -212,20 +231,19 @@ class SnapshotData
void addMergeRegion(uint32_t offset,
size_t length,
SnapshotDataType dataType,
SnapshotMergeOperation operation,
bool overwrite = false);
SnapshotMergeOperation operation);

void fillGapsWithOverwriteRegions();

void clearMergeRegions();

std::map<uint32_t, SnapshotMergeRegion> getMergeRegions();
std::vector<SnapshotMergeRegion> getMergeRegions();

size_t getQueuedDiffsCount();

void queueDiffs(std::span<SnapshotDiff> diffs);

void writeQueuedDiffs();
int writeQueuedDiffs();

size_t getSize() const { return size; }

Expand All @@ -243,7 +261,7 @@ class SnapshotData
// snapshot.
std::vector<faabric::util::SnapshotDiff> diffWithDirtyRegions(
std::span<uint8_t> updated,
std::vector<std::pair<uint32_t, uint32_t>> dirtyRegions);
const std::vector<char>& dirtyRegions);

private:
size_t size = 0;
Expand All @@ -259,15 +277,15 @@ class SnapshotData

std::vector<std::pair<uint32_t, uint32_t>> trackedChanges;

// Note - we care about the order of this map, as we iterate through it
// in order of offsets
std::map<uint32_t, SnapshotMergeRegion> mergeRegions;
std::vector<SnapshotMergeRegion> mergeRegions;

uint8_t* validatedOffsetPtr(uint32_t offset);

void mapToMemory(uint8_t* target, bool shared);

void writeData(std::span<const uint8_t> buffer, uint32_t offset = 0);

void checkWriteExtension(std::span<const uint8_t> buffer, uint32_t offset);
};

std::string snapshotDataTypeStr(SnapshotDataType dt);
Expand Down
16 changes: 8 additions & 8 deletions src/redis/Redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ std::string RedisInstance::loadScript(redisContext* context,
Redis::Redis(const RedisInstance& instanceIn)
: instance(instanceIn)
{
// Note, connect with IP, not with hostname
// Connect with IP, not with hostname
context = redisConnect(instance.ip.c_str(), instance.port);

if (context == nullptr || context->err) {
Expand Down Expand Up @@ -585,9 +585,9 @@ void Redis::enqueueBytes(const std::string& queueName,
const uint8_t* buffer,
size_t bufferLen)
{
// NOTE: Here we must be careful with the input and specify bytes rather
// than a string otherwise an encoded false boolean can be treated as a
// string terminator
// Here we must be careful with the input and specify bytes rather than a
// string otherwise an encoded false boolean can be treated as a string
// terminator
auto reply = safeRedisCommand(
context, "RPUSH %s %b", queueName.c_str(), buffer, bufferLen);

Expand All @@ -602,13 +602,13 @@ void Redis::enqueueBytes(const std::string& queueName,

UniqueRedisReply Redis::dequeueBase(const std::string& queueName, int timeoutMs)
{
// NOTE - we contradict the default redis behaviour here by doing a
// non-blocking pop when timeout is zero (rather than infinite as in Redis)
// We contradict the default redis behaviour here by doing a non-blocking
// pop when timeout is zero (rather than infinite as in Redis)
bool isBlocking = timeoutMs > 0;

UniqueRedisReply reply{ nullptr, &freeReplyObject };
if (isBlocking) {
// Note, timeouts need to be converted into seconds
// Timeouts need to be converted into seconds (rounded down to whole
// seconds, with a minimum of one second)
int timeoutSecs = std::max(timeoutMs / 1000, 1);

Expand Down Expand Up @@ -671,7 +671,7 @@ void Redis::dequeueMultiple(const std::string& queueName,
long buffLen,
long nElems)
{
// NOTE - much like other range stuff with redis, this is *INCLUSIVE*
// Much like other range stuff with redis, this is *INCLUSIVE*
auto reply = safeRedisCommand(
context, "LRANGE %s 0 %i", queueName.c_str(), nElems - 1);

Expand Down
Loading

0 comments on commit 51f6a25

Please sign in to comment.