Skip to content

Commit

Permalink
fix(worker): fix close sequence to reduce risk for open handlers (#2656)
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Aug 12, 2024
1 parent c70e5e7 commit 8468e44
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 29 deletions.
4 changes: 2 additions & 2 deletions src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ export class RedisConnection extends EventEmitter {
return client.connect();
}

async close(): Promise<void> {
async close(force = false): Promise<void> {
if (!this.closing) {
const status = this.status;
this.status = 'closing';
Expand All @@ -310,7 +310,7 @@ export class RedisConnection extends EventEmitter {
await this.initializing;
}
if (!this.shared) {
if (status == 'initializing') {
if (status == 'initializing' || force || process.env.CI) {
// If we have not still connected to Redis, we need to disconnect.
this._client.disconnect();
} else {
Expand Down
50 changes: 23 additions & 27 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -877,38 +877,34 @@ will never work with more accuracy than 1ms. */
}
this.closing = (async () => {
this.emit('closing', 'closing queue');

this.abortDelayController?.abort();

const client =
this.blockingConnection.status == 'ready'
? await this.blockingConnection.client
: null;

this.resume();
await Promise.resolve()
.finally(() => {

// Define the async cleanup functions
const asyncCleanups = [
() => {
return force || this.whenCurrentJobsFinished(false);
})
.finally(() => {
const closePoolPromise = this.childPool?.clean();

if (force) {
// since we're not waiting for the job to end attach
// an error handler to avoid crashing the whole process
closePoolPromise?.catch(err => {
console.error(err); // TODO: emit error in next breaking change version
});
return;
}
return closePoolPromise;
})
.finally(() => clearTimeout(this.extendLocksTimer))
.finally(() => clearTimeout(this.stalledCheckTimer))
.finally(() => client && client.disconnect())
.finally(() => this.connection.close())
.finally(() => this.emit('closed'));
},
() => this.childPool?.clean(),
() => this.blockingConnection.close(force),
() => this.connection.close(force),
];

// Run cleanup functions sequentially and make sure all are run despite any errors
for (const cleanup of asyncCleanups) {
try {
await cleanup();
} catch (err) {
this.emit('error', <Error>err);
}
}

clearTimeout(this.extendLocksTimer);
clearTimeout(this.stalledCheckTimer);

this.closed = true;
this.emit('closed');
})();
return this.closing;
}
Expand Down

0 comments on commit 8468e44

Please sign in to comment.