From 2e7d30a1671aa667f7c000c737dfa40824f7b9f1 Mon Sep 17 00:00:00 2001 From: "Leah E. Cole" <6719667+leahecole@users.noreply.github.com> Date: Wed, 8 May 2024 14:32:07 -0400 Subject: [PATCH] fix: don't emit error event during stream handoff (#1592) * fix: don't emit error event during stream handoff * properly resolve promises in tests --- gax/src/streamingRetryRequest.ts | 7 +- gax/test/test-application/src/index.ts | 116 ++++++++++--------------- 2 files changed, 52 insertions(+), 71 deletions(-) diff --git a/gax/src/streamingRetryRequest.ts b/gax/src/streamingRetryRequest.ts index e97f64355..3972a26ed 100644 --- a/gax/src/streamingRetryRequest.ts +++ b/gax/src/streamingRetryRequest.ts @@ -106,8 +106,11 @@ export function streamingRetryRequest(opts: streamingRetryRequestOptions) { // No more attempts need to be made, just continue on. retryStream.emit('response', response); delayStream.pipe(retryStream); - requestStream.on('error', (err: GoogleError) => { - retryStream.destroy(err); + requestStream.on('error', () => { + // retryStream must be destroyed here for the stream handoff part of retries to function properly + // but the error event should not be passed - if it emits as part of .destroy() + // it will bubble up early to the caller + retryStream.destroy(); }); } } diff --git a/gax/test/test-application/src/index.ts b/gax/test/test-application/src/index.ts index 8d13f8618..f7328414a 100644 --- a/gax/test/test-application/src/index.ts +++ b/gax/test/test-application/src/index.ts @@ -467,6 +467,7 @@ async function testWait(client: EchoClient) { assert.deepStrictEqual(response.content, request.success.content); } +// a successful streaming call that has retry options passed but does not retry async function testServerStreamingRetryOptions(client: SequenceServiceClient) { const finalData: string[] = []; const backoffSettings = createBackoffSettings( @@ -495,7 +496,7 @@ async function testServerStreamingRetryOptions(client: SequenceServiceClient) { ); const response = await client.createStreamingSequence(request); - await new Promise(resolve => { + await new Promise((resolve, reject) => { const sequence = response[0]; const attemptRequest = @@ -509,8 +510,9 @@ async function testServerStreamingRetryOptions(client: SequenceServiceClient) { attemptStream.on('data', (response: {content: string}) => { finalData.push(response.content); }); - attemptStream.on('error', () => { - //Do Nothing + attemptStream.on('error', (error: GoogleError) => { + // should not reach this + reject(error); }); attemptStream.on('end', () => { attemptStream.end(); @@ -524,6 +526,7 @@ async function testServerStreamingRetryOptions(client: SequenceServiceClient) { }); } +// a streaming call that retries two times and finishes successfully async function testServerStreamingRetrieswithRetryOptions( client: SequenceServiceClient ) { @@ -554,7 +557,7 @@ async function testServerStreamingRetrieswithRetryOptions( ); const response = await client.createStreamingSequence(request); - await new Promise(resolve => { + await new Promise((resolve, reject) => { const sequence = response[0]; const attemptRequest = @@ -568,8 +571,8 @@ async function testServerStreamingRetrieswithRetryOptions( attemptStream.on('data', (response: {content: string}) => { finalData.push(response.content); }); - attemptStream.on('error', () => { - //Do Nothing + attemptStream.on('error', error => { + reject(error); }); attemptStream.on('end', () => { attemptStream.end(); @@ -583,6 +586,7 @@ async function testServerStreamingRetrieswithRetryOptions( }); } +// a streaming call that retries twice using shouldRetryFn and finally succeeds async function testServerStreamingRetriesWithShouldRetryFn( client: SequenceServiceClient ) { @@ -617,7 +621,7 @@ async function testServerStreamingRetriesWithShouldRetryFn( ); const response = await client.createStreamingSequence(request); - await new Promise(resolve => { + await new Promise((resolve, reject) => { const sequence = response[0]; const attemptRequest = new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); @@ -630,8 +634,9 @@ async function testServerStreamingRetriesWithShouldRetryFn( attemptStream.on('data', (response: {content: string}) => { finalData.push(response.content); }); - attemptStream.on('error', () => { - // Do nothing + attemptStream.on('error', error => { + // should not reach this + reject(error); }); attemptStream.on('end', () => { attemptStream.end(); @@ -645,6 +650,7 @@ async function testServerStreamingRetriesWithShouldRetryFn( }); } +// streaming call that retries twice using RetryRequestOptions instead of RetryOptions async function testServerStreamingRetrieswithRetryRequestOptions( client: SequenceServiceClient ) { @@ -676,7 +682,7 @@ async function testServerStreamingRetrieswithRetryRequestOptions( ); const response = await client.createStreamingSequence(request); - await new Promise(resolve => { + await new Promise((resolve, reject) => { const sequence = response[0]; const attemptRequest = @@ -690,8 +696,8 @@ async function testServerStreamingRetrieswithRetryRequestOptions( attemptStream.on('data', (response: {content: string}) => { finalData.push(response.content); }); - attemptStream.on('error', () => { - // Do Nothing + attemptStream.on('error', error => { + reject(error); }); attemptStream.on('end', () => { attemptStream.end(); @@ -704,6 +710,8 @@ async function testServerStreamingRetrieswithRetryRequestOptions( ); }); } + +// streaming call that retries twice with RetryRequestOpsions and resumes from where it left off async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrategy( client: SequenceServiceClient ) { @@ -748,7 +756,7 @@ async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrate 'This is testing the brand new and shiny StreamingSequence server 3' ); const response = await client.createStreamingSequence(request); - await new Promise(resolve => { + await new Promise((resolve, reject) => { const sequence = response[0]; const attemptRequest = @@ -762,8 +770,8 @@ async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrate attemptStream.on('data', (response: {content: string}) => { finalData.push(response.content); }); - attemptStream.on('error', () => { - // do nothing + attemptStream.on('error', error => { + reject(error); }); attemptStream.on('end', () => { attemptStream.end(); @@ -777,6 +785,7 @@ async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrate }); } +// retries twice but fails with an error not from the streaming sequence async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResumptionStrategy( client: SequenceServiceClient ) { @@ -816,9 +825,8 @@ async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResum [1, 2, 11], 'This is testing the brand new and shiny StreamingSequence server 3' ); - const allowedCodes = [4, 14]; const response = await client.createStreamingSequence(request); - await new Promise((_, reject) => { + await new Promise(resolve => { const sequence = response[0]; const attemptRequest = @@ -831,21 +839,14 @@ async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResum ); attemptStream.on('error', (e: GoogleError) => { - if (!allowedCodes.includes(e.code!)) { - reject(e); - } + assert.strictEqual(e.code, 3); + assert.match(e.note!, /not classified as transient/); + resolve(); }); - }).then( - () => { - assert(false); - }, - (err: GoogleError) => { - assert.strictEqual(err.code, 3); - assert.match(err.note!, /not classified as transient/); - } - ); + }); } +// fails on the first error in the sequence async function testServerStreamingThrowsClassifiedTransientErrorNote( client: SequenceServiceClient ) { @@ -875,7 +876,7 @@ async function testServerStreamingThrowsClassifiedTransientErrorNote( ); const response = await client.createStreamingSequence(request); - await new Promise((_, reject) => { + await new Promise(resolve => { const sequence = response[0]; const attemptRequest = @@ -887,21 +888,14 @@ async function testServerStreamingThrowsClassifiedTransientErrorNote( settings ); attemptStream.on('error', (e: GoogleError) => { - if (!allowedCodes.includes(e.code!)) { - reject(e); - } + assert.strictEqual(e.code, 14); + assert.match(e.note!, /not classified as transient/); + resolve(); }); - }).then( - () => { - assert(false); - }, - (err: GoogleError) => { - assert.strictEqual(err.code, 14); - assert.match(err.note!, /not classified as transient/); - } - ); + }); } +// retries once and fails on the second error in the sequence async function testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote( client: SequenceServiceClient ) { @@ -931,7 +925,7 @@ async function testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote( ); const response = await client.createStreamingSequence(request); - await new Promise((_, reject) => { + await new Promise(resolve => { const sequence = response[0]; const attemptRequest = @@ -943,19 +937,11 @@ async function testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote( settings ); attemptStream.on('error', (e: GoogleError) => { - if (!allowedCodes.includes(e.code!)) { - reject(e); - } + assert.strictEqual(e.code, 4); + assert.match(e.note!, /not classified as transient/); + resolve(); }); - }).then( - () => { - assert(false); - }, - (err: GoogleError) => { - assert.strictEqual(err.code, 4); - assert.match(err.note!, /not classified as transient/); - } - ); + }); } async function testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries( @@ -988,7 +974,7 @@ async function testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries( ); const response = await client.createStreamingSequence(request); - await new Promise((_, reject) => { + await new Promise(resolve => { const sequence = response[0]; const attemptRequest = @@ -1000,22 +986,14 @@ async function testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries( settings ); attemptStream.on('error', (e: GoogleError) => { - if (!allowedCodes.includes(e.code!)) { - reject(e); - } - }); - }).then( - () => { - assert(false); - }, - (err: GoogleError) => { - assert.strictEqual(err.code, 3); + assert.strictEqual(e.code, 3); assert.match( - err.message, + e.message, /Cannot set both totalTimeoutMillis and maxRetries/ ); - } - ); + resolve(); + }); + }); } async function main() {