Skip to content

Commit

Permalink
Fix flush() to retry fetch() on transient errors
Browse files Browse the repository at this point in the history
This changes flush() to retry the fetch() call on transient errors such
as EPIPE, ECONNREFUSED, and ECONNRESET.
  • Loading branch information
penberg committed Jan 21, 2024
1 parent 5db0765 commit d48eff1
Showing 1 changed file with 25 additions and 2 deletions.
27 changes: 25 additions & 2 deletions src/http/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import { CursorReqBody as json_CursorReqBody } from "./json_encode.js";
import { CursorReqBody as protobuf_CursorReqBody } from "./protobuf_encode.js";
import { PipelineRespBody as json_PipelineRespBody } from "./json_decode.js";
import { PipelineRespBody as protobuf_PipelineRespBody } from "./protobuf_decode.js";
import { FetchCursorReq } from "../ws/proto.js";
import { FetchError } from "node-fetch";

type QueueEntry = PipelineEntry | CursorEntry;

Expand Down Expand Up @@ -329,8 +331,7 @@ export class HttpStream extends Stream implements SqlOwner {
let promise;
try {
const request = createRequest();
const fetch = this.#fetch;
promise = fetch(request);
promise = this.#fetchWithRetry(request);
} catch (error) {
promise = Promise.reject(error);
}
Expand All @@ -356,6 +357,19 @@ export class HttpStream extends Stream implements SqlOwner {
});
}

#fetchWithRetry(request: Request, retryCount = 3): Promise<Response> {
try {
return this.#fetch(request);
} catch (error: any) {
if (isRetryableError(error)) {
if (retryCount > 0) {
return this.#fetchWithRetry(request, retryCount - 1);
}
}
throw error;
}
}

#createPipelineRequest(pipeline: Array<PipelineEntry>, endpoint: Endpoint): Request {
return this.#createRequest<proto.PipelineReqBody>(
new URL(endpoint.pipelinePath, this.#baseUrl),
Expand Down Expand Up @@ -417,6 +431,15 @@ export class HttpStream extends Stream implements SqlOwner {
}
}

function isRetryableError(error: any): boolean {
if (!error.errno) {
return false;
}
return error.errno === "EPIPE"
|| error.errno === "ECONNREFUSED"
|| error.errno === "ECONNRESET";
}

function handlePipelineResponse(pipeline: Array<PipelineEntry>, respBody: proto.PipelineRespBody): void {
if (respBody.results.length !== pipeline.length) {
throw new ProtoError("Server returned unexpected number of pipeline results");
Expand Down

0 comments on commit d48eff1

Please sign in to comment.