Skip to content

Commit

Permalink
fix(core): Fix worker shutdown errors when active executions (n8n-io#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Aug 13, 2024
1 parent cdd0ab4 commit e071b73
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 25 deletions.
4 changes: 3 additions & 1 deletion packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ export abstract class BaseCommand extends Command {
this.logger.info(`Received ${signal}. Shutting down...`);
this.shutdownService.shutdown();

await Promise.all([this.stopProcess(), this.shutdownService.waitForShutdown()]);
await this.shutdownService.waitForShutdown();

await this.stopProcess();

clearTimeout(forceShutdownTimer);
};
Expand Down
19 changes: 1 addition & 18 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Container } from 'typedi';
import { Flags, type Config } from '@oclif/core';
import express from 'express';
import http from 'http';
import { sleep, ApplicationError } from 'n8n-workflow';
import { ApplicationError } from 'n8n-workflow';

import * as Db from '@/Db';
import * as ResponseHelper from '@/ResponseHelper';
Expand Down Expand Up @@ -61,23 +61,6 @@ export class Worker extends BaseCommand {

try {
await this.externalHooks?.run('n8n.stop', []);

const hardStopTimeMs = Date.now() + this.gracefulShutdownTimeoutInS * 1000;

// Wait for active workflow executions to finish
let count = 0;
while (this.jobProcessor.getRunningJobIds().length !== 0) {
if (count++ % 4 === 0) {
const waitLeft = Math.ceil((hardStopTimeMs - Date.now()) / 1000);
this.logger.info(
`Waiting for ${
Object.keys(this.jobProcessor.getRunningJobIds()).length
} active executions to finish... (max wait ${waitLeft} more seconds)`,
);
}

await sleep(500);
}
} catch (error) {
await this.exitWithCrash('There was an error shutting down n8n.', error);
}
Expand Down
13 changes: 9 additions & 4 deletions packages/cli/src/scaling/__tests__/scaling.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { Job, JobData, JobOptions, JobQueue } from '../types';
import { ApplicationError } from 'n8n-workflow';
import { mockInstance } from '@test/mocking';
import { GlobalConfig } from '@n8n/config';
import type { JobProcessor } from '../job-processor';

const queue = mock<JobQueue>({
client: { ping: jest.fn() },
Expand Down Expand Up @@ -100,23 +101,27 @@ describe('ScalingService', () => {
});
});

describe('pauseQueue', () => {
it('should pause the queue', async () => {
describe('stop', () => {
it('should pause the queue and check for running jobs', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
const jobProcessor = mock<JobProcessor>();
const scalingService = new ScalingService(mock(), mock(), jobProcessor, globalConfig);
await scalingService.setupQueue();
jobProcessor.getRunningJobIds.mockReturnValue([]);
const getRunningJobsCountSpy = jest.spyOn(scalingService, 'getRunningJobsCount');

/**
* Act
*/
await scalingService.pauseQueue();
await scalingService.stop();

/**
* Assert
*/
expect(queue.pause).toHaveBeenCalledWith(true, true);
expect(getRunningJobsCountSpy).toHaveBeenCalled();
});
});

Expand Down
20 changes: 18 additions & 2 deletions packages/cli/src/scaling/scaling.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Container, { Service } from 'typedi';
import { ApplicationError, BINARY_ENCODING } from 'n8n-workflow';
import { ApplicationError, BINARY_ENCODING, sleep } from 'n8n-workflow';
import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config';
import { Logger } from '@/Logger';
Expand Down Expand Up @@ -59,10 +59,22 @@ export class ScalingService {
}

@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
async pauseQueue() {
async stop() {
await this.queue.pause(true, true);

this.logger.debug('[ScalingService] Queue paused');

let count = 0;

while (this.getRunningJobsCount() !== 0) {
if (count++ % 4 === 0) {
this.logger.info(
`Waiting for ${this.getRunningJobsCount()} active executions to finish...`,
);
}

await sleep(500);
}
}

async pingQueue() {
Expand Down Expand Up @@ -113,6 +125,10 @@ export class ScalingService {
}
}

getRunningJobsCount() {
return this.jobProcessor.getRunningJobIds().length;
}

// #endregion

// #region Listeners
Expand Down

0 comments on commit e071b73

Please sign in to comment.