From 821912dccbe62665f1c1bc776efc90eaab667ffa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 16 Nov 2023 17:01:51 +0100 Subject: [PATCH] fix(core): Account for activation error on follower --- packages/cli/src/Interfaces.ts | 13 ++++++- .../orchestration/main/MultiMainSetup.ee.ts | 10 +++++ .../main/handleCommandMessageMain.ts | 38 +++++++++++++++++-- .../services/redis/RedisServiceCommands.ts | 3 +- .../cli/src/workflows/workflows.services.ts | 1 + packages/editor-ui/src/Interface.ts | 13 ++++++- .../editor-ui/src/mixins/pushConnection.ts | 14 +++++++ 7 files changed, 85 insertions(+), 7 deletions(-) diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index e08fcbafc7efb..bd8015a483a30 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -471,7 +471,13 @@ export type IPushData = | PushDataActiveWorkflowUsersChanged | PushDataWorkerStatusMessage | PushDataWorkflowActivated - | PushDataWorkflowDeactivated; + | PushDataWorkflowDeactivated + | PushDataWorkflowFailedToActivate; + +type PushDataWorkflowFailedToActivate = { + data: IWorkflowFailedToActivate; + type: 'workflowFailedToActivate'; +}; type PushDataWorkflowActivated = { data: IActiveWorkflowChanged; @@ -561,6 +567,11 @@ interface IActiveWorkflowChanged { workflowId: Workflow['id']; } +interface IWorkflowFailedToActivate { + workflowId: Workflow['id']; + errorMessage: string; +} + export interface IPushDataExecutionRecovered { executionId: string; } diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 1db0e68a85307..40a4a18da85e9 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -112,6 +112,7 @@ export class MultiMainSetup extends SingleMainSetup { workflowId: string; oldState: boolean; newState: boolean; + versionId: string; }) { if (!this.sanityCheck()) return; @@ -120,4 +121,13 @@ export class MultiMainSetup extends SingleMainSetup { payload, }); } + + async broadcastWorkflowFailedToActivate(payload: { workflowId: string; errorMessage: string }) { + if (!this.sanityCheck()) return; + + await this.redisPublisher.publishToCommandChannel({ + command: 'workflowFailedToActivate', + payload, + }); + } } diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 560b7f7751bff..9a2752e5d6c8e 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -7,6 +7,8 @@ import { License } from '@/License'; import { Logger } from '@/Logger'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { Push } from '@/push'; +import { MultiMainSetup } from './MultiMainSetup.ee'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; export async function handleCommandMessageMain(messageString: string) { const queueModeId = config.getEnv('redis.queueModeId'); @@ -71,12 +73,13 @@ export async function handleCommandMessageMain(messageString: string) { return message; } - const { workflowId, oldState, newState } = message.payload ?? {}; + const { workflowId, oldState, newState, versionId } = message.payload ?? {}; if ( typeof workflowId !== 'string' || typeof oldState !== 'boolean' || - typeof newState !== 'boolean' + typeof newState !== 'boolean' || + typeof versionId !== 'string' ) { break; } @@ -84,8 +87,22 @@ export async function handleCommandMessageMain(messageString: string) { const push = Container.get(Push); if (!oldState && newState) { - await activeWorkflowRunner.add(workflowId, 'activate'); - push.broadcast('workflowActivated', { workflowId }); + try { + await activeWorkflowRunner.add(workflowId, 'activate'); + push.broadcast('workflowActivated', { workflowId }); + } catch (e) { + const error = e instanceof Error ? e : new Error(`${e}`); + + await Container.get(WorkflowRepository).update(workflowId, { + active: false, + versionId, + }); + + await Container.get(MultiMainSetup).broadcastWorkflowFailedToActivate({ + workflowId, + errorMessage: error.message, + }); + } } else if (oldState && !newState) { await activeWorkflowRunner.remove(workflowId); push.broadcast('workflowDeactivated', { workflowId }); @@ -97,6 +114,19 @@ export async function handleCommandMessageMain(messageString: string) { await activeWorkflowRunner.removeActivationError(workflowId); } + case 'workflowFailedToActivate': { + if (!debounceMessageReceiver(message, 100)) { + message.payload = { result: 'debounced' }; + return message; + } + + const { workflowId, errorMessage } = message.payload ?? {}; + + if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break; + + Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage }); + } + default: break; } diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index a76cf01f61e6b..4c622e3ac9389 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -7,7 +7,8 @@ export type RedisServiceCommand = | 'stopWorker' | 'reloadLicense' | 'reloadExternalSecretsProviders' - | 'workflowActiveStateChanged'; // multi-main only + | 'workflowActiveStateChanged' // multi-main only + | 'workflowFailedToActivate'; // multi-main only /** * An object to be sent via Redis pub/sub from the main process to the workers. diff --git a/packages/cli/src/workflows/workflows.services.ts b/packages/cli/src/workflows/workflows.services.ts index 9c7ed6b3694ce..b41ff747fa001 100644 --- a/packages/cli/src/workflows/workflows.services.ts +++ b/packages/cli/src/workflows/workflows.services.ts @@ -382,6 +382,7 @@ export class WorkflowsService { workflowId, oldState, newState: updatedWorkflow.active, + versionId: shared.workflow.versionId, }); } } diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index 3743e30360ecb..7f748234143c1 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -421,7 +421,8 @@ export type IPushData = | PushDataExecutionRecovered | PushDataWorkerStatusMessage | PushDataActiveWorkflowAdded - | PushDataActiveWorkflowRemoved; + | PushDataActiveWorkflowRemoved + | PushDataWorkflowFailedToActivate; type PushDataActiveWorkflowAdded = { data: IActiveWorkflowAdded; @@ -433,6 +434,11 @@ type PushDataActiveWorkflowRemoved = { type: 'workflowDeactivated'; }; +type PushDataWorkflowFailedToActivate = { + data: IWorkflowFailedToActivate; + type: 'workflowFailedToActivate'; +}; + type PushDataExecutionRecovered = { data: IPushDataExecutionRecovered; type: 'executionRecovered'; @@ -509,6 +515,11 @@ export interface IActiveWorkflowRemoved { workflowId: string; } +export interface IWorkflowFailedToActivate { + workflowId: string; + errorMessage: string; +} + export interface IPushDataUnsavedExecutionFinished { executionId: string; data: { finished: true; stoppedAt: Date }; diff --git a/packages/editor-ui/src/mixins/pushConnection.ts b/packages/editor-ui/src/mixins/pushConnection.ts index f628e3bec9ad1..87dbeb5403b64 100644 --- a/packages/editor-ui/src/mixins/pushConnection.ts +++ b/packages/editor-ui/src/mixins/pushConnection.ts @@ -291,6 +291,20 @@ export const pushConnection = defineComponent({ } } + if (receivedData.type === 'workflowFailedToActivate') { + this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId); + this.workflowsStore.setActive(false); + + this.showError( + new Error(receivedData.data.errorMessage), + this.$locale.baseText('workflowActivator.showError.title', { + interpolate: { newStateName: 'deactivated' }, + }) + ':', + ); + + return true; + } + if (receivedData.type === 'workflowActivated') { this.workflowsStore.setWorkflowActive(receivedData.data.workflowId); return true;