Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

devp2p: Add snappy compression for devp2p - EIP-706 #1399

Merged
merged 16 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11,145 changes: 8,570 additions & 2,575 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/block/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"lint:fix": "../../config/cli/lint-fix.sh",
"tape": "tape -r ts-node/register",
"test": "npm run test:node && npm run test:browser",
"test:node": "tape -r ts-node/register test/*.spec.ts",
"test:node": "npm run tape -- test/*.spec.ts",
"test:browser": "karma start karma.conf.js"
},
"dependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"lint:fix": "../../config/cli/lint-fix.sh",
"tape": "tape -r ts-node/register",
"test": "npm run test:node && npm run test:browser",
"test:node": "tape -r ts-node/register ./tests/*.spec.ts",
"test:node": "npm run tape -- ./tests/*.spec.ts",
"test:browser": "karma start karma.conf.js",
"docs:build": "typedoc --options typedoc.js"
},
Expand Down
6 changes: 3 additions & 3 deletions packages/devp2p/examples/peer-communication-les.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as devp2p from '../src/index'
import { LES, Peer } from '../src/index'
import Common, { Chain } from '@ethereumjs/common'
import Common, { Chain, Hardfork } from '@ethereumjs/common'
import { TypedTransaction } from '@ethereumjs/tx'
import { Block, BlockHeader } from '@ethereumjs/block'
import ms from 'ms'
Expand All @@ -16,7 +16,7 @@ const GENESIS_HASH = Buffer.from(
'hex'
)

const common = new Common({ chain: Chain.Rinkeby })
const common = new Common({ chain: Chain.Rinkeby, hardfork: Hardfork.London })
const bootstrapNodes = common.bootstrapNodes()
const BOOTNODES = bootstrapNodes.map((node: any) => {
return {
Expand Down Expand Up @@ -102,7 +102,7 @@ rlpx.on('peer:added', (peer) => {
)
break
}
const header = BlockHeader.fromValuesArray(payload[2][0], {})
const header = BlockHeader.fromValuesArray(payload[2][0], { common })

setTimeout(() => {
les.sendMessage(devp2p.LES.MESSAGE_CODES.GET_BLOCK_BODIES, [1, [header.hash()]])
Expand Down
6 changes: 4 additions & 2 deletions packages/devp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
"tsc": "../../config/cli/ts-compile.sh",
"lint": "../../config/cli/lint.sh",
"lint:fix": "../../config/cli/lint-fix.sh",
"test": "tape -r ts-node/register ./test/index.ts"
"tape": "tape -r ts-node/register",
holgerd77 marked this conversation as resolved.
Show resolved Hide resolved
"test": "npm run tape -- ./test/index.ts"
},
"dependencies": {
"@ethereumjs/common": "^2.4.0",
Expand All @@ -58,7 +59,8 @@
"multiaddr": "^8.1.2",
"rlp": "^2.2.4",
"scanf": "^1.1.2",
"secp256k1": "^4.0.2"
"secp256k1": "^4.0.2",
"snappyjs": "^0.6.1"
},
"devDependencies": {
"@ethereumjs/block": "^3.4.0",
Expand Down
4 changes: 4 additions & 0 deletions packages/devp2p/src/@types/snappyjs/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
declare module 'snappyjs' {
function uncompress(data: Buffer): Buffer
function compress(data: Buffer): Buffer
}
22 changes: 19 additions & 3 deletions packages/devp2p/src/eth/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import assert from 'assert'
import { EventEmitter } from 'events'
import * as rlp from 'rlp'
import ms from 'ms'
import snappy from 'snappyjs'
import { debug as createDebugLogger } from 'debug'
import { BN } from 'ethereumjs-util'
import { int2buffer, buffer2int, assertEq, formatLogId, formatLogData } from '../util'
import { Peer, DISCONNECT_REASONS } from '../rlpx/peer'

import { debug as createDebugLogger } from 'debug'
const debug = createDebugLogger('devp2p:eth')
const verbose = createDebugLogger('verbose').enabled

Expand Down Expand Up @@ -230,7 +231,15 @@ export class ETH extends EventEmitter {
this._peer._socket.remotePort
} (eth${this._version}): ${this._getStatusString(this._status)}`
)
this._send(ETH.MESSAGE_CODES.STATUS, rlp.encode(this._status as any))

let payload = rlp.encode(this._status as any)

// Use snappy compression if peer supports DevP2P >=v5
if (this._peer._hello?.protocolVersion && this._peer._hello?.protocolVersion >= 5) {
payload = snappy.compress(payload)
}

this._send(ETH.MESSAGE_CODES.STATUS, payload)
holgerd77 marked this conversation as resolved.
Show resolved Hide resolved
this._handleStatus()
}

Expand Down Expand Up @@ -272,7 +281,14 @@ export class ETH extends EventEmitter {
throw new Error(`Unknown code ${code}`)
}

this._send(code, rlp.encode(payload))
payload = rlp.encode(payload)

// Use snappy compression if peer supports DevP2P >=v5
if (this._peer._hello?.protocolVersion && this._peer._hello?.protocolVersion >= 5) {
payload = snappy.compress(payload)
}

this._send(code, payload)
}

getMsgPrefix(msgCode: ETH.MESSAGE_CODES): string {
Expand Down
20 changes: 18 additions & 2 deletions packages/devp2p/src/les/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EventEmitter } from 'events'
import * as rlp from 'rlp'
import ms from 'ms'
import snappy from 'snappyjs'
import { debug as createDebugLogger } from 'debug'
import { int2buffer, buffer2int, assertEq, formatLogData } from '../util'
import { Peer, DISCONNECT_REASONS } from '../rlpx/peer'
Expand Down Expand Up @@ -166,7 +167,15 @@ export class LES extends EventEmitter {
this._peer._socket.remotePort
} (les${this._version}): ${this._getStatusString(this._status)}`
)
this._send(LES.MESSAGE_CODES.STATUS, rlp.encode(statusList))

let payload = rlp.encode(statusList)

// Use snappy compression if peer supports DevP2P >=v5
if (this._peer._hello?.protocolVersion && this._peer._hello?.protocolVersion >= 5) {
payload = snappy.compress(payload)
}

this._send(LES.MESSAGE_CODES.STATUS, payload)
this._handleStatus()
}

Expand Down Expand Up @@ -219,7 +228,14 @@ export class LES extends EventEmitter {
throw new Error(`Unknown code ${code}`)
}

this._send(code, rlp.encode(payload))
payload = rlp.encode(payload)

// Use snappy compression if peer supports DevP2P >=v5
if (this._peer._hello?.protocolVersion && this._peer._hello?.protocolVersion >= 5) {
payload = snappy.compress(payload)
}

this._send(code, payload)
}

getMsgPrefix(msgCode: LES.MESSAGE_CODES) {
Expand Down
15 changes: 12 additions & 3 deletions packages/devp2p/src/rlpx/peer.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import { Socket } from 'net'
import { EventEmitter } from 'events'
import * as rlp from 'rlp'
import * as util from '../util'
import BufferList = require('bl')
import ms from 'ms'
import snappy from 'snappyjs'
import { debug as createDebugLogger } from 'debug'
import Common from '@ethereumjs/common'
import { ECIES } from './ecies'
import { ETH, LES } from '../'
import { int2buffer, buffer2int, formatLogData } from '../util'
import { Socket } from 'net'

const debug = createDebugLogger('devp2p:rlpx:peer')
const verbose = createDebugLogger('verbose').enabled

export const BASE_PROTOCOL_VERSION = 4
export const BASE_PROTOCOL_VERSION = 5
export const BASE_PROTOCOL_LENGTH = 16

export const PING_INTERVAL = ms('15s')
Expand Down Expand Up @@ -427,6 +428,7 @@ export class Peer extends EventEmitter {
*/
_handleMessage(code: PREFIXES, msg: Buffer) {
const payload = rlp.decode(msg)

switch (code) {
case PREFIXES.HELLO:
this._handleHello(payload)
Expand Down Expand Up @@ -499,7 +501,14 @@ export class Peer extends EventEmitter {
)

try {
obj.protocol._handleMessage(msgCode, body.slice(1))
let payload = body.slice(1)

// Use snappy uncompression if peer supports DevP2P >=v5
if (this._hello?.protocolVersion && this._hello?.protocolVersion >= 5) {
payload = snappy.uncompress(payload)
}

obj.protocol._handleMessage(msgCode, payload)
} catch (err) {
this.disconnect(DISCONNECT_REASONS.SUBPROTOCOL_ERROR)
debug(`Error on peer subprotocol message handling: ${err}`)
Expand Down
6 changes: 6 additions & 0 deletions packages/devp2p/test/integration/eth-simulator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,9 @@ test('ETH: invalid status send', async (t) => {
}
util.twoPeerMsgExchange(t, opts, capabilities)
})

test('RLPX: verify that snappy compression is not used with an RLPX peer that only supports devp2p 4', (t) => {
const opts: any = {}
opts.status0 = Object.assign({}, status)
util.twoPeerMsgExchange2(t, opts, capabilities)
})
45 changes: 45 additions & 0 deletions packages/devp2p/test/integration/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,48 @@ export function destroyRLPXs(rlpxs: any) {
rlpx.destroy()
}
}

export function twoPeerMsgExchange2(
t: Test,
opts: any,
capabilities?: any,
common?: Object | Common
) {
const rlpxs = initTwoPeerRLPXSetup(null, capabilities, common)
rlpxs[0].on('peer:added', function (peer: any) {
const protocol = peer.getProtocols()[0]
const v4Hello = {
protocolVersion: 4,
clientId: 'fakePeer',
capabilities: [ETH.eth66],
port: 30303,
id: Buffer.alloc(12),
}
// Set peer's devp2p protocol version to 4
protocol._peer._hello = v4Hello
protocol.sendStatus(opts.status0)
peer.on('error', (err: Error) => {
t.fail(`Unexpected peer 0 error: ${err}`)
})
})

rlpxs[1].on('peer:added', function (peer: any) {
const protocol = peer.getProtocols()[0]
protocol.once('message', async (code: any, _payload: any) => {
switch (code) {
case ETH.MESSAGE_CODES.STATUS:
t.fail('should not have been able to process status message')
break
}
})
peer.once('error', (err: any) => {
t.equal(
err.message,
'Invalid Snappy bitstream',
'unable to process snappy compressed message'
)
t.end()
destroyRLPXs(rlpxs)
})
})
}
holgerd77 marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion packages/devp2p/tsconfig.browser.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
"compilerOptions": {
"outDir": "./dist.browser",
"types": ["node"],
"typeRoots": ["node_modules/@types"]
"typeRoots": ["node_modules/@types", "src/@types"]
}
}
2 changes: 1 addition & 1 deletion packages/devp2p/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
"include": ["src/**/*.ts", "test/**/*.ts", "examples/**/*.ts"],
"compilerOptions": {
"baseUrl": ".",
"typeRoots": ["node_modules/@types"],
"typeRoots": ["node_modules/@types", "src/@types"],
},
}
2 changes: 1 addition & 1 deletion packages/devp2p/tsconfig.prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"baseUrl": "./",
"outDir": "./dist",
"rootDir": "./src",
"typeRoots": ["node_modules/@types"],
"typeRoots": ["node_modules/@types", "src/@types"],
"composite": true,
},
"references": [
Expand Down