Skip to content

Support client-side cancellation in the router transport #637

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 32 additions & 39 deletions packages/connect-node-test/src/crosstest/cancel_after_begin.spec.ts
Original file line number Diff line number Diff line change
@@ -28,45 +28,38 @@ describe("cancel_after_begin", function () {
}
}

servers.describeTransportsExcluding(
[
// TODO(TCN-1763) support client-side cancellation in createRouterTransport()
"@bufbuild/connect (ConnectRouter, binary)",
"@bufbuild/connect (ConnectRouter, JSON)",
],
(transport) => {
const servers = createTestServers();
beforeAll(async () => await servers.start());
it("with promise client", async function () {
const client = createPromiseClient(TestService, transport());
const controller = new AbortController();
async function* input() {
yield {
payload: {
body: new Uint8Array(),
type: PayloadType.COMPRESSABLE,
},
};
await new Promise((resolve) => setTimeout(resolve, 1));
controller.abort();
yield {
payload: {
body: new Uint8Array(),
type: PayloadType.COMPRESSABLE,
},
};
}
try {
await client.streamingInputCall(input(), {
signal: controller.signal,
});
fail("expected error");
} catch (e) {
expectError(e);
}
});
}
);
servers.describeTransports((transport) => {
const servers = createTestServers();
beforeAll(async () => await servers.start());
it("with promise client", async function () {
const client = createPromiseClient(TestService, transport());
const controller = new AbortController();
async function* input() {
yield {
payload: {
body: new Uint8Array(),
type: PayloadType.COMPRESSABLE,
},
};
await new Promise((resolve) => setTimeout(resolve, 1));
controller.abort();
yield {
payload: {
body: new Uint8Array(),
type: PayloadType.COMPRESSABLE,
},
};
}
try {
await client.streamingInputCall(input(), {
signal: controller.signal,
});
fail("expected error");
} catch (e) {
expectError(e);
}
});
});

afterAll(async () => await servers.stop());
});
Original file line number Diff line number Diff line change
@@ -35,10 +35,6 @@ describe("cancel_after_first_response", function () {

servers.describeTransportsExcluding(
[
// TODO(TCN-1763) support client-side cancellation in createRouterTransport()
"@bufbuild/connect (ConnectRouter, binary)",
"@bufbuild/connect (ConnectRouter, JSON)",

// All following Transports run over HTTP/1, which cannot support full-duplex.
"@bufbuild/connect-node (Connect, JSON, http) against @bufbuild/connect-node (h1)",
"@bufbuild/connect-node (Connect, binary, http) against @bufbuild/connect-node (h1)",
173 changes: 83 additions & 90 deletions packages/connect-web-test/src/cancellation.spec.ts
Original file line number Diff line number Diff line change
@@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import type { CallOptions } from "@bufbuild/connect";
import {
Code,
ConnectError,
createCallbackClient,
createPromiseClient,
} from "@bufbuild/connect";
import type { CallOptions } from "@bufbuild/connect";
import { describeTransportsExcluding } from "./helpers/crosstestserver.js";
import { describeTransports } from "./helpers/crosstestserver.js";
import { TestService } from "./gen/grpc/testing/test_connect.js";

describe("explicit cancellation with AbortController", function () {
@@ -28,96 +28,89 @@ describe("explicit cancellation with AbortController", function () {
const options: Readonly<CallOptions> = {
signal: abort.signal,
};
describeTransportsExcluding(
[
// TODO(TCN-1763) support client-side cancellation in createRouterTransport()
"@bufbuild/connect-web (ConnectRouter, JSON)",
"@bufbuild/connect-web (ConnectRouter, binary)",
],
(transport) => {
describe("with promise client", () => {
const client = createPromiseClient(TestService, transport());
it("works for unary method", async () => {
let caughtError = false;
try {
await client.unaryCall({}, options);
} catch (e) {
caughtError = true;
expect(e).toBeInstanceOf(ConnectError);
if (e instanceof ConnectError) {
expect(e.code).toBe(Code.Canceled);
}
describeTransports((transport) => {
describe("with promise client", () => {
const client = createPromiseClient(TestService, transport());
it("works for unary method", async () => {
let caughtError = false;
try {
await client.unaryCall({}, options);
} catch (e) {
caughtError = true;
expect(e).toBeInstanceOf(ConnectError);
if (e instanceof ConnectError) {
expect(e.code).toBe(Code.Canceled);
}
}
expect(caughtError).toBeTrue();
});
it("works for server-streaming method", async () => {
let caughtError = false;
try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _res of client.streamingOutputCall({}, options)) {
//
}
expect(caughtError).toBeTrue();
});
it("works for server-streaming method", async () => {
let caughtError = false;
try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _res of client.streamingOutputCall({}, options)) {
//
}
} catch (e) {
caughtError = true;
expect(e).toBeInstanceOf(ConnectError);
if (e instanceof ConnectError) {
expect(e.code).toBe(Code.Canceled);
}
} catch (e) {
caughtError = true;
expect(e).toBeInstanceOf(ConnectError);
if (e instanceof ConnectError) {
expect(e.code).toBe(Code.Canceled);
}
expect(caughtError).toBeTrue();
});
}
expect(caughtError).toBeTrue();
});
});
describe("with callback client", () => {
const client = createCallbackClient(TestService, transport());
it("works for unary method", (done) => {
client.unaryCall(
{},
() => {
fail("expected callback client to swallow AbortError");
},
options
);
setTimeout(done, 50);
});
it("works for unary method with returned cancel-fn", (done) => {
const cancelFn = client.unaryCall(
{},
() => {
fail("expected callback client to swallow AbortError");
},
options
);
cancelFn();
setTimeout(done, 50);
});
it("works for server-streaming method", (done) => {
client.streamingOutputCall(
{},
() => {
fail("expected call to cancel right away, but got message");
},
(error) => {
expect(error).toBeUndefined();
},
options
);
setTimeout(done, 50);
});
describe("with callback client", () => {
const client = createCallbackClient(TestService, transport());
it("works for unary method", (done) => {
client.unaryCall(
{},
() => {
fail("expected callback client to swallow AbortError");
},
options
);
setTimeout(done, 50);
});
it("works for unary method with returned cancel-fn", (done) => {
const cancelFn = client.unaryCall(
{},
() => {
fail("expected callback client to swallow AbortError");
},
options
);
cancelFn();
setTimeout(done, 50);
});
it("works for server-streaming method", (done) => {
client.streamingOutputCall(
{},
() => {
fail("expected call to cancel right away, but got message");
},
(error) => {
expect(error).toBeUndefined();
},
options
);
setTimeout(done, 50);
});
it("works for server-streaming method with returned cancel-fn", (done) => {
const cancelFn = client.streamingOutputCall(
{},
() => {
fail("expected call to cancel right away, but got message");
},
(error) => {
expect(error).toBeUndefined();
},
options
);
cancelFn();
setTimeout(done, 50);
});
it("works for server-streaming method with returned cancel-fn", (done) => {
const cancelFn = client.streamingOutputCall(
{},
() => {
fail("expected call to cancel right away, but got message");
},
(error) => {
expect(error).toBeUndefined();
},
options
);
cancelFn();
setTimeout(done, 50);
});
}
);
});
});
});
62 changes: 52 additions & 10 deletions packages/connect/src/protocol/universal-handler-client.ts
Original file line number Diff line number Diff line change
@@ -14,9 +14,10 @@

import { Code } from "../code.js";
import { ConnectError } from "../connect-error.js";
import { createAsyncIterable } from "./async-iterable.js";
import { createAsyncIterable, pipe } from "./async-iterable.js";
import type { UniversalHandler } from "./universal-handler.js";
import type { UniversalClientFn } from "./universal.js";
import { getAbortSignalReason } from "./signals.js";

/**
* An in-memory UniversalClientFn that can be used to route requests to a ConnectRouter
@@ -38,23 +39,64 @@ export function createUniversalHandlerClient(
Code.Unimplemented
);
}
const uServerRes = await handler({
body: uClientReq.body,
httpVersion: "2.0",
method: uClientReq.method,
url: reqUrl,
header: uClientReq.header,
signal: uClientReq.signal ?? new AbortController().signal,
});
const reqSignal = uClientReq.signal ?? new AbortController().signal;
const uServerRes = await raceSignal(
reqSignal,
handler({
body: uClientReq.body,
httpVersion: "2.0",
method: uClientReq.method,
url: reqUrl,
header: uClientReq.header,
signal: reqSignal,
})
);
let body = uServerRes.body ?? new Uint8Array();
if (body instanceof Uint8Array) {
body = createAsyncIterable([body]);
}
return {
body: body,
body: pipe(body, (iterable) => {
return {
[Symbol.asyncIterator]() {
const it = iterable[Symbol.asyncIterator]();
const w: AsyncIterator<Uint8Array> = {
next() {
return raceSignal(reqSignal, it.next());
},
};
if (it.throw !== undefined) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- can't handle mutated object sensibly
w.throw = (e: unknown) => it.throw!(e);
}
if (it.return !== undefined) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion,@typescript-eslint/no-explicit-any -- can't handle mutated object sensibly
w.return = (value?: any) => it.return!(value);
}
return w;
},
};
}),
header: new Headers(uServerRes.header),
status: uServerRes.status,
trailer: new Headers(uServerRes.trailer),
};
};
}

/**
* Wrap a promise, and reject early if the given signal triggers before the
* promise is settled.
*/
function raceSignal<T>(signal: AbortSignal, promise: Promise<T>): Promise<T> {
let cleanup: (() => void) | undefined;
const signalPromise = new Promise<never>((_, reject) => {
const onAbort = () => reject(getAbortSignalReason(signal));
if (signal.aborted) {
return onAbort();
}
signal.addEventListener("abort", onAbort);
cleanup = () => signal.removeEventListener("abort", onAbort);
});
return Promise.race([signalPromise, promise]).finally(cleanup);
}