Skip to content

Commit

Permalink
Adds generics to input/output for proper type checks
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasdondorf committed Feb 28, 2019
1 parent 385b8ed commit b9a883e
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 52 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Create a cluster of puppeteer workers. This library spawns a pool of Chromium in
- [Usage](#usage)
- [Examples](#examples)
- [Concurrency implementations](#concurrency-implementations)
- [Typings for input/output (via TypeScript Generics)](#typings-for-inputoutput-via-typescript-generics)
- [Debugging](#debugging)
- [API](#api)

Expand Down Expand Up @@ -76,6 +77,7 @@ const { Cluster } = require('puppeteer-cluster');
* [Queuing functions (complex)](examples/function-queuing-complex.js)
* [Error handling](examples/error-handling.js)
* [Using a different puppeteer library (like puppeteer-core)](examples/different-puppeteer-library.js)
* [Provide types for input/output with TypeScript generics](examples/typings.ts)

## Concurrency implementations

Expand All @@ -88,6 +90,26 @@ There are different concurrency models, which define how isolated each job is ru
| `CONCURRENCY_BROWSER` | One browser (using an incognito page) per URL. If one browser instance crashes for any reason, this will not affect other jobs. | No shared data. |
| Custom concurrency (**experimental**) | You can create your own concurrency implementation. Copy one of the files of the `concurrency/built-in` directory and implement `ConcurrencyImplementation`. Then provide the class to the option `concurrency`. This part of the library is currently experimental and might break in the future, even in a minor version upgrade while the version has not reached 1.0. | Depends on your implementation |

## Typings for input/output (via TypeScript Generics)

To allow proper type checks with TypeScript you can provide generics. In case no types are provided, `any` is assumed for input and output. See the following minimal example or check out the more complex [typings example](examples/typings.ts) for more information.

```ts
const cluster: Cluster<string, number> = await Cluster.launch(/* ... */);

await cluster.task(async ({ page, data }) => {
// TypeScript knows that data is a string and expects this function to return a number
return 123;
});

// Typescript expects a string as argument ...
cluster.queue('http://...');

// ... and will return a number when execute is called.
const result = await cluster.execute('https://www.google.com');
```


## Debugging

Try to checkout the [puppeteer debugging tips](https://github.com/GoogleChrome/puppeteer#debugging-tips) first. Your problem might not be related to `puppeteer-cluster`, but `puppteer` itself. Additionally, you can enable verbose logging to see which data is consumed by which worker and some other cluster information. Set the DEBUG environment variable to `puppeteer-cluster:*`. See an example below or checkout the [debug docs](https://github.com/visionmedia/debug#windows-command-prompt-notes) for more information.
Expand Down
66 changes: 66 additions & 0 deletions examples/typings.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { Cluster } from '../dist';

(async () => {
/******************************* STRING -> NUMBER **********************/

// Queued data is a string and task functions return a number
const cluster1: Cluster<string, number> = await Cluster.launch({
concurrency: Cluster.CONCURRENCY_PAGE,
maxConcurrency: 2,
});

await cluster1.task(async ({ page, data }) => {
// ...
const value = Math.random(); // somehow generate data
return value;
});

// TypeScript now knows that data1 is a number
const data1 = await cluster1.execute('https://www.google.com');

await cluster1.idle();
await cluster1.close();

/******************************* STRING DATA **********************/

// Queued data is a string
const cluster2: Cluster<string> = await Cluster.launch({
concurrency: Cluster.CONCURRENCY_PAGE,
maxConcurrency: 2,
});

await cluster2.task(async ({ page, data: url }) => {
await page.goto(url);
});

await cluster2.queue('https://www.google.com');
await cluster2.queue('https://www.wikipedia.org');
await cluster2.queue('https://github.com/');

await cluster2.idle();
await cluster2.close();

/******************************* COMPLEX DATA **********************/

interface SomeData {
url: string;
someValue: number;
}

const cluster3: Cluster<SomeData> = await Cluster.launch({
concurrency: Cluster.CONCURRENCY_PAGE,
maxConcurrency: 2,
});

await cluster3.task(async ({ page, data }) => {
await page.goto(data.url);
console.log(`some value: ${data.someValue}`);
});

await cluster3.queue({ url: 'https://www.google.com', someValue: 1 });
await cluster3.queue({ url: 'https://www.wikipedia.org', someValue: 2 });
await cluster3.queue({ url: 'https://github.com/', someValue: 3 });

await cluster3.idle();
await cluster3.close();
})();
89 changes: 50 additions & 39 deletions src/Cluster.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

import Job, { JobData, ExecuteResolve, ExecuteReject, ExecuteCallbacks } from './Job';
import Job, { ExecuteResolve, ExecuteReject, ExecuteCallbacks } from './Job';
import Display from './Display';
import * as util from './util';
import Worker, { WorkResult } from './Worker';
Expand Down Expand Up @@ -51,37 +51,39 @@ const DEFAULT_OPTIONS: ClusterOptions = {
puppeteer: undefined,
};

interface TaskFunctionArguments {
interface TaskFunctionArguments<JobData> {
page: Page;
data: JobData;
worker: {
id: number;
};
}

export type TaskFunction = (arg: TaskFunctionArguments) => Promise<any>;
export type TaskFunction<JobData, ReturnData> = (
arg: TaskFunctionArguments<JobData>,
) => Promise<ReturnData>;

const MONITORING_DISPLAY_INTERVAL = 500;
const CHECK_FOR_WORK_INTERVAL = 100;
const WORK_CALL_INTERVAL_LIMIT = 10;

export default class Cluster extends EventEmitter {
export default class Cluster<JobData = any, ReturnData = any> extends EventEmitter {

static CONCURRENCY_PAGE = 1; // shares cookies, etc.
static CONCURRENCY_CONTEXT = 2; // no cookie sharing (uses contexts)
static CONCURRENCY_BROWSER = 3; // no cookie sharing and individual processes (uses contexts)

private options: ClusterOptions;
private workers: Worker[] = [];
private workersAvail: Worker[] = [];
private workersBusy: Worker[] = [];
private workers: Worker<JobData, ReturnData>[] = [];
private workersAvail: Worker<JobData, ReturnData>[] = [];
private workersBusy: Worker<JobData, ReturnData>[] = [];
private workersStarting = 0;

private allTargetCount = 0;
private jobQueue: Queue<Job> = new Queue<Job>();
private jobQueue: Queue<Job<JobData, ReturnData>> = new Queue<Job<JobData, ReturnData>>();
private errorCount = 0;

private taskFunction: TaskFunction | null = null;
private taskFunction: TaskFunction<JobData, ReturnData> | null = null;
private idleResolvers: (() => void)[] = [];
private waitForOneResolvers: ((data:JobData) => void)[] = [];
private browser: ConcurrencyImplementation | null = null;
Expand Down Expand Up @@ -175,7 +177,7 @@ export default class Cluster extends EventEmitter {
throw new Error(`Unable to launch browser for worker, error message: ${err.message}`);
}

const worker = new Worker({
const worker = new Worker<JobData, ReturnData>({
cluster: this,
args: [''], // this.options.args,
browser: workerBrowserInstance,
Expand All @@ -192,7 +194,7 @@ export default class Cluster extends EventEmitter {
}
}

public async task(taskFunction: TaskFunction) {
public async task(taskFunction: TaskFunction<JobData, ReturnData>) {
this.taskFunction = taskFunction;
}

Expand Down Expand Up @@ -278,7 +280,7 @@ export default class Cluster extends EventEmitter {
this.lastDomainAccesses.set(domain, Date.now());
}

const worker = <Worker>this.workersAvail.shift();
const worker = this.workersAvail.shift() as Worker<JobData, ReturnData>;
this.workersBusy.push(worker);

if (this.workersAvail.length !== 0 || this.allowedToStartWorker()) {
Expand All @@ -296,7 +298,7 @@ export default class Cluster extends EventEmitter {
}

const result: WorkResult = await worker.handle(
(jobFunction as TaskFunction),
(jobFunction as TaskFunction<JobData, ReturnData>),
job,
this.options.timeout,
);
Expand Down Expand Up @@ -324,7 +326,9 @@ export default class Cluster extends EventEmitter {
job.executeCallbacks.resolve(result.data);
}

this.waitForOneResolvers.forEach(resolve => resolve(job.data));
this.waitForOneResolvers.forEach(
resolve => resolve(job.data as JobData),
);
this.waitForOneResolvers = [];

// add worker to available workers again
Expand All @@ -350,48 +354,55 @@ export default class Cluster extends EventEmitter {
);
}

// Type Guard for TypeScript
private isTaskFunction(
data: JobData | TaskFunction<JobData, ReturnData>,
) : data is TaskFunction<JobData, ReturnData> {
return (typeof data === 'function');
}

private queueJob(
data: JobData,
taskFunction?: TaskFunction,
callbacks?: ExecuteCallbacks,
): void;
private queueJob(
taskFunction: TaskFunction,
_: undefined,
callbacks?: ExecuteCallbacks,
): void;
private queueJob(
data: JobData | TaskFunction,
taskFunction?: TaskFunction,
data: JobData | TaskFunction<JobData, ReturnData>,
taskFunction?: TaskFunction<JobData, ReturnData>,
callbacks?: ExecuteCallbacks,
): void {
let job;
if (typeof data === 'function') {
job = new Job(undefined, data, callbacks);
if (this.isTaskFunction(data)) {
job = new Job<JobData, ReturnData>(undefined, data, callbacks);
} else {
job = new Job(data, taskFunction, callbacks);
job = new Job<JobData, ReturnData>(data, taskFunction, callbacks);
}
this.allTargetCount += 1;
this.jobQueue.push(job);
this.work();
}

public async queue(data: JobData, taskFunction?: TaskFunction): Promise<void>;
public async queue(taskFunction: TaskFunction): Promise<void>;
public async queue(
data: JobData | TaskFunction,
taskFunction?: TaskFunction,
data: JobData,
taskFunction?: TaskFunction<JobData, ReturnData>,
): Promise<void>;
public async queue(
taskFunction: TaskFunction<JobData, ReturnData>,
): Promise<void>;
public async queue(
data: JobData | TaskFunction<JobData, ReturnData>,
taskFunction?: TaskFunction<JobData, ReturnData>,
): Promise<void> {
this.queueJob(data, taskFunction);
}

public execute(data: JobData, taskFunction?: TaskFunction): Promise<void>;
public execute(taskFunction: TaskFunction): Promise<void>;
public execute(
data: JobData | TaskFunction,
taskFunction?: TaskFunction,
): Promise<void> {
return new Promise<any>((resolve: ExecuteResolve, reject: ExecuteReject) => {
data: JobData,
taskFunction?: TaskFunction<JobData, ReturnData>,
): Promise<ReturnData>;
public execute(
taskFunction: TaskFunction<JobData, ReturnData>,
): Promise<ReturnData>;
public execute(
data: JobData | TaskFunction<JobData, ReturnData>,
taskFunction?: TaskFunction<JobData, ReturnData>,
): Promise<ReturnData> {
return new Promise<ReturnData>((resolve: ExecuteResolve, reject: ExecuteReject) => {
const callbacks = { resolve, reject };
this.queueJob(data, taskFunction, callbacks);
});
Expand Down
15 changes: 7 additions & 8 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,25 @@
import { URL } from 'url';
import { TaskFunction } from './Cluster';

export type JobData = any;
export type ExecuteResolve = (value?: any) => void;
export type ExecuteReject = (reason?: any) => void;
export interface ExecuteCallbacks {
resolve: (value?: any) => void;
reject: ExecuteReject;
}

export default class Job {
export default class Job<JobData, ReturnData> {

public data: JobData;
public taskFunction: TaskFunction | undefined;
public data?: JobData;
public taskFunction: TaskFunction<JobData, ReturnData> | undefined;
public executeCallbacks: ExecuteCallbacks | undefined;

private lastError: Error | null = null;
public tries: number = 0;

public constructor(
data: JobData,
taskFunction?: TaskFunction,
data?: JobData,
taskFunction?: TaskFunction<JobData, ReturnData>,
executeCallbacks?: ExecuteCallbacks,
) {
this.data = data;
Expand All @@ -36,8 +35,8 @@ export default class Job {
if (typeof this.data === 'string') {
return this.data;
}
if (this.data !== undefined && typeof this.data.url === 'string') {
return this.data.url;
if (this.data !== undefined && typeof (this.data as any).url === 'string') {
return (this.data as any).url;
}
return undefined;
}
Expand Down
13 changes: 8 additions & 5 deletions src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ export interface WorkData {

export type WorkResult = WorkError | WorkData;

export default class Worker implements WorkerOptions {
export default class Worker<JobData, ReturnData> implements WorkerOptions {

cluster: Cluster;
args: string[];
id: number;
browser: WorkerInstance;

activeTarget: Job | null = null;
activeTarget: Job<JobData, ReturnData> | null = null;

public constructor({ cluster, args, id, browser }: WorkerOptions) {
this.cluster = cluster;
Expand All @@ -52,8 +52,8 @@ export default class Worker implements WorkerOptions {
}

public async handle(
task: TaskFunction,
job: Job,
task: TaskFunction<JobData, ReturnData>,
job: Job<JobData, ReturnData>,
timeout: number,
): Promise<WorkResult> {
this.activeTarget = job;
Expand Down Expand Up @@ -95,7 +95,10 @@ export default class Worker implements WorkerOptions {
timeout,
task({
page,
data: job.data,
// data might be undefined if queue is only called with a function
// we ignore that case, as the user should use Cluster<undefined> in that case
// to get correct typings
data: job.data as JobData,
worker: {
id: this.id,
},
Expand Down

0 comments on commit b9a883e

Please sign in to comment.