Skip to content

Commit

Permalink
chore: modifying the consumer side logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shawakash committed Apr 7, 2024
1 parent e552c5a commit 4d0a55c
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 43 deletions.
3 changes: 3 additions & 0 deletions backend/notif-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
"esbuild": "^0.20.2",
"express": "^4.19.1",
"kafkajs": "^2.2.4",
"nodemailer": "^6.9.13",
"prom-client": "^15.1.0",
"speakeasy": "^2.0.0",
"twilio": "^5.0.3",
"web-push": "^3.6.7"
},
"devDependencies": {
Expand Down
37 changes: 32 additions & 5 deletions backend/notif-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,30 @@ import { WorkerAdmin } from "./kafka/admin";
import { ConsumerWorker } from "./kafka/consumer";
import { ProducerWorker } from "./kafka/producer";
import Prometheus from "prom-client";
import { TopicTypes } from "@paybox/common";
import twilio from 'twilio';
import nodemailer from 'nodemailer';
import { GMAIL, GMAIL_APP_PASS, MAIL_SERVICE, TWILLO_ACCOUNT_SID, TWILLO_TOKEN } from "./config"
import { RedisBase } from "@paybox/backend-common";


export const kafka = new Kafka({
clientId: KAFKA_ID,
brokers: [KAFKA_URL],
});

console.log(TWILLO_TOKEN, TWILLO_ACCOUNT_SID);
export const twillo = twilio(TWILLO_ACCOUNT_SID, TWILLO_TOKEN);
export const transporter = nodemailer.createTransport({
service: MAIL_SERVICE,
port: 465,
secure: true,
auth: {
user: GMAIL,
pass: GMAIL_APP_PASS
}
});

const defaultMetrics = Prometheus.collectDefaultMetrics;
defaultMetrics({ register: Prometheus.register, });

Expand Down Expand Up @@ -49,27 +66,37 @@ if (cluster.isPrimary) {
console.error('Error while fetching metrics:', error);
return res.status(500).end('Error while fetching metrics');
}
});
});

(async () => {
await WorkerAdmin.getInstance().init([
{ topicName: "notif", partitions: 1 },
{ topicName: TopicTypes.Notif, partitions: 1 },
]);

// This can be connected in any service/s
await ProducerWorker.getInstance().connectProducer();

})();

app.listen(PORT, async () => {
console.log(`Server listening on port: ${PORT}\n`);
Promise.all([
new Promise((resolve) => {
RedisBase.getInstance().getclient.on('ready', resolve);
}),
]).then(() => {
app.listen(PORT, async () => {
console.log(`Server listening on port: ${PORT}\n`);
});
}).catch((error) => {
console.error('Error while connecting producers:', error);
});


} else {
(async () => {
try {
await ConsumerWorker.getInstance().connectCounsumer(
"notif-group",
["notif"],
[TopicTypes.Notif, TopicTypes.Msg],
true
);
await ConsumerWorker.getInstance().runConsumer();
Expand Down
112 changes: 74 additions & 38 deletions backend/notif-worker/src/kafka/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Consumer } from "kafkajs";
import { kafka } from "..";
import { NotifTopics } from "@paybox/common";
import {notifyFriendRequest, notifyFriendRequestAccepted, notifyFriendRequestRejected, notifyPaid, notifyReceiveTxn} from "../processes";
import { MsgTopics, NotifTopics, TopicTypes } from "@paybox/common";
import {notifyFriendRequest, notifyFriendRequestAccepted, notifyFriendRequestRejected, notifyPaid, notifyReceiveTxn, otpSendProcess, resendOtpProcess} from "../processes";

export class ConsumerWorker {
private consumer!: Consumer;
Expand Down Expand Up @@ -38,45 +38,81 @@ export class ConsumerWorker {
async runConsumer() {
await this.consumer.run({
eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
const payload = JSON.parse(message.value?.toString() || "");

switch (payload.type) {
case NotifTopics.FriendRequest:
await notifyFriendRequest(payload.to, payload.from)
console.log("Friend Request Notification");
break;

case NotifTopics.FriendRequestAccepted:
await notifyFriendRequestAccepted(payload.to, payload.from, payload.friendshipId)
console.log("Friend Request Accepted");
break;

case NotifTopics.FriendRequestRejected:
await notifyFriendRequestRejected(payload.to, payload.from, payload.friendshipId)
console.log("Friend Request Rejected");
break;

case NotifTopics.TxnAccept:
await notifyReceiveTxn(payload.to, payload.from, payload.txnId)
console.log("Transaction Accepted");
break;

// case NotifTopics.TxnReject:
// //Todo: NOTIFY THE TRANSACTION REJECTED
// console.log("Transaction Rejected");
// break;
try {

const payload = JSON.parse(message.value?.toString() || "");

case NotifTopics.Paid:
await notifyPaid(payload.to, payload.from, payload.txnId)
console.log("Transaction Paid");
break;
switch(topic) {
case TopicTypes.Notif:
switch (payload.type) {
case NotifTopics.FriendRequest:
await notifyFriendRequest(payload.to, payload.from)
console.log("Friend Request Notification");
break;

case NotifTopics.FriendRequestAccepted:
await notifyFriendRequestAccepted(payload.to, payload.from, payload.friendshipId)
console.log("Friend Request Accepted");
break;

case NotifTopics.FriendRequestRejected:
await notifyFriendRequestRejected(payload.to, payload.from, payload.friendshipId)
console.log("Friend Request Rejected");
break;

case NotifTopics.TxnAccept:
await notifyReceiveTxn(payload.to, payload.from, payload.txnId)
console.log("Transaction Accepted");
break;

// case NotifTopics.TxnReject:
// //Todo: NOTIFY THE TRANSACTION REJECTED
// console.log("Transaction Rejected");
// break;

case NotifTopics.Paid:
await notifyPaid(payload.to, payload.from, payload.txnId)
console.log("Transaction Paid");
break;

default:
console.log(`No handler in topic: ${topic} for type: ${payload.type}`)
break;
}
break;

case TopicTypes.Msg:
switch (payload.type) {
case MsgTopics.SendOtp:
await otpSendProcess(payload.name, payload.mobile, payload.email, payload.clientId)
console.log("OTP Sent");
break;

default:
console.log(`No handler for topic: ${topic}`)
break;
case MsgTopics.ResendOtp:
await resendOtpProcess(payload.name, payload.mobile, payload.email, payload.clientId)
console.log("OTP Sent");
break;

default:
console.log(`No handler in topic: ${topic} for type: ${payload.type}`)
break;
}
break;

default:
console.log(`No handler for topic: ${topic}`)
break;
}

await heartbeat();
} catch (error) {
if(error) {
console.log(error);
const resume = pause();
setTimeout(resume, 1000)
}
throw error;
}

await heartbeat();
},
});
}
Expand Down

0 comments on commit 4d0a55c

Please sign in to comment.