diff --git a/.gitmodules b/.gitmodules index 69b4fe2cab..5de4298598 100644 --- a/.gitmodules +++ b/.gitmodules @@ -164,3 +164,8 @@ branch = master path = vendor/nim-results url = https://github.com/arnetheduck/nim-results.git +[submodule "vendor/negentropy"] + ignore = untracked + branch = feat/c-wrapper + path = vendor/negentropy + url = https://github.com/waku-org/negentropy.git diff --git a/Makefile b/Makefile index f18aba2fb7..d1e5b34de6 100644 --- a/Makefile +++ b/Makefile @@ -32,10 +32,10 @@ else # "variables.mk" was included. Business as usual until the end of this file ########## ## Main ## ########## -.PHONY: all test update clean +.PHONY: all test update clean negentropy # default target, because it's the first one that doesn't start with '.' -all: | wakunode2 example2 chat2 chat2bridge libwaku +all: | negentropy wakunode2 example2 chat2 chat2bridge libwaku test: | testcommon testwaku @@ -46,7 +46,7 @@ update: | update-common rm -rf waku.nims && \ $(MAKE) waku.nims $(HANDLE_OUTPUT) -clean: +clean: | negentropy-clean rm -rf build # must be included after the default target @@ -166,15 +166,15 @@ testcommon: | build deps ########## .PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge -testwaku: | build deps librln +testwaku: | build deps librln negentropy echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim test -d:os=$(shell uname) $(NIM_PARAMS) waku.nims -wakunode2: | build deps librln +wakunode2: | build deps librln negentropy echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) waku.nims -testwakunode2: | build deps librln +testwakunode2: | build deps librln negentropy echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim testwakunode2 $(NIM_PARAMS) waku.nims @@ -314,3 +314,9 @@ release-notes: sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g' # I could not get the tool to replace issue ids with links, so using sed for now, # asked here: https://github.com/bvieira/sv4git/discussions/101 +negentropy: + $(MAKE) -C vendor/negentropy/cpp && \ + cp vendor/negentropy/cpp/libnegentropy.so ./ +negentropy-clean: + $(MAKE) -C vendor/negentropy/cpp clean && \ + rm libnegentropy.so diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index cfbb19ea1b..3035db1306 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -508,6 +508,11 @@ proc setupProtocols(node: WakuNode, except CatchableError: return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) + # Waku Sync setup + node.mountWakuSync().isOkOr: + return err("failed to mount waku sync protocol: " & error) + + mountStoreClient(node) if conf.storenode != "": let storeNode = parsePeerInfo(conf.storenode) diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim new file mode 100644 index 0000000000..c277a0d1e2 --- /dev/null +++ b/tests/waku_sync/sync_utils.nim @@ -0,0 +1,29 @@ +{.used.} + +import + std/options, + chronos, + chronicles, + libp2p/crypto/crypto + +import + ../../../waku/[ + node/peer_manager, + waku_core, + waku_sync, + ], + ../testlib/[ + common, + wakucore + ] + +proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSync] {.async.} = + const DefaultFrameSize = 153600 + let + peerManager = PeerManager.new(switch) + proto = WakuSync.new(peerManager, DefaultFrameSize, 2.seconds, some(handler)) + + proto.start() + switch.mount(proto) + + return proto \ No newline at end of file diff --git a/tests/waku_sync/test_all.nim b/tests/waku_sync/test_all.nim new file mode 100644 index 0000000000..178e9277ea --- /dev/null +++ b/tests/waku_sync/test_all.nim @@ -0,0 +1,4 @@ +{.used.} + +import + ./test_protocol diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim new file mode 100644 index 0000000000..3861dc8dd1 --- /dev/null +++ b/tests/waku_sync/test_protocol.nim @@ -0,0 +1,141 @@ +{.used.} + +import + std/options, + testutils/unittests, + chronos, + chronicles, + libp2p/crypto/crypto, + stew/byteutils +from std/os import sleep + +import + ../../../waku/[ + common/paging, + node/peer_manager, + waku_core, + waku_core/message/digest, + waku_sync, + waku_sync/raw_bindings, + ], + ../testlib/[ + common, + wakucore + ], + ./sync_utils + + +suite "Waku Sync - Protocol Tests": + + asyncTest "test c integration": + let + s1 = negentropyNewStorage() + s2 = negentropyNewStorage() + ng1 = negentropyNew(s1,10000) + ng2 = negentropyNew(s2,10000) + + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg1) + var ret = negentropyStorageInsert(s1, msg1.timestamp, msgHash) + check: + ret == true + + ret = negentropyStorageInsert(s2, msg1.timestamp, msgHash) + check: + ret == true + + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msgHash2: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2) + ret = negentropyStorageInsert(s2, msg2.timestamp, msgHash2) + check: + ret == true + + let ng1_q1 = negentropyInitiate(ng1) + check: + ng1_q1.len > 0 + + let ng2_q1 = negentropyServerReconcile(ng2, ng1_q1) + check: + ng2_q1.len > 0 + + var + haveHashes: seq[WakuMessageHash] + needHashes: seq[WakuMessageHash] + let ng1_q2 = negentropyClientReconcile(ng1, ng2_q1, haveHashes, needHashes) + + check: + needHashes.len() == 1 + haveHashes.len() == 0 + ng1_q2.len == 0 + needHashes[0] == msgHash2 + + ret = negentropyStorageErase(s1, msg1.timestamp, msgHash) + check: + ret == true + + asyncTest "sync 2 nodes different hashes": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) + + let protoHandler:WakuSyncCallback = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} = + debug "Received needHashes from peer:", len = hashes.len + for hash in hashes: + debug "Hash received from peer:", hash=hash.to0xHex() + + let + server = await newTestWakuSync(serverSwitch, handler=protoHandler) + client = await newTestWakuSync(clientSwitch, handler=protoHandler) + server.ingessMessage(DefaultPubsubTopic, msg1) + client.ingessMessage(DefaultPubsubTopic, msg1) + server.ingessMessage(DefaultPubsubTopic, msg2) + + var hashes = await client.sync(serverPeerInfo) + require (hashes.isOk()) + check: + hashes.value.len == 1 + hashes.value[0] == computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2) + #Assuming message is fetched from peer +#[ client.ingessMessage(DefaultPubsubTopic, msg2) + sleep(1000) + hashes = await client.sync(serverPeerInfo) + require (hashes.isOk()) + check: + hashes.value.len == 0 ]# + + asyncTest "sync 2 nodes same hashes": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) + + let protoHandler:WakuSyncCallback = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} = + debug "Received needHashes from peer:", len = hashes.len + for hash in hashes: + debug "Hash received from peer:", hash=hash.to0xHex() + + let + server = await newTestWakuSync(serverSwitch, handler=protoHandler) + client = await newTestWakuSync(clientSwitch, handler=protoHandler) + server.ingessMessage(DefaultPubsubTopic, msg1) + client.ingessMessage(DefaultPubsubTopic, msg1) + server.ingessMessage(DefaultPubsubTopic, msg2) + client.ingessMessage(DefaultPubsubTopic, msg2) + + let hashes = await client.sync(serverPeerInfo) + assert hashes.isOk(), $hashes.error + check: + hashes.value.len == 0 diff --git a/vendor/negentropy b/vendor/negentropy new file mode 160000 index 0000000000..1487908100 --- /dev/null +++ b/vendor/negentropy @@ -0,0 +1 @@ +Subproject commit 1487908100846577f6b3f34ab866702439075306 diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index ca85b61c63..103740e597 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -19,6 +19,7 @@ import ../../../waku_lightpush/common, ../../../waku_relay, ../../../waku_node, + ../../../waku_sync, ../../../node/peer_manager, ../responses, ../serdes, @@ -89,6 +90,15 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = connected: it.connectedness == Connectedness.Connected)) tuplesToWakuPeers(peers, lightpushPeers) + if not node.wakuSync.isNil(): + # Map WakuStore peers to WakuPeers and add to return list + let syncPeers = node.peerManager.peerStore + .peers(WakuSyncCodec) + .mapIt((multiaddr: constructMultiaddrStr(it), + protocol: WakuSyncCodec, + connected: it.connectedness == Connectedness.Connected)) + tuplesToWakuPeers(peers, syncPeers) + let resp = RestApiResponse.jsonResponse(peers, status=Http200) if resp.isErr(): error "An error ocurred while building the json respose: ", error=resp.error diff --git a/waku/waku_sync/headers/negentropy.h b/waku/waku_sync/headers/negentropy.h deleted file mode 100644 index d3de53edf0..0000000000 --- a/waku/waku_sync/headers/negentropy.h +++ /dev/null @@ -1,323 +0,0 @@ -// (C) 2023 Doug Hoyte. MIT license - -#pragma once - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "negentropy/encoding.h" -#include "negentropy/types.h" -#include "negentropy/storage/base.h" - - -namespace negentropy { - -const uint64_t PROTOCOL_VERSION = 0x61; // Version 1 - -const uint64_t MAX_U64 = std::numeric_limits::max(); -using err = std::runtime_error; - - - -template -struct Negentropy { - StorageImpl &storage; - uint64_t frameSizeLimit; - - bool isInitiator = false; - - uint64_t lastTimestampIn = 0; - uint64_t lastTimestampOut = 0; - - Negentropy(StorageImpl &storage, uint64_t frameSizeLimit = 0) : storage(storage), frameSizeLimit(frameSizeLimit) { - if (frameSizeLimit != 0 && frameSizeLimit < 4096) throw negentropy::err("frameSizeLimit too small"); - } - - std::string initiate() { - if (isInitiator) throw negentropy::err("already initiated"); - isInitiator = true; - - std::string output; - output.push_back(PROTOCOL_VERSION); - - output += splitRange(0, storage.size(), Bound(MAX_U64)); - - return output; - } - - void setInitiator() { - isInitiator = true; - } - - std::string reconcile(std::string_view query) { - if (isInitiator) throw negentropy::err("initiator not asking for have/need IDs"); - - std::vector haveIds, needIds; - return reconcileAux(query, haveIds, needIds); - } - - std::optional reconcile(std::string_view query, std::vector &haveIds, std::vector &needIds) { - if (!isInitiator) throw negentropy::err("non-initiator asking for have/need IDs"); - - auto output = reconcileAux(query, haveIds, needIds); - if (output.size() == 1) return std::nullopt; - return output; - } - - private: - std::string reconcileAux(std::string_view query, std::vector &haveIds, std::vector &needIds) { - lastTimestampIn = lastTimestampOut = 0; // reset for each message - - std::string fullOutput; - fullOutput.push_back(PROTOCOL_VERSION); - - auto protocolVersion = getByte(query); - if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw negentropy::err("invalid negentropy protocol version byte"); - if (protocolVersion != PROTOCOL_VERSION) { - if (isInitiator) throw negentropy::err(std::string("unsupported negentropy protocol version requested") + std::to_string(protocolVersion - 0x60)); - else return fullOutput; - } - - uint64_t storageSize = storage.size(); - Bound prevBound; - size_t prevIndex = 0; - bool skip = false; - - while (query.size()) { - std::string o; - - auto doSkip = [&]{ - if (skip) { - skip = false; - o += encodeBound(prevBound); - o += encodeVarInt(uint64_t(Mode::Skip)); - } - }; - - auto currBound = decodeBound(query); - auto mode = Mode(decodeVarInt(query)); - - auto lower = prevIndex; - auto upper = storage.findLowerBound(prevIndex, storageSize, currBound); - - if (mode == Mode::Skip) { - skip = true; - } else if (mode == Mode::Fingerprint) { - auto theirFingerprint = getBytes(query, FINGERPRINT_SIZE); - auto ourFingerprint = storage.fingerprint(lower, upper); - - if (theirFingerprint != ourFingerprint.sv()) { - doSkip(); - o += splitRange(lower, upper, currBound); - } else { - skip = true; - } - } else if (mode == Mode::IdList) { - auto numIds = decodeVarInt(query); - - std::unordered_set theirElems; - for (uint64_t i = 0; i < numIds; i++) { - auto e = getBytes(query, ID_SIZE); - theirElems.insert(e); - } - - storage.iterate(lower, upper, [&](const Item &item, size_t){ - auto k = std::string(item.getId()); - - if (theirElems.find(k) == theirElems.end()) { - // ID exists on our side, but not their side - if (isInitiator) haveIds.emplace_back(k); - } else { - // ID exists on both sides - theirElems.erase(k); - } - - return true; - }); - - if (isInitiator) { - skip = true; - - for (const auto &k : theirElems) { - // ID exists on their side, but not our side - needIds.emplace_back(k); - } - } else { - doSkip(); - - std::string responseIds; - uint64_t numResponseIds = 0; - Bound endBound = currBound; - - storage.iterate(lower, upper, [&](const Item &item, size_t index){ - if (exceededFrameSizeLimit(fullOutput.size() + responseIds.size())) { - endBound = Bound(item); - upper = index; // shrink upper so that remaining range gets correct fingerprint - return false; - } - - responseIds += item.getId(); - numResponseIds++; - return true; - }); - - o += encodeBound(endBound); - o += encodeVarInt(uint64_t(Mode::IdList)); - o += encodeVarInt(numResponseIds); - o += responseIds; - - fullOutput += o; - o.clear(); - } - } else { - throw negentropy::err("unexpected mode"); - } - - if (exceededFrameSizeLimit(fullOutput.size() + o.size())) { - // frameSizeLimit exceeded: Stop range processing and return a fingerprint for the remaining range - auto remainingFingerprint = storage.fingerprint(upper, storageSize); - - fullOutput += encodeBound(Bound(MAX_U64)); - fullOutput += encodeVarInt(uint64_t(Mode::Fingerprint)); - fullOutput += remainingFingerprint.sv(); - break; - } else { - fullOutput += o; - } - - prevIndex = upper; - prevBound = currBound; - } - - return fullOutput; - } - - std::string splitRange(size_t lower, size_t upper, const Bound &upperBound) { - std::string o; - - uint64_t numElems = upper - lower; - const uint64_t buckets = 16; - - if (numElems < buckets * 2) { - o += encodeBound(upperBound); - o += encodeVarInt(uint64_t(Mode::IdList)); - - o += encodeVarInt(numElems); - storage.iterate(lower, upper, [&](const Item &item, size_t){ - o += item.getId(); - return true; - }); - } else { - uint64_t itemsPerBucket = numElems / buckets; - uint64_t bucketsWithExtra = numElems % buckets; - auto curr = lower; - - for (uint64_t i = 0; i < buckets; i++) { - auto bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0); - auto ourFingerprint = storage.fingerprint(curr, curr + bucketSize); - curr += bucketSize; - - Bound nextBound; - - if (curr == upper) { - nextBound = upperBound; - } else { - Item prevItem, currItem; - - storage.iterate(curr - 1, curr + 1, [&](const Item &item, size_t index){ - if (index == curr - 1) prevItem = item; - else currItem = item; - return true; - }); - - nextBound = getMinimalBound(prevItem, currItem); - } - - o += encodeBound(nextBound); - o += encodeVarInt(uint64_t(Mode::Fingerprint)); - o += ourFingerprint.sv(); - } - } - - return o; - } - - bool exceededFrameSizeLimit(size_t n) { - return frameSizeLimit && n > frameSizeLimit - 200; - } - - // Decoding - - uint64_t decodeTimestampIn(std::string_view &encoded) { - uint64_t timestamp = decodeVarInt(encoded); - timestamp = timestamp == 0 ? MAX_U64 : timestamp - 1; - timestamp += lastTimestampIn; - if (timestamp < lastTimestampIn) timestamp = MAX_U64; // saturate - lastTimestampIn = timestamp; - return timestamp; - } - - Bound decodeBound(std::string_view &encoded) { - auto timestamp = decodeTimestampIn(encoded); - auto len = decodeVarInt(encoded); - return Bound(timestamp, getBytes(encoded, len)); - } - - // Encoding - - std::string encodeTimestampOut(uint64_t timestamp) { - if (timestamp == MAX_U64) { - lastTimestampOut = MAX_U64; - return encodeVarInt(0); - } - - uint64_t temp = timestamp; - timestamp -= lastTimestampOut; - lastTimestampOut = temp; - return encodeVarInt(timestamp + 1); - }; - - std::string encodeBound(const Bound &bound) { - std::string output; - - output += encodeTimestampOut(bound.item.timestamp); - output += encodeVarInt(bound.idLen); - output += bound.item.getId().substr(0, bound.idLen); - - return output; - }; - - Bound getMinimalBound(const Item &prev, const Item &curr) { - if (curr.timestamp != prev.timestamp) { - return Bound(curr.timestamp); - } else { - uint64_t sharedPrefixBytes = 0; - auto currKey = curr.getId(); - auto prevKey = prev.getId(); - - for (uint64_t i = 0; i < ID_SIZE; i++) { - if (currKey[i] != prevKey[i]) break; - sharedPrefixBytes++; - } - - return Bound(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1)); - } - } -}; - - -} - - -template -using Negentropy = negentropy::Negentropy; diff --git a/waku/waku_sync/headers/negentropy/encoding.h b/waku/waku_sync/headers/negentropy/encoding.h deleted file mode 100644 index c6d5bd6012..0000000000 --- a/waku/waku_sync/headers/negentropy/encoding.h +++ /dev/null @@ -1,60 +0,0 @@ -#pragma once - -#include - - -namespace negentropy { - -using err = std::runtime_error; - - - -inline uint8_t getByte(std::string_view &encoded) { - if (encoded.size() < 1) throw negentropy::err("parse ends prematurely"); - uint8_t output = encoded[0]; - encoded = encoded.substr(1); - return output; -} - -inline std::string getBytes(std::string_view &encoded, size_t n) { - if (encoded.size() < n) throw negentropy::err("parse ends prematurely"); - auto res = encoded.substr(0, n); - encoded = encoded.substr(n); - return std::string(res); -}; - -inline uint64_t decodeVarInt(std::string_view &encoded) { - uint64_t res = 0; - - while (1) { - if (encoded.size() == 0) throw negentropy::err("premature end of varint"); - uint64_t byte = encoded[0]; - encoded = encoded.substr(1); - res = (res << 7) | (byte & 0b0111'1111); - if ((byte & 0b1000'0000) == 0) break; - } - - return res; -} - -inline std::string encodeVarInt(uint64_t n) { - if (n == 0) return std::string(1, '\0'); - - std::string o; - - while (n) { - o.push_back(static_cast(n & 0x7F)); - n >>= 7; - } - - std::reverse(o.begin(), o.end()); - - for (size_t i = 0; i < o.size() - 1; i++) { - o[i] |= 0x80; - } - - return o; -} - - -} diff --git a/waku/waku_sync/headers/negentropy/storage/BTreeLMDB.h b/waku/waku_sync/headers/negentropy/storage/BTreeLMDB.h deleted file mode 100644 index f36036f663..0000000000 --- a/waku/waku_sync/headers/negentropy/storage/BTreeLMDB.h +++ /dev/null @@ -1,146 +0,0 @@ -#pragma once - -#include - -#include "lmdbxx/lmdb++.h" - -#include "negentropy.h" -#include "negentropy/storage/btree/core.h" - - -namespace negentropy { namespace storage { - -using err = std::runtime_error; -using Node = negentropy::storage::btree::Node; -using NodePtr = negentropy::storage::btree::NodePtr; - - -struct BTreeLMDB : btree::BTreeCore { - lmdb::txn &txn; - lmdb::dbi dbi; - uint64_t treeId; - - struct MetaData { - uint64_t rootNodeId; - uint64_t nextNodeId; - - bool operator==(const MetaData &other) const { - return rootNodeId == other.rootNodeId && nextNodeId == other.nextNodeId; - } - }; - - MetaData metaDataCache; - MetaData origMetaData; - std::map dirtyNodeCache; - - - static lmdb::dbi setupDB(lmdb::txn &txn, std::string_view tableName) { - return lmdb::dbi::open(txn, tableName, MDB_CREATE | MDB_REVERSEKEY); - } - - BTreeLMDB(lmdb::txn &txn, lmdb::dbi dbi, uint64_t treeId) : txn(txn), dbi(dbi), treeId(treeId) { - static_assert(sizeof(MetaData) == 16); - std::string_view v; - bool found = dbi.get(txn, getKey(0), v); - metaDataCache = found ? lmdb::from_sv(v) : MetaData{ 0, 1, }; - origMetaData = metaDataCache; - } - - ~BTreeLMDB() { - flush(); - } - - void flush() { - for (auto &[nodeId, node] : dirtyNodeCache) { - dbi.put(txn, getKey(nodeId), node.sv()); - } - dirtyNodeCache.clear(); - - if (metaDataCache != origMetaData) { - dbi.put(txn, getKey(0), lmdb::to_sv(metaDataCache)); - origMetaData = metaDataCache; - } - } - - - // Interface - - const btree::NodePtr getNodeRead(uint64_t nodeId) { - if (nodeId == 0) return {nullptr, 0}; - - auto res = dirtyNodeCache.find(nodeId); - if (res != dirtyNodeCache.end()) return NodePtr{&res->second, nodeId}; - - std::string_view sv; - bool found = dbi.get(txn, getKey(nodeId), sv); - if (!found) throw err("couldn't find node"); - return NodePtr{(Node*)sv.data(), nodeId}; - } - - btree::NodePtr getNodeWrite(uint64_t nodeId) { - if (nodeId == 0) return {nullptr, 0}; - - { - auto res = dirtyNodeCache.find(nodeId); - if (res != dirtyNodeCache.end()) return NodePtr{&res->second, nodeId}; - } - - std::string_view sv; - bool found = dbi.get(txn, getKey(nodeId), sv); - if (!found) throw err("couldn't find node"); - - auto res = dirtyNodeCache.try_emplace(nodeId); - Node *newNode = &res.first->second; - memcpy(newNode, sv.data(), sizeof(Node)); - - return NodePtr{newNode, nodeId}; - } - - btree::NodePtr makeNode() { - uint64_t nodeId = metaDataCache.nextNodeId++; - auto res = dirtyNodeCache.try_emplace(nodeId); - return NodePtr{&res.first->second, nodeId}; - } - - void deleteNode(uint64_t nodeId) { - if (nodeId == 0) throw err("can't delete metadata"); - dirtyNodeCache.erase(nodeId); - dbi.del(txn, getKey(nodeId)); - } - - uint64_t getRootNodeId() { - return metaDataCache.rootNodeId; - } - - void setRootNodeId(uint64_t newRootNodeId) { - metaDataCache.rootNodeId = newRootNodeId; - } - - // Internal utils - - private: - std::string getKey(uint64_t n) { - uint64_t treeIdCopy = treeId; - - if constexpr (std::endian::native == std::endian::big) { - auto byteswap = [](uint64_t &n) { - uint8_t *first = reinterpret_cast(&n); - uint8_t *last = first + 8; - std::reverse(first, last); - }; - - byteswap(n); - byteswap(treeIdCopy); - } else { - static_assert(std::endian::native == std::endian::little); - } - - std::string k; - k += lmdb::to_sv(treeIdCopy); - k += lmdb::to_sv(n); - return k; - } -}; - - -}} diff --git a/waku/waku_sync/headers/negentropy/storage/BTreeMem.h b/waku/waku_sync/headers/negentropy/storage/BTreeMem.h deleted file mode 100644 index 4f79d2055f..0000000000 --- a/waku/waku_sync/headers/negentropy/storage/BTreeMem.h +++ /dev/null @@ -1,48 +0,0 @@ -#pragma once - -#include "negentropy.h" -#include "negentropy/storage/btree/core.h" - - -namespace negentropy { namespace storage { - - -struct BTreeMem : btree::BTreeCore { - std::unordered_map _nodeStorageMap; - uint64_t _rootNodeId = 0; // 0 means no root - uint64_t _nextNodeId = 1; - - // Interface - - const btree::NodePtr getNodeRead(uint64_t nodeId) { - if (nodeId == 0) return {nullptr, 0}; - auto res = _nodeStorageMap.find(nodeId); - if (res == _nodeStorageMap.end()) return btree::NodePtr{nullptr, 0}; - return btree::NodePtr{&res->second, nodeId}; - } - - btree::NodePtr getNodeWrite(uint64_t nodeId) { - return getNodeRead(nodeId); - } - - btree::NodePtr makeNode() { - uint64_t nodeId = _nextNodeId++; - _nodeStorageMap.try_emplace(nodeId); - return getNodeRead(nodeId); - } - - void deleteNode(uint64_t nodeId) { - _nodeStorageMap.erase(nodeId); - } - - uint64_t getRootNodeId() { - return _rootNodeId; - } - - void setRootNodeId(uint64_t newRootNodeId) { - _rootNodeId = newRootNodeId; - } -}; - - -}} diff --git a/waku/waku_sync/headers/negentropy/storage/SubRange.h b/waku/waku_sync/headers/negentropy/storage/SubRange.h deleted file mode 100644 index 20d5676e38..0000000000 --- a/waku/waku_sync/headers/negentropy/storage/SubRange.h +++ /dev/null @@ -1,63 +0,0 @@ -#pragma once - -#include - -#include "negentropy.h" - - - -namespace negentropy { namespace storage { - - -struct SubRange : StorageBase { - StorageBase &base; - size_t baseSize; - size_t subBegin; - size_t subEnd; - size_t subSize; - - SubRange(StorageBase &base, const Bound &lowerBound, const Bound &upperBound) : base(base) { - baseSize = base.size(); - subBegin = lowerBound == Bound(0) ? 0 : base.findLowerBound(0, baseSize, lowerBound); - subEnd = upperBound == Bound(MAX_U64) ? baseSize : base.findLowerBound(subBegin, baseSize, upperBound); - if (subEnd != baseSize && Bound(base.getItem(subEnd)) == upperBound) subEnd++; // instead of upper_bound: OK because items are unique - subSize = subEnd - subBegin; - } - - uint64_t size() { - return subSize; - } - - const Item &getItem(size_t i) { - if (i >= subSize) throw negentropy::err("bad index"); - return base.getItem(subBegin + i); - } - - void iterate(size_t begin, size_t end, std::function cb) { - checkBounds(begin, end); - - base.iterate(subBegin + begin, subBegin + end, [&](const Item &item, size_t index){ - return cb(item, index - subBegin); - }); - } - - size_t findLowerBound(size_t begin, size_t end, const Bound &bound) { - checkBounds(begin, end); - - return std::min(base.findLowerBound(subBegin + begin, subBegin + end, bound) - subBegin, subSize); - } - - Fingerprint fingerprint(size_t begin, size_t end) { - checkBounds(begin, end); - - return base.fingerprint(subBegin + begin, subBegin + end); - } - - private: - void checkBounds(size_t begin, size_t end) { - if (begin > end || end > subSize) throw negentropy::err("bad range"); - } -}; - - -}} diff --git a/waku/waku_sync/headers/negentropy/storage/Vector.h b/waku/waku_sync/headers/negentropy/storage/Vector.h deleted file mode 100644 index 76f22c7b68..0000000000 --- a/waku/waku_sync/headers/negentropy/storage/Vector.h +++ /dev/null @@ -1,88 +0,0 @@ -#pragma once - -#include "negentropy.h" - - - -namespace negentropy { namespace storage { - - -struct Vector : StorageBase { - std::vector items; - bool sealed = false; - - void insert(uint64_t createdAt, std::string_view id) { - if (sealed) throw negentropy::err("already sealed"); - if (id.size() != ID_SIZE) throw negentropy::err("bad id size for added item"); - items.emplace_back(createdAt, id); - } - - void insertItem(const Item &item) { - insert(item.timestamp, item.getId()); - } - - void seal() { - if (sealed) throw negentropy::err("already sealed"); - sealed = true; - - std::sort(items.begin(), items.end()); - - for (size_t i = 1; i < items.size(); i++) { - if (items[i - 1] == items[i]) throw negentropy::err("duplicate item inserted"); - } - } - - void unseal() { - sealed = false; - } - - uint64_t size() { - checkSealed(); - return items.size(); - } - - const Item &getItem(size_t i) { - checkSealed(); - return items.at(i); - } - - void iterate(size_t begin, size_t end, std::function cb) { - checkSealed(); - checkBounds(begin, end); - - for (auto i = begin; i < end; ++i) { - if (!cb(items[i], i)) break; - } - } - - size_t findLowerBound(size_t begin, size_t end, const Bound &bound) { - checkSealed(); - checkBounds(begin, end); - - return std::lower_bound(items.begin() + begin, items.begin() + end, bound.item) - items.begin(); - } - - Fingerprint fingerprint(size_t begin, size_t end) { - Accumulator out; - out.setToZero(); - - iterate(begin, end, [&](const Item &item, size_t){ - out.add(item); - return true; - }); - - return out.getFingerprint(end - begin); - } - - private: - void checkSealed() { - if (!sealed) throw negentropy::err("not sealed"); - } - - void checkBounds(size_t begin, size_t end) { - if (begin > end || end > items.size()) throw negentropy::err("bad range"); - } -}; - - -}} diff --git a/waku/waku_sync/headers/negentropy/storage/base.h b/waku/waku_sync/headers/negentropy/storage/base.h deleted file mode 100644 index 44c8db82c9..0000000000 --- a/waku/waku_sync/headers/negentropy/storage/base.h +++ /dev/null @@ -1,22 +0,0 @@ -#pragma once - -#include - -#include "negentropy/types.h" - - -namespace negentropy { - -struct StorageBase { - virtual uint64_t size() = 0; - - virtual const Item &getItem(size_t i) = 0; - - virtual void iterate(size_t begin, size_t end, std::function cb) = 0; - - virtual size_t findLowerBound(size_t begin, size_t end, const Bound &value) = 0; - - virtual Fingerprint fingerprint(size_t begin, size_t end) = 0; -}; - -} diff --git a/waku/waku_sync/headers/negentropy/storage/btree/core.h b/waku/waku_sync/headers/negentropy/storage/btree/core.h deleted file mode 100644 index ef25483a81..0000000000 --- a/waku/waku_sync/headers/negentropy/storage/btree/core.h +++ /dev/null @@ -1,652 +0,0 @@ -#pragma once - -#include - -#include "negentropy.h" - - - -namespace negentropy { namespace storage { namespace btree { - -using err = std::runtime_error; - -/* - -Each node contains an array of keys. For leaf nodes, the keys are 0. For non-leaf nodes, these will -be the nodeIds of the children leaves. The items in the keys of non-leaf nodes are the first items -in the corresponding child nodes. - -Except for the right-most nodes in the tree at each level (which includes the root node), all nodes -contain at least MIN_ITEMS and at most MAX_ITEMS. - -If a node falls below MIN_ITEMS, a neighbour node (which always has the same parent) is selected. - * If between the two nodes there are REBALANCE_THRESHOLD or fewer total items, all items are - moved into one node and the other is deleted. - * If there are more than REBALANCE_THRESHOLD total items, then the items are divided into two - approximately equal-sized halves. - -If a node goes above MAX_ITEMS then a new neighbour node is created. - * If the node is the right-most in its level, pack the old node to MAX_ITEMS, and move the rest - into the new neighbour. This optimises space-usage in the case of append workloads. - * Otherwise, split the node into two approximately equal-sized halves. - -*/ - - -#ifdef NE_FUZZ_TEST - -// Fuzz test mode: Causes a large amount of tree structure changes like splitting, moving, and rebalancing - -const size_t MIN_ITEMS = 2; -const size_t REBALANCE_THRESHOLD = 4; -const size_t MAX_ITEMS = 6; - -#else - -// Production mode: Nodes fit into 4k pages, and oscillating insert/erase will not cause tree structure changes - -const size_t MIN_ITEMS = 30; -const size_t REBALANCE_THRESHOLD = 60; -const size_t MAX_ITEMS = 80; - -#endif - -static_assert(MIN_ITEMS < REBALANCE_THRESHOLD); -static_assert(REBALANCE_THRESHOLD < MAX_ITEMS); -static_assert(MAX_ITEMS / 2 > MIN_ITEMS); -static_assert(MIN_ITEMS % 2 == 0 && REBALANCE_THRESHOLD % 2 == 0 && MAX_ITEMS % 2 == 0); - - -struct Key { - Item item; - uint64_t nodeId; - - void setToZero() { - item = Item(); - nodeId = 0; - } -}; - -inline bool operator<(const Key &a, const Key &b) { - return a.item < b.item; -}; - -struct Node { - uint64_t numItems; // Number of items in this Node - uint64_t accumCount; // Total number of items in or under this Node - uint64_t nextSibling; // Pointer to next node in this level - uint64_t prevSibling; // Pointer to previous node in this level - - Accumulator accum; - - Key items[MAX_ITEMS + 1]; - - - Node() { - memset((void*)this, '\0', sizeof(*this)); - } - - std::string_view sv() { - return std::string_view(reinterpret_cast(this), sizeof(*this)); - } -}; - -struct NodePtr { - Node *p; - uint64_t nodeId; - - - bool exists() { - return p != nullptr; - } - - Node &get() const { - return *p; - } -}; - -struct Breadcrumb { - size_t index; - NodePtr nodePtr; -}; - - -struct BTreeCore : StorageBase { - //// Node Storage - - virtual const NodePtr getNodeRead(uint64_t nodeId) = 0; - - virtual NodePtr getNodeWrite(uint64_t nodeId) = 0; - - virtual NodePtr makeNode() = 0; - - virtual void deleteNode(uint64_t nodeId) = 0; - - virtual uint64_t getRootNodeId() = 0; - - virtual void setRootNodeId(uint64_t newRootNodeId) = 0; - - - //// Search - - std::vector searchItem(uint64_t rootNodeId, const Item &newItem, bool &found) { - found = false; - std::vector breadcrumbs; - - auto foundNode = getNodeRead(rootNodeId); - - while (foundNode.nodeId) { - const auto &node = foundNode.get(); - size_t index = node.numItems - 1; - - if (node.numItems > 1) { - for (size_t i = 1; i < node.numItems + 1; i++) { - if (i == node.numItems + 1 || newItem < node.items[i].item) { - index = i - 1; - break; - } - } - } - - if (!found && (newItem == node.items[index].item)) found = true; - - breadcrumbs.push_back({index, foundNode}); - foundNode = getNodeRead(node.items[index].nodeId); - } - - return breadcrumbs; - } - - - //// Insert - - bool insert(uint64_t createdAt, std::string_view id) { - return insertItem(Item(createdAt, id)); - } - - bool insertItem(const Item &newItem) { - // Make root leaf in case it doesn't exist - - auto rootNodeId = getRootNodeId(); - - if (!rootNodeId) { - auto newNodePtr = makeNode(); - auto &newNode = newNodePtr.get(); - - newNode.items[0].item = newItem; - newNode.numItems++; - newNode.accum.add(newItem); - newNode.accumCount = 1; - - setRootNodeId(newNodePtr.nodeId); - return true; - } - - - // Traverse interior nodes, leaving breadcrumbs along the way - - - bool found; - auto breadcrumbs = searchItem(rootNodeId, newItem, found); - - if (found) return false; // already inserted - - - // Follow breadcrumbs back to root - - Key newKey = { newItem, 0 }; - bool needsMerge = true; - - while (breadcrumbs.size()) { - auto crumb = breadcrumbs.back(); - breadcrumbs.pop_back(); - - auto &node = getNodeWrite(crumb.nodePtr.nodeId).get(); - - if (!needsMerge) { - node.accum.add(newItem); - node.accumCount++; - } else if (crumb.nodePtr.get().numItems < MAX_ITEMS) { - // Happy path: Node has room for new item - - node.items[node.numItems] = newKey; - std::inplace_merge(node.items, node.items + node.numItems, node.items + node.numItems + 1); - node.numItems++; - - node.accum.add(newItem); - node.accumCount++; - - needsMerge = false; - } else { - // Node is full: Split it into 2 - - auto &left = node; - auto rightPtr = makeNode(); - auto &right = rightPtr.get(); - - left.items[MAX_ITEMS] = newKey; - std::inplace_merge(left.items, left.items + MAX_ITEMS, left.items + MAX_ITEMS + 1); - - left.accum.setToZero(); - left.accumCount = 0; - - if (!left.nextSibling) { - // If right-most node, pack as tightly as possible to optimise for append workloads - left.numItems = MAX_ITEMS; - right.numItems = 1; - } else { - // Otherwise, split the node equally - left.numItems = (MAX_ITEMS / 2) + 1; - right.numItems = MAX_ITEMS / 2; - } - - for (size_t i = 0; i < left.numItems; i++) { - addToAccum(left.items[i], left); - } - - for (size_t i = 0; i < right.numItems; i++) { - right.items[i] = left.items[left.numItems + i]; - addToAccum(right.items[i], right); - } - - for (size_t i = left.numItems; i < MAX_ITEMS + 1; i++) left.items[i].setToZero(); - - right.nextSibling = left.nextSibling; - left.nextSibling = rightPtr.nodeId; - right.prevSibling = crumb.nodePtr.nodeId; - - if (right.nextSibling) { - auto &rightRight = getNodeWrite(right.nextSibling).get(); - rightRight.prevSibling = rightPtr.nodeId; - } - - newKey = { right.items[0].item, rightPtr.nodeId }; - } - - // Update left-most key, in case item was inserted at the beginning - - refreshIndex(node, 0); - } - - // Out of breadcrumbs but still need to merge: New level required - - if (needsMerge) { - auto &left = getNodeRead(rootNodeId).get(); - auto &right = getNodeRead(newKey.nodeId).get(); - - auto newRootPtr = makeNode(); - auto &newRoot = newRootPtr.get(); - newRoot.numItems = 2; - - newRoot.accum.add(left.accum); - newRoot.accum.add(right.accum); - newRoot.accumCount = left.accumCount + right.accumCount; - - newRoot.items[0] = left.items[0]; - newRoot.items[0].nodeId = rootNodeId; - newRoot.items[1] = right.items[0]; - newRoot.items[1].nodeId = newKey.nodeId; - - setRootNodeId(newRootPtr.nodeId); - } - - return true; - } - - - - /// Erase - - bool erase(uint64_t createdAt, std::string_view id) { - return eraseItem(Item(createdAt, id)); - } - - bool eraseItem(const Item &oldItem) { - auto rootNodeId = getRootNodeId(); - if (!rootNodeId) return false; - - - // Traverse interior nodes, leaving breadcrumbs along the way - - bool found; - auto breadcrumbs = searchItem(rootNodeId, oldItem, found); - if (!found) return false; - - - // Remove from node - - bool needsRemove = true; - bool neighbourRefreshNeeded = false; - - while (breadcrumbs.size()) { - auto crumb = breadcrumbs.back(); - breadcrumbs.pop_back(); - - auto &node = getNodeWrite(crumb.nodePtr.nodeId).get(); - - if (!needsRemove) { - node.accum.sub(oldItem); - node.accumCount--; - } else { - for (size_t i = crumb.index + 1; i < node.numItems; i++) node.items[i - 1] = node.items[i]; - node.numItems--; - node.items[node.numItems].setToZero(); - - node.accum.sub(oldItem); - node.accumCount--; - - needsRemove = false; - } - - - if (crumb.index < node.numItems) refreshIndex(node, crumb.index); - - if (neighbourRefreshNeeded) { - refreshIndex(node, crumb.index + 1); - neighbourRefreshNeeded = false; - } - - - if (node.numItems < MIN_ITEMS && breadcrumbs.size() && breadcrumbs.back().nodePtr.get().numItems > 1) { - auto rebalance = [&](Node &leftNode, Node &rightNode) { - size_t totalItems = leftNode.numItems + rightNode.numItems; - size_t numLeft = (totalItems + 1) / 2; - size_t numRight = totalItems - numLeft; - - Accumulator accum; - accum.setToZero(); - uint64_t accumCount = 0; - - if (rightNode.numItems >= numRight) { - // Move extra from right to left - - size_t numMove = rightNode.numItems - numRight; - - for (size_t i = 0; i < numMove; i++) { - auto &item = rightNode.items[i]; - if (item.nodeId == 0) { - accum.add(item.item); - accumCount++; - } else { - auto &movingNode = getNodeRead(item.nodeId).get(); - accum.add(movingNode.accum); - accumCount += movingNode.accumCount; - } - leftNode.items[leftNode.numItems + i] = item; - } - - ::memmove(rightNode.items, rightNode.items + numMove, (rightNode.numItems - numMove) * sizeof(rightNode.items[0])); - - for (size_t i = numRight; i < rightNode.numItems; i++) rightNode.items[i].setToZero(); - - leftNode.accum.add(accum); - rightNode.accum.sub(accum); - - leftNode.accumCount += accumCount; - rightNode.accumCount -= accumCount; - - neighbourRefreshNeeded = true; - } else { - // Move extra from left to right - - size_t numMove = leftNode.numItems - numLeft; - - ::memmove(rightNode.items + numMove, rightNode.items, rightNode.numItems * sizeof(rightNode.items[0])); - - for (size_t i = 0; i < numMove; i++) { - auto &item = leftNode.items[numLeft + i]; - if (item.nodeId == 0) { - accum.add(item.item); - accumCount++; - } else { - auto &movingNode = getNodeRead(item.nodeId).get(); - accum.add(movingNode.accum); - accumCount += movingNode.accumCount; - } - rightNode.items[i] = item; - } - - for (size_t i = numLeft; i < leftNode.numItems; i++) leftNode.items[i].setToZero(); - - leftNode.accum.sub(accum); - rightNode.accum.add(accum); - - leftNode.accumCount -= accumCount; - rightNode.accumCount += accumCount; - } - - leftNode.numItems = numLeft; - rightNode.numItems = numRight; - }; - - if (breadcrumbs.back().index == 0) { - // Use neighbour to the right - - auto &leftNode = node; - auto &rightNode = getNodeWrite(node.nextSibling).get(); - size_t totalItems = leftNode.numItems + rightNode.numItems; - - if (totalItems <= REBALANCE_THRESHOLD) { - // Move all items into right - - ::memmove(rightNode.items + leftNode.numItems, rightNode.items, sizeof(rightNode.items[0]) * rightNode.numItems); - ::memcpy(rightNode.items, leftNode.items, sizeof(leftNode.items[0]) * leftNode.numItems); - - rightNode.numItems += leftNode.numItems; - rightNode.accumCount += leftNode.accumCount; - rightNode.accum.add(leftNode.accum); - - if (leftNode.prevSibling) getNodeWrite(leftNode.prevSibling).get().nextSibling = leftNode.nextSibling; - rightNode.prevSibling = leftNode.prevSibling; - - leftNode.numItems = 0; - } else { - // Rebalance from left to right - - rebalance(leftNode, rightNode); - } - } else { - // Use neighbour to the left - - auto &leftNode = getNodeWrite(node.prevSibling).get(); - auto &rightNode = node; - size_t totalItems = leftNode.numItems + rightNode.numItems; - - if (totalItems <= REBALANCE_THRESHOLD) { - // Move all items into left - - ::memcpy(leftNode.items + leftNode.numItems, rightNode.items, sizeof(rightNode.items[0]) * rightNode.numItems); - - leftNode.numItems += rightNode.numItems; - leftNode.accumCount += rightNode.accumCount; - leftNode.accum.add(rightNode.accum); - - if (rightNode.nextSibling) getNodeWrite(rightNode.nextSibling).get().prevSibling = rightNode.prevSibling; - leftNode.nextSibling = rightNode.nextSibling; - - rightNode.numItems = 0; - } else { - // Rebalance from right to left - - rebalance(leftNode, rightNode); - } - } - } - - if (node.numItems == 0) { - if (node.prevSibling) getNodeWrite(node.prevSibling).get().nextSibling = node.nextSibling; - if (node.nextSibling) getNodeWrite(node.nextSibling).get().prevSibling = node.prevSibling; - - needsRemove = true; - - deleteNode(crumb.nodePtr.nodeId); - } - } - - if (needsRemove) { - setRootNodeId(0); - } else { - auto &node = getNodeRead(rootNodeId).get(); - - if (node.numItems == 1 && node.items[0].nodeId) { - setRootNodeId(node.items[0].nodeId); - deleteNode(rootNodeId); - } - } - - return true; - } - - - //// Compat with the vector interface - - void seal() { - } - - void unseal() { - } - - - //// Utils - - void refreshIndex(Node &node, size_t index) { - auto childNodePtr = getNodeRead(node.items[index].nodeId); - if (childNodePtr.exists()) { - auto &childNode = childNodePtr.get(); - node.items[index].item = childNode.items[0].item; - } - } - - void addToAccum(const Key &k, Node &node) { - if (k.nodeId == 0) { - node.accum.add(k.item); - node.accumCount++; - } else { - auto nodePtr = getNodeRead(k.nodeId); - node.accum.add(nodePtr.get().accum); - node.accumCount += nodePtr.get().accumCount; - } - } - - void traverseToOffset(size_t index, const std::function &cb, std::function customAccum = nullptr) { - auto rootNodePtr = getNodeRead(getRootNodeId()); - if (!rootNodePtr.exists()) return; - auto &rootNode = rootNodePtr.get(); - - if (index > rootNode.accumCount) throw err("out of range"); - return traverseToOffsetAux(index, rootNode, cb, customAccum); - } - - void traverseToOffsetAux(size_t index, Node &node, const std::function &cb, std::function customAccum) { - if (node.numItems == node.accumCount) { - cb(node, index); - return; - } - - for (size_t i = 0; i < node.numItems; i++) { - auto &child = getNodeRead(node.items[i].nodeId).get(); - if (index < child.accumCount) return traverseToOffsetAux(index, child, cb, customAccum); - index -= child.accumCount; - if (customAccum) customAccum(child); - } - } - - - - //// Interface - - uint64_t size() { - auto rootNodePtr = getNodeRead(getRootNodeId()); - if (!rootNodePtr.exists()) return 0; - auto &rootNode = rootNodePtr.get(); - return rootNode.accumCount; - } - - const Item &getItem(size_t index) { - if (index >= size()) throw err("out of range"); - - Item *out; - traverseToOffset(index, [&](Node &node, size_t index){ - out = &node.items[index].item; - }); - return *out; - } - - void iterate(size_t begin, size_t end, std::function cb) { - checkBounds(begin, end); - - size_t num = end - begin; - - traverseToOffset(begin, [&](Node &node, size_t index){ - Node *currNode = &node; - for (size_t i = 0; i < num; i++) { - if (!cb(currNode->items[index].item, begin + i)) return; - index++; - if (index >= currNode->numItems) { - currNode = getNodeRead(currNode->nextSibling).p; - index = 0; - } - } - }); - } - - size_t findLowerBound(size_t begin, size_t end, const Bound &value) { - checkBounds(begin, end); - - auto rootNodePtr = getNodeRead(getRootNodeId()); - if (!rootNodePtr.exists()) return end; - auto &rootNode = rootNodePtr.get(); - if (value.item <= rootNode.items[0].item) return begin; - return std::min(findLowerBoundAux(value, rootNodePtr, 0), end); - } - - size_t findLowerBoundAux(const Bound &value, NodePtr nodePtr, uint64_t numToLeft) { - if (!nodePtr.exists()) return numToLeft + 1; - - Node &node = nodePtr.get(); - - for (size_t i = 1; i < node.numItems; i++) { - if (value.item <= node.items[i].item) { - return findLowerBoundAux(value, getNodeRead(node.items[i - 1].nodeId), numToLeft); - } else { - if (node.items[i - 1].nodeId) numToLeft += getNodeRead(node.items[i - 1].nodeId).get().accumCount; - else numToLeft++; - } - } - - return findLowerBoundAux(value, getNodeRead(node.items[node.numItems - 1].nodeId), numToLeft); - } - - Fingerprint fingerprint(size_t begin, size_t end) { - checkBounds(begin, end); - - auto getAccumLeftOf = [&](size_t index) { - Accumulator accum; - accum.setToZero(); - - traverseToOffset(index, [&](Node &node, size_t index){ - for (size_t i = 0; i < index; i++) accum.add(node.items[i].item); - }, [&](Node &node){ - accum.add(node.accum); - }); - - return accum; - }; - - auto accum1 = getAccumLeftOf(begin); - auto accum2 = getAccumLeftOf(end); - - accum1.negate(); - accum2.add(accum1); - - return accum2.getFingerprint(end - begin); - } - - private: - void checkBounds(size_t begin, size_t end) { - if (begin > end || end > size()) throw negentropy::err("bad range"); - } -}; - - -}}} diff --git a/waku/waku_sync/headers/negentropy/storage/btree/debug.h b/waku/waku_sync/headers/negentropy/storage/btree/debug.h deleted file mode 100644 index 241524275a..0000000000 --- a/waku/waku_sync/headers/negentropy/storage/btree/debug.h +++ /dev/null @@ -1,189 +0,0 @@ -#pragma once - -#include -#include - -#include - -#include "negentropy/storage/btree/core.h" -#include "negentropy/storage/BTreeMem.h" -#include "negentropy/storage/BTreeLMDB.h" - - -namespace negentropy { namespace storage { namespace btree { - - -using err = std::runtime_error; - - -inline void dump(BTreeCore &btree, uint64_t nodeId, int depth) { - if (nodeId == 0) { - if (depth == 0) std::cout << "EMPTY TREE" << std::endl; - return; - } - - auto nodePtr = btree.getNodeRead(nodeId); - auto &node = nodePtr.get(); - std::string indent(depth * 4, ' '); - - std::cout << indent << "NODE id=" << nodeId << " numItems=" << node.numItems << " accum=" << hoytech::to_hex(node.accum.sv()) << " accumCount=" << node.accumCount << std::endl; - - for (size_t i = 0; i < node.numItems; i++) { - std::cout << indent << " item: " << node.items[i].item.timestamp << "," << hoytech::to_hex(node.items[i].item.getId()) << std::endl; - dump(btree, node.items[i].nodeId, depth + 1); - } -} - -inline void dump(BTreeCore &btree) { - dump(btree, btree.getRootNodeId(), 0); -} - - -struct VerifyContext { - std::optional leafDepth; - std::set allNodeIds; - std::vector leafNodeIds; -}; - -inline void verify(BTreeCore &btree, uint64_t nodeId, uint64_t depth, VerifyContext &ctx, Accumulator *accumOut = nullptr, uint64_t *accumCountOut = nullptr) { - if (nodeId == 0) return; - - if (ctx.allNodeIds.contains(nodeId)) throw err("verify: saw node id again"); - ctx.allNodeIds.insert(nodeId); - - auto nodePtr = btree.getNodeRead(nodeId); - auto &node = nodePtr.get(); - - if (node.numItems == 0) throw err("verify: empty node"); - if (node.nextSibling && node.numItems < MIN_ITEMS) throw err("verify: too few items in node"); - if (node.numItems > MAX_ITEMS) throw err("verify: too many items"); - - if (node.items[0].nodeId == 0) { - if (ctx.leafDepth) { - if (*ctx.leafDepth != depth) throw err("verify: mismatch of leaf depth"); - } else { - ctx.leafDepth = depth; - } - - ctx.leafNodeIds.push_back(nodeId); - } - - // FIXME: verify unused items are zeroed - - Accumulator accum; - accum.setToZero(); - uint64_t accumCount = 0; - - for (size_t i = 0; i < node.numItems; i++) { - uint64_t childNodeId = node.items[i].nodeId; - if (childNodeId == 0) { - accum.add(node.items[i].item); - accumCount++; - } else { - { - auto firstChildPtr = btree.getNodeRead(childNodeId); - auto &firstChild = firstChildPtr.get(); - if (firstChild.numItems == 0 || firstChild.items[0].item != node.items[i].item) throw err("verify: key does not match child's first key"); - } - verify(btree, childNodeId, depth + 1, ctx, &accum, &accumCount); - } - - if (i < node.numItems - 1) { - if (!(node.items[i].item < node.items[i + 1].item)) throw err("verify: items out of order"); - } - } - - for (size_t i = node.numItems; i < MAX_ITEMS + 1; i++) { - for (size_t j = 0; j < sizeof(Key); j++) if (((char*)&node.items[i])[j] != '\0') throw err("verify: memory not zeroed out"); - } - - if (accumCount != node.accumCount) throw err("verify: accumCount mismatch"); - if (accum.sv() != node.accum.sv()) throw err("verify: accum mismatch"); - - if (accumOut) accumOut->add(accum); - if (accumCountOut) *accumCountOut += accumCount; -} - -inline void verify(BTreeCore &btree, bool isLMDB) { - VerifyContext ctx; - Accumulator accum; - accum.setToZero(); - uint64_t accumCount = 0; - - verify(btree, btree.getRootNodeId(), 0, ctx, &accum, &accumCount); - - if (ctx.leafNodeIds.size()) { - uint64_t i = 0, totalItems = 0; - auto nodePtr = btree.getNodeRead(ctx.leafNodeIds[0]); - std::optional prevItem; - uint64_t prevSibling = 0; - - while (nodePtr.exists()) { - auto &node = nodePtr.get(); - if (nodePtr.nodeId != ctx.leafNodeIds[i]) throw err("verify: leaf id mismatch"); - - if (prevSibling != node.prevSibling) throw err("verify: prevSibling mismatch"); - prevSibling = nodePtr.nodeId; - - nodePtr = btree.getNodeRead(node.nextSibling); - i++; - - for (size_t j = 0; j < node.numItems; j++) { - if (prevItem && !(*prevItem < node.items[j].item)) throw err("verify: leaf item out of order"); - prevItem = node.items[j].item; - totalItems++; - } - } - - if (totalItems != accumCount) throw err("verify: leaf count mismatch"); - } - - // Check for leaks - - if (isLMDB) { - static_assert(std::endian::native == std::endian::little); // FIXME - - auto &btreeLMDB = dynamic_cast(btree); - btreeLMDB.flush(); - - std::string_view key, val; - - // Leaks - - auto cursor = lmdb::cursor::open(btreeLMDB.txn, btreeLMDB.dbi); - - if (cursor.get(key, val, MDB_FIRST)) { - do { - uint64_t nodeId = lmdb::from_sv(key.substr(8)); - if (nodeId != 0 && !ctx.allNodeIds.contains(nodeId)) throw err("verify: memory leak"); - } while (cursor.get(key, val, MDB_NEXT)); - } - - // Dangling - - for (const auto &k : ctx.allNodeIds) { - std::string tpKey; - tpKey += lmdb::to_sv(btreeLMDB.treeId); - tpKey += lmdb::to_sv(k); - if (!btreeLMDB.dbi.get(btreeLMDB.txn, tpKey, val)) throw err("verify: dangling node"); - } - } else { - auto &btreeMem = dynamic_cast(btree); - - // Leaks - - for (const auto &[k, v] : btreeMem._nodeStorageMap) { - if (!ctx.allNodeIds.contains(k)) throw err("verify: memory leak"); - } - - // Dangling - - for (const auto &k : ctx.allNodeIds) { - if (!btreeMem._nodeStorageMap.contains(k)) throw err("verify: dangling node"); - } - } -} - - - -}}} diff --git a/waku/waku_sync/headers/negentropy/types.h b/waku/waku_sync/headers/negentropy/types.h deleted file mode 100644 index ce9d9dd7c9..0000000000 --- a/waku/waku_sync/headers/negentropy/types.h +++ /dev/null @@ -1,184 +0,0 @@ -// (C) 2023 Doug Hoyte. MIT license - -#pragma once - -#include - - -namespace negentropy { - -using err = std::runtime_error; - -const size_t ID_SIZE = 32; -const size_t FINGERPRINT_SIZE = 16; - - -enum class Mode { - Skip = 0, - Fingerprint = 1, - IdList = 2, -}; - - -struct Item { - uint64_t timestamp; - uint8_t id[ID_SIZE]; - - explicit Item(uint64_t timestamp = 0) : timestamp(timestamp) { - memset(id, '\0', sizeof(id)); - } - - explicit Item(uint64_t timestamp, std::string_view id_) : timestamp(timestamp) { - if (id_.size() != sizeof(id)) throw negentropy::err("bad id size for Item"); - memcpy(id, id_.data(), sizeof(id)); - } - - std::string_view getId() const { - return std::string_view(reinterpret_cast(id), sizeof(id)); - } - - bool operator==(const Item &other) const { - return timestamp == other.timestamp && getId() == other.getId(); - } -}; - -inline bool operator<(const Item &a, const Item &b) { - return a.timestamp != b.timestamp ? a.timestamp < b.timestamp : a.getId() < b.getId(); -}; - -inline bool operator<=(const Item &a, const Item &b) { - return a.timestamp != b.timestamp ? a.timestamp <= b.timestamp : a.getId() <= b.getId(); -}; - - -struct Bound { - Item item; - size_t idLen; - - explicit Bound(uint64_t timestamp = 0, std::string_view id = "") : item(timestamp), idLen(id.size()) { - if (idLen > ID_SIZE) throw negentropy::err("bad id size for Bound"); - memcpy(item.id, id.data(), idLen); - } - - explicit Bound(const Item &item_) : item(item_), idLen(ID_SIZE) {} - - bool operator==(const Bound &other) const { - return item == other.item; - } -}; - -inline bool operator<(const Bound &a, const Bound &b) { - return a.item < b.item; -}; - - -struct Fingerprint { - uint8_t buf[FINGERPRINT_SIZE]; - - std::string_view sv() const { - return std::string_view(reinterpret_cast(buf), sizeof(buf)); - } -}; - -struct Accumulator { - uint8_t buf[ID_SIZE]; - - void setToZero() { - memset(buf, '\0', sizeof(buf)); - } - - void add(const Item &item) { - add(item.id); - } - - void add(const Accumulator &acc) { - add(acc.buf); - } - - void add(const uint8_t *otherBuf) { - uint64_t currCarry = 0, nextCarry = 0; - uint64_t *p = reinterpret_cast(buf); - const uint64_t *po = reinterpret_cast(otherBuf); - - auto byteswap = [](uint64_t &n) { - uint8_t *first = reinterpret_cast(&n); - uint8_t *last = first + 8; - std::reverse(first, last); - }; - - for (size_t i = 0; i < 4; i++) { - uint64_t orig = p[i]; - uint64_t otherV = po[i]; - - if constexpr (std::endian::native == std::endian::big) { - byteswap(orig); - byteswap(otherV); - } else { - static_assert(std::endian::native == std::endian::little); - } - - uint64_t next = orig; - - next += currCarry; - if (next < orig) nextCarry = 1; - - next += otherV; - if (next < otherV) nextCarry = 1; - - if constexpr (std::endian::native == std::endian::big) { - byteswap(next); - } - - p[i] = next; - currCarry = nextCarry; - nextCarry = 0; - } - } - - void negate() { - for (size_t i = 0; i < sizeof(buf); i++) { - buf[i] = ~buf[i]; - } - - Accumulator one; - one.setToZero(); - one.buf[0] = 1; - add(one.buf); - } - - void sub(const Item &item) { - sub(item.id); - } - - void sub(const Accumulator &acc) { - sub(acc.buf); - } - - void sub(const uint8_t *otherBuf) { - Accumulator neg; - memcpy(neg.buf, otherBuf, sizeof(buf)); - neg.negate(); - add(neg); - } - - std::string_view sv() const { - return std::string_view(reinterpret_cast(buf), sizeof(buf)); - } - - Fingerprint getFingerprint(uint64_t n) { - std::string input; - input += sv(); - input += encodeVarInt(n); - - unsigned char hash[SHA256_DIGEST_LENGTH]; - SHA256(reinterpret_cast(input.data()), input.size(), hash); - - Fingerprint out; - memcpy(out.buf, hash, FINGERPRINT_SIZE); - - return out; - } -}; - - -} diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index c59246b7af..ed06407ba1 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -32,8 +32,8 @@ type WakuSyncCallback* = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} WakuSync* = ref object of LPProtocol - storage: Storage - negentropy: Negentropy + storage: pointer + negentropy: pointer peerManager: PeerManager maxFrameSize: int # Not sure if this should be protocol defined or not... syncInterval: Duration @@ -43,13 +43,14 @@ proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) if msg.ephemeral: return - let msgHash = computeMessageHash(pubsubTopic, msg) - - self.storage.insert(msg.timestamp, msgHash) + let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg) + debug "inserting message into storage ", hash=msgHash + let result: bool = negentropyStorageInsert(self.storage, msg.timestamp, msgHash) + if not result : + debug "failed to insert message ", hash=msgHash.toHex() proc serverReconciliation(self: WakuSync, message: seq[byte]): Result[seq[byte], string] = - let payload = self.negentropy.serverReconcile(message) - + let payload: seq[byte] = negentropyServerReconcile(self.negentropy, message) ok(payload) proc clientReconciliation( @@ -57,19 +58,19 @@ proc clientReconciliation( haveHashes: var seq[WakuMessageHash], needHashes: var seq[WakuMessageHash], ): Result[Option[seq[byte]], string] = - let payload = self.negentropy.clientReconcile(message, haveHashes, needHashes) - + let payload: seq[byte] = negentropyClientReconcile(self.negentropy, message, haveHashes, needHashes) ok(some(payload)) proc intitialization(self: WakuSync): Future[Result[seq[byte], string]] {.async.} = - let payload = self.negentropy.initiate() + let payload: seq[byte] = negentropyInitiate(self.negentropy) + info "initialized negentropy ", value=payload ok(payload) proc request(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = - let request = (await self.intitialization()).valueOr: + let request: seq[byte] = (await self.intitialization()).valueOr: return err(error) - + debug "sending request to server", req=request let writeRes = catch: await conn.writeLP(request) if writeRes.isErr(): return err(writeRes.error.msg) @@ -80,42 +81,43 @@ proc request(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHas while true: let readRes = catch: await conn.readLp(self.maxFrameSize) - let buffer = readRes.valueOr: + let buffer: seq[byte] = readRes.valueOr: return err(error.msg) - - let responseOpt = self.clientReconciliation(buffer, haveHashes, needHashes).valueOr: + debug "Received Sync request from peer", request=buffer + let responseOpt: Option[seq[byte]] = self.clientReconciliation(buffer, haveHashes, needHashes).valueOr: return err(error) - let response = - if responseOpt.isNone(): + let response: seq[byte] = + if responseOpt.isNone() or responseOpt.get().len == 0: + debug "Closing connection as sync response is none" await conn.close() break else: responseOpt.get() - + debug "Sending Sync response to peer", response=response let writeRes = catch: await conn.writeLP(response) if writeRes.isErr(): return err(writeRes.error.msg) - + #Need to handle empty needhashes return return ok(needHashes) proc sync*(self: WakuSync): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = - let peer = self.peerManager.selectPeer(WakuSyncCodec).valueOr: + let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr: return err("No suitable peer found for sync") - let conn = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: + let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: return err("Cannot establish sync connection") - let hashes = (await self.request(conn)).valueOr: + let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: return err("Sync request error: " & error) ok(hashes) proc sync*(self: WakuSync, peer: RemotePeerInfo): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = - let conn = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: + let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: return err("Cannot establish sync connection") - let hashes = (await self.request(conn)).valueOr: + let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: return err("Sync request error: " & error) ok(hashes) @@ -124,15 +126,14 @@ proc initProtocolHandler(self: WakuSync) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = while not conn.isClosed: # Not sure if this works as I think it does... let requestRes = catch: await conn.readLp(self.maxFrameSize) - let buffer = requestRes.valueOr: + let buffer: seq[byte] = requestRes.valueOr: error "Connection reading error", error=error.msg return - - let response = self.serverReconciliation(buffer).valueOr: + let response: seq[byte] = self.serverReconciliation(buffer).valueOr: error "Reconciliation error", error=error return - let writeRes = catch: await conn.writeLP(response) + let writeRes= catch: await conn.writeLP(response) if writeRes.isErr(): error "Connection write error", error=writeRes.error.msg return @@ -146,9 +147,9 @@ proc new*(T: type WakuSync, syncInterval: Duration = DefaultSyncInterval, callback: Option[WakuSyncCallback] = none(WakuSyncCallback) ): T = - let storage = Storage.new() + let storage = negentropyNewStorage() - let negentropy = Negentropy.new(storage, uint64(maxFrameSize)) + let negentropy = negentropyNew(storage, uint64(maxFrameSize)) let sync = WakuSync( storage: storage, diff --git a/waku/waku_sync/raw_bindings.nim b/waku/waku_sync/raw_bindings.nim index 9b5b332bfd..88033f5476 100644 --- a/waku/waku_sync/raw_bindings.nim +++ b/waku/waku_sync/raw_bindings.nim @@ -6,173 +6,234 @@ else: from os import DirSep import - std/[strutils, sequtils] + std/[strutils], + ../waku_core/message, + chronicles, + std/options, + stew/byteutils -import - ../waku_core/message - -{.link: "../waku_sync/negentropy.so".} #TODO build the dyn lib - -const negentropyPath = currentSourcePath.rsplit(DirSep, 1)[0] +const negentropyPath = currentSourcePath.rsplit(DirSep, 1)[0] & DirSep & ".." & DirSep & ".." & DirSep & "vendor" & DirSep & "negentropy" & DirSep & "cpp" & DirSep -const NEGENTROPY_HEADER = negentropyPath & DirSep & "headers" & DirSep & "negentropy.h" +{.link: negentropyPath & "libnegentropy.so".} -### String ### +const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h" -type - String {.importcpp: "std::string", header: "", byref.} = object - -proc size(self: String): csize_t {.importcpp: "size", header: "".} -proc resize(self: String, len: csize_t) {.importcpp: "resize", header: "".} -proc cStr(self: String): pointer {.importcpp: "c_str", header: "".} -proc initString(): String {.importcpp: "std::string()", constructor, header: ""} -proc toBytes(self: String): seq[byte] = - let size = self.size() +#[ proc StringtoBytes(data: cstring): seq[byte] = + let size = data.len() var bytes = newSeq[byte](size) + copyMem(bytes[0].addr, data[0].unsafeAddr, size) - copyMem(bytes[0].addr, self.cStr(), size) + return bytes ]# - return bytes +type Buffer* = object + len*: uint64 + `ptr`*: ptr uint8 -proc toWakuMessageHash(self: String): WakuMessageHash = - assert self.size == 32 +type BindingResult* = object + output: Buffer + have_ids_len: uint + need_ids_len: uint + have_ids: ptr Buffer + need_ids: ptr Buffer + +proc toWakuMessageHash(buffer: Buffer): WakuMessageHash = + assert buffer.len == 32 var hash: WakuMessageHash - copyMem(hash[0].addr, self.cStr(), 32) + copyMem(hash[0].addr, buffer.ptr, 32) return hash -proc fromWakuMessageHash(T: type String, hash: WakuMessageHash): T = - let cppString = initString() - - cppString.resize(csize_t(32)) # add the null terminator at the end??? - - copyMem(cppString.cStr(), hash[0].unsafeAddr, 32) - - return cppString - -proc fromBytes(T: type String, bytes: seq[byte]): T = - let size = bytes.len - - let cppString = initString() - - cppString.resize(csize_t(size)) # add the null terminator at the end??? - - copyMem(cppString.cStr(), bytes[0].unsafeAddr, size) - - return cppString - -### Vector ### - -type - Vector[T] {.importcpp: "std::vector", header: "", byref.} = object - VectorIter[T] {.importcpp: "std::vector<'0>::iterator", header: "", byref.} = object - -proc initVector[T](): Vector[T] {.importcpp: "std::vector<'*0>()", constructor, header: "".} -proc size(self: Vector): csize_t {.importcpp: "size", header: "".} -proc begin[T](self: Vector[T]): VectorIter[T] {.importcpp: "begin", header: "".} -proc `[]`[T](self: VectorIter[T]): T {.importcpp: "*#", header: "".} -proc next*[T](self: VectorIter[T]; n = 1): VectorIter[T] {.importcpp: "next(@)", header: "".} - -proc toSeq*[T](vec: Vector[T]): seq[T] = - result = newSeq[T](vec.size()) - - var itr = vec.begin() +proc toBuffer*(x: openArray[byte]): Buffer = + ## converts the input to a Buffer object + ## the Buffer object is used to communicate data with the rln lib + var temp = @x + let baseAddr = cast[pointer](x) + let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(temp.len)) + return output + +proc BufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)):seq[byte] = + var bufLen: uint64 + if isNone(len): + bufLen = buffer.len + else: + bufLen = len.get() + if bufLen == 0: + return @[] + debug "length of buffer is",len=bufLen + let bytes = newSeq[byte](bufLen) + copyMem(bytes[0].unsafeAddr, buffer.ptr, bufLen) + return bytes - for i in 0.. 0: + have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids) + if myResult.need_ids_len > 0: + need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids) + + debug "have and need hashes ",have_count=have_hashes.len, need_count=need_hashes.len + + for i in 0..have_hashes.len - 1: + var hash = toWakuMessageHash(have_hashes[i]) + debug "have hashes ", index=i, hash=hash.to0xHex() + haveIds.add(hash) + + for i in 0..need_hashes.len - 1: + var hash = toWakuMessageHash(need_hashes[i]) + debug "need hashes ", index=i, hash=hash.to0xHex() + needIds.add(hash) + + +#[ Callback Approach, to be uncommented later during optimization phase + var + cppHaveIds: cstringArray = allocCStringArray([]) + cppNeedIds: cstringArray = allocCStringArray([]) + haveIdsLen: uint + needIdsLen: uint + output: seq[byte] = newSeq[byte](1) #TODO: fix this hack. + + + let handler:ReconcileCallback = proc(have_ids: ptr Buffer, have_ids_len:uint64, need_ids: ptr Buffer, + need_ids_len:uint64, outBuffer: ptr Buffer, outptr: var ptr cchar) {.cdecl, raises: [], gcsafe.} = + debug "ReconcileCallback: Received needHashes from client:", len = need_ids_len , outBufLen=outBuffer.len + if outBuffer.len > 0: + let ret = BufferToBytes(outBuffer) + outptr = cast[ptr cchar](ret[0].unsafeAddr) + + try: + let ret = raw_reconcile(negentropy, cQuery.unsafeAddr, handler, cast[ptr cchar](output[0].unsafeAddr)) + if ret != 0: + error "failed to reconcile" + return + except Exception as e: + error "exception raised from raw_reconcile", error=e.msg ]# + + +#[ debug "haveIdsLen", len=haveIdsLen - let cppString = self.raw_reconcile(cppQuery, cppHaveIds, cppNeedIds) + for ele in cstringArrayToSeq(cppHaveIds, haveIdsLen): + haveIds.add(toWakuMessageHash(ele)) - let haveHashes = cppHaveIds.toSeq().mapIt(it.toWakuMessageHash()) - let needHashes = cppNeedIds.toSeq().mapIt(it.toWakuMessageHash()) + for ele in cstringArrayToSeq(cppNeedIds, needIdsLen): + needIds.add(toWakuMessageHash(ele)) - haveIds.add(haveHashes) - needIds.add(needHashes) + deallocCStringArray(cppHaveIds) + deallocCStringArray(cppNeedIds) ]# + free_result(myResultPtr) - let payload = cppString.toBytes() + debug "return " , output=output - return payload \ No newline at end of file + return output \ No newline at end of file