Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev #87

Merged
merged 23 commits into from
Dec 14, 2022
Merged

Dev #87

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"dependencies": {
"@babel/parser": "^7.20.3",
"@babel/types": "^7.20.2"
}
}
10 changes: 8 additions & 2 deletions src/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ ACCESS_TOKEN_SECRET = 'access-token-secret'
# Token secret to generate refresh token
REFRESH_TOKEN_SECRET = 'refresh-token-secret'

# Kafka hosted server url
# Kafka hosted server url separated by comma
KAFKA_URL = localhost:9092

# Kafka group to which consumer belongs
Expand Down Expand Up @@ -121,4 +121,10 @@ CLEAR_INTERNAL_CACHE = 'userInternal'
KEY = 'g5MQ7HG/r5gPCPQQCwfBBEduAt72ewJIY/gWc0RNoak='

#IV for email encryption 16 bit string
IV = '2lIctRkqzYMWbwlW1jCC9A=='
IV = '2lIctRkqzYMWbwlW1jCC9A=='

#MongoDB replica set name
REPLICA_SET_NAME = rs

#MongoDB replica set read preferance
REPLICA_SET_READ_PREFERENCE = secondaryPreferance
17 changes: 8 additions & 9 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,13 @@ app.listen(process.env.APPLICATION_PORT, (res, err) => {

// Handles specific listen errors with friendly messages
function onError(error) {
switch (error.code) {
case 'EACCES':
console.log(process.env.APPLICATION_PORT + ' requires elevated privileges')
process.exit(1)
case 'EADDRINUSE':
console.log(process.env.APPLICATION_PORT + ' is already in use')
process.exit(1)
default:
throw error
if (error.code === 'EACCES') {
console.log(process.env.APPLICATION_PORT + ' requires elevated privileges')
process.exit(1)
} else if (error.code === 'EADDRINUSE') {
console.log(process.env.APPLICATION_PORT + ' is already in use')
process.exit(1)
} else {
throw error
}
}
81 changes: 33 additions & 48 deletions src/configs/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,64 +6,49 @@
*/

//Dependencies
const Kafka = require('kafka-node')
const { Kafka } = require('kafkajs')

const utils = require('@generics/utils')
const profileService = require('@services/helper/profile')

module.exports = () => {
const Producer = Kafka.Producer
const KafkaClient = new Kafka.KafkaClient({
kafkaHost: process.env.KAFKA_URL,
})
const producer = new Producer(KafkaClient)

/* Uncomment while writing consuming actions for this service */
// const Consumer = Kafka.Consumer;
// const consumer = new Consumer(KafkaClient, [ { topic: process.env.RATING_TOPIC } ], { autoCommit: true, groupId: process.env.KAFKA_GROUP_ID })

/* Registered events */

KafkaClient.on('error', (error) => {
console.log('Kafka connection error: ', error)
module.exports = async () => {
const kafkaIps = process.env.KAFKA_URL.split(',')
const KafkaClient = new Kafka({
clientId: 'mentoring',
brokers: kafkaIps,
})

KafkaClient.on('connect', () => {
console.log('Connected to kafka client')
})
const producer = KafkaClient.producer()
const consumer = KafkaClient.consumer({ groupId: process.env.KAFKA_GROUP_ID })

producer.on('error', (error) => {
console.log('Kafka producer intialization error: ', error)
})
await producer.connect()
await consumer.connect()

producer.on('ready', () => {
console.log('Producer intialized successfully')
producer.on('producer.connect', () => {
console.log(`KafkaProvider: connected`)
})

const consumer = new Kafka.ConsumerGroup(
{
kafkaHost: process.env.KAFKA_URL,
groupId: process.env.KAFKA_GROUP_ID,
autoCommit: true,
},
[process.env.RATING_KAFKA_TOPIC, process.env.CLEAR_INTERNAL_CACHE]
)

consumer.on('message', async function (message) {
try {
let streamingData = JSON.parse(message.value)
if (streamingData.type == 'MENTOR_RATING' && streamingData.value && streamingData.mentorId) {
profileService.ratingCalculation(streamingData)
} else if (streamingData.type == 'CLEAR_INTERNAL_CACHE') {
utils.internalDel(streamingData.value)
}
} catch (error) {
console.log('failed', error)
}
producer.on('producer.disconnect', () => {
console.log(`KafkaProvider: could not connect`)
})

consumer.on('error', async function (error) {
console.log('kafka consumer intialization error', error)
})
const subscribeToConsumer = async () => {
await consumer.subscribe({ topics: [process.env.RATING_KAFKA_TOPIC, process.env.CLEAR_INTERNAL_CACHE] })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
let streamingData = JSON.parse(message.value)
if (streamingData.type == 'MENTOR_RATING' && streamingData.value && streamingData.mentorId) {
profileService.ratingCalculation(streamingData)
} else if (streamingData.type == 'CLEAR_INTERNAL_CACHE') {
utils.internalDel(streamingData.value)
}
} catch (error) {
console.log('failed', error)
}
},
})
}
subscribeToConsumer()

global.kafkaProducer = producer
global.kafkaClient = KafkaClient
Expand Down
22 changes: 18 additions & 4 deletions src/configs/mongodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,27 @@ const mongoose = require('mongoose')
const mongoose_autopopulate = require('mongoose-autopopulate')
const mongoose_timestamp = require('mongoose-timestamp')


module.exports = function () {
// Added to remove depreciation warnings from logs.

const db = mongoose.createConnection(process.env.MONGODB_URL, {
useNewUrlParser: true,
})
let parameters = ''
if (process.env.REPLICA_SET_NAME) {
parameters = '?replicaSet=' + process.env.REPLICA_SET_NAME
}
if (process.env.REPLICA_SET_NAME && process.env.REPLICA_SET_READ_PREFERENCE) {
parameters = parameters + '&readPreference=' + process.env.REPLICA_SET_READ_PREFERENCE
}

let db
if (!parameters) {
db = mongoose.createConnection(process.env.MONGODB_URL, {
useNewUrlParser: true,
})
} else {
db = mongoose.createConnection(process.env.MONGODB_URL + parameters, {
useNewUrlParser: true,
})
}

db.on('error', function () {
console.log('Database connection error:')
Expand Down
82 changes: 35 additions & 47 deletions src/db/forms/queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,62 +8,50 @@
const Forms = require('./model')

module.exports = class FormsData {
static createForm(data) {
return new Promise(async (resolve, reject) => {
try {
await new Forms(data).save()
resolve(true)
} catch (error) {
reject(error)
}
})
static async createForm(data) {
try {
await new Forms(data).save()
return true
} catch (error) {
return error
}
}

static async findOneForm(filter, projection = {}) {
try {
const formData = await Forms.findOne(filter, projection)
return formData
} catch (error) {
return error
}
}

static findOneForm(filter) {
const projection = {}
return new Promise(async (resolve, reject) => {
try {
const formData = await Forms.findOne(filter, projection)
resolve(formData)
} catch (error) {
reject(error)
static async updateOneForm(filter, update, options = {}) {
try {
const res = await Forms.updateOne(filter, update, options)
if ((res.n === 1 && res.nModified === 1) || (res.matchedCount === 1 && res.modifiedCount === 1)) {
return 'FORM_UPDATED'
} else if ((res.n === 1 && res.nModified === 0) || (res.matchedCount === 1 && res.modifiedCount === 0)) {
return 'FORM_ALREADY_EXISTS'
} else {
return 'FORM_NOT_FOUND'
}
})
} catch (error) {
return error
}
}

static findAllTypeFormVersion() {
static async findAllTypeFormVersion() {
const projection = {
_id: 1,
type: 1,
__v: 1,
}
return new Promise(async (resolve, reject) => {
try {
const formData = await Forms.find({}, projection)
resolve(formData)
} catch (error) {
reject(error)
}
})
}

static updateOneForm(filter, update, options = {}) {
return new Promise(async (resolve, reject) => {
try {
const res = await Forms.updateOne(filter, update, options)
if ((res.n === 1 && res.nModified === 1) || (res.matchedCount === 1 && res.modifiedCount === 1)) {
resolve('ENTITY_UPDATED')
} else if (
(res.n === 1 && res.nModified === 0) ||
(res.matchedCount === 1 && res.modifiedCount === 0)
) {
resolve('ENTITY_ALREADY_EXISTS')
} else {
resolve('ENTITY_NOT_FOUND')
}
} catch (error) {
reject(error)
}
})
try {
const formData = await Forms.find({}, projection)
return formData
} catch (error) {
return error
}
}
}
91 changes: 43 additions & 48 deletions src/db/notification-template/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,67 +9,62 @@
const NotificationTemplate = require('./model')

module.exports = class NotificationTemplateData {
static findOneEmailTemplate(code) {
static async findOneEmailTemplate(code) {
const filter = {
code,
type: 'email',
deleted: false,
status: 'active',
}
return new Promise(async (resolve, reject) => {
try {
const templateData = await NotificationTemplate.findOne(filter).lean()
if (templateData && templateData.emailHeader) {
const header = await this.getEmailHeader(templateData.emailHeader)
if (header && header.body) {
templateData['body'] = header.body + templateData['body']
}
}

if (templateData && templateData.emailFooter) {
const footer = await this.getEmailFooter(templateData.emailFooter)
if (footer && footer.body) {
templateData['body'] = templateData['body'] + footer.body
}
try {
const templateData = await NotificationTemplate.findOne(filter).lean()
if (templateData && templateData.emailHeader) {
const header = await this.getEmailHeader(templateData.emailHeader)
if (header && header.body) {
templateData['body'] = header.body + templateData['body']
}
resolve(templateData)
} catch (error) {
reject(error)
}
})
}
static getEmailHeader(header) {
return new Promise(async (resolve, reject) => {
try {
const filterEmailHeader = {
code: header,
type: 'emailHeader',
deleted: false,
status: 'active',
}
const headerData = await NotificationTemplate.findOne(filterEmailHeader).lean()

resolve(headerData)
} catch (error) {
reject(error)
if (templateData && templateData.emailFooter) {
const footer = await this.getEmailFooter(templateData.emailFooter)
if (footer && footer.body) {
templateData['body'] = templateData['body'] + footer.body
}
}
})
return templateData
} catch (error) {
return error
}
}
static getEmailFooter(footer) {
return new Promise(async (resolve, reject) => {
try {
const filterEmailFooter = {
code: footer,
type: 'emailFooter',
deleted: false,
status: 'active',
}
const footerData = await NotificationTemplate.findOne(filterEmailFooter).lean()
static async getEmailHeader(header) {
try {
const filterEmailHeader = {
code: header,
type: 'emailHeader',
deleted: false,
status: 'active',
}
const headerData = await NotificationTemplate.findOne(filterEmailHeader).lean()

resolve(footerData)
} catch (error) {
reject(error)
return headerData
} catch (error) {
return error
}
}
static async getEmailFooter(footer) {
try {
const filterEmailFooter = {
code: footer,
type: 'emailFooter',
deleted: false,
status: 'active',
}
})
const footerData = await NotificationTemplate.findOne(filterEmailFooter).lean()

return footerData
} catch (error) {
return error
}
}
}
Loading