Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implementation of useAbortSignal option for grpc-web #777

Merged
merged 7 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion integration/grpc-web/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,11 @@ export class GrpcWebImpl {
}
},
});
observer.add(() => client.close());
observer.add(() => {
if (!observer.closed) {
return client.close();
}
});
});
upStream();
}).pipe(share());
Expand Down
65 changes: 62 additions & 3 deletions src/generate-grpc-web.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export function generateGrpcClientImpl(
/** Creates the RPC methods that client code actually calls. */
function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, methodDesc: MethodDescriptorProto) {
assertInstanceOf(methodDesc, FormattedMethodDescriptor);
const { options } = ctx;
const { useAbortSignal } = options;
const requestMessage = rawRequestType(ctx, methodDesc);
const inputType = requestType(ctx, methodDesc, true);
const returns = responsePromiseOrObservable(ctx, methodDesc);
Expand All @@ -56,6 +58,7 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me
return code`
${methodDesc.formattedName}(
request: ${inputType},
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata?: grpc.Metadata,
): ${returns} {
throw new Error('ts-proto does not yet support client streaming!');
Expand All @@ -67,11 +70,13 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me
return code`
${methodDesc.formattedName}(
request: ${inputType},
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata?: grpc.Metadata,
): ${returns} {
return this.rpc.${method}(
${methodDescName(serviceDesc, methodDesc)},
${requestMessage}.fromPartial(request),
${useAbortSignal ? "abortSignal," : ""}
metadata,
);
}
Expand Down Expand Up @@ -165,6 +170,8 @@ export function addGrpcWebMisc(ctx: Context, hasStreamingMethods: boolean): Code
/** Makes an `Rpc` interface to decouple from the low-level grpc-web `grpc.invoke and grpc.unary`/etc. methods. */
function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStreamingMethods: boolean): Code {
const chunks: Code[] = [];
const { options } = ctx;
const { useAbortSignal } = options;

chunks.push(code`interface Rpc {`);

Expand All @@ -173,6 +180,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
request: any,
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata: grpc.Metadata | undefined,
): ${wrapper}<any>;
`);
Expand All @@ -182,6 +190,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre
invoke<T extends UnaryMethodDefinitionish>(
methodDesc: T,
request: any,
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata: grpc.Metadata | undefined,
): ${observableType(ctx)}<any>;
`);
Expand Down Expand Up @@ -230,10 +239,14 @@ function generateGrpcWebImpl(ctx: Context, returnObservable: boolean, hasStreami
}

function createPromiseUnaryMethod(ctx: Context): Code {
const { options } = ctx;
const { useAbortSignal } = options;

return code`
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata: grpc.Metadata | undefined
): Promise<any> {
const request = { ..._request, ...methodDesc.requestType };
Expand All @@ -242,7 +255,7 @@ function createPromiseUnaryMethod(ctx: Context): Code {
? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Promise((resolve, reject) => {
${grpc}.unary(methodDesc, {
${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -257,16 +270,32 @@ function createPromiseUnaryMethod(ctx: Context): Code {
}
},
});

${
useAbortSignal
? `
const abortHandler = () => {
client.close();
reject(new Error("Aborted"));
}

if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: ""
}
});
}
`;
}

function createObservableUnaryMethod(ctx: Context): Code {
const { options } = ctx;
const { useAbortSignal } = options;

return code`
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata: grpc.Metadata | undefined
): ${observableType(ctx)}<any> {
const request = { ..._request, ...methodDesc.requestType };
Expand All @@ -275,7 +304,7 @@ function createObservableUnaryMethod(ctx: Context): Code {
? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Observable(observer => {
${grpc}.unary(methodDesc, {
${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -291,16 +320,33 @@ function createObservableUnaryMethod(ctx: Context): Code {
}
},
});


${
useAbortSignal
? `
const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: ""
}

}).pipe(${take}(1));
}
`;
}

function createInvokeMethod(ctx: Context) {
const { options } = ctx;
const { useAbortSignal } = options;

return code`
invoke<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata: grpc.Metadata | undefined
): ${observableType(ctx)}<any> {
const upStreamCodes = this.options.upStreamRetryCodes || [];
Expand Down Expand Up @@ -332,7 +378,20 @@ function createInvokeMethod(ctx: Context) {
}
},
});
observer.add(() => client.close());
observer.add(() => {
if (!observer.closed) return client.close()
});

${
useAbortSignal
? `
const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: ""
}
});
upStream();
}).pipe(${share}());
Expand Down