diff --git a/packages/basic-crawler/src/internals/basic-crawler.ts b/packages/basic-crawler/src/internals/basic-crawler.ts index 24df8d0d3bb0..f1a01eccfa39 100644 --- a/packages/basic-crawler/src/internals/basic-crawler.ts +++ b/packages/basic-crawler/src/internals/basic-crawler.ts @@ -15,6 +15,7 @@ import type { Request, RequestList, RequestOptions, + RequestProvider, RouterHandler, RouterRoutes, Session, @@ -23,24 +24,25 @@ import type { StatisticState, } from '@crawlee/core'; import { - mergeCookies, + Dataset, AutoscaledPool, Configuration, - enqueueLinks, EventType, KeyValueStore, - CriticalError, NonRetryableError, RequestQueue, + RequestQueueV2, RequestState, + RetryRequestError, Router, SessionPool, Statistics, + enqueueLinks, + mergeCookies, purgeDefaultStorages, validators, - RetryRequestError, SessionError, - Dataset, + CriticalError, } from '@crawlee/core'; import type { Dictionary, Awaitable, BatchAddRequestsResult, SetStatusMessageOptions } from '@crawlee/types'; import { ROTATE_PROXY_ERRORS } from '@crawlee/utils'; @@ -165,7 +167,7 @@ export interface BasicCrawlerOptions Alternatively, `requests` parameter of {@apilink BasicCrawler.run|`crawler.run()`} could be used to enqueue the initial requests - * it is a shortcut for running `crawler.addRequests()` before the `crawler.run()`. */ - requestQueue?: RequestQueue; + requestQueue?: RequestProvider; /** * Timeout in which the function passed as {@apilink BasicCrawlerOptions.requestHandler|`requestHandler`} needs to finish, in seconds. @@ -325,6 +327,26 @@ export interface BasicCrawlerOptions> = {}; + protected static optionsShape = { requestList: ow.optional.object.validate(validators.requestList), requestQueue: ow.optional.object.validate(validators.requestQueue), @@ -493,6 +518,7 @@ export class BasicCrawler (val == null ? null : +val); // allow at least 5min for internal timeouts this.internalTimeoutMillis = tryEnv(process.env.CRAWLEE_INTERNAL_TIMEOUT) ?? Math.max(this.requestHandlerTimeoutMillis * 2, 300e3); + // override the default internal timeout of request queue to respect `requestHandlerTimeoutMillis` if (this.requestQueue) { this.requestQueue.internalTimeoutMillis = this.internalTimeoutMillis; + // for request queue v2, we want to lock requests by the timeout that would also account for internals (plus 5 seconds padding), but + // with a minimum of a minute + this.requestQueue.requestLockSecs = Math.max(this.internalTimeoutMillis / 1000 + 5, 60); } this.maxRequestRetries = maxRequestRetries; @@ -769,7 +809,7 @@ export class BasicCrawler { this.log.debug(`Adding request ${request.url} (${request.id}) back to the queue`); - source?.inProgress.add(request.id!); - await source?.reclaimRequest(request); + // eslint-disable-next-line dot-notation + source['inProgress'].add(request.id!); + await source.reclaimRequest(request); }, delay); return true; @@ -1230,7 +1272,7 @@ export class BasicCrawler { const { request } = crawlingContext; request.pushErrorMessage(error); @@ -1444,6 +1486,22 @@ export class BasicCrawler { interface EnqueueLinksInternalOptions { options?: EnqueueLinksOptions; $: cheerio.CheerioAPI | null; - requestQueue: RequestQueue; + requestQueue: RequestProvider; originalRequestUrl: string; finalRequestUrl?: string; } diff --git a/packages/core/src/enqueue_links/enqueue_links.ts b/packages/core/src/enqueue_links/enqueue_links.ts index b69dbff2ea46..a8b4b9669b1a 100644 --- a/packages/core/src/enqueue_links/enqueue_links.ts +++ b/packages/core/src/enqueue_links/enqueue_links.ts @@ -14,7 +14,7 @@ import { createRequests, } from './shared'; import type { RequestOptions } from '../request'; -import type { RequestQueue, RequestQueueOperationOptions } from '../storages/request_queue'; +import type { RequestProvider, RequestQueueOperationOptions } from '../storages'; export interface EnqueueLinksOptions extends RequestQueueOperationOptions { /** Limit the amount of actually enqueued URLs to this number. Useful for testing across the entire crawling scope. */ @@ -24,7 +24,7 @@ export interface EnqueueLinksOptions extends RequestQueueOperationOptions { urls?: string[]; /** A request queue to which the URLs will be enqueued. */ - requestQueue?: RequestQueue; + requestQueue?: RequestProvider; /** A CSS selector matching links to be enqueued. */ selector?: string; diff --git a/packages/core/src/request.ts b/packages/core/src/request.ts index 60a3b922bf5e..0a5a0cd56004 100644 --- a/packages/core/src/request.ts +++ b/packages/core/src/request.ts @@ -5,7 +5,7 @@ import util from 'node:util'; import { normalizeUrl } from '@apify/utilities'; import type { Dictionary } from '@crawlee/types'; import type { BasePredicate } from 'ow'; -import ow, { ArgumentError } from 'ow'; +import ow from 'ow'; import { log as defaultLog } from './log'; import type { AllowedHttpMethods } from './typedefs'; @@ -141,14 +141,15 @@ export class Request { // properties and speeds up the validation cca 3-fold. // See https://github.com/sindresorhus/ow/issues/193 keys(options).forEach((prop) => { + // skip url, because it is validated above + if (prop === 'url') { + return; + } + const predicate = requestOptionalPredicates[prop as keyof typeof requestOptionalPredicates]; const value = options[prop]; if (predicate) { ow(value, `RequestOptions.${prop}`, predicate as BasePredicate); - // 'url' is checked above because it's not optional - } else if (prop !== 'url') { - const msg = `Did not expect property \`${prop}\` to exist, got \`${value}\` in object \`RequestOptions\``; - throw new ArgumentError(msg, this.constructor); } }); @@ -479,6 +480,9 @@ export interface RequestOptions { /** @internal */ handledAt?: string; + /** @internal */ + lockExpiresAt?: Date; + } export interface PushErrorMessageOptions { diff --git a/packages/core/src/storages/index.ts b/packages/core/src/storages/index.ts index ee78d7c73737..c51ef0b1e167 100644 --- a/packages/core/src/storages/index.ts +++ b/packages/core/src/storages/index.ts @@ -1,6 +1,8 @@ export * from './dataset'; export * from './key_value_store'; export * from './request_list'; +export * from './request_provider'; export * from './request_queue'; +export * from './request_queue_v2'; export * from './storage_manager'; export * from './utils'; diff --git a/packages/core/src/storages/request_provider.ts b/packages/core/src/storages/request_provider.ts new file mode 100644 index 000000000000..50ad2f6f8c58 --- /dev/null +++ b/packages/core/src/storages/request_provider.ts @@ -0,0 +1,726 @@ +import { ListDictionary, LruCache } from '@apify/datastructures'; +import type { Log } from '@apify/log'; +import { cryptoRandomObjectId } from '@apify/utilities'; +import type { + BatchAddRequestsResult, + Dictionary, + ProcessedRequest, + QueueOperationInfo, + RequestQueueClient, + RequestQueueInfo, + StorageClient, +} from '@crawlee/types'; +import { chunk, downloadListOfUrls, sleep } from '@crawlee/utils'; +import ow from 'ow'; + +import type { IStorage, StorageManagerOptions } from './storage_manager'; +import { StorageManager } from './storage_manager'; +import { QUERY_HEAD_MIN_LENGTH, STORAGE_CONSISTENCY_DELAY_MILLIS, getRequestId, purgeDefaultStorages } from './utils'; +import { Configuration } from '../configuration'; +import { EventType } from '../events'; +import { log } from '../log'; +import type { ProxyConfiguration } from '../proxy_configuration'; +import { Request } from '../request'; +import type { RequestOptions, InternalSource, Source } from '../request'; +import type { Constructor } from '../typedefs'; + +export abstract class RequestProvider implements IStorage { + id: string; + name?: string; + timeoutSecs = 30; + clientKey = cryptoRandomObjectId(); + client: RequestQueueClient; + protected proxyConfiguration?: ProxyConfiguration; + + log: Log; + internalTimeoutMillis = 5 * 60_000; // defaults to 5 minutes, will be overridden by BasicCrawler + requestLockSecs = 3 * 60; // defaults to 3 minutes, will be overridden by BasicCrawler + + // We can trust these numbers only in a case that queue is used by a single client. + // This information is returned by getHead() under the hadMultipleClients property. + assumedTotalCount = 0; + assumedHandledCount = 0; + + protected queueHeadIds = new ListDictionary(); + protected requestCache: LruCache; + /** @internal */ + inProgress = new Set(); + protected recentlyHandledRequestsCache: LruCache; + + protected queuePausedForMigration = false; + + constructor(options: InternalRequestProviderOptions, readonly config = Configuration.getGlobalConfig()) { + this.id = options.id; + this.name = options.name; + this.client = options.client.requestQueue(this.id, { + clientKey: this.clientKey, + timeoutSecs: this.timeoutSecs, + }); + + this.proxyConfiguration = options.proxyConfiguration; + + this.requestCache = new LruCache({ maxLength: options.requestCacheMaxSize }); + this.recentlyHandledRequestsCache = new LruCache({ maxLength: options.recentlyHandledRequestsMaxSize }); + this.log = log.child({ prefix: options.logPrefix }); + + const eventManager = config.getEventManager(); + + eventManager.on(EventType.MIGRATING, async () => { + this.queuePausedForMigration = true; + }); + } + + /** + * @ignore + */ + inProgressCount() { + return this.inProgress.size; + } + + /** + * Adds a request to the queue. + * + * If a request with the same `uniqueKey` property is already present in the queue, + * it will not be updated. You can find out whether this happened from the resulting + * {@apilink QueueOperationInfo} object. + * + * To add multiple requests to the queue by extracting links from a webpage, + * see the {@apilink enqueueLinks} helper function. + * + * @param requestLike {@apilink Request} object or vanilla object with request data. + * Note that the function sets the `uniqueKey` and `id` fields to the passed Request. + * @param [options] Request queue operation options. + */ + async addRequest(requestLike: Source, options: RequestQueueOperationOptions = {}): Promise { + ow(requestLike, ow.object); + ow(options, ow.object.exactShape({ + forefront: ow.optional.boolean, + })); + + const { forefront = false } = options; + + if ('requestsFromUrl' in requestLike) { + const requests = await this._fetchRequestsFromUrl(requestLike as InternalSource); + const processedRequests = await this._addFetchedRequests(requestLike as InternalSource, requests, options); + + return processedRequests[0]; + } + + ow(requestLike, ow.object.partialShape({ + url: ow.string, + id: ow.undefined, + })); + + const request = requestLike instanceof Request + ? requestLike + : new Request(requestLike); + + const cacheKey = getRequestId(request.uniqueKey); + const cachedInfo = this.requestCache.get(cacheKey); + + if (cachedInfo) { + request.id = cachedInfo.id; + return { + wasAlreadyPresent: true, + // We may assume that if request is in local cache then also the information if the + // request was already handled is there because just one client should be using one queue. + wasAlreadyHandled: cachedInfo.isHandled, + requestId: cachedInfo.id, + uniqueKey: cachedInfo.uniqueKey, + }; + } + + const queueOperationInfo = await this.client.addRequest(request, { forefront }) as RequestQueueOperationInfo; + queueOperationInfo.uniqueKey = request.uniqueKey; + + const { requestId, wasAlreadyPresent } = queueOperationInfo; + this._cacheRequest(cacheKey, queueOperationInfo); + + if (!wasAlreadyPresent && !this.inProgress.has(requestId) && !this.recentlyHandledRequestsCache.get(requestId)) { + this.assumedTotalCount++; + + // Performance optimization: add request straight to head if possible + this._maybeAddRequestToQueueHead(requestId, forefront); + } + + return queueOperationInfo; + } + + /** + * Adds requests to the queue in batches of 25. + * + * If a request that is passed in is already present due to its `uniqueKey` property being the same, + * it will not be updated. You can find out whether this happened by finding the request in the resulting + * {@apilink BatchAddRequestsResult} object. + * + * @param requestsLike {@apilink Request} objects or vanilla objects with request data. + * Note that the function sets the `uniqueKey` and `id` fields to the passed requests if missing. + * @param [options] Request queue operation options. + */ + async addRequests( + requestsLike: Source[], + options: RequestQueueOperationOptions = {}, + ): Promise { + ow(requestsLike, ow.array); + ow(options, ow.object.exactShape({ + forefront: ow.optional.boolean, + })); + + const { forefront = false } = options; + + const uniqueKeyToCacheKey = new Map(); + const getCachedRequestId = (uniqueKey: string) => { + const cached = uniqueKeyToCacheKey.get(uniqueKey); + + if (cached) return cached; + + const newCacheKey = getRequestId(uniqueKey); + uniqueKeyToCacheKey.set(uniqueKey, newCacheKey); + + return newCacheKey; + }; + + const results: BatchAddRequestsResult = { + processedRequests: [], + unprocessedRequests: [], + }; + + for (const requestLike of requestsLike) { + if ('requestsFromUrl' in requestLike) { + const requests = await this._fetchRequestsFromUrl(requestLike as InternalSource); + await this._addFetchedRequests(requestLike as InternalSource, requests, options); + } + } + + const requests = requestsLike + .filter((requestLike) => !('requestsFromUrl' in requestLike)) + .map((requestLike) => { + return requestLike instanceof Request ? requestLike : new Request(requestLike as RequestOptions); + }); + + const requestsToAdd = new Map(); + + for (const request of requests) { + const cacheKey = getCachedRequestId(request.uniqueKey); + const cachedInfo = this.requestCache.get(cacheKey); + + if (cachedInfo) { + request.id = cachedInfo.id; + results.processedRequests.push({ + wasAlreadyPresent: true, + // We may assume that if request is in local cache then also the information if the + // request was already handled is there because just one client should be using one queue. + wasAlreadyHandled: cachedInfo.isHandled, + requestId: cachedInfo.id, + uniqueKey: cachedInfo.uniqueKey, + }); + } else if (!requestsToAdd.has(request.uniqueKey)) { + requestsToAdd.set(request.uniqueKey, request); + } + } + + // Early exit if all provided requests were already added + if (!requestsToAdd.size) { + return results; + } + + const apiResults = await this.client.batchAddRequests([...requestsToAdd.values()], { forefront }); + + // Report unprocessed requests + results.unprocessedRequests = apiResults.unprocessedRequests; + + // Add all new requests to the queue head + for (const newRequest of apiResults.processedRequests) { + // Add the new request to the processed list + results.processedRequests.push(newRequest); + + const cacheKey = getCachedRequestId(newRequest.uniqueKey); + + const { requestId, wasAlreadyPresent } = newRequest; + this._cacheRequest(cacheKey, newRequest); + + if (!wasAlreadyPresent && !this.inProgress.has(requestId) && !this.recentlyHandledRequestsCache.get(requestId)) { + this.assumedTotalCount++; + + // Performance optimization: add request straight to head if possible + this._maybeAddRequestToQueueHead(requestId, forefront); + } + } + + return results; + } + + /** + * Adds requests to the queue in batches. By default, it will resolve after the initial batch is added, and continue + * adding the rest in background. You can configure the batch size via `batchSize` option and the sleep time in between + * the batches via `waitBetweenBatchesMillis`. If you want to wait for all batches to be added to the queue, you can use + * the `waitForAllRequestsToBeAdded` promise you get in the response object. + * + * @param requests The requests to add + * @param options Options for the request queue + */ + async addRequestsBatched(requests: (string | Source)[], options: AddRequestsBatchedOptions = {}): Promise { + ow(requests, ow.array.ofType(ow.any( + ow.string, + ow.object.partialShape({ url: ow.string, id: ow.undefined }), + ow.object.partialShape({ requestsFromUrl: ow.string, regex: ow.optional.regExp }), + ))); + ow(options, ow.object.exactShape({ + forefront: ow.optional.boolean, + waitForAllRequestsToBeAdded: ow.optional.boolean, + batchSize: ow.optional.number, + waitBetweenBatchesMillis: ow.optional.number, + })); + + const { + batchSize = 1000, + waitBetweenBatchesMillis = 1000, + } = options; + const builtRequests: Request[] = []; + + for (const opts of requests) { + if (opts && typeof opts === 'object' && 'requestsFromUrl' in opts) { + await this.addRequest(opts, { forefront: options.forefront }); + } else { + builtRequests.push(new Request(typeof opts === 'string' ? { url: opts } : opts as RequestOptions)); + } + } + + const attemptToAddToQueueAndAddAnyUnprocessed = async (providedRequests: Request[]) => { + const resultsToReturn: ProcessedRequest[] = []; + const apiResult = await this.addRequests(providedRequests, { forefront: options.forefront }); + resultsToReturn.push(...apiResult.processedRequests); + + if (apiResult.unprocessedRequests.length) { + await sleep(waitBetweenBatchesMillis); + + resultsToReturn.push(...await attemptToAddToQueueAndAddAnyUnprocessed( + providedRequests.filter((r) => !apiResult.processedRequests.some((pr) => pr.uniqueKey === r.uniqueKey)), + )); + } + + return resultsToReturn; + }; + + const initialChunk = builtRequests.splice(0, batchSize); + + // Add initial batch of `batchSize` to process them right away + const addedRequests = await attemptToAddToQueueAndAddAnyUnprocessed(initialChunk); + + // If we have no more requests to add, return early + if (!builtRequests.length) { + return { + addedRequests, + waitForAllRequestsToBeAdded: Promise.resolve([]), + }; + } + + // eslint-disable-next-line no-async-promise-executor + const promise = new Promise(async (resolve) => { + const chunks = chunk(builtRequests, batchSize); + const finalAddedRequests: ProcessedRequest[] = []; + + for (const requestChunk of chunks) { + finalAddedRequests.push(...await attemptToAddToQueueAndAddAnyUnprocessed(requestChunk)); + + await sleep(waitBetweenBatchesMillis); + } + + resolve(finalAddedRequests); + }); + + // If the user wants to wait for all the requests to be added, we wait for the promise to resolve for them + if (options.waitForAllRequestsToBeAdded) { + addedRequests.push(...await promise); + } + + return { + addedRequests, + waitForAllRequestsToBeAdded: promise, + }; + } + + /** + * Gets the request from the queue specified by ID. + * + * @param id ID of the request. + * @returns Returns the request object, or `null` if it was not found. + */ + async getRequest(id: string): Promise | null> { + ow(id, ow.string); + + const requestOptions = await this.client.getRequest(id); + if (!requestOptions) return null; + + return new Request(requestOptions as unknown as RequestOptions); + } + + abstract fetchNextRequest(options?: RequestOptions): Promise | null>; + + /** + * Marks a request that was previously returned by the + * {@apilink RequestQueue.fetchNextRequest} + * function as handled after successful processing. + * Handled requests will never again be returned by the `fetchNextRequest` function. + */ + async markRequestHandled(request: Request): Promise { + ow(request, ow.object.partialShape({ + id: ow.string, + uniqueKey: ow.string, + handledAt: ow.optional.string, + })); + + if (!this.inProgress.has(request.id)) { + this.log.debug(`Cannot mark request ${request.id} as handled, because it is not in progress!`, { requestId: request.id }); + return null; + } + + const handledAt = request.handledAt ?? new Date().toISOString(); + const queueOperationInfo = await this.client.updateRequest({ ...request, handledAt }) as RequestQueueOperationInfo; + request.handledAt = handledAt; + queueOperationInfo.uniqueKey = request.uniqueKey; + + this.inProgress.delete(request.id); + this.recentlyHandledRequestsCache.add(request.id, true); + + if (!queueOperationInfo.wasAlreadyHandled) { + this.assumedHandledCount++; + } + + this._cacheRequest(getRequestId(request.uniqueKey), queueOperationInfo); + + return queueOperationInfo; + } + + /** + * Reclaims a failed request back to the queue, so that it can be returned for processing later again + * by another call to {@apilink RequestQueue.fetchNextRequest}. + * The request record in the queue is updated using the provided `request` parameter. + * For example, this lets you store the number of retries or error messages for the request. + */ + async reclaimRequest(request: Request, options: RequestQueueOperationOptions = {}): Promise { + ow(request, ow.object.partialShape({ + id: ow.string, + uniqueKey: ow.string, + })); + ow(options, ow.object.exactShape({ + forefront: ow.optional.boolean, + })); + + const { forefront = false } = options; + + if (!this.inProgress.has(request.id)) { + this.log.debug(`Cannot reclaim request ${request.id}, because it is not in progress!`, { requestId: request.id }); + return null; + } + + // TODO: If request hasn't been changed since the last getRequest(), + // we don't need to call updateRequest() and thus improve performance. + const queueOperationInfo = await this.client.updateRequest(request, { forefront }) as RequestQueueOperationInfo; + queueOperationInfo.uniqueKey = request.uniqueKey; + this._cacheRequest(getRequestId(request.uniqueKey), queueOperationInfo); + + // Wait a little to increase a chance that the next call to fetchNextRequest() will return the request with updated data. + // This is to compensate for the limitation of DynamoDB, where writes might not be immediately visible to subsequent reads. + setTimeout(() => { + if (!this.inProgress.has(request.id)) { + this.log.debug('The request is no longer marked as in progress in the queue?!', { requestId: request.id }); + return; + } + + this.inProgress.delete(request.id); + + // Performance optimization: add request straight to head if possible + this._maybeAddRequestToQueueHead(request.id, forefront); + }, STORAGE_CONSISTENCY_DELAY_MILLIS); + + return queueOperationInfo; + } + + protected abstract ensureHeadIsNonEmpty(): Promise; + + /** + * Resolves to `true` if the next call to {@apilink RequestQueue.fetchNextRequest} + * would return `null`, otherwise it resolves to `false`. + * Note that even if the queue is empty, there might be some pending requests currently being processed. + * If you need to ensure that there is no activity in the queue, use {@apilink RequestQueue.isFinished}. + */ + async isEmpty(): Promise { + await this.ensureHeadIsNonEmpty(); + return this.queueHeadIds.length() === 0; + } + + /** + * Resolves to `true` if all requests were already handled and there are no more left. + * Due to the nature of distributed storage used by the queue, + * the function might occasionally return a false negative, + * but it will never return a false positive. + */ + async isFinished(): Promise { + if (this.queueHeadIds.length() > 0 || this.inProgressCount() > 0) return false; + + const currentHead = await this.client.listHead({ limit: 2 }); + return currentHead.items.length === 0 && this.inProgressCount() === 0; + } + + protected _reset() { + this.queueHeadIds.clear(); + this.inProgress.clear(); + this.recentlyHandledRequestsCache.clear(); + this.assumedTotalCount = 0; + this.assumedHandledCount = 0; + this.requestCache.clear(); + } + + /** + * Caches information about request to beware of unneeded addRequest() calls. + */ + protected _cacheRequest(cacheKey: string, queueOperationInfo: RequestQueueOperationInfo): void { + this.requestCache.add(cacheKey, { + id: queueOperationInfo.requestId, + isHandled: queueOperationInfo.wasAlreadyHandled, + uniqueKey: queueOperationInfo.uniqueKey, + hydrated: null, + lockExpiresAt: null, + }); + } + + /** + * Adds a request straight to the queueHeadDict, to improve performance. + */ + protected _maybeAddRequestToQueueHead(requestId: string, forefront: boolean): void { + if (forefront) { + this.queueHeadIds.add(requestId, requestId, true); + } else if (this.assumedTotalCount < QUERY_HEAD_MIN_LENGTH) { + this.queueHeadIds.add(requestId, requestId, false); + } + } + + /** + * Removes the queue either from the Apify Cloud storage or from the local database, + * depending on the mode of operation. + */ + async drop(): Promise { + await this.client.delete(); + const manager = StorageManager.getManager(this.constructor as Constructor, this.config); + manager.closeStorage(this); + } + + /** + * Returns the number of handled requests. + * + * This function is just a convenient shortcut for: + * + * ```javascript + * const { handledRequestCount } = await queue.getInfo(); + * ``` + */ + async handledCount(): Promise { + // NOTE: We keep this function for compatibility with RequestList.handledCount() + const { handledRequestCount } = await this.getInfo() ?? {}; + return handledRequestCount ?? 0; + } + + /** + * Returns an object containing general information about the request queue. + * + * The function returns the same object as the Apify API Client's + * [getQueue](https://docs.apify.com/api/apify-client-js/latest#ApifyClient-requestQueues) + * function, which in turn calls the + * [Get request queue](https://apify.com/docs/api/v2#/reference/request-queues/queue/get-request-queue) + * API endpoint. + * + * **Example:** + * ``` + * { + * id: "WkzbQMuFYuamGv3YF", + * name: "my-queue", + * userId: "wRsJZtadYvn4mBZmm", + * createdAt: new Date("2015-12-12T07:34:14.202Z"), + * modifiedAt: new Date("2015-12-13T08:36:13.202Z"), + * accessedAt: new Date("2015-12-14T08:36:13.202Z"), + * totalRequestCount: 25, + * handledRequestCount: 5, + * pendingRequestCount: 20, + * } + * ``` + */ + async getInfo(): Promise { + return this.client.get(); + } + + /** + * Fetches URLs from requestsFromUrl and returns them in format of list of requests + */ + protected async _fetchRequestsFromUrl(source: InternalSource): Promise { + const { requestsFromUrl, regex, ...sharedOpts } = source; + + // Download remote resource and parse URLs. + let urlsArr; + try { + urlsArr = await this._downloadListOfUrls({ url: requestsFromUrl, urlRegExp: regex, proxyUrl: await this.proxyConfiguration?.newUrl() }); + } catch (err) { + throw new Error(`Cannot fetch a request list from ${requestsFromUrl}: ${err}`); + } + + // Skip if resource contained no URLs. + if (!urlsArr.length) { + this.log.warning('list fetched, but it is empty.', { requestsFromUrl, regex }); + return []; + } + + return urlsArr.map((url) => ({ url, ...sharedOpts })); + } + + /** + * Adds all fetched requests from a URL from a remote resource. + */ + protected async _addFetchedRequests(source: InternalSource, fetchedRequests: RequestOptions[], options: RequestQueueOperationOptions) { + const { requestsFromUrl, regex } = source; + const { addedRequests } = await this.addRequestsBatched(fetchedRequests, options); + + this.log.info('Fetched and loaded Requests from a remote resource.', { + requestsFromUrl, + regex, + fetchedCount: fetchedRequests.length, + importedCount: addedRequests.length, + duplicateCount: fetchedRequests.length - addedRequests.length, + sample: JSON.stringify(fetchedRequests.slice(0, 5)), + }); + + return addedRequests; + } + + /** + * @internal wraps public utility for mocking purposes + */ + private async _downloadListOfUrls(options: { url: string; urlRegExp?: RegExp; proxyUrl?: string }): Promise { + return downloadListOfUrls(options); + } + + /** + * Opens a request queue and returns a promise resolving to an instance + * of the {@apilink RequestQueue} class. + * + * {@apilink RequestQueue} represents a queue of URLs to crawl, which is stored either on local filesystem or in the cloud. + * The queue is used for deep crawling of websites, where you start with several URLs and then + * recursively follow links to other pages. The data structure supports both breadth-first + * and depth-first crawling orders. + * + * For more details and code examples, see the {@apilink RequestQueue} class. + * + * @param [queueIdOrName] + * ID or name of the request queue to be opened. If `null` or `undefined`, + * the function returns the default request queue associated with the crawler run. + * @param [options] Open Request Queue options. + */ + static async open(queueIdOrName?: string | null, options: StorageManagerOptions = {}): Promise { + ow(queueIdOrName, ow.optional.any(ow.string, ow.null)); + ow(options, ow.object.exactShape({ + config: ow.optional.object.instanceOf(Configuration), + storageClient: ow.optional.object, + proxyConfiguration: ow.optional.object, + })); + + options.config ??= Configuration.getGlobalConfig(); + options.storageClient ??= options.config.getStorageClient(); + + await purgeDefaultStorages({ onlyPurgeOnce: true, client: options.storageClient, config: options.config }); + + const manager = StorageManager.getManager(this as typeof BuiltRequestProvider, options.config); + const queue = await manager.openStorage(queueIdOrName, options.storageClient); + queue.proxyConfiguration = options.proxyConfiguration; + + return queue; + } +} + +declare class BuiltRequestProvider extends RequestProvider { + override fetchNextRequest(options?: RequestOptions | undefined): Promise | null>; + protected override ensureHeadIsNonEmpty(): Promise; +} + +interface RequestLruItem { + uniqueKey: string; + isHandled: boolean; + id: string; + hydrated: Request | null; + lockExpiresAt: number | null; +} + +export interface RequestProviderOptions { + id: string; + name?: string; + client: StorageClient; + + /** + * Used to pass the proxy configuration for the `requestsFromUrl` objects. + * Takes advantage of the internal address rotation and authentication process. + * If undefined, the `requestsFromUrl` requests will be made without proxy. + */ + proxyConfiguration?: ProxyConfiguration; +} + +export interface InternalRequestProviderOptions extends RequestProviderOptions { + logPrefix: string; + requestCacheMaxSize: number; + recentlyHandledRequestsMaxSize: number; +} + +export interface RequestQueueOperationOptions { + /** + * If set to `true`: + * - while adding the request to the queue: the request will be added to the foremost position in the queue. + * - while reclaiming the request: the request will be placed to the beginning of the queue, so that it's returned + * in the next call to {@apilink RequestQueue.fetchNextRequest}. + * By default, it's put to the end of the queue. + * @default false + */ + forefront?: boolean; +} + +/** + * @internal + */ +export interface RequestQueueOperationInfo extends QueueOperationInfo { + uniqueKey: string; +} + +export interface AddRequestsBatchedOptions extends RequestQueueOperationOptions { + /** + * Whether to wait for all the provided requests to be added, instead of waiting just for the initial batch of up to `batchSize`. + * @default false + */ + waitForAllRequestsToBeAdded?: boolean; + + /** + * @default 1000 + */ + batchSize?: number; + + /** + * @default 1000 + */ + waitBetweenBatchesMillis?: number; +} + +export interface AddRequestsBatchedResult { + addedRequests: ProcessedRequest[]; + /** + * A promise which will resolve with the rest of the requests that were added to the queue. + * + * Alternatively, we can set {@apilink AddRequestsBatchedOptions.waitForAllRequestsToBeAdded|`waitForAllRequestsToBeAdded`} to `true` + * in the {@apilink BasicCrawler.addRequests|`crawler.addRequests()`} options. + * + * **Example:** + * + * ```ts + * // Assuming `requests` is a list of requests. + * const result = await crawler.addRequests(requests); + * + * // If we want to wait for the rest of the requests to be added to the queue: + * await result.waitForAllRequestsToBeAdded; + * ``` + */ + waitForAllRequestsToBeAdded: Promise; +} diff --git a/packages/core/src/storages/request_queue.ts b/packages/core/src/storages/request_queue.ts index 36a2ee4b7a7d..1eccddea5d92 100644 --- a/packages/core/src/storages/request_queue.ts +++ b/packages/core/src/storages/request_queue.ts @@ -1,54 +1,24 @@ -import crypto from 'node:crypto'; import { setTimeout as sleep } from 'node:timers/promises'; import { REQUEST_QUEUE_HEAD_MAX_LIMIT } from '@apify/consts'; -import { ListDictionary, LruCache } from '@apify/datastructures'; -import { cryptoRandomObjectId } from '@apify/utilities'; -import type { - BatchAddRequestsResult, - Dictionary, - ProcessedRequest, - QueueOperationInfo, - RequestQueueClient, - RequestQueueInfo, - StorageClient, +import type { Dictionary, } from '@crawlee/types'; -import { chunk, downloadListOfUrls } from '@crawlee/utils'; -import ow from 'ow'; -import type { StorageManagerOptions } from './storage_manager'; -import { StorageManager } from './storage_manager'; -import { purgeDefaultStorages } from './utils'; +import type { RequestProviderOptions } from './request_provider'; +import { RequestProvider } from './request_provider'; +import { + API_PROCESSED_REQUESTS_DELAY_MILLIS, + MAX_QUERIES_FOR_CONSISTENCY, + QUERY_HEAD_BUFFER, + QUERY_HEAD_MIN_LENGTH, + STORAGE_CONSISTENCY_DELAY_MILLIS, + getRequestId, +} from './utils'; import { Configuration } from '../configuration'; -import { log } from '../log'; -import type { ProxyConfiguration } from '../proxy_configuration'; -import type { InternalSource, RequestOptions, Source } from '../request'; -import { Request } from '../request'; +import type { Request } from '../request'; const MAX_CACHED_REQUESTS = 1_000_000; -/** - * When requesting queue head we always fetch requestsInProgressCount * QUERY_HEAD_BUFFER number of requests. - * @internal - */ -export const QUERY_HEAD_MIN_LENGTH = 100; - -/** @internal */ -export const QUERY_HEAD_BUFFER = 3; - -/** - * If queue was modified (request added/updated/deleted) before more than API_PROCESSED_REQUESTS_DELAY_MILLIS - * then we assume the get head operation to be consistent. - * @internal - */ -export const API_PROCESSED_REQUESTS_DELAY_MILLIS = 10_000; - -/** - * How many times we try to get queue head with queueModifiedAt older than API_PROCESSED_REQUESTS_DELAY_MILLIS. - * @internal - */ -export const MAX_QUERIES_FOR_CONSISTENCY = 6; - /** * This number must be large enough so that processing of all these requests cannot be done in * a time lower than expected maximum latency of DynamoDB, but low enough not to waste too much memory. @@ -56,62 +26,6 @@ export const MAX_QUERIES_FOR_CONSISTENCY = 6; */ const RECENTLY_HANDLED_CACHE_SIZE = 1000; -/** - * Indicates how long it usually takes for the underlying storage to propagate all writes - * to be available to subsequent reads. - * @internal - */ -export const STORAGE_CONSISTENCY_DELAY_MILLIS = 3000; - -/** - * Helper function that creates ID from uniqueKey for local emulation of request queue. - * It's also used for local cache of remote request queue. - * - * This function may not exactly match how requestId is created server side. - * So we never pass requestId created by this to server and use it only for local cache. - * - * @internal - */ -export function getRequestId(uniqueKey: string) { - const str = crypto - .createHash('sha256') - .update(uniqueKey) - .digest('base64') - .replace(/[+/=]/g, ''); - - return str.substr(0, 15); -} - -/** - * @internal - */ -interface RequestQueueOperationInfo extends QueueOperationInfo { - - /** Indicates if request was already present in the queue. */ - wasAlreadyPresent: boolean; - - /** Indicates if request was already marked as handled. */ - wasAlreadyHandled: boolean; - - /** The ID of the added request */ - requestId: string; - - uniqueKey: string; - -} - -export interface RequestQueueOperationOptions { - /** - * If set to `true`: - * - while adding the request to the queue: the request will be added to the foremost position in the queue. - * - while reclaiming the request: the request will be placed to the beginning of the queue, so that it's returned - * in the next call to {@apilink RequestQueue.fetchNextRequest}. - * By default, it's put to the end of the queue. - * @default false - */ - forefront?: boolean; -} - /** * Represents a queue of URLs to crawl, which is used for deep crawling of websites * where you start with several URLs and then recursively @@ -158,22 +72,8 @@ export interface RequestQueueOperationOptions { * ``` * @category Sources */ -export class RequestQueue { - log = log.child({ prefix: 'RequestQueue' }); - id: string; - name?: string; - timeoutSecs = 30; - clientKey = cryptoRandomObjectId(); - client: RequestQueueClient; - private proxyConfiguration?: ProxyConfiguration; - - /** - * Contains a cached list of request IDs from the head of the queue, - * as obtained in the last query. Both key and value is the request ID. - * Need to apply a type here to the generated TS types don't try to use types-apify - */ - private queueHeadDict = new ListDictionary(); - queryQueueHeadPromise?: Promise<{ +export class RequestQueue extends RequestProvider { + private queryQueueHeadPromise?: Promise<{ wasLimitReached: boolean; prevLimit: number; queueModifiedAt: Date; @@ -181,329 +81,18 @@ export class RequestQueue { hadMultipleClients?: boolean; }> | null = null; - // A set of all request IDs that are currently being handled, - // i.e. which were returned by fetchNextRequest() but not markRequestHandled() - inProgress = new Set(); - - // To track whether the queue gets stuck, and we need to reset it - // `lastActivity` tracks the time when we either added, processed or reclaimed a request, - // or when we add new request to in-progress cache - lastActivity = new Date(); - internalTimeoutMillis = 5 * 60e3; // defaults to 5 minutes, will be overridden by BasicCrawler - - // Contains a list of recently handled requests. It is used to avoid inconsistencies - // caused by delays in the underlying DynamoDB storage. - // Keys are request IDs, values are true. - recentlyHandled = new LruCache({ maxLength: RECENTLY_HANDLED_CACHE_SIZE }); - - // We can trust these numbers only in a case that queue is used by a single client. - // This information is returned by getHead() under the hadMultipleClients property. - assumedTotalCount = 0; - assumedHandledCount = 0; - - // Caching requests to avoid redundant addRequest() calls. - // Key is computed using getRequestId() and value is { id, isHandled }. - requestsCache = new LruCache< - { uniqueKey: string; wasAlreadyHandled: boolean; isHandled: boolean; id: string } - >({ maxLength: MAX_CACHED_REQUESTS }); + private lastActivity = new Date(); /** * @internal */ - constructor(options: RequestQueueOptions, readonly config = Configuration.getGlobalConfig()) { - this.id = options.id; - this.name = options.name; - this.client = options.client.requestQueue(this.id, { - clientKey: this.clientKey, - timeoutSecs: this.timeoutSecs, - }) as RequestQueueClient; - this.proxyConfiguration = options.proxyConfiguration; - } - - /** - * @ignore - */ - inProgressCount() { - return this.inProgress.size; - } - - /** - * Adds a request to the queue. - * - * If a request with the same `uniqueKey` property is already present in the queue, - * it will not be updated. You can find out whether this happened from the resulting - * {@apilink QueueOperationInfo} object. - * - * To add multiple requests to the queue by extracting links from a webpage, - * see the {@apilink enqueueLinks} helper function. - * - * @param requestLike {@apilink Request} object or vanilla object with request data. - * Note that the function sets the `uniqueKey` and `id` fields to the passed Request. - * @param [options] Request queue operation options. - */ - async addRequest(requestLike: Source, options: RequestQueueOperationOptions = {}): Promise { - ow(requestLike, ow.object); - ow(options, ow.object.exactShape({ - forefront: ow.optional.boolean, - })); - - this.lastActivity = new Date(); - const { forefront = false } = options; - - if ('requestsFromUrl' in requestLike) { - const requests = await this._fetchRequestsFromUrl(requestLike as InternalSource); - const processedRequests = await this._addFetchedRequests(requestLike as InternalSource, requests, options); - - return processedRequests[0]; - } - - ow(requestLike, ow.object.partialShape({ - url: ow.string, - id: ow.undefined, - })); - - const request = requestLike instanceof Request - ? requestLike - : new Request(requestLike); - - const cacheKey = getRequestId(request.uniqueKey); - const cachedInfo = this.requestsCache.get(cacheKey); - - if (cachedInfo) { - request.id = cachedInfo.id; - return { - wasAlreadyPresent: true, - // We may assume that if request is in local cache then also the information if the - // request was already handled is there because just one client should be using one queue. - wasAlreadyHandled: cachedInfo.isHandled, - requestId: cachedInfo.id, - uniqueKey: cachedInfo.uniqueKey, - }; - } - - const queueOperationInfo = await this.client.addRequest(request, { forefront }) as RequestQueueOperationInfo; - queueOperationInfo.uniqueKey = request.uniqueKey; - - const { requestId, wasAlreadyPresent } = queueOperationInfo; - this._cacheRequest(cacheKey, queueOperationInfo); - - if (!wasAlreadyPresent && !this.inProgress.has(requestId) && !this.recentlyHandled.get(requestId)) { - this.assumedTotalCount++; - - // Performance optimization: add request straight to head if possible - this._maybeAddRequestToQueueHead(requestId, forefront); - } - - return queueOperationInfo; - } - - /** - * Adds requests to the queue in batches of 25. - * - * If a request that is passed in is already present due to its `uniqueKey` property being the same, - * it will not be updated. You can find out whether this happened by finding the request in the resulting - * {@apilink BatchAddRequestsResult} object. - * - * @param requestsLike {@apilink Request} objects or vanilla objects with request data. - * Note that the function sets the `uniqueKey` and `id` fields to the passed requests if missing. - * @param [options] Request queue operation options. - */ - async addRequests( - requestsLike: Source[], - options: RequestQueueOperationOptions = {}, - ): Promise { - ow(requestsLike, ow.array); - ow(options, ow.object.exactShape({ - forefront: ow.optional.boolean, - })); - - const { forefront = false } = options; - - const uniqueKeyToCacheKey = new Map(); - const getCachedRequestId = (uniqueKey: string) => { - const cached = uniqueKeyToCacheKey.get(uniqueKey); - - if (cached) return cached; - - const newCacheKey = getRequestId(uniqueKey); - uniqueKeyToCacheKey.set(uniqueKey, newCacheKey); - - return newCacheKey; - }; - - const results: BatchAddRequestsResult = { - processedRequests: [], - unprocessedRequests: [], - }; - - for (const requestLike of requestsLike) { - if ('requestsFromUrl' in requestLike) { - const requests = await this._fetchRequestsFromUrl(requestLike as InternalSource); - await this._addFetchedRequests(requestLike as InternalSource, requests, options); - } - } - - const requests = requestsLike - .filter((requestLike) => !('requestsFromUrl' in requestLike)) - .map((requestLike) => { - return requestLike instanceof Request ? requestLike : new Request(requestLike as RequestOptions); - }); - - const requestsToAdd = new Map(); - - for (const request of requests) { - const cacheKey = getCachedRequestId(request.uniqueKey); - const cachedInfo = this.requestsCache.get(cacheKey); - - if (cachedInfo) { - request.id = cachedInfo.id; - results.processedRequests.push({ - wasAlreadyPresent: true, - // We may assume that if request is in local cache then also the information if the - // request was already handled is there because just one client should be using one queue. - wasAlreadyHandled: cachedInfo.isHandled, - requestId: cachedInfo.id, - uniqueKey: cachedInfo.uniqueKey, - }); - } else if (!requestsToAdd.has(request.uniqueKey)) { - requestsToAdd.set(request.uniqueKey, request); - } - } - - // Early exit if all provided requests were already added - if (!requestsToAdd.size) { - return results; - } - - const apiResults = await this.client.batchAddRequests([...requestsToAdd.values()], { forefront }); - - // Report unprocessed requests - results.unprocessedRequests = apiResults.unprocessedRequests; - - // Add all new requests to the queue head - for (const newRequest of apiResults.processedRequests) { - // Add the new request to the processed list - results.processedRequests.push(newRequest); - - const cacheKey = getCachedRequestId(newRequest.uniqueKey); - - const { requestId, wasAlreadyPresent } = newRequest; - this._cacheRequest(cacheKey, newRequest); - - if (!wasAlreadyPresent && !this.inProgress.has(requestId) && !this.recentlyHandled.get(requestId)) { - this.assumedTotalCount++; - - // Performance optimization: add request straight to head if possible - this._maybeAddRequestToQueueHead(requestId, forefront); - } - } - - return results; - } - - /** - * Adds requests to the queue in batches. By default, it will resolve after the initial batch is added, and continue - * adding the rest in background. You can configure the batch size via `batchSize` option and the sleep time in between - * the batches via `waitBetweenBatchesMillis`. If you want to wait for all batches to be added to the queue, you can use - * the `waitForAllRequestsToBeAdded` promise you get in the response object. - * - * @param requests The requests to add - * @param options Options for the request queue - */ - async addRequestsBatched(requests: (string | Source)[], options: AddRequestsBatchedOptions = {}): Promise { - ow(requests, ow.array.ofType(ow.any( - ow.string, - ow.object.partialShape({ url: ow.string, id: ow.undefined }), - ow.object.partialShape({ requestsFromUrl: ow.string, regex: ow.optional.regExp }), - ))); - ow(options, ow.object.exactShape({ - forefront: ow.optional.boolean, - waitForAllRequestsToBeAdded: ow.optional.boolean, - batchSize: ow.optional.number, - waitBetweenBatchesMillis: ow.optional.number, - })); - - const { - batchSize = 1000, - waitBetweenBatchesMillis = 1000, - } = options; - const builtRequests: Request[] = []; - - for (const opts of requests) { - if (opts && typeof opts === 'object' && 'requestsFromUrl' in opts) { - await this.addRequest(opts, { forefront: options.forefront }); - } else { - builtRequests.push(new Request(typeof opts === 'string' ? { url: opts } : opts as RequestOptions)); - } - } - - const attemptToAddToQueueAndAddAnyUnprocessed = async (providedRequests: Request[]) => { - const resultsToReturn: ProcessedRequest[] = []; - const apiResult = await this.addRequests(providedRequests, { forefront: options.forefront }); - resultsToReturn.push(...apiResult.processedRequests); - - if (apiResult.unprocessedRequests.length) { - await sleep(waitBetweenBatchesMillis); - - resultsToReturn.push(...await attemptToAddToQueueAndAddAnyUnprocessed( - providedRequests.filter((r) => !apiResult.processedRequests.some((pr) => pr.uniqueKey === r.uniqueKey)), - )); - } - - return resultsToReturn; - }; - - const initialChunk = builtRequests.splice(0, batchSize); - - // Add initial batch of `batchSize` to process them right away - const addedRequests = await attemptToAddToQueueAndAddAnyUnprocessed(initialChunk); - - // If we have no more requests to add, return early - if (!builtRequests.length) { - return { - addedRequests, - waitForAllRequestsToBeAdded: Promise.resolve([]), - }; - } - - // eslint-disable-next-line no-async-promise-executor - const promise = new Promise(async (resolve) => { - const chunks = chunk(builtRequests, batchSize); - const finalAddedRequests: ProcessedRequest[] = []; - - for (const requestChunk of chunks) { - finalAddedRequests.push(...await attemptToAddToQueueAndAddAnyUnprocessed(requestChunk)); - - await sleep(waitBetweenBatchesMillis); - } - - resolve(finalAddedRequests); - }); - - // If the user wants to wait for all the requests to be added, we wait for the promise to resolve for them - if (options.waitForAllRequestsToBeAdded) { - addedRequests.push(...await promise); - } - - return { - addedRequests, - waitForAllRequestsToBeAdded: promise, - }; - } - - /** - * Gets the request from the queue specified by ID. - * - * @param id ID of the request. - * @returns Returns the request object, or `null` if it was not found. - */ - async getRequest(id: string): Promise | null> { - ow(id, ow.string); - - const requestOptions = await this.client.getRequest(id); - if (!requestOptions) return null; - - return new Request(requestOptions as unknown as RequestOptions); + constructor(options: RequestProviderOptions, config = Configuration.getGlobalConfig()) { + super({ + ...options, + logPrefix: 'RequestQueue', + recentlyHandledRequestsMaxSize: RECENTLY_HANDLED_CACHE_SIZE, + requestCacheMaxSize: MAX_CACHED_REQUESTS, + }, config); } /** @@ -523,20 +112,20 @@ export class RequestQueue { * @returns * Returns the request object or `null` if there are no more pending requests. */ - async fetchNextRequest(): Promise | null> { - await this._ensureHeadIsNonEmpty(); + override async fetchNextRequest(): Promise | null> { + await this.ensureHeadIsNonEmpty(); - const nextRequestId = this.queueHeadDict.removeFirst(); + const nextRequestId = this.queueHeadIds.removeFirst(); // We are likely done at this point. if (!nextRequestId) return null; // This should never happen, but... - if (this.inProgress.has(nextRequestId) || this.recentlyHandled.get(nextRequestId)) { + if (this.inProgress.has(nextRequestId) || this.recentlyHandledRequestsCache.get(nextRequestId)) { this.log.warning('Queue head returned a request that is already in progress?!', { nextRequestId, inProgress: this.inProgress.has(nextRequestId), - recentlyHandled: !!this.recentlyHandled.get(nextRequestId), + recentlyHandled: !!this.recentlyHandledRequestsCache.get(nextRequestId), }); return null; } @@ -544,7 +133,7 @@ export class RequestQueue { this.inProgress.add(nextRequestId); this.lastActivity = new Date(); - let request; + let request: Request | null; try { request = await this.getRequest(nextRequestId); } catch (e) { @@ -574,146 +163,16 @@ export class RequestQueue { // will not put the request again to queueHeadDict. if (request.handledAt) { this.log.debug('Request fetched from the beginning of queue was already handled', { nextRequestId }); - this.recentlyHandled.add(nextRequestId, true); + this.recentlyHandledRequestsCache.add(nextRequestId, true); return null; } return request; } - /** - * Marks a request that was previously returned by the - * {@apilink RequestQueue.fetchNextRequest} - * function as handled after successful processing. - * Handled requests will never again be returned by the `fetchNextRequest` function. - */ - async markRequestHandled(request: Request): Promise { - this.lastActivity = new Date(); - ow(request, ow.object.partialShape({ - id: ow.string, - uniqueKey: ow.string, - handledAt: ow.optional.string, - })); - - if (!this.inProgress.has(request.id)) { - this.log.debug(`Cannot mark request ${request.id} as handled, because it is not in progress!`, { requestId: request.id }); - return null; - } - - const handledAt = request.handledAt ?? new Date().toISOString(); - const queueOperationInfo = await this.client.updateRequest({ ...request, handledAt }) as RequestQueueOperationInfo; - request.handledAt = handledAt; - queueOperationInfo.uniqueKey = request.uniqueKey; - - this.inProgress.delete(request.id); - this.recentlyHandled.add(request.id, true); - - if (!queueOperationInfo.wasAlreadyHandled) { - this.assumedHandledCount++; - } - - this._cacheRequest(getRequestId(request.uniqueKey), queueOperationInfo); - - return queueOperationInfo; - } - - /** - * Reclaims a failed request back to the queue, so that it can be returned for processing later again - * by another call to {@apilink RequestQueue.fetchNextRequest}. - * The request record in the queue is updated using the provided `request` parameter. - * For example, this lets you store the number of retries or error messages for the request. - */ - async reclaimRequest(request: Request, options: RequestQueueOperationOptions = {}): Promise { - this.lastActivity = new Date(); - ow(request, ow.object.partialShape({ - id: ow.string, - uniqueKey: ow.string, - })); - ow(options, ow.object.exactShape({ - forefront: ow.optional.boolean, - })); - - const { forefront = false } = options; - - if (!this.inProgress.has(request.id)) { - this.log.debug(`Cannot reclaim request ${request.id}, because it is not in progress!`, { requestId: request.id }); - return null; - } - - // TODO: If request hasn't been changed since the last getRequest(), - // we don't need to call updateRequest() and thus improve performance. - const queueOperationInfo = await this.client.updateRequest(request, { forefront }) as RequestQueueOperationInfo; - queueOperationInfo.uniqueKey = request.uniqueKey; - this._cacheRequest(getRequestId(request.uniqueKey), queueOperationInfo); - - // Wait a little to increase a chance that the next call to fetchNextRequest() will return the request with updated data. - // This is to compensate for the limitation of DynamoDB, where writes might not be immediately visible to subsequent reads. - setTimeout(() => { - if (!this.inProgress.has(request.id)) { - this.log.debug('The request is no longer marked as in progress in the queue?!', { requestId: request.id }); - return; - } - - this.inProgress.delete(request.id); - - // Performance optimization: add request straight to head if possible - this._maybeAddRequestToQueueHead(request.id, forefront); - }, STORAGE_CONSISTENCY_DELAY_MILLIS); - - return queueOperationInfo; - } - - /** - * Resolves to `true` if the next call to {@apilink RequestQueue.fetchNextRequest} - * would return `null`, otherwise it resolves to `false`. - * Note that even if the queue is empty, there might be some pending requests currently being processed. - * If you need to ensure that there is no activity in the queue, use {@apilink RequestQueue.isFinished}. - */ - async isEmpty(): Promise { + protected override async ensureHeadIsNonEmpty(): Promise { + // Alias for backwards compatibility await this._ensureHeadIsNonEmpty(); - return this.queueHeadDict.length() === 0; - } - - /** - * Resolves to `true` if all requests were already handled and there are no more left. - * Due to the nature of distributed storage used by the queue, - * the function might occasionally return a false negative, - * but it will never return a false positive. - */ - async isFinished(): Promise { - if ((Date.now() - +this.lastActivity) > this.internalTimeoutMillis) { - const message = `The request queue seems to be stuck for ${this.internalTimeoutMillis / 1e3}s, resetting internal state.`; - this.log.warning(message, { inProgress: [...this.inProgress] }); - this._reset(); - } - - if (this.queueHeadDict.length() > 0 || this.inProgressCount() > 0) return false; - - const isHeadConsistent = await this._ensureHeadIsNonEmpty(true); - return isHeadConsistent && this.queueHeadDict.length() === 0 && this.inProgressCount() === 0; - } - - private _reset() { - this.queueHeadDict.clear(); - this.queryQueueHeadPromise = null; - this.inProgress.clear(); - this.recentlyHandled.clear(); - this.assumedTotalCount = 0; - this.assumedHandledCount = 0; - this.requestsCache.clear(); - this.lastActivity = new Date(); - } - - /** - * Caches information about request to beware of unneeded addRequest() calls. - */ - protected _cacheRequest(cacheKey: string, queueOperationInfo: RequestQueueOperationInfo): void { - this.requestsCache.add(cacheKey, { - id: queueOperationInfo.requestId, - isHandled: queueOperationInfo.wasAlreadyHandled, - uniqueKey: queueOperationInfo.uniqueKey, - wasAlreadyHandled: queueOperationInfo.wasAlreadyHandled, - }); } /** @@ -732,8 +191,15 @@ export class RequestQueue { limit = Math.max(this.inProgressCount() * QUERY_HEAD_BUFFER, QUERY_HEAD_MIN_LENGTH), iteration = 0, ): Promise { + // If we are paused for migration, resolve immediately. + if (this.queuePausedForMigration) { + return true; + } + // If is nonempty resolve immediately. - if (this.queueHeadDict.length() > 0) return true; + if (this.queueHeadIds.length() > 0) { + return true; + } if (!this.queryQueueHeadPromise) { const queryStartedAt = new Date(); @@ -743,9 +209,9 @@ export class RequestQueue { .then(({ items, queueModifiedAt, hadMultipleClients }) => { items.forEach(({ id: requestId, uniqueKey }) => { // Queue head index might be behind the main table, so ensure we don't recycle requests - if (!requestId || !uniqueKey || this.inProgress.has(requestId) || this.recentlyHandled.get(requestId!)) return; + if (!requestId || !uniqueKey || this.inProgress.has(requestId) || this.recentlyHandledRequestsCache.get(requestId!)) return; - this.queueHeadDict.add(requestId, requestId, false); + this.queueHeadIds.add(requestId, requestId, false); this._cacheRequest(getRequestId(uniqueKey), { requestId, wasAlreadyHandled: false, @@ -781,7 +247,7 @@ export class RequestQueue { if (prevLimit >= REQUEST_QUEUE_HEAD_MAX_LIMIT) { this.log.warning(`Reached the maximum number of requests in progress: ${REQUEST_QUEUE_HEAD_MAX_LIMIT}.`); } - const shouldRepeatWithHigherLimit = this.queueHeadDict.length() === 0 + const shouldRepeatWithHigherLimit = this.queueHeadIds.length() === 0 && wasLimitReached && prevLimit < REQUEST_QUEUE_HEAD_MAX_LIMIT; @@ -814,204 +280,47 @@ export class RequestQueue { return this._ensureHeadIsNonEmpty(ensureConsistency, nextLimit, iteration + 1); } - /** - * Adds a request straight to the queueHeadDict, to improve performance. - */ - private _maybeAddRequestToQueueHead(requestId: string, forefront: boolean): void { - if (forefront) { - this.queueHeadDict.add(requestId, requestId, true); - } else if (this.assumedTotalCount < QUERY_HEAD_MIN_LENGTH) { - this.queueHeadDict.add(requestId, requestId, false); + // RequestQueue v1 behavior overrides below + override async isFinished(): Promise { + if ((Date.now() - +this.lastActivity) > this.internalTimeoutMillis) { + const message = `The request queue seems to be stuck for ${this.internalTimeoutMillis / 1e3}s, resetting internal state.`; + this.log.warning(message, { inProgress: [...this.inProgress] }); + this._reset(); } - } - /** - * Removes the queue either from the Apify Cloud storage or from the local database, - * depending on the mode of operation. - */ - async drop(): Promise { - await this.client.delete(); - const manager = StorageManager.getManager(RequestQueue, this.config); - manager.closeStorage(this); - } + if (this.queueHeadIds.length() > 0 || this.inProgressCount() > 0) return false; - /** - * Returns the number of handled requests. - * - * This function is just a convenient shortcut for: - * - * ```javascript - * const { handledRequestCount } = await queue.getInfo(); - * ``` - */ - async handledCount(): Promise { - // NOTE: We keep this function for compatibility with RequestList.handledCount() - const { handledRequestCount } = await this.getInfo() ?? {}; - return handledRequestCount ?? 0; + const isHeadConsistent = await this._ensureHeadIsNonEmpty(true); + return isHeadConsistent && this.queueHeadIds.length() === 0 && this.inProgressCount() === 0; } - /** - * Returns an object containing general information about the request queue. - * - * The function returns the same object as the Apify API Client's - * [getQueue](https://docs.apify.com/api/apify-client-js/latest#ApifyClient-requestQueues) - * function, which in turn calls the - * [Get request queue](https://apify.com/docs/api/v2#/reference/request-queues/queue/get-request-queue) - * API endpoint. - * - * **Example:** - * ``` - * { - * id: "WkzbQMuFYuamGv3YF", - * name: "my-queue", - * userId: "wRsJZtadYvn4mBZmm", - * createdAt: new Date("2015-12-12T07:34:14.202Z"), - * modifiedAt: new Date("2015-12-13T08:36:13.202Z"), - * accessedAt: new Date("2015-12-14T08:36:13.202Z"), - * totalRequestCount: 25, - * handledRequestCount: 5, - * pendingRequestCount: 20, - * } - * ``` - */ - async getInfo(): Promise { - return this.client.get(); + override async addRequest(...args: Parameters) { + this.lastActivity = new Date(); + return super.addRequest(...args); } - /** - * Fetches URLs from requestsFromUrl and returns them in format of list of requests - */ - protected async _fetchRequestsFromUrl(source: InternalSource): Promise { - const { requestsFromUrl, regex, ...sharedOpts } = source; - - // Download remote resource and parse URLs. - let urlsArr; - try { - urlsArr = await this._downloadListOfUrls({ url: requestsFromUrl, urlRegExp: regex, proxyUrl: await this.proxyConfiguration?.newUrl() }); - } catch (err) { - throw new Error(`Cannot fetch a request list from ${requestsFromUrl}: ${err}`); - } - - // Skip if resource contained no URLs. - if (!urlsArr.length) { - this.log.warning('list fetched, but it is empty.', { requestsFromUrl, regex }); - return []; - } - - return urlsArr.map((url) => ({ url, ...sharedOpts })); + override async addRequests(...args: Parameters) { + this.lastActivity = new Date(); + return super.addRequests(...args); } - /** - * Adds all fetched requests from a URL from a remote resource. - */ - protected async _addFetchedRequests(source: InternalSource, fetchedRequests: RequestOptions[], options: RequestQueueOperationOptions) { - const { requestsFromUrl, regex } = source; - const { addedRequests } = await this.addRequestsBatched(fetchedRequests, options); - - this.log.info('Fetched and loaded Requests from a remote resource.', { - requestsFromUrl, - regex, - fetchedCount: fetchedRequests.length, - importedCount: addedRequests.length, - duplicateCount: fetchedRequests.length - addedRequests.length, - sample: JSON.stringify(fetchedRequests.slice(0, 5)), - }); - - return addedRequests; + override async addRequestsBatched(...args: Parameters) { + this.lastActivity = new Date(); + return super.addRequestsBatched(...args); } - /** - * @internal wraps public utility for mocking purposes - */ - private async _downloadListOfUrls(options: { url: string; urlRegExp?: RegExp; proxyUrl?: string }): Promise { - return downloadListOfUrls(options); + override async markRequestHandled(...args: Parameters) { + this.lastActivity = new Date(); + return super.markRequestHandled(...args); } - /** - * Opens a request queue and returns a promise resolving to an instance - * of the {@apilink RequestQueue} class. - * - * {@apilink RequestQueue} represents a queue of URLs to crawl, which is stored either on local filesystem or in the cloud. - * The queue is used for deep crawling of websites, where you start with several URLs and then - * recursively follow links to other pages. The data structure supports both breadth-first - * and depth-first crawling orders. - * - * For more details and code examples, see the {@apilink RequestQueue} class. - * - * @param [queueIdOrName] - * ID or name of the request queue to be opened. If `null` or `undefined`, - * the function returns the default request queue associated with the crawler run. - * @param [options] Open Request Queue options. - */ - static async open(queueIdOrName?: string | null, options: StorageManagerOptions = {}): Promise { - ow(queueIdOrName, ow.optional.any(ow.string, ow.null)); - ow(options, ow.object.exactShape({ - config: ow.optional.object.instanceOf(Configuration), - storageClient: ow.optional.object, - proxyConfiguration: ow.optional.object, - })); - - options.config ??= Configuration.getGlobalConfig(); - options.storageClient ??= options.config.getStorageClient(); - - await purgeDefaultStorages({ onlyPurgeOnce: true, client: options.storageClient, config: options.config }); - - const manager = StorageManager.getManager(this, options.config); - const queue = await manager.openStorage(queueIdOrName, options.storageClient); - queue.proxyConfiguration = options.proxyConfiguration; - - return queue; + override async reclaimRequest(...args: Parameters) { + this.lastActivity = new Date(); + return super.reclaimRequest(...args); } -} - -export interface RequestQueueOptions { - id: string; - name?: string; - client: StorageClient; - - /** - * Used to pass the proxy configuration for the `requestsFromUrl` objects. - * Takes advantage of the internal address rotation and authentication process. - * If undefined, the `requestsFromUrl` requests will be made without proxy. - */ - proxyConfiguration?: ProxyConfiguration; -} - -export interface AddRequestsBatchedOptions extends RequestQueueOperationOptions { - /** - * Whether to wait for all the provided requests to be added, instead of waiting just for the initial batch of up to `batchSize`. - * @default false - */ - waitForAllRequestsToBeAdded?: boolean; - /** - * @default 1000 - */ - batchSize?: number; - - /** - * @default 1000 - */ - waitBetweenBatchesMillis?: number; -} - -export interface AddRequestsBatchedResult { - addedRequests: ProcessedRequest[]; - /** - * A promise which will resolve with the rest of the requests that were added to the queue. - * - * Alternatively, we can set {@apilink AddRequestsBatchedOptions.waitForAllRequestsToBeAdded|`waitForAllRequestsToBeAdded`} to `true` - * in the {@apilink BasicCrawler.addRequests|`crawler.addRequests()`} options. - * - * **Example:** - * - * ```ts - * // Assuming `requests` is a list of requests. - * const result = await crawler.addRequests(requests); - * - * // If we want to wait for the rest of the requests to be added to the queue: - * await result.waitForAllRequestsToBeAdded; - * ``` - */ - waitForAllRequestsToBeAdded: Promise; + protected override _reset() { + super._reset(); + this.lastActivity = new Date(); + } } diff --git a/packages/core/src/storages/request_queue_v2.ts b/packages/core/src/storages/request_queue_v2.ts new file mode 100644 index 000000000000..9051d2e5195c --- /dev/null +++ b/packages/core/src/storages/request_queue_v2.ts @@ -0,0 +1,296 @@ +import type { Dictionary } from '@crawlee/types'; + +import type { RequestQueueOperationInfo, RequestProviderOptions } from './request_provider'; +import { RequestProvider } from './request_provider'; +import { + STORAGE_CONSISTENCY_DELAY_MILLIS, + getRequestId, +} from './utils'; +import { Configuration } from '../configuration'; +import { EventType } from '../events'; +import type { Request } from '../request'; + +// Double the limit of RequestQueue v1 (1_000_000) as we also store keyed by request.id, not just from uniqueKey +const MAX_CACHED_REQUESTS = 2_000_000; + +/** + * This number must be large enough so that processing of all these requests cannot be done in + * a time lower than expected maximum latency of DynamoDB, but low enough not to waste too much memory. + * @internal + */ +const RECENTLY_HANDLED_CACHE_SIZE = 1000; + +class RequestQueue extends RequestProvider { + private _listHeadAndLockPromise: Promise | null = null; + + constructor(options: RequestProviderOptions, config = Configuration.getGlobalConfig()) { + super({ + ...options, + logPrefix: 'RequestQueue2', + recentlyHandledRequestsMaxSize: RECENTLY_HANDLED_CACHE_SIZE, + requestCacheMaxSize: MAX_CACHED_REQUESTS, + }, config); + + const eventManager = config.getEventManager(); + + eventManager.on(EventType.MIGRATING, async () => { + await this._clearPossibleLocks(); + }); + + eventManager.on(EventType.ABORTING, async () => { + await this._clearPossibleLocks(); + }); + } + + /** + * Caches information about request to beware of unneeded addRequest() calls. + */ + protected override _cacheRequest(cacheKey: string, queueOperationInfo: RequestQueueOperationInfo): void { + super._cacheRequest(cacheKey, queueOperationInfo); + + this.requestCache.add(queueOperationInfo.requestId, { + id: queueOperationInfo.requestId, + isHandled: queueOperationInfo.wasAlreadyHandled, + uniqueKey: queueOperationInfo.uniqueKey, + hydrated: null, + lockExpiresAt: null, + }); + } + + /** + * Returns a next request in the queue to be processed, or `null` if there are no more pending requests. + * + * Once you successfully finish processing of the request, you need to call + * {@apilink RequestQueue.markRequestHandled} + * to mark the request as handled in the queue. If there was some error in processing the request, + * call {@apilink RequestQueue.reclaimRequest} instead, + * so that the queue will give the request to some other consumer in another call to the `fetchNextRequest` function. + * + * Note that the `null` return value doesn't mean the queue processing finished, + * it means there are currently no pending requests. + * To check whether all requests in queue were finished, + * use {@apilink RequestQueue.isFinished} instead. + * + * @returns + * Returns the request object or `null` if there are no more pending requests. + */ + override async fetchNextRequest(): Promise | null> { + await this.ensureHeadIsNonEmpty(); + + const nextRequestId = this.queueHeadIds.removeFirst(); + + // We are likely done at this point. + if (!nextRequestId) { + return null; + } + + // This should never happen, but... + if (this.inProgress.has(nextRequestId) || this.recentlyHandledRequestsCache.get(nextRequestId)) { + this.log.warning('Queue head returned a request that is already in progress?!', { + nextRequestId, + inProgress: this.inProgress.has(nextRequestId), + recentlyHandled: !!this.recentlyHandledRequestsCache.get(nextRequestId), + }); + return null; + } + + this.inProgress.add(nextRequestId); + + let request: Request | null; + + try { + request = await this.getOrHydrateRequest(nextRequestId); + } catch (e) { + // On error, remove the request from in progress, otherwise it would be there forever + this.inProgress.delete(nextRequestId); + throw e; + } + + // NOTE: It can happen that the queue head index is inconsistent with the main queue table. This can occur in two situations: + + // 1) Queue head index is ahead of the main table and the request is not present in the main table yet (i.e. getRequest() returned null). + // In this case, keep the request marked as in progress for a short while, + // so that isFinished() doesn't return true and _ensureHeadIsNonEmpty() doesn't not load the request + // into the queueHeadDict straight again. After the interval expires, fetchNextRequest() + // will try to fetch this request again, until it eventually appears in the main table. + if (!request) { + this.log.debug('Cannot find a request from the beginning of queue or lost lock, will be retried later', { nextRequestId }); + + setTimeout(() => { + this.inProgress.delete(nextRequestId); + }, STORAGE_CONSISTENCY_DELAY_MILLIS); + + return null; + } + + // 2) Queue head index is behind the main table and the underlying request was already handled + // (by some other client, since we keep the track of handled requests in recentlyHandled dictionary). + // We just add the request to the recentlyHandled dictionary so that next call to _ensureHeadIsNonEmpty() + // will not put the request again to queueHeadDict. + if (request.handledAt) { + this.log.debug('Request fetched from the beginning of queue was already handled', { nextRequestId }); + this.recentlyHandledRequestsCache.add(nextRequestId, true); + return null; + } + + return request; + } + + protected async ensureHeadIsNonEmpty() { + // Stop fetching if we are paused for migration + if (this.queuePausedForMigration) { + return; + } + + // We want to fetch ahead of time to minimize dead time + if (this.queueHeadIds.length() > 1) { + return; + } + + this._listHeadAndLockPromise ??= this._listHeadAndLock().finally(() => { + this._listHeadAndLockPromise = null; + }); + + await this._listHeadAndLockPromise; + } + + private async _listHeadAndLock(): Promise { + const headData = await this.client.listAndLockHead({ limit: 25, lockSecs: this.requestLockSecs }); + + for (const { id, uniqueKey } of headData.items) { + // Queue head index might be behind the main table, so ensure we don't recycle requests + if (!id || !uniqueKey || this.inProgress.has(id) || this.recentlyHandledRequestsCache.get(id)) { + continue; + } + + this.queueHeadIds.add(id, id, false); + this._cacheRequest(getRequestId(uniqueKey), { + requestId: id, + uniqueKey, + wasAlreadyPresent: true, + wasAlreadyHandled: false, + }); + } + } + + private async getOrHydrateRequest(requestId: string): Promise | null> { + const cachedEntry = this.requestCache.get(requestId); + + if (!cachedEntry) { + // 2.1. Attempt to prolong the request lock to see if we still own the request + const prolongResult = await this._prolongRequestLock(requestId); + + if (!prolongResult) { + return null; + } + + // 2.1.1. If successful, hydrate the request and return it + const hydratedRequest = await this.getRequest(requestId); + + // Queue head index is ahead of the main table and the request is not present in the main table yet (i.e. getRequest() returned null). + if (!hydratedRequest) { + // Remove the lock from the request for now, so that it can be picked up later + // This may/may not succeed, but that's fine + try { + await this.client.deleteRequestLock(requestId); + } catch { + // Ignore + } + + return null; + } + + this.requestCache.add(requestId, { + id: requestId, + uniqueKey: hydratedRequest.uniqueKey, + hydrated: hydratedRequest, + isHandled: hydratedRequest.handledAt !== null, + lockExpiresAt: prolongResult.getTime(), + }); + + return hydratedRequest; + } + + // 1.1. If hydrated, prolong the lock more and return it + if (cachedEntry.hydrated) { + // 1.1.1. If the lock expired on the hydrated requests, try to prolong. If we fail, we lost the request (or it was handled already) + if (cachedEntry.lockExpiresAt && cachedEntry.lockExpiresAt < Date.now()) { + const prolonged = await this._prolongRequestLock(cachedEntry.id); + + if (!prolonged) { + return null; + } + + cachedEntry.lockExpiresAt = prolonged.getTime(); + } + + return cachedEntry.hydrated; + } + + // 1.2. If not hydrated, try to prolong the lock first (to ensure we keep it in our queue), hydrate and return it + const prolonged = await this._prolongRequestLock(cachedEntry.id); + + if (!prolonged) { + return null; + } + + // This might still return null if the queue head is inconsistent with the main queue table. + const hydratedRequest = await this.getRequest(cachedEntry.id); + + cachedEntry.hydrated = hydratedRequest; + + // Queue head index is ahead of the main table and the request is not present in the main table yet (i.e. getRequest() returned null). + if (!hydratedRequest) { + // Remove the lock from the request for now, so that it can be picked up later + // This may/may not succeed, but that's fine + try { + await this.client.deleteRequestLock(cachedEntry.id); + } catch { + // Ignore + } + + return null; + } + + return hydratedRequest; + } + + private async _prolongRequestLock(requestId: string): Promise { + try { + const res = await this.client.prolongRequestLock(requestId, { lockSecs: this.requestLockSecs }); + return res.lockExpiresAt; + } catch (err: any) { + // Most likely we do not own the lock anymore + this.log.warning(`Failed to prolong lock for cached request ${requestId}, either lost the lock or the request was already handled\n`, { + err, + }); + + return null; + } + } + + protected override _reset() { + super._reset(); + this._listHeadAndLockPromise = null; + } + + protected override _maybeAddRequestToQueueHead() { + // Do nothing for request queue v2, as we are only able to lock requests when listing the head + } + + protected async _clearPossibleLocks() { + this.queuePausedForMigration = true; + let requestId: string | null; + + // eslint-disable-next-line no-cond-assign + while ((requestId = this.queueHeadIds.removeFirst()) !== null) { + try { + await this.client.deleteRequestLock(requestId); + } catch { + // We don't have the lock, or the request was never locked. Either way it's fine + } + } + } +} + +export { RequestQueue as RequestQueueV2 }; diff --git a/packages/core/src/storages/storage_manager.ts b/packages/core/src/storages/storage_manager.ts index dd2cc533a260..be8d45da8c65 100644 --- a/packages/core/src/storages/storage_manager.ts +++ b/packages/core/src/storages/storage_manager.ts @@ -86,6 +86,7 @@ export class StorageManager { name: storageObject.name, client, }); + this._addStorageToCache(storage); } diff --git a/packages/core/src/storages/utils.ts b/packages/core/src/storages/utils.ts index f32710d7fe3a..72b08b53211c 100644 --- a/packages/core/src/storages/utils.ts +++ b/packages/core/src/storages/utils.ts @@ -1,3 +1,5 @@ +import crypto from 'node:crypto'; + import type { Dictionary, StorageClient } from '@crawlee/types'; import { KeyValueStore } from './key_value_store'; @@ -90,3 +92,51 @@ export async function useState( const kvStore = await KeyValueStore.open(options?.keyValueStoreName, { config: options?.config || Configuration.getGlobalConfig() }); return kvStore.getAutoSavedValue(name || 'CRAWLEE_GLOBAL_STATE', defaultValue); } + +/** + * Helper function that creates ID from uniqueKey for local emulation of request queue. + * It's also used for local cache of remote request queue. + * + * This function may not exactly match how requestId is created server side. + * So we never pass requestId created by this to server and use it only for local cache. + * + * @internal + */ +export function getRequestId(uniqueKey: string) { + const str = crypto + .createHash('sha256') + .update(uniqueKey) + .digest('base64') + .replace(/[+/=]/g, ''); + + return str.slice(0, 15); +} + +/** + * When requesting queue head we always fetch requestsInProgressCount * QUERY_HEAD_BUFFER number of requests. + * @internal + */ +export const QUERY_HEAD_MIN_LENGTH = 100; + +/** + * Indicates how long it usually takes for the underlying storage to propagate all writes + * to be available to subsequent reads. + * @internal + */ +export const STORAGE_CONSISTENCY_DELAY_MILLIS = 3000; + +/** @internal */ +export const QUERY_HEAD_BUFFER = 3; + +/** + * If queue was modified (request added/updated/deleted) before more than API_PROCESSED_REQUESTS_DELAY_MILLIS + * then we assume the get head operation to be consistent. + * @internal + */ +export const API_PROCESSED_REQUESTS_DELAY_MILLIS = 10_000; + +/** + * How many times we try to get queue head with queueModifiedAt older than API_PROCESSED_REQUESTS_DELAY_MILLIS. + * @internal + */ +export const MAX_QUERIES_FOR_CONSISTENCY = 6; diff --git a/packages/core/test/request-queue/request-queue-v2.test.ts b/packages/core/test/request-queue/request-queue-v2.test.ts new file mode 100644 index 000000000000..d8fe8de68ea6 --- /dev/null +++ b/packages/core/test/request-queue/request-queue-v2.test.ts @@ -0,0 +1,147 @@ +/* eslint-disable dot-notation */ +import { MemoryStorage } from '@crawlee/memory-storage'; +import type { ListAndLockHeadResult, ListAndLockOptions, ListOptions, ProlongRequestLockOptions, ProlongRequestLockResult, QueueHead } from '@crawlee/types'; +import { RequestQueueV2 } from 'crawlee'; + +const storage = new MemoryStorage({ persistStorage: false, writeMetadata: false }); + +async function makeQueue(name: string, numOfRequestsToAdd = 0) { + const queueData = await storage.requestQueues().getOrCreate(name); + + const queue = new RequestQueueV2({ id: queueData.id, client: storage }); + + if (numOfRequestsToAdd) { + await queue.addRequests( + Array.from( + { length: numOfRequestsToAdd }, + (_, i) => ({ url: 'https://example.com', uniqueKey: `${i}` }), + ), + ); + } + + return queue; +} + +describe('RequestQueueV2#isFinished should use listHead instead of listAndLock', () => { + let queue: RequestQueueV2; + let clientListHeadSpy: jest.SpyInstance, [options?: ListOptions | undefined], any>; + let listHeadCallCount = 0; + let clientListAndLockHeadSpy: jest.SpyInstance, [options: ListAndLockOptions], any>; + let listAndLockHeadCallCount = 0; + let lockResult: ListAndLockHeadResult; + + beforeAll(async () => { + queue = await makeQueue('is-finished', 2); + clientListHeadSpy = jest.spyOn(queue.client, 'listHead'); + clientListAndLockHeadSpy = jest.spyOn(queue.client, 'listAndLockHead'); + }); + + test('should return false if there are still requests in the queue', async () => { + expect(await queue.isFinished()).toBe(false); + expect(clientListHeadSpy).toHaveBeenCalledTimes(++listHeadCallCount); + }); + + test('should return false even if all requests are locked', async () => { + lockResult = await queue.client.listAndLockHead({ lockSecs: 60 }); + + expect(lockResult.items.length).toBe(2); + expect(clientListAndLockHeadSpy).toHaveBeenCalledTimes(++listAndLockHeadCallCount); + + expect(await queue.isFinished()).toBe(false); + expect(clientListHeadSpy).toHaveBeenCalledTimes(++listHeadCallCount); + expect(clientListAndLockHeadSpy).toHaveBeenCalledTimes(listAndLockHeadCallCount); + }); +}); + +describe('RequestQueueV2#isFinished should return true once locked requests are handled', () => { + let queue: RequestQueueV2; + let clientListHeadSpy: jest.SpyInstance, [options?: ListOptions | undefined], any>; + let listHeadCallCount = 0; + let clientListAndLockHeadSpy: jest.SpyInstance, [options: ListAndLockOptions], any>; + let lockResult: ListAndLockHeadResult; + + beforeAll(async () => { + queue = await makeQueue('is-finished-locked', 1); + clientListHeadSpy = jest.spyOn(queue.client, 'listHead'); + clientListAndLockHeadSpy = jest.spyOn(queue.client, 'listAndLockHead'); + + lockResult = await queue.client.listAndLockHead({ lockSecs: 60 }); + queue['inProgress'].add(lockResult.items[0].id); + }); + + test('should return true once locked requests are handled', async () => { + // Check that, when locked request isn't handled yet, it returns false + expect(await queue.isFinished()).toBe(false); + + // Mark the locked request as handled + await queue.markRequestHandled((await queue.getRequest(lockResult.items[0].id))!); + + // Check that, when locked request is handled, it returns true + expect(await queue.isFinished()).toBe(true); + expect(clientListHeadSpy).toHaveBeenCalledWith({ limit: 2 }); + expect(clientListHeadSpy).toHaveBeenCalledTimes(++listHeadCallCount); + // One time + expect(clientListAndLockHeadSpy).toHaveBeenCalledTimes(1); + }); +}); + +describe('RequestQueueV2#fetchNextRequest should use locking API', () => { + let queue: RequestQueueV2; + let clientListHeadSpy: jest.SpyInstance, [options?: ListOptions | undefined], any>; + let clientListAndLockHeadSpy: jest.SpyInstance, [options: ListAndLockOptions], any>; + let clientProlongLockSpy: jest.SpyInstance, [id: string, options: ProlongRequestLockOptions], any>; + let listAndLockHeadCallCount = 0; + + beforeAll(async () => { + queue = await makeQueue('fetch-next-request', 1); + clientListHeadSpy = jest.spyOn(queue.client, 'listHead'); + clientListAndLockHeadSpy = jest.spyOn(queue.client, 'listAndLockHead'); + clientProlongLockSpy = jest.spyOn(queue.client, 'prolongRequestLock'); + }); + + test('should return the first request', async () => { + expect(await queue.fetchNextRequest()).not.toBe(null); + + // Check that it uses the locking API + expect(clientListAndLockHeadSpy).toHaveBeenCalledTimes(++listAndLockHeadCallCount); + expect(clientListHeadSpy).not.toHaveBeenCalled(); + + // Check that the lock is prolonged too + expect(clientProlongLockSpy).toHaveBeenCalled(); + }); + + test('should return null when all requests are locked', async () => { + expect(await queue.fetchNextRequest()).toBe(null); + + expect(clientListAndLockHeadSpy).toHaveBeenCalledTimes(++listAndLockHeadCallCount); + expect(clientListHeadSpy).not.toHaveBeenCalled(); + }); +}); + +describe('RequestQueueV2#isEmpty should return true even if isFinished returns false due to locked requests', () => { + let queue: RequestQueueV2; + let clientListHeadSpy: jest.SpyInstance, [options?: ListOptions | undefined], any>; + let clientListAndLockHeadSpy: jest.SpyInstance, [options: ListAndLockOptions], any>; + let lockResult: ListAndLockHeadResult; + + beforeAll(async () => { + queue = await makeQueue('is-empty-vs-is-finished', 1); + clientListHeadSpy = jest.spyOn(queue.client, 'listHead'); + clientListAndLockHeadSpy = jest.spyOn(queue.client, 'listAndLockHead'); + + lockResult = await queue.client.listAndLockHead({ lockSecs: 60 }); + queue['inProgress'].add(lockResult.items[0].id); + }); + + test('should return true when isFinished returns false', async () => { + expect(await queue.isEmpty()).toBe(true); + expect(await queue.isFinished()).toBe(false); + }); + + test('should return true when isFinished returns true', async () => { + await queue.markRequestHandled((await queue.getRequest(lockResult.items[0].id))!); + + expect(await queue.isEmpty()).toBe(true); + expect(await queue.isFinished()).toBe(true); + }); +}); diff --git a/packages/jsdom-crawler/src/internals/jsdom-crawler.ts b/packages/jsdom-crawler/src/internals/jsdom-crawler.ts index 88e0e66531fe..e0f115238379 100644 --- a/packages/jsdom-crawler/src/internals/jsdom-crawler.ts +++ b/packages/jsdom-crawler/src/internals/jsdom-crawler.ts @@ -3,7 +3,6 @@ import type { IncomingMessage } from 'http'; import { addTimeoutToPromise } from '@apify/timeout'; import { concatStreamToBuffer } from '@apify/utilities'; import type { - Configuration, EnqueueLinksOptions, ErrorHandler, GetUserDataFromRequest, @@ -11,10 +10,17 @@ import type { InternalHttpCrawlingContext, InternalHttpHook, RequestHandler, - RequestQueue, RouterRoutes, + Configuration, + RequestProvider, +} from '@crawlee/http'; +import { + HttpCrawler, + enqueueLinks, + Router, + resolveBaseUrlForEnqueueLinksFiltering, + tryAbsoluteURL, } from '@crawlee/http'; -import { HttpCrawler, enqueueLinks, Router, resolveBaseUrlForEnqueueLinksFiltering, tryAbsoluteURL } from '@crawlee/http'; import type { Dictionary } from '@crawlee/types'; import * as cheerio from 'cheerio'; import type { DOMWindow } from 'jsdom'; @@ -287,7 +293,7 @@ export class JSDOMCrawler extends HttpCrawler { interface EnqueueLinksInternalOptions { options?: EnqueueLinksOptions; window: DOMWindow | null; - requestQueue: RequestQueue; + requestQueue: RequestProvider; originalRequestUrl: string; finalRequestUrl?: string; } diff --git a/packages/linkedom-crawler/src/internals/linkedom-crawler.ts b/packages/linkedom-crawler/src/internals/linkedom-crawler.ts index 9a6a31131595..9b170070afe7 100644 --- a/packages/linkedom-crawler/src/internals/linkedom-crawler.ts +++ b/packages/linkedom-crawler/src/internals/linkedom-crawler.ts @@ -8,9 +8,9 @@ import type { ErrorHandler, RequestHandler, EnqueueLinksOptions, - RequestQueue, GetUserDataFromRequest, RouterRoutes, + RequestProvider, } from '@crawlee/http'; import { HttpCrawler, enqueueLinks, Router, resolveBaseUrlForEnqueueLinksFiltering, tryAbsoluteURL } from '@crawlee/http'; import type { Dictionary } from '@crawlee/types'; @@ -169,7 +169,7 @@ export class LinkeDOMCrawler extends HttpCrawler { interface EnqueueLinksInternalOptions { options?: LinkeDOMCrawlerEnqueueLinksOptions; window: Window | null; - requestQueue: RequestQueue; + requestQueue: RequestProvider; originalRequestUrl: string; finalRequestUrl?: string; } diff --git a/packages/memory-storage/src/resource-clients/request-queue.ts b/packages/memory-storage/src/resource-clients/request-queue.ts index 2c1b11907cb2..2e2d0589a0b3 100644 --- a/packages/memory-storage/src/resource-clients/request-queue.ts +++ b/packages/memory-storage/src/resource-clients/request-queue.ts @@ -181,7 +181,7 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue async listAndLockHead(options: storage.ListAndLockOptions): Promise { const { limit, lockSecs } = s.object({ - limit: s.number.optional.default(100), + limit: s.number.lessThanOrEqual(25).optional.default(25), lockSecs: s.number, }).parse(options); @@ -240,11 +240,10 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue throw new Error(`Request with ID ${id} not found in queue ${queue.name ?? queue.id}`); } - const currentTimestamp = Date.now(); - const canProlong = (r: InternalRequest) => !r.orderNo || r.orderNo > currentTimestamp || r.orderNo < -currentTimestamp; + const canProlong = (r: InternalRequest) => !!r.orderNo; if (!canProlong(internalRequest)) { - throw new Error(`Request with ID ${id} is not locked in queue ${queue.name ?? queue.id}`); + throw new Error(`Request with ID ${id} has already been handled in queue ${queue.name ?? queue.id}`); } const unlockTimestamp = Math.abs(internalRequest.orderNo!) + lockSecs * 1000; diff --git a/packages/playwright-crawler/src/internals/enqueue-links/click-elements.ts b/packages/playwright-crawler/src/internals/enqueue-links/click-elements.ts index e04762e26b2a..72cc22297fda 100644 --- a/packages/playwright-crawler/src/internals/enqueue-links/click-elements.ts +++ b/packages/playwright-crawler/src/internals/enqueue-links/click-elements.ts @@ -7,8 +7,8 @@ import type { RegExpInput, RequestTransform, UrlPatternObject, - RequestQueue, RequestOptions, + RequestProvider, } from '@crawlee/browser'; import { constructGlobObjectsFromGlobs, @@ -40,7 +40,7 @@ export interface EnqueueLinksByClickingElementsOptions { /** * A request queue to which the URLs will be enqueued. */ - requestQueue: RequestQueue; + requestQueue: RequestProvider; /** * A CSS selector matching elements to be clicked on. Unlike in {@apilink enqueueLinks}, there is no default diff --git a/packages/puppeteer-crawler/src/internals/enqueue-links/click-elements.ts b/packages/puppeteer-crawler/src/internals/enqueue-links/click-elements.ts index 9d8c25b65968..79a676495256 100644 --- a/packages/puppeteer-crawler/src/internals/enqueue-links/click-elements.ts +++ b/packages/puppeteer-crawler/src/internals/enqueue-links/click-elements.ts @@ -7,8 +7,8 @@ import type { RegExpInput, RequestTransform, UrlPatternObject, - RequestQueue, RequestOptions, + RequestProvider, } from '@crawlee/browser'; import { constructGlobObjectsFromGlobs, @@ -46,7 +46,7 @@ export interface EnqueueLinksByClickingElementsOptions { /** * A request queue to which the URLs will be enqueued. */ - requestQueue: RequestQueue; + requestQueue: RequestProvider; /** * A CSS selector matching elements to be clicked on. Unlike in {@apilink enqueueLinks}, there is no default diff --git a/test/core/storages/request_queue.test.ts b/test/core/storages/request_queue.test.ts index 6783e4aea6dd..c7b5387247be 100644 --- a/test/core/storages/request_queue.test.ts +++ b/test/core/storages/request_queue.test.ts @@ -1,3 +1,5 @@ +/* eslint-disable dot-notation */ + import { QUERY_HEAD_MIN_LENGTH, API_PROCESSED_REQUESTS_DELAY_MILLIS, @@ -56,8 +58,8 @@ describe('RequestQueue remote', () => { expect(queueOperationInfo1).toMatchObject({ ...firstResolveValue, }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); expect(mockAddRequest).toBeCalledTimes(1); expect(mockAddRequest).toBeCalledWith(requestA, { forefront: false }); @@ -68,8 +70,8 @@ describe('RequestQueue remote', () => { wasAlreadyHandled: false, requestId: 'a', }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); const requestB = new Request({ url: 'http://example.com/b' }); const secondResolveValue = { @@ -82,8 +84,8 @@ describe('RequestQueue remote', () => { await queue.addRequest(requestB, { forefront: true }); expect(mockAddRequest).toBeCalledTimes(2); expect(mockAddRequest).toHaveBeenLastCalledWith(requestB, { forefront: true }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(2); + + expect(queue['queueHeadIds'].length()).toBe(2); expect(queue.inProgressCount()).toBe(0); // Forefronted request was added to the queue. @@ -94,8 +96,8 @@ describe('RequestQueue remote', () => { expect(mockGetRequest).toBeCalledTimes(1); expect(mockGetRequest).toHaveBeenLastCalledWith('b'); expect(requestBFromQueue).toEqual({ ...requestB, id: 'b' }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); expect(queue.inProgressCount()).toBe(1); // Test validations @@ -130,12 +132,12 @@ describe('RequestQueue remote', () => { await queue.reclaimRequest(requestBFromQueue, { forefront: true }); expect(mockUpdateRequest).toBeCalledTimes(1); expect(mockUpdateRequest).toHaveBeenLastCalledWith(requestBFromQueue, { forefront: true }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); expect(queue.inProgressCount()).toBe(1); await sleep(STORAGE_CONSISTENCY_DELAY_MILLIS + 10); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(2); + + expect(queue['queueHeadIds'].length()).toBe(2); expect(queue.inProgressCount()).toBe(0); // Fetch again. @@ -145,8 +147,8 @@ describe('RequestQueue remote', () => { expect(mockGetRequest).toBeCalledTimes(3); expect(mockGetRequest).toHaveBeenLastCalledWith('b'); expect(requestBFromQueue2).toEqual(requestBFromQueue); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); expect(queue.inProgressCount()).toBe(1); // Mark handled. @@ -162,13 +164,13 @@ describe('RequestQueue remote', () => { await queue.markRequestHandled(requestBFromQueue); expect(mockUpdateRequest).toBeCalledTimes(2); expect(mockUpdateRequest).toHaveBeenLastCalledWith(requestBFromQueue); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); expect(queue.inProgressCount()).toBe(0); // Emulate there are no cached items in queue - // @ts-expect-error Accessing private property - queue.queueHeadDict.clear(); + + queue['queueHeadIds'].clear(); // Query queue head. const mockListHead = jest.spyOn(queue.client, 'listHead'); @@ -186,8 +188,8 @@ describe('RequestQueue remote', () => { expect(mockListHead).toBeCalledTimes(1); expect(mockListHead).toHaveBeenLastCalledWith({ limit: QUERY_HEAD_MIN_LENGTH }); expect(requestAFromQueue).toEqual({ ...requestA, id: 'a' }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); expect(queue.inProgressCount()).toBe(1); // Drop queue. @@ -226,8 +228,8 @@ describe('RequestQueue remote', () => { }); // Ensure the client method was actually called, and added - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); expect(mockAddRequests).toBeCalledTimes(1); expect(mockAddRequests).toBeCalledWith([requestA], { forefront: false }); @@ -238,8 +240,8 @@ describe('RequestQueue remote', () => { ...firstRequestAdded, wasAlreadyPresent: true, }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); // Adding more requests, forefront const requestB = new Request({ url: 'http://example.com/b' }); @@ -277,8 +279,8 @@ describe('RequestQueue remote', () => { wasAlreadyHandled: false, wasAlreadyPresent: false, }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(3); + + expect(queue['queueHeadIds'].length()).toBe(3); expect(mockAddRequests).toHaveBeenCalled(); expect(mockAddRequests).toBeCalledWith([requestB, requestC], { forefront: true }); }); @@ -389,8 +391,8 @@ describe('RequestQueue remote', () => { await queue.addRequest(requestA, { forefront: true }); expect(addRequestMock).toBeCalledTimes(1); expect(addRequestMock).toHaveBeenLastCalledWith(requestA, { forefront: true }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); // Try to get requestA which is not available yet. const getRequestMock = jest.spyOn(queue.client, 'getRequest'); @@ -526,8 +528,7 @@ describe('RequestQueue remote', () => { await queue.addRequest(requestA, { forefront: true }); await queue.addRequest(requestB, { forefront: true }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(2); + expect(queue['queueHeadIds'].length()).toBe(2); expect(queue.inProgressCount()).toBe(0); expect(queue.assumedTotalCount).toBe(2); expect(queue.assumedHandledCount).toBe(0); @@ -556,8 +557,7 @@ describe('RequestQueue remote', () => { expect(getRequestMock).toBeCalledTimes(2); expect(getRequestMock).toHaveBeenLastCalledWith('a'); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(0); + expect(queue['queueHeadIds'].length()).toBe(0); expect(queue.inProgressCount()).toBe(2); expect(queue.assumedTotalCount).toBe(2); expect(queue.assumedHandledCount).toBe(0); @@ -579,14 +579,14 @@ describe('RequestQueue remote', () => { await queue.reclaimRequest(requestAWithId, { forefront: true }); expect(updateRequestMock).toBeCalledTimes(2); expect(updateRequestMock).toHaveBeenLastCalledWith(requestAWithId, { forefront: true }); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(0); + + expect(queue['queueHeadIds'].length()).toBe(0); expect(queue.inProgressCount()).toBe(1); expect(queue.assumedTotalCount).toBe(2); expect(queue.assumedHandledCount).toBe(1); await sleep(STORAGE_CONSISTENCY_DELAY_MILLIS + 10); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(1); + + expect(queue['queueHeadIds'].length()).toBe(1); expect(queue.inProgressCount()).toBe(0); expect(queue.assumedTotalCount).toBe(2); expect(queue.assumedHandledCount).toBe(1); @@ -605,8 +605,7 @@ describe('RequestQueue remote', () => { expect(getRequestMock).toBeCalledTimes(3); expect(getRequestMock).toHaveBeenLastCalledWith('a'); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(0); + expect(queue['queueHeadIds'].length()).toBe(0); expect(queue.inProgressCount()).toBe(1); expect(queue.assumedTotalCount).toBe(2); expect(queue.assumedHandledCount).toBe(1); @@ -622,8 +621,7 @@ describe('RequestQueue remote', () => { expect(updateRequestMock).toBeCalledTimes(3); expect(updateRequestMock).toHaveBeenLastCalledWith(requestAWithId); - // @ts-expect-error Accessing private property - expect(queue.queueHeadDict.length()).toBe(0); + expect(queue['queueHeadIds'].length()).toBe(0); expect(queue.inProgressCount()).toBe(0); expect(queue.assumedTotalCount).toBe(2); expect(queue.assumedHandledCount).toBe(2); @@ -855,7 +853,7 @@ describe('RequestQueue with requestsFromUrl', () => { }); describe('RequestQueue v2', () => { - const totalRequestsPerTest = 100; + const totalRequestsPerTest = 50; function calculateHistogram(requests: { uniqueKey: string }[]) : number[] { const histogram: number[] = []; diff --git a/test/e2e/cheerio-request-queue-v2/actor/.actor/actor.json b/test/e2e/cheerio-request-queue-v2/actor/.actor/actor.json new file mode 100644 index 000000000000..b3d9a2c67e4b --- /dev/null +++ b/test/e2e/cheerio-request-queue-v2/actor/.actor/actor.json @@ -0,0 +1,7 @@ +{ + "actorSpecification": 1, + "name": "test-cheerio-request-queue-v2", + "version": "0.0", + "buildTag": "latest", + "env": null +} diff --git a/test/e2e/cheerio-request-queue-v2/actor/.gitignore b/test/e2e/cheerio-request-queue-v2/actor/.gitignore new file mode 100644 index 000000000000..ced7cbfc582d --- /dev/null +++ b/test/e2e/cheerio-request-queue-v2/actor/.gitignore @@ -0,0 +1,7 @@ +.idea +.DS_Store +node_modules +package-lock.json +apify_storage +crawlee_storage +storage diff --git a/test/e2e/cheerio-request-queue-v2/actor/Dockerfile b/test/e2e/cheerio-request-queue-v2/actor/Dockerfile new file mode 100644 index 000000000000..4874669fcad0 --- /dev/null +++ b/test/e2e/cheerio-request-queue-v2/actor/Dockerfile @@ -0,0 +1,16 @@ +FROM apify/actor-node:18-beta + +COPY packages ./packages +COPY package*.json ./ + +RUN npm --quiet set progress=false \ + && npm install --only=prod --no-optional --no-audit \ + && npm update --no-audit \ + && echo "Installed NPM packages:" \ + && (npm list --only=prod --no-optional --all || true) \ + && echo "Node.js version:" \ + && node --version \ + && echo "NPM version:" \ + && npm --version + +COPY . ./ diff --git a/test/e2e/cheerio-request-queue-v2/actor/main.js b/test/e2e/cheerio-request-queue-v2/actor/main.js new file mode 100644 index 000000000000..ee59ab9c54d9 --- /dev/null +++ b/test/e2e/cheerio-request-queue-v2/actor/main.js @@ -0,0 +1,36 @@ +import { Actor, LogLevel, log as Logger } from 'apify'; +import { CheerioCrawler, Dataset } from '@crawlee/cheerio'; + +const mainOptions = { + exit: Actor.isAtHome(), + storage: process.env.STORAGE_IMPLEMENTATION === 'LOCAL' ? new (await import('@apify/storage-local')).ApifyStorageLocal() : undefined, +}; + +await Actor.main(async () => { + const crawler = new CheerioCrawler({ + async requestHandler({ $, enqueueLinks, request, log }) { + const { url } = request; + await enqueueLinks({ + globs: ['https://crawlee.dev/docs/**'], + }); + + const pageTitle = $('title').first().text(); + log.info(`REQUEST ID: ${request.id} URL: ${url} TITLE: ${pageTitle}`); + + await Dataset.pushData({ url, pageTitle }); + }, + experiments: { + requestLocking: true, + }, + log: Logger.child({ + prefix: 'CheerioCrawler', + // level: LogLevel.DEBUG, + }), + }); + + try { + await crawler.run(['https://crawlee.dev/docs/quick-start']); + } catch (e) { + console.error(e); + } +}, mainOptions); diff --git a/test/e2e/cheerio-request-queue-v2/actor/package.json b/test/e2e/cheerio-request-queue-v2/actor/package.json new file mode 100644 index 000000000000..59c5f37e61c4 --- /dev/null +++ b/test/e2e/cheerio-request-queue-v2/actor/package.json @@ -0,0 +1,28 @@ +{ + "name": "test-cheerio-request-queue-v2", + "version": "0.0.1", + "description": "Cheerio Crawler Test - Request Queue V2", + "dependencies": { + "apify": "next", + "@apify/storage-local": "^2.2.0", + "@crawlee/basic": "file:./packages/basic-crawler", + "@crawlee/browser-pool": "file:./packages/browser-pool", + "@crawlee/http": "file:./packages/http-crawler", + "@crawlee/cheerio": "file:./packages/cheerio-crawler", + "@crawlee/core": "file:./packages/core", + "@crawlee/memory-storage": "file:./packages/memory-storage", + "@crawlee/types": "file:./packages/types", + "@crawlee/utils": "file:./packages/utils" + }, + "overrides": { + "apify": { + "@crawlee/core": "file:./packages/core", + "@crawlee/utils": "file:./packages/utils" + } + }, + "scripts": { + "start": "node main.js" + }, + "type": "module", + "license": "ISC" +} diff --git a/test/e2e/cheerio-request-queue-v2/test.mjs b/test/e2e/cheerio-request-queue-v2/test.mjs new file mode 100644 index 000000000000..bf2015b4e16e --- /dev/null +++ b/test/e2e/cheerio-request-queue-v2/test.mjs @@ -0,0 +1,10 @@ +import { initialize, getActorTestDir, runActor, expect, validateDataset } from '../tools.mjs'; + +const testActorDirname = getActorTestDir(import.meta.url); +await initialize(testActorDirname); + +const { stats, datasetItems } = await runActor(testActorDirname); + +await expect(stats.requestsFinished > 40, 'All requests finished'); +await expect(datasetItems.length > 40, 'Number of dataset items'); +await expect(validateDataset(datasetItems, ['url', 'pageTitle']), 'Dataset items validation');