Skip to content

Commit

Permalink
fix(node-http-handler): handle NodeHttp2Handler session failure (#2289)
Browse files Browse the repository at this point in the history
Detect errors on the NodeHttp2Handler, immediately destroy
connections on unexpected error mode, and reconnect.

Prior to this PR, if the server sent the client a GOAWAY frame,
the session would not be removed from the connection pool and requests
would fail indefinitely.

This tries to avoid keeping streams (requests) on the Http2Session
(tcp connection) from being stuck in an open state waiting for a gentle
close even if there were unexpected protocol or connection errors
during the request, assuming http2 errors are rare.
(if a server or load balancer or network is misbehaving,
close() might get stuck waiting for requests to finish,
especially if requests and sessions don't have timeouts?)

Co-authored-by: Trivikram Kamat <16024985+trivikr@users.noreply.github.com>
  • Loading branch information
TysonAndre and trivikr authored May 5, 2021
1 parent 3dd31c4 commit e97e357
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 7 deletions.
109 changes: 109 additions & 0 deletions packages/node-http-handler/src/node-http2-handler.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { HttpRequest } from "@aws-sdk/protocol-http";
import { rejects } from "assert";
import { constants, Http2Stream } from "http2";

import { NodeHttp2Handler } from "./node-http2-handler";
import { createMockHttp2Server, createResponseFunction } from "./server.mock";
Expand Down Expand Up @@ -100,6 +102,113 @@ describe("NodeHttp2Handler", () => {
mockH2Server2.close();
});

const UNEXPECTEDLY_CLOSED_REGEX = /closed|destroy|cancel|did not get a response/i;
it("handles goaway frames", async () => {
const port3 = port + 2;
const mockH2Server3 = createMockHttp2Server().listen(port3);
let establishedConnections = 0;
let numRequests = 0;
let shouldSendGoAway = true;

mockH2Server3.on("stream", (request: Http2Stream) => {
// transmit goaway frame without shutting down the connection
// to simulate an unlikely error mode.
numRequests += 1;
if (shouldSendGoAway) {
request.session.goaway(constants.NGHTTP2_PROTOCOL_ERROR);
}
});
mockH2Server3.on("connection", () => {
establishedConnections += 1;
});
const req = new HttpRequest({ ...getMockReqOptions(), port: port3 });
expect(establishedConnections).toBe(0);
expect(numRequests).toBe(0);
await rejects(
nodeH2Handler.handle(req, {}),
UNEXPECTEDLY_CLOSED_REGEX,
"should be rejected promptly due to goaway frame"
);
expect(establishedConnections).toBe(1);
expect(numRequests).toBe(1);
await rejects(
nodeH2Handler.handle(req, {}),
UNEXPECTEDLY_CLOSED_REGEX,
"should be rejected promptly due to goaway frame"
);
expect(establishedConnections).toBe(2);
expect(numRequests).toBe(2);
await rejects(
nodeH2Handler.handle(req, {}),
UNEXPECTEDLY_CLOSED_REGEX,
"should be rejected promptly due to goaway frame"
);
expect(establishedConnections).toBe(3);
expect(numRequests).toBe(3);

// should be able to recover from goaway after reconnecting to a server
// that doesn't send goaway, and reuse the TCP connection (Http2Session)
shouldSendGoAway = false;
mockH2Server3.on("request", createResponseFunction(mockResponse));
await nodeH2Handler.handle(req, {});
const result = await nodeH2Handler.handle(req, {});
const resultReader = result.response.body;

// ...and validate that the mocked response is received
const responseBody = await new Promise((resolve) => {
const buffers = [];
resultReader.on("data", (chunk) => buffers.push(chunk));
resultReader.on("end", () => {
resolve(Buffer.concat(buffers).toString("utf8"));
});
});
expect(responseBody).toBe("test");
expect(establishedConnections).toBe(4);
expect(numRequests).toBe(5);
mockH2Server3.close();
});

it("handles connections destroyed by servers", async () => {
const port3 = port + 2;
const mockH2Server3 = createMockHttp2Server().listen(port3);
let establishedConnections = 0;
let numRequests = 0;

mockH2Server3.on("stream", (request: Http2Stream) => {
// transmit goaway frame and then shut down the connection.
numRequests += 1;
request.session.destroy();
});
mockH2Server3.on("connection", () => {
establishedConnections += 1;
});
const req = new HttpRequest({ ...getMockReqOptions(), port: port3 });
expect(establishedConnections).toBe(0);
expect(numRequests).toBe(0);
await rejects(
nodeH2Handler.handle(req, {}),
UNEXPECTEDLY_CLOSED_REGEX,
"should be rejected promptly due to goaway frame or destroyed connection"
);
expect(establishedConnections).toBe(1);
expect(numRequests).toBe(1);
await rejects(
nodeH2Handler.handle(req, {}),
UNEXPECTEDLY_CLOSED_REGEX,
"should be rejected promptly due to goaway frame or destroyed connection"
);
expect(establishedConnections).toBe(2);
expect(numRequests).toBe(2);
await rejects(
nodeH2Handler.handle(req, {}),
UNEXPECTEDLY_CLOSED_REGEX,
"should be rejected promptly due to goaway frame or destroyed connection"
);
expect(establishedConnections).toBe(3);
expect(numRequests).toBe(3);
mockH2Server3.close();
});

it("closes and removes session on sessionTimeout", async (done) => {
const sessionTimeout = 500;
nodeH2Handler = new NodeHttp2Handler({ sessionTimeout });
Expand Down
60 changes: 53 additions & 7 deletions packages/node-http-handler/src/node-http2-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ export class NodeHttp2Handler implements HttpHandler {
}

handle(request: HttpRequest, { abortSignal }: HttpHandlerOptions = {}): Promise<{ response: HttpResponse }> {
return new Promise((resolve, reject) => {
return new Promise((resolve, rejectOriginal) => {
// It's redundant to track fulfilled because promises use the first resolution/rejection
// but avoids generating unnecessary stack traces in the "close" event handler.
let fulfilled = false;
const reject = (err: Error) => {
fulfilled = true;
rejectOriginal(err);
};
// if the request was already aborted, prevent doing extra work
if (abortSignal?.aborted) {
const abortError = new Error("Request aborted");
Expand All @@ -70,13 +77,10 @@ export class NodeHttp2Handler implements HttpHandler {
headers: getTransformedHeaders(headers),
body: req,
});
fulfilled = true;
resolve({ response: httpResponse });
});

req.on("error", reject);
req.on("frameError", reject);
req.on("aborted", reject);

const requestTimeout = this.requestTimeout;
if (requestTimeout) {
req.setTimeout(requestTimeout, () => {
Expand All @@ -96,6 +100,20 @@ export class NodeHttp2Handler implements HttpHandler {
};
}

// Set up handlers for errors
req.on("frameError", reject);
req.on("error", reject);
req.on("goaway", reject);
req.on("aborted", reject);

// The HTTP/2 error code used when closing the stream can be retrieved using the
// http2stream.rstCode property. If the code is any value other than NGHTTP2_NO_ERROR (0),
// an 'error' event will have also been emitted.
req.on("close", () => {
if (!fulfilled) {
reject(new Error("Unexpected error: http2 request did not get a response"));
}
});
writeRequestBody(req, request);
});
}
Expand All @@ -107,14 +125,42 @@ export class NodeHttp2Handler implements HttpHandler {

const newSession = connect(authority);
connectionPool.set(authority, newSession);
const destroySessionCb = () => {
this.destroySession(authority, newSession);
};
newSession.on("goaway", destroySessionCb);
newSession.on("error", destroySessionCb);
newSession.on("frameError", destroySessionCb);

const sessionTimeout = this.sessionTimeout;
if (sessionTimeout) {
newSession.setTimeout(sessionTimeout, () => {
newSession.close();
connectionPool.delete(authority);
if (connectionPool.get(authority) === newSession) {
newSession.close();
connectionPool.delete(authority);
}
});
}
return newSession;
}

/**
* Destroy a session immediately and remove it from the http2 pool.
*
* This check ensures that the session is only closed once
* and that an event on one session does not close a different session.
*/
private destroySession(authority: string, session: ClientHttp2Session): void {
if (this.connectionPool.get(authority) !== session) {
// Already closed?
return;
}
this.connectionPool.delete(authority);
session.removeAllListeners("goaway");
session.removeAllListeners("error");
session.removeAllListeners("frameError");
if (!session.destroyed) {
session.destroy();
}
}
}

0 comments on commit e97e357

Please sign in to comment.