Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

WIP callbacks -> async / await #82

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,27 @@
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
"dependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"async": "^2.6.2",
"base32.js": "~0.1.0",
"chai-checkmark": "^1.0.1",
"cids": "~0.5.7",
"debug": "^4.1.1",
"err-code": "^1.1.2",
"hashlru": "^2.3.0",
"heap": "~0.2.6",
"interface-datastore": "~0.6.0",
"interface-datastore": "ipfs/interface-datastore#refactor/async-iterators",
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue with interface-datastore dep ref

➜ npm i
npm ERR! code 1
npm ERR! Command failed: git checkout refactor/async-iterators
npm ERR! error: pathspec 'refactor/async-iterators' did not match any file(s) known to git

Copy link
Contributor

@kumavis kumavis Apr 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like merged and branch deleted 👍 ipfs/interface-datastore#25

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not published yet though, I'll point to the commit for now

"k-bucket": "^5.0.0",
"libp2p-crypto": "~0.16.0",
"libp2p-record": "~0.6.2",
"libp2p-crypto": "libp2p/js-libp2p-crypto#feat/async-await",
"libp2p-record": "dirkmc/js-libp2p-record#feat/async-await",
"multihashes": "~0.4.14",
"multihashing-async": "~0.5.2",
"peer-id": "~0.12.2",
"peer-info": "~0.15.1",
"multihashing-async": "multiformats/js-multihashing-async#feat/async-iterators",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

published as 0.7.0 👍

"p-queue": "^3.1.0",
"peer-id": "libp2p/js-peer-id#feat/async-await",
"peer-info": "libp2p/js-peer-info#feat/async-await",
"priorityqueue": "~0.2.1",
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.1",
"pull-stream": "^3.6.9",
"streaming-iterables": "^3.5.0",
"varint": "^5.0.0",
"xor-distance": "^1.0.0"
},
Expand Down
141 changes: 141 additions & 0 deletions src/connection-helper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
'use strict'

const pull = require('pull-stream')
const lp = require('pull-length-prefixed')

const errcode = require('err-code')

const c = require('./constants')
const Message = require('./message')
const utils = require('./utils')

/**
* Provide a Promise API to send messages over a Connection
*/
class ConnectionHelper {
/**
* Create a ConnectionHelper.
*
* @param {PeerId} selfId
*/
constructor (selfId) {
this._log = utils.logger(selfId, 'net')
}

/**
* Write a message to the given connection.
*
* @param {Connection} conn - the connection to use
* @param {Buffer} msg - the message to send
* @returns {Promise}
*/
writeMessage (conn, msg) {
const serialized = msg.serialize()
return new Promise((resolve, reject) => {
pull(
pull.values([serialized]),
lp.encode(),
conn,
pull.onEnd((err) => err ? reject(err) : resolve())
)
})
}

/**
* Write a message and read its response.
*
* @param {Connection} conn - the connection to use
* @param {Buffer} msg - the message to send
* @returns {Promise<Message>}
*/
writeReadMessage (conn, msg) {
const serialized = msg.serialize()
return new Promise((resolve, reject) => {
pull(
pull.values([serialized]),
lp.encode(),
conn,
lp.decode(),
pull.filter((msg) => msg.length < c.maxMessageSize),
pull.collect((err, res) => {
if (err) {
return reject(err)
}
if (res.length === 0) {
return reject(errcode('No message received', 'ERR_NO_MESSAGE_RECEIVED'))
}

let response
try {
response = Message.deserialize(res[0])
} catch (err) {
return reject(errcode(err, 'ERR_FAILED_DESERIALIZE_RESPONSE'))
}

resolve(response)
})
)
})
}

/**
* A function that processes a message and may optionally return a Message
* which is passed through to the Connection. The function may be async.
*
* @typedef {ThroughFunc} function
* @param {Message} msg
* @returns Message
*/

/**
* Process incoming messages with a ThroughFunc.
* If a value is returned from the function, it is passed through to the
* Connection.
*
* @param {Connection} conn - the connection to use
* @param {ThroughFunc} throughFn - the message to send
*/
through (conn, throughFn) {
pull(
conn,
lp.decode(),
pull.filter((msg) => msg.length < c.maxMessageSize),
pull.map((rawMsg) => {
let msg
try {
msg = Message.deserialize(rawMsg)
} catch (err) {
this._log.error('failed to read incoming message', err)
return
}

return msg
}),
pull.filter(Boolean),
pull.asyncMap(async (msg, cb) => {
try {
cb(null, await throughFn(msg))
} catch (err) {
cb(err)
}
}),
// Not all handlers will return a response
pull.filter(Boolean),
pull.map((response) => {
let msg
try {
msg = response.serialize()
} catch (err) {
this._log.error('failed to send message', err)
return
}
return msg
}),
pull.filter(Boolean),
lp.encode(),
conn
)
}
}

module.exports = ConnectionHelper
3 changes: 3 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ exports.PROVIDERS_CLEANUP_INTERVAL = hour

exports.READ_MESSAGE_TIMEOUT = minute

// The number of records that will be retrieved on a call to getMany()
exports.GET_MANY_RECORD_COUNT = 16

// K is the maximum number of requests to perform before returning failue
exports.K = 20

Expand Down
Loading