Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unified WS error handler #49

Merged
merged 16 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/lib/geoip/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import _ from 'lodash';
import anyAscii from 'any-ascii';
import type {ProbeLocation} from '../../probe/types.js';
import {scopedLogger} from '../logger.js';
import {InternalError} from '../internal-error.js';
import {ipinfoLookup} from './ipinfo.js';
import {fastlyLookup} from './fastly.js';

Expand Down Expand Up @@ -50,7 +51,7 @@ export const geoIpLookup = async (addr: string): Promise<LocationInfo> => {
);

if (fastly.status === 'fulfilled' && isVpn(fastly.value.client)) {
throw new Error('vpn detected');
throw new InternalError('vpn detected', true);
}

return fulfilled.filter(v => v !== null).flat();
Expand Down
9 changes: 9 additions & 0 deletions src/lib/internal-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export class InternalError extends Error {
expose?: boolean;

constructor(message: string, isExposed: boolean) {
super(message);

this.expose = isExposed;
}
}
4 changes: 3 additions & 1 deletion src/lib/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ const logger = winston.createLogger({
winston.format.printf(info => `[${info['timestamp'] as string}] [${info.level.toUpperCase()}] [${process.pid}] [${info['scope'] as string}] ${info.message}`),
),
transports: [
new winston.transports.Console(),
new winston.transports.Console({
silent: process.env['NODE_ENV'] === 'test',
}),
],
});

Expand Down
24 changes: 9 additions & 15 deletions src/lib/ws/gateway.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,29 @@
import {scopedLogger} from '../logger.js';
import type {Socket} from 'socket.io';
import type {Probe} from '../../probe/types.js';
import {handleMeasurementAck} from '../../measurement/handler/ack.js';
import {handleMeasurementResult} from '../../measurement/handler/result.js';
import {handleMeasurementProgress} from '../../measurement/handler/progress.js';
import {scopedLogger} from '../logger.js';
import {getWsServer, PROBES_NAMESPACE} from './server.js';
import {probeMetadata} from './middleware/probe-metadata.js';
import {verifyIpLimit} from './helper/probe-ip-limit.js';
import {errorHandler} from './helper/error-handler.js';

const io = getWsServer();
const logger = scopedLogger('gateway');

io
.of(PROBES_NAMESPACE)
.use(probeMetadata)
.on('connect', async socket => {
const {probe} = socket.data;

if (!probe) {
logger.error('socket metadata missing', socket);
socket.disconnect();
return;
}

const isLimitReached = await verifyIpLimit(socket);
if (isLimitReached) {
return;
}
.on('connect', errorHandler(async (socket: Socket) => {
await verifyIpLimit(socket);

const probe = socket.data['probe'] as Probe;
socket.emit('api:connect:location', probe.location);
logger.info(`ws client ${socket.id} connected from ${probe.location.country}`);

// Handlers
socket.on('probe:measurement:ack', handleMeasurementAck(probe));
socket.on('probe:measurement:progress', handleMeasurementProgress);
socket.on('probe:measurement:result', handleMeasurementResult);
});
}));
33 changes: 33 additions & 0 deletions src/lib/ws/helper/error-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import type {Socket} from 'socket.io';
import {scopedLogger} from '../../logger.js';
import {WsError} from '../ws-error.js';

const logger = scopedLogger('ws:error');

type NextConnectArgument = (
socket: Socket,
) => Promise<void>;

type NextMwArgument = (
socket: Socket,
next: () => void
) => Promise<void>;

type NextArgument = NextConnectArgument | NextMwArgument;

export const errorHandler = (next: NextArgument) => async (socket: Socket, mwNext?: (error?: any) => void | undefined) => {
try {
await next(socket, mwNext!);
} catch (error: unknown) {
if (error instanceof WsError) {
socket.emit('api:error', error.toJson());
logger.info(`disconnecting client ${error.info.socketId} for (${error.message})`);
}

if (mwNext) {
mwNext(error);
}

socket.disconnect();
}
};
20 changes: 13 additions & 7 deletions src/lib/ws/helper/probe-ip-limit.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import * as process from 'node:process';
import type {Socket} from 'socket.io';
import type {Probe} from '../../../probe/types.js';
import {WsError} from '../ws-error.js';
import {getWsServer, PROBES_NAMESPACE} from '../server.js';
import {scopedLogger} from '../../logger.js';

const io = getWsServer();
const logger = scopedLogger('ws:limit');

export const verifyIpLimit = async (socket: Socket): Promise<boolean> => {
export const verifyIpLimit = async (socket: Socket): Promise<void> => {
if (process.env['FAKE_PROBE_IP']) {
return false;
return;
}

const socketList = await io.of(PROBES_NAMESPACE).fetchSockets();
Expand All @@ -17,10 +19,14 @@ export const verifyIpLimit = async (socket: Socket): Promise<boolean> => {
);

if (previousSocket) {
socket.disconnect();
logger.info(`ws client ${socket.id} has reached the concurrent IP limit. Disconnected. (${previousSocket.data.probe.ipAddress})`);
return true;
logger.info(`ws client ${socket.id} has reached the concurrent IP limit.`, {message: previousSocket.data.probe.ipAddress});
throw new WsError(
'IP Limit',
{
code: 'ip_limit',
socketId: socket.id,
probe: socket.data['probe'] as Probe,
},
);
}

return false;
};
22 changes: 16 additions & 6 deletions src/lib/ws/middleware/probe-metadata.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
import type {Socket} from 'socket.io';
import type {ExtendedError} from 'socket.io/dist/namespace.js';
import {WsError} from '../ws-error.js';
import {buildProbe} from '../../../probe/builder.js';
import {scopedLogger} from '../../logger.js';
import {InternalError} from '../../internal-error.js';
import {errorHandler} from '../helper/error-handler.js';

const logger = scopedLogger('ws');

export const probeMetadata = async (socket: Socket, next: (error?: ExtendedError) => void) => {
export const probeMetadata = errorHandler(async (socket: Socket, next: (error?: ExtendedError) => void) => {
try {
socket.data['probe'] = await buildProbe(socket);
next();
} catch {
// Todo: add more info to log when we add probe authentication
logger.warn('failed to collect probe metadata');
next(new Error('probe not recognized by the API'));
} catch (error: unknown) {
let message = 'failed to collect probe metadata';

if (error instanceof InternalError && error?.expose) {
message = error.message;
}

logger.warn(message);
const nError = new WsError(message, {socketId: socket.id});

throw nError;
}
};
});
31 changes: 31 additions & 0 deletions src/lib/ws/ws-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import type {Probe} from '../../probe/types.js';

type Info = {
socketId: string;
// A string used for message parsing
code?: string;
probe?: Probe;
cause?: any;
};

type JsonResponse = {
message: string;
info: Info;
};

export class WsError extends Error {
info: Info;
constructor(message: string, info: Info) {
super(message);
this.info = info;
}

toJson(): JsonResponse {
return {
message: this.message,
info: this.info,
};
}
}

export default WsError;
108 changes: 108 additions & 0 deletions test/tests/unit/ws/error-handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import {expect} from 'chai';
import type {Socket} from 'socket.io';

import {WsError} from '../../../../src/lib/ws/ws-error.js';
import {errorHandler} from '../../../../src/lib/ws/helper/error-handler.js';

class MockSocket {
id: string;
isConnected: boolean;
store: Array<{type: string; event: string; payload: any}> = [];

constructor(id: string) {
this.id = id;
this.isConnected = true;
}

emit(event: string, payload: string | Record<string, unknown>) {
this.store.push({type: 'emit', event, payload});
}

disconnect() {
this.isConnected = false;
}
}

type BundledMockSocket = Socket & MockSocket;

describe('ws error', () => {
describe('connect', () => {
it('should catch error and disconnect socket', async () => {
const socket = new MockSocket('abc') as BundledMockSocket;

const testMethod = async (socket: Socket): Promise<void> => {
// Prevent unused variable err
socket.emit('connect', '');
throw new Error('abc');
};

await errorHandler(testMethod)(socket as Socket);

expect(socket.isConnected).to.equal(false);
});

it('should catch error and emit api:error event ', async () => {
const socket = new MockSocket('abc') as BundledMockSocket;

const testMethod = async (socket: Socket): Promise<void> => {
// Prevent unused variable err
socket.emit('connect', '');
throw new WsError('abc', {socketId: socket.id});
};

await errorHandler(testMethod)(socket as Socket);

const storeError = socket.store.find(m => m.event === 'api:error');
expect(socket.isConnected).to.equal(false);
expect(storeError).to.exist;
expect(storeError?.payload?.message).to.equal('abc');
});
});

describe('middleware', () => {
it('should catch error and execute cb', async () => {
const socket = new MockSocket('abc') as BundledMockSocket;
let cbError = null;

const testMethod = async (socket: Socket): Promise<void> => {
// Prevent unused variable err
socket.emit('connect', '');
throw new Error('abc');
};

const testCb = (error: Error) => {
cbError = error;
};

await errorHandler(testMethod)(socket as Socket, testCb);

const apiError = socket.store.find(m => m.event === 'api:error');
expect(cbError).to.not.be.null;
expect(cbError).to.be.instanceof(Error);
expect(apiError).to.not.exist;
});

it('should catch error, execute cb and emit api:error event', async () => {
const socket = new MockSocket('abc') as BundledMockSocket;
let cbError = null;

const testMethod = async (socket: Socket): Promise<void> => {
// Prevent unused variable err
socket.emit('connect', '');
throw new WsError('vpn detected', {socketId: socket.id});
};

const testCb = (error: Error) => {
cbError = error;
};

await errorHandler(testMethod)(socket as Socket, testCb);

const apiError = socket.store.find(m => m.event === 'api:error');
expect(cbError).to.not.be.null;
expect(cbError).to.be.instanceof(WsError);
expect(apiError).to.exist;
expect(apiError?.payload.message).to.equal('vpn detected');
});
});
});