Skip to content

Commit

Permalink
Merge branch 'Sikora00-feat/409-logging'
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilmysliwiec committed May 5, 2022
2 parents fdd5251 + 95ae336 commit 0a9d25f
Showing 1 changed file with 35 additions and 11 deletions.
46 changes: 35 additions & 11 deletions src/event-bus.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Injectable, OnModuleDestroy, Type } from '@nestjs/common';
import { Injectable, Logger, OnModuleDestroy, Type } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { Observable, Subscription } from 'rxjs';
import { filter } from 'rxjs/operators';
import { from, Observable, Subscription } from 'rxjs';
import { filter, mergeMap } from 'rxjs/operators';
import { isFunction } from 'util';
import { CommandBus } from './command-bus';
import { EVENTS_HANDLER_METADATA, SAGA_METADATA } from './decorators/constants';
Expand All @@ -27,11 +27,13 @@ export type EventHandlerType<EventBase extends IEvent = IEvent> = Type<
@Injectable()
export class EventBus<EventBase extends IEvent = IEvent>
extends ObservableBus<EventBase>
implements IEventBus<EventBase>, OnModuleDestroy {
implements IEventBus<EventBase>, OnModuleDestroy
{
protected getEventId: (event: EventBase) => string | null;
protected readonly subscriptions: Subscription[];

private _publisher: IEventPublisher<EventBase>;
private readonly _logger = new Logger(EventBus.name);

constructor(
private readonly commandBus: CommandBus,
Expand Down Expand Up @@ -68,7 +70,17 @@ export class EventBus<EventBase extends IEvent = IEvent>

bind(handler: IEventHandler<EventBase>, id: string) {
const stream$ = id ? this.ofEventId(id) : this.subject$;
const subscription = stream$.subscribe((event) => handler.handle(event));
const subscription = stream$
.pipe(mergeMap((event) => from(handler.handle(event))))
.subscribe({
error: (error) => {
this._logger.error(
`"${handler.constructor.name}" has thrown an error.`,
error,
);
throw error;
},
});
this.subscriptions.push(subscription);
}

Expand Down Expand Up @@ -98,14 +110,15 @@ export class EventBus<EventBase extends IEvent = IEvent>
}
const events = this.reflectEvents(handler);
events.map((event) =>
this.bind(instance as IEventHandler<EventBase>, defaultReflectEventId(event)),
this.bind(
instance as IEventHandler<EventBase>,
defaultReflectEventId(event),
),
);
}

protected ofEventId(id: string) {
return this.subject$.pipe(
filter((event) => this.getEventId(event) === id),
);
return this.subject$.pipe(filter((event) => this.getEventId(event) === id));
}

protected registerSaga(saga: ISaga<EventBase>) {
Expand All @@ -118,8 +131,19 @@ export class EventBus<EventBase extends IEvent = IEvent>
}

const subscription = stream$
.pipe(filter((e) => !!e))
.subscribe((command) => this.commandBus.execute(command));
.pipe(
filter((e) => !!e),
mergeMap((command) => from(this.commandBus.execute(command))),
)
.subscribe({
error: (error) => {
this._logger.error(
`Command handler which execution was triggered by Saga has thrown an error.`,
error,
);
throw error;
},
});

this.subscriptions.push(subscription);
}
Expand Down

0 comments on commit 0a9d25f

Please sign in to comment.