Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Coordinate manual workflow activation and deactivation in multi-main scenario #7643

Merged
merged 65 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
2af1a85
Handle workflow activation if leader
ivov Nov 7, 2023
80983ee
Centralize into `MultiMainSetup`
ivov Nov 7, 2023
cd1a6d4
Broadcast workflow update event
ivov Nov 7, 2023
b6dcaa4
Fix license test
ivov Nov 7, 2023
783df6a
Merge master
ivov Nov 7, 2023
8dd24d3
Fix pruning test
ivov Nov 7, 2023
a484907
Add missing mocks
ivov Nov 7, 2023
dda9007
Clean up tests
ivov Nov 8, 2023
9a75e07
Merge master
ivov Nov 8, 2023
2a27ce2
Inform FE of activation and deactivation
ivov Nov 8, 2023
5b0dc90
Fix orchestration test
ivov Nov 8, 2023
c7b6b24
Merge master, fix conflicts
ivov Nov 8, 2023
248935d
Fix some tests
ivov Nov 8, 2023
7d37083
Simplify init in `BaseCommand`
ivov Nov 8, 2023
467e3ed
Remove leftover logs
ivov Nov 8, 2023
64d3d3b
Remove unneeded handlers
ivov Nov 8, 2023
4829b0d
Fix webhooks tests
ivov Nov 8, 2023
b1adae5
Refactor pruning in multi-main scenario
ivov Nov 8, 2023
08dbd28
Fix `Start` command test
ivov Nov 8, 2023
735d30d
Merge master
ivov Nov 9, 2023
44bf302
Fix tests finally
ivov Nov 9, 2023
91cad72
Add more missing mocks
ivov Nov 9, 2023
d68d20c
Break dep cycle
ivov Nov 9, 2023
2d064e2
Fix `webhooks.api.test.ts`
ivov Nov 9, 2023
086e522
Fix relabeling on destructuring
ivov Nov 9, 2023
7424434
Refactor to use `workflowActiveStateChanged`
ivov Nov 9, 2023
0484402
Forgotten removal
ivov Nov 9, 2023
5ba2d2e
Coordinate activation errors
ivov Nov 9, 2023
59386f0
Fix tests
ivov Nov 9, 2023
10af7f6
Refactor to global config
ivov Nov 9, 2023
d5e6679
Remove unneeded leader check
ivov Nov 10, 2023
0b72bc1
Merge master, fix conflicts
ivov Nov 11, 2023
c4f5ee6
Fix misresolved conflict
ivov Nov 11, 2023
38754c7
Prefix context for pruning log messages
ivov Nov 11, 2023
7750ede
Stop pruning if follower
ivov Nov 11, 2023
6cf5904
Implement activation errors in Redis
ivov Nov 13, 2023
b1b8f18
Fix lint
ivov Nov 13, 2023
ea9eb70
Fix import
ivov Nov 13, 2023
0d7728c
Remove TODOs
ivov Nov 13, 2023
103e639
Remove unused method and type
ivov Nov 13, 2023
b9a6948
Better naming
ivov Nov 13, 2023
e05083b
Call `toCacheKey` only if needed
ivov Nov 13, 2023
b82e2cc
Fix lint
ivov Nov 13, 2023
7dd7416
Add guard for init
ivov Nov 13, 2023
2355251
Better naming
ivov Nov 13, 2023
98d22cd
Rename service file
ivov Nov 13, 2023
cca423b
Simplify error creation
ivov Nov 13, 2023
6a1064b
Add more guards
ivov Nov 13, 2023
3a494d9
Cleanup
ivov Nov 13, 2023
aade6a7
Tighten guard
ivov Nov 13, 2023
9c44e0b
Add clarifying comment
ivov Nov 13, 2023
55a33bf
More cleanup
ivov Nov 13, 2023
f9b2972
Better naming
ivov Nov 13, 2023
f060297
Add defensive `init`
ivov Nov 13, 2023
90337b6
Add guard to `WorkflowService`
ivov Nov 13, 2023
5c7d2a9
Cleanup
ivov Nov 13, 2023
4a0071d
Replace `ActivationError` with string
ivov Nov 15, 2023
0580647
Add missing check
ivov Nov 15, 2023
88c8952
Fix lint
ivov Nov 15, 2023
7435dd1
Ensure also followers init license
ivov Nov 15, 2023
401c02e
Consolidate activation errors into single object
ivov Nov 15, 2023
2d09363
Remove no longer relevant test
ivov Nov 15, 2023
31c9bae
Add telemetry
ivov Nov 16, 2023
c23f7e8
fix(core): Account for activation error caused by follower `main` ins…
ivov Nov 16, 2023
a3147a9
Address feedback
ivov Nov 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions packages/cli/src/ActiveWorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */

import Container, { Service } from 'typedi';
import { Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';

import type {
Expand Down Expand Up @@ -64,8 +64,7 @@ import { In } from 'typeorm';
import { WebhookService } from './services/webhook.service';
import { Logger } from './Logger';
import { WorkflowRepository } from '@/databases/repositories';
import config from '@/config';
import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee';
import { MultiMainSetup } from './services/orchestration/main/MultiMainSetup.ee';

const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
Expand All @@ -92,31 +91,18 @@ export class ActiveWorkflowRunner implements IWebhookManager {
};
} = {};

isMultiMainScenario =
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');

multiMainInstancePublisher: MultiMainInstancePublisher | undefined;

constructor(
private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions,
private readonly externalHooks: ExternalHooks,
private readonly nodeTypes: NodeTypes,
private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository,
private readonly multiMainSetup: MultiMainSetup,
) {}

async init() {
if (this.isMultiMainScenario) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);

this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);

await this.multiMainInstancePublisher.init();
}

await this.multiMainSetup.init();
await this.addActiveWorkflows('init');

await this.externalHooks.run('activeWorkflows.initialized', []);
Expand Down Expand Up @@ -744,17 +730,34 @@ export class ActiveWorkflowRunner implements IWebhookManager {
activationMode: WorkflowActivateMode,
existingWorkflow?: WorkflowEntity,
) {
if (this.multiMainSetup.isEnabled && this.multiMainSetup.isFollower) {
const prefix = '[Multi-main setup] Instance is follower';

this.logger.debug(
[prefix, 'skipping addition of workflow to active workflows in memory...'].join(', '),
);

if (['activate', 'update'].includes(activationMode)) {
this.logger.debug(
[prefix, 'broadcasting "workflowWasUpdated" into command channel...'].join(', '),
);
await this.multiMainSetup.broadcastWorkflowWasUpdated(workflowId);
}

return;
}

let workflow: Workflow;

let shouldAddWebhooks = true;
let shouldAddTriggersAndPollers = true;

if (this.isMultiMainScenario && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false;
shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false;
if (this.multiMainSetup.isEnabled && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainSetup.isLeader;
shouldAddTriggersAndPollers = this.multiMainSetup.isLeader;
}

if (this.isMultiMainScenario && activationMode === 'leadershipChange') {
if (this.multiMainSetup.isEnabled && activationMode === 'leadershipChange') {
shouldAddWebhooks = false;
shouldAddTriggersAndPollers = true;
}
Expand Down Expand Up @@ -940,6 +943,17 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// TODO: this should happen in a transaction
async remove(workflowId: string) {
// Remove all the webhooks of the workflow

if (this.multiMainSetup.isEnabled && this.multiMainSetup.isFollower) {
this.logger.debug(
'[Multi-main setup] Instance is follower, skipping removal of workflow from active workflows in memory and broadcasting "workflowWasDeactivated" into command channel...',
);

await this.multiMainSetup.broadcastWorkflowWasDeactivated(workflowId);
ivov marked this conversation as resolved.
Show resolved Hide resolved

return;
}

try {
await this.clearWebhooks(workflowId);
} catch (error) {
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks';
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';

@Service()
export class ExternalSecretsManager {
Expand Down Expand Up @@ -82,7 +82,7 @@ export class ExternalSecretsManager {
}

async broadcastReloadExternalSecretsProviders() {
await Container.get(SingleMainInstancePublisher).broadcastReloadExternalSecretsProviders();
await Container.get(SingleMainSetup).broadcastReloadExternalSecretsProviders();
}

private decryptSecretsSettings(value: string): ExternalSecretsSettings {
Expand Down
20 changes: 19 additions & 1 deletion packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,19 @@ export type IPushData =
| PushDataTestWebhook
| PushDataNodeDescriptionUpdated
| PushDataExecutionRecovered
| PushDataActiveWorkflowUsersChanged;
| PushDataActiveWorkflowUsersChanged
| PushDataActiveWorkflowAdded
| PushDataActiveWorkflowRemoved;

type PushDataActiveWorkflowAdded = {
data: IActiveWorkflowAdded;
type: 'workflowActivated';
};

type PushDataActiveWorkflowRemoved = {
data: IActiveWorkflowRemoved;
type: 'workflowDeactivated';
};

type PushDataActiveWorkflowUsersChanged = {
data: IActiveWorkflowUsersChanged;
Expand Down Expand Up @@ -566,6 +578,12 @@ export interface IActiveWorkflowUser {
lastSeen: Date;
}

export interface IActiveWorkflowAdded {
workflowId: Workflow['id'];
}

type IActiveWorkflowRemoved = IActiveWorkflowAdded;

export interface IActiveWorkflowUsersChanged {
workflowId: Workflow['id'];
activeUsers: IActiveWorkflowUser[];
Expand Down
26 changes: 11 additions & 15 deletions packages/cli/src/License.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { WorkflowRepository } from '@/databases/repositories';
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
import { RedisService } from './services/redis.service';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

type FeatureReturnType = Partial<
{
Expand All @@ -40,13 +41,16 @@ export class License {
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly multiMainSetup: MultiMainSetup,
) {}

async init(instanceType: N8nInstanceType = 'main') {
if (this.manager) {
return;
}

await this.multiMainSetup.init();

const isMainInstance = instanceType === 'main';
const server = config.getEnv('license.serverUrl');
const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled');
Expand Down Expand Up @@ -112,22 +116,14 @@ export class License {
}

async onFeatureChange(_features: TFeatures): Promise<void> {
if (config.getEnv('executions.mode') === 'queue') {
if (config.getEnv('leaderSelection.enabled')) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);

const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);

await multiMainInstancePublisher.init();

if (multiMainInstancePublisher.isFollower) {
this.logger.debug('Instance is follower, skipping sending of reloadLicense command...');
return;
}
}
if (this.multiMainSetup.isEnabled && this.multiMainSetup.isFollower) {
this.logger.debug(
'[Multi-main setup] Instance is follower, skipping sending of "reloadLicense" command...',
);
return;
}

if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
Expand Down
24 changes: 14 additions & 10 deletions packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager
import { initExpressionEvaluator } from '@/ExpressionEvalator';
import { generateHostInstanceId } from '../databases/utils/generators';
import { WorkflowHistoryManager } from '@/workflows/workflowHistory/workflowHistoryManager.ee';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

export abstract class BaseCommand extends Command {
protected logger = Container.get(Logger);
Expand All @@ -38,6 +39,8 @@ export abstract class BaseCommand extends Command {

protected server?: AbstractServer;

protected multiMainSetup: MultiMainSetup;

async init(): Promise<void> {
await initErrorHandling();
initExpressionEvaluator();
Expand Down Expand Up @@ -82,6 +85,10 @@ export abstract class BaseCommand extends Command {

await Container.get(PostHogClient).init();
await Container.get(InternalHooks).init();

this.multiMainSetup = Container.get(MultiMainSetup);

await this.multiMainSetup.init();
}

protected setInstanceType(instanceType: N8nInstanceType) {
Expand Down Expand Up @@ -243,19 +250,16 @@ export abstract class BaseCommand extends Command {
}

async initLicense(): Promise<void> {
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled')) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainSetup = Container.get(MultiMainSetup);

const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainSetup.init();

await multiMainInstancePublisher.init();
if (multiMainSetup.isEnabled && multiMainSetup.isFollower) {
this.logger.debug(
'[Multi-main setup] Instance is follower, skipping license initialization...',
krynble marked this conversation as resolved.
Show resolved Hide resolved
);

if (multiMainInstancePublisher.isFollower) {
this.logger.debug('Instance is follower, skipping license initialization...');
return;
}
return;
}

const license = Container.get(License);
Expand Down
58 changes: 33 additions & 25 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import { BaseCommand } from './BaseCommand';
import { InternalHooks } from '@/InternalHooks';
import { License, FeatureNotLicensedError } from '@/License';
import { IConfig } from '@oclif/config';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { PruningService } from '@/services/pruning.service';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
Expand Down Expand Up @@ -113,17 +114,20 @@ export class Start extends BaseCommand {
await Container.get(License).shutdown();

if (await this.pruningService.isPruningEnabled()) {
await this.pruningService.stopPruning();
if (
!this.multiMainSetup.isEnabled ||
(this.multiMainSetup.isEnabled && this.multiMainSetup.isLeader)
) {
this.pruningService.stopPruning();
}
}

if (config.getEnv('leaderSelection.enabled')) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainSetup = Container.get(MultiMainSetup);

if (multiMainSetup.isEnabled) {
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();

await Container.get(MultiMainInstancePublisher).destroy();
await multiMainSetup.shutdown();
}

await Container.get(InternalHooks).onN8nStop();
Expand Down Expand Up @@ -232,33 +236,24 @@ export class Start extends BaseCommand {
async initOrchestration() {
if (config.get('executions.mode') !== 'queue') return;

if (!config.get('leaderSelection.enabled')) {
await Container.get(SingleMainInstancePublisher).init();
// queue mode in single-main scenario

if (!this.multiMainSetup.isEnabled) {
await Container.get(SingleMainSetup).init();
await Container.get(OrchestrationHandlerMainService).init();
return;
}

// multi-main scenario

const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
// queue mode in multi-main scenario

const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);

await multiMainInstancePublisher.init();

if (
multiMainInstancePublisher.isLeader &&
!Container.get(License).isMultipleMainInstancesLicensed()
) {
if (this.multiMainSetup.isLeader && !Container.get(License).isMultipleMainInstancesLicensed()) {
throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);
ivov marked this conversation as resolved.
Show resolved Hide resolved
}

await Container.get(OrchestrationHandlerMainService).init();

multiMainInstancePublisher.on('leadershipChange', async () => {
if (multiMainInstancePublisher.isLeader) {
this.multiMainSetup.on('leadershipChange', async () => {
if (this.multiMainSetup.isLeader) {
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
} else {
// only in case of leadership change without shutdown
Expand Down Expand Up @@ -346,8 +341,21 @@ export class Start extends BaseCommand {
await this.server.start();

this.pruningService = Container.get(PruningService);

if (await this.pruningService.isPruningEnabled()) {
this.pruningService.startPruning();
if (!this.multiMainSetup.isEnabled) {
this.pruningService.startPruning();
}

if (this.multiMainSetup.isEnabled && this.multiMainSetup.isLeader) {
this.pruningService.startPruning();

this.multiMainSetup.on('leadershipChange', async () => {
if (this.multiMainSetup.isLeader) {
this.pruningService.startPruning();
}
});
}
}

// Start to get active workflows and run their triggers
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/controllers/orchestration.controller.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Authorized, Get, RestController } from '@/decorators';
import { OrchestrationRequest } from '@/requests';
import { Service } from 'typedi';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';

@Authorized(['global', 'owner'])
@RestController('/orchestration')
@Service()
export class OrchestrationController {
constructor(private readonly orchestrationService: SingleMainInstancePublisher) {}
constructor(private readonly orchestrationService: SingleMainSetup) {}

/**
* These endpoint currently do not return anything, they just trigger the messsage to
Expand Down
Loading