Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close the session on error in client transports #623

Merged
merged 1 commit into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions packages/connect-node-test/src/helpers/testserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ export function createTestServers() {
return;
}
nodeH2SecureServer.close((err) => (err ? reject(err) : resolve()));
// TODO this resolve is only there because we currently don't manage http2 sessions in the client, and the server doesn't shut down with an open connection
resolve(); // the server.close() callback above slows down our tests
});
},
},
Expand Down Expand Up @@ -166,8 +164,6 @@ export function createTestServers() {
return;
}
nodeH2cServer.close((err) => (err ? reject(err) : resolve()));
// TODO this resolve is only there because we currently don't manage http2 sessions in the client, and the server doesn't shut down with an open connection
resolve(); // the server.close() callback above slows down our tests
});
},
},
Expand Down Expand Up @@ -287,12 +283,11 @@ export function createTestServers() {
});
await fastifyH2cServer.listen();
},
stop() {
async stop() {
if (!fastifyH2cServer) {
throw new Error("fastifyH2cServer not started");
}
void fastifyH2cServer.close(); // await close() slows down our tests
return Promise.resolve();
await fastifyH2cServer.close();
},
},
// connect-express
Expand Down
2 changes: 1 addition & 1 deletion packages/connect-node/src/node-transport-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type CommonNodeTransportOptions = NodeHttpClientOptions &
* Asserts that the options are within sane limits, and returns default values
* where no value is provided.
*
* @internal
* @private Internal code, does not follow semantic versioning.
*/
export function validateNodeTransportOptions(
options: CommonNodeTransportOptions
Expand Down
2 changes: 1 addition & 1 deletion packages/connect-web-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ it like a web server would usually do.

| code generator | bundle size | minified | compressed |
|----------------|-------------------:|-----------------------:|---------------------:|
| connect | 107,197 b | 46,935 b | 12,539 b |
| connect | 107,175 b | 46,924 b | 12,556 b |
| grpc-web | 414,906 b | 301,127 b | 53,279 b |
241 changes: 241 additions & 0 deletions packages/connect/src/protocol-connect/transport.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Copyright 2021-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import { Int32Value, MethodKind, StringValue } from "@bufbuild/protobuf";
import type {
UniversalClientRequest,
UniversalClientResponse,
} from "../protocol/universal.js";
import { createAsyncIterable } from "../protocol/async-iterable.js";
import { ConnectError, connectErrorFromReason } from "../connect-error.js";
import { createTransport } from "./transport.js";
import { encodeEnvelope } from "../protocol/envelope.js";
import { createEndStreamSerialization, endStreamFlag } from "./end-stream.js";
import { Code } from "../code.js";
import {
contentTypeStreamProto,
contentTypeUnaryProto,
} from "./content-type.js";
import type { Transport } from "../transport.js";
import { errorToJsonBytes } from "./error-json.js";

const TestService = {
typeName: "TestService",
methods: {
unary: {
name: "Unary",
I: Int32Value,
O: StringValue,
kind: MethodKind.Unary,
},
server: {
name: "Server",
I: Int32Value,
O: StringValue,
kind: MethodKind.ServerStreaming,
},
client: {
name: "Client",
I: Int32Value,
O: StringValue,
kind: MethodKind.ClientStreaming,
},
biDi: {
name: "BiDi",
I: Int32Value,
O: StringValue,
kind: MethodKind.BiDiStreaming,
},
},
} as const;

describe("Connect transport", function () {
const defaultOptions = {
baseUrl: "http://example.com",
interceptors: [],
acceptCompression: [],
compressMinBytes: 0,
readMaxBytes: 0xffffff,
sendCompression: null,
useBinaryFormat: true,
writeMaxBytes: 0xffffff,
};
describe("against server responding with unexpected content type", function () {
let httpRequestAborted = false;
let transport: Transport = null as unknown as Transport;
beforeEach(function () {
httpRequestAborted = false;
transport = createTransport({
httpClient(
request: UniversalClientRequest
): Promise<UniversalClientResponse> {
request.signal?.addEventListener(
"abort",
() => (httpRequestAborted = true)
);
return Promise.resolve({
status: 200,
header: new Headers({
"Content-Type": "application/csv",
}),
body: createAsyncIterable([]),
trailer: new Headers(),
});
},
...defaultOptions,
});
});
it("should cancel the HTTP request for unary", async function () {
try {
await transport.unary(
TestService,
TestService.methods.unary,
undefined,
undefined,
undefined,
{}
);
fail("expected error");
} catch (e) {
expect(e).toBeInstanceOf(ConnectError);
expect(connectErrorFromReason(e).message).toBe(
'[invalid_argument] unexpected response content type "application/csv"'
);
}
expect(httpRequestAborted).toBeTrue();
});
it("should cancel the HTTP request for server-streaming", async function () {
try {
await transport.stream(
TestService,
TestService.methods.unary,
undefined,
undefined,
undefined,
createAsyncIterable([])
);
fail("expected error");
} catch (e) {
expect(e).toBeInstanceOf(ConnectError);
expect(connectErrorFromReason(e).message).toBe(
'[invalid_argument] unexpected response content type "application/csv"'
);
}
expect(httpRequestAborted).toBeTrue();
});
});
describe("against server responding with an error", function () {
describe("for unary", function () {
let httpRequestAborted = false;
const t = createTransport({
httpClient(
request: UniversalClientRequest
): Promise<UniversalClientResponse> {
request.signal?.addEventListener(
"abort",
() => (httpRequestAborted = true)
);
return Promise.resolve({
status: 429,
header: new Headers({
"Content-Type": contentTypeUnaryProto,
}),
body: createAsyncIterable([
errorToJsonBytes(
new ConnectError("foo", Code.ResourceExhausted),
{}
),
]),
trailer: new Headers(),
});
},
...defaultOptions,
});
it("should cancel the HTTP request", async function () {
try {
await t.unary(
TestService,
TestService.methods.unary,
undefined,
undefined,
undefined,
{}
);
fail("expected error");
} catch (e) {
expect(e).toBeInstanceOf(ConnectError);
expect(connectErrorFromReason(e).message).toBe(
"[resource_exhausted] foo"
);
}
expect(httpRequestAborted).toBeTrue();
});
});
describe("for server-streaming", function () {
let httpRequestAborted = false;
const t = createTransport({
httpClient(
request: UniversalClientRequest
): Promise<UniversalClientResponse> {
request.signal?.addEventListener(
"abort",
() => (httpRequestAborted = true)
);
return Promise.resolve({
status: 200,
header: new Headers({
"Content-Type": contentTypeStreamProto,
}),
body: createAsyncIterable([
encodeEnvelope(0, new StringValue({ value: "abc" }).toBinary()),
encodeEnvelope(
endStreamFlag,
createEndStreamSerialization({}).serialize({
metadata: new Headers(),
error: new ConnectError("foo", Code.ResourceExhausted),
})
),
]),
trailer: new Headers(),
});
},
...defaultOptions,
});
it("should cancel the HTTP request", async function () {
const res = await t.stream(
TestService,
TestService.methods.server,
undefined,
undefined,
undefined,
createAsyncIterable([])
);
const messagesReceived: StringValue[] = [];
try {
for await (const m of res.message) {
messagesReceived.push(m);
}
fail("expected error");
} catch (e) {
expect(e).toBeInstanceOf(ConnectError);
expect(connectErrorFromReason(e).message).toBe(
"[resource_exhausted] foo"
);
}
expect(messagesReceived.length).toBe(1);
expect(httpRequestAborted).toBeTrue();
});
});
});
});
Loading