Skip to content

Commit

Permalink
feat(core): Add support for WebSockets as an alternative to Server-Se…
Browse files Browse the repository at this point in the history
…nt Events

Co-authored-by: Matthijs Knigge <matthijs@volcano.nl>
  • Loading branch information
netroy and matthijs166 committed Feb 9, 2023
1 parent 69f5d6a commit 67b0511
Show file tree
Hide file tree
Showing 20 changed files with 391 additions and 570 deletions.
2 changes: 2 additions & 0 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
"@types/syslog-client": "^1.1.2",
"@types/uuid": "^8.3.2",
"@types/validator": "^13.7.0",
"@types/ws": "^8.5.4",
"@types/yamljs": "^0.2.31",
"chokidar": "^3.5.2",
"concurrently": "^5.1.0",
Expand Down Expand Up @@ -196,6 +197,7 @@
"uuid": "^8.3.2",
"validator": "13.7.0",
"winston": "^3.3.3",
"ws": "^8.12.0",
"yamljs": "^0.3.0"
}
}
21 changes: 15 additions & 6 deletions packages/cli/src/AbstractServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import { WEBHOOK_METHODS } from '@/WebhookHelpers';
const emptyBuffer = Buffer.alloc(0);

export abstract class AbstractServer {
protected server: Server;

protected app: express.Application;

protected externalHooks: IExternalHooksClass;
Expand Down Expand Up @@ -73,7 +75,7 @@ export abstract class AbstractServer {
this.activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
}

private async setupCommonMiddlewares() {
private async setupErrorHandlers() {
const { app } = this;

// Augment errors sent to Sentry
Expand All @@ -82,6 +84,10 @@ export abstract class AbstractServer {
} = await import('@sentry/node');
app.use(requestHandler());
app.use(errorHandler());
}

private async setupCommonMiddlewares() {
const { app } = this;

// Compress the response data
app.use(compression());
Expand Down Expand Up @@ -147,6 +153,8 @@ export abstract class AbstractServer {
this.app.use(corsMiddleware);
}

protected setupPushServer() {}

private async setupHealthCheck() {
this.app.use((req, res, next) => {
if (!Db.isInitialized) {
Expand Down Expand Up @@ -392,10 +400,9 @@ export abstract class AbstractServer {
async start(): Promise<void> {
const { app, externalHooks, protocol, sslKey, sslCert } = this;

let server: Server;
if (protocol === 'https' && sslKey && sslCert) {
const https = await import('https');
server = https.createServer(
this.server = https.createServer(
{
key: await readFile(this.sslKey, 'utf8'),
cert: await readFile(this.sslCert, 'utf8'),
Expand All @@ -404,13 +411,13 @@ export abstract class AbstractServer {
);
} else {
const http = await import('http');
server = http.createServer(app);
this.server = http.createServer(app);
}

const PORT = config.getEnv('port');
const ADDRESS = config.getEnv('listen_address');

server.on('error', (error: Error & { code: string }) => {
this.server.on('error', (error: Error & { code: string }) => {
if (error.code === 'EADDRINUSE') {
console.log(
`n8n's port ${PORT} is already in use. Do you have another instance of n8n running already?`,
Expand All @@ -419,8 +426,10 @@ export abstract class AbstractServer {
}
});

await new Promise<void>((resolve) => server.listen(PORT, ADDRESS, () => resolve()));
await new Promise<void>((resolve) => this.server.listen(PORT, ADDRESS, () => resolve()));

await this.setupErrorHandlers();
this.setupPushServer();
await this.setupCommonMiddlewares();
if (inDevelopment) {
this.setupDevMiddlewares();
Expand Down
51 changes: 1 addition & 50 deletions packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -449,56 +449,6 @@ export interface IInternalHooksClass {
onApiKeyDeleted(apiKeyDeletedData: { user: User; public_api: boolean }): Promise<void>;
}

export interface IN8nConfig {
database: IN8nConfigDatabase;
endpoints: IN8nConfigEndpoints;
executions: IN8nConfigExecutions;
generic: IN8nConfigGeneric;
host: string;
nodes: IN8nConfigNodes;
port: number;
protocol: 'http' | 'https';
}

export interface IN8nConfigDatabase {
type: DatabaseType;
postgresdb: {
host: string;
password: string;
port: number;
user: string;
};
}

export interface IN8nConfigEndpoints {
rest: string;
webhook: string;
webhookTest: string;
}

// eslint-disable-next-line import/export
export interface IN8nConfigExecutions {
saveDataOnError: SaveExecutionDataType;
saveDataOnSuccess: SaveExecutionDataType;
saveDataManualExecutions: boolean;
}

// eslint-disable-next-line import/export
export interface IN8nConfigExecutions {
saveDataOnError: SaveExecutionDataType;
saveDataOnSuccess: SaveExecutionDataType;
saveDataManualExecutions: boolean;
}

export interface IN8nConfigGeneric {
timezone: string;
}

export interface IN8nConfigNodes {
errorTriggerType: string;
exclude: string[];
}

export interface IVersionNotificationSettings {
enabled: boolean;
endpoint: string;
Expand Down Expand Up @@ -550,6 +500,7 @@ export interface IN8nUISettings {
onboardingCallPromptEnabled: boolean;
missingPackages?: boolean;
executionMode: 'regular' | 'queue';
pushBackend: 'sse' | 'websocket';
communityNodesEnabled: boolean;
deployment: {
type: string;
Expand Down
83 changes: 0 additions & 83 deletions packages/cli/src/Push.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/cli/src/ReloadNodesAndCredentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { realpath } from 'fs/promises';

import type { LoadNodesAndCredentialsClass } from '@/LoadNodesAndCredentials';
import type { NodeTypesClass } from '@/NodeTypes';
import type { Push } from '@/Push';
import type { Push } from '@/push';

export const reloadNodesAndCredentials = async (
loadNodesAndCredentials: LoadNodesAndCredentialsClass,
Expand Down
58 changes: 14 additions & 44 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,15 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-unnecessary-type-assertion */
/* eslint-disable @typescript-eslint/no-use-before-define */
/* eslint-disable @typescript-eslint/await-thenable */
/* eslint-disable new-cap */
/* eslint-disable prefer-const */
/* eslint-disable @typescript-eslint/no-invalid-void-type */
/* eslint-disable no-return-assign */
/* eslint-disable no-param-reassign */
/* eslint-disable consistent-return */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable id-denylist */
/* eslint-disable no-console */
/* eslint-disable global-require */
/* eslint-disable @typescript-eslint/no-var-requires */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-return */
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable no-continue */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable no-restricted-syntax */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable import/no-dynamic-require */
/* eslint-disable no-await-in-loop */

import { exec as callbackExec } from 'child_process';
import { access as fsAccess } from 'fs/promises';
Expand Down Expand Up @@ -60,7 +44,6 @@ import type {
INodeTypeNameVersion,
ITelemetrySettings,
WorkflowExecuteMode,
INodeTypes,
ICredentialTypes,
} from 'n8n-workflow';
import { LoggerProxy, jsonParse } from 'n8n-workflow';
Expand All @@ -78,7 +61,6 @@ import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { nodesController } from '@/api/nodes.api';
import { workflowsController } from '@/workflows/workflows.controller';
import {
AUTH_COOKIE_NAME,
EDITOR_UI_DIST_DIR,
GENERATED_STATIC_DIR,
inDevelopment,
Expand Down Expand Up @@ -106,7 +88,6 @@ import {
PasswordResetController,
UsersController,
} from '@/controllers';
import { resolveJwt } from '@/auth/jwt';

import { executionsController } from '@/executions/executions.controller';
import { nodeTypesController } from '@/api/nodeTypes.api';
Expand Down Expand Up @@ -143,7 +124,6 @@ import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import type { LoadNodesAndCredentialsClass } from '@/LoadNodesAndCredentials';
import type { NodeTypesClass } from '@/NodeTypes';
import { NodeTypes } from '@/NodeTypes';
import * as Push from '@/Push';
import * as ResponseHelper from '@/ResponseHelper';
import type { WaitTrackerClass } from '@/WaitTracker';
import { WaitTracker } from '@/WaitTracker';
Expand All @@ -155,7 +135,9 @@ import { eventBusRouter } from '@/eventbus/eventBusRoutes';
import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper';
import { getLicense } from '@/License';
import { licenseController } from './license/license.controller';
import { corsMiddleware, setupAuthMiddlewares } from './middlewares';
import type { Push } from '@/push';
import { getPushInstance, setupPushServer, setupPushHandler } from '@/push';
import { setupAuthMiddlewares } from './middlewares';
import { initEvents } from './events';
import { ldapController } from './Ldap/routes/ldap.controller.ee';
import { getLdapLoginLabel, isLdapEnabled, isLdapLoginEnabled } from './Ldap/helpers';
Expand Down Expand Up @@ -183,7 +165,7 @@ class Server extends AbstractServer {

credentialTypes: ICredentialTypes;

push: Push.Push;
push: Push;

constructor() {
super();
Expand All @@ -198,12 +180,12 @@ class Server extends AbstractServer {
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');

this.push = getPushInstance();

if (process.env.E2E_TESTS === 'true') {
this.app.use('/e2e', require('./api/e2e.api').e2eController);
}

this.push = Push.getInstance();

const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl();
const telemetrySettings: ITelemetrySettings = {
enabled: config.getEnv('diagnostics.enabled'),
Expand Down Expand Up @@ -280,6 +262,7 @@ class Server extends AbstractServer {
},
onboardingCallPromptEnabled: config.getEnv('onboardingCallPrompt.enabled'),
executionMode: config.getEnv('executions.mode'),
pushBackend: config.getEnv('push.backend'),
communityNodesEnabled: config.getEnv('nodes.communityPackages.enabled'),
deployment: {
type: config.getEnv('deployment.type'),
Expand Down Expand Up @@ -434,26 +417,8 @@ class Server extends AbstractServer {
// Parse cookies for easier access
this.app.use(cookieParser());

// Get push connections
this.app.use(`/${this.restEndpoint}/push`, corsMiddleware, async (req, res, next) => {
const { sessionId } = req.query;
if (sessionId === undefined) {
next(new Error('The query parameter "sessionId" is missing!'));
return;
}

if (isUserManagementEnabled()) {
try {
const authCookie = req.cookies?.[AUTH_COOKIE_NAME] ?? '';
await resolveJwt(authCookie);
} catch (error) {
res.status(401).send('Unauthorized');
return;
}
}

this.push.add(sessionId as string, req, res);
});
const { restEndpoint, app } = this;
setupPushHandler(restEndpoint, app, isUserManagementEnabled());

// Make sure that Vue history mode works properly
this.app.use(
Expand Down Expand Up @@ -1324,6 +1289,11 @@ class Server extends AbstractServer {
this.app.use('/', express.static(GENERATED_STATIC_DIR));
}
}

protected setupPushServer(): void {
const { restEndpoint, server, app } = this;
setupPushServer(restEndpoint, server, app);
}
}

export async function start(): Promise<void> {
Expand Down
Loading

0 comments on commit 67b0511

Please sign in to comment.