diff --git a/.gitmodules b/.gitmodules index 69b4fe2cab..ffd222ee65 100644 --- a/.gitmodules +++ b/.gitmodules @@ -164,3 +164,8 @@ branch = master path = vendor/nim-results url = https://github.com/arnetheduck/nim-results.git +[submodule "vendor/negentropy"] + ignore = untracked + path = vendor/negentropy + url = https://github.com/waku-org/negentropy.git + branch = feat/c-wrapper \ No newline at end of file diff --git a/Makefile b/Makefile index 2125f5d85c..3126e3c730 100644 --- a/Makefile +++ b/Makefile @@ -32,10 +32,10 @@ else # "variables.mk" was included. Business as usual until the end of this file ########## ## Main ## ########## -.PHONY: all test update clean +.PHONY: all test update clean negentropy # default target, because it's the first one that doesn't start with '.' -all: | wakunode2 example2 chat2 chat2bridge libwaku +all: | negentropy wakunode2 example2 chat2 chat2bridge libwaku test: | testcommon testwaku @@ -46,7 +46,7 @@ update: | update-common rm -rf waku.nims && \ $(MAKE) waku.nims $(HANDLE_OUTPUT) -clean: +clean: | negentropy-clean rm -rf build # must be included after the default target @@ -182,11 +182,11 @@ testcommon: | build deps .PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge # install anvil only for the testwaku target -testwaku: | build deps anvil librln +testwaku: | build deps anvil librln negentropy echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim test -d:os=$(shell uname) $(NIM_PARAMS) waku.nims -wakunode2: | build deps librln +wakunode2: | build deps librln negentropy echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) waku.nims @@ -194,7 +194,7 @@ benchmarks: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim benchmarks $(NIM_PARAMS) waku.nims -testwakunode2: | build deps librln +testwakunode2: | build deps librln negentropy echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim testwakunode2 $(NIM_PARAMS) waku.nims @@ -335,3 +335,9 @@ release-notes: sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g' # I could not get the tool to replace issue ids with links, so using sed for now, # asked here: https://github.com/bvieira/sv4git/discussions/101 +negentropy: + $(MAKE) -C vendor/negentropy/cpp && \ + cp vendor/negentropy/cpp/libnegentropy.so ./ +negentropy-clean: + $(MAKE) -C vendor/negentropy/cpp clean && \ + rm libnegentropy.so diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim new file mode 100644 index 0000000000..c277a0d1e2 --- /dev/null +++ b/tests/waku_sync/sync_utils.nim @@ -0,0 +1,29 @@ +{.used.} + +import + std/options, + chronos, + chronicles, + libp2p/crypto/crypto + +import + ../../../waku/[ + node/peer_manager, + waku_core, + waku_sync, + ], + ../testlib/[ + common, + wakucore + ] + +proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSync] {.async.} = + const DefaultFrameSize = 153600 + let + peerManager = PeerManager.new(switch) + proto = WakuSync.new(peerManager, DefaultFrameSize, 2.seconds, some(handler)) + + proto.start() + switch.mount(proto) + + return proto \ No newline at end of file diff --git a/tests/waku_sync/test_all.nim b/tests/waku_sync/test_all.nim new file mode 100644 index 0000000000..178e9277ea --- /dev/null +++ b/tests/waku_sync/test_all.nim @@ -0,0 +1,4 @@ +{.used.} + +import + ./test_protocol diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim new file mode 100644 index 0000000000..3861dc8dd1 --- /dev/null +++ b/tests/waku_sync/test_protocol.nim @@ -0,0 +1,141 @@ +{.used.} + +import + std/options, + testutils/unittests, + chronos, + chronicles, + libp2p/crypto/crypto, + stew/byteutils +from std/os import sleep + +import + ../../../waku/[ + common/paging, + node/peer_manager, + waku_core, + waku_core/message/digest, + waku_sync, + waku_sync/raw_bindings, + ], + ../testlib/[ + common, + wakucore + ], + ./sync_utils + + +suite "Waku Sync - Protocol Tests": + + asyncTest "test c integration": + let + s1 = negentropyNewStorage() + s2 = negentropyNewStorage() + ng1 = negentropyNew(s1,10000) + ng2 = negentropyNew(s2,10000) + + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg1) + var ret = negentropyStorageInsert(s1, msg1.timestamp, msgHash) + check: + ret == true + + ret = negentropyStorageInsert(s2, msg1.timestamp, msgHash) + check: + ret == true + + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msgHash2: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2) + ret = negentropyStorageInsert(s2, msg2.timestamp, msgHash2) + check: + ret == true + + let ng1_q1 = negentropyInitiate(ng1) + check: + ng1_q1.len > 0 + + let ng2_q1 = negentropyServerReconcile(ng2, ng1_q1) + check: + ng2_q1.len > 0 + + var + haveHashes: seq[WakuMessageHash] + needHashes: seq[WakuMessageHash] + let ng1_q2 = negentropyClientReconcile(ng1, ng2_q1, haveHashes, needHashes) + + check: + needHashes.len() == 1 + haveHashes.len() == 0 + ng1_q2.len == 0 + needHashes[0] == msgHash2 + + ret = negentropyStorageErase(s1, msg1.timestamp, msgHash) + check: + ret == true + + asyncTest "sync 2 nodes different hashes": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) + + let protoHandler:WakuSyncCallback = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} = + debug "Received needHashes from peer:", len = hashes.len + for hash in hashes: + debug "Hash received from peer:", hash=hash.to0xHex() + + let + server = await newTestWakuSync(serverSwitch, handler=protoHandler) + client = await newTestWakuSync(clientSwitch, handler=protoHandler) + server.ingessMessage(DefaultPubsubTopic, msg1) + client.ingessMessage(DefaultPubsubTopic, msg1) + server.ingessMessage(DefaultPubsubTopic, msg2) + + var hashes = await client.sync(serverPeerInfo) + require (hashes.isOk()) + check: + hashes.value.len == 1 + hashes.value[0] == computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2) + #Assuming message is fetched from peer +#[ client.ingessMessage(DefaultPubsubTopic, msg2) + sleep(1000) + hashes = await client.sync(serverPeerInfo) + require (hashes.isOk()) + check: + hashes.value.len == 0 ]# + + asyncTest "sync 2 nodes same hashes": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) + + let protoHandler:WakuSyncCallback = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} = + debug "Received needHashes from peer:", len = hashes.len + for hash in hashes: + debug "Hash received from peer:", hash=hash.to0xHex() + + let + server = await newTestWakuSync(serverSwitch, handler=protoHandler) + client = await newTestWakuSync(clientSwitch, handler=protoHandler) + server.ingessMessage(DefaultPubsubTopic, msg1) + client.ingessMessage(DefaultPubsubTopic, msg1) + server.ingessMessage(DefaultPubsubTopic, msg2) + client.ingessMessage(DefaultPubsubTopic, msg2) + + let hashes = await client.sync(serverPeerInfo) + assert hashes.isOk(), $hashes.error + check: + hashes.value.len == 0 diff --git a/vendor/negentropy b/vendor/negentropy new file mode 160000 index 0000000000..1a59da6c32 --- /dev/null +++ b/vendor/negentropy @@ -0,0 +1 @@ +Subproject commit 1a59da6c32605bb5ae7c495c66dac3367d4bc560 diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 91c0494419..f77ba199be 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -37,6 +37,7 @@ import ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, ../waku_metadata, + ../waku_sync, ../waku_lightpush/client as lightpush_client, ../waku_lightpush/common, ../waku_lightpush/protocol, @@ -97,6 +98,7 @@ type wakuLightpushClient*: WakuLightPushClient wakuPeerExchange*: WakuPeerExchange wakuMetadata*: WakuMetadata + wakuSync*: WakuSync enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext @@ -182,6 +184,22 @@ proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], s # NOTE Connects to the node without a give protocol, which automatically creates streams for relay await peer_manager.connectToNodes(node.peerManager, nodes, source=source) +## Waku Sync + +proc mountWakuSync*(node: WakuNode): Result[void, string] = + if not node.wakuSync.isNil(): + return err("Waku sync already mounted, skipping") + + let sync = WakuSync.new(node.peerManager)#TODO add the callback and the options + + node.wakuSync = sync + + let catchRes = catch: node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec)) + if catchRes.isErr(): + return err(catchRes.error.msg) + + return ok() + ## Waku Metadata proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] = @@ -237,10 +255,17 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = await node.wakuArchive.handleMessage(topic, msg) + proc syncHandler(topic: PubsubTopic, msg: WakuMessage) = + if node.wakuSync.isNil(): + return + + node.wakuSync.ingessMessage(topic, msg) + let defaultHandler = proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) + syncHandler(topic, msg) discard node.wakuRelay.subscribe(topic, defaultHandler) diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index ca85b61c63..103740e597 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -19,6 +19,7 @@ import ../../../waku_lightpush/common, ../../../waku_relay, ../../../waku_node, + ../../../waku_sync, ../../../node/peer_manager, ../responses, ../serdes, @@ -89,6 +90,15 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = connected: it.connectedness == Connectedness.Connected)) tuplesToWakuPeers(peers, lightpushPeers) + if not node.wakuSync.isNil(): + # Map WakuStore peers to WakuPeers and add to return list + let syncPeers = node.peerManager.peerStore + .peers(WakuSyncCodec) + .mapIt((multiaddr: constructMultiaddrStr(it), + protocol: WakuSyncCodec, + connected: it.connectedness == Connectedness.Connected)) + tuplesToWakuPeers(peers, syncPeers) + let resp = RestApiResponse.jsonResponse(peers, status=Http200) if resp.isErr(): error "An error ocurred while building the json respose: ", error=resp.error diff --git a/waku/waku_sync.nim b/waku/waku_sync.nim new file mode 100644 index 0000000000..dc60abeaee --- /dev/null +++ b/waku/waku_sync.nim @@ -0,0 +1,10 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + ./waku_sync/protocol + +export + protocol diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim new file mode 100644 index 0000000000..ed06407ba1 --- /dev/null +++ b/waku/waku_sync/protocol.nim @@ -0,0 +1,186 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options, + stew/results, + chronicles, + chronos, + metrics, + libp2p/protocols/protocol, + libp2p/stream/connection, + libp2p/crypto/crypto, + eth/p2p/discoveryv5/enr +import + ../common/nimchronos, + ../common/enr, + ../waku_core, + ../waku_enr, + ../node/peer_manager/peer_manager, + ./raw_bindings + +logScope: + topics = "waku sync" + +const WakuSyncCodec* = "/vac/waku/sync/1.0.0" +const DefaultFrameSize = 153600 # using a random number for now +const DefaultSyncInterval = 60.minutes + +type + WakuSyncCallback* = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} + + WakuSync* = ref object of LPProtocol + storage: pointer + negentropy: pointer + peerManager: PeerManager + maxFrameSize: int # Not sure if this should be protocol defined or not... + syncInterval: Duration + callback: Option[WakuSyncCallback] + +proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) = + if msg.ephemeral: + return + + let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg) + debug "inserting message into storage ", hash=msgHash + let result: bool = negentropyStorageInsert(self.storage, msg.timestamp, msgHash) + if not result : + debug "failed to insert message ", hash=msgHash.toHex() + +proc serverReconciliation(self: WakuSync, message: seq[byte]): Result[seq[byte], string] = + let payload: seq[byte] = negentropyServerReconcile(self.negentropy, message) + ok(payload) + +proc clientReconciliation( + self: WakuSync, message: seq[byte], + haveHashes: var seq[WakuMessageHash], + needHashes: var seq[WakuMessageHash], + ): Result[Option[seq[byte]], string] = + let payload: seq[byte] = negentropyClientReconcile(self.negentropy, message, haveHashes, needHashes) + ok(some(payload)) + +proc intitialization(self: WakuSync): Future[Result[seq[byte], string]] {.async.} = + let payload: seq[byte] = negentropyInitiate(self.negentropy) + info "initialized negentropy ", value=payload + + ok(payload) + +proc request(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = + let request: seq[byte] = (await self.intitialization()).valueOr: + return err(error) + debug "sending request to server", req=request + let writeRes = catch: await conn.writeLP(request) + if writeRes.isErr(): + return err(writeRes.error.msg) + + var + haveHashes: seq[WakuMessageHash] # What to do with haves ??? + needHashes: seq[WakuMessageHash] + + while true: + let readRes = catch: await conn.readLp(self.maxFrameSize) + let buffer: seq[byte] = readRes.valueOr: + return err(error.msg) + debug "Received Sync request from peer", request=buffer + let responseOpt: Option[seq[byte]] = self.clientReconciliation(buffer, haveHashes, needHashes).valueOr: + return err(error) + + let response: seq[byte] = + if responseOpt.isNone() or responseOpt.get().len == 0: + debug "Closing connection as sync response is none" + await conn.close() + break + else: + responseOpt.get() + debug "Sending Sync response to peer", response=response + let writeRes = catch: await conn.writeLP(response) + if writeRes.isErr(): + return err(writeRes.error.msg) + #Need to handle empty needhashes return + return ok(needHashes) + +proc sync*(self: WakuSync): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = + let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr: + return err("No suitable peer found for sync") + + let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: + return err("Cannot establish sync connection") + + let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: + return err("Sync request error: " & error) + + ok(hashes) + +proc sync*(self: WakuSync, peer: RemotePeerInfo): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = + let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: + return err("Cannot establish sync connection") + + let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: + return err("Sync request error: " & error) + + ok(hashes) + +proc initProtocolHandler(self: WakuSync) = + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + while not conn.isClosed: # Not sure if this works as I think it does... + let requestRes = catch: await conn.readLp(self.maxFrameSize) + let buffer: seq[byte] = requestRes.valueOr: + error "Connection reading error", error=error.msg + return + let response: seq[byte] = self.serverReconciliation(buffer).valueOr: + error "Reconciliation error", error=error + return + + let writeRes= catch: await conn.writeLP(response) + if writeRes.isErr(): + error "Connection write error", error=writeRes.error.msg + return + + self.handler = handle + self.codec = WakuSyncCodec + +proc new*(T: type WakuSync, + peerManager: PeerManager, + maxFrameSize: int = DefaultFrameSize, + syncInterval: Duration = DefaultSyncInterval, + callback: Option[WakuSyncCallback] = none(WakuSyncCallback) +): T = + let storage = negentropyNewStorage() + + let negentropy = negentropyNew(storage, uint64(maxFrameSize)) + + let sync = WakuSync( + storage: storage, + negentropy: negentropy, + peerManager: peerManager, + maxFrameSize: maxFrameSize, + syncInterval: syncInterval, + callback: callback + ) + + sync.initProtocolHandler() + + info "Created WakuSync protocol" + + return sync + +proc periodicSync(self: WakuSync) {.async.} = + while self.started and self.callback.isSome(): + await sleepAsync(self.syncInterval) + + let hashes = (await self.sync()).valueOr: + continue + + let callback = self.callback.get() + + await callback(hashes) + +proc start*(self: WakuSync) = + self.started = true + + asyncSpawn self.periodicSync() + +proc stop*(self: WakuSync) = + self.started = false \ No newline at end of file diff --git a/waku/waku_sync/raw_bindings.nim b/waku/waku_sync/raw_bindings.nim new file mode 100644 index 0000000000..88033f5476 --- /dev/null +++ b/waku/waku_sync/raw_bindings.nim @@ -0,0 +1,239 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +from os import DirSep + +import + std/[strutils], + ../waku_core/message, + chronicles, + std/options, + stew/byteutils + +const negentropyPath = currentSourcePath.rsplit(DirSep, 1)[0] & DirSep & ".." & DirSep & ".." & DirSep & "vendor" & DirSep & "negentropy" & DirSep & "cpp" & DirSep + +{.link: negentropyPath & "libnegentropy.so".} + +const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h" + + +#[ proc StringtoBytes(data: cstring): seq[byte] = + let size = data.len() + + var bytes = newSeq[byte](size) + copyMem(bytes[0].addr, data[0].unsafeAddr, size) + + return bytes ]# + +type Buffer* = object + len*: uint64 + `ptr`*: ptr uint8 + +type BindingResult* = object + output: Buffer + have_ids_len: uint + need_ids_len: uint + have_ids: ptr Buffer + need_ids: ptr Buffer + +proc toWakuMessageHash(buffer: Buffer): WakuMessageHash = + assert buffer.len == 32 + + var hash: WakuMessageHash + + copyMem(hash[0].addr, buffer.ptr, 32) + + return hash + +proc toBuffer*(x: openArray[byte]): Buffer = + ## converts the input to a Buffer object + ## the Buffer object is used to communicate data with the rln lib + var temp = @x + let baseAddr = cast[pointer](x) + let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(temp.len)) + return output + +proc BufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)):seq[byte] = + var bufLen: uint64 + if isNone(len): + bufLen = buffer.len + else: + bufLen = len.get() + if bufLen == 0: + return @[] + debug "length of buffer is",len=bufLen + let bytes = newSeq[byte](bufLen) + copyMem(bytes[0].unsafeAddr, buffer.ptr, bufLen) + return bytes + +proc toBufferSeq(buffLen: uint, buffPtr: ptr Buffer): seq[Buffer] = + var uncheckedArr = cast[ptr UncheckedArray[Buffer]](buffPtr) + var mySequence = newSeq[Buffer](buffLen) + for i in 0..buffLen-1: + mySequence[i] = uncheckedArr[i] + return mySequence + +### Storage ### + +proc storage_init(db_path:cstring, name: cstring): pointer{. header: NEGENTROPY_HEADER, importc: "storage_new".} + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy/storage/btree/core.h#L163 +proc raw_insert(storage: pointer, timestamp: uint64, id: ptr Buffer): bool {.header: NEGENTROPY_HEADER, importc: "storage_insert".} + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy/storage/btree/core.h#L300 +proc raw_erase(storage: pointer, timestamp: uint64, id: ptr Buffer): bool {.header: NEGENTROPY_HEADER, importc: "storage_erase".} + +### Negentropy ### + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy.h#L42 +proc constructNegentropy(storage: pointer, frameSizeLimit: uint64): pointer {.header: NEGENTROPY_HEADER, importc: "negentropy_new".} + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy.h#L46 +proc raw_initiate(negentropy: pointer, output: ptr Buffer): int {.header: NEGENTROPY_HEADER, importc: "negentropy_initiate".} + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy.h#L58 +proc raw_setInitiator(negentropy: pointer) {.header: NEGENTROPY_HEADER, importc: "negentropy_setinitiator".} + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy.h#L62 +proc raw_reconcile(negentropy: pointer, query: ptr Buffer, output: ptr Buffer): int {.header: NEGENTROPY_HEADER, importc: "reconcile".} +#[ +type + ReconcileCallback = proc(have_ids: ptr Buffer, have_ids_len:uint64, need_ids: ptr Buffer, need_ids_len:uint64, output: ptr Buffer, outptr: var ptr cchar) {.cdecl, raises: [], gcsafe.}# {.header: NEGENTROPY_HEADER, importc: "reconcile_cbk".} + ]# + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy.h#L69 +#proc raw_reconcile(negentropy: pointer, query: ptr Buffer, cbk: ReconcileCallback, output: ptr cchar): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids".} + +proc raw_reconcile(negentropy: pointer, query: ptr Buffer, r: ptr BindingResult){.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_no_cbk".} + +proc free_result(r: ptr BindingResult){.header: NEGENTROPY_HEADER, importc: "free_result".} + +### Wrappings ### + +#TODO: Change all these methods to private as we don't want them to be exposed outside Sync package +#TODO: Wrap storage and negentropy with objects rather than using void pointers +proc negentropyNewStorage*(): pointer = + let storage = storage_init("", "") + + return storage + +proc negentropyStorageErase*(storage: pointer, id: int64, hash: WakuMessageHash): bool = + let cString = toBuffer(hash) + + return raw_erase(storage, uint64(id), cString.unsafeAddr) + +proc negentropyStorageInsert*(storage: pointer, id: int64, hash: WakuMessageHash): bool = + var buffer = toBuffer(hash) + var bufPtr = addr(buffer) + return raw_insert(storage, uint64(id), bufPtr) + +proc negentropyNew*(storage: pointer, frameSizeLimit: uint64): pointer = + let negentropy = constructNegentropy(storage, frameSizeLimit) + + return negentropy + +proc negentropyInitiate*(negentropy: pointer): seq[byte] = + var output:seq[byte] = newSeq[byte](153600) #TODO: Optimize this using callback to avoid huge alloc + var outBuffer: Buffer = toBuffer(output) + let outLen: int = raw_initiate(negentropy, outBuffer.unsafeAddr) + let bytes: seq[byte] = BufferToBytes(addr(outBuffer), some(uint64(outLen))) + + debug "received return from initiate", len=outLen + return bytes + +proc negentropySetInitiator*(negentropy: pointer) = + raw_setInitiator(negentropy) + +proc negentropyServerReconcile*(negentropy: pointer, query: seq[byte]): seq[byte] = + let queryBuf = toBuffer(query) + var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error + var output:seq[byte] = newSeq[byte](153600) #TODO: Optimize this using callback to avoid huge alloc + var outBuffer: Buffer = toBuffer(output) + + let outLen: int = raw_reconcile(negentropy, queryBufPtr, outBuffer.unsafeAddr) + debug "received return from raw_reconcile", len=outLen + + let outputBytes: seq[byte] = BufferToBytes(addr(outBuffer), some(uint64(outLen))) + debug "outputBytes len", len=outputBytes.len + return outputBytes + +proc negentropyClientReconcile*(negentropy: pointer, query: seq[byte], haveIds: var seq[WakuMessageHash], needIds: var seq[WakuMessageHash]): seq[byte] = + let cQuery = toBuffer(query) + + var myResult {.noinit.}: BindingResult = BindingResult() + myResult.have_ids_len = 0 + myResult.need_ids_len = 0 + var myResultPtr = addr myResult + + raw_reconcile(negentropy, cQuery.unsafeAddr, myResultPtr) + + if myResultPtr == nil: + error "ERROR from raw_reconcile!" + return @[] + + let output = BufferToBytes(addr myResult.output) + + var + have_hashes: seq[Buffer] + need_hashes: seq[Buffer] + + if myResult.have_ids_len > 0: + have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids) + if myResult.need_ids_len > 0: + need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids) + + debug "have and need hashes ",have_count=have_hashes.len, need_count=need_hashes.len + + for i in 0..have_hashes.len - 1: + var hash = toWakuMessageHash(have_hashes[i]) + debug "have hashes ", index=i, hash=hash.to0xHex() + haveIds.add(hash) + + for i in 0..need_hashes.len - 1: + var hash = toWakuMessageHash(need_hashes[i]) + debug "need hashes ", index=i, hash=hash.to0xHex() + needIds.add(hash) + + +#[ Callback Approach, to be uncommented later during optimization phase + var + cppHaveIds: cstringArray = allocCStringArray([]) + cppNeedIds: cstringArray = allocCStringArray([]) + haveIdsLen: uint + needIdsLen: uint + output: seq[byte] = newSeq[byte](1) #TODO: fix this hack. + + + let handler:ReconcileCallback = proc(have_ids: ptr Buffer, have_ids_len:uint64, need_ids: ptr Buffer, + need_ids_len:uint64, outBuffer: ptr Buffer, outptr: var ptr cchar) {.cdecl, raises: [], gcsafe.} = + debug "ReconcileCallback: Received needHashes from client:", len = need_ids_len , outBufLen=outBuffer.len + if outBuffer.len > 0: + let ret = BufferToBytes(outBuffer) + outptr = cast[ptr cchar](ret[0].unsafeAddr) + + try: + let ret = raw_reconcile(negentropy, cQuery.unsafeAddr, handler, cast[ptr cchar](output[0].unsafeAddr)) + if ret != 0: + error "failed to reconcile" + return + except Exception as e: + error "exception raised from raw_reconcile", error=e.msg ]# + + +#[ debug "haveIdsLen", len=haveIdsLen + + for ele in cstringArrayToSeq(cppHaveIds, haveIdsLen): + haveIds.add(toWakuMessageHash(ele)) + + for ele in cstringArrayToSeq(cppNeedIds, needIdsLen): + needIds.add(toWakuMessageHash(ele)) + + deallocCStringArray(cppHaveIds) + deallocCStringArray(cppNeedIds) ]# + free_result(myResultPtr) + + debug "return " , output=output + + return output \ No newline at end of file