diff --git a/package.json b/package.json index f99e05ec..a047b71d 100644 --- a/package.json +++ b/package.json @@ -70,6 +70,7 @@ "@libp2p/crypto": "^1.0.0", "@libp2p/interface-connection-encrypter": "^3.0.0", "@libp2p/interface-keys": "^1.0.2", + "@libp2p/interface-metrics": "^4.0.2", "@libp2p/interface-peer-id": "^1.0.2", "@libp2p/logger": "^2.0.0", "@libp2p/peer-id": "^1.1.8", diff --git a/src/crypto/streaming.ts b/src/crypto/streaming.ts index b967b771..a374a269 100644 --- a/src/crypto/streaming.ts +++ b/src/crypto/streaming.ts @@ -1,11 +1,12 @@ import type { Transform } from 'it-stream-types' import type { Uint8ArrayList } from 'uint8arraylist' import type { IHandshake } from '../@types/handshake-interface.js' +import type { MetricsRegistry } from '../metrics.js' import { NOISE_MSG_MAX_LENGTH_BYTES, NOISE_MSG_MAX_LENGTH_BYTES_WITHOUT_TAG } from '../constants.js' import { uint16BEEncode } from '../encoder.js' // Returns generator that encrypts payload from the user -export function encryptStream (handshake: IHandshake): Transform { +export function encryptStream (handshake: IHandshake, metrics?: MetricsRegistry): Transform { return async function * (source) { for await (const chunk of source) { for (let i = 0; i < chunk.length; i += NOISE_MSG_MAX_LENGTH_BYTES_WITHOUT_TAG) { @@ -15,6 +16,7 @@ export function encryptStream (handshake: IHandshake): Transform { } const data = handshake.encrypt(chunk.subarray(i, end), handshake.session) + metrics?.encryptedPackets.increment() yield uint16BEEncode(data.byteLength) yield data @@ -24,7 +26,7 @@ export function encryptStream (handshake: IHandshake): Transform { } // Decrypt received payload to the user -export function decryptStream (handshake: IHandshake): Transform { +export function decryptStream (handshake: IHandshake, metrics?: MetricsRegistry): Transform { return async function * (source) { for await (const chunk of source) { for (let i = 0; i < chunk.length; i += NOISE_MSG_MAX_LENGTH_BYTES) { @@ -33,10 +35,12 @@ export function decryptStream (handshake: IHandshake): Transform + +export function registerMetrics (metrics: Metrics) { + return { + xxHandshakeSuccesses: metrics.registerCounter( + 'libp2p_noise_xxhandshake_successes_total', { + help: 'Total count of noise xxHandshakes successes_' + }), + + xxHandshakeErrors: metrics.registerCounter( + 'libp2p_noise_xxhandshake_error_total', { + help: 'Total count of noise xxHandshakes errors' + }), + + encryptedPackets: metrics.registerCounter( + 'libp2p_noise_encrypted_packets_total', { + help: 'Total count of noise encrypted packets successfully' + }), + + decryptedPackets: metrics.registerCounter( + 'libp2p_noise_decrypted_packets_total', { + help: 'Total count of noise decrypted packets' + }), + + decryptErrors: metrics.registerCounter( + 'libp2p_noise_decrypt_errors_total', { + help: 'Total count of noise decrypt errors' + }) + } +} diff --git a/src/noise.ts b/src/noise.ts index 9e1ef4b7..10c63166 100644 --- a/src/noise.ts +++ b/src/noise.ts @@ -16,6 +16,8 @@ import { uint16BEDecode, uint16BEEncode } from './encoder.js' import { XXHandshake } from './handshake-xx.js' import { getPayload } from './utils.js' import type { NoiseExtensions } from './proto/payload.js' +import type { Metrics } from '@libp2p/interface-metrics' +import { MetricsRegistry, registerMetrics } from './metrics.js' interface HandshakeParams { connection: ProtobufStream @@ -32,6 +34,7 @@ export interface NoiseInit { extensions?: NoiseExtensions crypto?: ICryptoInterface prologueBytes?: Uint8Array + metrics?: Metrics } export class Noise implements INoiseConnection { @@ -41,12 +44,14 @@ export class Noise implements INoiseConnection { private readonly prologue: Uint8Array private readonly staticKeys: KeyPair private readonly extensions?: NoiseExtensions + private readonly metrics?: MetricsRegistry constructor (init: NoiseInit = {}) { - const { staticNoiseKey, extensions, crypto, prologueBytes } = init + const { staticNoiseKey, extensions, crypto, prologueBytes, metrics } = init this.crypto = crypto ?? stablelib this.extensions = extensions + this.metrics = metrics ? registerMetrics(metrics) : undefined if (staticNoiseKey) { // accepts x25519 private key of length 32 @@ -153,7 +158,9 @@ export class Noise implements INoiseConnection { await handshake.propose() await handshake.exchange() await handshake.finish() + this.metrics?.xxHandshakeSuccesses.increment() } catch (e: unknown) { + this.metrics?.xxHandshakeErrors.increment() if (e instanceof Error) { e.message = `Error occurred during XX handshake: ${e.message}` throw e @@ -173,10 +180,10 @@ export class Noise implements INoiseConnection { await pipe( secure, // write to wrapper - encryptStream(handshake), // encrypt data + prefix with message length + encryptStream(handshake, this.metrics), // encrypt data + prefix with message length network, // send to the remote peer decode({ lengthDecoder: uint16BEDecode }), // read message length prefix - decryptStream(handshake), // decrypt the incoming data + decryptStream(handshake, this.metrics), // decrypt the incoming data secure // pipe to the wrapper ) diff --git a/test/index.spec.ts b/test/index.spec.ts index b67b349d..6158511e 100644 --- a/test/index.spec.ts +++ b/test/index.spec.ts @@ -1,5 +1,17 @@ +import type { Metrics } from '@libp2p/interface-metrics' import { expect } from 'aegir/chai' +import { duplexPair } from 'it-pair/duplex' +import { pbStream } from 'it-pb-stream' +import sinon from 'sinon' import { Noise } from '../src/noise.js' +import { createPeerIdsFromFixtures } from './fixtures/peer.js' + +function createCounterSpy () { + return sinon.spy({ + increment: () => {}, + reset: () => {} + }) +} describe('Index', () => { it('should expose class with tag and required functions', () => { @@ -8,4 +20,33 @@ describe('Index', () => { expect(typeof (noise.secureInbound)).to.equal('function') expect(typeof (noise.secureOutbound)).to.equal('function') }) + + it('should collect metrics', async () => { + const [localPeer, remotePeer] = await createPeerIdsFromFixtures(2) + const metricsRegistry = new Map>() + const metrics = { + registerCounter: (name: string) => { + const counter = createCounterSpy() + metricsRegistry.set(name, counter) + return counter + } + } + const noiseInit = new Noise({ metrics: metrics as any as Metrics }) + const noiseResp = new Noise({}) + + const [inboundConnection, outboundConnection] = duplexPair() + const [outbound, inbound] = await Promise.all([ + noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer), + noiseResp.secureInbound(remotePeer, inboundConnection, localPeer) + ]) + const wrappedInbound = pbStream(inbound.conn) + const wrappedOutbound = pbStream(outbound.conn) + + wrappedOutbound.writeLP(Buffer.from('test')) + await wrappedInbound.readLP() + expect(metricsRegistry.get('libp2p_noise_xxhandshake_successes_total')?.increment.callCount).to.equal(1) + expect(metricsRegistry.get('libp2p_noise_xxhandshake_error_total')?.increment.callCount).to.equal(0) + expect(metricsRegistry.get('libp2p_noise_encrypted_packets_total')?.increment.callCount).to.equal(1) + expect(metricsRegistry.get('libp2p_noise_decrypt_errors_total')?.increment.callCount).to.equal(0) + }) })