From f16ce42f624292e6a6ee6ec0df51c034a7513e5c Mon Sep 17 00:00:00 2001 From: Francois HERSENT Date: Fri, 10 Feb 2023 16:56:38 +0100 Subject: [PATCH 1/7] implement abort option --- src/generate-grpc-web.ts | 54 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/src/generate-grpc-web.ts b/src/generate-grpc-web.ts index 2bfdb1efe..2795f8fa8 100644 --- a/src/generate-grpc-web.ts +++ b/src/generate-grpc-web.ts @@ -48,6 +48,8 @@ export function generateGrpcClientImpl( /** Creates the RPC methods that client code actually calls. */ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, methodDesc: MethodDescriptorProto) { assertInstanceOf(methodDesc, FormattedMethodDescriptor); + const { options } = ctx; + const { useAbortSignal } = options; const requestMessage = rawRequestType(ctx, methodDesc); const inputType = requestType(ctx, methodDesc, true); const returns = responsePromiseOrObservable(ctx, methodDesc); @@ -56,6 +58,7 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me return code` ${methodDesc.formattedName}( request: ${inputType}, + ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata?: grpc.Metadata, ): ${returns} { throw new Error('ts-proto does not yet support client streaming!'); @@ -67,11 +70,13 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me return code` ${methodDesc.formattedName}( request: ${inputType}, + ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata?: grpc.Metadata, ): ${returns} { return this.rpc.${method}( ${methodDescName(serviceDesc, methodDesc)}, ${requestMessage}.fromPartial(request), + ${useAbortSignal ? "abortSignal," : ""} metadata, ); } @@ -165,6 +170,8 @@ export function addGrpcWebMisc(ctx: Context, hasStreamingMethods: boolean): Code /** Makes an `Rpc` interface to decouple from the low-level grpc-web `grpc.invoke and grpc.unary`/etc. methods. */ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStreamingMethods: boolean): Code { const chunks: Code[] = []; + const { options } = ctx; + const { useAbortSignal } = options; chunks.push(code`interface Rpc {`); @@ -173,6 +180,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre unary( methodDesc: T, request: any, + ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata: grpc.Metadata | undefined, ): ${wrapper}; `); @@ -182,6 +190,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre invoke( methodDesc: T, request: any, + ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata: grpc.Metadata | undefined, ): ${observableType(ctx)}; `); @@ -230,10 +239,14 @@ function generateGrpcWebImpl(ctx: Context, returnObservable: boolean, hasStreami } function createPromiseUnaryMethod(ctx: Context): Code { + const { options } = ctx; + const { useAbortSignal } = options; + return code` unary( methodDesc: T, _request: any, + ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata: grpc.Metadata | undefined ): Promise { const request = { ..._request, ...methodDesc.requestType }; @@ -242,7 +255,7 @@ function createPromiseUnaryMethod(ctx: Context): Code { ? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap }) : metadata || this.options.metadata; return new Promise((resolve, reject) => { - ${grpc}.unary(methodDesc, { + ${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, { request, host: this.host, metadata: maybeCombinedMetadata, @@ -257,16 +270,28 @@ function createPromiseUnaryMethod(ctx: Context): Code { } }, }); + + ${useAbortSignal ? ` + const abortHandler = () => { + client.close(); + reject(new Error("Aborted")); + } + + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` : ""} }); } `; } function createObservableUnaryMethod(ctx: Context): Code { + const { options } = ctx; + const { useAbortSignal } = options; + return code` unary( methodDesc: T, _request: any, + ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata: grpc.Metadata | undefined ): ${observableType(ctx)} { const request = { ..._request, ...methodDesc.requestType }; @@ -275,7 +300,7 @@ function createObservableUnaryMethod(ctx: Context): Code { ? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap }) : metadata || this.options.metadata; return new Observable(observer => { - ${grpc}.unary(methodDesc, { + ${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, { request, host: this.host, metadata: maybeCombinedMetadata, @@ -291,16 +316,30 @@ function createObservableUnaryMethod(ctx: Context): Code { } }, }); + + + ${useAbortSignal ? ` + const abortHandler = () => { + observer.error("Aborted"); + client.close(); + }; + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` : ""} + }); + }).pipe(${take}(1)); } `; } function createInvokeMethod(ctx: Context) { + const { options } = ctx; + const { useAbortSignal } = options; + return code` invoke( methodDesc: T, _request: any, + ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata: grpc.Metadata | undefined ): ${observableType(ctx)} { const upStreamCodes = this.options.upStreamRetryCodes || []; @@ -332,7 +371,16 @@ function createInvokeMethod(ctx: Context) { } }, }); - observer.add(() => client.close()); + observer.add(() => { + if (!observer.closed) return client.close() + }); + + ${useAbortSignal ? ` + const abortHandler = () => { + observer.error("Aborted"); + client.close(); + }; + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` : ""} }); upStream(); }).pipe(${share}()); From e549e8269f776bf0219a34046a51ab192bafe9e3 Mon Sep 17 00:00:00 2001 From: Francois HERSENT Date: Fri, 10 Feb 2023 17:20:23 +0100 Subject: [PATCH 2/7] fix format --- src/generate-grpc-web.ts | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/generate-grpc-web.ts b/src/generate-grpc-web.ts index 2795f8fa8..588f9ece0 100644 --- a/src/generate-grpc-web.ts +++ b/src/generate-grpc-web.ts @@ -271,13 +271,17 @@ function createPromiseUnaryMethod(ctx: Context): Code { }, }); - ${useAbortSignal ? ` + ${ + useAbortSignal + ? ` const abortHandler = () => { client.close(); reject(new Error("Aborted")); } - if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` : ""} + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` + : "" + } }); } `; @@ -318,12 +322,16 @@ function createObservableUnaryMethod(ctx: Context): Code { }); - ${useAbortSignal ? ` + ${ + useAbortSignal + ? ` const abortHandler = () => { observer.error("Aborted"); client.close(); }; - if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` : ""} + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` + : "" + } }); }).pipe(${take}(1)); @@ -334,7 +342,7 @@ function createObservableUnaryMethod(ctx: Context): Code { function createInvokeMethod(ctx: Context) { const { options } = ctx; const { useAbortSignal } = options; - + return code` invoke( methodDesc: T, @@ -375,12 +383,16 @@ function createInvokeMethod(ctx: Context) { if (!observer.closed) return client.close() }); - ${useAbortSignal ? ` + ${ + useAbortSignal + ? ` const abortHandler = () => { observer.error("Aborted"); client.close(); }; - if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` : ""} + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` + : "" + } }); upStream(); }).pipe(${share}()); From 4d885fa0f72a7ba96c8e977c4efe9a56310a9d3f Mon Sep 17 00:00:00 2001 From: HERSENT Date: Mon, 13 Feb 2023 09:55:52 +0000 Subject: [PATCH 3/7] update example --- integration/grpc-web/example.ts | 6 +++++- src/generate-grpc-web.ts | 1 - 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/integration/grpc-web/example.ts b/integration/grpc-web/example.ts index a636468af..1e7b9fb4b 100644 --- a/integration/grpc-web/example.ts +++ b/integration/grpc-web/example.ts @@ -918,7 +918,11 @@ export class GrpcWebImpl { } }, }); - observer.add(() => client.close()); + observer.add(() => { + if (!observer.closed) { + return client.close(); + } + }); }); upStream(); }).pipe(share()); diff --git a/src/generate-grpc-web.ts b/src/generate-grpc-web.ts index 588f9ece0..c4690eda1 100644 --- a/src/generate-grpc-web.ts +++ b/src/generate-grpc-web.ts @@ -332,7 +332,6 @@ function createObservableUnaryMethod(ctx: Context): Code { if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` : "" } - }); }).pipe(${take}(1)); } From ab5c4d41cdb9c7c8005a152414397e8f93fd7d04 Mon Sep 17 00:00:00 2001 From: HERSENT Date: Tue, 14 Feb 2023 08:26:03 +0000 Subject: [PATCH 4/7] add useAbortSignal option --- integration/grpc-web/parameters.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/grpc-web/parameters.txt b/integration/grpc-web/parameters.txt index 03af13770..30d1b5399 100644 --- a/integration/grpc-web/parameters.txt +++ b/integration/grpc-web/parameters.txt @@ -1 +1 @@ -outputClientImpl=grpc-web +outputClientImpl=grpc-web, useAbortSignal=true From b1b83e7a56a0e89c6b1db4f457a3008262961867 Mon Sep 17 00:00:00 2001 From: HERSENT Date: Tue, 14 Feb 2023 08:27:08 +0000 Subject: [PATCH 5/7] make more readable --- src/generate-grpc-web.ts | 61 ++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/src/generate-grpc-web.ts b/src/generate-grpc-web.ts index c4690eda1..9186eee7e 100644 --- a/src/generate-grpc-web.ts +++ b/src/generate-grpc-web.ts @@ -242,6 +242,16 @@ function createPromiseUnaryMethod(ctx: Context): Code { const { options } = ctx; const { useAbortSignal } = options; + const maybeAbortSignal = useAbortSignal + ? ` + const abortHandler = () => { + client.close(); + reject(new Error("Aborted")); + } + + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` + : ""; + return code` unary( methodDesc: T, @@ -271,17 +281,7 @@ function createPromiseUnaryMethod(ctx: Context): Code { }, }); - ${ - useAbortSignal - ? ` - const abortHandler = () => { - client.close(); - reject(new Error("Aborted")); - } - - if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` - : "" - } + ${maybeAbortSignal} }); } `; @@ -291,6 +291,14 @@ function createObservableUnaryMethod(ctx: Context): Code { const { options } = ctx; const { useAbortSignal } = options; + const maybeAbortSignal = useAbortSignal + ? ` + const abortHandler = () => { + observer.error("Aborted"); + client.close(); + }; + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` + : ""; return code` unary( methodDesc: T, @@ -322,16 +330,7 @@ function createObservableUnaryMethod(ctx: Context): Code { }); - ${ - useAbortSignal - ? ` - const abortHandler = () => { - observer.error("Aborted"); - client.close(); - }; - if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` - : "" - } + ${maybeAbortSignal} }).pipe(${take}(1)); } @@ -342,6 +341,15 @@ function createInvokeMethod(ctx: Context) { const { options } = ctx; const { useAbortSignal } = options; + const maybeAbortSignal = useAbortSignal + ? ` + const abortHandler = () => { + observer.error("Aborted"); + client.close(); + }; + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` + : ""; + return code` invoke( methodDesc: T, @@ -382,16 +390,7 @@ function createInvokeMethod(ctx: Context) { if (!observer.closed) return client.close() }); - ${ - useAbortSignal - ? ` - const abortHandler = () => { - observer.error("Aborted"); - client.close(); - }; - if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` - : "" - } + ${maybeAbortSignal} }); upStream(); }).pipe(${share}()); From 6c2417dc1eac6c80696149a1eb8c76b0fcc0ec6a Mon Sep 17 00:00:00 2001 From: HERSENT Date: Tue, 14 Feb 2023 08:32:56 +0000 Subject: [PATCH 6/7] add useAbortSignal option --- .../example.ts | 26 ++++++++++++++++--- .../parameters.txt | 2 +- integration/grpc-web/parameters.txt | 2 +- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/integration/grpc-web-no-streaming-observable/example.ts b/integration/grpc-web-no-streaming-observable/example.ts index 8354c8c45..a5067e5bc 100644 --- a/integration/grpc-web-no-streaming-observable/example.ts +++ b/integration/grpc-web-no-streaming-observable/example.ts @@ -321,7 +321,11 @@ export const Empty = { * but with the streaming method removed. */ export interface DashState { - UserSettings(request: DeepPartial, metadata?: grpc.Metadata): Observable; + UserSettings( + request: DeepPartial, + abortSignal?: AbortSignal, + metadata?: grpc.Metadata, + ): Observable; } export class DashStateClientImpl implements DashState { @@ -332,8 +336,12 @@ export class DashStateClientImpl implements DashState { this.UserSettings = this.UserSettings.bind(this); } - UserSettings(request: DeepPartial, metadata?: grpc.Metadata): Observable { - return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), metadata); + UserSettings( + request: DeepPartial, + abortSignal: AbortSignal | undefined, + metadata?: grpc.Metadata, + ): Observable { + return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), abortSignal, metadata); } } @@ -373,6 +381,7 @@ interface Rpc { unary( methodDesc: T, request: any, + abortSignal: AbortSignal | undefined, metadata: grpc.Metadata | undefined, ): Observable; } @@ -404,6 +413,7 @@ export class GrpcWebImpl { unary( methodDesc: T, _request: any, + abortSignal: AbortSignal | undefined, metadata: grpc.Metadata | undefined, ): Observable { const request = { ..._request, ...methodDesc.requestType }; @@ -411,7 +421,7 @@ export class GrpcWebImpl { ? new BrowserHeaders({ ...this.options?.metadata.headersMap, ...metadata?.headersMap }) : metadata || this.options.metadata; return new Observable((observer) => { - grpc.unary(methodDesc, { + const client = grpc.unary(methodDesc, { request, host: this.host, metadata: maybeCombinedMetadata, @@ -427,6 +437,14 @@ export class GrpcWebImpl { } }, }); + + const abortHandler = () => { + observer.error("Aborted"); + client.close(); + }; + if (abortSignal) { + abortSignal.addEventListener("abort", abortHandler); + } }).pipe(take(1)); } } diff --git a/integration/grpc-web-no-streaming-observable/parameters.txt b/integration/grpc-web-no-streaming-observable/parameters.txt index f9e39f25b..ffa06e766 100644 --- a/integration/grpc-web-no-streaming-observable/parameters.txt +++ b/integration/grpc-web-no-streaming-observable/parameters.txt @@ -1 +1 @@ -outputClientImpl=grpc-web,returnObservable=true +outputClientImpl=grpc-web,returnObservable=true,useAbortSignal=true diff --git a/integration/grpc-web/parameters.txt b/integration/grpc-web/parameters.txt index 30d1b5399..03af13770 100644 --- a/integration/grpc-web/parameters.txt +++ b/integration/grpc-web/parameters.txt @@ -1 +1 @@ -outputClientImpl=grpc-web, useAbortSignal=true +outputClientImpl=grpc-web From 50689968b868724da33c93a0c310edfe9924179f Mon Sep 17 00:00:00 2001 From: HERSENT Date: Tue, 21 Feb 2023 16:00:50 +0000 Subject: [PATCH 7/7] move abortSignal from 2nd to 3rd arguments --- .../example.ts | 8 ++++---- src/generate-grpc-web.ts | 20 +++++++++---------- src/generate-services.ts | 6 +++--- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/integration/grpc-web-no-streaming-observable/example.ts b/integration/grpc-web-no-streaming-observable/example.ts index a5067e5bc..519bf7a2b 100644 --- a/integration/grpc-web-no-streaming-observable/example.ts +++ b/integration/grpc-web-no-streaming-observable/example.ts @@ -323,8 +323,8 @@ export const Empty = { export interface DashState { UserSettings( request: DeepPartial, - abortSignal?: AbortSignal, metadata?: grpc.Metadata, + abortSignal?: AbortSignal, ): Observable; } @@ -338,8 +338,8 @@ export class DashStateClientImpl implements DashState { UserSettings( request: DeepPartial, - abortSignal: AbortSignal | undefined, metadata?: grpc.Metadata, + abortSignal?: AbortSignal, ): Observable { return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), abortSignal, metadata); } @@ -381,8 +381,8 @@ interface Rpc { unary( methodDesc: T, request: any, - abortSignal: AbortSignal | undefined, metadata: grpc.Metadata | undefined, + abortSignal?: AbortSignal, ): Observable; } @@ -413,8 +413,8 @@ export class GrpcWebImpl { unary( methodDesc: T, _request: any, - abortSignal: AbortSignal | undefined, metadata: grpc.Metadata | undefined, + abortSignal?: AbortSignal, ): Observable { const request = { ..._request, ...methodDesc.requestType }; const maybeCombinedMetadata = metadata && this.options.metadata diff --git a/src/generate-grpc-web.ts b/src/generate-grpc-web.ts index 9186eee7e..f78db6300 100644 --- a/src/generate-grpc-web.ts +++ b/src/generate-grpc-web.ts @@ -58,8 +58,8 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me return code` ${methodDesc.formattedName}( request: ${inputType}, - ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata?: grpc.Metadata, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${returns} { throw new Error('ts-proto does not yet support client streaming!'); } @@ -70,8 +70,8 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me return code` ${methodDesc.formattedName}( request: ${inputType}, - ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata?: grpc.Metadata, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${returns} { return this.rpc.${method}( ${methodDescName(serviceDesc, methodDesc)}, @@ -180,8 +180,8 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre unary( methodDesc: T, request: any, - ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata: grpc.Metadata | undefined, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${wrapper}; `); @@ -190,8 +190,8 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre invoke( methodDesc: T, request: any, - ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} metadata: grpc.Metadata | undefined, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${observableType(ctx)}; `); } @@ -256,8 +256,8 @@ function createPromiseUnaryMethod(ctx: Context): Code { unary( methodDesc: T, _request: any, - ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} - metadata: grpc.Metadata | undefined + metadata: grpc.Metadata | undefined, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): Promise { const request = { ..._request, ...methodDesc.requestType }; const maybeCombinedMetadata = @@ -303,8 +303,8 @@ function createObservableUnaryMethod(ctx: Context): Code { unary( methodDesc: T, _request: any, - ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} - metadata: grpc.Metadata | undefined + metadata: grpc.Metadata | undefined, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${observableType(ctx)} { const request = { ..._request, ...methodDesc.requestType }; const maybeCombinedMetadata = @@ -354,8 +354,8 @@ function createInvokeMethod(ctx: Context) { invoke( methodDesc: T, _request: any, - ${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""} - metadata: grpc.Metadata | undefined + metadata: grpc.Metadata | undefined, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${observableType(ctx)} { const upStreamCodes = this.options.upStreamRetryCodes || []; const DEFAULT_TIMEOUT_TIME: number = 3_000; diff --git a/src/generate-services.ts b/src/generate-services.ts index ec9a6a9b5..ff94dd125 100644 --- a/src/generate-services.ts +++ b/src/generate-services.ts @@ -62,9 +62,6 @@ export function generateService( const partialInput = options.outputClientImpl === "grpc-web"; const inputType = requestType(ctx, methodDesc, partialInput); params.push(code`request: ${inputType}`); - if (options.useAbortSignal) { - params.push(code`abortSignal?: AbortSignal`); - } // Use metadata as last argument for interface only configuration if (options.outputClientImpl === "grpc-web") { @@ -78,6 +75,9 @@ export function generateService( const Metadata = imp(options.metadataType); params.push(code`metadata?: ${Metadata}`); } + if (options.useAbortSignal) { + params.push(code`abortSignal?: AbortSignal`); + } if (options.addNestjsRestParameter) { params.push(code`...rest: any`); }