Skip to content

Commit

Permalink
chore(event-bus): event bus error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
carlos-r-l-rodrigues committed Nov 13, 2024
1 parent 0ea5765 commit d498490
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 55 deletions.
8 changes: 8 additions & 0 deletions .changeset/orange-donkeys-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@medusajs/event-bus-local": patch
"@medusajs/event-bus-redis": patch
"@medusajs/orchestration": patch
"@medusajs/utils": patch
---

Improve event bus error handling
27 changes: 0 additions & 27 deletions packages/core/orchestration/src/transaction/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,3 @@ export class TransactionTimeoutError extends Error {
this.name = "TransactionTimeoutError"
}
}

export function serializeError(error) {
const serialized = {
message: error.message,
name: error.name,
stack: error.stack,
}

Object.getOwnPropertyNames(error).forEach((key) => {
// eslint-disable-next-line no-prototype-builtins
if (!serialized.hasOwnProperty(key)) {
serialized[key] = error[key]
}
})

return serialized
}

export function isErrorLike(value) {
return (
!!value &&
typeof value === "object" &&
"name" in value &&
"message" in value &&
"stack" in value
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ import {
TransactionStepStatus,
} from "./types"

import { MedusaError, promiseAll, TransactionStepState } from "@medusajs/utils"
import { EventEmitter } from "events"
import {
isErrorLike,
PermanentStepFailureError,
MedusaError,
promiseAll,
serializeError,
TransactionStepState,
} from "@medusajs/utils"
import { EventEmitter } from "events"
import {
PermanentStepFailureError,
SkipStepResponse,
TransactionStepTimeoutError,
TransactionTimeoutError,
Expand Down
4 changes: 3 additions & 1 deletion packages/core/utils/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export * from "./dynamic-import"
export * from "./env-editor"
export * from "./errors"
export * from "./file-system"
export * from "./filter-operator-map"
export * from "./generate-entity-id"
export * from "./get-caller-file-path"
export * from "./get-config-file"
Expand All @@ -34,6 +35,7 @@ export * from "./is-boolean"
export * from "./is-date"
export * from "./is-defined"
export * from "./is-email"
export * from "./is-error-like"
export * from "./is-object"
export * from "./is-present"
export * from "./is-string"
Expand Down Expand Up @@ -62,6 +64,7 @@ export * from "./remove-undefined-properties"
export * from "./resolve-exports"
export * from "./rules"
export * from "./selector-constraints-to-string"
export * from "./serialize-error"
export * from "./set-metadata"
export * from "./simple-hash"
export * from "./string-to-select-relation-object"
Expand All @@ -74,4 +77,3 @@ export * from "./trim-zeros"
export * from "./upper-case-first"
export * from "./validate-handle"
export * from "./wrap-handler"
export * from "./filter-operator-map"
9 changes: 9 additions & 0 deletions packages/core/utils/src/common/is-error-like.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export function isErrorLike(value) {
return (
!!value &&
typeof value === "object" &&
"name" in value &&
"message" in value &&
"stack" in value
)
}
16 changes: 16 additions & 0 deletions packages/core/utils/src/common/serialize-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export function serializeError(error) {
const serialized = {
message: error.message,
name: error.name,
stack: error.stack,
}

Object.getOwnPropertyNames(error).forEach((key) => {
// eslint-disable-next-line no-prototype-builtins
if (!serialized.hasOwnProperty(key)) {
serialized[key] = error[key]
}
})

return serialized
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,11 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
this.eventEmitter_.on(event, async (data: Event) => {
try {
await subscriber(data)
} catch (e) {
} catch (err) {
this.logger_?.error(
`An error occurred while processing ${event.toString()}: ${e}`
`An error occurred while processing ${event.toString()}:`
)
this.logger_?.error(err)
}
})
return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ jest.mock("bullmq")
jest.mock("ioredis")

const loggerMock = {
info: jest.fn().mockReturnValue(console.log),
warn: jest.fn().mockReturnValue(console.log),
error: jest.fn().mockReturnValue(console.log),
info: jest.fn().mockImplementation(console.log),
warn: jest.fn().mockImplementation(console.warn),
error: jest.fn().mockImplementation(console.error),
} as unknown as Logger

const redisMock = {
Expand Down Expand Up @@ -376,7 +376,7 @@ describe("RedisEventBusService", () => {
})
eventBus.subscribe("eventName", () => {
test.push("fail1")
return Promise.reject("fail1")
throw new Error("fail1")
})
eventBus.subscribe("eventName", () => {
test.push("hi2")
Expand All @@ -399,15 +399,21 @@ describe("RedisEventBusService", () => {
"Processing eventName which has 4 subscribers"
)

expect(loggerMock.warn).toHaveBeenCalledTimes(3)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
expect(loggerMock.warn).toHaveBeenCalledTimes(5)
expect(loggerMock.warn).toHaveBeenNthCalledWith(
1,
"An error occurred while processing eventName:"
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail2"
expect(loggerMock.warn).toHaveBeenNthCalledWith(2, new Error("fail1"))

expect(loggerMock.warn).toHaveBeenNthCalledWith(
3,
"An error occurred while processing eventName:"
)
expect(loggerMock.warn).toHaveBeenNthCalledWith(4, "fail2")

expect(loggerMock.warn).toHaveBeenCalledWith(
expect(loggerMock.warn).toHaveBeenNthCalledWith(
5,
"One or more subscribers of eventName failed. Retrying is not configured. Use 'attempts' option when emitting events."
)

Expand Down Expand Up @@ -439,10 +445,11 @@ describe("RedisEventBusService", () => {
} as any)
.catch((error) => void 0)

expect(loggerMock.warn).toHaveBeenCalledTimes(1)
expect(loggerMock.warn).toHaveBeenCalledTimes(2)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
"An error occurred while processing eventName:"
)
expect(loggerMock.warn).toHaveBeenCalledWith("fail1")

expect(loggerMock.info).toHaveBeenCalledTimes(2)
expect(loggerMock.info).toHaveBeenCalledWith(
Expand Down Expand Up @@ -478,10 +485,12 @@ describe("RedisEventBusService", () => {
} as any)
.catch((err) => void 0)

expect(loggerMock.warn).toHaveBeenCalledTimes(2)
expect(loggerMock.warn).toHaveBeenCalledTimes(3)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
"An error occurred while processing eventName:"
)
expect(loggerMock.warn).toHaveBeenCalledWith("fail1")

expect(loggerMock.warn).toHaveBeenCalledWith(
"One or more subscribers of eventName failed. Retrying..."
)
Expand Down
16 changes: 8 additions & 8 deletions packages/modules/event-bus-redis/src/services/event-bus-redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,18 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
metadata: data.metadata,
}

return await subscriber(event)
.then(async (data) => {
try {
return await subscriber(event).then((data) => {
// For every subscriber that completes successfully, add their id to the list of completed subscribers
completedSubscribersInCurrentAttempt.push(id)
return data
})
.catch((err) => {
this.logger_.warn(
`An error occurred while processing ${name}: ${err}`
)
return err
})
} catch (err) {
this.logger_?.warn(`An error occurred while processing ${name}:`)
this.logger_?.warn(err)

return err
}
})
)

Expand Down

0 comments on commit d498490

Please sign in to comment.