Skip to content

Commit

Permalink
Merge pull request #49 from ethereumjs/lightsync
Browse files Browse the repository at this point in the history
Added LES flow control and support for serving headers
  • Loading branch information
vpulim authored Sep 24, 2018
2 parents 5a7def4 + 5dbe50c commit 1d507ed
Show file tree
Hide file tree
Showing 13 changed files with 291 additions and 69 deletions.
15 changes: 14 additions & 1 deletion bin/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const { parse } = require('../lib/util')
const Node = require('../lib/node')
const jayson = require('jayson')
const RPCManager = require('../lib/rpc')
const { randomBytes } = require('crypto')
const os = require('os')
const path = require('path')

Expand Down Expand Up @@ -37,6 +38,10 @@ const args = require('yargs')
'bootnodes': {
describe: 'Comma separated RLPx bootstrap enode URLs'
},
'port': {
describe: 'Network listening port',
default: 30303
},
'rpc': {
describe: 'Enable the JSON-RPC server',
boolean: true,
Expand All @@ -59,6 +64,11 @@ const args = require('yargs')
'params': {
describe: 'Path to chain parameters json file',
coerce: path.resolve
},
'key': {
describe: '32-byte hex string to generate key pair from',
default: 'random',
coerce: key => key.length === 64 ? Buffer.from(key, 'hex') : randomBytes(32)
}
})
.locale('en_EN')
Expand All @@ -69,6 +79,7 @@ async function runNode (options) {
logger.info('Initializing Ethereumjs client...')
const node = new Node(options)
node.on('error', err => logger.error(err))
node.on('listening', details => logger.info(`Listener up url=${details.url}`))
logger.info(`Connecting to network: ${options.common.chainName()}`)
await node.open()
logger.info('Synchronizing blockchain...')
Expand Down Expand Up @@ -96,9 +107,11 @@ async function run () {
transports: args.transports,
syncmode: args.syncmode,
dataDir: `${args.datadir}/${networkDirName}ethereumjs/${syncDirName}`,
localPort: args.port,
rpcport: args.rpcport,
rpcaddr: args.rpcaddr,
bootnodes: parse.bootnodes(args.bootnodes)
bootnodes: parse.bootnodes(args.bootnodes),
privateKey: args.key
}
const node = await runNode(options)

Expand Down
13 changes: 13 additions & 0 deletions lib/blockchain/chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,19 @@ class Chain extends EventEmitter {
await this.update()
}

/**
* Get headers from blockchain
* @param {Buffer|BN} block block hash or number to start from
* @param {number} max maximum number of headers to get
* @param {number} skip number of headers to skip
* @param {boolean} reverse get headers in reverse
* @return {Promise}
*/
async getHeaders (block, max, skip, reverse) {
const blocks = await this.getBlocks(block, max, skip, reverse)
return blocks.map(b => b.header)
}

/**
* Insert new headers into blockchain
* @method putHeaders
Expand Down
89 changes: 89 additions & 0 deletions lib/net/protocol/flowcontrol.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
'use strict'

const defaultOptions = {
bl: 300000000,
mrc: {
'GetBlockHeaders': { base: 1000, req: 1000 }
},
mrr: 10000
}

/**
* LES flow control manager
* @memberof module:net/protocol
*/
class FlowControl {
constructor (options) {
options = {...defaultOptions, ...options}

this.bl = options.bl
this.mrc = options.mrc
this.mrr = options.mrr
this.out = new Map()
this.in = new Map()
}

/**
* Process reply message from an LES peer by updating its BLE value
* @param {Peer} peer LES peer
* @param {number} bv latest buffer value
*/
handleReply (peer, bv) {
const params = this.in.get(peer.id) || {}
params.ble = bv
params.last = Date.now()
this.in.set(peer.id, params)
}

/**
* Calculate maximum items that can be requested from an LES peer
* @param {Peer} peer LES peer
* @param {string} messageName message name
* @return {number} maximum count
*/
maxRequestCount (peer, messageName) {
const now = Date.now()
const mrcBase = peer.les.status.mrc[messageName].base
const mrcReq = peer.les.status.mrc[messageName].req
const mrr = peer.les.status.mrr
const bl = peer.les.status.bl
const params = this.in.get(peer.id) || { ble: bl }
if (params.last) {
// recharge BLE at rate of MRR when less than BL
params.ble = Math.min(params.ble + mrr * (now - params.last), bl)
}
params.last = now
this.in.set(peer.id, params)
// calculate how many messages we can request from peer
return Math.max(Math.floor((params.ble - mrcBase) / mrcReq), 0)
}

/**
* Calculate new buffer value for an LES peer after an incoming request is
* processed. If the new value is negative, the peer should be dropped by the
* caller.
* @param {Peer} peer LES peer
* @param {string} messageName message name
* @param {number} count number of items to request from peer
* @return {number} new buffer value after request is sent (if negative, drop peer)
*/
handleRequest (peer, messageName, count) {
const now = Date.now()
const params = this.out.get(peer.id) || {}
if (params.bv && params.last) {
params.bv = Math.min(params.bv + this.mrr * (now - params.last), this.bl)
} else {
params.bv = this.bl
}
params.bv -= this.mrc[messageName].base + this.mrc[messageName].req * count
params.last = now
if (params.bv < 0) {
this.out.delete(peer.id)
} else {
this.out.set(peer.id, params)
}
return params.bv
}
}

module.exports = FlowControl
1 change: 1 addition & 0 deletions lib/net/protocol/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ exports.EthProtocol = require('./ethprotocol')
exports.LesProtocol = require('./lesprotocol')
exports.Sender = require('./sender')
exports.RlpxSender = require('./rlpxsender')
exports.FlowControl = require('./flowcontrol')
62 changes: 40 additions & 22 deletions lib/net/protocol/lesprotocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ const messages = [{
name: 'GetBlockHeaders',
code: 0x02,
response: 0x03,
encode: (block, max, skip, reverse) => [
encode: (block, max, skip, reverse) => [[
BN.isBN(block) ? block.toBuffer() : block, max, skip, reverse
],
decode: (payload) => payload
]],
decode: ([block, max, skip, reverse]) => [
block.length === 32 ? block : new BN(block),
...[max, skip, reverse].map(util.bufferToInt)
]
}, {
name: 'BlockHeaders',
code: 0x03,
flow: true,
encode: (hashes) => hashes,
reply: true,
encode: (headers) => [headers.map(h => h.raw)],
decode: (payload) => payload.map(p => new Block.Header(p))
}]

Expand All @@ -28,15 +31,17 @@ const messages = [{
class LesProtocol extends Protocol {
/**
* Create les protocol
* @param {Object} options constructor parameters
* @param {Chain} options.chain blockchain
* @param {number} [options.timeout=5000] handshake timeout in ms
* @param {Logger} [options.logger] logger instance
* @param {Object} options constructor parameters
* @param {Chain} options.chain blockchain
* @param {FlowControl} options.flow flow control manager
* @param {number} [options.timeout=5000] handshake timeout in ms
* @param {Logger} [options.logger] logger instance
*/
constructor (options) {
super(options)

this.chain = options.chain
this.flow = options.flow
this.reqId = 0
}

Expand Down Expand Up @@ -81,12 +86,23 @@ class LesProtocol extends Protocol {
* @return {Object}
*/
encodeStatus () {
const mrc = Object.entries(this.flow.mrc).map(([name, { base, req }]) => {
const { code } = messages.find(m => m.name === name)
return [code, base, req]
})
return {
networkId: this.chain.networkId,
headTd: this.chain.blocks.td.toBuffer(),
headHash: this.chain.blocks.latest.hash(),
headNum: this.chain.blocks.latest.header.number,
genesisHash: this.chain.genesis.hash
headTd: this.chain.headers.td.toBuffer(),
headHash: this.chain.headers.latest.hash(),
headNum: this.chain.headers.latest.number,
genesisHash: this.chain.genesis.hash,
serveHeaders: 1,
serveChainSince: 0,
serveStateSince: 0,
txRelay: 1,
'flowControl/BL': new BN(this.flow.bl).toBuffer(),
'flowControl/MRR': new BN(this.flow.mrr).toBuffer(),
'flowControl/MRC': mrc
}
}

Expand Down Expand Up @@ -132,7 +148,12 @@ class LesProtocol extends Protocol {
* @return {*}
*/
encode (message, args) {
return [ this.reqId++, message.encode(...args) ]
const encoded = [ this.reqId++ ]
if (message.reply) {
// add BV if this is a reply message
encoded.push(args.shift())
}
return encoded.concat(message.encode(...args))
}

/**
Expand All @@ -143,15 +164,12 @@ class LesProtocol extends Protocol {
* @param {BoundProtocol} bound reference to bound protocol
* @return {*}
*/
decode (message, payload, bound) {
// drop request id since it's not currently used
payload.shift()
// update buffer limit estimate (BLE) if message contains buffer value (BV)
if (message.flow) {
const bv = payload.shift()
bound.ble = new BN(bv).toNumber()
decode (message, payload) {
return {
reqId: new BN(payload.shift()),
bv: message.reply ? new BN(payload.shift()).toNumber() : undefined,
payload: message.decode(payload.shift())
}
return message.decode(payload.shift())
}
}

Expand Down
14 changes: 7 additions & 7 deletions lib/net/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class BoundProtocol extends EventEmitter {
let data
let error
try {
data = this.protocol.decode(message, incoming.payload, this)
data = this.protocol.decode(message, incoming.payload)
} catch (e) {
error = new Error(`Could not decode message ${message.name}: ${e}`)
}
Expand All @@ -104,7 +104,7 @@ class BoundProtocol extends EventEmitter {
* @param {string} name message name
* @param {...*} args message arguments
*/
send (name, args) {
send (name, ...args) {
const messages = this.protocol.messages
const message = messages.find(m => m.name === name)
if (message) {
Expand All @@ -123,8 +123,8 @@ class BoundProtocol extends EventEmitter {
* @param {...*} args message arguments
* @return {Promise}
*/
async request (name, args) {
const message = this.send(name, args)
async request (name, ...args) {
const message = this.send(name, ...args)
const resolver = {
timeout: null,
resolve: null,
Expand Down Expand Up @@ -154,8 +154,8 @@ class BoundProtocol extends EventEmitter {
for (let message of messages) {
const name = message.name
const camel = name[0].toLowerCase() + name.slice(1)
this[name] = async (...args) => this.request(name, args)
this[camel] = async (...args) => this.request(name, args)
this[name] = async (...args) => this.request(name, ...args)
this[camel] = async (...args) => this.request(name, ...args)
}
}
}
Expand Down Expand Up @@ -271,7 +271,7 @@ class Protocol extends EventEmitter {
* @param {BoundProtocol} bound reference to bound protocol
* @return {*}
*/
decode (message, payload, bound) {
decode (message, payload) {
return message.decode(payload)
}

Expand Down
16 changes: 15 additions & 1 deletion lib/net/server/rlpxserver.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class RlpxServer extends Server {
}

await super.stop()
this.rplx.destroy()
this.rlpx.destroy()
this.dpt.destroy()
this.started = false
}
Expand Down Expand Up @@ -154,6 +154,10 @@ class RlpxServer extends Server {
})

this.dpt.on('error', e => this.error(e))

if (this.localPort) {
this.dpt.bind(this.localPort, '0.0.0.0')
}
}

/**
Expand Down Expand Up @@ -203,6 +207,16 @@ class RlpxServer extends Server {
})

this.rlpx.on('error', e => this.error(e))

this.rlpx.on('listening', () => {
this.emit('listening', {
url: `enode://${this.rlpx._id.toString('hex')}@[::]:${this.localPort}`
})
})

if (this.localPort) {
this.rlpx.listen(this.localPort, '0.0.0.0')
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions lib/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class Node extends EventEmitter {
s.on('error', error => {
this.emit('error', error)
})
s.on('listening', details => {
this.emit('listening', details)
})
})
await Promise.all(this.services.map(s => s.open()))
this.opened = true
Expand Down
Loading

0 comments on commit 1d507ed

Please sign in to comment.