Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(repeater): refrain from utilizing non-standard ports #404

Merged
merged 37 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c1b6d76
feat(repeater): replace rabbitmq with repeater server
lsndr Jul 21, 2023
28fd24e
refactor: introduce `ServerRepeaterLauncher`
lsndr Jul 25, 2023
4e391c7
refactor: replace `repeater-server` with `repeaterServer`
lsndr Jul 25, 2023
a469f99
feat: ping repeater server every 10s
lsndr Jul 25, 2023
6008885
fix: use non secure connection in local env
lsndr Jul 25, 2023
367e7d5
fix: so nothing if socket not connected
lsndr Jul 25, 2023
13a42f1
refactor: rephrase error message
lsndr Jul 25, 2023
fb4caa6
refactor: extend error info
lsndr Jul 25, 2023
d08744c
fix: always call response callback
lsndr Jul 25, 2023
e028e40
refactor: use arrow function
lsndr Jul 25, 2023
75d25f8
refactor: rename to `repeaterServer`
lsndr Jul 25, 2023
b93e651
refactor: rename method to `onReconnectionFailed`
lsndr Jul 25, 2023
de2032b
feat: log reconnect attempts
lsndr Jul 25, 2023
71e22c6
refactor: reorganize typings
lsndr Jul 25, 2023
4b49487
feat: listen to error events
lsndr Jul 25, 2023
73bf837
feat: allow to run multiple repeaters
lsndr Jul 25, 2023
c1d484c
refactor: move ping logic to repeater server class
lsndr Jul 25, 2023
e913bc2
fix: set reconnect times to 20
lsndr Jul 25, 2023
8a7313b
test: fix test
lsndr Jul 25, 2023
7e7b2ee
refactor: rename method to `reconnectionFailed`
lsndr Jul 27, 2023
a776ad5
feat: keep repeater id mandatory
lsndr Jul 27, 2023
a573622
refactor: use `clearPingTimer`
lsndr Jul 27, 2023
c9d79a8
refactor: extract socket getter
lsndr Jul 27, 2023
6544536
refactor: mark `compileScripts` as deprecated
lsndr Jul 27, 2023
63cc71b
refactor: improve `RepeaterServerRequestResponse` safety
lsndr Jul 27, 2023
f78a9ac
refactor: rename method to `requestReceived`
lsndr Jul 27, 2023
e3cf60d
refactor: separate responsibilities of event listeners
lsndr Jul 27, 2023
94a4491
refactor: extract reconnection attempt handling to launcher
lsndr Jul 27, 2023
7985d96
feat: add `reconnectionSucceeded` event handler
lsndr Jul 27, 2023
ac71fd1
refactor: remove excess typings
lsndr Jul 27, 2023
9c8ff38
feat: set connectTimeout to 10s
lsndr Jul 27, 2023
6f6efd6
feat: add proxy support
lsndr Jul 27, 2023
b1fd995
refactor: make `compileScripts` optional
lsndr Jul 27, 2023
5ca2059
refactor: rephrase explanation
lsndr Jul 27, 2023
78ea71e
refactor: rename to `Repeater connected`
lsndr Jul 27, 2023
c60466f
refactor: use async/await approach
lsndr Jul 31, 2023
42f2a12
refactor: remove `RepeaterServerEventHandler`
lsndr Jul 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 103 additions & 187 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
"request": "~2.88.2",
"request-promise": "~4.2.6",
"semver": "^7.3.5",
"socket.io-client": "^4.7.1",
"socket.io-msgpack-parser": "^3.0.2",
"socks": "~2.6.1",
"socks-proxy-agent": "~5.0.0",
"tslib": "^2.3.1",
Expand Down
23 changes: 10 additions & 13 deletions src/Commands/RunRepeater.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { RabbitMQBusOptions } from '../Bus';
import { Cert, RequestExecutorOptions } from '../RequestExecutor';
import { Helpers, logger } from '../Utils';
import { container } from '../Config';
import { RepeaterLauncher } from '../Repeater';
import { DefaultRepeaterServerOptions, RepeaterLauncher } from '../Repeater';
import { Arguments, Argv, CommandModule } from 'yargs';
import { normalize } from 'path';

Expand Down Expand Up @@ -178,19 +177,17 @@ export class RunRepeater implements CommandModule {
]
}
})
.register<RabbitMQBusOptions>(RabbitMQBusOptions, {
useValue: {
exchange: 'EventBus',
clientQueue: `agent:${args.id as string}`,
connectTimeout: 10000,
url: args.bus as string,
proxyUrl: (args.proxyExternal ?? args.proxy) as string,
credentials: {
username: 'bot',
password: args.token as string
.register<DefaultRepeaterServerOptions>(
DefaultRepeaterServerOptions,
{
useValue: {
uri: args.repeaterServer as string,
token: args.token as string,
connectTimeout: 10000,
proxyUrl: (args.proxyExternal ?? args.proxy) as string
}
}
})
)
);
}

Expand Down
8 changes: 5 additions & 3 deletions src/Config/CliBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ export class CliBuilder {
describe: 'SOCKS4 or SOCKS5 URL to proxy internal traffic'
})
.middleware((args: Arguments) => {
({ bus: args.bus, api: args.api } = Helpers.getClusterUrls(
args as ClusterArgs
));
({
bus: args.bus,
api: args.api,
repeaterServer: args.repeaterServer
} = Helpers.getClusterUrls(args as ClusterArgs));
})
// TODO: (victor.polyakov@brightsec.com) Write correct type checking
.middleware(
Expand Down
16 changes: 14 additions & 2 deletions src/Config/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ import { ConfigReader } from './ConfigReader';
import { DefaultConfigReader } from './DefaultConfigReader';
import { CliInfo } from './CliInfo';
import { CliBuilder } from './CliBuilder';
import { DefaultRepeaterLauncher, RepeaterLauncher } from '../Repeater';
import {
ServerRepeaterLauncher,
RepeaterLauncher,
RepeaterServer,
DefaultRepeaterServer
} from '../Repeater';
import { container, Lifecycle } from 'tsyringe';

container
Expand Down Expand Up @@ -103,6 +108,13 @@ container
},
{ lifecycle: Lifecycle.Singleton }
)
.register(
RepeaterServer,
{
useClass: DefaultRepeaterServer
},
{ lifecycle: Lifecycle.Singleton }
)
.register(
Tokens,
{
Expand Down Expand Up @@ -202,7 +214,7 @@ container
)
.register<RepeaterLauncher>(
RepeaterLauncher,
{ useClass: DefaultRepeaterLauncher },
{ useClass: ServerRepeaterLauncher },
{ lifecycle: Lifecycle.Singleton }
)
.register<CliBuilder>(CliBuilder, {
Expand Down
207 changes: 207 additions & 0 deletions src/Repeater/DefaultRepeaterServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import { logger } from '../Utils';
import {
RepeaterServer,
RepeaterServerDeployedEvent,
RepeaterServerReconnectionFailedEvent,
RepeaterServerRequestEvent,
RepeaterServerRequestResponse,
RepeaterServerErrorEvent,
RepeaterServerReconnectionAttemptedEvent
} from './RepeaterServer';
import { inject, injectable } from 'tsyringe';
import io, { Socket } from 'socket.io-client';
import parser from 'socket.io-msgpack-parser';
import { SocksProxyAgent } from 'socks-proxy-agent';
import { once } from 'events';
import { parse } from 'url';
import Timer = NodeJS.Timer;

export interface DefaultRepeaterServerOptions {
readonly uri: string;
readonly token: string;
readonly connectTimeout?: number;
readonly proxyUrl?: string;
}

export const DefaultRepeaterServerOptions: unique symbol = Symbol(
'DefaultRepeaterServerOptions'
);

@injectable()
export class DefaultRepeaterServer implements RepeaterServer {
private latestReconnectionError?: Error;
private readonly MAX_RECONNECTION_ATTEMPTS = 20;
private _socket?: Socket;
private timer?: Timer;

constructor(
@inject(DefaultRepeaterServerOptions)
private readonly options: DefaultRepeaterServerOptions
) {}

public disconnect() {
this.clearPingTimer();

this._socket?.disconnect();
this._socket?.removeAllListeners();
this._socket = undefined;
}

public async deploy(
repeaterId?: string
): Promise<RepeaterServerDeployedEvent> {
this.socket.emit('deploy', {
repeaterId
});

const [result]: RepeaterServerDeployedEvent[] = await once(
this.socket,
'deployed'
);

return result;
}

public connect(hostname: string) {
this._socket = io(this.options.uri, {
parser,
path: '/api/ws/v1',
transports: ['websocket'],
timeout: this.options?.connectTimeout,
// @ts-expect-error Type is wrong.
// Agent is passed directly to "ws" package, which accepts http.Agent
agent: this.options.proxyUrl
? new SocksProxyAgent({
...parse(this.options.proxyUrl)
})
: false,
reconnectionAttempts: this.MAX_RECONNECTION_ATTEMPTS,
auth: {
token: this.options.token,
domain: hostname
}
});

this.socket.on('connect_error', (error: Error) => {
logger.debug(`Unable to connect to the %s host`, this.options.uri, error);
});

this.createPingTimer();

logger.debug('Event bus connected to %s', this.options.uri);
}

public requestReceived(
handler: (
event: RepeaterServerRequestEvent
) => RepeaterServerRequestResponse | Promise<RepeaterServerRequestResponse>
): void {
this.socket.on('request', (payload, callback) => {
this.processEventHandler('request', payload, handler, callback);
});
}

public reconnectionFailed(
handler: (
event: RepeaterServerReconnectionFailedEvent
) => void | Promise<void>
): void {
this.socket.io.on('reconnect', () => {
this.latestReconnectionError = undefined;
});

this.socket.io.on('reconnect_error', (error) => {
this.latestReconnectionError = error;
});

this.socket.io.on('reconnect_failed', () => {
this.processEventHandler(
'reconnection_failed',
{
error: this.latestReconnectionError
},
handler
);
});
}

public errorOccurred(
handler: (event: RepeaterServerErrorEvent) => void | Promise<void>
): void {
this.socket.on('error', (payload, callback) => {
this.processEventHandler('error', payload, handler, callback);
});
}

public reconnectionAttempted(
handler: (
event: RepeaterServerReconnectionAttemptedEvent
) => void | Promise<void>
): void {
this.socket.io.on('reconnect_attempt', (attempt) => {
this.processEventHandler(
'reconnect_attempt',
{ attempt, maxAttempts: this.MAX_RECONNECTION_ATTEMPTS },
handler
);
});
}

public reconnectionSucceeded(handler: () => void | Promise<void>): void {
this.socket.io.on('reconnect', () => {
this.processEventHandler('reconnect', undefined, handler);
});
}

private get socket() {
if (!this._socket) {
throw new Error(
'Please make sure that repeater established a connection with host.'
);
}

return this._socket;
}

private processEventHandler<P>(
event: string,
payload: P,
handler: (payload: P) => unknown,
callback?: unknown
) {
(async function () {
try {
const response = await handler(payload);

if (typeof callback !== 'function') {
return;
}

callback(response);
} catch (error) {
logger.debug(
'Error processing event "%s" with the following payload: %s. Details: %s',
event,
payload,
error
);
logger.error('Error: %s', error.message);
}
})();
}

private createPingTimer() {
this.clearPingTimer();

this.timer = setInterval(
() => this.socket.volatile.emit('ping'),
derevnjuk marked this conversation as resolved.
Show resolved Hide resolved
10000
).unref();
}

private clearPingTimer() {
if (this.timer) {
clearInterval(this.timer);
}
derevnjuk marked this conversation as resolved.
Show resolved Hide resolved
}
}
5 changes: 4 additions & 1 deletion src/Repeater/RepeaterLauncher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ export interface RepeaterLauncher {

loadScripts(scripts: Record<string, string>): Promise<void>;

compileScripts(scripts: string | Record<string, string>): void;
derevnjuk marked this conversation as resolved.
Show resolved Hide resolved
/**
* @deprecated currently not supported by some implementations
*/
compileScripts?(scripts: string | Record<string, string>): void;

run(repeaterId: string, asDaemon?: boolean): Promise<void>;

Expand Down
76 changes: 76 additions & 0 deletions src/Repeater/RepeaterServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { Protocol } from '../RequestExecutor';

export interface RepeaterServerDeployedEvent {
repeaterId: string;
}

export interface RepeaterServerRequestEvent {
protocol: Protocol;
url: string;
method?: string;
headers?: Record<string, string | string[]>;
correlationIdRegex?: string;
body?: string;
}

export type RepeaterServerRequestResponse =
| {
protocol: Protocol;
statusCode?: number;
message?: string;
errorCode?: string;
headers?: Record<string, string | string[] | undefined>;
body?: string;
}
| {
protocol: Protocol;
message?: string;
errorCode?: string;
};

export interface RepeaterServerReconnectionFailedEvent {
error: Error;
}

export interface RepeaterServerReconnectionAttemptedEvent {
attempt: number;
maxAttempts: number;
}

export interface RepeaterServerErrorEvent {
message: string;
}

export interface RepeaterServer {
disconnect(): void;

connect(hostname: string): void;

deploy(repeaterId?: string): Promise<RepeaterServerDeployedEvent>;

requestReceived(
handler: (
event: RepeaterServerRequestEvent
) => RepeaterServerRequestResponse | Promise<RepeaterServerRequestResponse>
): void;

reconnectionFailed(
handler: (
event: RepeaterServerReconnectionFailedEvent
) => void | Promise<void>
): void;

reconnectionAttempted(
handler: (
event: RepeaterServerReconnectionAttemptedEvent
) => void | Promise<void>
): void;

reconnectionSucceeded(handler: () => void | Promise<void>): void;

errorOccurred(
handler: (event: RepeaterServerErrorEvent) => void | Promise<void>
): void;
}

export const RepeaterServer: unique symbol = Symbol('RepeaterServer');
Loading