feat: Request Queue v2 #1975

Merged: 28 commits, Sep 21, 2023
Commits (28)
22e4a2c
chore: some groundwork, need to be less zombie to continue
vladfrangu Jul 11, 2023
36de203
chore: more handling for queue hydration
vladfrangu Jul 13, 2023
bb6e2b9
chore: finish getOrHydrateRequest 🤞
vladfrangu Jul 13, 2023
1f379ac
chore: finish fetchNextRequest
vladfrangu Jul 13, 2023
44c4d4d
docs: explain the reason for hydratingPromise
vladfrangu Jul 13, 2023
7566d8c
chore: add listing and locking the head
vladfrangu Jul 21, 2023
61c7084
chore: add missing methods
vladfrangu Jul 21, 2023
1d22982
chore: add support for crawler experiments, and add e2e test
vladfrangu Aug 8, 2023
06d70f3
chore: lint
vladfrangu Aug 8, 2023
1c5f66b
chore: addRequestsBatched
vladfrangu Aug 8, 2023
1ceef6f
Apply suggestions from code review
vladfrangu Aug 11, 2023
f8de059
chore: implement base layer for request providers
vladfrangu Aug 19, 2023
62b75ee
chore: fix test
vladfrangu Aug 19, 2023
e9aa52f
chore: fix bugs, implement support for abort/migrating dropping locks
vladfrangu Aug 25, 2023
f1df5fb
chore: small inconsistencies fixed
vladfrangu Aug 25, 2023
fc654d3
chore: fix build
vladfrangu Aug 25, 2023
5a09526
chore: revert back to inProgress
vladfrangu Aug 25, 2023
078eaaa
chore: limit list and lock head to 25 too
vladfrangu Aug 31, 2023
0d82097
chore: more requested changes
vladfrangu Sep 11, 2023
b83876f
chore: lint
vladfrangu Sep 11, 2023
7d6efdc
chore: remove strict validation of unknown fields in Request
vladfrangu Sep 11, 2023
f77ab72
chore: use internal timeouts instead
vladfrangu Sep 11, 2023
e49d90e
chore: RQv2 tests
vladfrangu Sep 14, 2023
a8879f6
chore: bump storage local to test in e2e too
vladfrangu Sep 18, 2023
ffe5f51
chore: rebase woes
vladfrangu Sep 18, 2023
60e5653
Update packages/basic-crawler/src/internals/basic-crawler.ts
vladfrangu Sep 19, 2023
083337c
chore: requested changes
vladfrangu Sep 19, 2023
bf11fb5
fix: incorrectly overriding methods
vladfrangu Sep 20, 2023
86 changes: 72 additions & 14 deletions packages/basic-crawler/src/internals/basic-crawler.ts
@@ -15,6 +15,7 @@ import type {
Request,
RequestList,
RequestOptions,
RequestProvider,
RouterHandler,
RouterRoutes,
Session,
@@ -23,24 +24,25 @@
StatisticState,
} from '@crawlee/core';
import {
mergeCookies,
Dataset,
AutoscaledPool,
Configuration,
enqueueLinks,
EventType,
KeyValueStore,
CriticalError,
NonRetryableError,
RequestQueue,
RequestQueueV2,
RequestState,
RetryRequestError,
Router,
SessionPool,
Statistics,
enqueueLinks,
mergeCookies,
purgeDefaultStorages,
validators,
RetryRequestError,
SessionError,
Dataset,
CriticalError,
} from '@crawlee/core';
import type { Dictionary, Awaitable, BatchAddRequestsResult, SetStatusMessageOptions } from '@crawlee/types';
import { ROTATE_PROXY_ERRORS } from '@crawlee/utils';
@@ -165,7 +167,7 @@ export interface BasicCrawlerOptions<Context extends CrawlingContext = BasicCraw
* > Alternatively, `requests` parameter of {@apilink BasicCrawler.run|`crawler.run()`} could be used to enqueue the initial requests -
* it is a shortcut for running `crawler.addRequests()` before the `crawler.run()`.
*/
requestQueue?: RequestQueue;
requestQueue?: RequestProvider;

/**
* Timeout in which the function passed as {@apilink BasicCrawlerOptions.requestHandler|`requestHandler`} needs to finish, in seconds.
@@ -325,6 +327,26 @@ export interface BasicCrawlerOptions<Context extends CrawlingContext = BasicCraw

/** @internal */
log?: Log;

/**
* Enables experimental features of Crawlee, which can alter the behavior of the crawler.
* WARNING: these options are not guaranteed to be stable and may change or be removed at any time.
*/
experiments?: CrawlerExperiments;
}

/**
* A set of options that you can toggle to enable experimental features in Crawlee.
*
* NOTE: These options will not respect semantic versioning and may be removed or changed at any time. Use at your own risk.
* If you do use these and encounter issues, please report them to us.
*/
export interface CrawlerExperiments {
/**
* Enables the use of the new RequestQueue API, which allows multiple clients to use the same queue,
* by locking the requests they are processing for a period of time.
*/
requestLocking?: boolean;
}

/**
@@ -410,7 +432,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
* A reference to the underlying {@apilink RequestQueue} class that manages the crawler's {@apilink Request|requests}.
* Only available if used by the crawler.
*/
requestQueue?: RequestQueue;
requestQueue?: RequestProvider;

/**
* A reference to the underlying {@apilink SessionPool} class that manages the crawler's {@apilink Session|sessions}.
@@ -456,6 +478,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
protected retryOnBlocked: boolean;
private _closeEvents?: boolean;

private experiments: CrawlerExperiments;
private _experimentWarnings: Partial<Record<keyof CrawlerExperiments, boolean>> = {};

protected static optionsShape = {
requestList: ow.optional.object.validate(validators.requestList),
requestQueue: ow.optional.object.validate(validators.requestQueue),
@@ -493,6 +518,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext

// internal
log: ow.optional.object,
experiments: ow.optional.object,
};

/**
@@ -522,6 +548,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext

// internal
log = defaultLog.child({ prefix: this.constructor.name }),
experiments = {},

// Old and new request handler methods
handleRequestFunction,
@@ -546,6 +573,15 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.statusMessageCallback = statusMessageCallback as StatusMessageCallback;
this.events = config.getEventManager();
this.domainAccessedTime = new Map();
this.experiments = experiments;

if (requestQueue && requestQueue instanceof RequestQueueV2 && !experiments.requestLocking) {
throw new Error([
'You provided the new RequestQueue v2 class into your crawler without enabling the experiment!',
"If you're sure you want to test out the new experimental RequestQueue v2, please provide `experiments: { requestLocking: true }` "
+ 'in your crawler options, and try again.',
].join('\n'));
}

this._handlePropertyNameChange({
newName: 'requestHandler',
@@ -596,9 +632,13 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
const tryEnv = (val?: string) => (val == null ? null : +val);
// allow at least 5min for internal timeouts
this.internalTimeoutMillis = tryEnv(process.env.CRAWLEE_INTERNAL_TIMEOUT) ?? Math.max(this.requestHandlerTimeoutMillis * 2, 300e3);

// override the default internal timeout of request queue to respect `requestHandlerTimeoutMillis`
if (this.requestQueue) {
this.requestQueue.internalTimeoutMillis = this.internalTimeoutMillis;
// for request queue v2, we want to lock requests by the timeout that would also account for internals (plus 5 seconds padding), but
// with a minimum of a minute
this.requestQueue.requestLockSecs = Math.max(this.internalTimeoutMillis / 1000 + 5, 60);
}

this.maxRequestRetries = maxRequestRetries;
@@ -769,7 +809,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
// ignored - as a failed request is still handled.
if (this.running === false && this.requestQueue?.name === 'default' && purgeRequestQueue) {
await this.requestQueue.drop();
this.requestQueue = await RequestQueue.open(null, { config: this.config });
this.requestQueue = await this._getRequestQueue();
}

this.running = true;
@@ -851,7 +891,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
}

async getRequestQueue() {
this.requestQueue ??= await RequestQueue.open(null, { config: this.config });
this.requestQueue ??= await this._getRequestQueue();

return this.requestQueue!;
}
@@ -1006,7 +1046,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
* adding it back to the queue after the timeout passes. Returns `true` if the request
* should be ignored and will be reclaimed to the queue once ready.
*/
protected delayRequest(request: Request, source: RequestQueue | RequestList) {
protected delayRequest(request: Request, source: RequestList | RequestProvider) {
const domain = getDomain(request.url);

if (!domain || !request) {
@@ -1021,13 +1061,15 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
return false;
}

source.inProgress.delete(request.id!);
// eslint-disable-next-line dot-notation
source['inProgress'].delete(request.id!);
const delay = lastAccessTime + this.sameDomainDelayMillis - now;
this.log.debug(`Request ${request.url} (${request.id}) will be reclaimed after ${delay} milliseconds due to same domain delay`);
setTimeout(async () => {
this.log.debug(`Adding request ${request.url} (${request.id}) back to the queue`);
source?.inProgress.add(request.id!);
await source?.reclaimRequest(request);
// eslint-disable-next-line dot-notation
source['inProgress'].add(request.id!);
await source.reclaimRequest(request);
}, delay);

return true;
@@ -1230,7 +1272,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
protected async _requestFunctionErrorHandler(
error: Error,
crawlingContext: Context,
source: RequestList | RequestQueue,
source: RequestList | RequestProvider,
): Promise<void> {
const { request } = crawlingContext;
request.pushErrorMessage(error);
@@ -1444,6 +1486,22 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext

return request.headers?.Cookie || request.headers?.cookie || '';
}

private _getRequestQueue() {
if (this.experiments.requestLocking) {
if (!this._experimentWarnings.requestLocking) {
this.log.warning([
'The RequestQueue v2 is an experimental feature, and may have issues when used in a production environment.',
'Please report any issues you encounter on GitHub: https://github.com/apify/crawlee',
].join('\n'));
this._experimentWarnings.requestLocking = true;
}

return RequestQueueV2.open(null, { config: this.config });
}

return RequestQueue.open(null, { config: this.config });
}
}

export interface CreateContextOptions {
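The basic-crawler changes above introduce the `experiments` option and route default queue creation through the new `_getRequestQueue()` helper. A minimal usage sketch, not part of this diff, assuming the top-level `crawlee` package re-exports `BasicCrawler` (the URL and handler body are illustrative):

import { BasicCrawler } from 'crawlee';

const crawler = new BasicCrawler({
    // Opt into the experimental lock-based queue. Without this flag,
    // passing a RequestQueueV2 instance throws at construction time.
    experiments: { requestLocking: true },
    async requestHandler({ request, log }) {
        log.info(`Processing ${request.url}`);
    },
});

// getRequestQueue() now resolves through RequestQueueV2.open() and logs
// a one-time warning that the feature is experimental.
await crawler.run(['https://crawlee.dev']);

As a worked example of the lock-duration arithmetic above: with a 60-second request handler timeout, `internalTimeoutMillis` is `max(2 * 60e3, 300e3) = 300e3`, so `requestLockSecs = max(300 + 5, 60) = 305` seconds, long enough to cover the handler plus internal overhead.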
4 changes: 2 additions & 2 deletions packages/browser-crawler/src/internals/browser-crawler.ts
@@ -9,7 +9,7 @@ import type {
ProxyConfiguration,
ProxyInfo,
RequestHandler,
RequestQueue,
RequestProvider,
Session,
} from '@crawlee/basic';
import {
@@ -712,7 +712,7 @@ export abstract class BrowserCrawler<
interface EnqueueLinksInternalOptions {
options?: EnqueueLinksOptions;
page: CommonPage;
requestQueue: RequestQueue;
requestQueue: RequestProvider;
originalRequestUrl: string;
finalRequestUrl?: string;
}
14 changes: 10 additions & 4 deletions packages/cheerio-crawler/src/internals/cheerio-crawler.ts
@@ -1,18 +1,24 @@
import type { IncomingMessage } from 'http';

import type {
Configuration,
EnqueueLinksOptions,
ErrorHandler,
GetUserDataFromRequest,
HttpCrawlerOptions,
InternalHttpCrawlingContext,
InternalHttpHook,
RequestHandler,
RequestQueue,
RouterRoutes,
Configuration,
RequestProvider,
} from '@crawlee/http';
import {
HttpCrawler,
enqueueLinks,
Router,
resolveBaseUrlForEnqueueLinksFiltering,
tryAbsoluteURL,
} from '@crawlee/http';
import { HttpCrawler, enqueueLinks, Router, resolveBaseUrlForEnqueueLinksFiltering, tryAbsoluteURL } from '@crawlee/http';
import type { Dictionary } from '@crawlee/types';
import type { CheerioOptions } from 'cheerio';
import * as cheerio from 'cheerio';
@@ -203,7 +209,7 @@ export class CheerioCrawler extends HttpCrawler<CheerioCrawlingContext> {
interface EnqueueLinksInternalOptions {
options?: EnqueueLinksOptions;
$: cheerio.CheerioAPI | null;
requestQueue: RequestQueue;
requestQueue: RequestProvider;
originalRequestUrl: string;
finalRequestUrl?: string;
}
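Because `requestQueue` is now typed as `RequestProvider` in both the browser and cheerio crawlers, a v2 queue can also be injected explicitly. A sketch assuming `RequestQueueV2` is re-exported from the top-level package (the queue name and handler are illustrative):

import { CheerioCrawler, RequestQueueV2 } from 'crawlee';

// A v2 queue may be consumed by multiple clients at once; each locks the
// requests it is currently processing.
const queue = await RequestQueueV2.open('shared-queue');

const crawler = new CheerioCrawler({
    experiments: { requestLocking: true }, // required when passing a v2 queue
    requestQueue: queue,
    async requestHandler({ $, request }) {
        console.log(request.url, $('title').text());
    },
});

await crawler.run();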
4 changes: 2 additions & 2 deletions packages/core/src/enqueue_links/enqueue_links.ts
@@ -14,7 +14,7 @@ import {
createRequests,
} from './shared';
import type { RequestOptions } from '../request';
import type { RequestQueue, RequestQueueOperationOptions } from '../storages/request_queue';
import type { RequestProvider, RequestQueueOperationOptions } from '../storages';

export interface EnqueueLinksOptions extends RequestQueueOperationOptions {
/** Limit the amount of actually enqueued URLs to this number. Useful for testing across the entire crawling scope. */
@@ -24,7 +24,7 @@ export interface EnqueueLinksOptions extends RequestQueueOperationOptions {
urls?: string[];

/** A request queue to which the URLs will be enqueued. */
requestQueue?: RequestQueue;
requestQueue?: RequestProvider;

/** A CSS selector matching links to be enqueued. */
selector?: string;
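With `EnqueueLinksOptions.requestQueue` widened to `RequestProvider`, the standalone helper accepts either queue class as its target. A sketch with illustrative URLs:

import { enqueueLinks, RequestQueueV2 } from '@crawlee/core';

// enqueueLinks() relies only on the RequestProvider contract (addRequests
// and friends), so the classic queue and the v2 queue are interchangeable here.
await enqueueLinks({
    requestQueue: await RequestQueueV2.open(),
    urls: ['https://example.com/a', 'https://example.com/b'],
});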
14 changes: 9 additions & 5 deletions packages/core/src/request.ts
@@ -5,7 +5,7 @@ import util from 'node:util';
import { normalizeUrl } from '@apify/utilities';
import type { Dictionary } from '@crawlee/types';
import type { BasePredicate } from 'ow';
import ow, { ArgumentError } from 'ow';
import ow from 'ow';

import { log as defaultLog } from './log';
import type { AllowedHttpMethods } from './typedefs';
@@ -141,14 +141,15 @@ export class Request<UserData extends Dictionary = Dictionary> {
// properties and speeds up the validation cca 3-fold.
// See https://github.com/sindresorhus/ow/issues/193
keys(options).forEach((prop) => {
// skip url, because it is validated above
if (prop === 'url') {
return;
}

const predicate = requestOptionalPredicates[prop as keyof typeof requestOptionalPredicates];
const value = options[prop];
if (predicate) {
ow(value, `RequestOptions.${prop}`, predicate as BasePredicate);
// 'url' is checked above because it's not optional
} else if (prop !== 'url') {
const msg = `Did not expect property \`${prop}\` to exist, got \`${value}\` in object \`RequestOptions\``;
throw new ArgumentError(msg, this.constructor);
}
});

@@ -479,6 +480,9 @@ export interface RequestOptions<UserData extends Dictionary = Dictionary> {
/** @internal */
handledAt?: string;

/** @internal */
lockExpiresAt?: Date;

}

export interface PushErrorMessageOptions {
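With strict validation removed, `Request` now ignores unknown fields in `RequestOptions` instead of throwing an `ArgumentError`, which lets queue-internal fields such as `lockExpiresAt` survive construction from raw storage records. A sketch of the new behavior (the extra property name is hypothetical):

import { Request } from '@crawlee/core';

// Previously this would throw:
// "Did not expect property `someInternalField` to exist, ..."
const request = new Request({
    url: 'https://example.com',
    someInternalField: 'now silently ignored by validation',
} as any);

console.log(request.url); // https://example.com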
2 changes: 2 additions & 0 deletions packages/core/src/storages/index.ts
@@ -1,6 +1,8 @@
export * from './dataset';
export * from './key_value_store';
export * from './request_list';
export * from './request_provider';
export * from './request_queue';
export * from './request_queue_v2';
export * from './storage_manager';
export * from './utils';
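The barrel file now exposes the whole provider hierarchy. Since both queue classes extend the new base (per the "implement base layer for request providers" commit), user code can target the shared contract; a sketch, assuming `fetchNextRequest()` and `markRequestHandled()` live on `RequestProvider`:

import type { RequestProvider } from '@crawlee/core';

// Works with either RequestQueue or RequestQueueV2, since both extend
// RequestProvider.
async function drainQueue(queue: RequestProvider): Promise<void> {
    let request = await queue.fetchNextRequest();
    while (request) {
        await queue.markRequestHandled(request);
        request = await queue.fetchNextRequest();
    }
}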