diff --git a/AUTHORS.md b/AUTHORS.md index 8391947..6e0b086 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -7,6 +7,8 @@ If you would like to become a contributor to the official repository, please fol * Alexander Afanasyev * ***(Maintainer)*** Saurab Dulal * ***(Former Maintainer)*** Ashlesh Gawande +* Dylan Hensley +* Alexander Lane * Eric Newberry * Davide Pesavento * Junxiao Shi diff --git a/PSync/common.hpp b/PSync/common.hpp index 2bea73d..83fa1b8 100644 --- a/PSync/common.hpp +++ b/PSync/common.hpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2022, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. diff --git a/PSync/consumer.hpp b/PSync/consumer.hpp index 7d7995d..1cbd204 100644 --- a/PSync/consumer.hpp +++ b/PSync/consumer.hpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2023, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. @@ -65,7 +65,7 @@ class Consumer /// Callback to give sync data back to application. UpdateCallback onUpdate = [] (const auto&) {}; /// Number of expected elements (subscriptions) in Bloom filter. - uint32_t bfCount = 80; + uint32_t bfCount = 6; /// Bloom filter false positive probability. double bfFalsePositive = 0.001; /// Lifetime of hello Interest. diff --git a/PSync/full-producer.cpp b/PSync/full-producer.cpp index 4a3401d..5073ab5 100644 --- a/PSync/full-producer.cpp +++ b/PSync/full-producer.cpp @@ -84,12 +84,20 @@ FullProducer::publishName(const ndn::Name& prefix, std::optional seq) uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1); NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq); updateSeqNo(prefix, newSeq); - satisfyPendingInterests(); + + m_inNoNewDataWaitOutPeriod = false; + + satisfyPendingInterests(ndn::Name(prefix).appendNumber(newSeq)); } void FullProducer::sendSyncInterest() { + if (m_inNoNewDataWaitOutPeriod) { + NDN_LOG_TRACE("Cannot send sync Interest as Data is expected from CS"); + return; + } + // If we send two sync interest one after the other // since there is no new data in the network yet, // when data is available it may satisfy both of them @@ -102,6 +110,15 @@ FullProducer::sendSyncInterest() // Append our latest IBF m_iblt.appendToName(syncInterestName); + // Append cumulative updates that has been inserted into this IBF + syncInterestName.appendNumber(m_numOwnElements); + + auto currentTime = ndn::time::system_clock::now(); + if ((currentTime - m_lastInterestSentTime < ndn::time::milliseconds(MIN_JITTER)) && + (m_outstandingInterestName == syncInterestName)) { + NDN_LOG_TRACE("Suppressing Interest: " << std::hash{}(syncInterestName)); + return; + } m_outstandingInterestName = syncInterestName; @@ -117,6 +134,14 @@ FullProducer::sendSyncInterest() options.maxTimeout = m_syncInterestLifetime; options.rttOptions.initialRto = m_syncInterestLifetime; + // This log message must be before sending the Interest through SegmentFetcher + // because getNonce generates a Nonce for this Interest. + // SegmentFetcher makes a copy of this Interest, so if we print the Nonce + // after, that Nonce will be different than the one seen in tshark! + NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() << + ", hash: " << std::hash{}(syncInterestName)); + + m_lastInterestSentTime = currentTime; m_fetcher = SegmentFetcher::start(m_face, syncInterest, ndn::security::getAcceptAllValidator(), options); @@ -142,42 +167,98 @@ FullProducer::sendSyncInterest() // timeout before it happens. if (errorCode != SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) { auto after = ndn::time::milliseconds(m_jitter(m_rng)); - NDN_LOG_DEBUG("Schedule sync interest after: " << after); + NDN_LOG_DEBUG("Schedule sync Interest after: " << after); m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] { sendSyncInterest(); }); } }); +} - NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() << - ", hash: " << std::hash{}(syncInterestName)); +void +FullProducer::processWaitingInterests() +{ + NDN_LOG_TRACE("Processing waiting Interest list, size: " << m_waitingForProcessing.size()); + if (m_waitingForProcessing.size() == 0) { + return; + } + + for (auto it = m_waitingForProcessing.begin(); it != m_waitingForProcessing.end();) { + if (it->second.numTries == std::numeric_limits::max()) { + NDN_LOG_TRACE("Interest with hash already marked for deletion, removing now: " << + std::hash{}(it->first)); + it = m_waitingForProcessing.erase(it); + continue; + } + + it->second.numTries += 1; + ndn::Interest interest(it->first); + interest.setNonce(it->second.nonce); + onSyncInterest(m_syncPrefix, interest, true); + if (it->second.numTries == std::numeric_limits::max()) { + NDN_LOG_TRACE("Removing Interest with hash: " << std::hash{}(it->first)); + it = m_waitingForProcessing.erase(it); + } + else { + ++it; + } + } + NDN_LOG_TRACE("Done processing waiting Interest list, size: " << m_waitingForProcessing.size()); } void -FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest) +FullProducer::scheduleProcessWaitingInterests() { - // TODO: answer only segments from store. - if (m_segmentPublisher.replyFromStore(interest.getName())) { + // If nothing waiting, no need to schedule + if (m_waitingForProcessing.size() == 0) { return; } - ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size()); - ndn::Name interestName; + if (!m_interestDelayTimerId) { + auto after = ndn::time::milliseconds(m_jitter(m_rng)); + NDN_LOG_TRACE("Setting a timer to processes waiting Interest(s) in: " << after); + + m_interestDelayTimerId = m_scheduler.schedule(after, [=] { + NDN_LOG_TRACE("Timer has expired, trying to process waiting Interest(s)"); + processWaitingInterests(); + scheduleProcessWaitingInterests(); + }); + } +} + +void +FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest, + bool isTimedProcessing) +{ + ndn::Name interestName = interest.getName(); + auto interestNameHash = std::hash{}(interestName); + NDN_LOG_DEBUG("Full sync Interest received, nonce: " << interest.getNonce() << + ", hash: " << interestNameHash); - if (nameWithoutSyncPrefix.size() == 1) { - // Get //IBF from //IBF - interestName = interest.getName(); + if (isTimedProcessing) { + NDN_LOG_TRACE("Delayed Interest being processed now"); } - else if (nameWithoutSyncPrefix.size() == 3) { - // Get //IBF from //IBF// - interestName = interest.getName().getPrefix(-2); + + if (m_segmentPublisher.replyFromStore(interestName)) { + NDN_LOG_DEBUG("Answer from memory"); + return; } - else { + + ndn::Name nameWithoutSyncPrefix = interestName.getSubName(prefixName.size()); + + if (nameWithoutSyncPrefix.size() == 4) { + // //// + NDN_LOG_DEBUG("Segment not found in memory. Other side will have to restart"); + // This should have been answered from publisher Cache! + sendApplicationNack(prefixName); return; } - ndn::name::Component ibltName = interestName.get(interestName.size() - 1); + if (nameWithoutSyncPrefix.size() != 2) { + NDN_LOG_WARN("Two components required after sync prefix: //; received: " << interestName); + return; + } - NDN_LOG_DEBUG("Full sync Interest received, nonce: " << interest.getNonce() << - ", hash: " << std::hash{}(interestName)); + ndn::name::Component ibltName = interestName[-2]; + uint64_t numRcvdElements = interestName[-1].toNumber(); detail::IBLT iblt(m_expectedNumEntries, m_ibltCompression); try { @@ -189,80 +270,157 @@ FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& i } auto diff = m_iblt - iblt; + + NDN_LOG_TRACE("Decode, positive: " << diff.positive.size() + << " negative: " << diff.negative.size() << " m_threshold: " + << m_threshold); + + auto waitingIt = m_waitingForProcessing.find(interestName); + if (!diff.canDecode) { - NDN_LOG_TRACE("Cannot decode differences, positive: " << diff.positive.size() - << " negative: " << diff.negative.size() << " m_threshold: " - << m_threshold); - - // Send all data if greater then threshold, else send positive below as usual - // Or send if we can't get neither positive nor negative differences - if (diff.positive.size() + diff.negative.size() >= m_threshold || - (diff.positive.empty() && diff.negative.empty())) { + NDN_LOG_DEBUG("Cannot decode differences!"); + + if (numRcvdElements > m_numOwnElements) { + if (!isTimedProcessing && waitingIt == m_waitingForProcessing.end()) { + NDN_LOG_TRACE("Decode failure, adding to waiting Interest list " << interestNameHash); + m_waitingForProcessing.emplace(interestName, WaitingEntryInfo{0, interest.getNonce()}); + scheduleProcessWaitingInterests(); + } + else if (isTimedProcessing && waitingIt != m_waitingForProcessing.end()) { + if (waitingIt->second.numTries > 1) { + NDN_LOG_TRACE("Decode failure, still behind. Erasing waiting Interest as we have tried twice"); + waitingIt->second.numTries = std::numeric_limits::max(); // markWaitingInterestForDeletion + NDN_LOG_DEBUG("Waiting Interest has been deleted. Sending new sync interest"); + sendSyncInterest(); + } + else { + NDN_LOG_TRACE("Decode failure, still behind, waiting more till the next timer"); + } + } + else { + NDN_LOG_TRACE("Decode failure, still behind"); + } + } + else { + if (m_numOwnElements == numRcvdElements && diff.positive.size() == 0 && diff.negative.size() > 0) { + NDN_LOG_TRACE("We have nothing to offer and are actually behind"); +#ifdef PSYNC_WITH_TESTS + ++nIbfDecodeFailuresBelowThreshold; +#endif // PSYNC_WITH_TESTS + return; + } + detail::State state; for (const auto& content : m_prefixes) { if (content.second != 0) { state.addContent(ndn::Name(content.first).appendNumber(content.second)); } } +#ifdef PSYNC_WITH_TESTS + ++nIbfDecodeFailuresAboveThreshold; +#endif // PSYNC_WITH_TESTS if (!state.getContent().empty()) { - sendSyncData(interest.getName(), state.wireEncode()); + NDN_LOG_DEBUG("Sending entire state: " << state); + // Want low freshness when potentially sending large content to clear it quickly from the network + sendSyncData(interestName, state.wireEncode(), 10_ms); + // Since we're directly sending the data, we need to clear pending interests here + deletePendingInterests(interestName); + } + // We seem to be ahead, delete the Interest from waiting list + if (waitingIt != m_waitingForProcessing.end()) { + waitingIt->second.numTries = std::numeric_limits::max(); } - -#ifdef PSYNC_WITH_TESTS - ++nIbfDecodeFailuresAboveThreshold; -#endif // PSYNC_WITH_TESTS - return; } - -#ifdef PSYNC_WITH_TESTS - ++nIbfDecodeFailuresBelowThreshold; -#endif // PSYNC_WITH_TESTS + return; } - detail::State state; - for (const auto& hash : diff.positive) { - auto nameIt = m_biMap.left.find(hash); - if (nameIt != m_biMap.left.end()) { - ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1); - // Don't sync up sequence number zero - if (m_prefixes[nameWithoutSeq] != 0 && - !isFutureHash(nameWithoutSeq, diff.negative)) { - state.addContent(nameIt->second); + if (diff.positive.size() == 0 && diff.negative.size() == 0) { + NDN_LOG_TRACE("Saving positive: " << diff.positive.size() << " negative: " << diff.negative.size()); + + auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second; + entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(), + [this, interest] { + NDN_LOG_TRACE("Erase pending Interest " << interest.getNonce()); + m_pendingEntries.erase(interest.getName()); + }); + + // Can't delete directly in this case as it will cause + // memory access errors with the for loop in processWaitingInterests + if (isTimedProcessing) { + if (waitingIt != m_waitingForProcessing.end()) { + waitingIt->second.numTries = std::numeric_limits::max(); } } + return; } - if (!state.getContent().empty()) { - NDN_LOG_DEBUG("Sending sync content: " << state); - sendSyncData(interestName, state.wireEncode()); + // Only add to waiting list if we don't have anything to send (positive = 0) + if (diff.positive.size() == 0 && diff.negative.size() > 0) { + if (!isTimedProcessing && waitingIt == m_waitingForProcessing.end()) { + NDN_LOG_TRACE("Adding Interest to waiting list: " << interestNameHash); + m_waitingForProcessing.emplace(interestName, WaitingEntryInfo{0, interest.getNonce()}); + scheduleProcessWaitingInterests(); + } + else if (isTimedProcessing && waitingIt != m_waitingForProcessing.end()) { + if (waitingIt->second.numTries > 1) { + NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash << + ". Erasing waiting Interest as we have tried twice"); + waitingIt->second.numTries = std::numeric_limits::max(); // markWaitingInterestForDeletion + } + else { + NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash << + ". Keep waiting for Interest as number of tries is not exhausted"); + } + } + else { + NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash); + } return; } - auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second; - entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(), - [this, interest] { - NDN_LOG_TRACE("Erase pending Interest " << interest.getNonce()); - m_pendingEntries.erase(interest.getName()); - }); + if (diff.positive.size() > 0) { + detail::State state; + for (const auto& hash : diff.positive) { + auto nameIt = m_biMap.left.find(hash); + if (nameIt != m_biMap.left.end()) { + ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1); + // Don't sync up sequence number zero + if (m_prefixes[nameWithoutSeq] != 0 && + !isFutureHash(nameWithoutSeq.toUri(), diff.negative)) { + state.addContent(nameIt->second); + } + } + } + + if (!state.getContent().empty()) { + NDN_LOG_DEBUG("Sending sync content: " << state); + sendSyncData(interestName, state.wireEncode(), m_syncReplyFreshness); + + // Timed processing or not - if we are answering it, it should not go in waiting Interests + if (waitingIt != m_waitingForProcessing.end()) { + waitingIt->second.numTries = std::numeric_limits::max(); + } + } + } } void -FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block) +FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block, + ndn::time::milliseconds syncReplyFreshness) { bool isSatisfyingOwnInterest = m_outstandingInterestName == name; if (isSatisfyingOwnInterest && m_fetcher) { - NDN_LOG_DEBUG("Removing our pending interest from face (stop fetcher)"); + NDN_LOG_DEBUG("Removing our pending Interest from face (stop fetcher)"); m_fetcher->stop(); m_outstandingInterestName.clear(); } NDN_LOG_DEBUG("Sending sync Data"); auto content = detail::compress(m_contentCompression, block); - m_segmentPublisher.publish(name, name, *content, m_syncReplyFreshness); - + m_segmentPublisher.publish(name, name, *content, syncReplyFreshness); if (isSatisfyingOwnInterest) { - NDN_LOG_TRACE("Renewing sync interest"); + NDN_LOG_DEBUG("Renewing sync interest"); sendSyncInterest(); } } @@ -299,54 +457,67 @@ FullProducer::onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPt } } - // We just got the data, so send a new sync interest if (!updates.empty()) { m_onUpdate(updates); - NDN_LOG_TRACE("Renewing sync interest"); - sendSyncInterest(); + // Wait a bit to let neighbors get the data too + auto after = ndn::time::milliseconds(m_jitter(m_rng)); + m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] { + NDN_LOG_DEBUG("Got updates, renewing sync Interest now"); + sendSyncInterest(); + }); + NDN_LOG_DEBUG("Schedule sync Interest after: " << after); + m_inNoNewDataWaitOutPeriod = false; + + processWaitingInterests(); } else { - NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() << + NDN_LOG_TRACE("No new update, Interest nonce: " << interest.getNonce() << " , hash: " << std::hash{}(interest.getName())); + m_inNoNewDataWaitOutPeriod = true; + + // Have to wait, otherwise will get same data from CS + auto after = m_syncReplyFreshness + ndn::time::milliseconds(m_jitter(m_rng)); + m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] { + NDN_LOG_DEBUG("Sending sync Interest after no new update"); + m_inNoNewDataWaitOutPeriod = false; + sendSyncInterest(); + }); + NDN_LOG_DEBUG("Schedule sync after: " << after); } + } void -FullProducer::satisfyPendingInterests() +FullProducer::satisfyPendingInterests(const ndn::Name& updatedPrefixWithSeq) { - NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size()); + NDN_LOG_DEBUG("Satisfying full sync Interest: " << m_pendingEntries.size()); for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) { + NDN_LOG_TRACE("Satisfying pending Interest: " << std::hash{}(it->first.getPrefix(-1))); const auto& entry = it->second; auto diff = m_iblt - entry.iblt; - if (!diff.canDecode) { - NDN_LOG_TRACE("Decode failed for pending interest"); - if (diff.positive.size() + diff.negative.size() >= m_threshold || - (diff.positive.empty() && diff.negative.empty())) { - NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest"); - it = m_pendingEntries.erase(it); - continue; - } - } + NDN_LOG_TRACE("Decoded: " << diff.canDecode << " positive: " << diff.positive.size() << + " negative: " << diff.negative.size()); detail::State state; + bool publishedPrefixInDiff = false; for (const auto& hash : diff.positive) { auto nameIt = m_biMap.left.find(hash); if (nameIt != m_biMap.left.end()) { - if (m_prefixes[nameIt->second.getPrefix(-1)] != 0) { - state.addContent(nameIt->second); + if (updatedPrefixWithSeq == nameIt->second) { + publishedPrefixInDiff = true; } + state.addContent(nameIt->second); } } - if (!state.getContent().empty()) { - NDN_LOG_DEBUG("Satisfying sync content: " << state); - sendSyncData(it->first, state.wireEncode()); - it = m_pendingEntries.erase(it); - } - else { - ++it; + if (!publishedPrefixInDiff) { + state.addContent(updatedPrefixWithSeq); } + + NDN_LOG_DEBUG("Satisfying sync content: " << state); + sendSyncData(it->first, state.wireEncode(), m_syncReplyFreshness); + it = m_pendingEntries.erase(it); } } @@ -363,7 +534,7 @@ FullProducer::deletePendingInterests(const ndn::Name& interestName) { auto it = m_pendingEntries.find(interestName); if (it != m_pendingEntries.end()) { - NDN_LOG_TRACE("Delete pending interest: " << interestName); + NDN_LOG_TRACE("Delete pending Interest: " << std::hash{}(interestName)); it = m_pendingEntries.erase(it); } } diff --git a/PSync/full-producer.hpp b/PSync/full-producer.hpp index 4644727..2e331c0 100644 --- a/PSync/full-producer.hpp +++ b/PSync/full-producer.hpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2023, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. @@ -15,7 +15,7 @@ * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . - **/ + */ #ifndef PSYNC_FULL_PRODUCER_HPP #define PSYNC_FULL_PRODUCER_HPP @@ -112,6 +112,12 @@ class FullProducer : public ProducerBase void sendSyncInterest(); + void + processWaitingInterests(); + + void + scheduleProcessWaitingInterests(); + /** * @brief Process sync interest from other parties * @@ -125,9 +131,11 @@ class FullProducer : public ProducerBase * * @param prefixName prefix for sync group which we registered * @param interest the interest we got + * @param isTimedProcessing is this interest from the waiting interests list */ void - onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest); + onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest, + bool isTimedProcessing = false); /** * @brief Send sync data @@ -138,9 +146,11 @@ class FullProducer : public ProducerBase * * @param name name to be set as data name * @param block the content of the data + * @param syncReplyFreshness the freshness to use for the sync data; defaults to @p SYNC_REPLY_FRESHNESS */ void - sendSyncData(const ndn::Name& name, const ndn::Block& block); + sendSyncData(const ndn::Name& name, const ndn::Block& block, + ndn::time::milliseconds syncReplyFreshness); /** * @brief Process sync data @@ -164,12 +174,14 @@ class FullProducer : public ProducerBase /** * @brief Satisfy pending sync interests * - * For pending sync interests SI, if IBF of SI has any difference from our own IBF: - * send data back. - * If we can't decode differences from the stored IBF, then delete it. + * For pending sync interests do a difference with current IBF to find out missing prefixes. + * Send [Missing Prefixes] union @p updatedPrefixWithSeq + * + * This is because it is called from publish, so the @p updatedPrefixWithSeq must be missing + * from other nodes regardless of IBF difference failure. */ void - satisfyPendingInterests(); + satisfyPendingInterests(const ndn::Name& updatedPrefixWithSeq); /** * @brief Delete pending sync interests that match given name @@ -177,7 +189,7 @@ class FullProducer : public ProducerBase void deletePendingInterests(const ndn::Name& interestName); - /** + /** * @brief Check if hash(prefix + 1) is in negative * * Sometimes what happens is that interest from other side @@ -199,15 +211,29 @@ class FullProducer : public ProducerBase ndn::scheduler::ScopedEventId expirationEvent; }; - std::map m_pendingEntries; + struct WaitingEntryInfo + { + uint16_t numTries = 0; + ndn::Interest::Nonce nonce; + }; + ndn::time::milliseconds m_syncInterestLifetime; UpdateCallback m_onUpdate; ndn::scheduler::ScopedEventId m_scheduledSyncInterestId; - std::uniform_int_distribution<> m_jitter{100, 500}; + static constexpr int MIN_JITTER = 100; + static constexpr int MAX_JITTER = 500; + std::uniform_int_distribution<> m_jitter{MIN_JITTER, MAX_JITTER}; + ndn::time::system_clock::time_point m_lastInterestSentTime; ndn::Name m_outstandingInterestName; ndn::ScopedRegisteredPrefixHandle m_registeredPrefix; std::shared_ptr m_fetcher; uint64_t m_incomingFace = 0; + std::map m_waitingForProcessing; + bool m_inNoNewDataWaitOutPeriod = false; + ndn::scheduler::ScopedEventId m_interestDelayTimerId; + +PSYNC_PUBLIC_WITH_TESTS_ELSE_PRIVATE: + std::map m_pendingEntries; }; } // namespace psync diff --git a/PSync/producer-base.cpp b/PSync/producer-base.cpp index faa3ea6..a43afb5 100644 --- a/PSync/producer-base.cpp +++ b/PSync/producer-base.cpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2023, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. @@ -15,7 +15,7 @@ * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . - **/ + */ #include "PSync/producer-base.hpp" #include "PSync/detail/util.hpp" @@ -115,6 +115,8 @@ ProducerBase::updateSeqNo(const ndn::Name& prefix, uint64_t seq) auto newHash = detail::murmurHash3(detail::N_HASHCHECK, prefixWithSeq); m_biMap.insert({newHash, prefixWithSeq}); m_iblt.insert(newHash); + + m_numOwnElements += (seq - oldSeq); } void diff --git a/PSync/producer-base.hpp b/PSync/producer-base.hpp index faaa926..e857d5a 100644 --- a/PSync/producer-base.hpp +++ b/PSync/producer-base.hpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2023, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. @@ -15,7 +15,7 @@ * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . - **/ + */ #ifndef PSYNC_PRODUCER_BASE_HPP #define PSYNC_PRODUCER_BASE_HPP @@ -176,6 +176,7 @@ class ProducerBase const ndn::time::milliseconds m_syncReplyFreshness; const CompressionScheme m_ibltCompression; const CompressionScheme m_contentCompression; + uint64_t m_numOwnElements = 0; }; } // namespace psync diff --git a/examples/full-sync.cpp b/examples/full-sync.cpp index 6248336..e202a83 100644 --- a/examples/full-sync.cpp +++ b/examples/full-sync.cpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2023, The University of Memphis + * Copyright (c) 2014-2024, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. @@ -15,7 +15,7 @@ * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . - **/ + */ #include @@ -37,7 +37,7 @@ class Producer /** * @brief Initialize producer and schedule updates. * - * Set IBF size as 80 expecting 80 updates to IBF in a sync cycle. + * Use default IBF size of 6 as we're expecting 6 updates to IBF in a sync cycle. * Set syncInterestLifetime and syncDataFreshness to 1.6 seconds. * userPrefix is the prefix string of user node prefixes. */ @@ -46,7 +46,6 @@ class Producer : m_producer(m_face, m_keyChain, syncPrefix, [this] { psync::FullProducer::Options opts; opts.onUpdate = std::bind(&Producer::processSyncUpdate, this, _1); - opts.ibfCount = 80; opts.syncInterestLifetime = 1600_ms; opts.syncDataFreshness = 1600_ms; return opts; diff --git a/tests/test-full-producer.cpp b/tests/test-full-producer.cpp index c833181..bcb753a 100644 --- a/tests/test-full-producer.cpp +++ b/tests/test-full-producer.cpp @@ -15,11 +15,12 @@ * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . - **/ + */ #include "PSync/full-producer.hpp" #include "PSync/detail/util.hpp" + #include "tests/boost-test.hpp" #include "tests/io-fixture.hpp" #include "tests/key-chain-fixture.hpp" @@ -87,6 +88,54 @@ BOOST_AUTO_TEST_CASE(OnSyncDataDecodeFailure) BOOST_CHECK_NO_THROW(node.onSyncData(syncInterest, goodCompressBadBlock)); } +BOOST_AUTO_TEST_CASE(SatisfyPendingInterestsBehavior) +{ + Name syncPrefix("/psync"); + FullProducer::Options opts; + opts.ibfCount = 6; + FullProducer node(m_face, m_keyChain, syncPrefix, opts); + + Name syncInterestName(syncPrefix); + node.m_iblt.appendToName(syncInterestName); + syncInterestName.appendNumber(1); + Interest syncInterest(syncInterestName); + + node.addUserNode(syncPrefix); + + node.onSyncInterest(syncPrefix, syncInterest); + + BOOST_CHECK_EQUAL(node.m_pendingEntries.size(), 1); + + // Test whether data is still sent if IBF diff is greater than default threshhold. + auto prefix1 = Name("/test/alice").appendNumber(1); + uint32_t newHash1 = psync::detail::murmurHash3(42, prefix1); + node.m_iblt.insert(newHash1); + + auto prefix2 = Name("/test/bob").appendNumber(1); + uint32_t newHash2 = psync::detail::murmurHash3(42, prefix2); + node.m_iblt.insert(newHash2); + + auto prefix3 = Name("/test/carol").appendNumber(1); + uint32_t newHash3 = psync::detail::murmurHash3(42, prefix3); + node.m_iblt.insert(newHash3); + + auto prefix4 = Name("/test/david").appendNumber(1); + uint32_t newHash4 = psync::detail::murmurHash3(42, prefix4); + node.m_iblt.insert(newHash4); + + auto prefix5 = Name("/test/erin").appendNumber(1); + uint32_t newHash5 = psync::detail::murmurHash3(42, prefix5); + node.m_iblt.insert(newHash5); + + node.publishName(syncPrefix); + + advanceClocks(10_ms); + + BOOST_CHECK_EQUAL(m_face.sentData.size(), 1); + + BOOST_CHECK_EQUAL(node.m_pendingEntries.empty(), true); +} + BOOST_AUTO_TEST_SUITE_END() } // namespace psync::tests diff --git a/tests/test-full-sync.cpp b/tests/test-full-sync.cpp index c483d89..ff25728 100644 --- a/tests/test-full-sync.cpp +++ b/tests/test-full-sync.cpp @@ -297,7 +297,7 @@ BOOST_AUTO_TEST_CASE(MultipleNodesSimultaneousPublish) nodes[i]->publishName(userPrefixes[i]); } - advanceClocks(10_ms, 100); + advanceClocks(100_ms, 100); for (int i = 0; i < 4; i++) { for (int j = 0; j < 4; j++) { BOOST_CHECK_EQUAL(nodes[i]->getSeqNo(userPrefixes[j]).value_or(NOT_EXIST), 1); @@ -308,7 +308,7 @@ BOOST_AUTO_TEST_CASE(MultipleNodesSimultaneousPublish) nodes[i]->publishName(userPrefixes[i], 4); } - advanceClocks(10_ms, 100); + advanceClocks(100_ms, 100); for (int i = 0; i < 4; i++) { for (int j = 0; j < 4; j++) { BOOST_CHECK_EQUAL(nodes[i]->getSeqNo(userPrefixes[j]).value_or(NOT_EXIST), 4); @@ -464,55 +464,6 @@ BOOST_AUTO_TEST_CASE(DiffIBFDecodeFailureMultipleNodes) }); } -BOOST_AUTO_TEST_CASE(DelayedSecondSegment) -{ - addNode(0); - - int i = 0; - detail::State state; - std::shared_ptr compressed; - do { - auto prefixToPublish = makeSubPrefix(0, i++); - nodes[0]->addUserNode(prefixToPublish); - nodes[0]->publishName(prefixToPublish); - - state.addContent(Name(prefixToPublish).appendNumber(nodes[0]->m_prefixes[prefixToPublish])); - - auto block = state.wireEncode(); - compressed = detail::compress(nodes[0]->m_contentCompression, block); - } while (compressed->size() < (ndn::MAX_NDN_PACKET_SIZE >> 1)); - - advanceClocks(10_ms, 100); - - Name syncInterestName(syncPrefix); - detail::IBLT iblt(40, nodes[0]->m_ibltCompression); - iblt.appendToName(syncInterestName); - - nodes[0]->onSyncInterest(syncPrefix, Interest(syncInterestName)); - - advanceClocks(10_ms); - - BOOST_CHECK_EQUAL(nodes[0]->m_segmentPublisher.m_ims.size(), 2); - // Expire contents from segmentPublisher - advanceClocks(10_ms, 100); - BOOST_CHECK_EQUAL(nodes[0]->m_segmentPublisher.m_ims.size(), 0); - - // Get data name from face and increase segment number to form next interest - BOOST_REQUIRE(!faces[0]->sentData.empty()); - Name dataName = faces[0]->sentData.front().getName(); - Name interestName = dataName.getPrefix(-1).appendSegment(1); - faces[0]->sentData.clear(); - - nodes[0]->onSyncInterest(syncPrefix, Interest(interestName)); - advanceClocks(10_ms); - - // Should have repopulated SegmentPublisher - BOOST_CHECK_EQUAL(nodes[0]->m_segmentPublisher.m_ims.size(), 2); - // Should have received the second data segment this time - BOOST_REQUIRE(!faces[0]->sentData.empty()); - BOOST_CHECK_EQUAL(faces[0]->sentData.front().getName().at(-1).toSegment(), 1); -} - BOOST_AUTO_TEST_SUITE_END() } // namespace psync::tests