From d83af5255db9c4a557264542647f7ccb281e6840 Mon Sep 17 00:00:00 2001 From: Ashlesh Gawande Date: Fri, 14 Jul 2023 14:37:18 -0500 Subject: [PATCH] Replace FullSync algorithm and fix bugs to lower delay and overhead 1) During high update frequency PSync nodes can send data that contain nothing new. This blocks the receiver nodes from syncing leading to high delays. To fix this, Sync interests now include the cumulative number of elements inserted into the IBF. This is used as an heuristic to guide PSync on decode failures to minimize the chance of sending of such data packets. Sync data freshness for entire dataset is also reduced. If a no new update still happens, introduce some safeguards in the code. 2) Fast reaction upon negative set being detected is introduced. Timed processing for interests is also introduced. 3) Fix satisfyPendingInterests to send the new update on publish even if pending IBF decode fails, since we have something to send. 4) For interests which request segments, only respond with segments already contained in in-memory storage. refs: [solution II] in Thesis: "Improvements to PSync: Distributed Full Dataset Synchronization in Named-Data Networking" https://digitalcommons.memphis.edu/cgi/viewcontent.cgi?article=3162&context=etd Change-Id: Ie235b4fb56fcb7de21068511205e407006292b23 --- AUTHORS.md | 2 + PSync/common.hpp | 2 +- PSync/consumer.hpp | 4 +- PSync/full-producer.cpp | 341 ++++++++++++++++++++++++++--------- PSync/full-producer.hpp | 48 +++-- PSync/producer-base.cpp | 6 +- PSync/producer-base.hpp | 5 +- examples/full-sync.cpp | 7 +- tests/test-full-producer.cpp | 51 +++++- tests/test-full-sync.cpp | 53 +----- 10 files changed, 360 insertions(+), 159 deletions(-) diff --git a/AUTHORS.md b/AUTHORS.md index 8391947..6e0b086 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -7,6 +7,8 @@ If you would like to become a contributor to the official repository, please fol * Alexander Afanasyev * ***(Maintainer)*** Saurab Dulal * ***(Former Maintainer)*** Ashlesh Gawande +* Dylan Hensley +* Alexander Lane * Eric Newberry * Davide Pesavento * Junxiao Shi diff --git a/PSync/common.hpp b/PSync/common.hpp index 2bea73d..83fa1b8 100644 --- a/PSync/common.hpp +++ b/PSync/common.hpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2022, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. diff --git a/PSync/consumer.hpp b/PSync/consumer.hpp index 7d7995d..1cbd204 100644 --- a/PSync/consumer.hpp +++ b/PSync/consumer.hpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2023, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. @@ -65,7 +65,7 @@ class Consumer /// Callback to give sync data back to application. UpdateCallback onUpdate = [] (const auto&) {}; /// Number of expected elements (subscriptions) in Bloom filter. - uint32_t bfCount = 80; + uint32_t bfCount = 6; /// Bloom filter false positive probability. double bfFalsePositive = 0.001; /// Lifetime of hello Interest. diff --git a/PSync/full-producer.cpp b/PSync/full-producer.cpp index 4a3401d..5073ab5 100644 --- a/PSync/full-producer.cpp +++ b/PSync/full-producer.cpp @@ -84,12 +84,20 @@ FullProducer::publishName(const ndn::Name& prefix, std::optional seq) uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1); NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq); updateSeqNo(prefix, newSeq); - satisfyPendingInterests(); + + m_inNoNewDataWaitOutPeriod = false; + + satisfyPendingInterests(ndn::Name(prefix).appendNumber(newSeq)); } void FullProducer::sendSyncInterest() { + if (m_inNoNewDataWaitOutPeriod) { + NDN_LOG_TRACE("Cannot send sync Interest as Data is expected from CS"); + return; + } + // If we send two sync interest one after the other // since there is no new data in the network yet, // when data is available it may satisfy both of them @@ -102,6 +110,15 @@ FullProducer::sendSyncInterest() // Append our latest IBF m_iblt.appendToName(syncInterestName); + // Append cumulative updates that has been inserted into this IBF + syncInterestName.appendNumber(m_numOwnElements); + + auto currentTime = ndn::time::system_clock::now(); + if ((currentTime - m_lastInterestSentTime < ndn::time::milliseconds(MIN_JITTER)) && + (m_outstandingInterestName == syncInterestName)) { + NDN_LOG_TRACE("Suppressing Interest: " << std::hash{}(syncInterestName)); + return; + } m_outstandingInterestName = syncInterestName; @@ -117,6 +134,14 @@ FullProducer::sendSyncInterest() options.maxTimeout = m_syncInterestLifetime; options.rttOptions.initialRto = m_syncInterestLifetime; + // This log message must be before sending the Interest through SegmentFetcher + // because getNonce generates a Nonce for this Interest. + // SegmentFetcher makes a copy of this Interest, so if we print the Nonce + // after, that Nonce will be different than the one seen in tshark! + NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() << + ", hash: " << std::hash{}(syncInterestName)); + + m_lastInterestSentTime = currentTime; m_fetcher = SegmentFetcher::start(m_face, syncInterest, ndn::security::getAcceptAllValidator(), options); @@ -142,42 +167,98 @@ FullProducer::sendSyncInterest() // timeout before it happens. if (errorCode != SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) { auto after = ndn::time::milliseconds(m_jitter(m_rng)); - NDN_LOG_DEBUG("Schedule sync interest after: " << after); + NDN_LOG_DEBUG("Schedule sync Interest after: " << after); m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] { sendSyncInterest(); }); } }); +} - NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() << - ", hash: " << std::hash{}(syncInterestName)); +void +FullProducer::processWaitingInterests() +{ + NDN_LOG_TRACE("Processing waiting Interest list, size: " << m_waitingForProcessing.size()); + if (m_waitingForProcessing.size() == 0) { + return; + } + + for (auto it = m_waitingForProcessing.begin(); it != m_waitingForProcessing.end();) { + if (it->second.numTries == std::numeric_limits::max()) { + NDN_LOG_TRACE("Interest with hash already marked for deletion, removing now: " << + std::hash{}(it->first)); + it = m_waitingForProcessing.erase(it); + continue; + } + + it->second.numTries += 1; + ndn::Interest interest(it->first); + interest.setNonce(it->second.nonce); + onSyncInterest(m_syncPrefix, interest, true); + if (it->second.numTries == std::numeric_limits::max()) { + NDN_LOG_TRACE("Removing Interest with hash: " << std::hash{}(it->first)); + it = m_waitingForProcessing.erase(it); + } + else { + ++it; + } + } + NDN_LOG_TRACE("Done processing waiting Interest list, size: " << m_waitingForProcessing.size()); } void -FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest) +FullProducer::scheduleProcessWaitingInterests() { - // TODO: answer only segments from store. - if (m_segmentPublisher.replyFromStore(interest.getName())) { + // If nothing waiting, no need to schedule + if (m_waitingForProcessing.size() == 0) { return; } - ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size()); - ndn::Name interestName; + if (!m_interestDelayTimerId) { + auto after = ndn::time::milliseconds(m_jitter(m_rng)); + NDN_LOG_TRACE("Setting a timer to processes waiting Interest(s) in: " << after); + + m_interestDelayTimerId = m_scheduler.schedule(after, [=] { + NDN_LOG_TRACE("Timer has expired, trying to process waiting Interest(s)"); + processWaitingInterests(); + scheduleProcessWaitingInterests(); + }); + } +} + +void +FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest, + bool isTimedProcessing) +{ + ndn::Name interestName = interest.getName(); + auto interestNameHash = std::hash{}(interestName); + NDN_LOG_DEBUG("Full sync Interest received, nonce: " << interest.getNonce() << + ", hash: " << interestNameHash); - if (nameWithoutSyncPrefix.size() == 1) { - // Get //IBF from //IBF - interestName = interest.getName(); + if (isTimedProcessing) { + NDN_LOG_TRACE("Delayed Interest being processed now"); } - else if (nameWithoutSyncPrefix.size() == 3) { - // Get //IBF from //IBF// - interestName = interest.getName().getPrefix(-2); + + if (m_segmentPublisher.replyFromStore(interestName)) { + NDN_LOG_DEBUG("Answer from memory"); + return; } - else { + + ndn::Name nameWithoutSyncPrefix = interestName.getSubName(prefixName.size()); + + if (nameWithoutSyncPrefix.size() == 4) { + // //// + NDN_LOG_DEBUG("Segment not found in memory. Other side will have to restart"); + // This should have been answered from publisher Cache! + sendApplicationNack(prefixName); return; } - ndn::name::Component ibltName = interestName.get(interestName.size() - 1); + if (nameWithoutSyncPrefix.size() != 2) { + NDN_LOG_WARN("Two components required after sync prefix: //; received: " << interestName); + return; + } - NDN_LOG_DEBUG("Full sync Interest received, nonce: " << interest.getNonce() << - ", hash: " << std::hash{}(interestName)); + ndn::name::Component ibltName = interestName[-2]; + uint64_t numRcvdElements = interestName[-1].toNumber(); detail::IBLT iblt(m_expectedNumEntries, m_ibltCompression); try { @@ -189,80 +270,157 @@ FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& i } auto diff = m_iblt - iblt; + + NDN_LOG_TRACE("Decode, positive: " << diff.positive.size() + << " negative: " << diff.negative.size() << " m_threshold: " + << m_threshold); + + auto waitingIt = m_waitingForProcessing.find(interestName); + if (!diff.canDecode) { - NDN_LOG_TRACE("Cannot decode differences, positive: " << diff.positive.size() - << " negative: " << diff.negative.size() << " m_threshold: " - << m_threshold); - - // Send all data if greater then threshold, else send positive below as usual - // Or send if we can't get neither positive nor negative differences - if (diff.positive.size() + diff.negative.size() >= m_threshold || - (diff.positive.empty() && diff.negative.empty())) { + NDN_LOG_DEBUG("Cannot decode differences!"); + + if (numRcvdElements > m_numOwnElements) { + if (!isTimedProcessing && waitingIt == m_waitingForProcessing.end()) { + NDN_LOG_TRACE("Decode failure, adding to waiting Interest list " << interestNameHash); + m_waitingForProcessing.emplace(interestName, WaitingEntryInfo{0, interest.getNonce()}); + scheduleProcessWaitingInterests(); + } + else if (isTimedProcessing && waitingIt != m_waitingForProcessing.end()) { + if (waitingIt->second.numTries > 1) { + NDN_LOG_TRACE("Decode failure, still behind. Erasing waiting Interest as we have tried twice"); + waitingIt->second.numTries = std::numeric_limits::max(); // markWaitingInterestForDeletion + NDN_LOG_DEBUG("Waiting Interest has been deleted. Sending new sync interest"); + sendSyncInterest(); + } + else { + NDN_LOG_TRACE("Decode failure, still behind, waiting more till the next timer"); + } + } + else { + NDN_LOG_TRACE("Decode failure, still behind"); + } + } + else { + if (m_numOwnElements == numRcvdElements && diff.positive.size() == 0 && diff.negative.size() > 0) { + NDN_LOG_TRACE("We have nothing to offer and are actually behind"); +#ifdef PSYNC_WITH_TESTS + ++nIbfDecodeFailuresBelowThreshold; +#endif // PSYNC_WITH_TESTS + return; + } + detail::State state; for (const auto& content : m_prefixes) { if (content.second != 0) { state.addContent(ndn::Name(content.first).appendNumber(content.second)); } } +#ifdef PSYNC_WITH_TESTS + ++nIbfDecodeFailuresAboveThreshold; +#endif // PSYNC_WITH_TESTS if (!state.getContent().empty()) { - sendSyncData(interest.getName(), state.wireEncode()); + NDN_LOG_DEBUG("Sending entire state: " << state); + // Want low freshness when potentially sending large content to clear it quickly from the network + sendSyncData(interestName, state.wireEncode(), 10_ms); + // Since we're directly sending the data, we need to clear pending interests here + deletePendingInterests(interestName); + } + // We seem to be ahead, delete the Interest from waiting list + if (waitingIt != m_waitingForProcessing.end()) { + waitingIt->second.numTries = std::numeric_limits::max(); } - -#ifdef PSYNC_WITH_TESTS - ++nIbfDecodeFailuresAboveThreshold; -#endif // PSYNC_WITH_TESTS - return; } - -#ifdef PSYNC_WITH_TESTS - ++nIbfDecodeFailuresBelowThreshold; -#endif // PSYNC_WITH_TESTS + return; } - detail::State state; - for (const auto& hash : diff.positive) { - auto nameIt = m_biMap.left.find(hash); - if (nameIt != m_biMap.left.end()) { - ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1); - // Don't sync up sequence number zero - if (m_prefixes[nameWithoutSeq] != 0 && - !isFutureHash(nameWithoutSeq, diff.negative)) { - state.addContent(nameIt->second); + if (diff.positive.size() == 0 && diff.negative.size() == 0) { + NDN_LOG_TRACE("Saving positive: " << diff.positive.size() << " negative: " << diff.negative.size()); + + auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second; + entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(), + [this, interest] { + NDN_LOG_TRACE("Erase pending Interest " << interest.getNonce()); + m_pendingEntries.erase(interest.getName()); + }); + + // Can't delete directly in this case as it will cause + // memory access errors with the for loop in processWaitingInterests + if (isTimedProcessing) { + if (waitingIt != m_waitingForProcessing.end()) { + waitingIt->second.numTries = std::numeric_limits::max(); } } + return; } - if (!state.getContent().empty()) { - NDN_LOG_DEBUG("Sending sync content: " << state); - sendSyncData(interestName, state.wireEncode()); + // Only add to waiting list if we don't have anything to send (positive = 0) + if (diff.positive.size() == 0 && diff.negative.size() > 0) { + if (!isTimedProcessing && waitingIt == m_waitingForProcessing.end()) { + NDN_LOG_TRACE("Adding Interest to waiting list: " << interestNameHash); + m_waitingForProcessing.emplace(interestName, WaitingEntryInfo{0, interest.getNonce()}); + scheduleProcessWaitingInterests(); + } + else if (isTimedProcessing && waitingIt != m_waitingForProcessing.end()) { + if (waitingIt->second.numTries > 1) { + NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash << + ". Erasing waiting Interest as we have tried twice"); + waitingIt->second.numTries = std::numeric_limits::max(); // markWaitingInterestForDeletion + } + else { + NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash << + ". Keep waiting for Interest as number of tries is not exhausted"); + } + } + else { + NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash); + } return; } - auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second; - entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(), - [this, interest] { - NDN_LOG_TRACE("Erase pending Interest " << interest.getNonce()); - m_pendingEntries.erase(interest.getName()); - }); + if (diff.positive.size() > 0) { + detail::State state; + for (const auto& hash : diff.positive) { + auto nameIt = m_biMap.left.find(hash); + if (nameIt != m_biMap.left.end()) { + ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1); + // Don't sync up sequence number zero + if (m_prefixes[nameWithoutSeq] != 0 && + !isFutureHash(nameWithoutSeq.toUri(), diff.negative)) { + state.addContent(nameIt->second); + } + } + } + + if (!state.getContent().empty()) { + NDN_LOG_DEBUG("Sending sync content: " << state); + sendSyncData(interestName, state.wireEncode(), m_syncReplyFreshness); + + // Timed processing or not - if we are answering it, it should not go in waiting Interests + if (waitingIt != m_waitingForProcessing.end()) { + waitingIt->second.numTries = std::numeric_limits::max(); + } + } + } } void -FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block) +FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block, + ndn::time::milliseconds syncReplyFreshness) { bool isSatisfyingOwnInterest = m_outstandingInterestName == name; if (isSatisfyingOwnInterest && m_fetcher) { - NDN_LOG_DEBUG("Removing our pending interest from face (stop fetcher)"); + NDN_LOG_DEBUG("Removing our pending Interest from face (stop fetcher)"); m_fetcher->stop(); m_outstandingInterestName.clear(); } NDN_LOG_DEBUG("Sending sync Data"); auto content = detail::compress(m_contentCompression, block); - m_segmentPublisher.publish(name, name, *content, m_syncReplyFreshness); - + m_segmentPublisher.publish(name, name, *content, syncReplyFreshness); if (isSatisfyingOwnInterest) { - NDN_LOG_TRACE("Renewing sync interest"); + NDN_LOG_DEBUG("Renewing sync interest"); sendSyncInterest(); } } @@ -299,54 +457,67 @@ FullProducer::onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPt } } - // We just got the data, so send a new sync interest if (!updates.empty()) { m_onUpdate(updates); - NDN_LOG_TRACE("Renewing sync interest"); - sendSyncInterest(); + // Wait a bit to let neighbors get the data too + auto after = ndn::time::milliseconds(m_jitter(m_rng)); + m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] { + NDN_LOG_DEBUG("Got updates, renewing sync Interest now"); + sendSyncInterest(); + }); + NDN_LOG_DEBUG("Schedule sync Interest after: " << after); + m_inNoNewDataWaitOutPeriod = false; + + processWaitingInterests(); } else { - NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() << + NDN_LOG_TRACE("No new update, Interest nonce: " << interest.getNonce() << " , hash: " << std::hash{}(interest.getName())); + m_inNoNewDataWaitOutPeriod = true; + + // Have to wait, otherwise will get same data from CS + auto after = m_syncReplyFreshness + ndn::time::milliseconds(m_jitter(m_rng)); + m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] { + NDN_LOG_DEBUG("Sending sync Interest after no new update"); + m_inNoNewDataWaitOutPeriod = false; + sendSyncInterest(); + }); + NDN_LOG_DEBUG("Schedule sync after: " << after); } + } void -FullProducer::satisfyPendingInterests() +FullProducer::satisfyPendingInterests(const ndn::Name& updatedPrefixWithSeq) { - NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size()); + NDN_LOG_DEBUG("Satisfying full sync Interest: " << m_pendingEntries.size()); for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) { + NDN_LOG_TRACE("Satisfying pending Interest: " << std::hash{}(it->first.getPrefix(-1))); const auto& entry = it->second; auto diff = m_iblt - entry.iblt; - if (!diff.canDecode) { - NDN_LOG_TRACE("Decode failed for pending interest"); - if (diff.positive.size() + diff.negative.size() >= m_threshold || - (diff.positive.empty() && diff.negative.empty())) { - NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest"); - it = m_pendingEntries.erase(it); - continue; - } - } + NDN_LOG_TRACE("Decoded: " << diff.canDecode << " positive: " << diff.positive.size() << + " negative: " << diff.negative.size()); detail::State state; + bool publishedPrefixInDiff = false; for (const auto& hash : diff.positive) { auto nameIt = m_biMap.left.find(hash); if (nameIt != m_biMap.left.end()) { - if (m_prefixes[nameIt->second.getPrefix(-1)] != 0) { - state.addContent(nameIt->second); + if (updatedPrefixWithSeq == nameIt->second) { + publishedPrefixInDiff = true; } + state.addContent(nameIt->second); } } - if (!state.getContent().empty()) { - NDN_LOG_DEBUG("Satisfying sync content: " << state); - sendSyncData(it->first, state.wireEncode()); - it = m_pendingEntries.erase(it); - } - else { - ++it; + if (!publishedPrefixInDiff) { + state.addContent(updatedPrefixWithSeq); } + + NDN_LOG_DEBUG("Satisfying sync content: " << state); + sendSyncData(it->first, state.wireEncode(), m_syncReplyFreshness); + it = m_pendingEntries.erase(it); } } @@ -363,7 +534,7 @@ FullProducer::deletePendingInterests(const ndn::Name& interestName) { auto it = m_pendingEntries.find(interestName); if (it != m_pendingEntries.end()) { - NDN_LOG_TRACE("Delete pending interest: " << interestName); + NDN_LOG_TRACE("Delete pending Interest: " << std::hash{}(interestName)); it = m_pendingEntries.erase(it); } } diff --git a/PSync/full-producer.hpp b/PSync/full-producer.hpp index 4644727..2e331c0 100644 --- a/PSync/full-producer.hpp +++ b/PSync/full-producer.hpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2023, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. @@ -15,7 +15,7 @@ * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . - **/ + */ #ifndef PSYNC_FULL_PRODUCER_HPP #define PSYNC_FULL_PRODUCER_HPP @@ -112,6 +112,12 @@ class FullProducer : public ProducerBase void sendSyncInterest(); + void + processWaitingInterests(); + + void + scheduleProcessWaitingInterests(); + /** * @brief Process sync interest from other parties * @@ -125,9 +131,11 @@ class FullProducer : public ProducerBase * * @param prefixName prefix for sync group which we registered * @param interest the interest we got + * @param isTimedProcessing is this interest from the waiting interests list */ void - onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest); + onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest, + bool isTimedProcessing = false); /** * @brief Send sync data @@ -138,9 +146,11 @@ class FullProducer : public ProducerBase * * @param name name to be set as data name * @param block the content of the data + * @param syncReplyFreshness the freshness to use for the sync data; defaults to @p SYNC_REPLY_FRESHNESS */ void - sendSyncData(const ndn::Name& name, const ndn::Block& block); + sendSyncData(const ndn::Name& name, const ndn::Block& block, + ndn::time::milliseconds syncReplyFreshness); /** * @brief Process sync data @@ -164,12 +174,14 @@ class FullProducer : public ProducerBase /** * @brief Satisfy pending sync interests * - * For pending sync interests SI, if IBF of SI has any difference from our own IBF: - * send data back. - * If we can't decode differences from the stored IBF, then delete it. + * For pending sync interests do a difference with current IBF to find out missing prefixes. + * Send [Missing Prefixes] union @p updatedPrefixWithSeq + * + * This is because it is called from publish, so the @p updatedPrefixWithSeq must be missing + * from other nodes regardless of IBF difference failure. */ void - satisfyPendingInterests(); + satisfyPendingInterests(const ndn::Name& updatedPrefixWithSeq); /** * @brief Delete pending sync interests that match given name @@ -177,7 +189,7 @@ class FullProducer : public ProducerBase void deletePendingInterests(const ndn::Name& interestName); - /** + /** * @brief Check if hash(prefix + 1) is in negative * * Sometimes what happens is that interest from other side @@ -199,15 +211,29 @@ class FullProducer : public ProducerBase ndn::scheduler::ScopedEventId expirationEvent; }; - std::map m_pendingEntries; + struct WaitingEntryInfo + { + uint16_t numTries = 0; + ndn::Interest::Nonce nonce; + }; + ndn::time::milliseconds m_syncInterestLifetime; UpdateCallback m_onUpdate; ndn::scheduler::ScopedEventId m_scheduledSyncInterestId; - std::uniform_int_distribution<> m_jitter{100, 500}; + static constexpr int MIN_JITTER = 100; + static constexpr int MAX_JITTER = 500; + std::uniform_int_distribution<> m_jitter{MIN_JITTER, MAX_JITTER}; + ndn::time::system_clock::time_point m_lastInterestSentTime; ndn::Name m_outstandingInterestName; ndn::ScopedRegisteredPrefixHandle m_registeredPrefix; std::shared_ptr m_fetcher; uint64_t m_incomingFace = 0; + std::map m_waitingForProcessing; + bool m_inNoNewDataWaitOutPeriod = false; + ndn::scheduler::ScopedEventId m_interestDelayTimerId; + +PSYNC_PUBLIC_WITH_TESTS_ELSE_PRIVATE: + std::map m_pendingEntries; }; } // namespace psync diff --git a/PSync/producer-base.cpp b/PSync/producer-base.cpp index faa3ea6..a43afb5 100644 --- a/PSync/producer-base.cpp +++ b/PSync/producer-base.cpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2023, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. @@ -15,7 +15,7 @@ * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . - **/ + */ #include "PSync/producer-base.hpp" #include "PSync/detail/util.hpp" @@ -115,6 +115,8 @@ ProducerBase::updateSeqNo(const ndn::Name& prefix, uint64_t seq) auto newHash = detail::murmurHash3(detail::N_HASHCHECK, prefixWithSeq); m_biMap.insert({newHash, prefixWithSeq}); m_iblt.insert(newHash); + + m_numOwnElements += (seq - oldSeq); } void diff --git a/PSync/producer-base.hpp b/PSync/producer-base.hpp index faaa926..e857d5a 100644 --- a/PSync/producer-base.hpp +++ b/PSync/producer-base.hpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2023, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. @@ -15,7 +15,7 @@ * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . - **/ + */ #ifndef PSYNC_PRODUCER_BASE_HPP #define PSYNC_PRODUCER_BASE_HPP @@ -176,6 +176,7 @@ class ProducerBase const ndn::time::milliseconds m_syncReplyFreshness; const CompressionScheme m_ibltCompression; const CompressionScheme m_contentCompression; + uint64_t m_numOwnElements = 0; }; } // namespace psync diff --git a/examples/full-sync.cpp b/examples/full-sync.cpp index 6248336..e202a83 100644 --- a/examples/full-sync.cpp +++ b/examples/full-sync.cpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2023, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. @@ -15,7 +15,7 @@ * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . - **/ + */ #include @@ -37,7 +37,7 @@ class Producer /** * @brief Initialize producer and schedule updates. * - * Set IBF size as 80 expecting 80 updates to IBF in a sync cycle. + * Use default IBF size of 6 as we're expecting 6 updates to IBF in a sync cycle. * Set syncInterestLifetime and syncDataFreshness to 1.6 seconds. * userPrefix is the prefix string of user node prefixes. */ @@ -46,7 +46,6 @@ class Producer : m_producer(m_face, m_keyChain, syncPrefix, [this] { psync::FullProducer::Options opts; opts.onUpdate = std::bind(&Producer::processSyncUpdate, this, _1); - opts.ibfCount = 80; opts.syncInterestLifetime = 1600_ms; opts.syncDataFreshness = 1600_ms; return opts; diff --git a/tests/test-full-producer.cpp b/tests/test-full-producer.cpp index c833181..bcb753a 100644 --- a/tests/test-full-producer.cpp +++ b/tests/test-full-producer.cpp @@ -15,11 +15,12 @@ * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . - **/ + */ #include "PSync/full-producer.hpp" #include "PSync/detail/util.hpp" + #include "tests/boost-test.hpp" #include "tests/io-fixture.hpp" #include "tests/key-chain-fixture.hpp" @@ -87,6 +88,54 @@ BOOST_AUTO_TEST_CASE(OnSyncDataDecodeFailure) BOOST_CHECK_NO_THROW(node.onSyncData(syncInterest, goodCompressBadBlock)); } +BOOST_AUTO_TEST_CASE(SatisfyPendingInterestsBehavior) +{ + Name syncPrefix("/psync"); + FullProducer::Options opts; + opts.ibfCount = 6; + FullProducer node(m_face, m_keyChain, syncPrefix, opts); + + Name syncInterestName(syncPrefix); + node.m_iblt.appendToName(syncInterestName); + syncInterestName.appendNumber(1); + Interest syncInterest(syncInterestName); + + node.addUserNode(syncPrefix); + + node.onSyncInterest(syncPrefix, syncInterest); + + BOOST_CHECK_EQUAL(node.m_pendingEntries.size(), 1); + + // Test whether data is still sent if IBF diff is greater than default threshhold. + auto prefix1 = Name("/test/alice").appendNumber(1); + uint32_t newHash1 = psync::detail::murmurHash3(42, prefix1); + node.m_iblt.insert(newHash1); + + auto prefix2 = Name("/test/bob").appendNumber(1); + uint32_t newHash2 = psync::detail::murmurHash3(42, prefix2); + node.m_iblt.insert(newHash2); + + auto prefix3 = Name("/test/carol").appendNumber(1); + uint32_t newHash3 = psync::detail::murmurHash3(42, prefix3); + node.m_iblt.insert(newHash3); + + auto prefix4 = Name("/test/david").appendNumber(1); + uint32_t newHash4 = psync::detail::murmurHash3(42, prefix4); + node.m_iblt.insert(newHash4); + + auto prefix5 = Name("/test/erin").appendNumber(1); + uint32_t newHash5 = psync::detail::murmurHash3(42, prefix5); + node.m_iblt.insert(newHash5); + + node.publishName(syncPrefix); + + advanceClocks(10_ms); + + BOOST_CHECK_EQUAL(m_face.sentData.size(), 1); + + BOOST_CHECK_EQUAL(node.m_pendingEntries.empty(), true); +} + BOOST_AUTO_TEST_SUITE_END() } // namespace psync::tests diff --git a/tests/test-full-sync.cpp b/tests/test-full-sync.cpp index c483d89..ff25728 100644 --- a/tests/test-full-sync.cpp +++ b/tests/test-full-sync.cpp @@ -297,7 +297,7 @@ BOOST_AUTO_TEST_CASE(MultipleNodesSimultaneousPublish) nodes[i]->publishName(userPrefixes[i]); } - advanceClocks(10_ms, 100); + advanceClocks(100_ms, 100); for (int i = 0; i < 4; i++) { for (int j = 0; j < 4; j++) { BOOST_CHECK_EQUAL(nodes[i]->getSeqNo(userPrefixes[j]).value_or(NOT_EXIST), 1); @@ -308,7 +308,7 @@ BOOST_AUTO_TEST_CASE(MultipleNodesSimultaneousPublish) nodes[i]->publishName(userPrefixes[i], 4); } - advanceClocks(10_ms, 100); + advanceClocks(100_ms, 100); for (int i = 0; i < 4; i++) { for (int j = 0; j < 4; j++) { BOOST_CHECK_EQUAL(nodes[i]->getSeqNo(userPrefixes[j]).value_or(NOT_EXIST), 4); @@ -464,55 +464,6 @@ BOOST_AUTO_TEST_CASE(DiffIBFDecodeFailureMultipleNodes) }); } -BOOST_AUTO_TEST_CASE(DelayedSecondSegment) -{ - addNode(0); - - int i = 0; - detail::State state; - std::shared_ptr compressed; - do { - auto prefixToPublish = makeSubPrefix(0, i++); - nodes[0]->addUserNode(prefixToPublish); - nodes[0]->publishName(prefixToPublish); - - state.addContent(Name(prefixToPublish).appendNumber(nodes[0]->m_prefixes[prefixToPublish])); - - auto block = state.wireEncode(); - compressed = detail::compress(nodes[0]->m_contentCompression, block); - } while (compressed->size() < (ndn::MAX_NDN_PACKET_SIZE >> 1)); - - advanceClocks(10_ms, 100); - - Name syncInterestName(syncPrefix); - detail::IBLT iblt(40, nodes[0]->m_ibltCompression); - iblt.appendToName(syncInterestName); - - nodes[0]->onSyncInterest(syncPrefix, Interest(syncInterestName)); - - advanceClocks(10_ms); - - BOOST_CHECK_EQUAL(nodes[0]->m_segmentPublisher.m_ims.size(), 2); - // Expire contents from segmentPublisher - advanceClocks(10_ms, 100); - BOOST_CHECK_EQUAL(nodes[0]->m_segmentPublisher.m_ims.size(), 0); - - // Get data name from face and increase segment number to form next interest - BOOST_REQUIRE(!faces[0]->sentData.empty()); - Name dataName = faces[0]->sentData.front().getName(); - Name interestName = dataName.getPrefix(-1).appendSegment(1); - faces[0]->sentData.clear(); - - nodes[0]->onSyncInterest(syncPrefix, Interest(interestName)); - advanceClocks(10_ms); - - // Should have repopulated SegmentPublisher - BOOST_CHECK_EQUAL(nodes[0]->m_segmentPublisher.m_ims.size(), 2); - // Should have received the second data segment this time - BOOST_REQUIRE(!faces[0]->sentData.empty()); - BOOST_CHECK_EQUAL(faces[0]->sentData.front().getName().at(-1).toSegment(), 1); -} - BOOST_AUTO_TEST_SUITE_END() } // namespace psync::tests