Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"@libp2p/crypto": "^1.0.0",
"@libp2p/interface-connection-encrypter": "^3.0.0",
"@libp2p/interface-keys": "^1.0.2",
"@libp2p/interface-metrics": "^4.0.2",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/logger": "^2.0.0",
"@libp2p/peer-id": "^1.1.8",
Expand Down
10 changes: 7 additions & 3 deletions src/crypto/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import type { Transform } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'
import type { IHandshake } from '../@types/handshake-interface.js'
import type { MetricsRegistry } from '../metrics.js'
import { NOISE_MSG_MAX_LENGTH_BYTES, NOISE_MSG_MAX_LENGTH_BYTES_WITHOUT_TAG } from '../constants.js'
import { uint16BEEncode } from '../encoder.js'

// Returns generator that encrypts payload from the user
export function encryptStream (handshake: IHandshake): Transform<Uint8Array> {
export function encryptStream (handshake: IHandshake, metrics?: MetricsRegistry): Transform<Uint8Array> {
return async function * (source) {
for await (const chunk of source) {
for (let i = 0; i < chunk.length; i += NOISE_MSG_MAX_LENGTH_BYTES_WITHOUT_TAG) {
Expand All @@ -15,6 +16,7 @@ export function encryptStream (handshake: IHandshake): Transform<Uint8Array> {
}

const data = handshake.encrypt(chunk.subarray(i, end), handshake.session)
metrics?.encryptedPackets.increment()

yield uint16BEEncode(data.byteLength)
yield data
Expand All @@ -24,7 +26,7 @@ export function encryptStream (handshake: IHandshake): Transform<Uint8Array> {
}

// Decrypt received payload to the user
export function decryptStream (handshake: IHandshake): Transform<Uint8ArrayList, Uint8Array> {
export function decryptStream (handshake: IHandshake, metrics?: MetricsRegistry): Transform<Uint8ArrayList, Uint8Array> {
return async function * (source) {
for await (const chunk of source) {
for (let i = 0; i < chunk.length; i += NOISE_MSG_MAX_LENGTH_BYTES) {
Expand All @@ -33,10 +35,12 @@ export function decryptStream (handshake: IHandshake): Transform<Uint8ArrayList,
end = chunk.length
}

const { plaintext: decrypted, valid } = await handshake.decrypt(chunk.subarray(i, end), handshake.session)
const { plaintext: decrypted, valid } = handshake.decrypt(chunk.subarray(i, end), handshake.session)
if (!valid) {
metrics?.decryptErrors.increment()
throw new Error('Failed to validate decrypted chunk')
}
metrics?.decryptedPackets.increment()
yield decrypted
}
}
Expand Down
32 changes: 32 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { Metrics } from '@libp2p/interface-metrics'

export type MetricsRegistry = ReturnType<typeof registerMetrics>

export function registerMetrics (metrics: Metrics) {
return {
xxHandshakeSuccesses: metrics.registerCounter(
'libp2p_noise_xxhandshake_successes_total', {
help: 'Total count of noise xxHandshakes successes_'
}),

xxHandshakeErrors: metrics.registerCounter(
'libp2p_noise_xxhandshake_error_total', {
help: 'Total count of noise xxHandshakes errors'
}),

encryptedPackets: metrics.registerCounter(
'libp2p_noise_encrypted_packets_total', {
help: 'Total count of noise encrypted packets successfully'
}),

decryptedPackets: metrics.registerCounter(
'libp2p_noise_decrypted_packets_total', {
help: 'Total count of noise decrypted packets'
}),

decryptErrors: metrics.registerCounter(
'libp2p_noise_decrypt_errors_total', {
help: 'Total count of noise decrypt errors'
})
}
}
13 changes: 10 additions & 3 deletions src/noise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import { uint16BEDecode, uint16BEEncode } from './encoder.js'
import { XXHandshake } from './handshake-xx.js'
import { getPayload } from './utils.js'
import type { NoiseExtensions } from './proto/payload.js'
import type { Metrics } from '@libp2p/interface-metrics'
import { MetricsRegistry, registerMetrics } from './metrics.js'

interface HandshakeParams {
connection: ProtobufStream
Expand All @@ -32,6 +34,7 @@ export interface NoiseInit {
extensions?: NoiseExtensions
crypto?: ICryptoInterface
prologueBytes?: Uint8Array
metrics?: Metrics
}

export class Noise implements INoiseConnection {
Expand All @@ -41,12 +44,14 @@ export class Noise implements INoiseConnection {
private readonly prologue: Uint8Array
private readonly staticKeys: KeyPair
private readonly extensions?: NoiseExtensions
private readonly metrics?: MetricsRegistry

constructor (init: NoiseInit = {}) {
const { staticNoiseKey, extensions, crypto, prologueBytes } = init
const { staticNoiseKey, extensions, crypto, prologueBytes, metrics } = init

this.crypto = crypto ?? stablelib
this.extensions = extensions
this.metrics = metrics ? registerMetrics(metrics) : undefined

if (staticNoiseKey) {
// accepts x25519 private key of length 32
Expand Down Expand Up @@ -153,7 +158,9 @@ export class Noise implements INoiseConnection {
await handshake.propose()
await handshake.exchange()
await handshake.finish()
this.metrics?.xxHandshakeSuccesses.increment()
} catch (e: unknown) {
this.metrics?.xxHandshakeErrors.increment()
if (e instanceof Error) {
e.message = `Error occurred during XX handshake: ${e.message}`
throw e
Expand All @@ -173,10 +180,10 @@ export class Noise implements INoiseConnection {

await pipe(
secure, // write to wrapper
encryptStream(handshake), // encrypt data + prefix with message length
encryptStream(handshake, this.metrics), // encrypt data + prefix with message length
network, // send to the remote peer
decode({ lengthDecoder: uint16BEDecode }), // read message length prefix
decryptStream(handshake), // decrypt the incoming data
decryptStream(handshake, this.metrics), // decrypt the incoming data
secure // pipe to the wrapper
)

Expand Down
41 changes: 41 additions & 0 deletions test/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
import type { Metrics } from '@libp2p/interface-metrics'
import { expect } from 'aegir/chai'
import { duplexPair } from 'it-pair/duplex'
import { pbStream } from 'it-pb-stream'
import sinon from 'sinon'
import { Noise } from '../src/noise.js'
import { createPeerIdsFromFixtures } from './fixtures/peer.js'

function createCounterSpy () {
return sinon.spy({
increment: () => {},
reset: () => {}
})
}

describe('Index', () => {
it('should expose class with tag and required functions', () => {
Expand All @@ -8,4 +20,33 @@ describe('Index', () => {
expect(typeof (noise.secureInbound)).to.equal('function')
expect(typeof (noise.secureOutbound)).to.equal('function')
})

it('should collect metrics', async () => {
const [localPeer, remotePeer] = await createPeerIdsFromFixtures(2)
const metricsRegistry = new Map<string, ReturnType<typeof createCounterSpy>>()
const metrics = {
registerCounter: (name: string) => {
const counter = createCounterSpy()
metricsRegistry.set(name, counter)
return counter
}
}
const noiseInit = new Noise({ metrics: metrics as any as Metrics })
const noiseResp = new Noise({})

const [inboundConnection, outboundConnection] = duplexPair<Uint8Array>()
const [outbound, inbound] = await Promise.all([
noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer),
noiseResp.secureInbound(remotePeer, inboundConnection, localPeer)
])
const wrappedInbound = pbStream(inbound.conn)
const wrappedOutbound = pbStream(outbound.conn)

wrappedOutbound.writeLP(Buffer.from('test'))
await wrappedInbound.readLP()
expect(metricsRegistry.get('libp2p_noise_xxhandshake_successes_total')?.increment.callCount).to.equal(1)
expect(metricsRegistry.get('libp2p_noise_xxhandshake_error_total')?.increment.callCount).to.equal(0)
expect(metricsRegistry.get('libp2p_noise_encrypted_packets_total')?.increment.callCount).to.equal(1)
expect(metricsRegistry.get('libp2p_noise_decrypt_errors_total')?.increment.callCount).to.equal(0)
})
})