Skip to content

Commit

Permalink
Merge pull request #70 from ethereumjs/tests
Browse files Browse the repository at this point in the history
Unit tests for all components
  • Loading branch information
vpulim authored Dec 1, 2018
2 parents a45c129 + 231c166 commit 0848e1e
Show file tree
Hide file tree
Showing 49 changed files with 2,596 additions and 556 deletions.
2 changes: 0 additions & 2 deletions lib/handler/ethhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ class EthHandler extends Handler {
const { block, max, skip, reverse } = message.data
const headers = await this.chain.getHeaders(block, max, skip, reverse)
peer.eth.send('BlockHeaders', headers)
} else if (message.name === 'GetBlockBodies') {
// TO DO
}
} catch (error) {
this.emit('error', error)
Expand Down
34 changes: 17 additions & 17 deletions lib/net/peer/libp2ppeer.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ class Libp2pPeer extends Peer {
if (this.connected) {
return
}
const nodeInfo = await createPeerInfo({multiaddrs: ['/ip4/0.0.0.0/tcp/0']})
const peerInfo = await createPeerInfo(this)
const nodeInfo = await this.createPeerInfo({multiaddrs: ['/ip4/0.0.0.0/tcp/0']})
const peerInfo = await this.createPeerInfo(this)
const node = new Libp2pNode({ peerInfo: nodeInfo })
await node.asyncStart()
await node.asyncDial(peerInfo)
Expand Down Expand Up @@ -110,23 +110,23 @@ class Libp2pPeer extends Peer {
this.server = server
this.connected = true
}
}

async function createPeerInfo ({ multiaddrs, id }) {
return new Promise((resolve, reject) => {
const handler = (err, peerInfo) => {
if (err) {
return reject(err)
async createPeerInfo ({ multiaddrs, id }) {
return new Promise((resolve, reject) => {
const handler = (err, peerInfo) => {
if (err) {
return reject(err)
}
multiaddrs.forEach(ma => peerInfo.multiaddrs.add(ma))
resolve(peerInfo)
}
multiaddrs.forEach(ma => peerInfo.multiaddrs.add(ma))
resolve(peerInfo)
}
if (id) {
PeerInfo.create(PeerId.createFromB58String(id), handler)
} else {
PeerInfo.create(handler)
}
})
if (id) {
PeerInfo.create(PeerId.createFromB58String(id), handler)
} else {
PeerInfo.create(handler)
}
})
}
}

module.exports = Libp2pPeer
64 changes: 29 additions & 35 deletions lib/net/peer/rlpxpeer.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const Peer = require('./peer')
const { RlpxSender } = require('../protocol')
const RlpxSender = require('../protocol/rlpxsender')
const { ETH, LES, RLPx } = require('ethereumjs-devp2p')
const { randomBytes } = require('crypto')

Expand Down Expand Up @@ -87,44 +87,38 @@ class RlpxPeer extends Peer {
}
const key = randomBytes(32)
await Promise.all(this.protocols.map(p => p.open()))
return new Promise(async (resolve, reject) => {
this.rlpx = new RLPx(key, {
capabilities: RlpxPeer.capabilities(this.protocols),
listenPort: null }
)
await this.rlpx.connect({
id: Buffer.from(this.id, 'hex'),
address: this.host,
port: this.port
})
this.rlpx.on('peer:error', (rlpxPeer, error) => {
this.emit('error', error)
})
this.rlpx.once('peer:added', async (rlpxPeer) => {
try {
this.rlpx = new RLPx(key, {
capabilities: RlpxPeer.capabilities(this.protocols),
listenPort: null }
)
await this.rlpx.connect({
id: Buffer.from(this.id, 'hex'),
address: this.host,
port: this.port
})
await this.bindProtocols(rlpxPeer)
this.emit('connected')
} catch (error) {
return reject(error)
}
this.rlpx.on('peer:error', (rlpxPeer, error) => {
this.emit('error', error)
})
this.rlpx.once('peer:added', async (rlpxPeer) => {
try {
await this.bindProtocols(rlpxPeer)
this.emit('connected')
} catch (error) {
this.emit('error', error)
}
})
this.rlpx.once('peer:removed', (rlpxPeer, reason) => {
try {
if (rlpxPeer !== this.rlpxPeer) {
return
}
reason = rlpxPeer.getDisconnectPrefix(reason)
this.rlpxPeer = null
this.connected = false
this.emit('disconnected', reason)
} catch (error) {
this.emit('error', error)
}
})
this.rlpx.once('peer:removed', (rlpxPeer, reason) => {
try {
if (rlpxPeer !== this.rlpxPeer) {
return
}
})
reason = rlpxPeer.getDisconnectPrefix(reason)
this.rlpxPeer = null
this.connected = false
this.emit('disconnected', reason)
} catch (error) {
this.emit('error', error)
}
})
}

Expand Down
17 changes: 3 additions & 14 deletions lib/net/peerpool.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ class PeerPool extends EventEmitter {
*/
async open () {
if (this.opened) {
return
return false
}
await Promise.all(this.servers.map(server => {
this.servers.map(server => {
server.on('connected', (peer) => { this.connected(peer) })
server.on('disconnected', (peer) => { this.disconnected(peer) })
}))
})
this.opened = true
}

Expand Down Expand Up @@ -146,17 +146,6 @@ class PeerPool extends EventEmitter {
this.emit('banned', peer)
}

/**
* Handler for pool and peer errors
* @private
* @param {Error} error
* @param {Peer} peer
* @emits error
*/
error (error, peer) {
this.emit('error', error)
}

/**
* Add peer to pool
* @param {Peer} peer
Expand Down
150 changes: 150 additions & 0 deletions lib/net/protocol/boundprotocol.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
'use strict'

const EventEmitter = require('events')

/**
* Binds a protocol implementation to the specified peer
* @memberof module:net/protocol
*/
class BoundProtocol extends EventEmitter {
/**
* Create bound protocol
* @param {Object} options constructor parameters
* @param {Protocol} options.protocol protocol to bind
* @param {Peer} options.peer peer that protocol is bound to
* @param {Sender} options.sender message sender
*/
constructor (options) {
super()

this.protocol = options.protocol
this.peer = options.peer
this.sender = options.sender
this.name = this.protocol.name
this.versions = this.protocol.versions
this.timeout = this.protocol.timeout
this.logger = this.protocol.logger
this._status = {}
this.resolvers = new Map()
this.sender.on('message', message => {
try {
this.handle(message)
} catch (error) {
this.emit('error', error)
}
})
this.sender.on('error', error => this.emit('error', error))
this.addMethods()
}

get status () {
return this._status
}

set status (status) {
Object.assign(this._status, status)
}

async handshake (sender) {
this._status = await this.protocol.handshake(sender)
}

/**
* Handle incoming message
* @private
* @param {Object} message message object
* @emits message
*/
handle (incoming) {
const messages = this.protocol.messages
const message = messages.find(m => m.code === incoming.code)
if (!message) {
return
}

let data
let error
try {
data = this.protocol.decode(message, incoming.payload)
} catch (e) {
error = new Error(`Could not decode message ${message.name}: ${e}`)
}
const resolver = this.resolvers.get(incoming.code)
if (resolver) {
clearTimeout(resolver.timeout)
this.resolvers.delete(incoming.code)
if (error) {
resolver.reject(error)
} else {
resolver.resolve(data)
}
} else {
if (error) {
this.emit('error', error)
} else {
this.emit('message', { name: message.name, data: data })
}
}
}

/**
* Send message with name and the specified args
* @param {string} name message name
* @param {object} args message arguments
*/
send (name, args) {
const messages = this.protocol.messages
const message = messages.find(m => m.name === name)
if (message) {
const encoded = this.protocol.encode(message, args)
this.sender.sendMessage(message.code, encoded)
return message
} else {
throw new Error(`Unknown message: ${name}`)
}
}

/**
* Returns a promise that resolves with the message payload when a response
* to the specified message is received
* @param {string} name message to wait for
* @param {object} args message arguments
* @return {Promise}
*/
async request (name, args) {
const message = this.send(name, args)
const resolver = {
timeout: null,
resolve: null,
reject: null
}
if (this.resolvers.get(message.response)) {
throw new Error(`Only one active request allowed per message type (${name})`)
}
this.resolvers.set(message.response, resolver)
return new Promise((resolve, reject) => {
resolver.timeout = setTimeout(() => {
resolver.timeout = null
this.resolvers.delete(message.response)
reject(new Error(`Request timed out after ${this.timeout}ms`))
}, this.timeout)
resolver.resolve = resolve
resolver.reject = reject
})
}

/**
* Add a methods to the bound protocol for each protocol message that has a
* corresponding response message
*/
addMethods () {
const messages = this.protocol.messages.filter(m => m.response)
for (let message of messages) {
const name = message.name
const camel = name[0].toLowerCase() + name.slice(1)
this[name] = this[camel] = async (args) => this.request(name, args)
}
}
}

module.exports = BoundProtocol
Loading

0 comments on commit 0848e1e

Please sign in to comment.