Skip to content

Commit

Permalink
Merge pull request #53 from Praashh/feat-event
Browse files Browse the repository at this point in the history
[feat]: pub sub integration using websocket and redis, refactored queue and worker using redis package, removed bullmq
  • Loading branch information
Praashh authored Oct 20, 2024
2 parents dd4179e + dddb50f commit 4ae55b1
Show file tree
Hide file tree
Showing 22 changed files with 1,056 additions and 579 deletions.
5 changes: 4 additions & 1 deletion apps/server/src/controllers/order/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import { addToOrderQueue } from "@repo/order-queue";
import { AsyncWrapper } from "../../utils/asynCatch";
import { generateOrderId } from "../../utils/utils";
import { SuccessResponse } from "../../utils/wrappers/success.res";

import { EOrderType } from "@opinix/types";
import { Request } from "express";
import { RedisManager } from "@repo/order-queue";

let redisClient = RedisManager.getInstance();
type TPlaceOrder = {
event_id: number;
l1_expected_price: number;
Expand All @@ -26,6 +28,7 @@ export const placeHandler = AsyncWrapper(
},
};
await addToOrderQueue(order);
redisClient.publishMessage("123", order);
let response = new SuccessResponse("Order placed successfully", 201);
return res.status(201).json(response);
}
Expand Down
1 change: 1 addition & 0 deletions apps/server/src/utils/asynCatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type AsyncHandler<T = unknown> = (
export const AsyncWrapper = <T>(handler: AsyncHandler<T>) => {
return (req: Request, res: Response, next: NextFunction) => {
handler(req, res, next).catch((error: unknown) => {
console.log(error);
const standardizedError = standardizeApiError(error);
res.status(standardizedError.code).json(standardizedError);
});
Expand Down
3 changes: 3 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions packages/order-queue/logs/server.log
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,42 @@
[ 2024-10-19 20:13:01 ] - info - WORKER | Starting order worker
[ 2024-10-19 20:13:01 ] - info - SERVER | REDIS: Connected to Redis
[ 2024-10-19 20:13:01 ] - info - SERVER | REDIS: Redis connection is ready to start execution
[ 2024-10-20 19:19:20 ] - info - WORKER | Starting order worker
[ 2024-10-20 19:19:20 ] - info - SERVER | REDIS: Connected to Redis
[ 2024-10-20 19:19:20 ] - info - SERVER | REDIS: Redis connection is ready to start execution
[ 2024-10-20 19:58:11 ] - info - WORKER | Starting order worker
[ 2024-10-20 19:58:11 ] - info - SERVER | REDIS: Connected to Redis
[ 2024-10-20 19:58:11 ] - info - SERVER | REDIS: Redis connection is ready to start execution
[ 2024-10-20 19:59:33 ] - info - WORKER | Starting order worker
[ 2024-10-20 20:05:59 ] - info - WORKER | Starting order worker
[ 2024-10-20 20:06:39 ] - info - WORKER | Starting order worker
[ 2024-10-20 20:10:15 ] - info - WORKER | Starting order worker
[ 2024-10-20 20:10:35 ] - info - WORKER | Starting order worker
[ 2024-10-20 20:14:47 ] - info - WORKER | Starting order worker
[ 2024-10-20 20:18:12 ] - info - WORKER | Starting order worker
[ 2024-10-20 20:21:18 ] - info - WORKER | Starting order worker
[ 2024-10-20 20:28:39 ] - info - WORKER | Starting order worker
[ 2024-10-20 20:29:26 ] - info - WORKER | Starting order worker
[ 2024-10-20 20:40:55 ] - info - WORKER | Starting order worker
[ 2024-10-20 21:37:27 ] - info - WORKER | Starting order worker
[ 2024-10-20 21:40:21 ] - info - WORKER | Starting order worker
[ 2024-10-20 21:42:45 ] - info - WORKER | Starting order worker
[ 2024-10-20 21:43:22 ] - info - WORKER | Starting order worker
[ 2024-10-20 21:44:32 ] - info - WORKER | Starting order worker
[ 2024-10-20 21:46:58 ] - info - WORKER | Starting order worker
[ 2024-10-20 21:52:11 ] - info - WORKER | Starting order worker
[ 2024-10-20 21:54:32 ] - info - WORKER | Starting order worker
[ 2024-10-20 22:06:49 ] - info - WORKER | Starting order worker
[ 2024-10-20 22:14:03 ] - info - WORKER | Starting order worker
[ 2024-10-20 22:41:24 ] - info - WORKER | Starting order worker
[ 2024-10-20 22:49:07 ] - info - WORKER | Starting order worker
[ 2024-10-20 22:50:32 ] - info - WORKER | Starting order worker
[ 2024-10-20 22:53:18 ] - info - WORKER | Starting order worker
[ 2024-10-20 22:59:19 ] - info - WORKER | Starting order worker
[ 2024-10-20 23:00:43 ] - info - WORKER | Starting order worker
[ 2024-10-20 23:17:30 ] - info - WORKER | Starting order worker
[ 2024-10-20 23:25:15 ] - info - Processing order: {"7462230":{"event_id":9282,"l1_expected_price":0.5,"l1_order_quantity":1,"offer_type":"buy"}}
[ 2024-10-20 23:47:30 ] - info - WORKER | Starting order worker
[ 2024-10-20 23:48:10 ] - info - WORKER | Starting order worker
[ 2024-10-20 23:52:39 ] - info - WORKER | Starting order worker
[ 2024-10-20 23:55:48 ] - info - WORKER | Starting order worker
48 changes: 26 additions & 22 deletions packages/order-queue/src/classes/RedisManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,34 @@ import { RedisClientType, createClient } from "redis";
import { DbMessage, MessageToApi, WsMessage } from "@opinix/types";

export class RedisManager {
private client: RedisClientType;
private static instance: RedisManager;
private client: RedisClientType;
private static instance: RedisManager;

constructor() {
this.client = createClient();
this.client.connect();
}
constructor() {
this.client = createClient();
this.client.connect();
}

public static getInstance() {
if (!this.instance) {
this.instance = new RedisManager();
}
return this.instance;
}

public pushMessage(message: DbMessage) {
this.client.lPush("db_processor", JSON.stringify(message));
}
public getClient(): RedisClientType {
return this.client;
}

public publishMessage(channel: string, message: WsMessage) {
this.client.publish(channel, JSON.stringify(message));
public static getInstance() {
if (!this.instance) {
this.instance = new RedisManager();
}
return this.instance;
}

public sendToApi(clientId: string, message: MessageToApi) {
this.client.publish(clientId, JSON.stringify(message));
}
}
public pushMessage(message: DbMessage) {
this.client.lPush("db_processor", JSON.stringify(message));
}

public publishMessage(channel: string, message: WsMessage) {
this.client.publish(channel, JSON.stringify(message));
}

public sendToApi(clientId: string, message: MessageToApi) {
this.client.publish(clientId, JSON.stringify(message));
}
}
32 changes: 32 additions & 0 deletions packages/order-queue/src/classes/SubscriberManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { RedisClientType, createClient } from "redis";
import { DbMessage, MessageToApi, WsMessage } from "@opinix/types";

export class SubscriberManager {
private client: RedisClientType;
private static instance: SubscriberManager;

constructor() {
this.client = createClient();
this.client.connect();
}

public static getInstance() {
if (!this.instance) {
this.instance = new SubscriberManager();
}
return this.instance;
}

public subscribeToChannel(
channel: string,
callback: (event: string, message: string) => void
) {
this.client.subscribe(channel, (message: string, channel: string) => {
callback(channel, message);
});
}

public unsubscribeFromChannel(channel: string) {
this.client.unsubscribe(channel);
}
}
38 changes: 0 additions & 38 deletions packages/order-queue/src/config/redisClient.ts

This file was deleted.

7 changes: 4 additions & 3 deletions packages/order-queue/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { addToOrderQueue } from "./queues/orderQueue";
import { createClient } from "redis";
import orderWorker from "./queues/orderProcessor";
import { processOrderQueue } from "./queues/orderQueue";
import { logger } from "@opinix/logger";
import { RedisManager } from "./classes/RedisManager";
import { SubscriberManager } from "./classes/SubscriberManager";
const startWorker = async () => {
logger.info("WORKER | Starting order worker");
orderWorker;
processOrderQueue;
};

startWorker();
export { addToOrderQueue, RedisManager, createClient };
export { addToOrderQueue, RedisManager, createClient, SubscriberManager };
21 changes: 0 additions & 21 deletions packages/order-queue/src/queues/orderProcessor.ts

This file was deleted.

42 changes: 29 additions & 13 deletions packages/order-queue/src/queues/orderQueue.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
import { Queue } from "bullmq";
import getRedisClient from "../config/redisClient";
let redisClient = getRedisClient();
export const orderQueue = new Queue("orderQueue", {
connection: redisClient,
});
import { logger } from "@opinix/logger";
import { RedisClientType, createClient } from "redis";
import { RedisManager } from "../classes/RedisManager";

let redisClient = RedisManager.getInstance().getClient();

const QUEUE_NAME = "ORDER_QUEUE";

export const addToOrderQueue = async (order: object) => {
await orderQueue.add("order", order, {
attempts: 3,
backoff: {
type: "exponential",
delay: 5000,
},
});
try {
await redisClient.lPush(QUEUE_NAME, JSON.stringify(order));
logger.info(`Order added to queue: ${JSON.stringify(order)}`);
} catch (err) {
if (err instanceof Error)
logger.error(`Error adding order to queue: ${err.message}`);
}
};

export const processOrderQueue = async () => {
while (true) {
try {
const order = await redisClient.lPop(QUEUE_NAME);
if (order) {
logger.info(`Processing order: ${JSON.stringify(order)}`);
}
} catch (err) {
if (err instanceof Error) {
logger.error(`Error processing order: ${err.message}`);
}
}
}
};
Loading

0 comments on commit 4ae55b1

Please sign in to comment.