diff --git a/packages/features/bookings/lib/dto/types.d.ts b/packages/features/bookings/lib/dto/types.d.ts index 5df5b123955ae0..3c7d3df6c5f575 100644 --- a/packages/features/bookings/lib/dto/types.d.ts +++ b/packages/features/bookings/lib/dto/types.d.ts @@ -48,3 +48,8 @@ export type BookingHandlerInput = { export type RegularBookingCreateResult = Awaited>; export type InstantBookingCreateResult = Awaited>; + +// More properties to be added to this config in followup PRs +export type BookingFlowConfig = { + isDryRun: boolean; +}; diff --git a/packages/features/bookings/lib/messageBus/BookingMessageBus.ts b/packages/features/bookings/lib/messageBus/BookingMessageBus.ts new file mode 100644 index 00000000000000..7a78aff55df80a --- /dev/null +++ b/packages/features/bookings/lib/messageBus/BookingMessageBus.ts @@ -0,0 +1,23 @@ +import { BaseMessageBus } from "@calcom/lib/messageBus/MessageBus"; +import type { IMessageHandler, Message } from "@calcom/lib/messageBus/types"; + +import type { BookingMessage, BookingMessagePayloadMap } from "./types.d"; + +export type { BookingCreatedMessagePayload, BookingRescheduledMessagePayload } from "./types.d"; +export { BOOKING_MESSAGES } from "./types.d"; + +export type { BookingMessage }; + +// Booking-specific message handler interface +export interface IBookingMessageHandler + extends IMessageHandler { + subscribedMessage: T; + handle(message: Message): Promise; + isEnabled?(message: Message): boolean; +} + +export class BookingMessageBus extends BaseMessageBus { + constructor() { + super(["BookingMessageBus"]); + } +} diff --git a/packages/features/bookings/lib/messageBus/registry.ts b/packages/features/bookings/lib/messageBus/registry.ts new file mode 100644 index 00000000000000..a24230998c5927 --- /dev/null +++ b/packages/features/bookings/lib/messageBus/registry.ts @@ -0,0 +1,14 @@ +import { subscriptions as privateLinkSubscriptions } from "@calcom/features/privateLink/messages/bookingMessageBusRegistration"; +import { subscriptions as jobsSubscriptions } from "@calcom/jobs/messages/messageBusRegistration"; + +import type { BookingMessageBus } from "./BookingMessageBus"; + +export function registerBookingMessageHandlers(messageBus: BookingMessageBus): void { + privateLinkSubscriptions.forEach((subscription) => { + messageBus.subscribe(subscription); + }); + + jobsSubscriptions.forEach((subscription) => { + messageBus.subscribe(subscription); + }); +} diff --git a/packages/features/bookings/lib/messageBus/types.d.ts b/packages/features/bookings/lib/messageBus/types.d.ts new file mode 100644 index 00000000000000..77f9b01fe77dfa --- /dev/null +++ b/packages/features/bookings/lib/messageBus/types.d.ts @@ -0,0 +1,126 @@ +import type { Message } from "@calcom/lib/messageBus/MessageBus"; +import type { BookingStatus, SchedulingType } from "@calcom/prisma/enums"; + +import type { BookingFlowConfig } from "../dto/types"; + +type CreatedBooking = { + uid: string; + userId: number | null; + status: BookingStatus; + startTime: Date; + endTime: Date; + location: string | null; +}; + +type BookingEventType = { + id: number; + slug: string; + schedulingType: SchedulingType | null; + metadata: Record | null; + hashedLink?: string; +}; + +export interface BookingCreationPayload { + booking: CreatedBooking; + eventType: BookingEventType; + config: BookingFlowConfig; +} + +export type BookingCreatedMessagePayload = BookingCreationPayload; + +export interface BookingRescheduledMessagePayload extends BookingCreatedMessagePayload { + reschedule: { + originalBooking: { + uid: string; + }; + rescheduleReason: string | null; + rescheduledBy: string | null; + }; +} + +export type BookingCreatedMessage = Message; + +export type BookingRescheduledMessage = Message; + +// export interface BookingRequestedMessagePayload extends BookingCreationPayload { +// // Core booking information +// booking: { +// id: number; +// uid: string; +// userId: number; +// status: BookingStatus; +// startTime: Date; +// endTime: Date; +// location: string | null; +// metadata: Record | null; +// eventTypeId: number | null; +// }; + +// eventType: { +// id: number; +// slug: string; +// schedulingType: string | null; +// hosts: any[]; +// metadata: Record | null; +// isTeamEventType?: boolean; +// teamId?: number | null; +// hashedLink?: string; +// seatsPerTimeSlot?: number | null; +// }; + +// calendarEvent: CalendarEvent; + +// // Platform and execution context +// context: { +// platformClientId?: string; +// noEmail?: boolean; +// isDryRun: boolean; +// }; + +// // User and organization context +// organizer: { +// id: number; +// }; + +// // Integration and platform data +// integrations?: { +// credentials?: any[]; +// }; + +// // Form submission and user input data +// formData?: any; + +// // Calendar and conferencing app sync data from EventManager +// appSync?: { +// additionalInformation?: any; +// }; + +// // Routing and assignment specific data +// routing?: { +// contactOwnerEmail?: string | null; +// routingFormResponseId?: number; +// crmRecordId?: string; +// reroutingFormResponses?: any; +// assignmentReason?: { +// reasonEnum: AssignmentReasonEnum; +// reasonString: string; +// }; +// }; +// } + +export const BOOKING_MESSAGES = { + BOOKING_CREATED: "booking.created", + BOOKING_RESCHEDULED: "booking.rescheduled", + BOOKING_REQUESTED: "booking.requested", +} as const; + +export type BookingMessage = (typeof BOOKING_MESSAGES)[keyof typeof BOOKING_MESSAGES]; + +// Placeholder type for booking requested - to be implemented +export type BookingRequestedMessagePayload = BookingCreationPayload; + +export interface BookingMessagePayloadMap { + [BOOKING_MESSAGES.BOOKING_CREATED]: BookingCreatedMessagePayload; + [BOOKING_MESSAGES.BOOKING_RESCHEDULED]: BookingRescheduledMessagePayload; + [BOOKING_MESSAGES.BOOKING_REQUESTED]: BookingRequestedMessagePayload; +} diff --git a/packages/features/bookings/lib/service/RegularBookingService.ts b/packages/features/bookings/lib/service/RegularBookingService.ts index 47a08f37820334..39d7838275baf9 100644 --- a/packages/features/bookings/lib/service/RegularBookingService.ts +++ b/packages/features/bookings/lib/service/RegularBookingService.ts @@ -30,6 +30,8 @@ import type { CheckBookingAndDurationLimitsService } from "@calcom/features/book import { handlePayment } from "@calcom/features/bookings/lib/handlePayment"; import { handleWebhookTrigger } from "@calcom/features/bookings/lib/handleWebhookTrigger"; import { isEventTypeLoggingEnabled } from "@calcom/features/bookings/lib/isEventTypeLoggingEnabled"; +// TODO: Must be injected from DI +import { BOOKING_MESSAGES } from "@calcom/features/bookings/lib/messageBus/BookingMessageBus"; import type { CacheService } from "@calcom/features/calendar-cache/lib/getShouldServeCache"; import AssignmentReasonRecorder from "@calcom/features/ee/round-robin/assignmentReason/AssignmentReasonRecorder"; import { getUsernameList } from "@calcom/features/eventtypes/lib/defaultEvents"; @@ -67,13 +69,8 @@ import { getPiiFreeCalendarEvent, getPiiFreeEventType } from "@calcom/lib/piiFre import { safeStringify } from "@calcom/lib/safeStringify"; import type { LuckyUserService } from "@calcom/lib/server/getLuckyUser"; import { getTranslation } from "@calcom/lib/server/i18n"; -import type { PrismaAttributeRepository as AttributeRepository } from "@calcom/lib/server/repository/PrismaAttributeRepository"; import type { BookingRepository } from "@calcom/lib/server/repository/booking"; -import type { HostRepository } from "@calcom/lib/server/repository/host"; -import type { PrismaOOORepository as OooRepository } from "@calcom/lib/server/repository/ooo"; -import type { UserRepository } from "@calcom/lib/server/repository/user"; import { WorkflowRepository } from "@calcom/lib/server/repository/workflow"; -import { HashedLinkService } from "@calcom/lib/server/service/hashedLinkService"; import { WorkflowService } from "@calcom/lib/server/service/workflows"; import { getTimeFormatStringFromUserTimeFormat } from "@calcom/lib/timeFormat"; import type { PrismaClient } from "@calcom/prisma"; @@ -126,6 +123,7 @@ import { validateBookingTimeIsNotOutOfBounds } from "../handleNewBooking/validat import { validateEventLength } from "../handleNewBooking/validateEventLength"; import handleSeats from "../handleSeats/handleSeats"; import type { IBookingService } from "../interfaces/IBookingService"; +import { registerBookingMessageHandlers } from "../messageBus/registry"; const translator = short(); const log = logger.getSubLogger({ prefix: ["[api] book:user"] }); @@ -419,10 +417,7 @@ export interface IBookingServiceDependencies { featuresRepository: FeaturesRepository; checkBookingLimitsService: CheckBookingLimitsService; luckyUserService: LuckyUserService; - hostRepository: HostRepository; - oooRepository: OooRepository; - userRepository: UserRepository; - attributeRepository: AttributeRepository; + bookingMessageBus: BookingMessageBus; } async function handler( @@ -449,6 +444,7 @@ async function handler( cacheService, checkBookingAndDurationLimitsService, luckyUserService, + bookingMessageBus, } = deps; const isPlatformBooking = !!platformClientId; @@ -2045,6 +2041,36 @@ async function handler( } : undefined; + const bookingFlowConfig = { + isDryRun, + }; + + const bookingCreationPayload = { + booking, + eventType, + config: bookingFlowConfig, + }; + + if (originalRescheduledBooking) { + await bookingMessageBus.emit(BOOKING_MESSAGES.BOOKING_RESCHEDULED, { + ...bookingCreationPayload, + reschedule: { + originalBooking: { + uid: originalRescheduledBooking.uid, + }, + rescheduleReason, + rescheduledBy: reqBody.rescheduledBy, + }, + }); + } else { + console.log("RegularBookingService emit BOOKING_CREATED", { + bookingCreationPayload, + }); + await bookingMessageBus.emit(BOOKING_MESSAGES.BOOKING_CREATED, { + ...bookingCreationPayload, + }); + } + const webhookData: EventPayloadType = { ...evt, ...eventTypeInfo, @@ -2282,23 +2308,6 @@ async function handler( }); } - try { - const hashedLinkService = new HashedLinkService(); - if (hasHashedBookingLink && reqBody.hashedLink && !isDryRun) { - await hashedLinkService.validateAndIncrementUsage(reqBody.hashedLink as string); - } - } catch (error) { - loggerWithEventDetails.error("Error while updating hashed link", JSON.stringify({ error })); - - // Handle repository errors and convert to HttpErrors - if (error instanceof Error) { - throw new HttpError({ statusCode: 410, message: error.message }); - } - - // For unexpected errors, provide a generic message - throw new HttpError({ statusCode: 500, message: "Failed to process booking link" }); - } - if (!booking) throw new HttpError({ statusCode: 400, message: "Booking failed" }); try { @@ -2420,7 +2429,9 @@ async function handler( * We are open to renaming it to something more descriptive. */ export class RegularBookingService implements IBookingService { - constructor(private readonly deps: IBookingServiceDependencies) {} + constructor(private readonly deps: IBookingServiceDependencies) { + registerBookingMessageHandlers(this.deps.bookingMessageBus); + } async createBooking(input: { bookingData: CreateRegularBookingData; bookingMeta?: CreateBookingMeta }) { return handler({ bookingData: input.bookingData, ...input.bookingMeta }, this.deps); diff --git a/packages/features/privateLink/lib/updateHashedLinkUsage.ts b/packages/features/privateLink/lib/updateHashedLinkUsage.ts new file mode 100644 index 00000000000000..418f17c82d8d66 --- /dev/null +++ b/packages/features/privateLink/lib/updateHashedLinkUsage.ts @@ -0,0 +1,43 @@ +import type { Logger } from "tslog"; + +import { safeStringify } from "@calcom/lib/safeStringify"; +import { HashedLinkService } from "@calcom/lib/server/service/hashedLinkService"; + +// I am passing deps as an argument instead of having this module as a class and passing constructor dependencies because this is a very simple fn and having a class would be overkill. +/** + * Updates the usage of a hashed link for a booking + * @param hashedLink - The hashed link to update the usage of + * @param bookingUid - The UID of the booking to update the usage of + * @param deps - The dependencies to use + * @returns void + */ +export const updateHashedLinkUsage = async ( + { + hashedLink, + bookingUid, + }: { + hashedLink: string; + bookingUid: string; + }, + deps: { + log: Logger; + } +) => { + const { log } = deps; + try { + const hashedLinkService = new HashedLinkService(); + await hashedLinkService.validateAndIncrementUsage(hashedLink); + + log.debug(`Successfully updated hashed link usage for booking ${bookingUid}`); + } catch (error) { + log.error( + "Error while updating hashed link", + safeStringify(error), + safeStringify({ + error, + bookingUid, + hashedLink, + }) + ); + } +}; diff --git a/packages/features/privateLink/messages/booking.created/MessageBookingCreatedPrivateLinkHandler.ts b/packages/features/privateLink/messages/booking.created/MessageBookingCreatedPrivateLinkHandler.ts new file mode 100644 index 00000000000000..1905b437e6b30e --- /dev/null +++ b/packages/features/privateLink/messages/booking.created/MessageBookingCreatedPrivateLinkHandler.ts @@ -0,0 +1,42 @@ +import { BOOKING_MESSAGES } from "@calcom/features/bookings/lib/messageBus/BookingMessageBus"; +import type { + IBookingMessageHandler, + BookingCreatedMessagePayload, +} from "@calcom/features/bookings/lib/messageBus/BookingMessageBus"; +import logger from "@calcom/lib/logger"; +import type { Message } from "@calcom/lib/messageBus/types"; + +import { updateHashedLinkUsage } from "../../lib/updateHashedLinkUsage"; + +const log = logger.getSubLogger({ prefix: ["HashedLinkHandler"] }); +export class MessageBookingCreatedPrivateLinkHandler + implements IBookingMessageHandler +{ + subscribedMessage = BOOKING_MESSAGES.BOOKING_CREATED; + + isEnabled(message: Message): boolean { + console.log("MessageBookingCreatedPrivateLinkHandler isEnabled - Checking", { + config: message.payload.config, + }); + return !message.payload.config.isDryRun; + } + + async handle(message: Message): Promise { + console.log("MessageBookingCreatedPrivateLinkHandler handle - Handling", { + payload: message.payload, + }); + const { payload } = message; + if (!payload.eventType.hashedLink) { + return; + } + await updateHashedLinkUsage( + { + hashedLink: payload.eventType.hashedLink, + bookingUid: payload.booking.uid, + }, + { + log, + } + ); + } +} diff --git a/packages/features/privateLink/messages/booking.rescheduled/MessageBookingRescheduledPrivateLinkHandler.ts b/packages/features/privateLink/messages/booking.rescheduled/MessageBookingRescheduledPrivateLinkHandler.ts new file mode 100644 index 00000000000000..68f96c5b5388fb --- /dev/null +++ b/packages/features/privateLink/messages/booking.rescheduled/MessageBookingRescheduledPrivateLinkHandler.ts @@ -0,0 +1,37 @@ +import { BOOKING_MESSAGES } from "@calcom/features/bookings/lib/messageBus/BookingMessageBus"; +import type { + IBookingMessageHandler, + BookingRescheduledMessagePayload, +} from "@calcom/features/bookings/lib/messageBus/BookingMessageBus"; +import logger from "@calcom/lib/logger"; +import type { Message } from "@calcom/lib/messageBus/MessageBus"; + +import { updateHashedLinkUsage } from "../../lib/updateHashedLinkUsage"; + +const log = logger.getSubLogger({ prefix: ["HashedLinkHandler"] }); + +export class MessageBookingRescheduledPrivateLinkHandler + implements IBookingMessageHandler +{ + subscribedMessage = BOOKING_MESSAGES.BOOKING_RESCHEDULED; + + isEnabled(message: Message): boolean { + return !message.payload.config.isDryRun; + } + + async handle(message: Message): Promise { + const { payload } = message; + if (!payload.eventType.hashedLink) { + return; + } + await updateHashedLinkUsage( + { + hashedLink: payload.eventType.hashedLink, + bookingUid: payload.booking.uid, + }, + { + log, + } + ); + } +} diff --git a/packages/features/privateLink/messages/bookingMessageBusRegistration.ts b/packages/features/privateLink/messages/bookingMessageBusRegistration.ts new file mode 100644 index 00000000000000..770d288ebac402 --- /dev/null +++ b/packages/features/privateLink/messages/bookingMessageBusRegistration.ts @@ -0,0 +1,7 @@ +import { MessageBookingCreatedPrivateLinkHandler } from "./booking.created/MessageBookingCreatedPrivateLinkHandler"; +import { MessageBookingRescheduledPrivateLinkHandler } from "./booking.rescheduled/MessageBookingRescheduledPrivateLinkHandler"; + +export const subscriptions = [ + new MessageBookingCreatedPrivateLinkHandler(), + new MessageBookingRescheduledPrivateLinkHandler(), +]; diff --git a/packages/jobs/.gitignore b/packages/jobs/.gitignore new file mode 100644 index 00000000000000..6524f048dcbcb8 --- /dev/null +++ b/packages/jobs/.gitignore @@ -0,0 +1 @@ +.trigger \ No newline at end of file diff --git a/packages/jobs/features/bookings/sendBookingCreationEmails.ts b/packages/jobs/features/bookings/sendBookingCreationEmails.ts new file mode 100644 index 00000000000000..3d8cd7ef22d465 --- /dev/null +++ b/packages/jobs/features/bookings/sendBookingCreationEmails.ts @@ -0,0 +1,16 @@ +import { logger, task, wait } from "@trigger.dev/sdk/v3"; + +export const helloWorldTask = task({ + id: "send-booking-creation-emails", + // Set an optional maxDuration to prevent tasks from running indefinitely + maxDuration: 300, // Stop executing after 300 secs (5 mins) of compute + run: async (payload: any, { ctx }) => { + logger.log("Hello, world!", { payload, ctx }); + + await wait.for({ seconds: 5 }); + + return { + message: "Booking creation emails sent", + }; + }, +}); diff --git a/packages/jobs/messages/anyMessageHandler.ts b/packages/jobs/messages/anyMessageHandler.ts new file mode 100644 index 00000000000000..a1ce62068a67dd --- /dev/null +++ b/packages/jobs/messages/anyMessageHandler.ts @@ -0,0 +1,47 @@ +import { tasks as triggerTasks } from "@trigger.dev/sdk"; + +import type { IWildcardMessageHandler, Message } from "@calcom/lib/messageBus/types"; + +const bookingTasks = { + "booking.created": [ + { + task: "send-webhook", + }, + { + task: "send-booking-creation-emails", + }, + { + task: "create-calendar-event", + }, + ], + "booking.rescheduled": [ + { + task: "send-webhook", + }, + { + task: "send-booking-confirmation", // Updated confirmation for reschedule + }, + { + task: "create-calendar-event", // Update calendar event + }, + ], +}; + +export class AnyMessageHandler implements IWildcardMessageHandler { + readonly subscribedMessage = "*" as const; + readonly forPersistentQueue = true; + + async handle(message: Message): Promise { + console.log("AnyMessageHandler called", { message }); + const tasks = bookingTasks[message.type as keyof typeof bookingTasks]; + if (tasks) { + await Promise.all( + tasks.map((task) => + triggerTasks.trigger(task.task, { + messagePayload: message.payload, + }) + ) + ); + } + } +} diff --git a/packages/jobs/messages/messageBusRegistration.ts b/packages/jobs/messages/messageBusRegistration.ts new file mode 100644 index 00000000000000..dc31f4b9224f0e --- /dev/null +++ b/packages/jobs/messages/messageBusRegistration.ts @@ -0,0 +1,3 @@ +import { AnyMessageHandler } from "./anyMessageHandler"; + +export const subscriptions = [new AnyMessageHandler()]; diff --git a/packages/jobs/trigger.config.ts b/packages/jobs/trigger.config.ts new file mode 100644 index 00000000000000..24014b1e4a2312 --- /dev/null +++ b/packages/jobs/trigger.config.ts @@ -0,0 +1,22 @@ +import { defineConfig } from "@trigger.dev/sdk/v3"; + +export default defineConfig({ + project: "proj_hekqsvbeizgsnjoflkqb", + runtime: "node", + logLevel: "log", + // The max compute seconds a task is allowed to run. If the task run exceeds this duration, it will be stopped. + // You can override this on an individual task. + // See https://trigger.dev/docs/runs/max-duration + maxDuration: 3600, + retries: { + enabledInDev: true, + default: { + maxAttempts: 3, + minTimeoutInMs: 1000, + maxTimeoutInMs: 10000, + factor: 2, + randomize: true, + }, + }, + dirs: ["features/**/*"], +}); diff --git a/packages/jobs/tsconfig.json b/packages/jobs/tsconfig.json new file mode 100644 index 00000000000000..23b52d32eee37a --- /dev/null +++ b/packages/jobs/tsconfig.json @@ -0,0 +1,17 @@ +{ + "extends": "@calcom/tsconfig/base.json", + "include": ["features/**/*", "shared/**/*"], + "exclude": ["node_modules"], + "compilerOptions": { + "outDir": "./dist", + "module": "CommonJS", + "target": "ES2020", + "lib": ["ES2020"], + "moduleResolution": "node", + "esModuleInterop": true, + "allowSyntheticDefaultImports": true, + "strict": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true + } +} diff --git a/packages/lib/di/bookings/modules/BookingMessageBus.ts b/packages/lib/di/bookings/modules/BookingMessageBus.ts new file mode 100644 index 00000000000000..1dd99663a4eaa0 --- /dev/null +++ b/packages/lib/di/bookings/modules/BookingMessageBus.ts @@ -0,0 +1,24 @@ +// eslint-disable-next-line no-restricted-imports +import { BookingMessageBus } from "@calcom/features/bookings/lib/messageBus/BookingMessageBus"; + +import { createModule, type Container, bindModuleToClassOnToken } from "../../di"; +import { DI_TOKENS } from "../../tokens"; + +export const bookingMessageBusModule = createModule(); +const token = DI_TOKENS.BOOKING_MESSAGE_BUS; +const moduleToken = DI_TOKENS.BOOKING_MESSAGE_BUS_MODULE; + +bindModuleToClassOnToken({ + module: bookingMessageBusModule, + token, + classs: BookingMessageBus, + depsMap: {}, + moduleToken, +}); + +export const moduleLoader = { + token, + loadModule: (container: Container) => { + container.load(moduleToken, bookingMessageBusModule); + }, +}; diff --git a/packages/lib/di/bookings/modules/RegularBookingService.module.ts b/packages/lib/di/bookings/modules/RegularBookingService.module.ts index 938e893b83ad6d..d06ee11d29a81e 100644 --- a/packages/lib/di/bookings/modules/RegularBookingService.module.ts +++ b/packages/lib/di/bookings/modules/RegularBookingService.module.ts @@ -15,6 +15,7 @@ import { DI_TOKENS } from "@calcom/lib/di/tokens"; import { moduleLoader as prismaModuleLoader } from "@calcom/prisma/prisma.module"; import { bindModuleToClassOnToken, createModule } from "../../di"; +import { moduleLoader as bookingMessageBusModuleLoader } from "./BookingMessageBus"; const thisModule = createModule(); const token = DI_TOKENS.REGULAR_BOOKING_SERVICE; @@ -36,6 +37,7 @@ const loadModule = bindModuleToClassOnToken({ oooRepository: oooRepositoryModuleLoader, userRepository: userRepositoryModuleLoader, attributeRepository: attributeRepositoryModuleLoader, + bookingMessageBus: bookingMessageBusModuleLoader, }, }); diff --git a/packages/lib/di/bookings/tokens.ts b/packages/lib/di/bookings/tokens.ts index 84f8827cc393c6..4ba9ef8430c326 100644 --- a/packages/lib/di/bookings/tokens.ts +++ b/packages/lib/di/bookings/tokens.ts @@ -5,4 +5,6 @@ export const BOOKING_DI_TOKENS = { RECURRING_BOOKING_SERVICE_MODULE: Symbol("RecurringBookingServiceModule"), INSTANT_BOOKING_CREATE_SERVICE: Symbol("InstantBookingCreateService"), INSTANT_BOOKING_CREATE_SERVICE_MODULE: Symbol("InstantBookingCreateServiceModule"), + BOOKING_MESSAGE_BUS: Symbol("BookingMessageBus"), + BOOKING_MESSAGE_BUS_MODULE: Symbol("BookingMessageBusModule"), }; diff --git a/packages/lib/messageBus/MessageBus.ts b/packages/lib/messageBus/MessageBus.ts new file mode 100644 index 00000000000000..78b5b0da8ca45e --- /dev/null +++ b/packages/lib/messageBus/MessageBus.ts @@ -0,0 +1,103 @@ +import logger from "@calcom/lib/logger"; + +import type { IMessageHandler, EmitOptions, IWildcardMessageHandler } from "./types"; + +// Re-export types for convenience +export type { IMessageHandler, Message, EmitOptions, IWildcardMessageHandler } from "./types"; + +// Union type for all handler types +type AnyHandler> = + | IMessageHandler + | IWildcardMessageHandler; + +export class BaseMessageBus> { + static Messages: unknown; + private handlers: Map>> = new Map(); + private log: ReturnType; + + constructor(loggerPrefix: string[] = ["MessageBus"]) { + this.log = logger.getSubLogger({ prefix: loggerPrefix }); + } + + subscribe(handler: AnyHandler): void { + const messageKey = handler.subscribedMessage as TMessage | "*"; + if (!this.handlers.has(messageKey)) { + this.handlers.set(messageKey, new Set()); + } + const eventHandlers = this.handlers.get(messageKey); + if (eventHandlers) { + eventHandlers.add(handler); + } + + const handlerType = handler.forPersistentQueue ? "persistent queue" : "regular"; + this.log.debug(`Subscribed ${handlerType} handler for messages: ${handler.subscribedMessage}`); + } + + unsubscribe(handler: AnyHandler): void { + const messageKey = handler.subscribedMessage as TMessage | "*"; + const messageHandlers = this.handlers.get(messageKey); + if (messageHandlers) { + messageHandlers.delete(handler); + if (messageHandlers.size === 0) { + this.handlers.delete(messageKey); + } + } + + const handlerType = handler.forPersistentQueue ? "persistent queue" : "regular"; + this.log.debug(`Unsubscribed ${handlerType} handler for messages: ${handler.subscribedMessage}`); + } + + async emit( + messageType: T, + payload: TPayloadMap[T], + options: EmitOptions = {} + ): Promise { + // Get all handlers for specific message type and wildcard handlers + const specificHandlers = this.handlers.get(messageType) || new Set(); + const wildcardHandlers = this.handlers.get("*") || new Set(); + + const allHandlers = [...Array.from(specificHandlers), ...Array.from(wildcardHandlers)]; + + if (allHandlers.length === 0) { + this.log.debug(`No handlers registered for message: ${messageType}`); + return; + } + + // Filter handlers based on options and enabled state + const enabledHandlers = allHandlers.filter((handler) => { + // Skip persistent queue handlers if requested + if (options.skipPersistentQueue && handler.forPersistentQueue) { + this.log.debug(`Skipping persistent queue handler ${handler.constructor.name} for ${messageType}`); + return false; + } + + if (handler.isEnabled && !handler.isEnabled({ type: messageType, payload })) { + const handlerType = handler.forPersistentQueue ? "persistent queue" : "regular"; + this.log.debug( + `${handlerType} handler ${handler.constructor.name} for ${messageType} is disabled, skipping` + ); + return false; + } + + return true; + }); + + this.log.debug(`Emitting message: ${messageType} with ${enabledHandlers.length} handlers`); + + // Execute all enabled handlers + const handlerPromises = enabledHandlers.map(async (handler) => { + return handler.handle({ + type: messageType, + payload, + }); + }); + + const results = await Promise.allSettled(handlerPromises); + + results.forEach((result) => { + if (result.status === "rejected") { + this.log.error(`Handler for ${messageType} failed:`, result.reason); + } + }); + } +} diff --git a/packages/lib/messageBus/types.d.ts b/packages/lib/messageBus/types.d.ts new file mode 100644 index 00000000000000..68af3301e173e8 --- /dev/null +++ b/packages/lib/messageBus/types.d.ts @@ -0,0 +1,23 @@ +export type Message = { + type: string; + payload: TPayload; +}; + +export type EmitOptions = { + skipPersistentQueue?: boolean; +}; + +export interface IMessageHandler> { + subscribedMessage: TMessage; + handle(message: Message): Promise; + isEnabled?(message: Message): boolean; + forPersistentQueue?: boolean; +} + +// Wildcard handler interface - allows handling any message type +export interface IWildcardMessageHandler> { + subscribedMessage: "*"; + handle(message: Message): Promise; + isEnabled?(message: Message): boolean; + forPersistentQueue?: boolean; +} diff --git a/packages/lib/persistentQueue/messages/anyMessageHandler.ts b/packages/lib/persistentQueue/messages/anyMessageHandler.ts new file mode 100644 index 00000000000000..963fccdb1f97b1 --- /dev/null +++ b/packages/lib/persistentQueue/messages/anyMessageHandler.ts @@ -0,0 +1,44 @@ +import { tasks as triggerTasks } from "@trigger.dev/sdk"; + +import type { IMessageHandler, Message } from "../../messageBus/types"; + +const bookingTasks = { + "booking.created": [ + { + task: "send-webhook", + }, + { + task: "send-booking-confirmation", + }, + { + task: "create-calendar-event", + }, + ], + "booking.rescheduled": [ + { + task: "send-webhook", + }, + { + task: "send-booking-confirmation", // Updated confirmation for reschedule + }, + { + task: "create-calendar-event", // Update calendar event + }, + ], +}; + +export class AnyMessageHandler implements IMessageHandler<"*", Record> { + readonly subscribedMessage = "*" as const; + readonly forPersistentQueue = true; + + async handle(message: Message>): Promise { + const tasks = bookingTasks[message.type as keyof typeof bookingTasks]; + await Promise.all( + tasks.map((task) => + triggerTasks.trigger(task.task, { + messagePayload: message.payload, + }) + ) + ); + } +}