Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: refactor query logic #237

Merged
merged 37 commits into from
Nov 18, 2021
Merged

fix: refactor query logic #237

merged 37 commits into from
Nov 18, 2021

Conversation

achingbrain
Copy link
Member

@achingbrain achingbrain commented Oct 6, 2021

  1. Queries are plain functions now
  2. Queries are run by the QueryManager
  3. Query functions yield events - dial, send request, receive response, value, error, etc
  4. Timeouts have been replaced with AbortSignals
  5. Halting in-progress queries is done by aborting an AbortSignal from the top level or breaking out of the for await..of loop
  6. Internal components do not depend on the top-level DHT component any more
  7. Some methods have been moved to other components to break circular dependencies
  8. The top level KadDHT class now has a ts interface and a factory function
  9. There are two DHTs, lan (for private addresses) and wan (for public addresses)
  10. Client mode is now on by default for the wan DHT, off for the lan DHT
  11. DHT queries for CIDs search for the multihash rather than the full CID

1. Queries are plain functions now
2. Queries are run by the QueryManager
3. Timeouts have been replaced with AbortSignals
4. All long-lived operations require AbortSignals
5. Halting in-progress queries is done by aborting an AbortSignal from the top level
6. Internal components do not depend on the top-level DHT component any more
7. Some methods have been moved to other components to break circular depenedencies
8. The top level KadDHT class now has a ts interface
@achingbrain achingbrain linked an issue Oct 7, 2021 that may be closed by this pull request
Copy link
Contributor

@kumavis kumavis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only got half way through the review before bedtime. generally looks good, nice improvements.


await utils.mapParallel(dht.getClosestPeers(key, { shallow: true }), async (peer) => {
await drain(parallel(map(this._peerRouting.getClosestPeers(key, { signal: options.signal, shallow: true }), (peer) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a little dense here

if (result && !result.done && result.closerPeers) {
for (const closerPeer of result.closerPeers) {
const closerPeerKadId = await convertPeerId(closerPeer)
const closerPeerXor = BigInt('0x' + toString(xor(closerPeerKadId, kadId), 'base16'))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worthy of a utility fn?

// only continue query if closer peer is actually closer
if (closerPeerXor > peerXor) {
// log('skipping %p as they are not closer to %b than %p', closerPeer, key, peer)
// continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this commented out?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It causes this test to pass but then this test to fail.

There's a bug in there somewhere, I wanted to experiment a bit more with go-ipfs and the expanded interop test suite before returning to this.

// TODO: ensure we can break out of this loop early - this promise needs to
// be resolved manually and will leak memory otherwise
await deferred.promise
deferred = defer()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe can be replaced by a for-await ?

deferred = defer()

// yield all available results
while (results.length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you do a yield-star here?

@achingbrain achingbrain marked this pull request as ready for review November 15, 2021 15:55
@jacobheun
Copy link
Contributor

Nice! Will try to carve out some time today to go through this. If you haven't already gone through and checked the implementation summary doc, https://github.com/libp2p/js-libp2p-kad-dht/blob/fix/refactor-query-logic/docs/IMPL_SUMMARY.MD, it would be helpful to make sure that's up to date, especially if we're doing anything different/in addition to the dht spec.

@achingbrain
Copy link
Member Author

@jacobheun I don't think we're doing anything extra to the spec, at least, we shouldn't be.

That document is interesting, it seems to be mostly covered by https://docs.ipfs.io/concepts/dht/ - perhaps we could link to that instead?

@jacobheun
Copy link
Contributor

If we're following along with the spec we can just point to it. The main value of that doc is that there wasn't a spec when it was written and that helped clarify what this implementation was doing. Some implementations of libp2p will extend the behavior of the protocol for certain components, so it's helpful to have a readme that clarifies what those are. An example of this is that to date the js dht was using dist paths for lookup, but Go never did.

@achingbrain
Copy link
Member Author

achingbrain commented Nov 16, 2021

Ha, well ok then in that case we are doing extra to the spec as we're still using disjoint paths. Probably a good thing though.

Copy link
Contributor

@jacobheun jacobheun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor things, overall looks good, great improvements to the query logic. @achingbrain and I reviewed on a call to walk through the changes.

src/constants.js Outdated
@@ -33,3 +33,5 @@ exports.K = 20

// Alpha is the concurrency for asynchronous requests
exports.ALPHA = 3

exports.QUERY_SELF_INTERVAL = Number(minute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems too frequent, can this be configured? Looks like spec default is 5mins https://github.com/libp2p/specs/tree/master/kad-dht#bootstrap-process.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's exposed as the querySelfInterval factory function option. Will increase the default to 5 mins

src/index.js Outdated
Comment on lines 43 to 44
* A DHT implementation modelled after Kademlia with S/Kademlia modifications.
* Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use S/Kademlia in the new spec.

Suggested change
* A DHT implementation modelled after Kademlia with S/Kademlia modifications.
* Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht.
* A Kademlia based DHT implementation compliant with the libp2p specification.
* DualKadDHT extends the existing specification by providing support for a public
* and private DHT, which is inline with the go-libp2p implementation.
* Spec at https://github.com/libp2p/specs/tree/master/kad-dht

src/index.js Outdated
this._wan = wan
this._lan = lan
this._libp2p = libp2p
this._datastore = libp2p.datastore || this._wan._datastore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any potential conflicts with lan/wan using the same datastore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only store libp2p-records in the datastore and they are externally validated so I don't think so, no.

src/index.js Outdated
this._running = false
await Promise.all([
this._lan.onPeerConnect({ id: peerId, multiaddrs: [connection.remoteAddr] }),
this._wan.onPeerConnect({ id: peerId, multiaddrs: [connection.remoteAddr] })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this check for the DHT protocol? We don't want to add peers we connect to, we want to ensure they're running the DHT protocol. I believe go also has logic around usefulness. Will check the rest of the code and update here when I see this logic.

src/index.js Outdated
*/
enableServerMode () {
log('enabling server mode')
this._wan.enableServerMode()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed in the lan? I assume lan is created with everyone in server mode.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lan is server-mode only

this._queryManager = new QueryManager({
peerId: libp2p.peerId,
// Number of disjoint query paths to use - This is set to `kBucketSize/2` per the S/Kademlia paper
disjointPaths: Math.ceil(kBucketSize / 2),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should get rid disjoint paths in a future PR. In conversations last year we netted out that there's not much value here and it can slow the queries down. Go get's around the potential malicious node problem with ASN distribution protection in the routing tables.

tdht.teardown()
})

it('getClosestPeers', async function () {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why'd we remove this one?


// TODO: after error switch
})

it('get should handle correctly an unexpected error', async function () {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why'd we get rid of record validation testing?

it('stop after finding k closest peers', async () => {
// mock this so we can dial non existing peers
dht.dialer.dial = () => {}
it.skip('should end paths when they have no closer peers to those already queried', async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why skip?

*/
function sendingQueryEvent (fields) {
return {
...fields,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be cool if fields also included the query id so you could trace the query at the libp2p layer instead of just logging.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Query logic overhaul
3 participants