diff --git a/src/event-bus.ts b/src/event-bus.ts index db24d707..876f5b5e 100644 --- a/src/event-bus.ts +++ b/src/event-bus.ts @@ -1,6 +1,6 @@ import { Injectable, Logger, OnModuleDestroy, Type } from '@nestjs/common'; import { ModuleRef } from '@nestjs/core'; -import { Observable, Subscription, defer, throwError } from 'rxjs'; +import { Observable, Subscription, defer, of, throwError } from 'rxjs'; import { catchError, filter, mergeMap } from 'rxjs/operators'; import { CommandBus } from './command-bus'; import { EVENTS_HANDLER_METADATA, SAGA_METADATA } from './decorators/constants'; @@ -94,24 +94,19 @@ export class EventBus .pipe( mergeMap((event) => defer(() => Promise.resolve(handler.handle(event))).pipe( - catchError((error) => - throwError(() => this.mapToUnhandledErrorInfo(event, error)), - ), + catchError((error) => { + const unhandledError = this.mapToUnhandledErrorInfo(event, error); + this.unhandledExceptionBus.publish(unhandledError); + this._logger.error( + `"${handler.constructor.name}" has thrown an unhandled exception.`, + error, + ); + return of(); + }), ), ), ) - .subscribe({ - error: (error) => { - if (this.isUnhandledErrorInfo(error)) { - this.unhandledExceptionBus.publish(error); - error = error.exception; - } - this._logger.error( - `"${handler.constructor.name}" has thrown an unhandled exception.`, - error, - ); - }, - }); + .subscribe(); this.subscriptions.push(subscription); } @@ -166,24 +161,19 @@ export class EventBus filter((e) => !!e), mergeMap((command) => defer(() => this.commandBus.execute(command)).pipe( - catchError((error) => - throwError(() => this.mapToUnhandledErrorInfo(command, error)), - ), + catchError((error) => { + const unhandledError = this.mapToUnhandledErrorInfo(event, error); + this.unhandledExceptionBus.publish(unhandledError); + this._logger.error( + `Command handler which execution was triggered by Saga has thrown an unhandled exception.`, + error, + ); + return of(); + }), ), ), ) - .subscribe({ - error: (error) => { - if (this.isUnhandledErrorInfo(error)) { - this.unhandledExceptionBus.publish(error); - error = error.exception; - } - this._logger.error( - `Command handler which execution was triggered by Saga has thrown an unhandled exception.`, - error, - ); - }, - }); + .subscribe(); this.subscriptions.push(subscription); } @@ -207,15 +197,4 @@ export class EventBus exception, }; } - - private isUnhandledErrorInfo( - error: unknown, - ): error is UnhandledExceptionInfo { - return ( - typeof error === 'object' && - error !== null && - 'cause' in error && - 'exception' in error - ); - } }