Skip to content

Commit df740a1

Browse files
mpetrunicdapplion
andauthored
feat: add metrics (#246)
* Add metrics * Declare metrics locally and use MetricsRegister * feat: add libp2p metrics Co-authored-by: dapplion <35266934+dapplion@users.noreply.github.com>
1 parent 4b2113c commit df740a1

File tree

5 files changed

+91
-6
lines changed

5 files changed

+91
-6
lines changed

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
"@libp2p/crypto": "^1.0.0",
7171
"@libp2p/interface-connection-encrypter": "^3.0.0",
7272
"@libp2p/interface-keys": "^1.0.2",
73+
"@libp2p/interface-metrics": "^4.0.2",
7374
"@libp2p/interface-peer-id": "^1.0.2",
7475
"@libp2p/logger": "^2.0.0",
7576
"@libp2p/peer-id": "^1.1.8",

src/crypto/streaming.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import type { Transform } from 'it-stream-types'
22
import type { Uint8ArrayList } from 'uint8arraylist'
33
import type { IHandshake } from '../@types/handshake-interface.js'
4+
import type { MetricsRegistry } from '../metrics.js'
45
import { NOISE_MSG_MAX_LENGTH_BYTES, NOISE_MSG_MAX_LENGTH_BYTES_WITHOUT_TAG } from '../constants.js'
56
import { uint16BEEncode } from '../encoder.js'
67

78
// Returns generator that encrypts payload from the user
8-
export function encryptStream (handshake: IHandshake): Transform<Uint8Array> {
9+
export function encryptStream (handshake: IHandshake, metrics?: MetricsRegistry): Transform<Uint8Array> {
910
return async function * (source) {
1011
for await (const chunk of source) {
1112
for (let i = 0; i < chunk.length; i += NOISE_MSG_MAX_LENGTH_BYTES_WITHOUT_TAG) {
@@ -15,6 +16,7 @@ export function encryptStream (handshake: IHandshake): Transform<Uint8Array> {
1516
}
1617

1718
const data = handshake.encrypt(chunk.subarray(i, end), handshake.session)
19+
metrics?.encryptedPackets.increment()
1820

1921
yield uint16BEEncode(data.byteLength)
2022
yield data
@@ -24,7 +26,7 @@ export function encryptStream (handshake: IHandshake): Transform<Uint8Array> {
2426
}
2527

2628
// Decrypt received payload to the user
27-
export function decryptStream (handshake: IHandshake): Transform<Uint8ArrayList, Uint8Array> {
29+
export function decryptStream (handshake: IHandshake, metrics?: MetricsRegistry): Transform<Uint8ArrayList, Uint8Array> {
2830
return async function * (source) {
2931
for await (const chunk of source) {
3032
for (let i = 0; i < chunk.length; i += NOISE_MSG_MAX_LENGTH_BYTES) {
@@ -33,10 +35,12 @@ export function decryptStream (handshake: IHandshake): Transform<Uint8ArrayList,
3335
end = chunk.length
3436
}
3537

36-
const { plaintext: decrypted, valid } = await handshake.decrypt(chunk.subarray(i, end), handshake.session)
38+
const { plaintext: decrypted, valid } = handshake.decrypt(chunk.subarray(i, end), handshake.session)
3739
if (!valid) {
40+
metrics?.decryptErrors.increment()
3841
throw new Error('Failed to validate decrypted chunk')
3942
}
43+
metrics?.decryptedPackets.increment()
4044
yield decrypted
4145
}
4246
}

src/metrics.ts

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import type { Metrics } from '@libp2p/interface-metrics'
2+
3+
export type MetricsRegistry = ReturnType<typeof registerMetrics>
4+
5+
export function registerMetrics (metrics: Metrics) {
6+
return {
7+
xxHandshakeSuccesses: metrics.registerCounter(
8+
'libp2p_noise_xxhandshake_successes_total', {
9+
help: 'Total count of noise xxHandshakes successes_'
10+
}),
11+
12+
xxHandshakeErrors: metrics.registerCounter(
13+
'libp2p_noise_xxhandshake_error_total', {
14+
help: 'Total count of noise xxHandshakes errors'
15+
}),
16+
17+
encryptedPackets: metrics.registerCounter(
18+
'libp2p_noise_encrypted_packets_total', {
19+
help: 'Total count of noise encrypted packets successfully'
20+
}),
21+
22+
decryptedPackets: metrics.registerCounter(
23+
'libp2p_noise_decrypted_packets_total', {
24+
help: 'Total count of noise decrypted packets'
25+
}),
26+
27+
decryptErrors: metrics.registerCounter(
28+
'libp2p_noise_decrypt_errors_total', {
29+
help: 'Total count of noise decrypt errors'
30+
})
31+
}
32+
}

src/noise.ts

+10-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import { uint16BEDecode, uint16BEEncode } from './encoder.js'
1616
import { XXHandshake } from './handshake-xx.js'
1717
import { getPayload } from './utils.js'
1818
import type { NoiseExtensions } from './proto/payload.js'
19+
import type { Metrics } from '@libp2p/interface-metrics'
20+
import { MetricsRegistry, registerMetrics } from './metrics.js'
1921

2022
interface HandshakeParams {
2123
connection: ProtobufStream
@@ -32,6 +34,7 @@ export interface NoiseInit {
3234
extensions?: NoiseExtensions
3335
crypto?: ICryptoInterface
3436
prologueBytes?: Uint8Array
37+
metrics?: Metrics
3538
}
3639

3740
export class Noise implements INoiseConnection {
@@ -41,12 +44,14 @@ export class Noise implements INoiseConnection {
4144
private readonly prologue: Uint8Array
4245
private readonly staticKeys: KeyPair
4346
private readonly extensions?: NoiseExtensions
47+
private readonly metrics?: MetricsRegistry
4448

4549
constructor (init: NoiseInit = {}) {
46-
const { staticNoiseKey, extensions, crypto, prologueBytes } = init
50+
const { staticNoiseKey, extensions, crypto, prologueBytes, metrics } = init
4751

4852
this.crypto = crypto ?? stablelib
4953
this.extensions = extensions
54+
this.metrics = metrics ? registerMetrics(metrics) : undefined
5055

5156
if (staticNoiseKey) {
5257
// accepts x25519 private key of length 32
@@ -153,7 +158,9 @@ export class Noise implements INoiseConnection {
153158
await handshake.propose()
154159
await handshake.exchange()
155160
await handshake.finish()
161+
this.metrics?.xxHandshakeSuccesses.increment()
156162
} catch (e: unknown) {
163+
this.metrics?.xxHandshakeErrors.increment()
157164
if (e instanceof Error) {
158165
e.message = `Error occurred during XX handshake: ${e.message}`
159166
throw e
@@ -173,10 +180,10 @@ export class Noise implements INoiseConnection {
173180

174181
await pipe(
175182
secure, // write to wrapper
176-
encryptStream(handshake), // encrypt data + prefix with message length
183+
encryptStream(handshake, this.metrics), // encrypt data + prefix with message length
177184
network, // send to the remote peer
178185
decode({ lengthDecoder: uint16BEDecode }), // read message length prefix
179-
decryptStream(handshake), // decrypt the incoming data
186+
decryptStream(handshake, this.metrics), // decrypt the incoming data
180187
secure // pipe to the wrapper
181188
)
182189

test/index.spec.ts

+41
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
1+
import type { Metrics } from '@libp2p/interface-metrics'
12
import { expect } from 'aegir/chai'
3+
import { duplexPair } from 'it-pair/duplex'
4+
import { pbStream } from 'it-pb-stream'
5+
import sinon from 'sinon'
26
import { Noise } from '../src/noise.js'
7+
import { createPeerIdsFromFixtures } from './fixtures/peer.js'
8+
9+
function createCounterSpy () {
10+
return sinon.spy({
11+
increment: () => {},
12+
reset: () => {}
13+
})
14+
}
315

416
describe('Index', () => {
517
it('should expose class with tag and required functions', () => {
@@ -8,4 +20,33 @@ describe('Index', () => {
820
expect(typeof (noise.secureInbound)).to.equal('function')
921
expect(typeof (noise.secureOutbound)).to.equal('function')
1022
})
23+
24+
it('should collect metrics', async () => {
25+
const [localPeer, remotePeer] = await createPeerIdsFromFixtures(2)
26+
const metricsRegistry = new Map<string, ReturnType<typeof createCounterSpy>>()
27+
const metrics = {
28+
registerCounter: (name: string) => {
29+
const counter = createCounterSpy()
30+
metricsRegistry.set(name, counter)
31+
return counter
32+
}
33+
}
34+
const noiseInit = new Noise({ metrics: metrics as any as Metrics })
35+
const noiseResp = new Noise({})
36+
37+
const [inboundConnection, outboundConnection] = duplexPair<Uint8Array>()
38+
const [outbound, inbound] = await Promise.all([
39+
noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer),
40+
noiseResp.secureInbound(remotePeer, inboundConnection, localPeer)
41+
])
42+
const wrappedInbound = pbStream(inbound.conn)
43+
const wrappedOutbound = pbStream(outbound.conn)
44+
45+
wrappedOutbound.writeLP(Buffer.from('test'))
46+
await wrappedInbound.readLP()
47+
expect(metricsRegistry.get('libp2p_noise_xxhandshake_successes_total')?.increment.callCount).to.equal(1)
48+
expect(metricsRegistry.get('libp2p_noise_xxhandshake_error_total')?.increment.callCount).to.equal(0)
49+
expect(metricsRegistry.get('libp2p_noise_encrypted_packets_total')?.increment.callCount).to.equal(1)
50+
expect(metricsRegistry.get('libp2p_noise_decrypt_errors_total')?.increment.callCount).to.equal(0)
51+
})
1152
})

0 commit comments

Comments
 (0)