Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: forefront request fetching in RQv2 #2689

Merged
merged 8 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/core/src/storages/request_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ export abstract class RequestProvider implements IStorage {
assumedTotalCount = 0;
assumedHandledCount = 0;

/**
* Counts enqueued `forefront` requests. This is used to invalidate the local head cache.
* We can trust these numbers only if the queue is used by a single client.
*
* Note that this number does not have to be persisted, as the local head cache is dropped
* on a migration event.
*/
protected assumedForefrontCount = 0;

private initialCount = 0;

protected queueHeadIds = new ListDictionary<string>();
Expand Down Expand Up @@ -163,6 +172,8 @@ export abstract class RequestProvider implements IStorage {
if (!wasAlreadyPresent && !this.recentlyHandledRequestsCache.get(requestId)) {
this.assumedTotalCount++;

this.assumedForefrontCount += forefront ? 1 : 0;

// Performance optimization: add request straight to head if possible
this._maybeAddRequestToQueueHead(requestId, forefront);
}
Expand Down Expand Up @@ -280,6 +291,8 @@ export abstract class RequestProvider implements IStorage {
if (!wasAlreadyPresent && !this.recentlyHandledRequestsCache.get(requestId)) {
this.assumedTotalCount++;

this.assumedForefrontCount += forefront ? 1 : 0;

// Performance optimization: add request straight to head if possible
this._maybeAddRequestToQueueHead(requestId, forefront);
}
Expand Down Expand Up @@ -529,6 +542,7 @@ export abstract class RequestProvider implements IStorage {
forefront,
})) as RequestQueueOperationInfo;
queueOperationInfo.uniqueKey = request.uniqueKey;
this.assumedForefrontCount += forefront ? 1 : 0;
this._cacheRequest(getRequestId(request.uniqueKey), queueOperationInfo);

return queueOperationInfo;
Expand Down Expand Up @@ -874,6 +888,11 @@ export interface RequestQueueOperationOptions {
* - while reclaiming the request: the request will be placed to the beginning of the queue, so that it's returned
* in the next call to {@apilink RequestQueue.fetchNextRequest}.
* By default, it's put to the end of the queue.
*
* In case the request is already present in the queue, this option has no effect.
*
* If multiple requests are added with this option at once, their relative order in subsequent
* `fetchNextRequest` calls is arbitrary.
* @default false
*/
forefront?: boolean;
Expand Down
41 changes: 36 additions & 5 deletions packages/core/src/storages/request_queue_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ const RECENTLY_HANDLED_CACHE_SIZE = 1000;
export class RequestQueue extends RequestProvider {
private _listHeadAndLockPromise: Promise<void> | null = null;

/**
* Returns `true` if there are any requests in the queue that were enqueued to the forefront.
*
* Can return false negatives, but never false positives.
* @returns `true` if there are `forefront` requests in the queue.
*/
private async hasPendingForefrontRequests(): Promise<boolean> {
const queueInfo = await this.client.get();
return this.assumedForefrontCount > 0 && !queueInfo?.hadMultipleClients;
}

constructor(options: RequestProviderOptions, config = Configuration.getGlobalConfig()) {
super(
{
Expand Down Expand Up @@ -181,7 +192,7 @@ export class RequestQueue extends RequestProvider {
}

// We want to fetch ahead of time to minimize dead time
if (this.queueHeadIds.length() > 1) {
if (this.queueHeadIds.length() > 1 && !(await this.hasPendingForefrontRequests())) {
return;
}

Expand All @@ -193,15 +204,30 @@ export class RequestQueue extends RequestProvider {
}

private async _listHeadAndLock(): Promise<void> {
const headData = await this.client.listAndLockHead({ limit: 25, lockSecs: this.requestLockSecs });
const forefront = await this.hasPendingForefrontRequests();

const headData = await this.client.listAndLockHead({
limit: Math.min(forefront ? this.assumedForefrontCount : 25, 25),
lockSecs: this.requestLockSecs,
});

const headIdBuffer = [];

for (const { id, uniqueKey } of headData.items) {
// Queue head index might be behind the main table, so ensure we don't recycle requests
if (!id || !uniqueKey || this.recentlyHandledRequestsCache.get(id)) {
this.log.debug(`Skipping request from queue head as it's invalid or recently handled`, {
if (
!id ||
!uniqueKey ||
this.recentlyHandledRequestsCache.get(id) ||
// If we tried to read new forefront requests, but another client appeared in the meantime, we can't be sure we'll only read our requests.
// To retain the correct queue ordering, we rollback this head read.
(forefront && headData.hadMultipleClients)
barjin marked this conversation as resolved.
Show resolved Hide resolved
) {
this.log.debug(`Skipping request from queue head as it's potentially invalid or recently handled`, {
id,
uniqueKey,
recentlyHandled: !!this.recentlyHandledRequestsCache.get(id),
inconsistentForefrontRead: forefront && headData.hadMultipleClients,
});

// Remove the lock from the request for now, so that it can be picked up later
Expand All @@ -215,14 +241,19 @@ export class RequestQueue extends RequestProvider {
continue;
}

this.queueHeadIds.add(id, id, false);
headIdBuffer.push(id);
this.assumedForefrontCount = Math.max(0, this.assumedForefrontCount - 1);
this._cacheRequest(getRequestId(uniqueKey), {
requestId: id,
uniqueKey,
wasAlreadyPresent: true,
wasAlreadyHandled: false,
});
}

for (const id of forefront ? headIdBuffer.reverse() : headIdBuffer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this reverse matters at all? They're all still gonna be added with forefront so it feels redundant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately it does, as we want to prepend headIdBuffer (as it is) to queueHeadIds - if we push the requests one by one in to the head of queueHeadIds, it would end up reversed:

buffer: 12345
queue: 67890

buffer: 2345
queue: 167890

buffer: 345
queue: 2167890

etc.

I'm not a huge fan of this ternary mumbo-jumbo either, but headIdBuffer is at most 25 items long, so at least this should cause no noticeable perf problem. Readability... that's a different story 🥲

this.queueHeadIds.add(id, id, forefront);
}
}

private async getOrHydrateRequest<T extends Dictionary = Dictionary>(
Expand Down
101 changes: 99 additions & 2 deletions test/core/storages/request_queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
API_PROCESSED_REQUESTS_DELAY_MILLIS,
STORAGE_CONSISTENCY_DELAY_MILLIS,
RequestQueueV1 as RequestQueue,
RequestQueueV2,
Request,
Configuration,
ProxyConfiguration,
Expand Down Expand Up @@ -622,6 +623,42 @@ describe('RequestQueue remote', () => {
expect(listHeadMock).toHaveBeenLastCalledWith({ limit: QUERY_HEAD_MIN_LENGTH });
});

test('`fetchNextRequest` order respects `forefront` enqueues', async () => {
// Verifies that requests added with `forefront: true` are served LIFO,
// ahead of normally enqueued requests, on the v1 RequestQueue.
const emulator = new MemoryStorageEmulator();

await emulator.init();
const queue = await RequestQueue.open();

// Collects the URLs in the order fetchNextRequest serves them.
const retrievedUrls: string[] = [];

// /1 is consumed first; /5 and /6 stay at the tail of the queue.
await queue.addRequests([
{ url: 'http://example.com/1' },
{ url: 'http://example.com/5' },
{ url: 'http://example.com/6' },
]);

retrievedUrls.push((await queue.fetchNextRequest()).url);

// Forefront adds are LIFO: the most recently added (/2) must come out first.
await queue.addRequest({ url: 'http://example.com/4' }, { forefront: true });
await queue.addRequest({ url: 'http://example.com/3' }, { forefront: true });

await queue.addRequest({ url: 'http://example.com/2' }, { forefront: true });

let req = await queue.fetchNextRequest();

expect(req.url).toBe('http://example.com/2');

// Reclaiming to the forefront should keep /2 at the head of the queue.
await queue.reclaimRequest(req, { forefront: true });

// NOTE(review): `req` still holds /2 when the loop starts, and the reclaim
// re-adds /2 to the head — confirm /2 cannot be recorded twice here (the
// expected array below contains it only once). Presumably the in-progress
// bookkeeping of the v1 queue prevents the duplicate; verify.
while (req) {
retrievedUrls.push(req.url);
req = await queue.fetchNextRequest();
}

// Full expected ordering: the initially fetched /1, then the forefront
// requests in LIFO order (/2, /3, /4), then the remaining tail (/5, /6).
expect(retrievedUrls.map((x) => new URL(x).pathname)).toEqual(['/1', '/2', '/3', '/4', '/5', '/6']);
await emulator.destroy();
});

test('getInfo() should work', async () => {
const queue = new RequestQueue({ id: 'some-id', name: 'some-name', client: storageClient });

Expand Down Expand Up @@ -826,9 +863,9 @@ describe('RequestQueue v2', () => {
}

async function getEmptyQueue(name: string) {
const queue = await RequestQueue.open(name);
const queue = await RequestQueueV2.open(name);
await queue.drop();
return RequestQueue.open(name);
return RequestQueueV2.open(name);
}

function getUniqueRequests(count: number) {
Expand Down Expand Up @@ -903,4 +940,64 @@ describe('RequestQueue v2', () => {

expect(secondFetch[0]).toEqual(firstFetch[0]);
});

test('`fetchNextRequest` order respects `forefront` enqueues', async () => {
    // Fresh queue so ordering is not affected by leftover requests.
    const queue = await getEmptyQueue('fetch-next-request-order');

    const seenUrls: string[] = [];

    // Enqueue /1 plus /4../28 normally — 26 requests in total.
    await queue.addRequests([
        { url: 'http://example.com/1' },
        ...Array.from({ length: 25 }, (_, i) => ({ url: `http://example.com/${i + 4}` })),
    ]);

    // Consume /1 so the local queue head is already populated before the forefront adds.
    seenUrls.push((await queue.fetchNextRequest()).url);

    // Forefront adds are LIFO: /2 (added last) must be served before /3.
    await queue.addRequest({ url: 'http://example.com/3' }, { forefront: true });
    await queue.addRequest({ url: 'http://example.com/2' }, { forefront: true });

    // Drain the queue, recording the order in which requests are served.
    for (let next = await queue.fetchNextRequest(); next; next = await queue.fetchNextRequest()) {
        seenUrls.push(next.url);
    }

    // 28 requests exceed the RQv2 batch size limit of 25, so we can examine the request ordering
    expect(seenUrls.map((x) => new URL(x).pathname)).toEqual(
        Array.from({ length: 28 }, (_, i) => `/${i + 1}`),
    );
});

test('`reclaimRequest` with `forefront` respects the request ordering', async () => {
// Verifies that a request reclaimed with `forefront: true` is served again
// before the rest of the queue, preserving overall ordering on RQv2.
const queue = await getEmptyQueue('fetch-next-request-order-reclaim');

// Collects the URLs in the order fetchNextRequest serves them.
const retrievedUrls: string[] = [];

await queue.addRequests([
{ url: 'http://example.com/1' },
{ url: 'http://example.com/4' },
{ url: 'http://example.com/5' },
]);

// Consume /1 first.
retrievedUrls.push((await queue.fetchNextRequest()).url);

// Forefront adds are LIFO: /2 (added last) must come out before /3.
await queue.addRequest({ url: 'http://example.com/3' }, { forefront: true });
await queue.addRequest({ url: 'http://example.com/2' }, { forefront: true });

let req = await queue.fetchNextRequest();

expect(req.url).toBe('http://example.com/2');

// Put /2 back at the head of the queue; the next fetch must return it again.
await queue.reclaimRequest(req, { forefront: true });

// Re-fetch before the drain loop so /2 is recorded exactly once.
req = await queue.fetchNextRequest();

while (req) {
retrievedUrls.push(req.url);
req = await queue.fetchNextRequest();
}

// Expected: /1, then the reclaimed /2, then /3 (forefront), then the tail /4, /5.
expect(retrievedUrls.map((x) => new URL(x).pathname)).toEqual(Array.from({ length: 5 }, (_, i) => `/${i + 1}`));
});
});