Skip to content

Commit

Permalink
Merge pull request #87 from ELEVATE-Project/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
rakeshSgr authored Dec 14, 2022
2 parents 276e0ac + 65c5e5c commit 2678651
Show file tree
Hide file tree
Showing 20 changed files with 354 additions and 381 deletions.
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"dependencies": {
"@babel/parser": "^7.20.3",
"@babel/types": "^7.20.2"
}
}
10 changes: 8 additions & 2 deletions src/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ ACCESS_TOKEN_SECRET = 'access-token-secret'
# Token secret to generate refresh token
REFRESH_TOKEN_SECRET = 'refresh-token-secret'

# Kafka hosted server url
# Kafka hosted server url separated by comma
KAFKA_URL = localhost:9092

# Kafka group to which consumer belongs
Expand Down Expand Up @@ -121,4 +121,10 @@ CLEAR_INTERNAL_CACHE = 'userInternal'
KEY = 'g5MQ7HG/r5gPCPQQCwfBBEduAt72ewJIY/gWc0RNoak='

#IV for email encryption 16 bit string
IV = '2lIctRkqzYMWbwlW1jCC9A=='
IV = '2lIctRkqzYMWbwlW1jCC9A=='

#MongoDB replica set name
REPLICA_SET_NAME = rs

#MongoDB replica set read preferance
REPLICA_SET_READ_PREFERENCE = secondaryPreferance
17 changes: 8 additions & 9 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,13 @@ app.listen(process.env.APPLICATION_PORT, (res, err) => {

// Handles specific listen errors with friendly messages
function onError(error) {
switch (error.code) {
case 'EACCES':
console.log(process.env.APPLICATION_PORT + ' requires elevated privileges')
process.exit(1)
case 'EADDRINUSE':
console.log(process.env.APPLICATION_PORT + ' is already in use')
process.exit(1)
default:
throw error
if (error.code === 'EACCES') {
console.log(process.env.APPLICATION_PORT + ' requires elevated privileges')
process.exit(1)
} else if (error.code === 'EADDRINUSE') {
console.log(process.env.APPLICATION_PORT + ' is already in use')
process.exit(1)
} else {
throw error
}
}
81 changes: 33 additions & 48 deletions src/configs/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,64 +6,49 @@
*/

//Dependencies
const Kafka = require('kafka-node')
const { Kafka } = require('kafkajs')

const utils = require('@generics/utils')
const profileService = require('@services/helper/profile')

module.exports = () => {
const Producer = Kafka.Producer
const KafkaClient = new Kafka.KafkaClient({
kafkaHost: process.env.KAFKA_URL,
})
const producer = new Producer(KafkaClient)

/* Uncomment while writing consuming actions for this service */
// const Consumer = Kafka.Consumer;
// const consumer = new Consumer(KafkaClient, [ { topic: process.env.RATING_TOPIC } ], { autoCommit: true, groupId: process.env.KAFKA_GROUP_ID })

/* Registered events */

KafkaClient.on('error', (error) => {
console.log('Kafka connection error: ', error)
module.exports = async () => {
const kafkaIps = process.env.KAFKA_URL.split(',')
const KafkaClient = new Kafka({
clientId: 'mentoring',
brokers: kafkaIps,
})

KafkaClient.on('connect', () => {
console.log('Connected to kafka client')
})
const producer = KafkaClient.producer()
const consumer = KafkaClient.consumer({ groupId: process.env.KAFKA_GROUP_ID })

producer.on('error', (error) => {
console.log('Kafka producer intialization error: ', error)
})
await producer.connect()
await consumer.connect()

producer.on('ready', () => {
console.log('Producer intialized successfully')
producer.on('producer.connect', () => {
console.log(`KafkaProvider: connected`)
})

const consumer = new Kafka.ConsumerGroup(
{
kafkaHost: process.env.KAFKA_URL,
groupId: process.env.KAFKA_GROUP_ID,
autoCommit: true,
},
[process.env.RATING_KAFKA_TOPIC, process.env.CLEAR_INTERNAL_CACHE]
)

consumer.on('message', async function (message) {
try {
let streamingData = JSON.parse(message.value)
if (streamingData.type == 'MENTOR_RATING' && streamingData.value && streamingData.mentorId) {
profileService.ratingCalculation(streamingData)
} else if (streamingData.type == 'CLEAR_INTERNAL_CACHE') {
utils.internalDel(streamingData.value)
}
} catch (error) {
console.log('failed', error)
}
producer.on('producer.disconnect', () => {
console.log(`KafkaProvider: could not connect`)
})

consumer.on('error', async function (error) {
console.log('kafka consumer intialization error', error)
})
const subscribeToConsumer = async () => {
await consumer.subscribe({ topics: [process.env.RATING_KAFKA_TOPIC, process.env.CLEAR_INTERNAL_CACHE] })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
let streamingData = JSON.parse(message.value)
if (streamingData.type == 'MENTOR_RATING' && streamingData.value && streamingData.mentorId) {
profileService.ratingCalculation(streamingData)
} else if (streamingData.type == 'CLEAR_INTERNAL_CACHE') {
utils.internalDel(streamingData.value)
}
} catch (error) {
console.log('failed', error)
}
},
})
}
subscribeToConsumer()

global.kafkaProducer = producer
global.kafkaClient = KafkaClient
Expand Down
22 changes: 18 additions & 4 deletions src/configs/mongodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,27 @@ const mongoose = require('mongoose')
const mongoose_autopopulate = require('mongoose-autopopulate')
const mongoose_timestamp = require('mongoose-timestamp')


module.exports = function () {
// Added to remove depreciation warnings from logs.

const db = mongoose.createConnection(process.env.MONGODB_URL, {
useNewUrlParser: true,
})
let parameters = ''
if (process.env.REPLICA_SET_NAME) {
parameters = '?replicaSet=' + process.env.REPLICA_SET_NAME
}
if (process.env.REPLICA_SET_NAME && process.env.REPLICA_SET_READ_PREFERENCE) {
parameters = parameters + '&readPreference=' + process.env.REPLICA_SET_READ_PREFERENCE
}

let db
if (!parameters) {
db = mongoose.createConnection(process.env.MONGODB_URL, {
useNewUrlParser: true,
})
} else {
db = mongoose.createConnection(process.env.MONGODB_URL + parameters, {
useNewUrlParser: true,
})
}

db.on('error', function () {
console.log('Database connection error:')
Expand Down
82 changes: 35 additions & 47 deletions src/db/forms/queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,62 +8,50 @@
const Forms = require('./model')

module.exports = class FormsData {
static createForm(data) {
return new Promise(async (resolve, reject) => {
try {
await new Forms(data).save()
resolve(true)
} catch (error) {
reject(error)
}
})
static async createForm(data) {
try {
await new Forms(data).save()
return true
} catch (error) {
return error
}
}

static async findOneForm(filter, projection = {}) {
try {
const formData = await Forms.findOne(filter, projection)
return formData
} catch (error) {
return error
}
}

static findOneForm(filter) {
const projection = {}
return new Promise(async (resolve, reject) => {
try {
const formData = await Forms.findOne(filter, projection)
resolve(formData)
} catch (error) {
reject(error)
static async updateOneForm(filter, update, options = {}) {
try {
const res = await Forms.updateOne(filter, update, options)
if ((res.n === 1 && res.nModified === 1) || (res.matchedCount === 1 && res.modifiedCount === 1)) {
return 'FORM_UPDATED'
} else if ((res.n === 1 && res.nModified === 0) || (res.matchedCount === 1 && res.modifiedCount === 0)) {
return 'FORM_ALREADY_EXISTS'
} else {
return 'FORM_NOT_FOUND'
}
})
} catch (error) {
return error
}
}

static findAllTypeFormVersion() {
static async findAllTypeFormVersion() {
const projection = {
_id: 1,
type: 1,
__v: 1,
}
return new Promise(async (resolve, reject) => {
try {
const formData = await Forms.find({}, projection)
resolve(formData)
} catch (error) {
reject(error)
}
})
}

static updateOneForm(filter, update, options = {}) {
return new Promise(async (resolve, reject) => {
try {
const res = await Forms.updateOne(filter, update, options)
if ((res.n === 1 && res.nModified === 1) || (res.matchedCount === 1 && res.modifiedCount === 1)) {
resolve('ENTITY_UPDATED')
} else if (
(res.n === 1 && res.nModified === 0) ||
(res.matchedCount === 1 && res.modifiedCount === 0)
) {
resolve('ENTITY_ALREADY_EXISTS')
} else {
resolve('ENTITY_NOT_FOUND')
}
} catch (error) {
reject(error)
}
})
try {
const formData = await Forms.find({}, projection)
return formData
} catch (error) {
return error
}
}
}
91 changes: 43 additions & 48 deletions src/db/notification-template/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,67 +9,62 @@
const NotificationTemplate = require('./model')

module.exports = class NotificationTemplateData {
static findOneEmailTemplate(code) {
static async findOneEmailTemplate(code) {
const filter = {
code,
type: 'email',
deleted: false,
status: 'active',
}
return new Promise(async (resolve, reject) => {
try {
const templateData = await NotificationTemplate.findOne(filter).lean()
if (templateData && templateData.emailHeader) {
const header = await this.getEmailHeader(templateData.emailHeader)
if (header && header.body) {
templateData['body'] = header.body + templateData['body']
}
}

if (templateData && templateData.emailFooter) {
const footer = await this.getEmailFooter(templateData.emailFooter)
if (footer && footer.body) {
templateData['body'] = templateData['body'] + footer.body
}
try {
const templateData = await NotificationTemplate.findOne(filter).lean()
if (templateData && templateData.emailHeader) {
const header = await this.getEmailHeader(templateData.emailHeader)
if (header && header.body) {
templateData['body'] = header.body + templateData['body']
}
resolve(templateData)
} catch (error) {
reject(error)
}
})
}
static getEmailHeader(header) {
return new Promise(async (resolve, reject) => {
try {
const filterEmailHeader = {
code: header,
type: 'emailHeader',
deleted: false,
status: 'active',
}
const headerData = await NotificationTemplate.findOne(filterEmailHeader).lean()

resolve(headerData)
} catch (error) {
reject(error)
if (templateData && templateData.emailFooter) {
const footer = await this.getEmailFooter(templateData.emailFooter)
if (footer && footer.body) {
templateData['body'] = templateData['body'] + footer.body
}
}
})
return templateData
} catch (error) {
return error
}
}
static getEmailFooter(footer) {
return new Promise(async (resolve, reject) => {
try {
const filterEmailFooter = {
code: footer,
type: 'emailFooter',
deleted: false,
status: 'active',
}
const footerData = await NotificationTemplate.findOne(filterEmailFooter).lean()
static async getEmailHeader(header) {
try {
const filterEmailHeader = {
code: header,
type: 'emailHeader',
deleted: false,
status: 'active',
}
const headerData = await NotificationTemplate.findOne(filterEmailHeader).lean()

resolve(footerData)
} catch (error) {
reject(error)
return headerData
} catch (error) {
return error
}
}
static async getEmailFooter(footer) {
try {
const filterEmailFooter = {
code: footer,
type: 'emailFooter',
deleted: false,
status: 'active',
}
})
const footerData = await NotificationTemplate.findOne(filterEmailFooter).lean()

return footerData
} catch (error) {
return error
}
}
}
Loading

0 comments on commit 2678651

Please sign in to comment.