Skip to content

Commit

Permalink
refactor(core): Move queue recovery to scaling service (no-changelog) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Aug 13, 2024
1 parent 5ac65b3 commit 56c4692
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 227 deletions.
2 changes: 0 additions & 2 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { EventService } from '@/events/event.service';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
Expand Down Expand Up @@ -305,7 +304,6 @@ export class Start extends BaseCommand {
await this.server.start();

Container.get(PruningService).init();
Container.get(ExecutionRecoveryService).init();

if (config.getEnv('executions.mode') === 'regular') {
await this.runEnqueuedExecutions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import { createExecution } from '@test-integration/db/executions';
import * as testDb from '@test-integration/testDb';

import { mock } from 'jest-mock-extended';
import { OrchestrationService } from '@/services/orchestration.service';
import config from '@/config';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { Push } from '@/push';
Expand All @@ -28,20 +26,17 @@ describe('ExecutionRecoveryService', () => {
const instanceSettings = new InstanceSettings();

let executionRecoveryService: ExecutionRecoveryService;
let orchestrationService: OrchestrationService;
let executionRepository: ExecutionRepository;

beforeAll(async () => {
await testDb.init();
executionRepository = Container.get(ExecutionRepository);
orchestrationService = Container.get(OrchestrationService);

executionRecoveryService = new ExecutionRecoveryService(
mock(),
instanceSettings,
push,
executionRepository,
orchestrationService,
mock(),
);
});
Expand All @@ -53,74 +48,12 @@ describe('ExecutionRecoveryService', () => {
afterEach(async () => {
jest.restoreAllMocks();
await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']);
executionRecoveryService.shutdown();
});

afterAll(async () => {
await testDb.terminate();
});

describe('scheduleQueueRecovery', () => {
describe('queue mode', () => {
it('if leader, should schedule queue recovery', () => {
/**
* Arrange
*/
config.set('executions.mode', 'queue');
const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');

/**
* Act
*/
executionRecoveryService.init();

/**
* Assert
*/
expect(scheduleSpy).toHaveBeenCalled();
});

it('if follower, should do nothing', () => {
/**
* Arrange
*/
config.set('executions.mode', 'queue');
instanceSettings.markAsFollower();
const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');

/**
* Act
*/
executionRecoveryService.init();

/**
* Assert
*/
expect(scheduleSpy).not.toHaveBeenCalled();
});
});

describe('regular mode', () => {
it('should do nothing', () => {
/**
* Arrange
*/
config.set('executions.mode', 'regular');
const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');

/**
* Act
*/
executionRecoveryService.init();

/**
* Assert
*/
expect(scheduleSpy).not.toHaveBeenCalled();
});
});
});

describe('recoverFromLogs', () => {
describe('if follower', () => {
test('should do nothing', async () => {
Expand Down
126 changes: 2 additions & 124 deletions packages/cli/src/executions/execution-recovery.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Container, { Service } from 'typedi';
import { Service } from 'typedi';
import { Push } from '@/push';
import { jsonStringify, sleep } from 'n8n-workflow';
import { sleep } from 'n8n-workflow';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle
import type { DateTime } from 'luxon';
Expand All @@ -12,10 +12,6 @@ import { NodeCrashedError } from '@/errors/node-crashed.error';
import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
import { ARTIFICIAL_TASK_DATA } from '@/constants';
import { Logger } from '@/Logger';
import config from '@/config';
import { OnShutdown } from '@/decorators/OnShutdown';
import type { QueueRecoverySettings } from './execution.types';
import { OrchestrationService } from '@/services/orchestration.service';
import { EventService } from '@/events/event.service';

/**
Expand All @@ -28,34 +24,9 @@ export class ExecutionRecoveryService {
private readonly instanceSettings: InstanceSettings,
private readonly push: Push,
private readonly executionRepository: ExecutionRepository,
private readonly orchestrationService: OrchestrationService,
private readonly eventService: EventService,
) {}

/**
* @important Requires `OrchestrationService` to be initialized on queue mode.
*/
init() {
if (config.getEnv('executions.mode') === 'regular') return;

const { isLeader } = this.instanceSettings;
if (isLeader) this.scheduleQueueRecovery();

const { isMultiMainSetupEnabled } = this.orchestrationService;
if (isMultiMainSetupEnabled) {
this.orchestrationService.multiMainSetup
.on('leader-takeover', () => this.scheduleQueueRecovery())
.on('leader-stepdown', () => this.stopQueueRecovery());
}
}

private readonly queueRecoverySettings: QueueRecoverySettings = {
batchSize: config.getEnv('executions.queueRecovery.batchSize'),
waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000,
};

private isShuttingDown = false;

/**
* Recover key properties of a truncated execution using event logs.
*/
Expand All @@ -82,89 +53,10 @@ export class ExecutionRecoveryService {
return amendedExecution;
}

/**
* Schedule a cycle to mark dangling executions as crashed in queue mode.
*/
scheduleQueueRecovery(waitMs = this.queueRecoverySettings.waitMs) {
if (!this.shouldScheduleQueueRecovery()) return;

this.queueRecoverySettings.timeout = setTimeout(async () => {
try {
const nextWaitMs = await this.recoverFromQueue();
this.scheduleQueueRecovery(nextWaitMs);
} catch (error) {
const msg = this.toErrorMsg(error);

this.logger.error('[Recovery] Failed to recover dangling executions from queue', { msg });
this.logger.error('[Recovery] Retrying...');

this.scheduleQueueRecovery();
}
}, waitMs);

const wait = [this.queueRecoverySettings.waitMs / (60 * 1000), 'min'].join(' ');

this.logger.debug(`[Recovery] Scheduled queue recovery check for next ${wait}`);
}

stopQueueRecovery() {
clearTimeout(this.queueRecoverySettings.timeout);
}

@OnShutdown()
shutdown() {
this.isShuttingDown = true;
this.stopQueueRecovery();
}

// ----------------------------------
// private
// ----------------------------------

/**
* Mark in-progress executions as `crashed` if stored in DB as `new` or `running`
* but absent from the queue. Return time until next recovery cycle.
*/
private async recoverFromQueue() {
const { waitMs, batchSize } = this.queueRecoverySettings;

const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);

if (storedIds.length === 0) {
this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions');
return waitMs;
}

const { ScalingService } = await import('@/scaling/scaling.service');

const runningJobs = await Container.get(ScalingService).findJobsByStatus(['active', 'waiting']);

const queuedIds = new Set(runningJobs.map((job) => job.data.executionId));

if (queuedIds.size === 0) {
this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions');
return waitMs;
}

const danglingIds = storedIds.filter((id) => !queuedIds.has(id));

if (danglingIds.length === 0) {
this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions');
return waitMs;
}

await this.executionRepository.markAsCrashed(danglingIds);

this.logger.info('[Recovery] Completed queue recovery check, recovered dangling executions', {
danglingIds,
});

// if this cycle used up the whole batch size, it is possible for there to be
// dangling executions outside this check, so speed up next cycle

return storedIds.length >= this.queueRecoverySettings.batchSize ? waitMs / 2 : waitMs;
}

/**
* Amend `status`, `stoppedAt`, and (if possible) `data` of an execution using event logs.
*/
Expand Down Expand Up @@ -313,18 +205,4 @@ export class ExecutionRecoveryService {

await externalHooks.executeHookFunctions('workflowExecuteAfter', [run]);
}

private toErrorMsg(error: unknown) {
return error instanceof Error
? error.message
: jsonStringify(error, { replaceCircularRefs: true });
}

private shouldScheduleQueueRecovery() {
return (
config.getEnv('executions.mode') === 'queue' &&
this.instanceSettings.isLeader &&
!this.isShuttingDown
);
}
}
17 changes: 0 additions & 17 deletions packages/cli/src/executions/execution.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,6 @@ export namespace ExecutionSummaries {
export type ExecutionSummaryWithScopes = ExecutionSummary & { scopes: Scope[] };
}

export type QueueRecoverySettings = {
/**
* ID of timeout for next scheduled recovery cycle.
*/
timeout?: NodeJS.Timeout;

/**
* Number of in-progress executions to check per cycle.
*/
batchSize: number;

/**
* Time (in milliseconds) to wait before the next cycle.
*/
waitMs: number;
};

export type StopResult = {
mode: WorkflowExecuteMode;
startedAt: Date;
Expand Down
Loading

0 comments on commit 56c4692

Please sign in to comment.