Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support async handling and add CronJob status tracking #894

Merged
merged 17 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ day of week 0-7 (0 or 7 is Sunday, or use names)

- `unrefTimeout`: [OPTIONAL] - Useful for controlling event loop behavior. More details [here](https://nodejs.org/api/timers.html#timers_timeout_unref).

- `waitForCompletion`: [OPTIONAL] - If `true`, the job will wait for the `onTick` function to complete before stopping. Default is `false`.
intcreator marked this conversation as resolved.
Show resolved Hide resolved

#### Methods

- `from` (static): Create a new CronJob object providing arguments as an object. See argument names and descriptions above.
Expand All @@ -214,6 +216,23 @@ day of week 0-7 (0 or 7 is Sunday, or use names)

- `addCallback`: Permits addition of `onTick` callbacks.

#### Properties

- `isCallbackRunning`: [READ-ONLY] Indicates if a callback is currently executing.

```javascript
const job = new CronJob('* * * * * *', async () => {
console.log(job.isCallbackRunning); // true during callback execution
await someAsyncTask();
console.log(job.isCallbackRunning); // still true until callback completes
});

console.log(job.isCallbackRunning); // false
job.start();
console.log(job.running); // true (schedule is active)
console.log(job.isCallbackRunning); // false (no callback executing)
```

### CronTime Class

#### Constructor
Expand Down
92 changes: 68 additions & 24 deletions src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
onComplete?: WithOnComplete<OC> extends true
? CronOnCompleteCallback
: undefined;
waitForCompletion = false;

private _isCallbackRunning = false;
private _timeout?: NodeJS.Timeout;
private _callbacks: CronCallback<C, WithOnComplete<OC>>[] = [];

get isCallbackRunning() {
return this._isCallbackRunning;
}

constructor(
cronTime: CronJobParams<OC, C>['cronTime'],
onTick: CronJobParams<OC, C>['onTick'],
Expand All @@ -34,7 +40,8 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
context?: CronJobParams<OC, C>['context'],
runOnInit?: CronJobParams<OC, C>['runOnInit'],
utcOffset?: null,
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout']
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout'],
waitForCompletion?: CronJobParams<OC, C>['waitForCompletion']
);
constructor(
cronTime: CronJobParams<OC, C>['cronTime'],
Expand All @@ -45,7 +52,8 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
context?: CronJobParams<OC, C>['context'],
runOnInit?: CronJobParams<OC, C>['runOnInit'],
utcOffset?: CronJobParams<OC, C>['utcOffset'],
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout']
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout'],
waitForCompletion?: CronJobParams<OC, C>['waitForCompletion']
);
constructor(
cronTime: CronJobParams<OC, C>['cronTime'],
Expand All @@ -56,9 +64,11 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
context?: CronJobParams<OC, C>['context'],
runOnInit?: CronJobParams<OC, C>['runOnInit'],
utcOffset?: CronJobParams<OC, C>['utcOffset'],
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout']
unrefTimeout?: CronJobParams<OC, C>['unrefTimeout'],
waitForCompletion?: CronJobParams<OC, C>['waitForCompletion']
) {
this.context = (context ?? this) as CronContext<C>;
this.waitForCompletion = Boolean(waitForCompletion);

// runtime check for JS users
if (timeZone != null && utcOffset != null) {
Expand Down Expand Up @@ -92,7 +102,7 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {

if (runOnInit) {
this.lastExecution = new Date();
this.fireOnTick();
void this.fireOnTick();
}

if (start) this.start();
Expand All @@ -117,7 +127,8 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
params.context,
params.runOnInit,
params.utcOffset,
params.unrefTimeout
params.unrefTimeout,
params.waitForCompletion
);
} else if (params.utcOffset != null) {
return new CronJob<OC, C>(
Expand All @@ -129,7 +140,8 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
params.context,
params.runOnInit,
params.utcOffset,
params.unrefTimeout
params.unrefTimeout,
params.waitForCompletion
);
} else {
return new CronJob<OC, C>(
Expand All @@ -141,7 +153,8 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
params.context,
params.runOnInit,
params.utcOffset,
params.unrefTimeout
params.unrefTimeout,
params.waitForCompletion
);
}
}
Expand Down Expand Up @@ -193,14 +206,26 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
return this.cronTime.sendAt();
}

fireOnTick() {
for (const callback of this._callbacks) {
void callback.call(
this.context,
this.onComplete as WithOnComplete<OC> extends true
? CronOnCompleteCallback
: never
);
async fireOnTick() {
if (!this.waitForCompletion && this._isCallbackRunning) return;

this._isCallbackRunning = true;

try {
for (const callback of this._callbacks) {
const result = callback.call(
this.context,
this.onComplete as WithOnComplete<OC> extends true
? CronOnCompleteCallback
: never
);

if (this.waitForCompletion) await result;
}
} catch (error) {
console.error('[Cron] error in callback', error);
} finally {
this._isCallbackRunning = false;
}
}

Expand All @@ -209,9 +234,7 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
}

start() {
if (this.running) {
return;
}
if (this.running) return;

const MAXDELAY = 2147483647; // The maximum number of milliseconds setTimeout will wait.
let timeout = this.cronTime.getTimeout();
Expand Down Expand Up @@ -262,11 +285,9 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
this.running = false;

// start before calling back so the callbacks have the ability to stop the cron job
if (!this.runOnce) {
this.start();
}
if (!this.runOnce) this.start();

this.fireOnTick();
void this.fireOnTick();
}
};

Expand All @@ -290,14 +311,37 @@ export class CronJob<OC extends CronOnCompleteCommand | null = null, C = null> {
return this.lastExecution;
}

private async _executeOnComplete() {
if (typeof this.onComplete !== 'function') return;

try {
await this.onComplete.call(this.context);
} catch (error) {
console.error('[Cron] error in onComplete callback:', error);
}
}

private async _waitForJobCompletion() {
while (this._isCallbackRunning) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}

/**
* Stop the cronjob.
*/
stop() {
if (this._timeout) clearTimeout(this._timeout);
this.running = false;
if (typeof this.onComplete === 'function') {
void this.onComplete.call(this.context);

if (!this.waitForCompletion) {
void this._executeOnComplete();
return;
}

void Promise.resolve().then(async () => {
await this._waitForJobCompletion();
await this._executeOnComplete();
});
}
}
1 change: 1 addition & 0 deletions src/types/cron.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ interface BaseCronJobParams<
context?: C;
runOnInit?: boolean | null;
unrefTimeout?: boolean | null;
waitForCompletion?: boolean | null;
}

export type CronJobParams<
Expand Down
Loading