Skip to content

Commit

Permalink
feat(repeater): replace rabbitmq with repeater server
Browse files Browse the repository at this point in the history
  • Loading branch information
lsndr committed Jul 21, 2023
1 parent 362bbf5 commit 476bd79
Show file tree
Hide file tree
Showing 12 changed files with 429 additions and 337 deletions.
290 changes: 103 additions & 187 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
"request": "~2.88.2",
"request-promise": "~4.2.6",
"semver": "^7.3.5",
"socket.io-client": "^4.7.1",
"socket.io-msgpack-parser": "^3.0.2",
"socks": "~2.6.1",
"socks-proxy-agent": "~5.0.0",
"tslib": "^2.3.1",
Expand Down
37 changes: 18 additions & 19 deletions src/Commands/RunRepeater.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { RabbitMQBusOptions } from '../Bus';
import { Cert, RequestExecutorOptions } from '../RequestExecutor';
import { Helpers, logger } from '../Utils';
import { container } from '../Config';
import { RepeaterLauncher } from '../Repeater';
import { RepeaterLauncher, DefaultRepeaterOptions } from '../Repeater';
import { Arguments, Argv, CommandModule } from 'yargs';
import { normalize } from 'path';

Expand All @@ -24,7 +23,7 @@ export class RunRepeater implements CommandModule {
'ID of an existing repeater which you want to use to run a new scan.',
type: 'string',
requiresArg: true,
demandOption: true
demandOption: false
})
.option('scripts', {
alias: 'S',
Expand Down Expand Up @@ -141,8 +140,12 @@ export class RunRepeater implements CommandModule {
}, true)
.exitProcess(false)
.check((args: Arguments) => {
const id = args.id as string;
if (!Helpers.isShortUUID(id) && !Helpers.isUUID(id)) {
const id = args.id as string | undefined;
if (
typeof id !== 'undefined' &&
!Helpers.isShortUUID(id) &&
!Helpers.isUUID(id)
) {
throw new Error(
'Option --id has wrong value. Please ensure that --id option has a valid ID.'
);
Expand Down Expand Up @@ -178,17 +181,10 @@ export class RunRepeater implements CommandModule {
]
}
})
.register<RabbitMQBusOptions>(RabbitMQBusOptions, {
.register<DefaultRepeaterOptions>(DefaultRepeaterOptions, {
useValue: {
exchange: 'EventBus',
clientQueue: `agent:${args.id as string}`,
connectTimeout: 10000,
url: args.bus as string,
proxyUrl: (args.proxyExternal ?? args.proxy) as string,
credentials: {
username: 'bot',
password: args.token as string
}
uri: args['repeater-server'] as string,
token: args.token as string
}
})
);
Expand Down Expand Up @@ -223,16 +219,19 @@ export class RunRepeater implements CommandModule {

try {
['SIGTERM', 'SIGINT', 'SIGHUP'].forEach((event) =>
process.on(event, async () => {
await repeaterLauncher.close();
process.on(event, () => {
repeaterLauncher.close();
process.exit(0);
})
);

await repeaterLauncher.run(args.id as string, args.run as boolean);
await repeaterLauncher.run(
args.id as string | undefined,
args.run as boolean
);
} catch (e) {
logger.error(e.message);
await repeaterLauncher.close();
repeaterLauncher.close();
process.exit(1);
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/Config/CliBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ export class CliBuilder {
describe: 'SOCKS4 or SOCKS5 URL to proxy internal traffic'
})
.middleware((args: Arguments) => {
({ bus: args.bus, api: args.api } = Helpers.getClusterUrls(
args as ClusterArgs
));
({
bus: args.bus,
api: args.api,
repeaterServer: args['repeater-server']
} = Helpers.getClusterUrls(args as ClusterArgs));
})
// TODO: (victor.polyakov@brightsec.com) Write correct type checking
.middleware(
Expand Down
14 changes: 13 additions & 1 deletion src/Config/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ import { ConfigReader } from './ConfigReader';
import { DefaultConfigReader } from './DefaultConfigReader';
import { CliInfo } from './CliInfo';
import { CliBuilder } from './CliBuilder';
import { DefaultRepeaterLauncher, RepeaterLauncher } from '../Repeater';
import {
DefaultRepeaterLauncher,
RepeaterLauncher,
Repeater,
DefaultRepeater
} from '../Repeater';
import { container, Lifecycle } from 'tsyringe';

container
Expand Down Expand Up @@ -103,6 +108,13 @@ container
},
{ lifecycle: Lifecycle.Singleton }
)
.register(
Repeater,
{
useClass: DefaultRepeater
},
{ lifecycle: Lifecycle.Singleton }
)
.register(
Tokens,
{
Expand Down
147 changes: 147 additions & 0 deletions src/Repeater/DefaultRepeater.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import { logger } from '../Utils';
import {
Repeater,
RepeaterEventHandler,
RepeaterDeployedEvent,
RepeaterEvents
} from './Repeater';
import { inject, injectable } from 'tsyringe';
import io, { Socket } from 'socket.io-client';
import parser from 'socket.io-msgpack-parser';
import { hostname } from 'os';
import { once } from 'events';

export interface DefaultRepeaterOptions {
readonly uri: string;
readonly token: string;
}

export const DefaultRepeaterOptions: unique symbol = Symbol(
'DefaultRepeaterOptions'
);

@injectable()
export class DefaultRepeater implements Repeater {
private latestReconnectionError?: Error;
private readonly DEFAULT_RECONNECT_TIMES = 3;
private socket?: Socket;

constructor(
@inject(DefaultRepeaterOptions)
private readonly options: DefaultRepeaterOptions
) {}

public disconnect() {
if (!this.socket) {
throw new Error('Not connected');
}

this.socket?.disconnect();
this.socket?.removeAllListeners();
this.socket = undefined;
}

public async deploy(
repeaterId?: string
): Promise<RepeaterDeployedEvent['response']> {
if (!this.socket) {
throw new Error('Repeater is not connected yet');
}

this.socket.emit('deploy', {
repeaterId
});

const [result]: RepeaterDeployedEvent['response'][] = await once(
this.socket,
'deployed'
);

return result;
}

public connect() {
this.socket = io(this.options.uri, {
parser,
path: '/api/ws/v1',
transports: ['websocket'],
reconnectionAttempts: this.DEFAULT_RECONNECT_TIMES,
auth: {
token: this.options.token,
domain: hostname()
}
});

this.socket.on('connect_error', (error: Error) => {
logger.debug(`Unexpected error: %s`, error);
});

logger.debug('Event bus connected to %s', this.options.uri);
}

public on<E extends keyof RepeaterEvents, H extends RepeaterEventHandler<E>>(
event: E,
handler: H
): void {
const eventName: string = event;

if (event === 'reconnection_failed') {
// TODO: Figure out why type is not narrowing
this.onReconnectionFailure(handler as any);
} else {
this.socket.on(eventName, (payload, callback) => {
this.processEventHandler(event, payload, handler, callback);
});
}
}

private onReconnectionFailure<
H extends RepeaterEventHandler<'reconnection_failed'>
>(handler: H) {
this.socket.io.on('reconnect', () => {
this.latestReconnectionError = undefined;
});

this.socket.io.on('reconnect_error', (error) => {
this.latestReconnectionError = error;
});

this.socket.io.on('reconnect_failed', () => {
this.processEventHandler(
'reconnection_failed',
{
error: this.latestReconnectionError
},
handler
);
});
}

private processEventHandler<
E extends keyof RepeaterEvents,
H extends RepeaterEventHandler<E>
>(
event: E,
payload: RepeaterEvents[E]['request'],
handler: H,
callback?: unknown
) {
Promise.resolve(handler(payload))
.then((response) => {
if (typeof callback !== 'function' || typeof response === 'undefined') {
return;
}

callback(response);
})
.catch((error) => {
logger.debug(
'Error processing event "%s" with the following payload: %s. Details: %s',
event,
payload,
error
);
logger.error('Error: %s', error.message);
});
}
}
Loading

0 comments on commit 476bd79

Please sign in to comment.