Skip to content

Commit

Permalink
feat: add basic support for setStatusMessage (#1790)
Browse files Browse the repository at this point in the history
  • Loading branch information
barjin authored Mar 2, 2023
1 parent f1753d1 commit c318980
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 0 deletions.
47 changes: 47 additions & 0 deletions packages/basic-crawler/src/internals/basic-crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ export interface BasicCrawlerOptions<Context extends CrawlingContext = BasicCraw
*/
sessionPoolOptions?: SessionPoolOptions;

/**
* Defines the length of the interval for calling the `setStatusMessage` in seconds.
*/
loggingInterval?: number;

/** @internal */
log?: Log;
}
Expand Down Expand Up @@ -372,6 +377,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
protected internalTimeoutMillis: number;
protected maxRequestRetries: number;
protected handledRequestsCount: number;
protected loggingInterval: number;
protected sessionPoolOptions: SessionPoolOptions;
protected useSessionPool: boolean;
protected crawlingContexts = new Map<string, Context>();
Expand Down Expand Up @@ -400,6 +406,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
autoscaledPoolOptions: ow.optional.object,
sessionPoolOptions: ow.optional.object,
useSessionPool: ow.optional.boolean,
loggingInterval: ow.optional.number,

// AutoscaledPool shorthands
minConcurrency: ow.optional.number,
Expand Down Expand Up @@ -446,11 +453,14 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext

handleFailedRequestFunction,
failedRequestHandler,

loggingInterval = 60,
} = options;

this.requestList = requestList;
this.requestQueue = requestQueue;
this.log = log;
this.loggingInterval = loggingInterval;
this.events = config.getEventManager();

this._handlePropertyNameChange({
Expand Down Expand Up @@ -577,6 +587,38 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.autoscaledPoolOptions = { ...autoscaledPoolOptions, ...basicCrawlerAutoscaledPoolConfiguration };
}

private getPeriodicLogger() {
const client = this.config.getStorageClient();
let previousState = { ...this.stats.state };

const getOperationMode = () => {
const { requestsFailed } = this.stats.state;
const { requestsFailed: previousRequestsFailed } = previousState;

previousState = { ...this.stats.state };

if (requestsFailed - previousRequestsFailed > 0) {
return 'ERROR';
}

return 'REGULAR';
};

const log = async () => {
const operationMode = getOperationMode();
if (operationMode === 'ERROR') {
// eslint-disable-next-line max-len
await client.setStatusMessage?.(`Experiencing problems, ${this.stats.state.requestsFailed - previousState.requestsFailed || this.stats.state.requestsFailed} errors in the past ${this.loggingInterval} seconds.`);
} else {
// eslint-disable-next-line max-len
await client.setStatusMessage?.(`Crawled ${this.stats.state.requestsFinished}/${this.requestQueue?.assumedTotalCount || this.requestList?.length()} pages, ${this.stats.state.requestsFailed} errors.`);
}
};

const interval = setInterval(log, this.loggingInterval * 1e3);
return { log, stop: () => clearInterval(interval) };
}

/**
* Runs the crawler. Returns a promise that gets resolved once all the requests are processed.
* We can use the `requests` parameter to enqueue the initial requests - it is a shortcut for
Expand All @@ -594,6 +636,8 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext

await this._init();
await this.stats.startCapturing();
const periodicLogger = this.getPeriodicLogger();
await periodicLogger.log();

const sigintHandler = async () => {
this.log.warning('Pausing... Press CTRL+C again to force exit. To resume, do: CRAWLEE_PURGE_ON_START=0 yarnstart');
Expand Down Expand Up @@ -651,6 +695,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
finished = true;
}

periodicLogger.stop();
// eslint-disable-next-line max-len
await client.setStatusMessage?.(`Finished! Total ${this.stats.state.requestsFinished + this.stats.state.requestsFailed} requests: ${this.stats.state.requestsFinished} succeeded, ${this.stats.state.requestsFailed} failed.`, { isStatusMessageTerminal: true });
return stats;
}

Expand Down
12 changes: 12 additions & 0 deletions packages/memory-storage/src/memory-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { ensureDirSync, pathExistsSync } from 'fs-extra';
import { renameSync } from 'node:fs';
import { rm, rename, readdir } from 'node:fs/promises';
import { resolve } from 'node:path';
import log from '@apify/log';
import { DatasetClient } from './resource-clients/dataset';
import { DatasetCollectionClient } from './resource-clients/dataset-collection';
import { KeyValueStoreClient } from './resource-clients/key-value-store';
Expand Down Expand Up @@ -120,6 +121,17 @@ export class MemoryStorage implements storage.StorageClient {
return new RequestQueueClient({ id, baseStorageDirectory: this.requestQueuesDirectory, client: this, ...options });
}

setStatusMessage(message: string, options: storage.SetStatusMessageOptions = {}): Promise<void> {
s.string.parse(message);
s.object({
isStatusMessageTerminal: s.boolean.optional,
}).parse(options);

log.info(`Setting${options.isStatusMessageTerminal ? ' terminal' : ''} status message: ${message}`);

return Promise.resolve();
}

/**
* Cleans up the default storage directories before the run starts:
* - local directory containing the default dataset;
Expand Down
5 changes: 5 additions & 0 deletions packages/types/src/storages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ export interface RequestQueueOptions {
timeoutSecs?: number;
}

export interface SetStatusMessageOptions {
isStatusMessageTerminal?: boolean;
}

/**
* Represents a storage capable of working with datasets, KV stores and request queues.
*/
Expand All @@ -293,5 +297,6 @@ export interface StorageClient {
requestQueue(id: string, options?: RequestQueueOptions): RequestQueueClient;
purge?(): Promise<void>;
teardown?(): Promise<void>;
setStatusMessage?(message: string, options?: SetStatusMessageOptions): Promise<void>;
stats?: { rateLimitErrors: number[] };
}

0 comments on commit c318980

Please sign in to comment.