From 4d0a55ce8aac856cbf8e8c97db08c3b06f1ab289 Mon Sep 17 00:00:00 2001 From: Akash Shaw Date: Sun, 7 Apr 2024 19:09:33 +0530 Subject: [PATCH] chore: modifying the consumer side logic --- backend/notif-worker/package.json | 3 + backend/notif-worker/src/index.ts | 37 ++++++- backend/notif-worker/src/kafka/consumer.ts | 112 ++++++++++++++------- 3 files changed, 109 insertions(+), 43 deletions(-) diff --git a/backend/notif-worker/package.json b/backend/notif-worker/package.json index f8606c01..3b706514 100644 --- a/backend/notif-worker/package.json +++ b/backend/notif-worker/package.json @@ -21,7 +21,10 @@ "esbuild": "^0.20.2", "express": "^4.19.1", "kafkajs": "^2.2.4", + "nodemailer": "^6.9.13", "prom-client": "^15.1.0", + "speakeasy": "^2.0.0", + "twilio": "^5.0.3", "web-push": "^3.6.7" }, "devDependencies": { diff --git a/backend/notif-worker/src/index.ts b/backend/notif-worker/src/index.ts index 99d1c66a..0d6699e9 100644 --- a/backend/notif-worker/src/index.ts +++ b/backend/notif-worker/src/index.ts @@ -8,6 +8,11 @@ import { WorkerAdmin } from "./kafka/admin"; import { ConsumerWorker } from "./kafka/consumer"; import { ProducerWorker } from "./kafka/producer"; import Prometheus from "prom-client"; +import { TopicTypes } from "@paybox/common"; +import twilio from 'twilio'; +import nodemailer from 'nodemailer'; +import { GMAIL, GMAIL_APP_PASS, MAIL_SERVICE, TWILLO_ACCOUNT_SID, TWILLO_TOKEN } from "./config" +import { RedisBase } from "@paybox/backend-common"; export const kafka = new Kafka({ @@ -15,6 +20,18 @@ export const kafka = new Kafka({ brokers: [KAFKA_URL], }); +console.log(TWILLO_TOKEN, TWILLO_ACCOUNT_SID); +export const twillo = twilio(TWILLO_ACCOUNT_SID, TWILLO_TOKEN); +export const transporter = nodemailer.createTransport({ + service: MAIL_SERVICE, + port: 465, + secure: true, + auth: { + user: GMAIL, + pass: GMAIL_APP_PASS + } +}); + const defaultMetrics = Prometheus.collectDefaultMetrics; defaultMetrics({ register: Prometheus.register, }); @@ -49,11 +66,11 @@ if (cluster.isPrimary) { console.error('Error while fetching metrics:', error); return res.status(500).end('Error while fetching metrics'); } - }); + }); (async () => { await WorkerAdmin.getInstance().init([ - { topicName: "notif", partitions: 1 }, + { topicName: TopicTypes.Notif, partitions: 1 }, ]); // This can be connected in any service/s @@ -61,15 +78,25 @@ if (cluster.isPrimary) { })(); - app.listen(PORT, async () => { - console.log(`Server listening on port: ${PORT}\n`); + Promise.all([ + new Promise((resolve) => { + RedisBase.getInstance().getclient.on('ready', resolve); + }), + ]).then(() => { + app.listen(PORT, async () => { + console.log(`Server listening on port: ${PORT}\n`); + }); + }).catch((error) => { + console.error('Error while connecting producers:', error); }); + + } else { (async () => { try { await ConsumerWorker.getInstance().connectCounsumer( "notif-group", - ["notif"], + [TopicTypes.Notif, TopicTypes.Msg], true ); await ConsumerWorker.getInstance().runConsumer(); diff --git a/backend/notif-worker/src/kafka/consumer.ts b/backend/notif-worker/src/kafka/consumer.ts index 2025df3a..d3c2f77a 100644 --- a/backend/notif-worker/src/kafka/consumer.ts +++ b/backend/notif-worker/src/kafka/consumer.ts @@ -1,7 +1,7 @@ import { Consumer } from "kafkajs"; import { kafka } from ".."; -import { NotifTopics } from "@paybox/common"; -import {notifyFriendRequest, notifyFriendRequestAccepted, notifyFriendRequestRejected, notifyPaid, notifyReceiveTxn} from "../processes"; +import { MsgTopics, NotifTopics, TopicTypes } from "@paybox/common"; +import {notifyFriendRequest, notifyFriendRequestAccepted, notifyFriendRequestRejected, notifyPaid, notifyReceiveTxn, otpSendProcess, resendOtpProcess} from "../processes"; export class ConsumerWorker { private consumer!: Consumer; @@ -38,45 +38,81 @@ export class ConsumerWorker { async runConsumer() { await this.consumer.run({ eachMessage: async ({ topic, partition, message, heartbeat, pause }) => { - const payload = JSON.parse(message.value?.toString() || ""); - - switch (payload.type) { - case NotifTopics.FriendRequest: - await notifyFriendRequest(payload.to, payload.from) - console.log("Friend Request Notification"); - break; - - case NotifTopics.FriendRequestAccepted: - await notifyFriendRequestAccepted(payload.to, payload.from, payload.friendshipId) - console.log("Friend Request Accepted"); - break; - - case NotifTopics.FriendRequestRejected: - await notifyFriendRequestRejected(payload.to, payload.from, payload.friendshipId) - console.log("Friend Request Rejected"); - break; - - case NotifTopics.TxnAccept: - await notifyReceiveTxn(payload.to, payload.from, payload.txnId) - console.log("Transaction Accepted"); - break; - - // case NotifTopics.TxnReject: - // //Todo: NOTIFY THE TRANSACTION REJECTED - // console.log("Transaction Rejected"); - // break; + try { + + const payload = JSON.parse(message.value?.toString() || ""); - case NotifTopics.Paid: - await notifyPaid(payload.to, payload.from, payload.txnId) - console.log("Transaction Paid"); - break; + switch(topic) { + case TopicTypes.Notif: + switch (payload.type) { + case NotifTopics.FriendRequest: + await notifyFriendRequest(payload.to, payload.from) + console.log("Friend Request Notification"); + break; + + case NotifTopics.FriendRequestAccepted: + await notifyFriendRequestAccepted(payload.to, payload.from, payload.friendshipId) + console.log("Friend Request Accepted"); + break; + + case NotifTopics.FriendRequestRejected: + await notifyFriendRequestRejected(payload.to, payload.from, payload.friendshipId) + console.log("Friend Request Rejected"); + break; + + case NotifTopics.TxnAccept: + await notifyReceiveTxn(payload.to, payload.from, payload.txnId) + console.log("Transaction Accepted"); + break; + + // case NotifTopics.TxnReject: + // //Todo: NOTIFY THE TRANSACTION REJECTED + // console.log("Transaction Rejected"); + // break; + + case NotifTopics.Paid: + await notifyPaid(payload.to, payload.from, payload.txnId) + console.log("Transaction Paid"); + break; + + default: + console.log(`No handler in topic: ${topic} for type: ${payload.type}`) + break; + } + break; + + case TopicTypes.Msg: + switch (payload.type) { + case MsgTopics.SendOtp: + await otpSendProcess(payload.name, payload.mobile, payload.email, payload.clientId) + console.log("OTP Sent"); + break; - default: - console.log(`No handler for topic: ${topic}`) - break; + case MsgTopics.ResendOtp: + await resendOtpProcess(payload.name, payload.mobile, payload.email, payload.clientId) + console.log("OTP Sent"); + break; + + default: + console.log(`No handler in topic: ${topic} for type: ${payload.type}`) + break; + } + break; + + default: + console.log(`No handler for topic: ${topic}`) + break; + } + + await heartbeat(); + } catch (error) { + if(error) { + console.log(error); + const resume = pause(); + setTimeout(resume, 1000) + } + throw error; } - - await heartbeat(); }, }); }