diff --git a/integration/grpc-web-no-streaming-observable/example.ts b/integration/grpc-web-no-streaming-observable/example.ts index 8354c8c45..519bf7a2b 100644 --- a/integration/grpc-web-no-streaming-observable/example.ts +++ b/integration/grpc-web-no-streaming-observable/example.ts @@ -321,7 +321,11 @@ export const Empty = { * but with the streaming method removed. */ export interface DashState { - UserSettings(request: DeepPartial, metadata?: grpc.Metadata): Observable; + UserSettings( + request: DeepPartial, + metadata?: grpc.Metadata, + abortSignal?: AbortSignal, + ): Observable; } export class DashStateClientImpl implements DashState { @@ -332,8 +336,12 @@ export class DashStateClientImpl implements DashState { this.UserSettings = this.UserSettings.bind(this); } - UserSettings(request: DeepPartial, metadata?: grpc.Metadata): Observable { - return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), metadata); + UserSettings( + request: DeepPartial, + metadata?: grpc.Metadata, + abortSignal?: AbortSignal, + ): Observable { + return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), abortSignal, metadata); } } @@ -374,6 +382,7 @@ interface Rpc { methodDesc: T, request: any, metadata: grpc.Metadata | undefined, + abortSignal?: AbortSignal, ): Observable; } @@ -405,13 +414,14 @@ export class GrpcWebImpl { methodDesc: T, _request: any, metadata: grpc.Metadata | undefined, + abortSignal?: AbortSignal, ): Observable { const request = { ..._request, ...methodDesc.requestType }; const maybeCombinedMetadata = metadata && this.options.metadata ? new BrowserHeaders({ ...this.options?.metadata.headersMap, ...metadata?.headersMap }) : metadata || this.options.metadata; return new Observable((observer) => { - grpc.unary(methodDesc, { + const client = grpc.unary(methodDesc, { request, host: this.host, metadata: maybeCombinedMetadata, @@ -427,6 +437,14 @@ export class GrpcWebImpl { } }, }); + + const abortHandler = () => { + observer.error("Aborted"); + client.close(); + }; + if (abortSignal) { + abortSignal.addEventListener("abort", abortHandler); + } }).pipe(take(1)); } } diff --git a/integration/grpc-web-no-streaming-observable/parameters.txt b/integration/grpc-web-no-streaming-observable/parameters.txt index f9e39f25b..ffa06e766 100644 --- a/integration/grpc-web-no-streaming-observable/parameters.txt +++ b/integration/grpc-web-no-streaming-observable/parameters.txt @@ -1 +1 @@ -outputClientImpl=grpc-web,returnObservable=true +outputClientImpl=grpc-web,returnObservable=true,useAbortSignal=true diff --git a/integration/grpc-web/example.ts b/integration/grpc-web/example.ts index a636468af..1e7b9fb4b 100644 --- a/integration/grpc-web/example.ts +++ b/integration/grpc-web/example.ts @@ -918,7 +918,11 @@ export class GrpcWebImpl { } }, }); - observer.add(() => client.close()); + observer.add(() => { + if (!observer.closed) { + return client.close(); + } + }); }); upStream(); }).pipe(share()); diff --git a/src/generate-grpc-web.ts b/src/generate-grpc-web.ts index 2bfdb1efe..f78db6300 100644 --- a/src/generate-grpc-web.ts +++ b/src/generate-grpc-web.ts @@ -48,6 +48,8 @@ export function generateGrpcClientImpl( /** Creates the RPC methods that client code actually calls. */ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, methodDesc: MethodDescriptorProto) { assertInstanceOf(methodDesc, FormattedMethodDescriptor); + const { options } = ctx; + const { useAbortSignal } = options; const requestMessage = rawRequestType(ctx, methodDesc); const inputType = requestType(ctx, methodDesc, true); const returns = responsePromiseOrObservable(ctx, methodDesc); @@ -57,6 +59,7 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me ${methodDesc.formattedName}( request: ${inputType}, metadata?: grpc.Metadata, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${returns} { throw new Error('ts-proto does not yet support client streaming!'); } @@ -68,10 +71,12 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me ${methodDesc.formattedName}( request: ${inputType}, metadata?: grpc.Metadata, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${returns} { return this.rpc.${method}( ${methodDescName(serviceDesc, methodDesc)}, ${requestMessage}.fromPartial(request), + ${useAbortSignal ? "abortSignal," : ""} metadata, ); } @@ -165,6 +170,8 @@ export function addGrpcWebMisc(ctx: Context, hasStreamingMethods: boolean): Code /** Makes an `Rpc` interface to decouple from the low-level grpc-web `grpc.invoke and grpc.unary`/etc. methods. */ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStreamingMethods: boolean): Code { const chunks: Code[] = []; + const { options } = ctx; + const { useAbortSignal } = options; chunks.push(code`interface Rpc {`); @@ -174,6 +181,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre methodDesc: T, request: any, metadata: grpc.Metadata | undefined, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${wrapper}; `); @@ -183,6 +191,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre methodDesc: T, request: any, metadata: grpc.Metadata | undefined, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${observableType(ctx)}; `); } @@ -230,11 +239,25 @@ function generateGrpcWebImpl(ctx: Context, returnObservable: boolean, hasStreami } function createPromiseUnaryMethod(ctx: Context): Code { + const { options } = ctx; + const { useAbortSignal } = options; + + const maybeAbortSignal = useAbortSignal + ? ` + const abortHandler = () => { + client.close(); + reject(new Error("Aborted")); + } + + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` + : ""; + return code` unary( methodDesc: T, _request: any, - metadata: grpc.Metadata | undefined + metadata: grpc.Metadata | undefined, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): Promise { const request = { ..._request, ...methodDesc.requestType }; const maybeCombinedMetadata = @@ -242,7 +265,7 @@ function createPromiseUnaryMethod(ctx: Context): Code { ? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap }) : metadata || this.options.metadata; return new Promise((resolve, reject) => { - ${grpc}.unary(methodDesc, { + ${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, { request, host: this.host, metadata: maybeCombinedMetadata, @@ -257,17 +280,31 @@ function createPromiseUnaryMethod(ctx: Context): Code { } }, }); + + ${maybeAbortSignal} }); } `; } function createObservableUnaryMethod(ctx: Context): Code { + const { options } = ctx; + const { useAbortSignal } = options; + + const maybeAbortSignal = useAbortSignal + ? ` + const abortHandler = () => { + observer.error("Aborted"); + client.close(); + }; + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` + : ""; return code` unary( methodDesc: T, _request: any, - metadata: grpc.Metadata | undefined + metadata: grpc.Metadata | undefined, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${observableType(ctx)} { const request = { ..._request, ...methodDesc.requestType }; const maybeCombinedMetadata = @@ -275,7 +312,7 @@ function createObservableUnaryMethod(ctx: Context): Code { ? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap }) : metadata || this.options.metadata; return new Observable(observer => { - ${grpc}.unary(methodDesc, { + ${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, { request, host: this.host, metadata: maybeCombinedMetadata, @@ -291,17 +328,34 @@ function createObservableUnaryMethod(ctx: Context): Code { } }, }); + + + ${maybeAbortSignal} + }).pipe(${take}(1)); } `; } function createInvokeMethod(ctx: Context) { + const { options } = ctx; + const { useAbortSignal } = options; + + const maybeAbortSignal = useAbortSignal + ? ` + const abortHandler = () => { + observer.error("Aborted"); + client.close(); + }; + if (abortSignal) abortSignal.addEventListener("abort", abortHandler);` + : ""; + return code` invoke( methodDesc: T, _request: any, - metadata: grpc.Metadata | undefined + metadata: grpc.Metadata | undefined, + ${useAbortSignal ? "abortSignal?: AbortSignal," : ""} ): ${observableType(ctx)} { const upStreamCodes = this.options.upStreamRetryCodes || []; const DEFAULT_TIMEOUT_TIME: number = 3_000; @@ -332,7 +386,11 @@ function createInvokeMethod(ctx: Context) { } }, }); - observer.add(() => client.close()); + observer.add(() => { + if (!observer.closed) return client.close() + }); + + ${maybeAbortSignal} }); upStream(); }).pipe(${share}()); diff --git a/src/generate-services.ts b/src/generate-services.ts index ec9a6a9b5..ff94dd125 100644 --- a/src/generate-services.ts +++ b/src/generate-services.ts @@ -62,9 +62,6 @@ export function generateService( const partialInput = options.outputClientImpl === "grpc-web"; const inputType = requestType(ctx, methodDesc, partialInput); params.push(code`request: ${inputType}`); - if (options.useAbortSignal) { - params.push(code`abortSignal?: AbortSignal`); - } // Use metadata as last argument for interface only configuration if (options.outputClientImpl === "grpc-web") { @@ -78,6 +75,9 @@ export function generateService( const Metadata = imp(options.metadataType); params.push(code`metadata?: ${Metadata}`); } + if (options.useAbortSignal) { + params.push(code`abortSignal?: AbortSignal`); + } if (options.addNestjsRestParameter) { params.push(code`...rest: any`); }