-
Notifications
You must be signed in to change notification settings - Fork 57
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
# This is a combination of 4 commits.
# This is the 1st commit message: feat: Waku Sync Protocol periodic sync & peer manager Fixes ingess messages Bindings first draft Wrapping String to/from bytes Vector added missing binds & wraps added header files feat: negentropy c integ (#2448) * chore: move negentropy to a submodule * chore: add negentropy folder in vendor dir * moved submodule to c-wrapper branch * chore: updated negentropy * chore: udpate submodule URL to use https * chore: started integrating negetropy C wrapper * chore: fixed all compilation errors wrt C-wrapper integration * chore: include sync peers to be returned as part of REST API * chore: tested insert into storage and changes done for it. * chore: experimenting with callback * chore: first test for sync * chore: revert callback changes * chore: revert temp changes * chore: write tests to verify c integration * draft: in progress changes to integrate callback based method from C * chore: in progress callback integration * chore: first working sync example with c bindings * feat: added few tests for sync protocol * chore: copy negentropy so for build to work * chore: add negentropy as dependency for test targets * chore: try to fix CI compilation issue of negentropy * chore: apply suggestions from code review Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> * chore: fix naming convention changes --------- Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> # This is the commit message #2: update ref # This is the commit message #3: update submodule # This is the commit message #4: chore: consider leak fix in negentropy
- Loading branch information
Showing
11 changed files
with
662 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
{.used.} | ||
|
||
import | ||
std/options, | ||
chronos, | ||
chronicles, | ||
libp2p/crypto/crypto | ||
|
||
import | ||
../../../waku/[ | ||
node/peer_manager, | ||
waku_core, | ||
waku_sync, | ||
], | ||
../testlib/[ | ||
common, | ||
wakucore | ||
] | ||
|
||
proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSync] {.async.} = | ||
const DefaultFrameSize = 153600 | ||
let | ||
peerManager = PeerManager.new(switch) | ||
proto = WakuSync.new(peerManager, DefaultFrameSize, 2.seconds, some(handler)) | ||
|
||
proto.start() | ||
switch.mount(proto) | ||
|
||
return proto |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
{.used.} | ||
|
||
import | ||
./test_protocol |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
{.used.} | ||
|
||
import | ||
std/options, | ||
testutils/unittests, | ||
chronos, | ||
chronicles, | ||
libp2p/crypto/crypto, | ||
stew/byteutils | ||
from std/os import sleep | ||
|
||
import | ||
../../../waku/[ | ||
common/paging, | ||
node/peer_manager, | ||
waku_core, | ||
waku_core/message/digest, | ||
waku_sync, | ||
waku_sync/raw_bindings, | ||
], | ||
../testlib/[ | ||
common, | ||
wakucore | ||
], | ||
./sync_utils | ||
|
||
|
||
suite "Waku Sync - Protocol Tests": | ||
|
||
asyncTest "test c integration": | ||
let | ||
s1 = negentropyNewStorage() | ||
s2 = negentropyNewStorage() | ||
ng1 = negentropyNew(s1,10000) | ||
ng2 = negentropyNew(s2,10000) | ||
|
||
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) | ||
let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg1) | ||
var ret = negentropyStorageInsert(s1, msg1.timestamp, msgHash) | ||
check: | ||
ret == true | ||
|
||
ret = negentropyStorageInsert(s2, msg1.timestamp, msgHash) | ||
check: | ||
ret == true | ||
|
||
let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) | ||
let msgHash2: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2) | ||
ret = negentropyStorageInsert(s2, msg2.timestamp, msgHash2) | ||
check: | ||
ret == true | ||
|
||
let ng1_q1 = negentropyInitiate(ng1) | ||
check: | ||
ng1_q1.len > 0 | ||
|
||
let ng2_q1 = negentropyServerReconcile(ng2, ng1_q1) | ||
check: | ||
ng2_q1.len > 0 | ||
|
||
var | ||
haveHashes: seq[WakuMessageHash] | ||
needHashes: seq[WakuMessageHash] | ||
let ng1_q2 = negentropyClientReconcile(ng1, ng2_q1, haveHashes, needHashes) | ||
|
||
check: | ||
needHashes.len() == 1 | ||
haveHashes.len() == 0 | ||
ng1_q2.len == 0 | ||
needHashes[0] == msgHash2 | ||
|
||
ret = negentropyStorageErase(s1, msg1.timestamp, msgHash) | ||
check: | ||
ret == true | ||
|
||
asyncTest "sync 2 nodes different hashes": | ||
## Setup | ||
let | ||
serverSwitch = newTestSwitch() | ||
clientSwitch = newTestSwitch() | ||
|
||
await allFutures(serverSwitch.start(), clientSwitch.start()) | ||
|
||
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() | ||
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) | ||
let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) | ||
|
||
let protoHandler:WakuSyncCallback = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} = | ||
debug "Received needHashes from peer:", len = hashes.len | ||
for hash in hashes: | ||
debug "Hash received from peer:", hash=hash.to0xHex() | ||
|
||
let | ||
server = await newTestWakuSync(serverSwitch, handler=protoHandler) | ||
client = await newTestWakuSync(clientSwitch, handler=protoHandler) | ||
server.ingessMessage(DefaultPubsubTopic, msg1) | ||
client.ingessMessage(DefaultPubsubTopic, msg1) | ||
server.ingessMessage(DefaultPubsubTopic, msg2) | ||
|
||
var hashes = await client.sync(serverPeerInfo) | ||
require (hashes.isOk()) | ||
check: | ||
hashes.value.len == 1 | ||
hashes.value[0] == computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2) | ||
#Assuming message is fetched from peer | ||
#[ client.ingessMessage(DefaultPubsubTopic, msg2) | ||
sleep(1000) | ||
hashes = await client.sync(serverPeerInfo) | ||
require (hashes.isOk()) | ||
check: | ||
hashes.value.len == 0 ]# | ||
|
||
asyncTest "sync 2 nodes same hashes": | ||
## Setup | ||
let | ||
serverSwitch = newTestSwitch() | ||
clientSwitch = newTestSwitch() | ||
|
||
await allFutures(serverSwitch.start(), clientSwitch.start()) | ||
|
||
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() | ||
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) | ||
let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) | ||
|
||
let protoHandler:WakuSyncCallback = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} = | ||
debug "Received needHashes from peer:", len = hashes.len | ||
for hash in hashes: | ||
debug "Hash received from peer:", hash=hash.to0xHex() | ||
|
||
let | ||
server = await newTestWakuSync(serverSwitch, handler=protoHandler) | ||
client = await newTestWakuSync(clientSwitch, handler=protoHandler) | ||
server.ingessMessage(DefaultPubsubTopic, msg1) | ||
client.ingessMessage(DefaultPubsubTopic, msg1) | ||
server.ingessMessage(DefaultPubsubTopic, msg2) | ||
client.ingessMessage(DefaultPubsubTopic, msg2) | ||
|
||
let hashes = await client.sync(serverPeerInfo) | ||
assert hashes.isOk(), $hashes.error | ||
check: | ||
hashes.value.len == 0 |
Submodule negentropy
added at
1a59da
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
when (NimMajor, NimMinor) < (1, 4): | ||
{.push raises: [Defect].} | ||
else: | ||
{.push raises: [].} | ||
|
||
import | ||
./waku_sync/protocol | ||
|
||
export | ||
protocol |
Oops, something went wrong.