-
Notifications
You must be signed in to change notification settings - Fork 284
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: better reqresp logging + handle empty responses in snappy #10657
Merged
+170
−27
Merged
Changes from 1 commit
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,7 +4,7 @@ import { sleep } from '@aztec/foundation/sleep'; | |||||
import { describe, expect, it, jest } from '@jest/globals'; | ||||||
import { type MockProxy, mock } from 'jest-mock-extended'; | ||||||
|
||||||
import { CollectiveReqRespTimeoutError } from '../../errors/reqresp.error.js'; | ||||||
import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js'; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
import { | ||||||
MOCK_SUB_PROTOCOL_HANDLERS, | ||||||
MOCK_SUB_PROTOCOL_VALIDATORS, | ||||||
|
@@ -86,9 +86,15 @@ describe('ReqResp', () => { | |||||
void nodes[1].req.stop(); | ||||||
void nodes[2].req.stop(); | ||||||
|
||||||
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug'); | ||||||
|
||||||
// send from the first node | ||||||
const res = await nodes[0].req.sendRequest(PING_PROTOCOL, PING_REQUEST); | ||||||
|
||||||
// We expect the logger to have been called twice with the peer ids citing the inability to connect | ||||||
expect(loggerSpy).toHaveBeenCalledWith(`Connection reset: ${nodes[1].p2p.peerId.toString()}`); | ||||||
expect(loggerSpy).toHaveBeenCalledWith(`Connection reset: ${nodes[2].p2p.peerId.toString()}`); | ||||||
|
||||||
expect(res?.toBuffer().toString('utf-8')).toEqual('pong'); | ||||||
}); | ||||||
|
||||||
|
@@ -139,6 +145,29 @@ describe('ReqResp', () => { | |||||
expect(res).toEqual(tx); | ||||||
}); | ||||||
|
||||||
it('Handle returning empty buffers', async () => { | ||||||
const tx = mockTx(); | ||||||
const txHash = tx.getTxHash(); | ||||||
|
||||||
const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; | ||||||
protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise<Buffer> => { | ||||||
return Promise.resolve(Buffer.alloc(0)); | ||||||
}; | ||||||
|
||||||
nodes = await createNodes(peerManager, 2); | ||||||
|
||||||
const spySendRequestToPeer = jest.spyOn(nodes[0].req, 'sendRequestToPeer'); | ||||||
|
||||||
await startNodes(nodes, protocolHandlers); | ||||||
await sleep(500); | ||||||
await connectToPeers(nodes); | ||||||
await sleep(500); | ||||||
|
||||||
const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash); | ||||||
expect(spySendRequestToPeer).toHaveBeenCalledTimes(1); | ||||||
expect(res).toEqual(undefined); | ||||||
}); | ||||||
|
||||||
it('Does not crash if tx hash returns undefined', async () => { | ||||||
const tx = mockTx(); | ||||||
const txHash = tx.getTxHash(); | ||||||
|
@@ -170,7 +199,7 @@ describe('ReqResp', () => { | |||||
}); | ||||||
|
||||||
// Spy on the logger to make sure the error message is logged | ||||||
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error'); | ||||||
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug'); | ||||||
|
||||||
await sleep(500); | ||||||
await connectToPeers(nodes); | ||||||
|
@@ -183,9 +212,9 @@ describe('ReqResp', () => { | |||||
// Make sure the error message is logged | ||||||
const peerId = nodes[1].p2p.peerId.toString(); | ||||||
expect(loggerSpy).toHaveBeenCalledWith( | ||||||
expect.stringMatching(/Error sending request to peer/i), | ||||||
expect.any(Error), | ||||||
{ peerId, subProtocol: '/aztec/req/tx/0.1.0' }, | ||||||
`Timeout error: ${ | ||||||
new IndiviualReqRespTimeoutError().message | ||||||
} | peerId: ${peerId.toString()} | subProtocol: ${TX_REQ_PROTOCOL}`, | ||||||
); | ||||||
|
||||||
// Expect the peer to be penalized for timing out | ||||||
|
@@ -209,7 +238,7 @@ describe('ReqResp', () => { | |||||
} | ||||||
|
||||||
// Spy on the logger to make sure the error message is logged | ||||||
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error'); | ||||||
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug'); | ||||||
|
||||||
await sleep(500); | ||||||
await connectToPeers(nodes); | ||||||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5,10 +5,14 @@ import { executeTimeoutWithCustomError } from '@aztec/foundation/timer'; | |||||||||||||
import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface'; | ||||||||||||||
import { pipe } from 'it-pipe'; | ||||||||||||||
import { type Libp2p } from 'libp2p'; | ||||||||||||||
import { compressSync, uncompressSync } from 'snappy'; | ||||||||||||||
import { type Uint8ArrayList } from 'uint8arraylist'; | ||||||||||||||
|
||||||||||||||
import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js'; | ||||||||||||||
import { | ||||||||||||||
CollectiveReqRespTimeoutError, | ||||||||||||||
IndiviualReqRespTimeoutError, | ||||||||||||||
InvalidResponseError, | ||||||||||||||
} from '../../errors/reqresp.error.js'; | ||||||||||||||
import { SnappyTransform } from '../encoding.js'; | ||||||||||||||
import { type PeerManager } from '../peer_manager.js'; | ||||||||||||||
import { PeerErrorSeverity } from '../peer_scoring.js'; | ||||||||||||||
import { type P2PReqRespConfig } from './config.js'; | ||||||||||||||
|
@@ -49,13 +53,16 @@ export class ReqResp { | |||||||||||||
|
||||||||||||||
private rateLimiter: RequestResponseRateLimiter; | ||||||||||||||
|
||||||||||||||
private snappyTransform: SnappyTransform; | ||||||||||||||
|
||||||||||||||
constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p, private peerManager: PeerManager) { | ||||||||||||||
this.logger = createLogger('p2p:reqresp'); | ||||||||||||||
|
||||||||||||||
this.overallRequestTimeoutMs = config.overallRequestTimeoutMs; | ||||||||||||||
this.individualRequestTimeoutMs = config.individualRequestTimeoutMs; | ||||||||||||||
|
||||||||||||||
this.rateLimiter = new RequestResponseRateLimiter(peerManager); | ||||||||||||||
this.snappyTransform = new SnappyTransform(); | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
|
@@ -143,8 +150,7 @@ export class ReqResp { | |||||||||||||
// The response validator handles peer punishment within | ||||||||||||||
const isValid = await responseValidator(request, object, peer); | ||||||||||||||
if (!isValid) { | ||||||||||||||
this.logger.error(`Invalid response for ${subProtocol} from ${peer.toString()}`); | ||||||||||||||
return undefined; | ||||||||||||||
throw new InvalidResponseError(); | ||||||||||||||
} | ||||||||||||||
return object; | ||||||||||||||
} | ||||||||||||||
|
@@ -159,7 +165,7 @@ export class ReqResp { | |||||||||||||
() => new CollectiveReqRespTimeoutError(), | ||||||||||||||
); | ||||||||||||||
} catch (e: any) { | ||||||||||||||
this.logger.error(`${e.message} | subProtocol: ${subProtocol}`); | ||||||||||||||
this.logger.debug(`${e.message} | subProtocol: ${subProtocol}`); | ||||||||||||||
return undefined; | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
@@ -200,18 +206,14 @@ export class ReqResp { | |||||||||||||
|
||||||||||||||
// Open the stream with a timeout | ||||||||||||||
const result = await executeTimeoutWithCustomError<Buffer>( | ||||||||||||||
(): Promise<Buffer> => pipe([payload], stream!, this.readMessage), | ||||||||||||||
(): Promise<Buffer> => pipe([payload], stream!, this.readMessage.bind(this)), | ||||||||||||||
this.individualRequestTimeoutMs, | ||||||||||||||
() => new IndiviualReqRespTimeoutError(), | ||||||||||||||
); | ||||||||||||||
|
||||||||||||||
await stream.close(); | ||||||||||||||
this.logger.trace(`Stream closed with ${peerId.toString()} for ${subProtocol}`); | ||||||||||||||
|
||||||||||||||
return result; | ||||||||||||||
} catch (e: any) { | ||||||||||||||
this.logger.error(`Error sending request to peer`, e, { peerId: peerId.toString(), subProtocol }); | ||||||||||||||
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError); | ||||||||||||||
this.handleResponseError(e, peerId, subProtocol); | ||||||||||||||
} finally { | ||||||||||||||
if (stream) { | ||||||||||||||
try { | ||||||||||||||
|
@@ -224,7 +226,65 @@ export class ReqResp { | |||||||||||||
} | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
return undefined; | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Handle a response error | ||||||||||||||
* | ||||||||||||||
* ReqResp errors are punished differently depending on the severity of the offense | ||||||||||||||
* | ||||||||||||||
* @param e - The error | ||||||||||||||
* @param peerId - The peer id | ||||||||||||||
* @param subProtocol - The sub protocol | ||||||||||||||
* @returns If the error is non pubishable, then undefined is returned, otherwise the peer is penalized | ||||||||||||||
*/ | ||||||||||||||
private handleResponseError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): void { | ||||||||||||||
const severity = this.categorizeError(e, peerId, subProtocol); | ||||||||||||||
if (severity) { | ||||||||||||||
this.peerManager.penalizePeer(peerId, severity); | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Categorize the error and log it. | ||||||||||||||
*/ | ||||||||||||||
private categorizeError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): PeerErrorSeverity | undefined { | ||||||||||||||
// Non pubishable errors | ||||||||||||||
// We do not punish a collective timeout, as the node triggers this interupt, independent of the peer's behaviour | ||||||||||||||
if (e instanceof CollectiveReqRespTimeoutError || e instanceof InvalidResponseError) { | ||||||||||||||
this.logger.debug( | ||||||||||||||
`Non-punishable error: ${e.message} | peerId: ${peerId.toString()} | subProtocol: ${subProtocol}`, | ||||||||||||||
); | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: let's log stuff like the peerId as labels, so we can filter by them in loki more easily.
Suggested change
|
||||||||||||||
return undefined; | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Pubishable errors | ||||||||||||||
// Connection reset errors in the networking stack are punished with high severity | ||||||||||||||
// it just signals an unreliable peer | ||||||||||||||
// We assume that the requesting node has a functioning networking stack. | ||||||||||||||
if (e?.code === 'ECONNRESET' || e?.code === 'EPIPE') { | ||||||||||||||
this.logger.debug(`Connection reset: ${peerId.toString()}`); | ||||||||||||||
return PeerErrorSeverity.HighToleranceError; | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if (e?.code === 'ECONNREFUSED') { | ||||||||||||||
this.logger.debug(`Connection refused: ${peerId.toString()}`); | ||||||||||||||
return PeerErrorSeverity.HighToleranceError; | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Timeout errors are punished with high tolerance, they can be due to a geogrpahically far away peer or an | ||||||||||||||
// overloaded peer | ||||||||||||||
if (e instanceof IndiviualReqRespTimeoutError) { | ||||||||||||||
this.logger.debug(`Timeout error: ${e.message} | peerId: ${peerId.toString()} | subProtocol: ${subProtocol}`); | ||||||||||||||
return PeerErrorSeverity.HighToleranceError; | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Catch all error | ||||||||||||||
this.logger.error(`Unexpected error sending request to peer`, e, { | ||||||||||||||
peerId: peerId.toString(), | ||||||||||||||
subProtocol, | ||||||||||||||
}); | ||||||||||||||
return PeerErrorSeverity.HighToleranceError; | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
|
@@ -235,8 +295,8 @@ export class ReqResp { | |||||||||||||
for await (const chunk of source) { | ||||||||||||||
chunks.push(chunk.subarray()); | ||||||||||||||
} | ||||||||||||||
const messageData = chunks.concat(); | ||||||||||||||
return uncompressSync(Buffer.concat(messageData), { asBuffer: true }) as Buffer; | ||||||||||||||
const messageData = Buffer.concat(chunks); | ||||||||||||||
return this.snappyTransform.inboundTransformNoTopic(messageData); | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
|
@@ -266,6 +326,7 @@ export class ReqResp { | |||||||||||||
} | ||||||||||||||
|
||||||||||||||
const handler = this.subProtocolHandlers[protocol]; | ||||||||||||||
const transform = this.snappyTransform; | ||||||||||||||
|
||||||||||||||
try { | ||||||||||||||
await pipe( | ||||||||||||||
|
@@ -274,7 +335,7 @@ export class ReqResp { | |||||||||||||
for await (const chunkList of source) { | ||||||||||||||
const msg = Buffer.from(chunkList.subarray()); | ||||||||||||||
const response = await handler(msg); | ||||||||||||||
yield new Uint8Array(compressSync(response)); | ||||||||||||||
yield new Uint8Array(transform.outboundTransformNoTopic(response)); | ||||||||||||||
} | ||||||||||||||
}, | ||||||||||||||
stream, | ||||||||||||||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I take it neiter the asBuffer: true nor the Uint8Array wrapper are needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(same for the Uint8Array in output)