Skip to content

Commit

Permalink
add unit tests for pooled fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
sashankaryal committed Nov 26, 2024
1 parent 7d5d7c6 commit abc6a13
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 11 deletions.
18 changes: 13 additions & 5 deletions app/packages/looker/src/worker/decorated-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ const DEFAULT_BASE_DELAY = 200;
// list of HTTP status codes that are client errors (4xx) and should not be retried
const NON_RETRYABLE_STATUS_CODES = [400, 401, 403, 404, 405, 422];

export interface RetryOptions {
retries: number;
delay: number;
}
class NonRetryableError extends Error {
constructor(message: string) {
super(message);
Expand All @@ -13,10 +17,12 @@ class NonRetryableError extends Error {
export const fetchWithLinearBackoff = async (
url: string,
opts: RequestInit = {},
retries = DEFAULT_MAX_RETRIES,
delay = DEFAULT_BASE_DELAY
retry: RetryOptions = {
retries: DEFAULT_MAX_RETRIES,
delay: DEFAULT_BASE_DELAY,
}
) => {
for (let i = 0; i < retries; i++) {
for (let i = 0; i < retry.retries; i++) {
try {
const response = await fetch(url, opts);
if (response.ok) {
Expand All @@ -36,8 +42,10 @@ export const fetchWithLinearBackoff = async (
// immediately throw
throw e;
}
if (i < retries - 1) {
await new Promise((resolve) => setTimeout(resolve, delay * (i + 1)));
if (i < retry.retries - 1) {
await new Promise((resolve) =>
setTimeout(resolve, retry.delay * (i + 1))
);
} else {
// max retries reached
throw new Error(
Expand Down
110 changes: 110 additions & 0 deletions app/packages/looker/src/worker/pooled-fetch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { beforeEach, describe, expect, it, Mock, vi } from "vitest";
import { enqueueFetch } from "./pooled-fetch";

const MAX_CONCURRENT_REQUESTS = 100;

// helper function to create a deferred promise
function createDeferredPromise<T>() {
let resolve: (value: T | PromiseLike<T>) => void;
let reject: (reason?: any) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve: resolve!, reject: reject! };
}

describe("enqueueFetch", () => {
let mockedFetch: Mock;

beforeEach(() => {
vi.resetAllMocks();
mockedFetch = vi.fn();
global.fetch = mockedFetch;
});

it("should return response when fetch succeeds", async () => {
const mockResponse = new Response("OK", { status: 200 });
mockedFetch.mockResolvedValue(mockResponse);

const response = await enqueueFetch({ url: "https://fiftyone.ai" });
expect(response).toBe(mockResponse);
});

it("should process multiple requests in order", async () => {
const mockResponse1 = new Response("First", { status: 200 });
const mockResponse2 = new Response("Second", { status: 200 });

const deferred1 = createDeferredPromise<Response>();
const deferred2 = createDeferredPromise<Response>();

mockedFetch
.mockImplementationOnce(() => deferred1.promise)
.mockImplementationOnce(() => deferred2.promise);

const promise1 = enqueueFetch({ url: "https://fiftyone.ai/1" });
const promise2 = enqueueFetch({ url: "https://fiftyone.ai/2" });

deferred1.resolve(mockResponse1);

const response1 = await promise1;
expect(response1).toBe(mockResponse1);

deferred2.resolve(mockResponse2);

const response2 = await promise2;
expect(response2).toBe(mockResponse2);
});

it("should not exceed MAX_CONCURRENT_REQUESTS", async () => {
const numRequests = MAX_CONCURRENT_REQUESTS + 50;
const deferredPromises = [];

for (let i = 0; i < numRequests; i++) {
const deferred = createDeferredPromise<Response>();
deferredPromises.push(deferred);
mockedFetch.mockImplementationOnce(() => deferred.promise);
enqueueFetch({ url: `https://fiftyone.ai/${i}` });
}

// at this point, fetch should have been called MAX_CONCURRENT_REQUESTS times
expect(mockedFetch).toHaveBeenCalledTimes(MAX_CONCURRENT_REQUESTS);

// resolve all deferred promises
deferredPromises.forEach((deferred, index) => {
deferred.resolve(new Response(`Response ${index}`, { status: 200 }));
});

// wait for all promises to resolve
await Promise.all(deferredPromises.map((dp) => dp.promise));

// all requests should have been processed
expect(mockedFetch).toHaveBeenCalledTimes(numRequests);
});

it("should reject immediately on non-retryable error", async () => {
const mockResponse = new Response("Not Found", { status: 404 });
mockedFetch.mockResolvedValue(mockResponse);

await expect(enqueueFetch({ url: "https://fiftyone.ai" })).rejects.toThrow(
"Non-retryable HTTP error: 404"
);
});

it("should retry on retryable errors up to MAX_RETRIES times", async () => {
const MAX_RETRIES = 3;
mockedFetch.mockRejectedValue(new Error("Network Error"));

await expect(
enqueueFetch({
url: "https://fiftyone.ai",
retryOptions: {
retries: MAX_RETRIES,
delay: 50,
},
})
).rejects.toThrow("Max retries for fetch reached");

expect(mockedFetch).toHaveBeenCalledTimes(MAX_RETRIES);
});
});
12 changes: 6 additions & 6 deletions app/packages/looker/src/worker/pooled-fetch.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { fetchWithLinearBackoff } from "./decorated-fetch";
import { fetchWithLinearBackoff, RetryOptions } from "./decorated-fetch";

interface QueueItem {
request: {
url: string;
options?: RequestInit;
retryOptions?: RetryOptions;
};
resolve: (value: Response | PromiseLike<Response>) => void;
reject: (reason?: any) => void;
Expand All @@ -15,10 +16,9 @@ const MAX_CONCURRENT_REQUESTS = 100;
let activeRequests = 0;
const requestQueue: QueueItem[] = [];

export const enqueueFetch = (request: {
url: string;
options?: RequestInit;
}): Promise<Response> => {
export const enqueueFetch = (
request: QueueItem["request"]
): Promise<Response> => {
return new Promise((resolve, reject) => {
requestQueue.push({ request, resolve, reject });
processFetchQueue();
Expand All @@ -33,7 +33,7 @@ const processFetchQueue = () => {
const { request, resolve, reject } = requestQueue.shift();
activeRequests++;

fetchWithLinearBackoff(request.url, request.options)
fetchWithLinearBackoff(request.url, request.options, request.retryOptions)
.then((response) => {
activeRequests--;
resolve(response);
Expand Down

0 comments on commit abc6a13

Please sign in to comment.