Skip to content

Commit

Permalink
Merge pull request #1430 from thamcheongkit/master
Browse files Browse the repository at this point in the history
fix(event-bus): unstuck after unhandled exception
  • Loading branch information
kamilmysliwiec authored Jul 24, 2023
2 parents 174fc22 + 65e3822 commit 6f50f03
Showing 1 changed file with 21 additions and 42 deletions.
63 changes: 21 additions & 42 deletions src/event-bus.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable, Logger, OnModuleDestroy, Type } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { Observable, Subscription, defer, throwError } from 'rxjs';
import { Observable, Subscription, defer, of, throwError } from 'rxjs';
import { catchError, filter, mergeMap } from 'rxjs/operators';
import { CommandBus } from './command-bus';
import { EVENTS_HANDLER_METADATA, SAGA_METADATA } from './decorators/constants';
Expand Down Expand Up @@ -94,24 +94,19 @@ export class EventBus<EventBase extends IEvent = IEvent>
.pipe(
mergeMap((event) =>
defer(() => Promise.resolve(handler.handle(event))).pipe(
catchError((error) =>
throwError(() => this.mapToUnhandledErrorInfo(event, error)),
),
catchError((error) => {
const unhandledError = this.mapToUnhandledErrorInfo(event, error);
this.unhandledExceptionBus.publish(unhandledError);
this._logger.error(
`"${handler.constructor.name}" has thrown an unhandled exception.`,
error,
);
return of();
}),
),
),
)
.subscribe({
error: (error) => {
if (this.isUnhandledErrorInfo(error)) {
this.unhandledExceptionBus.publish(error);
error = error.exception;
}
this._logger.error(
`"${handler.constructor.name}" has thrown an unhandled exception.`,
error,
);
},
});
.subscribe();
this.subscriptions.push(subscription);
}

Expand Down Expand Up @@ -166,24 +161,19 @@ export class EventBus<EventBase extends IEvent = IEvent>
filter((e) => !!e),
mergeMap((command) =>
defer(() => this.commandBus.execute(command)).pipe(
catchError((error) =>
throwError(() => this.mapToUnhandledErrorInfo(command, error)),
),
catchError((error) => {
const unhandledError = this.mapToUnhandledErrorInfo(event, error);
this.unhandledExceptionBus.publish(unhandledError);
this._logger.error(
`Command handler which execution was triggered by Saga has thrown an unhandled exception.`,
error,
);
return of();
}),
),
),
)
.subscribe({
error: (error) => {
if (this.isUnhandledErrorInfo(error)) {
this.unhandledExceptionBus.publish(error);
error = error.exception;
}
this._logger.error(
`Command handler which execution was triggered by Saga has thrown an unhandled exception.`,
error,
);
},
});
.subscribe();

this.subscriptions.push(subscription);
}
Expand All @@ -207,15 +197,4 @@ export class EventBus<EventBase extends IEvent = IEvent>
exception,
};
}

private isUnhandledErrorInfo(
error: unknown,
): error is UnhandledExceptionInfo {
return (
typeof error === 'object' &&
error !== null &&
'cause' in error &&
'exception' in error
);
}
}

0 comments on commit 6f50f03

Please sign in to comment.