diff --git a/package.json b/package.json index b903f117..f7b398d3 100644 --- a/package.json +++ b/package.json @@ -74,7 +74,7 @@ "pg-native": "^3.0.0", "sequelize": "^5.8.7", "swagger-ui-express": "^4.0.6", - "tc-core-library-js": "github:appirio-tech/tc-core-library-js#v2.6.3", + "tc-core-library-js": "github:appirio-tech/tc-core-library-js#v2.6.6", "traverse": "^0.6.6", "urlencode": "^1.1.0", "yamljs": "^0.3.0" diff --git a/src/events/kafkaHandlers.js b/src/events/kafkaHandlers.js index b6bf90cf..c1c76611 100644 --- a/src/events/kafkaHandlers.js +++ b/src/events/kafkaHandlers.js @@ -1,18 +1,22 @@ /** * BUS Event Handlers */ -import { CONNECT_NOTIFICATION_EVENT, BUS_API_EVENT, RESOURCES } from '../constants'; import { - projectCreatedKafkaHandler, - projectUpdatedKafkaHandler } from './projects'; -import { projectPhaseAddedKafkaHandler, projectPhaseRemovedKafkaHandler, - projectPhaseUpdatedKafkaHandler } from './projectPhases'; + CONNECT_NOTIFICATION_EVENT, + BUS_API_EVENT, + RESOURCES, +} from '../constants'; import { - timelineAdjustedKafkaHandler, -} from './timelines'; + projectCreatedKafkaHandler, + projectUpdatedKafkaHandler, +} from './projects'; import { - milestoneUpdatedKafkaHandler, -} from './milestones'; + projectPhaseAddedKafkaHandler, + projectPhaseRemovedKafkaHandler, + projectPhaseUpdatedKafkaHandler, +} from './projectPhases'; +import { timelineAdjustedKafkaHandler } from './timelines'; +import { milestoneUpdatedKafkaHandler } from './milestones'; const kafkaHandlers = { /** @@ -33,22 +37,64 @@ const kafkaHandlers = { // Events coming from timeline/milestones (considering it as a separate module/service in future) [CONNECT_NOTIFICATION_EVENT.MILESTONE_TRANSITION_COMPLETED]: milestoneUpdatedKafkaHandler, [CONNECT_NOTIFICATION_EVENT.TIMELINE_ADJUSTED]: timelineAdjustedKafkaHandler, +}; - /** - * New Unified Bus Events - */ - [BUS_API_EVENT.PROJECT_CREATED]: { - [RESOURCES.PROJECT]: projectCreatedKafkaHandler, - }, - [BUS_API_EVENT.PROJECT_PHASE_CREATED]: { - [RESOURCES.PHASE]: projectPhaseAddedKafkaHandler, - }, - [BUS_API_EVENT.PROJECT_PHASE_UPDATED]: { - [RESOURCES.PHASE]: projectPhaseUpdatedKafkaHandler, - }, - [BUS_API_EVENT.PROJECT_PHASE_DELETED]: { - [RESOURCES.PHASE]: projectPhaseRemovedKafkaHandler, - }, +/** + * Register New Unified Bus Event Handlers + * + * We need this special method so it would properly merge topics with the same names + * but different resources. + * + * @param {String} topic Kafka topic name + * @param {String} resource resource name + * @param {Function} handler handler method + * + * @returns {void} + */ +const registerKafkaHandler = (topic, resource, handler) => { + let topicConfig = kafkaHandlers[topic]; + + // if config for topic is not yet initialized, create it + if (!topicConfig) { + topicConfig = {}; + kafkaHandlers[topic] = topicConfig; + } + + if (typeof topicConfig !== 'object') { + throw new Error( + `Topic "${topic}" should be defined as object with resource names as keys.`, + ); + } + + if (topicConfig[resource]) { + throw new Error( + `Handler for topic "${topic}" with resource ${resource} has been already registered.`, + ); + } + + topicConfig[resource] = handler; }; +registerKafkaHandler( + BUS_API_EVENT.PROJECT_CREATED, + RESOURCES.PROJECT, + projectCreatedKafkaHandler, +); +registerKafkaHandler( + BUS_API_EVENT.PROJECT_PHASE_CREATED, + RESOURCES.PHASE, + projectPhaseAddedKafkaHandler, +); +registerKafkaHandler( + BUS_API_EVENT.PROJECT_PHASE_UPDATED, + RESOURCES.PHASE, + projectPhaseUpdatedKafkaHandler, +); +registerKafkaHandler( + BUS_API_EVENT.PROJECT_PHASE_DELETED, + RESOURCES.PHASE, + projectPhaseRemovedKafkaHandler, +); + + export default kafkaHandlers;