Skip to content

Commit

Permalink
refactor(core): Fix push message type inference (#12331)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Dec 20, 2024
1 parent 724e085 commit fe7fb41
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 135 deletions.
2 changes: 1 addition & 1 deletion packages/cli/src/collaboration/collaboration.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@ export class CollaborationService {
collaborators: activeCollaborators,
};

this.push.sendToUsers('collaboratorsChanged', msgData, userIds);
this.push.sendToUsers({ type: 'collaboratorsChanged', data: msgData }, userIds);
}
}
45 changes: 30 additions & 15 deletions packages/cli/src/controllers/community-packages.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,12 @@ export class CommunityPackagesController {

// broadcast to connected frontends that node list has been updated
installedPackage.installedNodes.forEach((node) => {
this.push.broadcast('reloadNodeType', {
name: node.type,
version: node.latestVersion,
this.push.broadcast({
type: 'reloadNodeType',
data: {
name: node.type,
version: node.latestVersion,
},
});
});

Expand Down Expand Up @@ -206,9 +209,12 @@ export class CommunityPackagesController {

// broadcast to connected frontends that node list has been updated
installedPackage.installedNodes.forEach((node) => {
this.push.broadcast('removeNodeType', {
name: node.type,
version: node.latestVersion,
this.push.broadcast({
type: 'removeNodeType',
data: {
name: node.type,
version: node.latestVersion,
},
});
});

Expand Down Expand Up @@ -246,16 +252,22 @@ export class CommunityPackagesController {

// broadcast to connected frontends that node list has been updated
previouslyInstalledPackage.installedNodes.forEach((node) => {
this.push.broadcast('removeNodeType', {
name: node.type,
version: node.latestVersion,
this.push.broadcast({
type: 'removeNodeType',
data: {
name: node.type,
version: node.latestVersion,
},
});
});

newInstalledPackage.installedNodes.forEach((node) => {
this.push.broadcast('reloadNodeType', {
name: node.name,
version: node.latestVersion,
this.push.broadcast({
type: 'reloadNodeType',
data: {
name: node.name,
version: node.latestVersion,
},
});
});

Expand All @@ -272,9 +284,12 @@ export class CommunityPackagesController {
return newInstalledPackage;
} catch (error) {
previouslyInstalledPackage.installedNodes.forEach((node) => {
this.push.broadcast('removeNodeType', {
name: node.type,
version: node.latestVersion,
this.push.broadcast({
type: 'removeNodeType',
data: {
name: node.type,
version: node.latestVersion,
},
});
});

Expand Down
13 changes: 6 additions & 7 deletions packages/cli/src/controllers/e2e.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { PushPayload, PushType } from '@n8n/api-types';
import type { PushMessage } from '@n8n/api-types';
import { Request } from 'express';
import Container from 'typedi';
import { v4 as uuid } from 'uuid';
Expand Down Expand Up @@ -58,14 +58,12 @@ type ResetRequest = Request<
}
>;

type PushRequest<T extends PushType> = Request<
type PushRequest = Request<
{},
{},
{
type: T;
pushRef: string;
data: PushPayload<T>;
}
} & PushMessage
>;

@RestController('/e2e')
Expand Down Expand Up @@ -144,8 +142,9 @@ export class E2EController {
}

@Post('/push', { skipAuth: true })
async pushSend(req: PushRequest<any>) {
this.push.broadcast(req.body.type, req.body.data);
async pushSend(req: PushRequest) {
const { pushRef: _, ...pushMsg } = req.body;
this.push.broadcast(pushMsg);
}

@Patch('/feature', { skipAuth: true })
Expand Down
6 changes: 2 additions & 4 deletions packages/cli/src/events/maps/pub-sub.event-map.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { PushMessage, WorkerStatus } from '@n8n/api-types';

import type { IWorkflowDb } from '@/interfaces';

Expand Down Expand Up @@ -64,9 +64,7 @@ export type PubSubCommandMap = {
errorMessage: string;
};

'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
'relay-execution-lifecycle-event': PushMessage & {
pushRef: string;
};

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/executions/execution-recovery.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class ExecutionRecoveryService {

this.push.once('editorUiConnected', async () => {
await sleep(1000);
this.push.broadcast('executionRecovered', { executionId });
this.push.broadcast({ type: 'executionRecovered', data: { executionId } });
});

return amendedExecution;
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/load-nodes-and-credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ export class LoadNodesAndCredentials {
loader.reset();
await loader.loadAll();
await this.postProcessLoaders();
push.broadcast('nodeDescriptionUpdated', {});
push.broadcast({ type: 'nodeDescriptionUpdated', data: {} });
}, 100);

const toWatch = loader.isLazyLoaded
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/push/__tests__/websocket.push.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe('WebSocketPush', () => {
it('sends data to one connection', () => {
webSocketPush.add(pushRef1, userId, mockWebSocket1);
webSocketPush.add(pushRef2, userId, mockWebSocket2);
webSocketPush.sendToOne(pushMessage.type, pushMessage.data, pushRef1);
webSocketPush.sendToOne(pushMessage, pushRef1);

expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).not.toHaveBeenCalled();
Expand All @@ -82,7 +82,7 @@ describe('WebSocketPush', () => {
it('sends data to all connections', () => {
webSocketPush.add(pushRef1, userId, mockWebSocket1);
webSocketPush.add(pushRef2, userId, mockWebSocket2);
webSocketPush.sendToAll(pushMessage.type, pushMessage.data);
webSocketPush.sendToAll(pushMessage);

expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
Expand All @@ -101,7 +101,7 @@ describe('WebSocketPush', () => {
it('sends data to all users connections', () => {
webSocketPush.add(pushRef1, userId, mockWebSocket1);
webSocketPush.add(pushRef2, userId, mockWebSocket2);
webSocketPush.sendToUsers(pushMessage.type, pushMessage.data, [userId]);
webSocketPush.sendToUsers(pushMessage, [userId]);

expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
Expand Down
20 changes: 8 additions & 12 deletions packages/cli/src/push/abstract.push.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { PushPayload, PushType } from '@n8n/api-types';
import type { PushMessage } from '@n8n/api-types';
import { ErrorReporter } from 'n8n-core';
import { assert, jsonStringify } from 'n8n-workflow';
import { Service } from 'typedi';
Expand Down Expand Up @@ -69,7 +69,7 @@ export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPush
delete this.userIdByPushRef[pushRef];
}

private sendTo<Type extends PushType>(type: Type, data: PushPayload<Type>, pushRefs: string[]) {
private sendTo({ type, data }: PushMessage, pushRefs: string[]) {
this.logger.debug(`Pushed to frontend: ${type}`, {
dataType: type,
pushRefs: pushRefs.join(', '),
Expand All @@ -90,30 +90,26 @@ export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPush
}
}

sendToAll<Type extends PushType>(type: Type, data: PushPayload<Type>) {
this.sendTo(type, data, Object.keys(this.connections));
sendToAll(pushMsg: PushMessage) {
this.sendTo(pushMsg, Object.keys(this.connections));
}

sendToOne<Type extends PushType>(type: Type, data: PushPayload<Type>, pushRef: string) {
sendToOne(pushMsg: PushMessage, pushRef: string) {
if (this.connections[pushRef] === undefined) {
this.logger.error(`The session "${pushRef}" is not registered.`, { pushRef });
return;
}

this.sendTo(type, data, [pushRef]);
this.sendTo(pushMsg, [pushRef]);
}

sendToUsers<Type extends PushType>(
type: Type,
data: PushPayload<Type>,
userIds: Array<User['id']>,
) {
sendToUsers(pushMsg: PushMessage, userIds: Array<User['id']>) {
const { connections } = this;
const userPushRefs = Object.keys(connections).filter((pushRef) =>
userIds.includes(this.userIdByPushRef[pushRef]),
);

this.sendTo(type, data, userPushRefs);
this.sendTo(pushMsg, userPushRefs);
}

closeAllConnections() {
Expand Down
20 changes: 8 additions & 12 deletions packages/cli/src/push/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { PushPayload, PushType } from '@n8n/api-types';
import type { PushMessage } from '@n8n/api-types';
import type { Application } from 'express';
import { ServerResponse } from 'http';
import type { Server } from 'http';
Expand Down Expand Up @@ -81,11 +81,11 @@ export class Push extends TypedEmitter<PushEvents> {
this.emit('editorUiConnected', pushRef);
}

broadcast<Type extends PushType>(type: Type, data: PushPayload<Type>) {
this.backend.sendToAll(type, data);
broadcast(pushMsg: PushMessage) {
this.backend.sendToAll(pushMsg);
}

send<Type extends PushType>(type: Type, data: PushPayload<Type>, pushRef: string) {
send(pushMsg: PushMessage, pushRef: string) {
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
Expand All @@ -95,20 +95,16 @@ export class Push extends TypedEmitter<PushEvents> {
if (this.instanceSettings.isMultiMain && !this.backend.hasPushRef(pushRef)) {
void this.publisher.publishCommand({
command: 'relay-execution-lifecycle-event',
payload: { type, args: data, pushRef },
payload: { ...pushMsg, pushRef },
});
return;
}

this.backend.sendToOne(type, data, pushRef);
this.backend.sendToOne(pushMsg, pushRef);
}

sendToUsers<Type extends PushType>(
type: Type,
data: PushPayload<Type>,
userIds: Array<User['id']>,
) {
this.backend.sendToUsers(type, data, userIds);
sendToUsers(pushMsg: PushMessage, userIds: Array<User['id']>) {
this.backend.sendToUsers(pushMsg, userIds);
}

@OnShutdown()
Expand Down
50 changes: 37 additions & 13 deletions packages/cli/src/scaling/__tests__/pubsub-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,10 @@ describe('PubSubHandler', () => {
expect(activeWorkflowManager.add).toHaveBeenCalledWith(workflowId, 'activate', undefined, {
shouldPublish: false,
});
expect(push.broadcast).toHaveBeenCalledWith('workflowActivated', { workflowId });
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowActivated',
data: { workflowId },
});
expect(publisher.publishCommand).toHaveBeenCalledWith({
command: 'display-workflow-activation',
payload: { workflowId },
Expand Down Expand Up @@ -680,7 +683,10 @@ describe('PubSubHandler', () => {
expect(activeWorkflowManager.removeWorkflowTriggersAndPollers).toHaveBeenCalledWith(
workflowId,
);
expect(push.broadcast).toHaveBeenCalledWith('workflowDeactivated', { workflowId });
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowDeactivated',
data: { workflowId },
});
expect(publisher.publishCommand).toHaveBeenCalledWith({
command: 'display-workflow-deactivation',
payload: { workflowId },
Expand Down Expand Up @@ -735,7 +741,10 @@ describe('PubSubHandler', () => {

eventService.emit('display-workflow-activation', { workflowId });

expect(push.broadcast).toHaveBeenCalledWith('workflowActivated', { workflowId });
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowActivated',
data: { workflowId },
});
});

it('should handle `display-workflow-deactivation` event', () => {
Expand All @@ -758,7 +767,10 @@ describe('PubSubHandler', () => {

eventService.emit('display-workflow-deactivation', { workflowId });

expect(push.broadcast).toHaveBeenCalledWith('workflowDeactivated', { workflowId });
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowDeactivated',
data: { workflowId },
});
});

it('should handle `display-workflow-activation-error` event', () => {
Expand All @@ -782,9 +794,12 @@ describe('PubSubHandler', () => {

eventService.emit('display-workflow-activation-error', { workflowId, errorMessage });

expect(push.broadcast).toHaveBeenCalledWith('workflowFailedToActivate', {
workflowId,
errorMessage,
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowFailedToActivate',
data: {
workflowId,
errorMessage,
},
});
});

Expand All @@ -806,15 +821,21 @@ describe('PubSubHandler', () => {

const pushRef = 'test-push-ref';
const type = 'executionStarted';
const args = { testArg: 'value' };
const data = {
executionId: '123',
mode: 'webhook' as const,
startedAt: new Date(),
workflowId: '456',
flattedRunData: '[]',
};

push.getBackend.mockReturnValue(
mock<WebSocketPush>({ hasPushRef: jest.fn().mockReturnValue(true) }),
);

eventService.emit('relay-execution-lifecycle-event', { type, args, pushRef });
eventService.emit('relay-execution-lifecycle-event', { type, data, pushRef });

expect(push.send).toHaveBeenCalledWith(type, args, pushRef);
expect(push.send).toHaveBeenCalledWith({ type, data }, pushRef);
});

it('should handle `clear-test-webhooks` event', () => {
Expand Down Expand Up @@ -868,9 +889,12 @@ describe('PubSubHandler', () => {

eventService.emit('response-to-get-worker-status', workerStatus);

expect(push.broadcast).toHaveBeenCalledWith('sendWorkerStatusMessage', {
workerId: workerStatus.senderId,
status: workerStatus,
expect(push.broadcast).toHaveBeenCalledWith({
type: 'sendWorkerStatusMessage',
data: {
workerId: workerStatus.senderId,
status: workerStatus,
},
});
});
});
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/scaling/pubsub/publisher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ export class Publisher {
const metadata: LogMetadata = { msg: msg.command, channel: 'n8n.commands' };

if (msg.command === 'relay-execution-lifecycle-event') {
const { args, type } = msg.payload;
const { data, type } = msg.payload;
msgName += ` (${type})`;
metadata.type = type;
metadata.executionId = args.executionId;
if ('executionId' in data) metadata.executionId = data.executionId;
}

this.logger.debug(`Published pubsub msg: ${msgName}`, metadata);
Expand Down
Loading

0 comments on commit fe7fb41

Please sign in to comment.