diff --git a/docs/development.md b/docs/development.md index 24932631b..2c7e08d96 100644 --- a/docs/development.md +++ b/docs/development.md @@ -100,10 +100,10 @@ In another terminal, (re)start the server: ```bash # Start -./dist-tests/dev_server.sh +./dist-test/dev_server.sh # Restart -./dist-tests/dev_server.sh restart +./dist-test/dev_server.sh restart ``` Back in the CLI, you can then run the tests: diff --git a/include/faabric/mpi/mpi.h b/include/faabric/mpi/mpi.h index 42fd3026b..49418532d 100644 --- a/include/faabric/mpi/mpi.h +++ b/include/faabric/mpi/mpi.h @@ -24,8 +24,8 @@ extern "C" * Behind the scenes structs (some parts of which are defined in the MPI * specification) Each can be extended with private fields as necessary * - * NOTE - be careful when passing these structs to and from WebAssembly. Any - * datatypes with *different* sizes in 32-/64-bit space need to be + * WARNING be careful when passing these structs to and from WebAssembly. + * Any datatypes with *different* sizes in 32-/64-bit space need to be * translated carefully */ struct faabric_status_public_t diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 147797382..f2dbc6f34 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -109,7 +109,7 @@ class Executor std::unordered_map cachedGroupIds; std::unordered_map> cachedDecisionHosts; - std::vector> dirtyRegions; + std::vector dirtyRegions; void deleteMainThreadSnapshot(const faabric::Message& msg); diff --git a/include/faabric/state/StateKeyValue.h b/include/faabric/state/StateKeyValue.h index 7274f00b7..86b2cc46d 100644 --- a/include/faabric/state/StateKeyValue.h +++ b/include/faabric/state/StateKeyValue.h @@ -37,7 +37,7 @@ class StateChunk long offset; size_t length; - // Note - this pointer will always refer to chunks of the underlying + // This pointer will always refer to chunks of the underlying // state, so does not need to be deleted uint8_t* 
data; }; diff --git a/include/faabric/util/bytes.h b/include/faabric/util/bytes.h index a1d7cbc9d..11b3579b9 100644 --- a/include/faabric/util/bytes.h +++ b/include/faabric/util/bytes.h @@ -27,20 +27,6 @@ int safeCopyToBuffer(const uint8_t* dataIn, uint8_t* buffer, int bufferLen); -/* - * Returns a list of pairs of for any bytes differing between - * the two arrays. - */ -std::vector> diffArrayRegions( - std::span a, - std::span b); - -/* - * Returns a list of flags marking which bytes differ between the two arrays. - */ -std::vector diffArrays(std::span a, - std::span b); - template T unalignedRead(const uint8_t* bytes) { diff --git a/include/faabric/util/dirty.h b/include/faabric/util/dirty.h index afcb2d692..d469b53ab 100644 --- a/include/faabric/util/dirty.h +++ b/include/faabric/util/dirty.h @@ -17,32 +17,30 @@ namespace faabric::util { /* - * Interface to all dirty page tracking. Implementation-specific boilerplate - * held in subclasses. + * Interface to all dirty page tracking. Available types and implementation + * details in classes below. 
*/ class DirtyTracker { public: virtual void clearAll() = 0; - virtual void reinitialise() = 0; + virtual std::string getType() = 0; virtual void startTracking(std::span region) = 0; virtual void stopTracking(std::span region) = 0; - virtual std::vector> getDirtyOffsets( - std::span region) = 0; + virtual std::vector getDirtyPages(std::span region) = 0; virtual void startThreadLocalTracking(std::span region) = 0; virtual void stopThreadLocalTracking(std::span region) = 0; - virtual std::vector> - getThreadLocalDirtyOffsets(std::span region) = 0; - - virtual std::vector> getBothDirtyOffsets( + virtual std::vector getThreadLocalDirtyPages( std::span region) = 0; + + virtual std::vector getBothDirtyPages(std::span region) = 0; }; /* @@ -58,24 +56,22 @@ class SoftPTEDirtyTracker final : public DirtyTracker void clearAll() override; - void reinitialise() override; + std::string getType() override { return "softpte"; } void startTracking(std::span region) override; void stopTracking(std::span region) override; - std::vector> getDirtyOffsets( - std::span region) override; + std::vector getDirtyPages(std::span region) override; void startThreadLocalTracking(std::span region) override; void stopThreadLocalTracking(std::span region) override; - std::vector> getThreadLocalDirtyOffsets( + std::vector getThreadLocalDirtyPages( std::span region) override; - std::vector> getBothDirtyOffsets( - std::span region) override; + std::vector getBothDirtyPages(std::span region) override; private: FILE* clearRefsFile = nullptr; @@ -94,24 +90,22 @@ class SegfaultDirtyTracker final : public DirtyTracker void clearAll() override; - void reinitialise() override; + std::string getType() override { return "segfault"; } void startTracking(std::span region) override; void stopTracking(std::span region) override; - std::vector> getDirtyOffsets( - std::span region) override; + std::vector getDirtyPages(std::span region) override; void startThreadLocalTracking(std::span region) override; void 
stopThreadLocalTracking(std::span region) override; - std::vector> getThreadLocalDirtyOffsets( + std::vector getThreadLocalDirtyPages( std::span region) override; - std::vector> getBothDirtyOffsets( - std::span region) override; + std::vector getBothDirtyPages(std::span region) override; // Signal handler for the resulting segfaults static void handler(int sig, siginfo_t* info, void* ucontext) noexcept; @@ -120,5 +114,38 @@ class SegfaultDirtyTracker final : public DirtyTracker void setUpSignalHandler(); }; +/* + * This tracker just marks all pages as dirty. This may be optimal for workloads + * with a small memory where most of that memory will be dirty anyway, so + * diffing every page outweighs the cost of the dirty tracking. + */ +class NoneDirtyTracker final : public DirtyTracker +{ + public: + NoneDirtyTracker() = default; + + std::string getType() override { return "none"; } + + void clearAll() override; + + void startTracking(std::span region) override; + + void stopTracking(std::span region) override; + + std::vector getDirtyPages(std::span region) override; + + void startThreadLocalTracking(std::span region) override; + + void stopThreadLocalTracking(std::span region) override; + + std::vector getThreadLocalDirtyPages( + std::span region) override; + + std::vector getBothDirtyPages(std::span region) override; + + private: + std::vector dirtyPages; +}; + DirtyTracker& getDirtyTracker(); } diff --git a/include/faabric/util/memory.h b/include/faabric/util/memory.h index d5511a2bc..22c312914 100644 --- a/include/faabric/util/memory.h +++ b/include/faabric/util/memory.h @@ -11,10 +11,9 @@ namespace faabric::util { /* - * Dedupes a list of dirty regions specified by offset and length + * Merges the dirty page flags from b into a in place */ -std::vector> dedupeMemoryRegions( - std::vector>& regions); +void mergeDirtyPages(std::vector& a, const std::vector& b); /* * Typedef used to enforce RAII on mmapped memory regions diff --git 
a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index c19cbfd2f..0264a51d5 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -63,6 +63,16 @@ class SnapshotDiff std::vector data; }; +/* + * Appends a list of snapshot diffs for any bytes differing between the two + * arrays. + */ +void diffArrayRegions(std::vector& diffs, + uint32_t startOffset, + uint32_t endOffset, + std::span a, + std::span b); + class SnapshotMergeRegion { public: @@ -81,13 +91,22 @@ class SnapshotMergeRegion void addDiffs(std::vector& diffs, std::span originalData, std::span updatedData, - std::pair dirtyRegion); + const std::vector& dirtyRegions); + + /** + * This allows us to sort the merge regions which is important for diffing + * purposes. + */ + bool operator<(const SnapshotMergeRegion& other) const + { + return (offset < other.offset); + } - private: - void addOverwriteDiff(std::vector& diffs, - std::span original, - std::span updatedData, - std::pair dirtyRegion); + bool operator==(const SnapshotMergeRegion& other) const + { + return offset == other.offset && length == other.length && + dataType == other.dataType && operation == other.operation; + } }; /* @@ -212,20 +231,19 @@ class SnapshotData void addMergeRegion(uint32_t offset, size_t length, SnapshotDataType dataType, - SnapshotMergeOperation operation, - bool overwrite = false); + SnapshotMergeOperation operation); void fillGapsWithOverwriteRegions(); void clearMergeRegions(); - std::map getMergeRegions(); + std::vector getMergeRegions(); size_t getQueuedDiffsCount(); void queueDiffs(std::span diffs); - void writeQueuedDiffs(); + int writeQueuedDiffs(); size_t getSize() const { return size; } @@ -243,7 +261,7 @@ class SnapshotData // snapshot. 
std::vector diffWithDirtyRegions( std::span updated, - std::vector> dirtyRegions); + const std::vector& dirtyRegions); private: size_t size = 0; @@ -259,15 +277,15 @@ class SnapshotData std::vector> trackedChanges; - // Note - we care about the order of this map, as we iterate through it - // in order of offsets - std::map mergeRegions; + std::vector mergeRegions; uint8_t* validatedOffsetPtr(uint32_t offset); void mapToMemory(uint8_t* target, bool shared); void writeData(std::span buffer, uint32_t offset = 0); + + void checkWriteExtension(std::span buffer, uint32_t offset); }; std::string snapshotDataTypeStr(SnapshotDataType dt); diff --git a/src/redis/Redis.cpp b/src/redis/Redis.cpp index 5a9a4c3f2..e8b9783ad 100644 --- a/src/redis/Redis.cpp +++ b/src/redis/Redis.cpp @@ -82,7 +82,7 @@ std::string RedisInstance::loadScript(redisContext* context, Redis::Redis(const RedisInstance& instanceIn) : instance(instanceIn) { - // Note, connect with IP, not with hostname + // Connect with IP, not with hostname context = redisConnect(instance.ip.c_str(), instance.port); if (context == nullptr || context->err) { @@ -585,9 +585,9 @@ void Redis::enqueueBytes(const std::string& queueName, const uint8_t* buffer, size_t bufferLen) { - // NOTE: Here we must be careful with the input and specify bytes rather - // than a string otherwise an encoded false boolean can be treated as a - // string terminator + // Here we must be careful with the input and specify bytes rather than a + // string otherwise an encoded false boolean can be treated as a string + // terminator auto reply = safeRedisCommand( context, "RPUSH %s %b", queueName.c_str(), buffer, bufferLen); @@ -602,13 +602,13 @@ void Redis::enqueueBytes(const std::string& queueName, UniqueRedisReply Redis::dequeueBase(const std::string& queueName, int timeoutMs) { - // NOTE - we contradict the default redis behaviour here by doing a - // non-blocking pop when timeout is zero (rather than infinite as in Redis) + // We contradict the 
default redis behaviour here by doing a non-blocking + // pop when timeout is zero (rather than infinite as in Redis) bool isBlocking = timeoutMs > 0; UniqueRedisReply reply{ nullptr, &freeReplyObject }; if (isBlocking) { - // Note, timeouts need to be converted into seconds + // Timeouts need to be converted into seconds // Floor to one second int timeoutSecs = std::max(timeoutMs / 1000, 1); @@ -671,7 +671,7 @@ void Redis::dequeueMultiple(const std::string& queueName, long buffLen, long nElems) { - // NOTE - much like other range stuff with redis, this is *INCLUSIVE* + // Much like other range stuff with redis, this is *INCLUSIVE* auto reply = safeRedisCommand( context, "LRANGE %s 0 %i", queueName.c_str(), nElems - 1); diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 305ca58b3..a6b044cfc 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -20,6 +20,12 @@ #include #include +// Default snapshot size here is set to support 32-bit WebAssembly, but could be +// made configurable on the function call or language. 
+#define ONE_MB (1024L * 1024L) +#define ONE_GB (1024L * ONE_MB) +#define DEFAULT_MAX_SNAP_SIZE (4 * ONE_GB) + #define POOL_SHUTDOWN -1 namespace faabric::scheduler { @@ -124,44 +130,58 @@ std::vector> Executor::executeThreads( SPDLOG_DEBUG("Executor {} executing {} threads", id, req->messages_size()); faabric::Message& msg = req->mutable_messages()->at(0); - std::string snapshotKey = faabric::util::getMainThreadSnapshotKey(msg); std::string funcStr = faabric::util::funcToString(msg, false); - std::shared_ptr snap = nullptr; - bool exists = false; + // Check if we've got a cached decision + std::string cacheKey = + std::to_string(msg.appid()) + "_" + std::to_string(req->messages_size()); + + bool hasCachedDecision = false; { faabric::util::SharedLock lock(threadExecutionMutex); - exists = reg.snapshotExists(snapshotKey); + hasCachedDecision = + cachedDecisionHosts.find(cacheKey) != cachedDecisionHosts.end(); } - if (!exists) { + std::string snapshotKey = faabric::util::getMainThreadSnapshotKey(msg); + bool snapshotExists = reg.snapshotExists(snapshotKey); + + if (!snapshotExists) { faabric::util::FullLock lock(threadExecutionMutex); if (!reg.snapshotExists(snapshotKey)) { SPDLOG_DEBUG( "Creating main thread snapshot: {} for {}", snapshotKey, funcStr); - snap = - std::make_shared(getMemoryView()); + std::shared_ptr snap = + std::make_shared( + getMemoryView(), DEFAULT_MAX_SNAP_SIZE); reg.registerSnapshot(snapshotKey, snap); } else { - exists = true; + // This only hits when we realise there is a snapshot when we + // thought there wasn't + snapshotExists = true; } } - if (exists) { + // Avoid race conditions on snapshot being initialised using shared lock + // here + std::shared_ptr snap = nullptr; + { + faabric::util::SharedLock lock(threadExecutionMutex); + snap = reg.getSnapshot(snapshotKey); + } + + if (snapshotExists) { SPDLOG_DEBUG( "Main thread snapshot exists: {} for {}", snapshotKey, funcStr); - // Get main snapshot - snap = reg.getSnapshot(snapshotKey); 
std::span memView = getMemoryView(); // Get dirty regions since last batch of threads tracker.stopTracking(memView); tracker.stopThreadLocalTracking(memView); - std::vector> dirtyRegions = - tracker.getBothDirtyOffsets(memView); + std::vector dirtyRegions = tracker.getBothDirtyPages(memView); // Apply changes to snapshot snap->fillGapsWithOverwriteRegions(); @@ -170,7 +190,7 @@ std::vector> Executor::executeThreads( if (updates.empty()) { SPDLOG_TRACE( - "No updates to main thread snapshot for {} from {} dirty regions", + "No updates to main thread snapshot for {} over {} pages", faabric::util::funcToString(msg, false), dirtyRegions.size()); } else { @@ -184,26 +204,19 @@ std::vector> Executor::executeThreads( snap->clearMergeRegions(); } - // Now we have to apply the merge regions for this parallel section + // Now we have to add any merge regions we've been saving up for this + // batch of threads for (const auto& mr : mergeRegions) { - snap->addMergeRegion( - mr.offset, mr.length, mr.dataType, mr.operation, true); - } - - // TODO - here the main thread will wait, so technically frees up a slot - // that could be used. 
- std::string cacheKey = - std::to_string(msg.appid()) + "_" + std::to_string(req->messages_size()); - bool hasCachedDecision = false; - { - faabric::util::SharedLock lock(threadExecutionMutex); - hasCachedDecision = - cachedDecisionHosts.find(cacheKey) != cachedDecisionHosts.end(); + snap->addMergeRegion(mr.offset, mr.length, mr.dataType, mr.operation); } if (!hasCachedDecision) { faabric::util::FullLock lock(threadExecutionMutex); if (cachedDecisionHosts.find(cacheKey) == cachedDecisionHosts.end()) { + SPDLOG_TRACE("Creating new decision for {} threads of {}", + req->messages_size(), + funcStr); + // Set up a new group int groupId = faabric::util::generateGid(); for (auto& m : *req->mutable_messages()) { @@ -216,7 +229,7 @@ std::vector> Executor::executeThreads( // Cache the decision for next time SPDLOG_DEBUG( - "No cached decision for {} x {}/{}, caching group {}, hosts: {}", + "Caching decision for {} x {}/{}, caching group {}, hosts: {}", req->messages().size(), msg.user(), msg.function(), @@ -226,6 +239,8 @@ std::vector> Executor::executeThreads( cachedGroupIds[cacheKey] = groupId; cachedDecisionHosts[cacheKey] = decision.hosts; } else { + // This only happens when we thought we didn't have a cached + // decision, then when we acquired the lock, realised we did hasCachedDecision = true; } } @@ -283,14 +298,14 @@ std::vector> Executor::executeThreads( "Executor {} got results for {} threads", id, req->messages_size()); // Write queued changes to snapshot - snap->writeQueuedDiffs(); - - // Set memory size to fit new snapshot - setMemorySize(snap->getSize()); + int nWritten = snap->writeQueuedDiffs(); - // Remap the memory + // Remap memory to snapshot if it's been updated std::span memView = getMemoryView(); - snap->mapToMemory(memView); + if (nWritten > 0) { + setMemorySize(snap->getSize()); + snap->mapToMemory(memView); + } // Start tracking again memView = getMemoryView(); @@ -568,12 +583,11 @@ void Executor::threadPoolThread(int threadPoolIdx) // Add this 
thread's changes to executor-wide list of dirty regions auto thisThreadDirtyRegions = - tracker.getThreadLocalDirtyOffsets(memView); + tracker.getThreadLocalDirtyPages(memView); faabric::util::FullLock lock(threadExecutionMutex); - dirtyRegions.insert(dirtyRegions.end(), - thisThreadDirtyRegions.begin(), - thisThreadDirtyRegions.end()); + faabric::util::mergeDirtyPages(dirtyRegions, + thisThreadDirtyRegions); } // Set the return value @@ -600,10 +614,8 @@ void Executor::threadPoolThread(int threadPoolIdx) // Add non-thread-local dirty regions { faabric::util::FullLock lock(threadExecutionMutex); - std::vector> r = - tracker.getDirtyOffsets(memView); - - dirtyRegions.insert(dirtyRegions.end(), r.begin(), r.end()); + std::vector r = tracker.getDirtyPages(memView); + faabric::util::mergeDirtyPages(dirtyRegions, r); } // Fill snapshot gaps with overwrite regions first @@ -710,8 +722,8 @@ void Executor::threadPoolThread(int threadPoolIdx) SPDLOG_DEBUG( "Shutting down thread pool thread {}:{}", id, threadPoolIdx); - // Note - we have to keep a record of dead threads so we can join - // them all when the executor shuts down + // We have to keep a record of dead threads so we can join them all when + // the executor shuts down bool isFinished = true; { faabric::util::UniqueLock threadsLock(threadsMutex); diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp index e3f2fb14b..143f1dbac 100644 --- a/src/scheduler/MpiWorld.cpp +++ b/src/scheduler/MpiWorld.cpp @@ -211,9 +211,8 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize) auto& sch = faabric::scheduler::getScheduler(); - // Dispatch all the chained calls - // NOTE - with the master being rank zero, we want to spawn - // (size - 1) new functions starting with rank 1 + // Dispatch all the chained calls. 
With the master being rank zero, we want + // to spawn (size - 1) new functions starting with rank 1 std::shared_ptr req = faabric::util::batchExecFactory(user, function, size - 1); for (int i = 0; i < req->messages_size(); i++) { @@ -727,7 +726,7 @@ void MpiWorld::doRecv(std::shared_ptr& m, status->MPI_SOURCE = m->sender(); status->MPI_ERROR = MPI_SUCCESS; - // Note, take the message size here as the receive count may be larger + // Take the message size here as the receive count may be larger status->bytesSize = m->count() * dataType->size; // TODO - thread through tag diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index c9ad06ec7..b32ff9090 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -241,7 +241,7 @@ void Scheduler::notifyExecutorShutdown(Executor* exec, faabric::util::SchedulingDecision Scheduler::callFunctions( std::shared_ptr req) { - // Note, we assume all the messages are for the same function and have the + // We assume all the messages are for the same function and have the // same master host faabric::Message& firstMsg = req->mutable_messages()->at(0); std::string masterHost = firstMsg.masterhost(); @@ -472,10 +472,10 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions( doStartFunctionMigrationThread(req, decision); } - // NOTE: we want to schedule things on this host _last_, otherwise functions - // may start executing before all messages have been dispatched, thus - // slowing the remaining scheduling. - // Therefore we want to create a list of unique hosts, with this host last. + // We want to schedule things on this host _last_, otherwise functions may + // start executing before all messages have been dispatched, thus slowing + // the remaining scheduling. Therefore we want to create a list of unique + // hosts, with this host last. 
std::vector orderedHosts; { std::set uniqueHosts(decision.hosts.begin(), diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index 92fb6126d..1bc1858fa 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -98,11 +98,8 @@ void SnapshotClient::pushSnapshot( mrsFbVector; mrsFbVector.reserve(data->getMergeRegions().size()); for (const auto& m : data->getMergeRegions()) { - auto mr = CreateSnapshotMergeRegionRequest(mb, - m.second.offset, - m.second.length, - m.second.dataType, - m.second.operation); + auto mr = CreateSnapshotMergeRegionRequest( + mb, m.offset, m.length, m.dataType, m.operation); mrsFbVector.push_back(mr); } @@ -177,11 +174,8 @@ void SnapshotClient::doPushSnapshotDiffs( if (data != nullptr) { mrsFbVector.reserve(data->getMergeRegions().size()); for (const auto& m : data->getMergeRegions()) { - auto mr = CreateSnapshotMergeRegionRequest(mb, - m.second.offset, - m.second.length, - m.second.dataType, - m.second.operation); + auto mr = CreateSnapshotMergeRegionRequest( + mb, m.offset, m.length, m.dataType, m.operation); mrsFbVector.push_back(mr); } diff --git a/src/snapshot/SnapshotServer.cpp b/src/snapshot/SnapshotServer.cpp index 3eaedcaf6..87a6dc346 100644 --- a/src/snapshot/SnapshotServer.cpp +++ b/src/snapshot/SnapshotServer.cpp @@ -148,8 +148,9 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) // Write diffs and set merge regions if necessary if (r->force()) { - SPDLOG_DEBUG("Forcing write queued diffs to snapshot {}", - r->key()->str()); + SPDLOG_DEBUG("Forcing write queued diffs to snapshot {} ({} regions)", + r->key()->str(), + r->merge_regions()->size()); // Write queued diffs snap->writeQueuedDiffs(); diff --git a/src/state/RedisStateKeyValue.cpp b/src/state/RedisStateKeyValue.cpp index f228a6888..06891891f 100644 --- a/src/state/RedisStateKeyValue.cpp +++ b/src/state/RedisStateKeyValue.cpp @@ -83,7 +83,7 @@ void 
RedisStateKeyValue::pullChunkFromRemote(long offset, size_t length) offset + length, joinedKey); - // Note - redis ranges are inclusive, so we need to knock one off + // Redis ranges are inclusive, so we need to knock one off size_t rangeStart = offset; size_t rangeEnd = offset + length - 1; diff --git a/src/state/State.cpp b/src/state/State.cpp index 38b38e97e..eb4d97ae1 100644 --- a/src/state/State.cpp +++ b/src/state/State.cpp @@ -154,7 +154,7 @@ std::shared_ptr State::doGetKV(const std::string& user, kvMap.emplace(lookupKey, std::move(kv)); } } else if (stateMode == "inmemory") { - // NOTE - passing IP here is crucial for testing + // Passing IP here is crucial for testing if (sizeless) { auto kv = std::make_shared(user, key, thisIP); diff --git a/src/transport/MessageEndpoint.cpp b/src/transport/MessageEndpoint.cpp index 054ea2730..835c822de 100644 --- a/src/transport/MessageEndpoint.cpp +++ b/src/transport/MessageEndpoint.cpp @@ -66,7 +66,7 @@ zmq::socket_t socketFactory(zmq::socket_type socketType, socket.set(zmq::sockopt::rcvtimeo, timeoutMs); socket.set(zmq::sockopt::sndtimeo, timeoutMs); - // Note - setting linger here is essential to avoid infinite hangs + // Setting linger here is essential to avoid infinite hangs socket.set(zmq::sockopt::linger, LINGER_MS); switch (connectType) { diff --git a/src/transport/PointToPointBroker.cpp b/src/transport/PointToPointBroker.cpp index 7e4cd2998..a0b1722a9 100644 --- a/src/transport/PointToPointBroker.cpp +++ b/src/transport/PointToPointBroker.cpp @@ -16,12 +16,12 @@ static std::unordered_map> groups; static std::shared_mutex groupsMutex; -// NOTE: Keeping 0MQ sockets in TLS is usually a bad idea, as they _must_ be -// closed before the global context. However, in this case it's worth it -// to cache the sockets across messages, as otherwise we'd be creating and -// destroying a lot of them under high throughput. 
To ensure things are cleared -// up, see the thread-local tidy-up message on this class and its usage in the -// rest of the codebase. +// Keeping 0MQ sockets in TLS is usually a bad idea, as they _must_ be closed +// before the global context. However, in this case it's worth it to cache the +// sockets across messages, as otherwise we'd be creating and destroying a lot +// of them under high throughput. To ensure things are cleared up, see the +// thread-local tidy-up message on this class and its usage in the rest of the +// codebase. thread_local std:: unordered_map> recvEndpoints; @@ -36,7 +36,7 @@ thread_local std::unordered_map getClient(const std::string& host) { - // Note - this map is thread-local so no locking required + // This map is thread-local so no locking required if (clients.find(host) == clients.end()) { clients.insert( std::pair>( @@ -515,7 +515,7 @@ void PointToPointBroker::sendMessage(int groupId, if (host == conf.endpointHost) { std::string label = getPointToPointKey(groupId, sendIdx, recvIdx); - // Note - this map is thread-local so no locking required + // This map is thread-local so no locking required if (sendEndpoints.find(label) == sendEndpoints.end()) { sendEndpoints[label] = std::make_unique(label); diff --git a/src/util/bytes.cpp b/src/util/bytes.cpp index 49bbdf2a8..19abbe733 100644 --- a/src/util/bytes.cpp +++ b/src/util/bytes.cpp @@ -93,57 +93,4 @@ std::string formatByteArrayToIntString(const std::vector& bytes) return ss.str(); } - -// This function is called in a tight loop over large regions of data so -// make sure it stays efficient. 
-std::vector> diffArrayRegions( - std::span a, - std::span b) -{ - PROF_START(ByteArrayDiff) - std::vector> regions; - - // Iterate through diffs and work out start and finish offsets of each dirty - // region - uint32_t diffStart = 0; - bool diffInProgress = false; - for (uint32_t i = 0; i < a.size(); i++) { - bool dirty = a.data()[i] != b.data()[i]; - if (dirty && !diffInProgress) { - // Starts at this byte - diffInProgress = true; - diffStart = i; - } else if (!dirty && diffInProgress) { - // Finished on byte before - diffInProgress = false; - regions.emplace_back(diffStart, i - diffStart); - } - } - - // If we finish with a diff in progress, add it - if (diffInProgress) { - regions.emplace_back(diffStart, a.size() - diffStart); - } - - PROF_END(ByteArrayDiff) - return regions; -} - -std::vector diffArrays(std::span a, - std::span b) -{ - if (a.size() != b.size()) { - SPDLOG_ERROR( - "Cannot diff arrays of different sizes {} != {}", a.size(), b.size()); - throw std::runtime_error("Cannot diff arrays of different sizes"); - } - - std::vector diffs(a.size(), false); - for (int i = 0; i < a.size(); i++) { - diffs[i] = a.data()[i] != b.data()[i]; - } - - return diffs; -} - } diff --git a/src/util/crash.cpp b/src/util/crash.cpp index 999994813..83bb48a30 100644 --- a/src/util/crash.cpp +++ b/src/util/crash.cpp @@ -47,7 +47,12 @@ void setUpCrashHandler(int sig) fflush(stderr); crashHandler(TEST_SIGNAL); SPDLOG_INFO("Installing crash handler"); - sigs = { SIGSEGV, SIGABRT, SIGILL, SIGFPE }; + + // We don't handle SIGSEGV here because segfault handling is + // necessary for dirty tracking and if this handler gets initialised + // after the one for dirty tracking it thinks legitimate dirty tracking + // segfaults are crashes + sigs = { SIGABRT, SIGILL, SIGFPE }; } for (auto signo : sigs) { diff --git a/src/util/dirty.cpp b/src/util/dirty.cpp index 3e34295ef..0d63fd019 100644 --- a/src/util/dirty.cpp +++ b/src/util/dirty.cpp @@ -19,20 +19,34 @@ namespace faabric::util { 
+// This singleton is needed to contain the different singleton +// instances. We can't make them all static variables in the function. +class DirtyTrackerSingleton +{ + public: + SoftPTEDirtyTracker softpte; + SegfaultDirtyTracker sigseg; + NoneDirtyTracker none; +}; + DirtyTracker& getDirtyTracker() { - static SoftPTEDirtyTracker softpte; - static SegfaultDirtyTracker sigseg; + static DirtyTrackerSingleton dt; std::string trackMode = faabric::util::getSystemConfig().dirtyTrackingMode; if (trackMode == "softpte") { - return softpte; - } else if (trackMode == "segfault") { - sigseg.reinitialise(); - return sigseg; - } else { - throw std::runtime_error("Unrecognised dirty tracking mode"); + return dt.softpte; + } + + if (trackMode == "segfault") { + return dt.sigseg; + } + + if (trackMode == "none") { + return dt.none; } + + throw std::runtime_error("Unrecognised dirty tracking mode"); } // ---------------------------------- @@ -101,8 +115,7 @@ void SoftPTEDirtyTracker::stopThreadLocalTracking(std::span region) // Do nothing } -std::vector> SoftPTEDirtyTracker::getDirtyOffsets( - std::span region) +std::vector SoftPTEDirtyTracker::getDirtyPages(std::span region) { PROF_START(GetDirtyRegions) @@ -130,28 +143,14 @@ std::vector> SoftPTEDirtyTracker::getDirtyOffsets( } // Iterate through the pagemap entries to work out which are dirty - std::vector> regions; - std::vector dirtyPages; - bool isRegionInProgress = false; - int pageStart = 0; + std::vector regions(nPages, 0); for (int i = 0; i < nPages; i++) { bool isDirty = entries.at(i) & PAGEMAP_SOFT_DIRTY; - - if (isDirty && !isRegionInProgress) { - isRegionInProgress = true; - pageStart = i; - } else if (!isDirty && isRegionInProgress) { - isRegionInProgress = false; - regions.emplace_back(pageStart * HOST_PAGE_SIZE, - (i - pageStart) * HOST_PAGE_SIZE); + if (isDirty) { + regions[i] = 1; } } - if (isRegionInProgress) { - regions.emplace_back(pageStart * HOST_PAGE_SIZE, - (nPages - pageStart) * HOST_PAGE_SIZE); - } - 
SPDLOG_TRACE( "Out of {} pages, found {} dirty regions", nPages, regions.size()); @@ -159,20 +158,18 @@ std::vector> SoftPTEDirtyTracker::getDirtyOffsets( return regions; } -std::vector> -SoftPTEDirtyTracker::getBothDirtyOffsets(std::span region) +std::vector SoftPTEDirtyTracker::getBothDirtyPages( + std::span region) { - return getDirtyOffsets(region); + return getDirtyPages(region); } -std::vector> -SoftPTEDirtyTracker::getThreadLocalDirtyOffsets(std::span region) +std::vector SoftPTEDirtyTracker::getThreadLocalDirtyPages( + std::span region) { return {}; } -void SoftPTEDirtyTracker::reinitialise() {} - // ------------------------------ // Segfaults // ------------------------------ @@ -183,59 +180,29 @@ class ThreadTrackingData ThreadTrackingData() = default; ThreadTrackingData(std::span region) - : regionBase(region.data()) + : nPages(faabric::util::getRequiredHostPages(region.size())) + , pageFlags(nPages, 0) + , regionBase(region.data()) , regionTop(region.data() + region.size()) - , nPages(faabric::util::getRequiredHostPages(region.size())) - , pageFlags(nPages, '0') {} void markDirtyPage(void* addr) { ptrdiff_t offset = ((uint8_t*)addr) - regionBase; long pageNum = offset / HOST_PAGE_SIZE; - pageFlags[pageNum] = '1'; - } - - std::vector> getDirtyRegions() - { - PROF_START(GetDirtyRegions) - std::vector> dirty; - if (regionBase == nullptr) { - return dirty; - } - - uint32_t diffPageStart = 0; - bool diffInProgress = false; - for (int i = 0; i < nPages; i++) { - bool isDirty = pageFlags[i] == '1'; - if (isDirty && !diffInProgress) { - diffInProgress = true; - diffPageStart = i; - } else if (!isDirty && diffInProgress) { - diffInProgress = false; - dirty.emplace_back(diffPageStart * HOST_PAGE_SIZE, - (i - diffPageStart) * HOST_PAGE_SIZE); - } - } - - if (diffInProgress) { - dirty.emplace_back(diffPageStart * HOST_PAGE_SIZE, - (nPages - diffPageStart) * HOST_PAGE_SIZE); - } - - PROF_END(GetDirtyRegions) - return dirty; + pageFlags[pageNum] = 1; } bool 
isInitialised() { return regionTop != nullptr; } + // std::vector here seems to worsen performance by >4x + // std::vector seems to be optimal + int nPages = 0; + std::vector pageFlags; + private: uint8_t* regionBase = nullptr; uint8_t* regionTop = nullptr; - int nPages = 0; - - // Note - std::vector here seems to worsen performance by >4x - std::vector pageFlags; }; static thread_local ThreadTrackingData tracking; @@ -353,30 +320,62 @@ void SegfaultDirtyTracker::stopThreadLocalTracking(std::span region) region.size()); } -void SegfaultDirtyTracker::reinitialise() +std::vector SegfaultDirtyTracker::getThreadLocalDirtyPages( + std::span region) { - if (faabric::util::isTestMode()) { - // This is a hack because catch changes the segfault signal handler - // between test cases, so we have to reinisiatlise - setUpSignalHandler(); + if (!tracking.isInitialised()) { + size_t nPages = getRequiredHostPages(region.size()); + return std::vector(nPages, 0); } + + return tracking.pageFlags; +} + +std::vector SegfaultDirtyTracker::getDirtyPages(std::span region) +{ + return {}; } -std::vector> -SegfaultDirtyTracker::getThreadLocalDirtyOffsets(std::span region) +std::vector SegfaultDirtyTracker::getBothDirtyPages( + std::span region) { - return tracking.getDirtyRegions(); + return getThreadLocalDirtyPages(region); } -std::vector> -SegfaultDirtyTracker::getDirtyOffsets(std::span region) +// ------------------------------ +// None (i.e. 
mark all pages dirty) +// ------------------------------ + +void NoneDirtyTracker::clearAll() +{ + dirtyPages.clear(); +} + +void NoneDirtyTracker::startThreadLocalTracking(std::span region) {} + +void NoneDirtyTracker::startTracking(std::span region) +{ + size_t nPages = getRequiredHostPages(region.size()); + dirtyPages = std::vector(nPages, 1); +} + +void NoneDirtyTracker::stopTracking(std::span region) {} + +void NoneDirtyTracker::stopThreadLocalTracking(std::span region) {} + +std::vector NoneDirtyTracker::getThreadLocalDirtyPages( + std::span region) { return {}; } -std::vector> -SegfaultDirtyTracker::getBothDirtyOffsets(std::span region) +std::vector NoneDirtyTracker::getDirtyPages(std::span region) +{ + return dirtyPages; +} + +std::vector NoneDirtyTracker::getBothDirtyPages(std::span region) { - return getThreadLocalDirtyOffsets(region); + return getDirtyPages(region); } } diff --git a/src/util/memory.cpp b/src/util/memory.cpp index 3756b54fb..2c934d216 100644 --- a/src/util/memory.cpp +++ b/src/util/memory.cpp @@ -12,40 +12,24 @@ namespace faabric::util { -std::vector> dedupeMemoryRegions( - std::vector>& regions) +void mergeDirtyPages(std::vector& a, const std::vector& b) { - if (regions.empty()) { - return {}; + // Extend a to fit + size_t overlap = a.size(); + if (b.size() > a.size()) { + a.reserve(b.size()); + a.insert(a.end(), b.begin() + a.size(), b.end()); + } else if (b.size() < a.size()) { + overlap = b.size(); } - std::vector> deduped; - - // Sort in place - std::sort( - std::begin(regions), - std::end(regions), - [](std::pair& a, std::pair& b) { - return a.first < b.first; - }); - - deduped.push_back(regions.front()); - uint32_t lastOffset = regions.front().first; - for (int i = 1; i < regions.size(); i++) { - const auto& r = regions.at(i); - assert(r.first >= lastOffset); - - if (r.first > lastOffset) { - deduped.push_back(r); - lastOffset = r.first; - } else if (deduped.back().second < r.second) { - deduped.pop_back(); - deduped.push_back(r); 
- } - } - - return deduped; + std::transform(a.begin(), + a.begin() + overlap, + b.begin(), + a.begin(), + std::logical_or()); } + // ------------------------- // Alignment // ------------------------- @@ -89,7 +73,7 @@ AlignedChunk getPageAlignedChunk(long offset, long length) long nBytesOffset = nPagesOffset * faabric::util::HOST_PAGE_SIZE; - // Note - this value is the offset from the base of the new region + // This value is the offset from the base of the new region long shiftedOffset = offset - nBytesOffset; AlignedChunk c{ diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 5018c6fb5..f44715465 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -27,6 +27,41 @@ std::vector SnapshotDiff::getDataCopy() const return std::vector(data.begin(), data.end()); } +void diffArrayRegions(std::vector& snapshotDiffs, + uint32_t startOffset, + uint32_t endOffset, + std::span a, + std::span b) +{ + // Iterate through diffs and work out start and finish offsets of each dirty + // region + uint32_t diffStart = 0; + bool diffInProgress = false; + for (uint32_t i = startOffset; i < endOffset; i++) { + bool dirty = a.data()[i] != b.data()[i]; + if (dirty && !diffInProgress) { + // Starts at this byte + diffInProgress = true; + diffStart = i; + } else if (!dirty && diffInProgress) { + // Finished on byte before + diffInProgress = false; + snapshotDiffs.emplace_back(SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite, + diffStart, + b.subspan(diffStart, i - diffStart)); + } + } + + // If we finish with a diff in progress, add it + if (diffInProgress) { + snapshotDiffs.emplace_back(SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite, + diffStart, + b.subspan(diffStart, endOffset - diffStart)); + } +} + SnapshotData::SnapshotData(size_t sizeIn) : SnapshotData(sizeIn, sizeIn) {} @@ -79,7 +114,8 @@ void SnapshotData::copyInData(std::span buffer, uint32_t offset) writeData(buffer, offset); } -void SnapshotData::writeData(std::span buffer, 
uint32_t offset) +void SnapshotData::checkWriteExtension(std::span buffer, + uint32_t offset) { // Try to allocate more memory on top of existing data if necessary. // Will throw an exception if not possible @@ -102,6 +138,12 @@ void SnapshotData::writeData(std::span buffer, uint32_t offset) // Remap data mapMemoryShared({ data.get(), size }, fd); } +} + +void SnapshotData::writeData(std::span buffer, uint32_t offset) +{ + size_t regionEnd = offset + buffer.size(); + checkWriteExtension(buffer, offset); // Copy in new data uint8_t* copyTarget = validatedOffsetPtr(offset); @@ -151,42 +193,17 @@ std::vector SnapshotData::getDataCopy(uint32_t offset, size_t dataSize) void SnapshotData::addMergeRegion(uint32_t offset, size_t length, SnapshotDataType dataType, - SnapshotMergeOperation operation, - bool overwrite) + SnapshotMergeOperation operation) { faabric::util::FullLock lock(snapMx); - SnapshotMergeRegion region(offset, length, dataType, operation); - - if (mergeRegions.find(region.offset) != mergeRegions.end()) { - if (!overwrite) { - SPDLOG_ERROR("Attempting to overwrite existing merge region at {} " - "with {} {} at {}-{}", - region.offset, - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - region.offset, - region.offset + length); - - throw std::runtime_error("Not able to overwrite merge region"); - } - - SPDLOG_TRACE( - "Overwriting existing merge region at {} with {} {} at {}-{}", - region.offset, - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - region.offset, - region.offset + length); - } else { - SPDLOG_DEBUG("Adding new {} {} merge region at {}-{}", - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - region.offset, - region.offset + length); - } + SPDLOG_DEBUG("Adding new {} {} merge region at {}-{}", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation), + offset, + offset + length); - mergeRegions[region.offset] = region; + mergeRegions.emplace_back(offset, length, dataType, operation); } 
void SnapshotData::fillGapsWithOverwriteRegions() @@ -197,35 +214,38 @@ void SnapshotData::fillGapsWithOverwriteRegions() // fill all space if (mergeRegions.empty()) { SPDLOG_TRACE("Filling gap with single overwrite merge region"); - mergeRegions.emplace(std::pair( - 0, - { 0, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite })); + mergeRegions.emplace_back( + 0, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); return; } + // We're modifying the regions within the loop so need to make a copy + std::vector regionsCopy = mergeRegions; + + // Sort merge regions to ensure loop below works + std::sort(regionsCopy.begin(), regionsCopy.end()); + uint32_t lastRegionEnd = 0; - for (auto [offset, region] : mergeRegions) { - if (offset == 0) { + for (const auto& r : regionsCopy) { + if (r.offset == 0) { // Zeroth byte is in a merge region - lastRegionEnd = region.length; + lastRegionEnd = r.length; continue; } - uint32_t regionLen = region.offset - lastRegionEnd; + uint32_t regionLen = r.offset - lastRegionEnd; SPDLOG_TRACE("Filling gap with overwrite merge region {}-{}", lastRegionEnd, lastRegionEnd + regionLen); - mergeRegions.emplace(std::pair( - lastRegionEnd, - { lastRegionEnd, - regionLen, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite })); + mergeRegions.emplace_back(lastRegionEnd, + regionLen, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); - lastRegionEnd = region.offset + region.length; + lastRegionEnd = r.offset + r.length; } if (lastRegionEnd < size) { @@ -233,12 +253,10 @@ void SnapshotData::fillGapsWithOverwriteRegions() lastRegionEnd); // Add a final region at the end of the snapshot - mergeRegions.emplace(std::pair( - lastRegionEnd, - { lastRegionEnd, - 0, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite })); + mergeRegions.emplace_back(lastRegionEnd, + 0, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); } } @@ -261,7 +279,7 @@ void SnapshotData::mapToMemory(std::span target) 
PROF_END(MapSnapshot) } -std::map SnapshotData::getMergeRegions() +std::vector SnapshotData::getMergeRegions() { faabric::util::SharedLock lock(snapMx); return mergeRegions; @@ -287,7 +305,7 @@ void SnapshotData::queueDiffs(const std::span diffs) } } -void SnapshotData::writeQueuedDiffs() +int SnapshotData::writeQueuedDiffs() { PROF_START(WriteQueuedDiffs) faabric::util::FullLock lock(snapMx); @@ -295,6 +313,7 @@ void SnapshotData::writeQueuedDiffs() SPDLOG_DEBUG("Writing {} queued diffs to snapshot", queuedDiffs.size()); // Iterate through diffs + int nDiffs = queuedDiffs.size(); for (auto& diff : queuedDiffs) { if (diff.getOperation() == faabric::util::SnapshotMergeOperation::Ignore) { @@ -308,10 +327,6 @@ void SnapshotData::writeQueuedDiffs() if (diff.getOperation() == faabric::util::SnapshotMergeOperation::Overwrite) { - SPDLOG_TRACE("Copying overwrite diff into {}-{}", - diff.getOffset(), - diff.getOffset() + diff.getData().size()); - writeData(diff.getData(), diff.getOffset()); continue; @@ -393,6 +408,8 @@ void SnapshotData::writeQueuedDiffs() // Clear queue queuedDiffs.clear(); PROF_END(WriteQueuedDiffs) + + return nDiffs; } void SnapshotData::clearTrackedChanges() @@ -427,34 +444,54 @@ std::vector SnapshotData::getTrackedChanges() std::vector SnapshotData::diffWithDirtyRegions( std::span updated, - std::vector> dirtyRegions) + const std::vector& dirtyRegions) { faabric::util::SharedLock lock(snapMx); PROF_START(DiffWithSnapshot) std::vector diffs; - if (mergeRegions.empty() || dirtyRegions.empty()) { - return diffs; + // Always add an overwrite region that covers any extension of the + // updated data + if (updated.size() > size) { + PROF_START(ExtensionDiff) + SPDLOG_TRACE( + "Adding diff to extend snapshot from {} to {}", size, updated.size()); + size_t extensionLen = updated.size() - size; + diffs.emplace_back(SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite, + size, + updated.subspan(size, extensionLen)); + PROF_END(ExtensionDiff) } - 
SPDLOG_TRACE("Diffing {} merge regions vs {} dirty regions", - mergeRegions.size(), - dirtyRegions.size()); - - // First dedupe the memory regions - auto dedupedRegions = dedupeMemoryRegions(dirtyRegions); + // Check to see if we can skip with no dirty regions + PROF_START(DiffDirtySkip) + if (dirtyRegions.empty() || + (std::find(dirtyRegions.begin(), dirtyRegions.end(), 1) == + dirtyRegions.end())) { + SPDLOG_TRACE("No dirty pages, no diffs"); + return diffs; + } + PROF_END(DiffDirtySkip) - // Iterate through merge regions, allow them to add diffs based on the dirty - // regions - std::span original(data.get(), size); - for (auto& mrPair : mergeRegions) { - faabric::util::SnapshotMergeRegion& mr = mrPair.second; + // Check to see if we can skip with no merge regions + if (mergeRegions.empty()) { + SPDLOG_TRACE("No merge regions, no diffs"); + return diffs; + } - // Add the diffs for each dirty region - for (auto& dirtyRegion : dedupedRegions) { - mr.addDiffs(diffs, original, updated, dirtyRegion); - } + // Sort merge regions. This is not strictly necessary but makes testing and + // debugging a lot easier and doesn't take long. Could be removed if it + // became a bottleneck. 
+ std::sort(mergeRegions.begin(), mergeRegions.end()); + + // Iterate through merge regions, allow them to add diffs based on the + // dirty regions + std::span original(data.get(), size); + std::span updatedOverlap = updated.subspan(0, size); + for (auto& mr : mergeRegions) { + mr.addDiffs(diffs, original, updatedOverlap, dirtyRegions); } PROF_END(DiffWithSnapshot) @@ -520,74 +557,6 @@ std::string snapshotMergeOpStr(SnapshotMergeOperation op) } } -void SnapshotMergeRegion::addOverwriteDiff( - std::vector& diffs, - std::span original, - std::span updated, - std::pair dirtyRegion) -{ - // In this function we have two possibilities: - // - Dirty region overlaps original and we can compare - // - Dirty region is past original, in which case we need to extend - - // First we calculate where the dirty region starts and ends. If the merge - // region ends before the dirty region, we take that as the limit. If the - // merge region is zero length, we take the whole dirty region. - uint32_t dirtyRegionStart = std::max(dirtyRegion.first, offset); - uint32_t dirtyRegionEnd = dirtyRegion.first + dirtyRegion.second; - if (length > 0) { - dirtyRegionEnd = std::min(dirtyRegionEnd, offset + length); - } - assert(dirtyRegionStart < dirtyRegionEnd); - - // Overlap with original data - if (original.size() > dirtyRegionStart) { - // Work out the end of the overlap - uint32_t overlapEnd = - std::min(dirtyRegionEnd, original.size()); - uint32_t overlapLen = overlapEnd - dirtyRegionStart; - - // Get the subsections of both the original data and dirty region to - // compare - std::span originalSub = - original.subspan(dirtyRegionStart, overlapLen); - - std::span dirtySub = - updated.subspan(dirtyRegionStart, overlapLen); - - std::vector> regions = - diffArrayRegions(originalSub, dirtySub); - - // Iterate through and build diffs - for (auto [start, len] : regions) { - uint32_t diffStart = dirtyRegionStart + start; - SPDLOG_TRACE("Adding {} overwrite diff at {}-{}", - 
snapshotDataTypeStr(dataType), - diffStart, - diffStart + len); - - diffs.emplace_back( - dataType, operation, diffStart, updated.subspan(diffStart, len)); - } - } - - // Extension past original data - if (dirtyRegionEnd > original.size()) { - uint32_t diffStart = - std::max(original.size(), dirtyRegionStart); - uint32_t diffLength = dirtyRegionEnd - diffStart; - - SPDLOG_TRACE("Adding extension {} overwrite diff at {}-{}", - snapshotDataTypeStr(dataType), - diffStart, - diffStart + diffLength); - diffs.emplace_back(dataType, - operation, - diffStart, - updated.subspan(diffStart, diffLength)); - } -} - SnapshotMergeRegion::SnapshotMergeRegion(uint32_t offsetIn, size_t lengthIn, SnapshotDataType dataTypeIn, @@ -601,52 +570,81 @@ SnapshotMergeRegion::SnapshotMergeRegion(uint32_t offsetIn, void SnapshotMergeRegion::addDiffs(std::vector& diffs, std::span originalData, std::span updatedData, - std::pair dirtyRegion) + const std::vector& dirtyRegions) { - // If the region has zero length, it signifies that it goes to the - // end of the memory, so we go all the way to the end of the dirty - // region. For all other regions, we just check if the dirty range is - // within the merge region. 
- - uint32_t dirtyRangeStart = dirtyRegion.first; - uint32_t dirtyRangeEnd = dirtyRegion.first + dirtyRegion.second; - - bool isInRange = (dirtyRangeEnd > offset) && - ((length == 0) || (dirtyRangeStart < offset + length)); + if (operation == SnapshotMergeOperation::Ignore) { + return; + } - if (!isInRange) { - SPDLOG_TRACE("{} {} merge region {}-{} not in dirty region {}-{}", + // If the region is past the end of the original data, we ignore it + if (offset > originalData.size()) { + SPDLOG_TRACE("Ignoring {} {} merge {}-{} past end of original at {}", snapshotDataTypeStr(dataType), snapshotMergeOpStr(operation), offset, offset + length, - dirtyRangeStart, - dirtyRangeEnd); + originalData.size()); return; } - SPDLOG_TRACE( - "{} {} merge region {}-{}, dirty region {}-{}, original size {}", - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - offset, - offset + length, - dirtyRangeStart, - dirtyRangeEnd, - originalData.size()); - - if (operation == SnapshotMergeOperation::Overwrite) { - addOverwriteDiff(diffs, originalData, updatedData, dirtyRegion); + // If the region has zero length, we go all the way to the end of the + // original data. We also make sure we don't go over the end of the + // original + uint32_t mrEnd = length > 0 ? offset + length : originalData.size(); + mrEnd = std::min(mrEnd, originalData.size()); + + // Note that this range will be exclusive, i.e. 
endPage is the page we + // stop at the start of + size_t startPage = getRequiredHostPagesRoundDown(offset); + size_t endPage = getRequiredHostPages(mrEnd); + + SPDLOG_TRACE("Checking {} {} merge {}-{} over pages {}-{}", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation), + offset, + offset + length, + startPage, + endPage - 1); + + // Check if anything dirty in the given region (std::find range is + // exclusive) + auto startIt = dirtyRegions.begin() + startPage; + auto endIt = dirtyRegions.begin() + endPage; + auto foundIt = std::find(startIt, endIt, 1); + if (foundIt == endIt) { + SPDLOG_TRACE("No dirty pages for {} {} {}-{} ({})", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation), + offset, + offset + length, + mrEnd); return; } + startPage += std::distance(startIt, foundIt); - if (operation == SnapshotMergeOperation::Ignore) { - return; - } + if (operation == SnapshotMergeOperation::Overwrite) { + // Iterate through pages + for (int p = startPage; p < endPage; p++) { + // Skip if page not dirty + if (dirtyRegions.at(p) == 0) { + continue; + } + + // Stop at merge region boundaries, making sure we don't start + // checking before the merge region offset, or go over the merge + // region end on the final page (may not be page-aligned) + uint32_t startByte = std::max(offset, p * HOST_PAGE_SIZE); + uint32_t endByte = + std::min(mrEnd, (p + 1) * HOST_PAGE_SIZE); - if (originalData.size() < offset) { - throw std::runtime_error("Do not support non-overwrite operations " - "outside original snapshot"); + SPDLOG_TRACE("Checking page {} {}-{}", p, startByte, endByte); + + diffArrayRegions( + diffs, startByte, endByte, originalData, updatedData); + } + + // This is the end of the overwrite diff + return; } uint8_t* updated = updatedData.data() + offset; diff --git a/src/util/state.cpp b/src/util/state.cpp index 2d1fc760c..7fa2f99c6 100644 --- a/src/util/state.cpp +++ b/src/util/state.cpp @@ -16,7 +16,7 @@ std::string keyForUser(const 
std::string& user, const std::string& key) void maskDouble(unsigned int* maskArray, unsigned long idx) { - // NOTE - we assume int is half size of double + // We assume int is half size of double unsigned long intIdx = 2 * idx; maskArray[intIdx] |= STATE_MASK_32; maskArray[intIdx + 1] |= STATE_MASK_32; diff --git a/tests/dist/DistTestExecutor.cpp b/tests/dist/DistTestExecutor.cpp index b1fbb69c5..4d017b022 100644 --- a/tests/dist/DistTestExecutor.cpp +++ b/tests/dist/DistTestExecutor.cpp @@ -78,6 +78,10 @@ void DistTestExecutor::restore(const std::string& snapshotKey) std::span DistTestExecutor::getMemoryView() { + if (dummyMemory.get() == nullptr) { + SPDLOG_ERROR("Dist test executor using memory view on null memory"); + throw std::runtime_error("DistTestExecutor null memory"); + } return { dummyMemory.get(), dummyMemorySize }; } diff --git a/tests/dist/main.cpp b/tests/dist/main.cpp index f45bb4e32..9c24e6320 100644 --- a/tests/dist/main.cpp +++ b/tests/dist/main.cpp @@ -1,5 +1,8 @@ #define CATCH_CONFIG_RUNNER +// Disable catch signal catching to avoid interfering with dirty tracking +#define CATCH_CONFIG_NO_POSIX_SIGNALS 1 + #include #include "DistTestExecutor.h" @@ -9,12 +12,15 @@ #include #include #include +#include #include FAABRIC_CATCH_LOGGER int main(int argc, char* argv[]) { + faabric::util::setUpCrashHandler(); + faabric::transport::initGlobalMessageContext(); faabric::util::initLogging(); tests::initDistTests(); diff --git a/tests/dist/server.cpp b/tests/dist/server.cpp index 676723228..d4be794db 100644 --- a/tests/dist/server.cpp +++ b/tests/dist/server.cpp @@ -34,7 +34,7 @@ int main() SPDLOG_INFO("Distributed test server started"); SPDLOG_INFO("---------------------------------"); - // Note, endpoint will block until killed + // Endpoint will block until killed SPDLOG_INFO("Starting HTTP endpoint on worker"); faabric::endpoint::FaabricEndpoint endpoint; endpoint.start(); diff --git a/tests/test/endpoint/test_handler.cpp 
b/tests/test/endpoint/test_handler.cpp index 1d9406c69..6ca1cbb1d 100644 --- a/tests/test/endpoint/test_handler.cpp +++ b/tests/test/endpoint/test_handler.cpp @@ -33,7 +33,7 @@ TEST_CASE_METHOD(EndpointHandlerTestFixture, "Test valid calls to endpoint", "[endpoint]") { - // Note - must be async to avoid needing a result + // Must be async to avoid needing a result faabric::Message call = faabric::util::messageFactory("foo", "bar"); call.set_isasync(true); std::string user = "foo"; diff --git a/tests/test/main.cpp b/tests/test/main.cpp index 2b1022978..7c7b03736 100644 --- a/tests/test/main.cpp +++ b/tests/test/main.cpp @@ -1,5 +1,8 @@ #define CATCH_CONFIG_RUNNER +// Disable catch signal catching to avoid interfering with dirty tracking +#define CATCH_CONFIG_NO_POSIX_SIGNALS 1 + #include #include "faabric_utils.h" diff --git a/tests/test/redis/test_redis.cpp b/tests/test/redis/test_redis.cpp index deeed8f36..5fa89a6f0 100644 --- a/tests/test/redis/test_redis.cpp +++ b/tests/test/redis/test_redis.cpp @@ -373,7 +373,7 @@ TEST_CASE("Test set operations", "[redis]") std::string setA = "set_a"; std::string setB = "set_b"; - // NOTE - we need to check Redis can store odd strings like IPs + // We need to check Redis can store odd strings like IPs std::string valueA = "12.45.67.89"; std::string valueB = "val_b"; std::string valueC = "192.168.3.4"; diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index fe62c31e3..47e431875 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -171,8 +171,8 @@ int32_t TestExecutor::executeTask( uint8_t pageIdx = threadPoolIdx; // Set up the data. 
- // Note, avoid writing a zero here as the memory is already zeroed hence - // it's not a change + // Avoid writing a zero here as the memory is already zeroed hence it's + // not a change std::vector data = { (uint8_t)(pageIdx + 1), (uint8_t)(pageIdx + 2), (uint8_t)(pageIdx + 3) }; diff --git a/tests/test/snapshot/test_snapshot_client_server.cpp b/tests/test/snapshot/test_snapshot_client_server.cpp index 0acd747e0..2d712ef67 100644 --- a/tests/test/snapshot/test_snapshot_client_server.cpp +++ b/tests/test/snapshot/test_snapshot_client_server.cpp @@ -114,7 +114,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, for (int i = 0; i < mergeRegions.size(); i++) { SnapshotMergeRegion expected = mergeRegions.at(i); - SnapshotMergeRegion actual = snapA->getMergeRegions()[expected.offset]; + SnapshotMergeRegion actual = snapA->getMergeRegions()[i]; REQUIRE(actual.offset == expected.offset); REQUIRE(actual.dataType == expected.dataType); @@ -236,8 +236,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, for (int i = 0; i < mergeRegions.size(); i++) { SnapshotMergeRegion expected = mergeRegions.at(i); - SnapshotMergeRegion actual = - snap->getMergeRegions()[expected.offset]; + SnapshotMergeRegion actual = snap->getMergeRegions()[i]; REQUIRE(actual.offset == expected.offset); REQUIRE(actual.dataType == expected.dataType); diff --git a/tests/test/snapshot/test_snapshot_diffs.cpp b/tests/test/snapshot/test_snapshot_diffs.cpp index c673ce3ab..251cde6f9 100644 --- a/tests/test/snapshot/test_snapshot_diffs.cpp +++ b/tests/test/snapshot/test_snapshot_diffs.cpp @@ -25,7 +25,7 @@ void checkSnapshotDiff(int offset, } TEST_CASE_METHOD(SnapshotTestFixture, - "Test no snapshot diffs if no merge regions", + "Test single extension diff if no merge regions and grown", "[snapshot]") { std::string snapKey = "foobar123"; @@ -47,25 +47,42 @@ TEST_CASE_METHOD(SnapshotTestFixture, snap->mapToMemory({ mem.get(), snapSize }); // Track changes + DirtyTracker& tracker = getDirtyTracker(); 
tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); + std::vector expected(memPages, 0); + // Make various changes mem[0] = 1; mem[2 * HOST_PAGE_SIZE] = 1; mem[3 * HOST_PAGE_SIZE + 10] = 1; + + // Outside of original snap size mem[8 * HOST_PAGE_SIZE - 20] = 1; + expected[0] = 1; + expected[2] = 1; + expected[3] = 1; + expected[7] = 1; + tracker.stopTracking(memView); tracker.stopThreadLocalTracking(memView); // Check there are no diffs even though we have dirty regions - auto dirtyRegions = tracker.getBothDirtyOffsets(memView); - REQUIRE(!dirtyRegions.empty()); + auto dirtyRegions = tracker.getBothDirtyPages(memView); + REQUIRE(dirtyRegions == expected); std::vector changeDiffs = snap->diffWithDirtyRegions(memView, dirtyRegions); - REQUIRE(changeDiffs.empty()); + REQUIRE(changeDiffs.size() == 1); + + SnapshotDiff actual = changeDiffs.at(0); + REQUIRE(actual.getOffset() == snapSize); + + std::span expectedData = + memView.subspan(snapSize, 3 * HOST_PAGE_SIZE); + REQUIRE(actual.getData().size() == expectedData.size()); } TEST_CASE_METHOD(SnapshotTestFixture, "Test snapshot diffs", "[snapshot]") @@ -102,8 +119,9 @@ TEST_CASE_METHOD(SnapshotTestFixture, "Test snapshot diffs", "[snapshot]") SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); - // NOTE - deliberately add merge regions out of order - // Diff starting in merge region and overlapping the end + // Deliberately add merge regions out of order + + // Merge region across page boudary, capturing only part of a change std::vector dataC = { 7, 6, 5, 4, 3, 2, 1 }; std::vector expectedDataC = { 7, 6, 5, 4 }; int offsetC = 2 * HOST_PAGE_SIZE; @@ -141,25 +159,24 @@ TEST_CASE_METHOD(SnapshotTestFixture, "Test snapshot diffs", "[snapshot]") SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); - // Write some data to the region that exceeds the size of the original, then - // add a merge region larger than it. Anything outside the original snapshot - // should be marked as changed. 
+ // Write some data to the region that exceeds the size of the original. + // Anything outside the original snapshot should be marked as changed. std::vector dataExtra = { 2, 2, 2 }; - std::vector expectedDataExtra = { 0, 0, 2, 2, 2, 0, 0 }; + uint32_t extensionPages = memPages - snapPages; + std::vector expectedDataExtra(extensionPages * HOST_PAGE_SIZE, 0); int extraOffset = snapSize + HOST_PAGE_SIZE + 10; - std::memcpy(mem.get() + extraOffset, dataExtra.data(), dataExtra.size()); - int extraRegionOffset = extraOffset - 2; - int extraRegionSize = dataExtra.size() + 4; - snap->addMergeRegion(extraRegionOffset, - extraRegionSize, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite); + // Copy data into place in the original memory, and update the expectation + std::memcpy(mem.get() + extraOffset, dataExtra.data(), dataExtra.size()); + std::memcpy(expectedDataExtra.data() + HOST_PAGE_SIZE + 10, + dataExtra.data(), + dataExtra.size()); - // Include an offset which doesn't change the data.get(), but will register - // a dirty page - std::vector dataNoChange = { 0, 0, 0 }; + // Include an offset which doesn't change the data, but will register + // a dirty page (do this by writing bytes from the original) int offsetNoChange = 4 * HOST_PAGE_SIZE - 10; + std::vector dataNoChange(mem.get() + offsetNoChange, + mem.get() + offsetNoChange + 5); std::memcpy( mem.get() + offsetNoChange, dataNoChange.data(), dataNoChange.size()); @@ -168,17 +185,18 @@ TEST_CASE_METHOD(SnapshotTestFixture, "Test snapshot diffs", "[snapshot]") tracker.stopThreadLocalTracking(memView); // Check we have the right number of diffs - auto dirtyRegions = tracker.getBothDirtyOffsets(memView); + auto dirtyRegions = tracker.getBothDirtyPages(memView); std::vector changeDiffs = snap->diffWithDirtyRegions(memView, dirtyRegions); REQUIRE(changeDiffs.size() == 6); - checkSnapshotDiff(offsetA, dataA, changeDiffs.at(0)); - checkSnapshotDiff(offsetB1, dataB1, changeDiffs.at(1)); - 
checkSnapshotDiff(offsetB2, dataB2, changeDiffs.at(2)); - checkSnapshotDiff(offsetC, expectedDataC, changeDiffs.at(3)); - checkSnapshotDiff(regionOffsetD, expectedDataD, changeDiffs.at(4)); - checkSnapshotDiff(extraRegionOffset, expectedDataExtra, changeDiffs.at(5)); + // Diffs are returned increasing order of offset + checkSnapshotDiff(snapSize, expectedDataExtra, changeDiffs.at(0)); + checkSnapshotDiff(offsetA, dataA, changeDiffs.at(1)); + checkSnapshotDiff(offsetB1, dataB1, changeDiffs.at(2)); + checkSnapshotDiff(offsetB2, dataB2, changeDiffs.at(3)); + checkSnapshotDiff(offsetC, expectedDataC, changeDiffs.at(4)); + checkSnapshotDiff(regionOffsetD, expectedDataD, changeDiffs.at(5)); } } diff --git a/tests/test/state/test_redis_state.cpp b/tests/test/state/test_redis_state.cpp index 64eaf483b..de5b8d050 100644 --- a/tests/test/state/test_redis_state.cpp +++ b/tests/test/state/test_redis_state.cpp @@ -14,8 +14,8 @@ using namespace state; /** - * NOTE - there's some copy-pasting in here because we want to run - * the same tests on in-memory and Redis versions. + * There's some copy-pasting in here because we want to run the same tests on + * in-memory and Redis versions. 
*/ namespace tests { diff --git a/tests/test/transport/test_message_endpoint_client.cpp b/tests/test/transport/test_message_endpoint_client.cpp index 41c1480e6..1118ac7f5 100644 --- a/tests/test/transport/test_message_endpoint_client.cpp +++ b/tests/test/transport/test_message_endpoint_client.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -132,7 +133,7 @@ TEST_CASE_METHOD(SchedulerTestFixture, for (int i = 0; i < numMessages; i++) { faabric::transport::Message recvMsg = dst.recv().value(); // Check just a subset of the messages - // Note - this implicitly tests in-order message delivery + // This implicitly tests in-order message delivery if ((i % (numMessages / 10)) == 0) { std::string expectedMsg = baseMsg + std::to_string(i); REQUIRE(recvMsg.size() == expectedMsg.size()); @@ -231,8 +232,10 @@ TEST_CASE_METHOD(SchedulerTestFixture, "Test direct messaging", "[transport]") std::string expected = "Direct hello"; const uint8_t* msg = BYTES_CONST(expected.c_str()); - std::string inprocLabel = "direct-test"; + std::string inprocLabel = + "direct-test-" + std::to_string(faabric::util::generateGid()); + // Send the message AsyncDirectSendEndpoint sender(inprocLabel); sender.send(msg, expected.size()); diff --git a/tests/test/transport/test_message_server.cpp b/tests/test/transport/test_message_server.cpp index 7d798587a..137b68442 100644 --- a/tests/test/transport/test_message_server.cpp +++ b/tests/test/transport/test_message_server.cpp @@ -253,8 +253,8 @@ TEST_CASE("Test client timeout on requests to valid server", "[transport]") if (expectFailure) { bool failed = false; - // Note - here we must wait until the server has finished handling the - // request, even though it's failed + // Here we must wait until the server has finished handling the request, + // even though it's failed server.setRequestLatch(); // Make the call and check it fails diff --git a/tests/test/util/test_bytes.cpp b/tests/test/util/test_bytes.cpp index c9123d24a..20d549dff 
100644 --- a/tests/test/util/test_bytes.cpp +++ b/tests/test/util/test_bytes.cpp @@ -158,88 +158,4 @@ TEST_CASE("Test format byte array to string", "[util]") REQUIRE(formatByteArrayToIntString(bytesIn) == expectedString); } - -TEST_CASE("Test diffing byte array regions", "[util]") -{ - std::vector a; - std::vector b; - std::vector> expected; - - SECTION("Equal") - { - a = { 0, 1, 2, 3 }; - b = { 0, 1, 2, 3 }; - } - - SECTION("Empty") {} - - SECTION("Not equal") - { - a = { 0, 0, 2, 2, 3, 3, 4, 4, 5, 5 }; - b = { 0, 1, 1, 2, 3, 6, 6, 6, 5, 5 }; - expected = { - { 1, 2 }, - { 5, 3 }, - }; - } - - SECTION("Single length") - { - a = { 0, 1, 2, 3, 4 }; - b = { 0, 1, 3, 3, 4 }; - expected = { { 2, 1 } }; - } - - SECTION("Difference at start") - { - a = { 0, 1, 2, 3, 4, 5, 6 }; - b = { 1, 2, 3, 3, 3, 4, 6 }; - expected = { { 0, 3 }, { 4, 2 } }; - } - - SECTION("Difference at end") - { - a = { 0, 1, 2, 3, 4, 5, 6 }; - b = { 0, 1, 1, 3, 3, 4, 5 }; - expected = { { 2, 1 }, { 4, 3 } }; - } - - std::vector> actual = - diffArrayRegions({ a.data(), a.size() }, { b.data(), b.size() }); - - REQUIRE(actual.size() == expected.size()); - for (int i = 0; i < actual.size(); i++) { - REQUIRE(actual.at(i).first == expected.at(i).first); - REQUIRE(actual.at(i).second == expected.at(i).second); - } -} - -TEST_CASE("Test diffing byte arrays", "[util]") -{ - std::vector a; - std::vector b; - std::vector expected; - - SECTION("Equal") - { - a = { 0, 1, 2, 3 }; - b = { 0, 1, 2, 3 }; - expected = std::vector(a.size(), false); - } - - SECTION("Empty") {} - - SECTION("Not equal") - { - a = { 0, 0, 2, 2, 3, 3, 4, 4, 5, 5 }; - b = { 0, 1, 1, 2, 3, 6, 6, 6, 5, 5 }; - expected = { false, true, true, false, false, - true, true, true, false, false }; - } - - std::vector actual = - diffArrays({ a.data(), a.size() }, { b.data(), b.size() }); - - REQUIRE(actual == expected); -} } diff --git a/tests/test/util/test_dirty.cpp b/tests/test/util/test_dirty.cpp index 2f8812089..014aa6afb 100644 --- 
a/tests/test/util/test_dirty.cpp +++ b/tests/test/util/test_dirty.cpp @@ -22,6 +22,32 @@ class DirtyConfTestFixture ~DirtyConfTestFixture() = default; }; +TEST_CASE_METHOD(DirtyConfTestFixture, + "Test configuring tracker", + "[util][dirty]") +{ + SECTION("Segfaults") + { + conf.dirtyTrackingMode = "segfault"; + DirtyTracker& t = getDirtyTracker(); + REQUIRE(t.getType() == "segfault"); + } + + SECTION("Soft PTEs") + { + conf.dirtyTrackingMode = "softpte"; + DirtyTracker& t = getDirtyTracker(); + REQUIRE(t.getType() == "softpte"); + } + + SECTION("None") + { + conf.dirtyTrackingMode = "none"; + DirtyTracker& t = getDirtyTracker(); + REQUIRE(t.getType() == "none"); + } +} + TEST_CASE_METHOD(DirtyConfTestFixture, "Test dirty page checking", "[util][dirty]") @@ -40,9 +66,9 @@ TEST_CASE_METHOD(DirtyConfTestFixture, tracker.clearAll(); - std::vector> actual = - tracker.getBothDirtyOffsets(memView); - REQUIRE(actual.empty()); + std::vector actual = tracker.getBothDirtyPages(memView); + std::vector expected(nPages, 0); + REQUIRE(actual == expected); tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); @@ -55,20 +81,17 @@ TEST_CASE_METHOD(DirtyConfTestFixture, pageOne[10] = 1; pageThree[123] = 4; - std::vector> expected = { - std::pair(HOST_PAGE_SIZE, HOST_PAGE_SIZE), - std::pair(3 * HOST_PAGE_SIZE, HOST_PAGE_SIZE) - }; + expected = { 0, 1, 0, 1, 0, 0 }; - actual = tracker.getBothDirtyOffsets(memView); + actual = tracker.getBothDirtyPages(memView); REQUIRE(actual == expected); // And another uint8_t* pageFive = pageThree + (2 * HOST_PAGE_SIZE); pageFive[99] = 3; - expected.emplace_back(5 * HOST_PAGE_SIZE, HOST_PAGE_SIZE); - actual = tracker.getBothDirtyOffsets(memView); + expected[5] = 1; + actual = tracker.getBothDirtyPages(memView); REQUIRE(actual == expected); // Reset @@ -77,8 +100,9 @@ TEST_CASE_METHOD(DirtyConfTestFixture, tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); - actual = tracker.getBothDirtyOffsets(memView); - 
REQUIRE(actual.empty()); + actual = tracker.getBothDirtyPages(memView); + expected = std::vector(nPages, 0); + REQUIRE(actual == expected); // Check the data hasn't changed REQUIRE(pageOne[10] == 1); @@ -91,10 +115,10 @@ TEST_CASE_METHOD(DirtyConfTestFixture, pageFour[22] = 5; // As pages are adjacent we get a single region - expected = { - std::pair(3 * HOST_PAGE_SIZE, 2 * HOST_PAGE_SIZE), - }; - actual = tracker.getBothDirtyOffsets(memView); + expected = std::vector(nPages, 0); + expected[3] = 1; + expected[4] = 1; + actual = tracker.getBothDirtyPages(memView); REQUIRE(actual == expected); // Final reset and check @@ -103,8 +127,9 @@ TEST_CASE_METHOD(DirtyConfTestFixture, tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); - actual = tracker.getBothDirtyOffsets(memView); - REQUIRE(actual.empty()); + actual = tracker.getBothDirtyPages(memView); + expected = std::vector(nPages, 0); + REQUIRE(actual == expected); tracker.stopTracking(memView); tracker.stopThreadLocalTracking(memView); @@ -118,19 +143,20 @@ TEST_CASE_METHOD(DirtyConfTestFixture, SECTION("Soft PTEs") { conf.dirtyTrackingMode = "softpte"; } - tracker = getDirtyTracker(); + DirtyTracker& tracker = getDirtyTracker(); + REQUIRE(tracker.getType() == conf.dirtyTrackingMode); int nPages = 15; size_t memSize = HOST_PAGE_SIZE * nPages; MemoryRegion mem = allocateSharedMemory(memSize); std::span memView(mem.get(), memSize); - DirtyTracker& tracker = getDirtyTracker(); tracker.clearAll(); - std::vector> actual = - tracker.getBothDirtyOffsets({ mem.get(), memSize }); - REQUIRE(actual.empty()); + std::vector actual = + tracker.getBothDirtyPages({ mem.get(), memSize }); + std::vector expected(nPages, 0); + REQUIRE(actual == expected); tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); @@ -151,18 +177,17 @@ TEST_CASE_METHOD(DirtyConfTestFixture, pageSeven[77] = 1; pageNine[99] = 1; + expected[0] = 1; + expected[1] = 1; + expected[3] = 1; + expected[4] = 1; + expected[7] 
= 1; + expected[9] = 1; + tracker.stopTracking({ mem.get(), memSize }); tracker.stopThreadLocalTracking({ mem.get(), memSize }); - // Expect adjacent regions to be merged - std::vector> expected = { - std::pair(0, 2 * HOST_PAGE_SIZE), - std::pair(3 * HOST_PAGE_SIZE, 2 * HOST_PAGE_SIZE), - std::pair(7 * HOST_PAGE_SIZE, HOST_PAGE_SIZE), - std::pair(9 * HOST_PAGE_SIZE, HOST_PAGE_SIZE) - }; - - actual = tracker.getBothDirtyOffsets({ mem.get(), memSize }); + actual = tracker.getBothDirtyPages({ mem.get(), memSize }); REQUIRE(actual.size() == expected.size()); @@ -174,10 +199,13 @@ TEST_CASE_METHOD(DirtyConfTestFixture, "[util][dirty]") { conf.dirtyTrackingMode = "segfault"; - tracker = getDirtyTracker(); + DirtyTracker& tracker = getDirtyTracker(); + REQUIRE(tracker.getType() == "segfault"); - size_t memSize = 10 * HOST_PAGE_SIZE; + int nPages = 10; + size_t memSize = nPages * HOST_PAGE_SIZE; std::vector expectedData(memSize, 5); + std::vector expectedDirty(nPages, 0); MemoryRegion mem = allocatePrivateMemory(memSize); @@ -211,6 +239,7 @@ TEST_CASE_METHOD(DirtyConfTestFixture, size_t offsetA = 0; mem[offsetA] = 3; expectedData[offsetA] = 3; + expectedDirty[0] = 1; // Make two changes on adjacent page size_t offsetB1 = HOST_PAGE_SIZE + 10; @@ -219,11 +248,13 @@ TEST_CASE_METHOD(DirtyConfTestFixture, mem[offsetB2] = 2; expectedData[offsetB1] = 4; expectedData[offsetB2] = 2; + expectedDirty[1] = 1; // Change another page size_t offsetC = (5 * HOST_PAGE_SIZE) + 10; mem[offsetC] = 6; expectedData[offsetC] = 6; + expectedDirty[5] = 1; // Just read from another (should not cause a diff) int readValue = mem[4 * HOST_PAGE_SIZE + 5]; @@ -234,17 +265,9 @@ TEST_CASE_METHOD(DirtyConfTestFixture, REQUIRE(actualMemAfter == expectedData); // Get dirty regions - std::vector> actualDirty = - tracker.getBothDirtyOffsets(memView); + std::vector actualDirty = tracker.getBothDirtyPages(memView); // Check dirty regions - REQUIRE(actualDirty.size() == 2); - - std::vector> expectedDirty = { - { 
0, 2 * HOST_PAGE_SIZE }, - { (uint32_t)(5 * HOST_PAGE_SIZE), HOST_PAGE_SIZE } - }; - REQUIRE(actualDirty == expectedDirty); tracker.stopTracking(memView); @@ -258,13 +281,15 @@ TEST_CASE_METHOD(DirtyConfTestFixture, // Here we want to check that faults triggered in a given thread are caught // by that thread, and so we can safely just to thread-local diff tracking. conf.dirtyTrackingMode = "segfault"; - tracker = getDirtyTracker(); + DirtyTracker& tracker = getDirtyTracker(); + REQUIRE(tracker.getType() == "segfault"); int nLoops = 20; // Deliberately cause contention int nThreads = 100; - size_t memSize = 2 * nThreads * HOST_PAGE_SIZE; + int nPages = 2 * nThreads; + size_t memSize = nPages * HOST_PAGE_SIZE; MemoryRegion mem = allocatePrivateMemory(memSize); std::span memView(mem.get(), memSize); @@ -279,16 +304,22 @@ TEST_CASE_METHOD(DirtyConfTestFixture, std::vector threads; threads.reserve(nThreads); for (int i = 0; i < nThreads; i++) { - threads.emplace_back([this, &success, &memView, i, loop] { + threads.emplace_back([&tracker, + &success, + &memView, + &nPages, + i, + loop] { success.at(i) = std::make_shared>(); // Start thread-local tracking tracker.startThreadLocalTracking(memView); // Modify a couple of pages specific to this thread - size_t pageOffset = i * 2 * HOST_PAGE_SIZE; - uint8_t* pageOne = memView.data() + pageOffset; - uint8_t* pageTwo = memView.data() + pageOffset + HOST_PAGE_SIZE; + size_t pageOffset = i * 2; + size_t byteOffset = pageOffset * HOST_PAGE_SIZE; + uint8_t* pageOne = memView.data() + byteOffset; + uint8_t* pageTwo = memView.data() + byteOffset + HOST_PAGE_SIZE; pageOne[20] = 3; pageOne[250] = 5; @@ -300,9 +331,9 @@ TEST_CASE_METHOD(DirtyConfTestFixture, tracker.stopThreadLocalTracking(memView); // Check we get the right number of dirty regions - std::vector> regions = - tracker.getThreadLocalDirtyOffsets(memView); - if (regions.size() != 1) { + std::vector regions = + tracker.getThreadLocalDirtyPages(memView); + if (regions.size() 
!= nPages) { SPDLOG_ERROR("Segfault thread {} failed on loop {}. Got {} " "regions instead of {}", i, @@ -312,10 +343,9 @@ TEST_CASE_METHOD(DirtyConfTestFixture, return; } - std::vector> expected = { - std::pair(pageOffset, - 2 * HOST_PAGE_SIZE), - }; + std::vector expected = std::vector(nPages, 0); + expected[pageOffset] = 1; + expected[pageOffset + 1] = 1; if (regions != expected) { SPDLOG_ERROR( @@ -339,7 +369,7 @@ TEST_CASE_METHOD(DirtyConfTestFixture, tracker.stopTracking(memView); // Check no global offsets - REQUIRE(tracker.getDirtyOffsets(memView).empty()); + REQUIRE(tracker.getDirtyPages(memView).empty()); bool thisLoopSuccess = true; for (int i = 0; i < nThreads; i++) { diff --git a/tests/test/util/test_memory.cpp b/tests/test/util/test_memory.cpp index a7bc97905..f75faba58 100644 --- a/tests/test/util/test_memory.cpp +++ b/tests/test/util/test_memory.cpp @@ -14,58 +14,6 @@ using namespace faabric::util; namespace tests { -TEST_CASE("Test dedupe memory regions", "[util][memory]") -{ - std::vector> input; - std::vector> expected; - - uint32_t offsetA = 0; - uint32_t offsetB = 10; - - uint32_t sizeA = 2; - uint32_t sizeB = 3; - uint32_t sizeC = 4; - - SECTION("Empty") {} - - SECTION("Nothing to do") - { - input = { { offsetA, sizeA } }; - expected = input; - } - - SECTION("Equal on the same offset") - { - input = { - { offsetB, sizeB }, - { offsetA, sizeA }, - { offsetA, sizeA }, - }; - expected = { - { offsetA, sizeA }, - { offsetB, sizeB }, - }; - } - - SECTION("Longer on the same offset") - { - input = { - { offsetB, sizeB }, - { offsetA, sizeA }, - { offsetA, sizeC }, - { offsetA, sizeB }, - }; - expected = { - { offsetA, sizeC }, - { offsetB, sizeB }, - }; - } - - std::vector> actual = - dedupeMemoryRegions(input); - REQUIRE(actual == expected); -} - TEST_CASE("Test rounding down offsets to page size", "[util][memory]") { REQUIRE(faabric::util::alignOffsetDown(2 * faabric::util::HOST_PAGE_SIZE) == diff --git a/tests/test/util/test_snapshot.cpp 
b/tests/test/util/test_snapshot.cpp index 3d507c3d8..54aa92223 100644 --- a/tests/test/util/test_snapshot.cpp +++ b/tests/test/util/test_snapshot.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -304,6 +305,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, REQUIRE(*intB == originalValueB); // Reset dirty tracking to get a clean start + DirtyTracker& tracker = getDirtyTracker(); tracker.clearAll(); tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); @@ -331,7 +333,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Get the snapshot diffs tracker.stopTracking(memView); tracker.stopThreadLocalTracking(memView); - auto dirtyRegions = tracker.getBothDirtyOffsets(memView); + auto dirtyRegions = tracker.getBothDirtyPages(memView); std::vector actualDiffs = snap->diffWithDirtyRegions(memView, dirtyRegions); @@ -420,6 +422,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, snap->mapToMemory(memView); // Reset dirty tracking + DirtyTracker& tracker = getDirtyTracker(); tracker.clearAll(); tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); @@ -469,7 +472,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, tracker.stopTracking(memView); tracker.stopThreadLocalTracking(memView); - auto dirtyRegions = tracker.getBothDirtyOffsets(memView); + auto dirtyRegions = tracker.getBothDirtyPages(memView); std::vector actualDiffs = snap->diffWithDirtyRegions(memView, dirtyRegions); REQUIRE(actualDiffs.size() == 4); @@ -484,8 +487,6 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::string snapKey = "foobar123"; int snapPages = 5; - uint32_t offset = HOST_PAGE_SIZE + (10 * sizeof(int32_t)); - std::shared_ptr snap = std::make_shared(snapPages * HOST_PAGE_SIZE); @@ -497,8 +498,14 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::vector updatedData; std::vector expectedData; + std::vector expectedDirtyPages(snapPages, 0); + uint32_t offset = HOST_PAGE_SIZE + (10 * sizeof(int32_t)); + expectedDirtyPages[1] = 1; + 
faabric::util::SnapshotDataType dataType = faabric::util::SnapshotDataType::Raw; + faabric::util::SnapshotDataType expectedDataType = + faabric::util::SnapshotDataType::Raw; faabric::util::SnapshotMergeOperation operation = faabric::util::SnapshotMergeOperation::Overwrite; @@ -514,6 +521,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, int diffValue = 0; dataType = faabric::util::SnapshotDataType::Int; + expectedDataType = faabric::util::SnapshotDataType::Int; dataLength = sizeof(int32_t); regionLength = sizeof(int32_t); @@ -574,6 +582,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, long diffValue = 0; dataType = faabric::util::SnapshotDataType::Long; + expectedDataType = faabric::util::SnapshotDataType::Long; dataLength = sizeof(long); regionLength = sizeof(long); @@ -634,11 +643,12 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, float diffValue = 0.0; dataType = faabric::util::SnapshotDataType::Float; + expectedDataType = faabric::util::SnapshotDataType::Float; dataLength = sizeof(float); regionLength = sizeof(float); - // Note - imprecision in float arithmetic makes it difficult to test - // the floating point types here unless we use integer values. + // Imprecision in float arithmetic makes it difficult to test the + // floating point types here unless we use integer values. SECTION("Float sum") { originalValue = 513; @@ -696,10 +706,11 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, double diffValue = 0.0; dataType = faabric::util::SnapshotDataType::Double; + expectedDataType = faabric::util::SnapshotDataType::Double; dataLength = sizeof(double); regionLength = sizeof(double); - // Note - imprecision in float arithmetic makes it difficult to test + // Imprecision in float arithmetic makes it difficult to test // the floating point types here unless we use integer values. 
SECTION("Double sum") { @@ -758,6 +769,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, bool diffValue = false; dataType = faabric::util::SnapshotDataType::Bool; + expectedDataType = faabric::util::SnapshotDataType::Raw; dataLength = sizeof(bool); regionLength = sizeof(bool); @@ -816,6 +828,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, snap->mapToMemory(memView); // Reset dirty tracking + DirtyTracker& tracker = getDirtyTracker(); tracker.clearAll(); tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); @@ -829,7 +842,10 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Get the snapshot diffs tracker.stopTracking(memView); tracker.stopThreadLocalTracking(memView); - auto dirtyRegions = tracker.getBothDirtyOffsets(memView); + + auto dirtyRegions = tracker.getBothDirtyPages(memView); + REQUIRE(dirtyRegions == expectedDirtyPages); + std::vector actualDiffs = snap->diffWithDirtyRegions(memView, dirtyRegions); @@ -839,7 +855,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Check diff REQUIRE(actualDiffs.size() == 1); std::vector expectedDiffs = { - { dataType, + { expectedDataType, operation, offset, { expectedData.data(), expectedData.size() } } @@ -894,6 +910,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, snap->mapToMemory(memView); // Reset dirty tracking + DirtyTracker& tracker = getDirtyTracker(); tracker.clearAll(); tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); @@ -908,7 +925,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Check getting diffs throws an exception tracker.stopTracking(memView); tracker.stopThreadLocalTracking(memView); - auto dirtyRegions = tracker.getBothDirtyOffsets(memView); + auto dirtyRegions = tracker.getBothDirtyPages(memView); bool failed = false; try { snap->diffWithDirtyRegions(memView, dirtyRegions); @@ -940,6 +957,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, faabric::util::isPageAligned((const void*)snap->getDataPtr()); // Reset dirty tracking + DirtyTracker& tracker = 
getDirtyTracker(); tracker.clearAll(); // Update the snapshot @@ -983,14 +1001,18 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, snap->mapToMemory(memView); // Reset dirty tracking + DirtyTracker& tracker = getDirtyTracker(); tracker.clearAll(); tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); + std::vector expectedDirtyPages(snapPages, 0); + // Add some tightly-packed changes uint32_t offsetA = 0; std::vector dataA(10, 1); std::memcpy(mem.get() + offsetA, dataA.data(), dataA.size()); + expectedDirtyPages[0] = 1; uint32_t offsetB = dataA.size() + 1; std::vector dataB(2, 1); @@ -1032,7 +1054,10 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Check number of diffs tracker.stopTracking(memView); tracker.stopThreadLocalTracking(memView); - auto dirtyRegions = tracker.getBothDirtyOffsets(memView); + + auto dirtyRegions = tracker.getBothDirtyPages(memView); + REQUIRE(dirtyRegions == expectedDirtyPages); + std::vector actualDiffs = snap->diffWithDirtyRegions(memView, dirtyRegions); @@ -1045,29 +1070,26 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, { int snapPages = 3; size_t snapSize = snapPages * HOST_PAGE_SIZE; + auto snap = std::make_shared(snapSize); - std::shared_ptr snap = - std::make_shared(snapSize); - - std::map expectedRegions; + std::vector expectedRegions; SECTION("No existing regions") { - expectedRegions[0] = { - 0, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite - }; + expectedRegions.emplace_back( + 0, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); } SECTION("One region at start") { snap->addMergeRegion( 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); - expectedRegions[0] = { - 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite - }; - expectedRegions[100] = { - 100, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite - }; + + expectedRegions.emplace_back( + 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); + + expectedRegions.emplace_back( + 100, 
0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); } SECTION("One region at end") @@ -1077,15 +1099,15 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); - expectedRegions[0] = { 0, - snapSize - 100, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite }; + expectedRegions.emplace_back(0, + snapSize - 100, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); - expectedRegions[snapSize - 100] = { (uint32_t)snapSize - 100, - 100, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite }; + expectedRegions.emplace_back((uint32_t)snapSize - 100, + 100, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); } SECTION("Multiple regions") @@ -1104,58 +1126,51 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); - expectedRegions[0] = { - 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite - }; - - expectedRegions[100] = { - 100, sizeof(int), SnapshotDataType::Int, SnapshotMergeOperation::Sum - }; - - expectedRegions[100 + sizeof(int)] = { - 100 + sizeof(int), - HOST_PAGE_SIZE - (100 + sizeof(int)), - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite - }; - - expectedRegions[HOST_PAGE_SIZE] = { (uint32_t)HOST_PAGE_SIZE, - sizeof(double), - SnapshotDataType::Double, - SnapshotMergeOperation::Product }; - - expectedRegions[HOST_PAGE_SIZE + sizeof(double)] = { - (uint32_t)(HOST_PAGE_SIZE + sizeof(double)), - 200 - sizeof(double), - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite - }; + expectedRegions.emplace_back( + 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); - expectedRegions[HOST_PAGE_SIZE + 200] = { - (uint32_t)HOST_PAGE_SIZE + 200, - (uint32_t)HOST_PAGE_SIZE, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite - }; + expectedRegions.emplace_back( + 100, sizeof(int), SnapshotDataType::Int, SnapshotMergeOperation::Sum); - expectedRegions[(2 * HOST_PAGE_SIZE) + 
200] = { - (uint32_t)(2 * HOST_PAGE_SIZE) + 200, - 0, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite - }; + expectedRegions.emplace_back(100 + sizeof(int), + HOST_PAGE_SIZE - (100 + sizeof(int)), + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); + + expectedRegions.emplace_back((uint32_t)HOST_PAGE_SIZE, + sizeof(double), + SnapshotDataType::Double, + SnapshotMergeOperation::Product); + + expectedRegions.emplace_back( + (uint32_t)(HOST_PAGE_SIZE + sizeof(double)), + 200 - sizeof(double), + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); + + expectedRegions.emplace_back((uint32_t)HOST_PAGE_SIZE + 200, + (uint32_t)HOST_PAGE_SIZE, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); + + expectedRegions.emplace_back((uint32_t)(2 * HOST_PAGE_SIZE) + 200, + 0, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); } snap->fillGapsWithOverwriteRegions(); - std::map actualRegions = - snap->getMergeRegions(); + std::vector actualRegions = snap->getMergeRegions(); + + // Sort regions to ensure consistent comparison + std::sort(actualRegions.begin(), actualRegions.end()); REQUIRE(actualRegions.size() == expectedRegions.size()); - for (auto [expectedOffset, expectedRegion] : expectedRegions) { - REQUIRE(actualRegions.find(expectedOffset) != actualRegions.end()); + for (int i = 0; i < actualRegions.size(); i++) { + SnapshotMergeRegion expectedRegion = expectedRegions[i]; + SnapshotMergeRegion actualRegion = actualRegions[i]; - SnapshotMergeRegion actualRegion = actualRegions[expectedOffset]; REQUIRE(actualRegion.offset == expectedRegion.offset); REQUIRE(actualRegion.dataType == expectedRegion.dataType); REQUIRE(actualRegion.length == expectedRegion.length); @@ -1163,6 +1178,31 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, } } +TEST_CASE("Test sorting snapshot merge regions", "[snapshot][util]") +{ + std::vector regions; + + regions.emplace_back( + 10, 20, SnapshotDataType::Double, SnapshotMergeOperation::Sum); + 
regions.emplace_back( + 50, 20, SnapshotDataType::Double, SnapshotMergeOperation::Sum); + regions.emplace_back( + 5, 20, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); + regions.emplace_back( + 30, 20, SnapshotDataType::Float, SnapshotMergeOperation::Max); + + std::sort(regions.begin(), regions.end()); + + std::vector expected = { + { 5, 20, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite }, + { 10, 20, SnapshotDataType::Double, SnapshotMergeOperation::Sum }, + { 30, 20, SnapshotDataType::Float, SnapshotMergeOperation::Max }, + { 50, 20, SnapshotDataType::Double, SnapshotMergeOperation::Sum }, + }; + + REQUIRE(regions == expected); +} + TEST_CASE_METHOD(SnapshotMergeTestFixture, "Test mix of applicable and non-applicable merge regions", "[snapshot][util]") @@ -1180,21 +1220,24 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, snap->mapToMemory(memView); // Reset dirty tracking + DirtyTracker& tracker = getDirtyTracker(); tracker.clearAll(); tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); + std::vector expectedDirtyPages(snapPages, 0); + // Add a couple of merge regions on each page, which should be skipped as // they won't overlap any changes for (int i = 0; i < snapPages; i++) { - // Overwrite + // Overwrite region at bottom of page int skippedOverwriteOffset = i * HOST_PAGE_SIZE; snap->addMergeRegion(skippedOverwriteOffset, 10, faabric::util::SnapshotDataType::Raw, faabric::util::SnapshotMergeOperation::Overwrite); - // Sum + // Sum region near top of page int skippedSumOffset = ((i + 1) * HOST_PAGE_SIZE) - (2 * sizeof(int32_t)); snap->addMergeRegion(skippedSumOffset, @@ -1203,7 +1246,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, faabric::util::SnapshotMergeOperation::Sum); } - // Add an overwrite region that should take effect + // Add an overwrite region above the one at the very bottom, and some + // modified data that should be caught by it uint32_t overwriteAOffset = (2 * HOST_PAGE_SIZE) + 20; 
snap->addMergeRegion(overwriteAOffset, 20, @@ -1211,10 +1255,14 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, faabric::util::SnapshotMergeOperation::Overwrite); std::vector overwriteData(10, 1); + SPDLOG_DEBUG("Mix test writing {} bytes at {}", + overwriteData.size(), + overwriteAOffset); std::memcpy( mem.get() + overwriteAOffset, overwriteData.data(), overwriteData.size()); + expectedDirtyPages[2] = 1; - // Add a sum region and data that should also take effect + // Add a sum region and modified data that should also cause a diff uint32_t sumOffset = (4 * HOST_PAGE_SIZE) + 100; int sumValue = 333; int sumOriginal = 111; @@ -1225,8 +1273,10 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, faabric::util::SnapshotDataType::Int, faabric::util::SnapshotMergeOperation::Sum); + SPDLOG_DEBUG("Mix test writing int at {}", sumOffset); snap->copyInData({ BYTES(&sumOriginal), sizeof(int) }, sumOffset); *(int*)(mem.get() + sumOffset) = sumValue; + expectedDirtyPages[4] = 1; // Check diffs std::vector expectedDiffs = { @@ -1242,7 +1292,9 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, tracker.stopTracking(memView); tracker.stopThreadLocalTracking(memView); - auto dirtyRegions = tracker.getBothDirtyOffsets(memView); + + auto dirtyRegions = tracker.getBothDirtyPages(memView); + REQUIRE(dirtyRegions == expectedDirtyPages); std::vector actualDiffs = snap->diffWithDirtyRegions(memView, dirtyRegions); @@ -1251,11 +1303,12 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, } TEST_CASE_METHOD(SnapshotMergeTestFixture, - "Test merge regions past end of original memory", + "Test diffing memory larger than snapshot", "[snapshot][util]") { int snapPages = 6; int memPages = 10; + size_t snapSize = snapPages * HOST_PAGE_SIZE; size_t memSize = memPages * HOST_PAGE_SIZE; @@ -1270,15 +1323,18 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Map only the size of the snapshot snap->mapToMemory({ mem.get(), snapSize }); + DirtyTracker& tracker = getDirtyTracker(); tracker.clearAll(); 
tracker.startTracking(memView); tracker.startThreadLocalTracking(memView); - uint32_t changeStartPage = 0; uint32_t changeOffset = 0; uint32_t mergeRegionStart = snapSize; size_t changeLength = 123; + std::vector overshootData((memPages - snapPages) * HOST_PAGE_SIZE, + 0); + // When memory has changed at or past the end of the original data, the diff // will start at the end of the original data and round up to the next page // boundary. If the change starts before the end, it will start at the @@ -1293,15 +1349,14 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, SECTION("Change on first page past end of original data, overlapping merge " "region") { - changeStartPage = snapSize; - changeOffset = changeStartPage + 100; + changeOffset = snapSize + 100; mergeRegionStart = snapSize; diffData = std::vector(100, 2); - std::vector expectedData = zeroedPage; + std::vector expectedData = overshootData; std::memset(expectedData.data() + 100, 2, 100); - // Diff should be the page after the end of the original data + // Diff should just be the whole of the updated memory expectedDiffs = { { faabric::util::SnapshotDataType::Raw, faabric::util::SnapshotMergeOperation::Overwrite, (uint32_t)snapSize, @@ -1310,15 +1365,14 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, SECTION("Change and merge region aligned at end of original data") { - changeStartPage = snapSize; - changeOffset = changeStartPage; + changeOffset = snapSize; mergeRegionStart = snapSize; diffData = std::vector(100, 2); - std::vector expectedData = zeroedPage; + std::vector expectedData = overshootData; std::memset(expectedData.data(), 2, 100); - // Diff should be the page after the end of the original data + // Diff should just be the whole of the updated memory expectedDiffs = { { faabric::util::SnapshotDataType::Raw, faabric::util::SnapshotMergeOperation::Overwrite, (uint32_t)snapSize, @@ -1327,48 +1381,52 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, SECTION("Change and merge region after end of original data") { - 
changeStartPage = (snapPages + 2) * HOST_PAGE_SIZE; - changeOffset = changeStartPage + 100; - mergeRegionStart = changeStartPage; + uint32_t start = (snapPages + 2) * HOST_PAGE_SIZE; + changeOffset = start + 100; + mergeRegionStart = start; diffData = std::vector(100, 2); - std::vector expectedData = zeroedPage; - std::memset(expectedData.data() + 100, 2, 100); + std::vector expectedData = overshootData; + std::memset(expectedData.data() + (2 * HOST_PAGE_SIZE) + 100, 2, 100); - // Diff should be the page after the end of the original data, - // containing the change (and not those in between) + // Diff should be the whole region of updated memory expectedDiffs = { { faabric::util::SnapshotDataType::Raw, faabric::util::SnapshotMergeOperation::Overwrite, - changeStartPage, + (uint32_t)snapSize, expectedData } }; } SECTION("Merge region and change both crossing end of original data") { - changeStartPage = (snapPages - 1) * HOST_PAGE_SIZE; - changeOffset = changeStartPage + 100; + // Start change one page below the end + uint32_t start = (snapPages - 1) * HOST_PAGE_SIZE; + changeOffset = start + 100; + + // Add a merge region two pages below mergeRegionStart = (snapPages - 2) * HOST_PAGE_SIZE; - // Change goes from inside original data to overshoot the end + // Change will capture the modified bytes within the original region, + // and the overshoot size_t dataSize = 2 * HOST_PAGE_SIZE; + diffData = std::vector(dataSize, 2); + size_t overlapSize = HOST_PAGE_SIZE - 100; size_t overshootSize = dataSize - overlapSize; - diffData = std::vector(dataSize, 2); - // One diff will cover the overlap with last part of original data, and - // another will be rounded up to the nearest page for the extension - std::vector expectedDataTwo(2 * HOST_PAGE_SIZE, 0); + // another will be the rest of the data + std::vector expectedDataOne(HOST_PAGE_SIZE - 100, 2); + std::vector expectedDataTwo = overshootData; std::memset(expectedDataTwo.data(), 2, overshootSize); expectedDiffs = { { 
faabric::util::SnapshotDataType::Raw, faabric::util::SnapshotMergeOperation::Overwrite, - changeOffset, - std::vector(HOST_PAGE_SIZE - 100, 2) }, + (uint32_t)snapSize, + expectedDataTwo }, { faabric::util::SnapshotDataType::Raw, faabric::util::SnapshotMergeOperation::Overwrite, - (uint32_t)snapSize, - expectedDataTwo } }; + changeOffset, + expectedDataOne } }; } // Copy in the changed data @@ -1382,7 +1440,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, tracker.stopTracking(memView); tracker.stopThreadLocalTracking(memView); - auto dirtyRegions = tracker.getBothDirtyOffsets(memView); + auto dirtyRegions = tracker.getBothDirtyPages(memView); std::vector actualDiffs = snap->diffWithDirtyRegions(memView, dirtyRegions); @@ -1488,6 +1546,7 @@ TEST_CASE_METHOD(DirtyTrackingTestFixture, snap->mapToMemory(memViewA); // Clear tracking + DirtyTracker& tracker = getDirtyTracker(); tracker.clearAll(); snap->clearTrackedChanges(); @@ -1505,23 +1564,25 @@ TEST_CASE_METHOD(DirtyTrackingTestFixture, // Write data to the mapped memory std::memcpy(memA.get() + offsetC, dataC.data(), dataC.size()); - // Check diff of snapshot with memory only includes the change made to the - // memory itself + // Check diff of snapshot with memory only includes the change made to + // the memory itself tracker.stopTracking(memViewA); tracker.stopThreadLocalTracking(memViewA); - auto dirtyRegions = tracker.getBothDirtyOffsets(memViewA); + auto dirtyRegions = tracker.getBothDirtyPages(memViewA); - std::vector actualDiffs = - snap->diffWithDirtyRegions(memViewA, dirtyRegions); - REQUIRE(actualDiffs.size() == 1); + { + std::vector actualDiffs = + snap->diffWithDirtyRegions(memViewA, dirtyRegions); + REQUIRE(actualDiffs.size() == 1); - SnapshotDiff& actualDiff = actualDiffs.at(0); - REQUIRE(actualDiff.getData().size() == dataC.size()); - REQUIRE(actualDiff.getOffset() == offsetC); + SnapshotDiff& actualDiff = actualDiffs.at(0); + REQUIRE(actualDiff.getData().size() == dataC.size()); + 
REQUIRE(actualDiff.getOffset() == offsetC); - // Apply diffs from memory to the snapshot - snap->queueDiffs(actualDiffs); - snap->writeQueuedDiffs(); + // Apply diffs from memory to the snapshot + snap->queueDiffs(actualDiffs); + snap->writeQueuedDiffs(); + } // Check snapshot now shows both diffs std::vector snapDirtyRegions = snap->getTrackedChanges(); @@ -1557,4 +1618,221 @@ TEST_CASE_METHOD(DirtyTrackingTestFixture, std::vector remappedMemA(memB.get(), memB.get() + snapSize); REQUIRE(remappedMemA == expectedFinal); } + +TEST_CASE("Test diffing byte array regions", "[util][snapshot]") +{ + std::vector a; + std::vector b; + std::vector> expected; + + uint32_t startOffset = 0; + uint32_t endOffset = 0; + SECTION("Equal") + { + a = { 0, 1, 2, 3 }; + b = { 0, 1, 2, 3 }; + startOffset = 0; + endOffset = b.size(); + } + + SECTION("Empty") {} + + SECTION("Not equal") + { + a = { 0, 0, 2, 2, 3, 3, 4, 4, 5, 5 }; + b = { 0, 1, 1, 2, 3, 6, 6, 6, 5, 5 }; + startOffset = 0; + endOffset = b.size(); + expected = { + { 1, 2 }, + { 5, 3 }, + }; + } + + SECTION("Not equal subsections") + { + a = { 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0 }; + b = { 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0 }; + startOffset = 3; + endOffset = 10; + expected = { + { 3, 2 }, + { 6, 1 }, + { 8, 2 }, + }; + } + + SECTION("Single length") + { + a = { 0, 1, 2, 3, 4 }; + b = { 0, 1, 3, 3, 4 }; + startOffset = 0; + endOffset = b.size(); + expected = { { 2, 1 } }; + } + + SECTION("Difference at start") + { + a = { 0, 1, 2, 3, 4, 5, 6 }; + b = { 1, 2, 3, 3, 3, 4, 6 }; + startOffset = 0; + endOffset = b.size(); + expected = { { 0, 3 }, { 4, 2 } }; + } + + SECTION("Difference at end") + { + a = { 0, 1, 2, 3, 4, 5, 6 }; + b = { 0, 1, 1, 3, 3, 4, 5 }; + startOffset = 0; + endOffset = b.size(); + expected = { { 2, 1 }, { 4, 3 } }; + } + + std::vector actual; + diffArrayRegions(actual, startOffset, endOffset, a, b); + + // Convert execpted into diffs + std::vector expectedDiffs; + for (auto p : expected) { + 
expectedDiffs.emplace_back( + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite, + p.first, + std::span(b.data() + p.first, + b.data() + p.first + p.second)); + } + + REQUIRE(actual.size() == expected.size()); + for (int i = 0; i < actual.size(); i++) { + REQUIRE(actual.at(i).getOffset() == expectedDiffs.at(i).getOffset()); + REQUIRE(actual.at(i).getDataCopy() == + expectedDiffs.at(i).getDataCopy()); + REQUIRE(actual.at(i).getDataType() == + expectedDiffs.at(i).getDataType()); + REQUIRE(actual.at(i).getOperation() == + expectedDiffs.at(i).getOperation()); + } +} + +TEST_CASE("Test snapshot merge region equality", "[snapshot][util]") +{ + SECTION("Equal") + { + SnapshotMergeRegion a( + 10, 12, SnapshotDataType::Double, SnapshotMergeOperation::Ignore); + SnapshotMergeRegion b( + 10, 12, SnapshotDataType::Double, SnapshotMergeOperation::Ignore); + REQUIRE(a == b); + } + + SECTION("Offset unequal") + { + SnapshotMergeRegion a( + 123, 12, SnapshotDataType::Double, SnapshotMergeOperation::Ignore); + SnapshotMergeRegion b( + 10, 12, SnapshotDataType::Double, SnapshotMergeOperation::Ignore); + REQUIRE(a != b); + } + + SECTION("Length unequal") + { + SnapshotMergeRegion a( + 10, 22, SnapshotDataType::Double, SnapshotMergeOperation::Ignore); + SnapshotMergeRegion b( + 10, 12, SnapshotDataType::Double, SnapshotMergeOperation::Ignore); + REQUIRE(a != b); + } + + SECTION("Data type unequal") + { + SnapshotMergeRegion a( + 10, 22, SnapshotDataType::Double, SnapshotMergeOperation::Ignore); + SnapshotMergeRegion b( + 10, 22, SnapshotDataType::Bool, SnapshotMergeOperation::Ignore); + REQUIRE(a != b); + } + + SECTION("Operation unequal") + { + SnapshotMergeRegion a( + 10, 22, SnapshotDataType::Double, SnapshotMergeOperation::Ignore); + SnapshotMergeRegion b( + 10, 22, SnapshotDataType::Double, SnapshotMergeOperation::Sum); + REQUIRE(a != b); + } + SECTION("Default constructors equal") + { + SnapshotMergeRegion a; + SnapshotMergeRegion b; + REQUIRE(a == b); + } +} + 
+TEST_CASE_METHOD(SnapshotMergeTestFixture, + "Test diffing snapshot memory with none tracking", + "[snapshot][util]") +{ + faabric::util::getSystemConfig().dirtyTrackingMode = "none"; + + // Create snapshot + int snapPages = 4; + size_t snapSize = snapPages * HOST_PAGE_SIZE; + + std::shared_ptr snap = + std::make_shared(snapSize); + reg.registerSnapshot(snapKey, snap); + + // Map some memory + MemoryRegion mem = allocatePrivateMemory(snapSize); + std::span memView(mem.get(), snapSize); + snap->mapToMemory(memView); + + // Reset dirty tracking + DirtyTracker& tracker = getDirtyTracker(); + REQUIRE(tracker.getType() == "none"); + tracker.clearAll(); + + // Start tracking + tracker.startTracking(memView); + tracker.startThreadLocalTracking(memView); + + // Update the memory + std::vector dataA = { 1, 2, 3, 4 }; + std::vector dataB = { 3, 4, 5 }; + uint32_t offsetA = 0; + uint32_t offsetB = 2 * HOST_PAGE_SIZE + 1; + + std::memcpy(mem.get() + offsetA, dataA.data(), dataA.size()); + std::memcpy(mem.get() + offsetB, dataB.data(), dataB.size()); + + // Stop tracking + tracker.stopTracking(memView); + tracker.stopThreadLocalTracking(memView); + + // Get diffs + std::vector expectedDirtyPages(snapPages, 1); + std::vector dirtyPages = tracker.getBothDirtyPages(memView); + REQUIRE(dirtyPages == expectedDirtyPages); + + // Diff with snapshot + snap->fillGapsWithOverwriteRegions(); + std::vector actualRegions = snap->getMergeRegions(); + REQUIRE(actualRegions.size() == 1); + REQUIRE(actualRegions.at(0).offset == 0); + REQUIRE(actualRegions.at(0).length == 0); + + std::vector actual = + snap->diffWithDirtyRegions(memView, dirtyPages); + + REQUIRE(actual.size() == 2); + SnapshotDiff diffA = actual.at(0); + SnapshotDiff diffB = actual.at(1); + + REQUIRE(diffA.getOffset() == offsetA); + REQUIRE(diffB.getOffset() == offsetB); + + REQUIRE(diffA.getDataCopy() == dataA); + REQUIRE(diffB.getDataCopy() == dataB); +} } diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index 
b72ea04d0..00da2df51 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -65,14 +65,9 @@ class StateTestFixture class DirtyTrackingTestFixture { public: - DirtyTrackingTestFixture() - : tracker(faabric::util::getDirtyTracker()) - {} + DirtyTrackingTestFixture() {} - ~DirtyTrackingTestFixture() { tracker.clearAll(); } - - protected: - faabric::util::DirtyTracker& tracker; + ~DirtyTrackingTestFixture() { faabric::util::getDirtyTracker().clearAll(); } }; class SchedulerTestFixture : public DirtyTrackingTestFixture @@ -266,8 +261,8 @@ class PointToPointTestFixture ~PointToPointTestFixture() { - // Note - here we reset the thread-local cache for the test thread. If - // other threads are used in the tests, they too must do this. + // Here we reset the thread-local cache for the test thread. If other + // threads are used in the tests, they too must do this. broker.resetThreadLocalCache(); faabric::transport::clearSentMessages();