Skip to content

Commit

Permalink
fix: make event bus as signleton and failure events to be fired
Browse files Browse the repository at this point in the history
  • Loading branch information
SocketSomeone committed Dec 13, 2023
1 parent 5648eea commit ea96734
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 21 deletions.
4 changes: 1 addition & 3 deletions src/commands/base.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export type ReturnTypeOfRun<T extends BaseCommand> = ReturnType<T['run']>;
export abstract class BaseCommand {
protected readonly logger: Logger;

protected readonly eventBus = new ResilienceEventBus();
protected readonly eventBus = ResilienceEventBus.getInstance();

protected readonly strategies: Strategy[];

Expand Down Expand Up @@ -42,13 +42,11 @@ export abstract class BaseCommand {
if (error instanceof TimeoutException) {
this.logger.debug('Command timed out');
this.eventBus.emit(ResilienceEventType.Timeout, this);
return error;
}

if (error instanceof CircuitOpenedException) {
this.logger.debug('Command short-circuited');
this.eventBus.emit(ResilienceEventType.ShortCircuit, this);
return error;
}

this.logger.debug('Command failed');
Expand Down
26 changes: 13 additions & 13 deletions src/commands/resilience.command.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BaseCommand, ReturnTypeOfRun } from './base.command';
import { defer, lastValueFrom, Observable } from 'rxjs';
import { catchError, defer, lastValueFrom, Observable, tap } from 'rxjs';
import { ResilienceEventType } from '../enum';

export abstract class ResilienceCommand extends BaseCommand {
Expand All @@ -8,19 +8,19 @@ export abstract class ResilienceCommand extends BaseCommand {
public execute(...args: Parameters<this['run']>): ReturnTypeOfRun<this> {
this.eventBus.emit(ResilienceEventType.Emit, this);

try {
let observable: Observable<ReturnTypeOfRun<this>> = defer(() => this.run(...args));
let observable: Observable<ReturnTypeOfRun<this>> = defer(() => this.run(...args));

for (const strategy of this.strategies) {
observable = strategy.process(observable, this, ...args);
}

return lastValueFrom(observable).then(result => {
this.onSuccess();
return result;
}) as ReturnTypeOfRun<this>;
} catch (error) {
throw this.onFailure(error);
for (const strategy of this.strategies) {
observable = strategy.process(observable, this, ...args);
}

return lastValueFrom(
observable.pipe(
catchError(error => {
throw this.onFailure(error);
}),
tap(() => this.onSuccess())
)
) as ReturnTypeOfRun<this>;
}
}
2 changes: 2 additions & 0 deletions src/resilience.event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export class ResilienceEventBus {
return ResilienceEventBus.instance;
}

private constructor() {}

private readonly emitter = new EventEmitter();

public on<K extends keyof ResilienceEvents>(event: K, fn: (args: ResilienceEvents[K]) => void) {
Expand Down
2 changes: 1 addition & 1 deletion src/strategies/circuit-breaker.strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class CircuitBreakerStrategy extends Strategy<CircuitBreakerOptions> {
state.openedAt = 0;
}
}

state.lastRequestTimeMs = Date.now();

if (isOpen()) {
Expand Down
45 changes: 41 additions & 4 deletions test/commands/resilience.command.spec.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
import { ResilienceCommand } from '../../src';
import { ResilienceCommand, ResilienceEventBus } from '../../src';
import { retryStrategy } from './fixtures/strategy.fixture';

class TestCommand extends ResilienceCommand {
protected strategies = [retryStrategy];

private count = 0;

private isError = false;

public setCount(count: number) {
this.count = count;
}

public setIsError(isError: boolean) {
this.isError = isError;
}

public async run() {
this.count += 1;

if (this.isError) {
throw new Error('Test');
}

if (this.count !== 3) {
throw new Error('Test');
}
Expand All @@ -18,10 +30,35 @@ class TestCommand extends ResilienceCommand {
}

describe('Resilience Command', () => {
const command = new TestCommand([retryStrategy]);
const eventBus = ResilienceEventBus.getInstance();

let callback = jest.fn();

beforeEach(() => {
command.setCount(0);
command.setIsError(false);
callback = jest.fn();
});

it('should be able to retry a promise', async () => {
const command = new TestCommand([retryStrategy]);
eventBus.on('success', callback);

const value = await command.execute();

expect(value).toBe(1000);
expect(callback).toHaveBeenCalled();
});

it('should emit the failure', async () => {
command.setIsError(true);
eventBus.on('failure', callback);

try {
await command.execute();
} catch (error) {
expect(error.message).toBe('Test');
expect(callback).toHaveBeenCalled();
}
});
});

0 comments on commit ea96734

Please sign in to comment.