Skip to content

Commit

Permalink
fix: use unidirectional streams (#78)
Browse files Browse the repository at this point in the history
* fix: use unidirectional streams

* chore: use libp2p-pubsub@0.5.2
  • Loading branch information
vasco-santos authored Jun 4, 2020
1 parent beaa225 commit 8118894
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 75 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"err-code": "^2.0.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.0.1",
"libp2p-pubsub": "^0.5.0",
"libp2p-pubsub": "~0.5.2",
"p-map": "^4.0.0",
"peer-id": "~0.13.12",
"protons": "^1.0.1",
Expand Down
45 changes: 34 additions & 11 deletions test/2-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,28 +303,51 @@ describe('2 nodes', () => {
} = await createGossipsubNodes(2, true))
})

after(() => Promise.all(nodes.map((n) => n.stop())))

it('existing subscriptions are sent upon peer connection', async function () {
this.timeout(5000)
// Make subscriptions prior to new nodes
before(() => {
nodes[0].subscribe('Za')
nodes[1].subscribe('Zb')

expect(nodes[0].peers.size).to.equal(0)
expectSet(nodes[0].subscriptions, ['Za'])
expect(nodes[1].peers.size).to.equal(0)
expectSet(nodes[1].subscriptions, ['Zb'])
})

// Connect nodes
const onConnect0 = registrarRecords[0][multicodec].onConnect
const onConnect1 = registrarRecords[1][multicodec].onConnect
after(() => Promise.all(nodes.map((n) => n.stop())))

// Notice peers of connection
const [d0, d1] = ConnectionPair()
onConnect0(nodes[1].peerId, d0)
onConnect1(nodes[0].peerId, d1)
it('existing subscriptions are sent upon peer connection', async function () {
this.timeout(5000)

const dial = async () => {
// Connect nodes
const onConnect0 = registrarRecords[0][multicodec].onConnect
const onConnect1 = registrarRecords[1][multicodec].onConnect
const handle0 = registrarRecords[0][multicodec].handler
const handle1 = registrarRecords[1][multicodec].handler

// Notice peers of connection
const [d0, d1] = ConnectionPair()
await onConnect0(nodes[1].peerId, d0)
await handle1({
protocol: multicodec,
stream: d1.stream,
connection: {
remotePeer: nodes[0].peerId
}
})
await onConnect1(nodes[0].peerId, d1)
await handle0({
protocol: multicodec,
stream: d0.stream,
connection: {
remotePeer: nodes[1].peerId
}
})
}

await Promise.all([
dial(),
new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)),
new Promise((resolve) => nodes[1].once('pubsub:subscription-change', resolve))
])
Expand Down
56 changes: 52 additions & 4 deletions test/floodsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,27 @@ describe('gossipsub fallbacks to floodsub', () => {

const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
const handleGs = registrarRecords[0][floodsubMulticodec].handler
const handleFs = registrarRecords[1][floodsubMulticodec].handler

// Notice peers of connection
const [d0, d1] = ConnectionPair()
onConnectGs(nodeFs.peerId, d0)
onConnectFs(nodeGs.peerId, d1)
await onConnectGs(nodeFs.peerId, d0)
await handleFs({
protocol: floodsubMulticodec,
stream: d1.stream,
connection: {
remotePeer: nodeGs.peerId
}
})
await onConnectFs(nodeGs.peerId, d1)
await handleGs({
protocol: floodsubMulticodec,
stream: d0.stream,
connection: {
remotePeer: nodeFs.peerId
}
})
})

after(async function () {
Expand Down Expand Up @@ -167,11 +183,27 @@ describe('gossipsub fallbacks to floodsub', () => {

const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
const handleGs = registrarRecords[0][floodsubMulticodec].handler
const handleFs = registrarRecords[1][floodsubMulticodec].handler

// Notice peers of connection
const [d0, d1] = ConnectionPair()
onConnectGs(nodeFs.peerId, d0)
onConnectFs(nodeGs.peerId, d1)
await onConnectGs(nodeFs.peerId, d0)
await handleFs({
protocol: floodsubMulticodec,
stream: d1.stream,
connection: {
remotePeer: nodeGs.peerId
}
})
await onConnectFs(nodeGs.peerId, d1)
await handleGs({
protocol: floodsubMulticodec,
stream: d0.stream,
connection: {
remotePeer: nodeFs.peerId
}
})

nodeGs.subscribe(topic)
nodeFs.subscribe(topic)
Expand Down Expand Up @@ -288,11 +320,27 @@ describe('gossipsub fallbacks to floodsub', () => {

const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
const handleGs = registrarRecords[0][floodsubMulticodec].handler
const handleFs = registrarRecords[1][floodsubMulticodec].handler

// Notice peers of connection
const [d0, d1] = ConnectionPair()
await onConnectGs(nodeFs.peerId, d0)
await handleFs({
protocol: floodsubMulticodec,
stream: d1.stream,
connection: {
remotePeer: nodeGs.peerId
}
})
await onConnectFs(nodeGs.peerId, d1)
await handleGs({
protocol: floodsubMulticodec,
stream: d0.stream,
connection: {
remotePeer: nodeFs.peerId
}
})

nodeGs.subscribe(topic)
nodeFs.subscribe(topic)
Expand Down
4 changes: 2 additions & 2 deletions test/gossip.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('gossip', () => {
// add subscriptions to each node
nodes.forEach((n) => n.subscribe(topic))

connectGossipsubNodes(nodes, registrarRecords, multicodec)
await connectGossipsubNodes(nodes, registrarRecords, multicodec)

await new Promise((resolve) => setTimeout(resolve, 1000))

Expand Down Expand Up @@ -69,7 +69,7 @@ describe('gossip', () => {
nodes.forEach((n) => n.subscribe(topic))

// every node connected to every other
connectGossipsubNodes(nodes, registrarRecords, multicodec)
await connectGossipsubNodes(nodes, registrarRecords, multicodec)
await new Promise((resolve) => setTimeout(resolve, 500))
// await mesh rebalancing
await Promise.all(nodes.map((n) => new Promise((resolve) => n.once('gossipsub:heartbeat', resolve))))
Expand Down
20 changes: 18 additions & 2 deletions test/mesh.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,31 @@ describe('mesh overlay', () => {
// connect N (< GossipsubD) nodes to node0
const N = 4
const onConnect0 = registrarRecords[0][multicodec].onConnect
const handle0 = registrarRecords[0][multicodec].handler

for (let i = nodes.length; i > nodes.length - N; i--) {
const n = i - 1
const onConnectN = registrarRecords[n][multicodec].onConnect
const handleN = registrarRecords[n][multicodec].handler

// Notice peers of connection
const [d0, d1] = ConnectionPair()
onConnect0(nodes[n].peerId, d0)
onConnectN(nodes[0].peerId, d1)
await onConnect0(nodes[n].peerId, d0)
await handleN({
protocol: multicodec,
stream: d1.stream,
connection: {
remotePeer: nodes[0].peerId
}
})
await onConnectN(nodes[0].peerId, d1)
await handle0({
protocol: multicodec,
stream: d0.stream,
connection: {
remotePeer: nodes[n].peerId
}
})
}

// await mesh rebalancing
Expand Down
Loading

0 comments on commit 8118894

Please sign in to comment.