Skip to content

Commit

Permalink
fix(MemoryStorage): RequestQueue#handledRequestCount should update (#…
Browse files Browse the repository at this point in the history
…1817)

Closes #1764
  • Loading branch information
vladfrangu authored Mar 9, 2023
1 parent 4851cfd commit a775e4a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 6 deletions.
31 changes: 26 additions & 5 deletions packages/memory-storage/src/resource-clients/request-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,14 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue
await newEntry.update(requestModel);

existingQueueById.requests.set(requestModel.id, newEntry);
// We add 1 to pending requests if the request was not handled yet
existingQueueById.pendingRequestCount += requestModel.orderNo !== null ? 1 : 0;
existingQueueById.updateTimestamps(true);

if (requestModel.orderNo) {
existingQueueById.pendingRequestCount += 1;
} else {
existingQueueById.handledRequestCount += 1;
}

return {
requestId: requestModel.id,
// We return wasAlreadyHandled: false even though the request may
Expand Down Expand Up @@ -253,8 +257,13 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue
await newEntry.update(requestModel);

existingQueueById.requests.set(requestModel.id, newEntry);
// We add 1 to pending requests if the request was not handled yet
existingQueueById.pendingRequestCount += requestModel.orderNo !== null ? 1 : 0;

if (requestModel.orderNo) {
existingQueueById.pendingRequestCount += 1;
} else {
existingQueueById.handledRequestCount += 1;
}

result.processedRequests.push({
requestId: requestModel.id,
uniqueKey: requestModel.uniqueKey,
Expand Down Expand Up @@ -323,10 +332,16 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue

const isRequestHandledStateChanging = typeof existingRequest.orderNo !== typeof requestModel.orderNo;
const requestWasHandledBeforeUpdate = existingRequest.orderNo === null;
const requestIsHandledAfterUpdate = requestModel.orderNo === null;

if (isRequestHandledStateChanging) {
existingQueueById.pendingRequestCount += requestWasHandledBeforeUpdate ? 1 : -1;
}

if (requestIsHandledAfterUpdate) {
existingQueueById.handledRequestCount += 1;
}

existingQueueById.updateTimestamps(true);

return {
Expand All @@ -349,8 +364,14 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue
const request = await entry.get();

existingQueueById.requests.delete(id);
existingQueueById.pendingRequestCount -= request.orderNo ? 1 : 0;
existingQueueById.updateTimestamps(true);

if (request.orderNo) {
existingQueueById.pendingRequestCount -= 1;
} else {
existingQueueById.handledRequestCount -= 1;
}

await entry.delete();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { MemoryStorage } from '@crawlee/memory-storage';
import type { RequestQueueClient } from '@crawlee/types';

describe('RequestQueue handledRequestCount should update', () => {
const storage = new MemoryStorage({
persistStorage: false,
});

let requestQueue: RequestQueueClient;

beforeAll(async () => {
const { id } = await storage.requestQueues().getOrCreate('handledRequestCount');
requestQueue = storage.requestQueue(id);
});

test('after updating the request, it should increment the handledRequestCount', async () => {
const { requestId } = await requestQueue.addRequest({ url: 'http://example.com/1', uniqueKey: '1' });

await requestQueue.updateRequest({
url: 'http://example.com/1',
uniqueKey: '1',
id: requestId,
handledAt: new Date().toISOString(),
});

const updatedStatistics = await requestQueue.get();
expect(updatedStatistics?.handledRequestCount).toEqual(1);
});

test('adding an already handled request should increment the handledRequestCount', async () => {
await requestQueue.addRequest({ url: 'http://example.com/2', uniqueKey: '2', handledAt: new Date().toISOString() });

const updatedStatistics = await requestQueue.get();
expect(updatedStatistics?.handledRequestCount).toEqual(2);
});

test('deleting a request should decrement the handledRequestCount', async () => {
const { requestId } = await requestQueue.addRequest({ url: 'http://example.com/3', uniqueKey: '3', handledAt: new Date().toISOString() });

await requestQueue.deleteRequest(requestId);

const updatedStatistics = await requestQueue.get();
expect(updatedStatistics?.handledRequestCount).toEqual(2);
});
});
2 changes: 1 addition & 1 deletion packages/types/src/storages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ export interface RequestQueueClient {
batchAddRequests(requests: RequestSchema[], options?: RequestOptions): Promise<BatchAddRequestsResult>;
getRequest(id: string): Promise<RequestOptions | undefined>;
updateRequest(request: UpdateRequestSchema, options?: RequestOptions): Promise<QueueOperationInfo>;
deleteRequest(_id: string): Promise<unknown>;
deleteRequest(id: string): Promise<unknown>;
}

export interface RequestQueueOptions {
Expand Down

0 comments on commit a775e4a

Please sign in to comment.