From 85c8f7eeb3141ba4ab2e98b8421af558f2eb0091 Mon Sep 17 00:00:00 2001 From: ankitshahu Date: Tue, 14 Jun 2022 12:25:48 +0530 Subject: [PATCH 01/13] code fix done for users --- src/app.js | 17 ++- src/controllers/v1/account.js | 1 + src/db/forms/queries.js | 61 +++++------ src/db/notification-template/query.js | 91 ++++++++-------- src/db/systemUsers/queries.js | 32 +++--- src/db/userentities/query.js | 78 ++++++-------- src/db/users/queries.js | 144 ++++++++++++-------------- src/health-checks/kafka.js | 20 ++-- src/health-checks/mongodb.js | 16 ++- src/middlewares/validator.js | 2 - src/routes/index.js | 2 + src/services/helper/account.js | 80 +++++++------- src/services/helper/mentors.js | 1 - src/services/helper/userentity.js | 4 +- src/test/accountController.spec.js | 1 - 15 files changed, 252 insertions(+), 298 deletions(-) diff --git a/src/app.js b/src/app.js index ecab379fb..2615a421e 100644 --- a/src/app.js +++ b/src/app.js @@ -64,14 +64,13 @@ app.listen(process.env.APPLICATION_PORT, (res, err) => { // Handles specific listen errors with friendly messages function onError(error) { - switch (error.code) { - case 'EACCES': - console.log(process.env.APPLICATION_PORT + ' requires elevated privileges') - process.exit(1) - case 'EADDRINUSE': - console.log(process.env.APPLICATION_PORT + ' is already in use') - process.exit(1) - default: - throw error + if (error.code === 'EACCES') { + console.log(process.env.APPLICATION_PORT + ' requires elevated privileges') + process.exit(1) + } else if (error.code === 'EADDRINUSE') { + console.log(process.env.APPLICATION_PORT + ' is already in use') + process.exit(1) + } else { + throw error } } diff --git a/src/controllers/v1/account.js b/src/controllers/v1/account.js index 5b20f1d7d..c10a1020a 100644 --- a/src/controllers/v1/account.js +++ b/src/controllers/v1/account.js @@ -209,6 +209,7 @@ module.exports = class Account { */ async acceptTermsAndCondition(req) { try { + console.log(req.decodedToken._id, '------------------------') const result = await accountHelper.acceptTermsAndCondition(req.decodedToken._id) return result } catch (error) { diff --git a/src/db/forms/queries.js b/src/db/forms/queries.js index 6db4225fe..83bd0333c 100644 --- a/src/db/forms/queries.js +++ b/src/db/forms/queries.js @@ -9,45 +9,36 @@ const Forms = require('./model') module.exports = class FormsData { - static createForm(data) { - return new Promise(async (resolve, reject) => { - try { - await new Forms(data).save() - resolve(true) - } catch (error) { - reject(error) - } - }) + static async createForm(data) { + try { + await new Forms(data).save() + return true + } catch (error) { + return error + } } - static findOneForm(filter, projection = {}) { - return new Promise(async (resolve, reject) => { - try { - const formData = await Forms.findOne(filter, projection) - resolve(formData) - } catch (error) { - reject(error) - } - }) + static async findOneForm(filter, projection = {}) { + try { + const formData = await Forms.findOne(filter, projection) + return formData + } catch (error) { + return error + } } - static updateOneForm(filter, update, options = {}) { - return new Promise(async (resolve, reject) => { - try { - const res = await Forms.updateOne(filter, update, options) - if ((res.n === 1 && res.nModified === 1) || (res.matchedCount === 1 && res.modifiedCount === 1)) { - resolve('FORM_UPDATED') - } else if ( - (res.n === 1 && res.nModified === 0) || - (res.matchedCount === 1 && res.modifiedCount === 0) - ) { - resolve('FORM_ALREADY_EXISTS') - } else { - resolve('FORM_NOT_FOUND') - } - } catch (error) { - reject(error) + static async updateOneForm(filter, update, options = {}) { + try { + const res = await Forms.updateOne(filter, update, options) + if ((res.n === 1 && res.nModified === 1) || (res.matchedCount === 1 && res.modifiedCount === 1)) { + return 'FORM_UPDATED' + } else if ((res.n === 1 && res.nModified === 0) || (res.matchedCount === 1 && res.modifiedCount === 0)) { + return 'FORM_ALREADY_EXISTS' + } else { + return 'FORM_NOT_FOUND' } - }) + } catch (error) { + return error + } } } diff --git a/src/db/notification-template/query.js b/src/db/notification-template/query.js index c55692f10..7f0f2e0b1 100644 --- a/src/db/notification-template/query.js +++ b/src/db/notification-template/query.js @@ -9,67 +9,62 @@ const NotificationTemplate = require('./model') module.exports = class NotificationTemplateData { - static findOneEmailTemplate(code) { + static async findOneEmailTemplate(code) { const filter = { code, type: 'email', deleted: false, status: 'active', } - return new Promise(async (resolve, reject) => { - try { - const templateData = await NotificationTemplate.findOne(filter).lean() - if (templateData && templateData.emailHeader) { - const header = await this.getEmailHeader(templateData.emailHeader) - if (header && header.body) { - templateData['body'] = header.body + templateData['body'] - } - } - if (templateData && templateData.emailFooter) { - const footer = await this.getEmailFooter(templateData.emailFooter) - if (footer && footer.body) { - templateData['body'] = templateData['body'] + footer.body - } + try { + const templateData = await NotificationTemplate.findOne(filter).lean() + if (templateData && templateData.emailHeader) { + const header = await this.getEmailHeader(templateData.emailHeader) + if (header && header.body) { + templateData['body'] = header.body + templateData['body'] } - resolve(templateData) - } catch (error) { - reject(error) } - }) - } - static getEmailHeader(header) { - return new Promise(async (resolve, reject) => { - try { - const filterEmailHeader = { - code: header, - type: 'emailHeader', - deleted: false, - status: 'active', - } - const headerData = await NotificationTemplate.findOne(filterEmailHeader).lean() - resolve(headerData) - } catch (error) { - reject(error) + if (templateData && templateData.emailFooter) { + const footer = await this.getEmailFooter(templateData.emailFooter) + if (footer && footer.body) { + templateData['body'] = templateData['body'] + footer.body + } } - }) + return templateData + } catch (error) { + return error + } } - static getEmailFooter(footer) { - return new Promise(async (resolve, reject) => { - try { - const filterEmailFooter = { - code: footer, - type: 'emailFooter', - deleted: false, - status: 'active', - } - const footerData = await NotificationTemplate.findOne(filterEmailFooter).lean() + static async getEmailHeader(header) { + try { + const filterEmailHeader = { + code: header, + type: 'emailHeader', + deleted: false, + status: 'active', + } + const headerData = await NotificationTemplate.findOne(filterEmailHeader).lean() - resolve(footerData) - } catch (error) { - reject(error) + return headerData + } catch (error) { + return error + } + } + static async getEmailFooter(footer) { + try { + const filterEmailFooter = { + code: footer, + type: 'emailFooter', + deleted: false, + status: 'active', } - }) + const footerData = await NotificationTemplate.findOne(filterEmailFooter).lean() + + return footerData + } catch (error) { + return error + } } } diff --git a/src/db/systemUsers/queries.js b/src/db/systemUsers/queries.js index 38f1aa673..9c5c20d31 100644 --- a/src/db/systemUsers/queries.js +++ b/src/db/systemUsers/queries.js @@ -8,25 +8,21 @@ const SystemUsers = require('./model') module.exports = class SystemUsersData { - static findUsersByEmail(email) { - return new Promise(async (resolve, reject) => { - try { - const userData = await SystemUsers.findOne({ 'email.address': email }).lean() - resolve(userData) - } catch (error) { - reject(error) - } - }) + static async findUsersByEmail(email) { + try { + const userData = await SystemUsers.findOne({ 'email.address': email }).lean() + return userData + } catch (error) { + return error + } } - static create(data) { - return new Promise(async (resolve, reject) => { - try { - await new SystemUsers(data).save() - resolve(true) - } catch (error) { - reject(error) - } - }) + static async create(data) { + try { + await new SystemUsers(data).save() + return true + } catch (error) { + return error + } } } diff --git a/src/db/userentities/query.js b/src/db/userentities/query.js index 8424dc74a..d9705f28d 100644 --- a/src/db/userentities/query.js +++ b/src/db/userentities/query.js @@ -9,57 +9,45 @@ const UserEntities = require('./model') module.exports = class UserEntityData { - static createEntity(data) { - return new Promise(async (resolve, reject) => { - try { - await new UserEntities(data).save() - resolve(true) - } catch (error) { - reject(error) - } - }) + static async createEntity(data) { + try { + await new UserEntities(data).save() + return true + } catch (error) { + return error + } } - static findOneEntity(filter, projection = {}) { - return new Promise(async (resolve, reject) => { - try { - const userEntityData = await UserEntities.findOne(filter, projection) - resolve(userEntityData) - } catch (error) { - reject(error) - } - }) + static async findOneEntity(filter, projection = {}) { + try { + const userEntityData = await UserEntities.findOne(filter, projection) + return userEntityData + } catch (error) { + return error + } } - static findAllEntities(filter, projection = {}) { - return new Promise(async (resolve, reject) => { - try { - const userEntitiesData = await UserEntities.find(filter, projection) - resolve(userEntitiesData) - } catch (error) { - reject(error) - } - }) + static async findAllEntities(filter, projection = {}) { + try { + const userEntitiesData = await UserEntities.find(filter, projection) + return userEntitiesData + } catch (error) { + return error + } } - static updateOneEntity(filter, update, options = {}) { - return new Promise(async (resolve, reject) => { - try { - const res = await UserEntities.updateOne(filter, update, options) - if ((res.n === 1 && res.nModified === 1) || (res.matchedCount === 1 && res.modifiedCount === 1)) { - resolve('ENTITY_UPDATED') - } else if ( - (res.n === 1 && res.nModified === 0) || - (res.matchedCount === 1 && res.modifiedCount === 0) - ) { - resolve('ENTITY_ALREADY_EXISTS') - } else { - x - resolve('ENTITY_NOT_FOUND') - } - } catch (error) { - reject(error) + static async updateOneEntity(filter, update, options = {}) { + try { + const res = await UserEntities.updateOne(filter, update, options) + if ((res.n === 1 && res.nModified === 1) || (res.matchedCount === 1 && res.modifiedCount === 1)) { + resolve('ENTITY_UPDATED') + } else if ((res.n === 1 && res.nModified === 0) || (res.matchedCount === 1 && res.modifiedCount === 0)) { + return 'ENTITY_ALREADY_EXISTS' + } else { + return 'ENTITY_NOT_FOUND' } - }) + } catch (error) { + return error + } } } diff --git a/src/db/users/queries.js b/src/db/users/queries.js index 825496721..2ea08f510 100644 --- a/src/db/users/queries.js +++ b/src/db/users/queries.js @@ -10,99 +10,89 @@ const ObjectId = require('mongoose').Types.ObjectId const Users = require('./model') module.exports = class UsersData { - static findOne(filter, projection = {}) { - return new Promise(async (resolve, reject) => { - try { - const userData = await Users.findOne(filter, projection) - resolve(userData) - } catch (error) { - reject(error) - } - }) + static async findOne(filter, projection = {}) { + try { + const userData = await Users.findOne(filter, projection) + return userData + } catch (error) { + return error + } } - static findAllUsers(filter, projection = {}) { - return new Promise(async (resolve, reject) => { - try { - const usersData = await Users.find(filter, projection) - resolve(usersData) - } catch (error) { - reject(error) - } - }) + static async findAllUsers(filter, projection = {}) { + try { + const usersData = await Users.find(filter, projection) + return usersData + } catch (error) { + return error + } } - static createUser(data) { - return new Promise(async (resolve, reject) => { - try { - await new Users(data).save() - resolve(true) - } catch (error) { - reject(error) - } - }) + static async createUser(data) { + try { + await new Users(data).save() + return true + } catch (error) { + return error + } } - static updateOneUser(filter, update, options = {}) { - return new Promise(async (resolve, reject) => { - try { - const res = await Users.updateOne(filter, update, options) - if ((res.n === 1 && res.nModified === 1) || (res.matchedCount === 1 && res.modifiedCount === 1)) { - resolve(true) - } else { - resolve(false) - } - } catch (error) { - reject(error) + static async updateOneUser(filter, update, options = {}) { + try { + const res = await Users.updateOne(filter, update, options) + if ((res.n === 1 && res.nModified === 1) || (res.matchedCount === 1 && res.modifiedCount === 1)) { + return true + } else { + return false } - }) + } catch (error) { + return error + } } - static searchMentors(page, limit, search, userId) { - return new Promise(async (resolve, reject) => { - try { - let users = await Users.aggregate([ - { - $match: { - deleted: false, - isAMentor: true, - _id: { - $ne: ObjectId(userId), - }, - $or: [{ name: new RegExp(search, 'i') }], - }, - }, - { - $project: { - name: 1, - image: 1, - areasOfExpertise: 1, + static async searchMentors(page, limit, search, userId) { + try { + let users = await Users.aggregate([ + { + $match: { + deleted: false, + isAMentor: true, + _id: { + $ne: ObjectId(userId), }, + $or: [{ name: new RegExp(search, 'i') }], }, - { - $sort: { name: 1 }, + }, + { + $project: { + name: 1, + image: 1, + areasOfExpertise: 1, }, - { - $facet: { - totalCount: [{ $count: 'count' }], - data: [{ $skip: limit * (page - 1) }, { $limit: limit }], - }, + }, + { + $sort: { name: 1 }, + }, + { + $facet: { + totalCount: [{ $count: 'count' }], + data: [{ $skip: limit * (page - 1) }, { $limit: limit }], }, - { - $project: { - data: 1, - count: { - $arrayElemAt: ['$totalCount.count', 0], - }, + }, + { + $project: { + data: 1, + count: { + $arrayElemAt: ['$totalCount.count', 0], }, }, - ]).collation({ locale: 'en', caseLevel: false }) + }, + ]).collation({ locale: 'en', caseLevel: false }) - return resolve(users) - } catch (error) { - return reject(error) - } - }) + return users + } catch (error) { + return error + } } static async listUsers(type, page, limit, search) { try { diff --git a/src/health-checks/kafka.js b/src/health-checks/kafka.js index 3b061d9e8..bde7c64c7 100644 --- a/src/health-checks/kafka.js +++ b/src/health-checks/kafka.js @@ -9,19 +9,17 @@ const kafka = require('kafka-node') function health_check() { - return new Promise(async (resolve, reject) => { - const client = new kafka.KafkaClient({ - kafkaHost: process.env.KAFKA_URL, - }) + const client = new kafka.KafkaClient({ + kafkaHost: process.env.KAFKA_URL, + }) - const producer = new kafka.Producer(client) + const producer = new kafka.Producer(client) - producer.on('error', function (err) { - return resolve(false) - }) - producer.on('ready', function () { - return resolve(true) - }) + producer.on('error', function (err) { + return false + }) + producer.on('ready', function () { + return true }) } diff --git a/src/health-checks/mongodb.js b/src/health-checks/mongodb.js index d6e5be27c..b12809415 100644 --- a/src/health-checks/mongodb.js +++ b/src/health-checks/mongodb.js @@ -9,16 +9,14 @@ const mongoose = require('mongoose') function health_check() { - return new Promise(async (resolve, reject) => { - const db = mongoose.createConnection(process.env.MONGODB_URL) - db.on('error', function () { - return resolve(false) - }) + const db = mongoose.createConnection(process.env.MONGODB_URL) + db.on('error', function () { + return false + }) - db.once('open', function () { - db.close(function () {}) - return resolve(true) - }) + db.once('open', function () { + db.close(function () {}) + return true }) } diff --git a/src/middlewares/validator.js b/src/middlewares/validator.js index ea0bb4629..22277bef6 100644 --- a/src/middlewares/validator.js +++ b/src/middlewares/validator.js @@ -4,8 +4,6 @@ * Date : 20-Oct-2021 * Description : Contains logic to call required validator from validators directory to validate request data */ -const fs = require('fs') - module.exports = (req, res, next) => { try { require(`@validators/${req.params.version}/${req.params.controller}`)[req.params.method](req) diff --git a/src/routes/index.js b/src/routes/index.js index 175891cdd..795b200d6 100644 --- a/src/routes/index.js +++ b/src/routes/index.js @@ -19,6 +19,7 @@ module.exports = (app) => { async function router(req, res, next) { let controllerResponse let validationError + console.log('rechecje') /* Check for input validation error */ try { @@ -58,6 +59,7 @@ module.exports = (app) => { } else { controller = require(`@controllers/${req.params.version}/${req.params.controller}`) } + console.log(controller) controllerResponse = new controller()[req.params.method] ? await new controller()[req.params.method](req) : next() diff --git a/src/services/helper/account.js b/src/services/helper/account.js index 34d48b4e2..328a0dcdc 100644 --- a/src/services/helper/account.js +++ b/src/services/helper/account.js @@ -106,7 +106,7 @@ module.exports = class AccountHelper { } await usersData.updateOneUser({ _id: ObjectId(user._id) }, update) - const deleteData = await redisCommunication.deleteKey(email) + await redisCommunication.deleteKey(email) const result = { access_token: accessToken, refresh_token: refreshToken, user } @@ -359,6 +359,7 @@ module.exports = class AccountHelper { if (userData && userData.action === 'forgetpassword') { otp = userData.otp // If valid then get previuosly generated otp + console.log(otp) } else { isValidOtpExist = false } @@ -570,7 +571,7 @@ module.exports = class AccountHelper { } await usersData.updateOneUser({ _id: user._id }, updateParams) - const deleteData = await redisCommunication.deleteKey(bodyData.email.toLowerCase()) + await redisCommunication.deleteKey(bodyData.email.toLowerCase()) /* Mongoose schema is in strict mode, so can not delete otpInfo directly */ delete user._doc.password @@ -598,51 +599,49 @@ module.exports = class AccountHelper { * @returns {CSV} - created mentors. */ static async bulkCreateMentors(mentors, tokenInformation) { - return new Promise(async (resolve, reject) => { - try { - const systemUser = await systemUserData.findUsersByEmail(tokenInformation.email) + try { + const systemUser = await systemUserData.findUsersByEmail(tokenInformation.email) - if (!systemUser) { - return common.failureResponse({ - message: apiResponses.USER_DOESNOT_EXISTS, - statusCode: httpStatusCode.bad_request, - responseCode: 'CLIENT_ERROR', - }) - } + if (!systemUser) { + return common.failureResponse({ + message: apiResponses.USER_DOESNOT_EXISTS, + statusCode: httpStatusCode.bad_request, + responseCode: 'CLIENT_ERROR', + }) + } - if (systemUser.role.toLowerCase() !== 'admin') { - return common.failureResponse({ - message: apiResponses.NOT_AN_ADMIN, - statusCode: httpStatusCode.bad_request, - responseCode: 'CLIENT_ERROR', - }) - } + if (systemUser.role.toLowerCase() !== 'admin') { + return common.failureResponse({ + message: apiResponses.NOT_AN_ADMIN, + statusCode: httpStatusCode.bad_request, + responseCode: 'CLIENT_ERROR', + }) + } - const fileName = `mentors-creation` - let fileStream = new FILESTREAM(fileName) - let input = fileStream.initStream() + const fileName = 'mentors-creation' + let fileStream = new FILESTREAM(fileName) + let input = fileStream.initStream() - ;(async function () { - await fileStream.getProcessorPromise() - return resolve({ - isResponseAStream: true, - fileNameWithPath: fileStream.fileNameWithPath(), - }) - })() - - for (const mentor of mentors) { - mentor.isAMentor = true - const data = await this.create(mentor) - mentor.email = mentor.email.address - mentor.status = data.message - input.push(mentor) + ;(async function () { + await fileStream.getProcessorPromise() + return { + isResponseAStream: true, + fileNameWithPath: fileStream.fileNameWithPath(), } + })() - input.push(null) - } catch (error) { - throw error + for (const mentor of mentors) { + mentor.isAMentor = true + const data = await this.create(mentor) + mentor.email = mentor.email.address + mentor.status = data.message + input.push(mentor) } - }) + + input.push(null) + } catch (error) { + throw error + } } /** @@ -825,6 +824,7 @@ module.exports = class AccountHelper { */ static async acceptTermsAndCondition(userId) { try { + console.log('=======>', userId) const user = await usersData.findOne({ _id: userId }, { _id: 1 }) if (!user) { diff --git a/src/services/helper/mentors.js b/src/services/helper/mentors.js index 966c97fcd..11c3801c1 100644 --- a/src/services/helper/mentors.js +++ b/src/services/helper/mentors.js @@ -10,7 +10,6 @@ const usersData = require('@db/users/queries') const apiResponses = require('@constants/api-responses') const common = require('@constants/common') const httpStatusCode = require('@generics/http-status') -const cloudServices = require('@generics/cloud-services') const utilsHelper = require('@generics/utils') module.exports = class MentorsHelper { diff --git a/src/services/helper/userentity.js b/src/services/helper/userentity.js index e58f24295..3ff165a82 100644 --- a/src/services/helper/userentity.js +++ b/src/services/helper/userentity.js @@ -8,7 +8,6 @@ // Dependencies const ObjectId = require('mongoose').Types.ObjectId -const utilsHelper = require('@generics/utils') const httpStatusCode = require('@generics/http-status') const apiResponses = require('@constants/api-responses') const common = require('@constants/common') @@ -29,7 +28,8 @@ module.exports = class UserEntityHelper { bodyData.createdBy = ObjectId(_id) bodyData.updatedBy = ObjectId(_id) try { - const filter = { type: bodyData.type, code: bodyData.code } + const filter = { type: bodyData.type, value: bodyData.value } + console.log(filter) const entity = await userEntitiesData.findOneEntity(filter) if (entity) { return common.failureResponse({ diff --git a/src/test/accountController.spec.js b/src/test/accountController.spec.js index 4c6da49cc..cfce4c71b 100644 --- a/src/test/accountController.spec.js +++ b/src/test/accountController.spec.js @@ -2,7 +2,6 @@ require('module-alias/register') const chai = require('chai') const sinon = require('sinon') const expect = chai.expect -const flushPromises = () => new Promise(setImmediate) global.db = { model: function () { From e5713c1a9d18231d30cedae947f092134624a5d3 Mon Sep 17 00:00:00 2001 From: rakesh kumar Date: Thu, 17 Nov 2022 17:24:19 +0530 Subject: [PATCH 02/13] kafka js fixes --- src/.env.sample | 2 +- src/configs/kafka.js | 81 ++++++++++++----------------- src/generics/kafka-communication.js | 27 +++++----- src/health-checks/kafka.js | 21 +++++--- src/package.json | 2 +- 5 files changed, 61 insertions(+), 72 deletions(-) diff --git a/src/.env.sample b/src/.env.sample index 8486eefa8..08cf293c5 100644 --- a/src/.env.sample +++ b/src/.env.sample @@ -18,7 +18,7 @@ ACCESS_TOKEN_SECRET = 'access-token-secret' # Token secret to generate refresh token REFRESH_TOKEN_SECRET = 'refresh-token-secret' -# Kafka hosted server url +# Kafka hosted server url separated by comma KAFKA_URL = localhost:9092 # Kafka group to which consumer belongs diff --git a/src/configs/kafka.js b/src/configs/kafka.js index 40dcab33d..0e7f6d696 100644 --- a/src/configs/kafka.js +++ b/src/configs/kafka.js @@ -6,64 +6,49 @@ */ //Dependencies -const Kafka = require('kafka-node') +const { Kafka } = require('kafkajs') + const utils = require('@generics/utils') const profileService = require('@services/helper/profile') -module.exports = () => { - const Producer = Kafka.Producer - const KafkaClient = new Kafka.KafkaClient({ - kafkaHost: process.env.KAFKA_URL, - }) - const producer = new Producer(KafkaClient) - - /* Uncomment while writing consuming actions for this service */ - // const Consumer = Kafka.Consumer; - // const consumer = new Consumer(KafkaClient, [ { topic: process.env.RATING_TOPIC } ], { autoCommit: true, groupId: process.env.KAFKA_GROUP_ID }) - - /* Registered events */ - - KafkaClient.on('error', (error) => { - console.log('Kafka connection error: ', error) +module.exports = async () => { + const kafkaIps = process.env.KAFKA_URL.split(',') + const KafkaClient = new Kafka({ + clientId: 'mentoring', + brokers: kafkaIps, }) - KafkaClient.on('connect', () => { - console.log('Connected to kafka client') - }) + const producer = KafkaClient.producer() + const consumer = KafkaClient.consumer({ groupId: process.env.KAFKA_GROUP_ID }) - producer.on('error', (error) => { - console.log('Kafka producer intialization error: ', error) - }) + await producer.connect() + await consumer.connect() - producer.on('ready', () => { - console.log('Producer intialized successfully') + producer.on('producer.connect', () => { + console.log(`KafkaProvider: connected`) }) - - const consumer = new Kafka.ConsumerGroup( - { - kafkaHost: process.env.KAFKA_URL, - groupId: process.env.KAFKA_GROUP_ID, - autoCommit: true, - }, - [process.env.RATING_KAFKA_TOPIC, process.env.CLEAR_INTERNAL_CACHE] - ) - - consumer.on('message', async function (message) { - try { - let streamingData = JSON.parse(message.value) - if (streamingData.type == 'MENTOR_RATING' && streamingData.value && streamingData.mentorId) { - profileService.ratingCalculation(streamingData) - } else if (streamingData.type == 'CLEAR_INTERNAL_CACHE') { - utils.internalDel(streamingData.value) - } - } catch (error) { - console.log('failed', error) - } + producer.on('producer.disconnect', () => { + console.log(`KafkaProvider: could not connect`) }) - consumer.on('error', async function (error) { - console.log('kafka consumer intialization error', error) - }) + const subscribeToConsumer = async () => { + await consumer.subscribe({ topics: [process.env.RATING_KAFKA_TOPIC, process.env.CLEAR_INTERNAL_CACHE] }) + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + try { + let streamingData = JSON.parse(message.value) + if (streamingData.type == 'MENTOR_RATING' && streamingData.value && streamingData.mentorId) { + profileService.ratingCalculation(streamingData) + } else if (streamingData.type == 'CLEAR_INTERNAL_CACHE') { + utils.internalDel(streamingData.value) + } + } catch (error) { + console.log('failed', error) + } + }, + }) + } + subscribeToConsumer() global.kafkaProducer = producer global.kafkaClient = KafkaClient diff --git a/src/generics/kafka-communication.js b/src/generics/kafka-communication.js index cb2a039eb..7a9a23c0b 100644 --- a/src/generics/kafka-communication.js +++ b/src/generics/kafka-communication.js @@ -7,7 +7,7 @@ const pushEmailToKafka = async (message) => { try { - const payload = [{ topic: process.env.NOTIFICATION_KAFKA_TOPIC, messages: JSON.stringify(message) }] + const payload = { topic: process.env.NOTIFICATION_KAFKA_TOPIC, messages: [{ value: JSON.stringify(message) }] } return await pushPayloadToKafka(payload) } catch (error) { throw error @@ -15,24 +15,23 @@ const pushEmailToKafka = async (message) => { } const pushPayloadToKafka = (payload) => { - return new Promise((resolve, reject) => { - kafkaProducer.send(payload, (error, data) => { - if (error) { - reject(error) - } - resolve(data) - }) + return new Promise(async function (resolve, reject) { + let response = await kafkaProducer.send(payload) + if (response) { + resolve(response) + } else { + reject(response) + } }) } const clearInternalCache = async (key) => { try { - const payload = [ - { - topic: process.env.CLEAR_INTERNAL_CACHE, - messages: JSON.stringify({ value: key, type: 'CLEAR_INTERNAL_CACHE' }), - }, - ] + const payload = { + topic: process.env.CLEAR_INTERNAL_CACHE, + messages: [{ value: JSON.stringify({ value: key, type: 'CLEAR_INTERNAL_CACHE' }) }], + } + return await pushPayloadToKafka(payload) } catch (error) { throw error diff --git a/src/health-checks/kafka.js b/src/health-checks/kafka.js index 3b061d9e8..fc1ad655a 100644 --- a/src/health-checks/kafka.js +++ b/src/health-checks/kafka.js @@ -6,22 +6,27 @@ */ // Dependencies -const kafka = require('kafka-node') +const kafka = require('kafkajs') function health_check() { return new Promise(async (resolve, reject) => { - const client = new kafka.KafkaClient({ - kafkaHost: process.env.KAFKA_URL, + const kafkaIps = process.env.KAFKA_URL.split(',') + const KafkaClient = new Kafka({ + clientId: 'mentoring', + brokers: kafkaIps, }) - const producer = new kafka.Producer(client) + const producer = KafkaClient.producer() + await producer.connect() - producer.on('error', function (err) { - return resolve(false) - }) - producer.on('ready', function () { + producer.on('producer.connect', () => { + console.log(`KafkaProvider: connected`) return resolve(true) }) + producer.on('producer.disconnect', () => { + console.log(`KafkaProvider: could not connect`) + return resolve(false) + }) }) } diff --git a/src/package.json b/src/package.json index db86990a2..73bb9fdf1 100644 --- a/src/package.json +++ b/src/package.json @@ -50,7 +50,7 @@ "jest": "^28.1.2", "json2csv": "^5.0.6", "jsonwebtoken": "^8.5.1", - "kafka-node": "^5.0.0", + "kafkajs": "^2.2.0", "md5": "^2.3.0", "module-alias": "^2.2.2", "moment": "^2.29.1", From d4f7989973f0538742b8f864dc4ce9f4586c16a2 Mon Sep 17 00:00:00 2001 From: Nevil Date: Tue, 29 Nov 2022 15:26:25 +0530 Subject: [PATCH 03/13] updated circleci --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5bbbc148f..7d05d9408 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -50,4 +50,4 @@ workflows: - SonarCloud filters: tags: - only: ^(develop|dev|release).* + only: \b(dev|develop|main)\b From 96888b7a2456b974f14b8af3a3cfb198de4f4a7b Mon Sep 17 00:00:00 2001 From: rakesh kumar Date: Tue, 29 Nov 2022 17:59:42 +0530 Subject: [PATCH 04/13] mongo replica set changes --- src/.env.sample | 8 +++++++- src/configs/mongodb.js | 11 +++++++++-- src/envVariables.js | 8 ++++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/.env.sample b/src/.env.sample index 8486eefa8..80b21a7ba 100644 --- a/src/.env.sample +++ b/src/.env.sample @@ -121,4 +121,10 @@ CLEAR_INTERNAL_CACHE = 'userInternal' KEY = 'g5MQ7HG/r5gPCPQQCwfBBEduAt72ewJIY/gWc0RNoak=' #IV for email encryption 16 bit string -IV = '2lIctRkqzYMWbwlW1jCC9A==' \ No newline at end of file +IV = '2lIctRkqzYMWbwlW1jCC9A==' + +#MongoDB replica set name +REPLICA_SET_NAME = rs + +#MongoDB replica set read preferance +REPLICA_SET_READ_PREFERENCE = secondaryPreferance \ No newline at end of file diff --git a/src/configs/mongodb.js b/src/configs/mongodb.js index fa8a6bc26..a28c92e87 100644 --- a/src/configs/mongodb.js +++ b/src/configs/mongodb.js @@ -10,11 +10,18 @@ const mongoose = require('mongoose') const mongoose_autopopulate = require('mongoose-autopopulate') const mongoose_timestamp = require('mongoose-timestamp') - module.exports = function () { // Added to remove depreciation warnings from logs. - const db = mongoose.createConnection(process.env.MONGODB_URL, { + let parameters + if (process.env.REPLICA_SET_NAME) { + parameters = '?replicaSet=' + process.env.REPLICA_SET_NAME + } + if (process.env.REPLICA_SET_NAME && process.env.REPLICA_SET_READ_PREFERENCE) { + parameters = parameters + '&readPreference=' + process.env.REPLICA_SET_READ_PREFERENCE + } + + const db = mongoose.createConnection(process.env.MONGODB_URL + parameters, { useNewUrlParser: true, }) diff --git a/src/envVariables.js b/src/envVariables.js index 754943881..91abc9566 100644 --- a/src/envVariables.js +++ b/src/envVariables.js @@ -160,6 +160,14 @@ let enviromentVariables = { message: 'Required oci bucket name', optional: process.env.CLOUD_STORAGE === 'OCI' ? false : true, }, + REPLICA_SET_NAME: { + message: 'Required replica set name', + optional: true, + }, + REPLICA_SET_READ_PREFERENCE: { + message: 'Required replica read preferance', + optional: process.env.REPLICA_SET_NAME ? false : true, + }, } let success = true From d041ade5d2849af605d51ab105e11f9ac676df87 Mon Sep 17 00:00:00 2001 From: ankitpws Date: Wed, 30 Nov 2022 07:47:29 +0530 Subject: [PATCH 05/13] saving chnages --- src/db/userentities/query.js | 4 ++-- src/routes/index.js | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/db/userentities/query.js b/src/db/userentities/query.js index 5d4cf1fa4..adbbf26aa 100644 --- a/src/db/userentities/query.js +++ b/src/db/userentities/query.js @@ -11,8 +11,8 @@ const UserEntities = require('./model') module.exports = class UserEntityData { static async createEntity(data) { try { - await new UserEntities(data).save() - return true + const userEntityDataRes = await new UserEntities(data).save() + return userEntityDataRes } catch (error) { return error } diff --git a/src/routes/index.js b/src/routes/index.js index be1430abe..d8b7cf0ac 100644 --- a/src/routes/index.js +++ b/src/routes/index.js @@ -19,7 +19,6 @@ module.exports = (app) => { async function router(req, res, next) { let controllerResponse let validationError - console.log('rechecje') /* Check for input validation error */ try { From b59b8d828bcfcece336a04c9407261fdfd2e8af5 Mon Sep 17 00:00:00 2001 From: ankitshahu Date: Wed, 30 Nov 2022 17:32:33 +0530 Subject: [PATCH 06/13] adding and pushing --- src/middlewares/validator.js | 2 +- src/routes/index.js | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/middlewares/validator.js b/src/middlewares/validator.js index 22277bef6..013a4cfbb 100644 --- a/src/middlewares/validator.js +++ b/src/middlewares/validator.js @@ -7,6 +7,6 @@ module.exports = (req, res, next) => { try { require(`@validators/${req.params.version}/${req.params.controller}`)[req.params.method](req) - } catch (error) {} + } catch {} next() } diff --git a/src/routes/index.js b/src/routes/index.js index d8b7cf0ac..1529f48e0 100644 --- a/src/routes/index.js +++ b/src/routes/index.js @@ -58,7 +58,6 @@ module.exports = (app) => { } else { controller = require(`@controllers/${req.params.version}/${req.params.controller}`) } - console.log(controller) controllerResponse = new controller()[req.params.method] ? await new controller()[req.params.method](req) : next() From e452f3980b34d134fcc2b7783401058b15d214ab Mon Sep 17 00:00:00 2001 From: Nevil Date: Tue, 6 Dec 2022 11:17:33 +0530 Subject: [PATCH 07/13] fixed mongo config --- src/configs/mongodb.js | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/configs/mongodb.js b/src/configs/mongodb.js index a28c92e87..fd9937811 100644 --- a/src/configs/mongodb.js +++ b/src/configs/mongodb.js @@ -21,9 +21,16 @@ module.exports = function () { parameters = parameters + '&readPreference=' + process.env.REPLICA_SET_READ_PREFERENCE } - const db = mongoose.createConnection(process.env.MONGODB_URL + parameters, { - useNewUrlParser: true, - }) + let db + if (!parameters) { + db = mongoose.createConnection(process.env.MONGODB_URL, { + useNewUrlParser: true, + }) + } else { + db = mongoose.createConnection(process.env.MONGODB_URL + parameters, { + useNewUrlParser: true, + }) + } db.on('error', function () { console.log('Database connection error:') From 68e6a11cfe66835283279643f8f276f7dbfeab22 Mon Sep 17 00:00:00 2001 From: Nevil Date: Tue, 6 Dec 2022 12:42:53 +0530 Subject: [PATCH 08/13] removed console logs --- README.md | 5 ----- src/controllers/v1/account.js | 1 - src/services/helper/account.js | 1 - 3 files changed, 7 deletions(-) diff --git a/README.md b/README.md index 79e9d872b..73769431c 100644 --- a/README.md +++ b/README.md @@ -406,8 +406,3 @@ Several open source dependencies that have aided user service development: -<<<<<<< HEAD - -# - -> > > > > > > develop diff --git a/src/controllers/v1/account.js b/src/controllers/v1/account.js index 1bd60047e..c6e5a5aaa 100644 --- a/src/controllers/v1/account.js +++ b/src/controllers/v1/account.js @@ -208,7 +208,6 @@ module.exports = class Account { */ async acceptTermsAndCondition(req) { try { - console.log(req.decodedToken._id, '------------------------') const result = await accountHelper.acceptTermsAndCondition(req.decodedToken._id) return result } catch (error) { diff --git a/src/services/helper/account.js b/src/services/helper/account.js index 9a8d40b42..ce726d990 100644 --- a/src/services/helper/account.js +++ b/src/services/helper/account.js @@ -839,7 +839,6 @@ module.exports = class AccountHelper { */ static async acceptTermsAndCondition(userId) { try { - console.log('=======>', userId) const user = await usersData.findOne({ _id: userId }, { _id: 1 }) if (!user) { From 62d2c3b5bb9f4114ab27368f27844c94e0e475b1 Mon Sep 17 00:00:00 2001 From: Nevil Date: Tue, 6 Dec 2022 17:49:23 +0530 Subject: [PATCH 09/13] updated kafka-comm.js --- src/generics/kafka-communication.js | 15 +++++++-------- src/services/helper/account.js | 4 +++- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/generics/kafka-communication.js b/src/generics/kafka-communication.js index 7a9a23c0b..f4099beeb 100644 --- a/src/generics/kafka-communication.js +++ b/src/generics/kafka-communication.js @@ -10,19 +10,18 @@ const pushEmailToKafka = async (message) => { const payload = { topic: process.env.NOTIFICATION_KAFKA_TOPIC, messages: [{ value: JSON.stringify(message) }] } return await pushPayloadToKafka(payload) } catch (error) { + return error throw error } } -const pushPayloadToKafka = (payload) => { - return new Promise(async function (resolve, reject) { +const pushPayloadToKafka = async (payload) => { + try { let response = await kafkaProducer.send(payload) - if (response) { - resolve(response) - } else { - reject(response) - } - }) + return response + } catch (error) { + return error + } } const clearInternalCache = async (key) => { diff --git a/src/services/helper/account.js b/src/services/helper/account.js index ce726d990..fc1416bd0 100644 --- a/src/services/helper/account.js +++ b/src/services/helper/account.js @@ -486,7 +486,9 @@ module.exports = class AccountHelper { await kafkaCommunication.pushEmailToKafka(payload) } - console.log(otp) + if (process.env.APPLICATION_ENV === 'development') { + console.log(otp) + } return common.successResponse({ statusCode: httpStatusCode.ok, message: 'REGISTRATION_OTP_SENT_SUCCESSFULLY', From d90c8ce897f44fb9ef8806a5027cc07928d711bb Mon Sep 17 00:00:00 2001 From: Nevil Date: Tue, 6 Dec 2022 18:38:19 +0530 Subject: [PATCH 10/13] updated kafka --- src/generics/kafka-communication.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/generics/kafka-communication.js b/src/generics/kafka-communication.js index f4099beeb..9644b0753 100644 --- a/src/generics/kafka-communication.js +++ b/src/generics/kafka-communication.js @@ -11,7 +11,6 @@ const pushEmailToKafka = async (message) => { return await pushPayloadToKafka(payload) } catch (error) { return error - throw error } } From 984dd91c7f57b2ee1053f1be8a2b8ed0bc997290 Mon Sep 17 00:00:00 2001 From: rakesh kumar Date: Mon, 12 Dec 2022 13:11:29 +0530 Subject: [PATCH 11/13] mongo fix --- src/configs/mongodb.js | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/configs/mongodb.js b/src/configs/mongodb.js index a28c92e87..eba41297f 100644 --- a/src/configs/mongodb.js +++ b/src/configs/mongodb.js @@ -21,9 +21,15 @@ module.exports = function () { parameters = parameters + '&readPreference=' + process.env.REPLICA_SET_READ_PREFERENCE } - const db = mongoose.createConnection(process.env.MONGODB_URL + parameters, { - useNewUrlParser: true, - }) + if (parameters) { + const db = mongoose.createConnection(process.env.MONGODB_URL + parameters, { + useNewUrlParser: true, + }) + } else { + const db = mongoose.createConnection(process.env.MONGODB_URL, { + useNewUrlParser: true, + }) + } db.on('error', function () { console.log('Database connection error:') From cecd7d7b59db1b4dae94486b3d6350a6f2a3a019 Mon Sep 17 00:00:00 2001 From: rakesh kumar Date: Mon, 12 Dec 2022 13:16:00 +0530 Subject: [PATCH 12/13] mongo fix --- src/configs/mongodb.js | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/configs/mongodb.js b/src/configs/mongodb.js index eba41297f..a4a8ea4c3 100644 --- a/src/configs/mongodb.js +++ b/src/configs/mongodb.js @@ -13,7 +13,7 @@ const mongoose_timestamp = require('mongoose-timestamp') module.exports = function () { // Added to remove depreciation warnings from logs. - let parameters + let parameters = '' if (process.env.REPLICA_SET_NAME) { parameters = '?replicaSet=' + process.env.REPLICA_SET_NAME } @@ -21,15 +21,9 @@ module.exports = function () { parameters = parameters + '&readPreference=' + process.env.REPLICA_SET_READ_PREFERENCE } - if (parameters) { - const db = mongoose.createConnection(process.env.MONGODB_URL + parameters, { - useNewUrlParser: true, - }) - } else { - const db = mongoose.createConnection(process.env.MONGODB_URL, { - useNewUrlParser: true, - }) - } + const db = mongoose.createConnection(process.env.MONGODB_URL + parameters, { + useNewUrlParser: true, + }) db.on('error', function () { console.log('Database connection error:') From 51bd975f00d7e0e7fa2f62106097012288afc53b Mon Sep 17 00:00:00 2001 From: Nevil Date: Tue, 13 Dec 2022 16:10:25 +0530 Subject: [PATCH 13/13] updated en and hi for OTP_INVALID --- src/locales/en.json | 2 +- src/locales/hi.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/locales/en.json b/src/locales/en.json index e7bdaa70b..b121a3ee3 100644 --- a/src/locales/en.json +++ b/src/locales/en.json @@ -31,7 +31,7 @@ "OTP_SENT_SUCCESSFULLY": "OTP has been sent to your registered email ID. Please enter the number to update your password.", "REGISTRATION_OTP_SENT_SUCCESSFULLY": "OTP has been sent to your registered email ID. Please enter the otp to complete the registration process.", "PASSWORD_RESET_SUCCESSFULLY": "Password reset successfully.", - "OTP_INVALID": "Wrong OTP, please try again.", + "OTP_INVALID": "The OTP you entered is wrong.", "RESET_OTP_INVALID": "Please enter the correct OTP.", "SYSTEM_USER_ALREADY_EXISTS": "System User already exists.", "NOT_AN_ADMIN": "Not an admin", diff --git a/src/locales/hi.json b/src/locales/hi.json index 96aaedd0e..f29f3f8a2 100644 --- a/src/locales/hi.json +++ b/src/locales/hi.json @@ -31,7 +31,7 @@ "OTP_SENT_SUCCESSFULLY": "आपके पंजीकृत ईमेल आईडी पर ओटीपी भेजा गया है। अपना पासवर्ड अपडेट करने के लिए कृपया नंबर दर्ज करें।", "REGISTRATION_OTP_SENT_SUCCESSFULLY": "आपके पंजीकृत ईमेल आईडी पर ओटीपी भेजा गया है। कृपया पंजीकरण प्रक्रिया को पूरा करने के लिए ओटीपी दर्ज करें।", "PASSWORD_RESET_SUCCESSFULLY": "पासवर्ड सफलतापूर्वक बदला गया।", - "OTP_INVALID": " ओटीपी अमान्य है, कृपया पुनः प्रयास करें", + "OTP_INVALID": "आपके द्वारा दर्ज किया हुआ OTP गलत है।", "RESET_OTP_INVALID": "कृपया सही ओटीपी दर्ज करें।", "SYSTEM_USER_ALREADY_EXISTS": "System User already exists.", "NOT_AN_ADMIN": "Not an admin",