From 37249dcd4968640c27df7d578d618c655b118fa8 Mon Sep 17 00:00:00 2001 From: Louis Murerwa Date: Mon, 29 Apr 2024 11:12:45 -0700 Subject: [PATCH 1/3] Add Rate Limiting Architecture --- .../apps/google-drive/src/initialize/index | 3 + .../src/services/__tests__/rate-limiter | 66 +++++++++++++++++++ packages/ocular/src/services/rate-limiter.ts | 46 +++++++++++++ 3 files changed, 115 insertions(+) create mode 100644 packages/apps/google-drive/src/initialize/index create mode 100644 packages/ocular/src/services/__tests__/rate-limiter create mode 100644 packages/ocular/src/services/rate-limiter.ts diff --git a/packages/apps/google-drive/src/initialize/index b/packages/apps/google-drive/src/initialize/index new file mode 100644 index 00000000..23e890d9 --- /dev/null +++ b/packages/apps/google-drive/src/initialize/index @@ -0,0 +1,3 @@ +// Resolve Rate Limiter Service Here. +// Register Rate Limiter For This GoogleDrive Application +// In Service Get Rate Limiter And Use It To Call External Api's \ No newline at end of file diff --git a/packages/ocular/src/services/__tests__/rate-limiter b/packages/ocular/src/services/__tests__/rate-limiter new file mode 100644 index 00000000..fb4942f6 --- /dev/null +++ b/packages/ocular/src/services/__tests__/rate-limiter @@ -0,0 +1,66 @@ +import { IQueueService } from '@ocular/types' +import QueueService from '../queue' +const { RateLimiter } = require('kafkajs') + +jest.useFakeTimers() +jest.setTimeout(1000000); + +const loggerMock = { + info: jest.fn().mockReturnValue(console.log), + warn: jest.fn().mockReturnValue(console.log), + error: jest.fn().mockReturnValue(console.log), + panic: jest.fn().mockReturnValue(console.log), + shouldLog: jest.fn().mockReturnValue(console.log), + setLogLevel: jest.fn().mockReturnValue(console.log), + unsetLogLevel: jest.fn().mockReturnValue(console.log), + activity: jest.fn().mockReturnValue(console.log), + progress: jest.fn().mockReturnValue(console.log), + failure: jest.fn().mockReturnValue(console.log), + success: jest.fn().mockReturnValue(console.log), + debug: jest.fn().mockReturnValue(console.log), + log: jest.fn().mockReturnValue(console.log), +} +// This test assumes that a Kafka service is running in Docker localhost:9092. +// To run the service, check the instructions in the Developer.md file. +describe('queueService', () => { + let queueService: IQueueService; + beforeEach( async () => { + try{ + // Connect To Real Kafka in Docker Container + const kafka = await new Kafka({ + clientId: 'ocular', + brokers: ['localhost:9092'], + }) + + const moduleDeps = { + logger: loggerMock, + eventBusRedisConnection: {}, + kafkaClient: kafka, + } + queueService= new QueueService(moduleDeps); + } catch (error) { + console.log('Error connecting to Kafka in Docker', error) + } + + }); + + it('should send a message to a topic', (done) => { + queueService.send("ocular", {message: "Hello World"}); + queueService.subscribe("ocular", async (message, topic) => { + console.log("Message Received", message) + expect(message).toEqual("expectedMessage"); + done() + }, {groupId: "ocular-group"}); + }); + + + it('should send batch messages to a topic', (done) => { + queueService.sendBatch("ocular", [{message: "Hello World 1"}, {message: "Hello World 2"}, {message: "Hello World 3"}, {message: "Hello World 4"}]); + queueService.subscribe("ocular", async (message, topic) => { + console.log("Message Received", message) + expect(message).toEqual("expectedMessage"); + done() + }, {groupId: "ocular-group"}); + }); +} +) \ No newline at end of file diff --git a/packages/ocular/src/services/rate-limiter.ts b/packages/ocular/src/services/rate-limiter.ts new file mode 100644 index 00000000..675e2435 --- /dev/null +++ b/packages/ocular/src/services/rate-limiter.ts @@ -0,0 +1,46 @@ +import Redis from "ioredis" +import { TransactionBaseService } from "@ocular/types" + +type RateLimiterServiceProps = { + redisClient: Redis +} + +/** + * Stores Rate Limiters For Apps In Ocular + */ +class RateLimiterService extends TransactionBaseService { + protected apiToRateLimiterMap_: Map = new Map() + protected redisClient_: Redis + + constructor({ redisClient }: RateLimiterServiceProps) { + // eslint-disable-next-line prefer-rest-params + super(arguments[0]) + this.redisClient_ = redisClient + } + + protected storeRateLimiters({ + apiName, + limiter + }: { + apiName: string + limiter: RateLimiterRedis + }) { + const existingLimiters = this.apiToRateLimiterMap_.get(apiName) ?? [] + if (existingLimiters) { + throw Error(`Limiter with ${apiName} already exists`) + } + this.apiToRateLimiterMap_.set(apiName, limiter) + } + + async register(apiName:string, opts?:RateLimiterOpts): Promise{ + const rateLimiter= new RateLimiterRedis({keyPrefix: apiName, ...opts}) + // Check Connection + this.storeRateLimiters(apiName,rateLimiter) + } + + async retrieve(apiName: string): Promise { + return this.apiToRateLimiterMap_.get(apiName) + } +} + +export default RateLimiterService \ No newline at end of file From dd173f293447c813cbd14de5f55fda71d410d301 Mon Sep 17 00:00:00 2001 From: Louis Murerwa Date: Mon, 29 Apr 2024 16:49:54 -0700 Subject: [PATCH 2/3] Add Rate Limiter Architecture To Ocular --- package-lock.json | 13 +++- packages/ocular/.gitignore | 3 +- packages/ocular/package.json | 2 + .../services/__tests__/{queue.ts => queue} | 0 .../src/services/__tests__/rate-limiter | 66 ------------------- .../src/services/__tests__/rate-limiter.ts | 58 ++++++++++++++++ packages/ocular/src/services/rate-limiter.ts | 36 +++++----- packages/ocular/src/types/index.ts | 3 +- packages/ocular/src/types/rate-limiter.ts | 4 ++ 9 files changed, 99 insertions(+), 86 deletions(-) rename packages/ocular/src/services/__tests__/{queue.ts => queue} (100%) delete mode 100644 packages/ocular/src/services/__tests__/rate-limiter create mode 100644 packages/ocular/src/services/__tests__/rate-limiter.ts create mode 100644 packages/ocular/src/types/rate-limiter.ts diff --git a/package-lock.json b/package-lock.json index 2fc06ad3..3f0ac751 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8368,9 +8368,9 @@ } }, "node_modules/ioredis": { - "version": "5.3.2", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", - "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", + "version": "5.4.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.4.1.tgz", + "integrity": "sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==", "dependencies": { "@ioredis/commands": "^1.1.1", "cluster-key-slot": "^1.1.0", @@ -14274,6 +14274,11 @@ "node": ">= 0.6" } }, + "node_modules/rate-limiter-flexible": { + "version": "5.0.3", + "resolved": "https://registry.npmjs.org/rate-limiter-flexible/-/rate-limiter-flexible-5.0.3.tgz", + "integrity": "sha512-lWx2y8NBVlTOLPyqs+6y7dxfEpT6YFqKy3MzWbCy95sTTOhOuxufP2QvRyOHpfXpB9OUJPbVLybw3z3AVAS5fA==" + }, "node_modules/raw-body": { "version": "2.5.2", "resolved": "https://registry.npmjs.org/raw-body/-/raw-body-2.5.2.tgz", @@ -16689,6 +16694,7 @@ "express-session": "^1.18.0", "fast-glob": "^3.3.2", "fs-exists-cached": "^1.0.0", + "ioredis": "^5.4.1", "ioredis-mock": "^8.9.0", "kafka-js": "^0.0.0", "kafkajs": "^2.2.4", @@ -16698,6 +16704,7 @@ "passport-custom": "^1.1.1", "passport-jwt": "^4.0.1", "passport-local": "^1.0.0", + "rate-limiter-flexible": "^5.0.3", "typeorm": "^0.3.20" }, "devDependencies": { diff --git a/packages/ocular/.gitignore b/packages/ocular/.gitignore index b16ffd5a..971d3aed 100644 --- a/packages/ocular/.gitignore +++ b/packages/ocular/.gitignore @@ -1,4 +1,5 @@ env.local .env.dev .env.local -.env \ No newline at end of file +.env +*.DS_Store \ No newline at end of file diff --git a/packages/ocular/package.json b/packages/ocular/package.json index fad0f8ce..0e19f917 100644 --- a/packages/ocular/package.json +++ b/packages/ocular/package.json @@ -39,6 +39,7 @@ "express-session": "^1.18.0", "fast-glob": "^3.3.2", "fs-exists-cached": "^1.0.0", + "ioredis": "^5.4.1", "ioredis-mock": "^8.9.0", "kafka-js": "^0.0.0", "kafkajs": "^2.2.4", @@ -48,6 +49,7 @@ "passport-custom": "^1.1.1", "passport-jwt": "^4.0.1", "passport-local": "^1.0.0", + "rate-limiter-flexible": "^5.0.3", "typeorm": "^0.3.20" } } diff --git a/packages/ocular/src/services/__tests__/queue.ts b/packages/ocular/src/services/__tests__/queue similarity index 100% rename from packages/ocular/src/services/__tests__/queue.ts rename to packages/ocular/src/services/__tests__/queue diff --git a/packages/ocular/src/services/__tests__/rate-limiter b/packages/ocular/src/services/__tests__/rate-limiter deleted file mode 100644 index fb4942f6..00000000 --- a/packages/ocular/src/services/__tests__/rate-limiter +++ /dev/null @@ -1,66 +0,0 @@ -import { IQueueService } from '@ocular/types' -import QueueService from '../queue' -const { RateLimiter } = require('kafkajs') - -jest.useFakeTimers() -jest.setTimeout(1000000); - -const loggerMock = { - info: jest.fn().mockReturnValue(console.log), - warn: jest.fn().mockReturnValue(console.log), - error: jest.fn().mockReturnValue(console.log), - panic: jest.fn().mockReturnValue(console.log), - shouldLog: jest.fn().mockReturnValue(console.log), - setLogLevel: jest.fn().mockReturnValue(console.log), - unsetLogLevel: jest.fn().mockReturnValue(console.log), - activity: jest.fn().mockReturnValue(console.log), - progress: jest.fn().mockReturnValue(console.log), - failure: jest.fn().mockReturnValue(console.log), - success: jest.fn().mockReturnValue(console.log), - debug: jest.fn().mockReturnValue(console.log), - log: jest.fn().mockReturnValue(console.log), -} -// This test assumes that a Kafka service is running in Docker localhost:9092. -// To run the service, check the instructions in the Developer.md file. -describe('queueService', () => { - let queueService: IQueueService; - beforeEach( async () => { - try{ - // Connect To Real Kafka in Docker Container - const kafka = await new Kafka({ - clientId: 'ocular', - brokers: ['localhost:9092'], - }) - - const moduleDeps = { - logger: loggerMock, - eventBusRedisConnection: {}, - kafkaClient: kafka, - } - queueService= new QueueService(moduleDeps); - } catch (error) { - console.log('Error connecting to Kafka in Docker', error) - } - - }); - - it('should send a message to a topic', (done) => { - queueService.send("ocular", {message: "Hello World"}); - queueService.subscribe("ocular", async (message, topic) => { - console.log("Message Received", message) - expect(message).toEqual("expectedMessage"); - done() - }, {groupId: "ocular-group"}); - }); - - - it('should send batch messages to a topic', (done) => { - queueService.sendBatch("ocular", [{message: "Hello World 1"}, {message: "Hello World 2"}, {message: "Hello World 3"}, {message: "Hello World 4"}]); - queueService.subscribe("ocular", async (message, topic) => { - console.log("Message Received", message) - expect(message).toEqual("expectedMessage"); - done() - }, {groupId: "ocular-group"}); - }); -} -) \ No newline at end of file diff --git a/packages/ocular/src/services/__tests__/rate-limiter.ts b/packages/ocular/src/services/__tests__/rate-limiter.ts new file mode 100644 index 00000000..662109a1 --- /dev/null +++ b/packages/ocular/src/services/__tests__/rate-limiter.ts @@ -0,0 +1,58 @@ +import { after } from 'node:test'; +import RateLimiterService from '../rate-limiter' +import Redis from 'ioredis'; + +describe('queueService', () => { + let rateLimiterService: RateLimiterService; + let redis: Redis; + beforeAll(async () => { + try{ + redis = new Redis("redis://localhost:6379", { + // Lazy connect to properly handle connection errors + lazyConnect: true, + maxRetriesPerRequest: null, // Add this line + }) + + try { + await redis.connect() + } catch (err) { + console.log(`An error occurred while connecting to Redis:${err}`) + } + + + const moduleDeps = { + redisClient: redis, + } + rateLimiterService = new RateLimiterService(moduleDeps); + } catch (error) { + console.log('Error Instantiating The Rate Limiter Service', error) + } + }); + + afterAll(async () => { + await redis.disconnect(); + }) + + it('it should rate limit an api', async () => { + // Register a rate limiter for an ocular api to allow 5 requests per second + await rateLimiterService.register("ocular", 5, 1); + const requestQueue = await rateLimiterService.getRequestQueue("ocular"); + expect(requestQueue).toBeDefined(); + + // Consume 14 tokens from the rate limiter queue at a rate of 5 tokens per second + const promises: Promise[] = []; + for (let i = 0; i < 14; i++) { + const promise = requestQueue.removeTokens(1,"ocular") + .then((rateLimiterRes) => { + console.log('Rate Limit Remaining', rateLimiterRes) + }).catch((error) => { + console.log('Error Consuming Rate Limit', error) + }); + promises.push(promise); + } + + // Wait for all promises to complete + await Promise.all(promises) + }); + } +) \ No newline at end of file diff --git a/packages/ocular/src/services/rate-limiter.ts b/packages/ocular/src/services/rate-limiter.ts index 675e2435..802b7254 100644 --- a/packages/ocular/src/services/rate-limiter.ts +++ b/packages/ocular/src/services/rate-limiter.ts @@ -1,5 +1,8 @@ import Redis from "ioredis" +import { RateLimiterRedis, RateLimiterQueue } from "rate-limiter-flexible" +import { RateLimiterOpts } from "../types" import { TransactionBaseService } from "@ocular/types" +import { AutoflowAiError } from "@ocular/utils" type RateLimiterServiceProps = { redisClient: Redis @@ -9,36 +12,39 @@ type RateLimiterServiceProps = { * Stores Rate Limiters For Apps In Ocular */ class RateLimiterService extends TransactionBaseService { - protected apiToRateLimiterMap_: Map = new Map() + protected apiToRateLimiterMap_: Map = new Map() protected redisClient_: Redis constructor({ redisClient }: RateLimiterServiceProps) { - // eslint-disable-next-line prefer-rest-params super(arguments[0]) this.redisClient_ = redisClient } - protected storeRateLimiters({ + protected storeRateLimiterQueues({ apiName, - limiter + limiterQueue, }: { apiName: string - limiter: RateLimiterRedis + limiterQueue: RateLimiterQueue }) { - const existingLimiters = this.apiToRateLimiterMap_.get(apiName) ?? [] - if (existingLimiters) { - throw Error(`Limiter with ${apiName} already exists`) - } - this.apiToRateLimiterMap_.set(apiName, limiter) + this.apiToRateLimiterMap_.set(apiName, limiterQueue) } + - async register(apiName:string, opts?:RateLimiterOpts): Promise{ - const rateLimiter= new RateLimiterRedis({keyPrefix: apiName, ...opts}) - // Check Connection - this.storeRateLimiters(apiName,rateLimiter) + async register(apiName:string , points: number, duration: number): Promise { + try { + const rateLimiter = new RateLimiterRedis({storeClient: this.redisClient_, keyPrefix: apiName, points: points, duration: duration}) + const limiterQueue = new RateLimiterQueue(rateLimiter); + this.storeRateLimiterQueues({apiName, limiterQueue}) + } catch(e) { + throw new AutoflowAiError ( + AutoflowAiError.Types.INVALID_DATA, + `Failed to rate limiter for ${apiName} with ${e}` + ) + } } - async retrieve(apiName: string): Promise { + async getRequestQueue(apiName: string): Promise { return this.apiToRateLimiterMap_.get(apiName) } } diff --git a/packages/ocular/src/types/index.ts b/packages/ocular/src/types/index.ts index b00a1261..75a97b6c 100644 --- a/packages/ocular/src/types/index.ts +++ b/packages/ocular/src/types/index.ts @@ -3,4 +3,5 @@ export * from "./config-module" export * from "./chat" export * from "../../../types/src/logger/logger" export * from "./message" -export * from "./search" \ No newline at end of file +export * from "./search" +export * from "./rate-limiter" \ No newline at end of file diff --git a/packages/ocular/src/types/rate-limiter.ts b/packages/ocular/src/types/rate-limiter.ts new file mode 100644 index 00000000..3bd3a8b8 --- /dev/null +++ b/packages/ocular/src/types/rate-limiter.ts @@ -0,0 +1,4 @@ +export interface RateLimiterOpts{ + points: number // Number of points + duration: number, // Per second(s) +} \ No newline at end of file From 3a0c5c7b6f95400ff6c0ed774cbcca0f12fa9e8f Mon Sep 17 00:00:00 2001 From: Louis Murerwa Date: Mon, 29 Apr 2024 16:53:00 -0700 Subject: [PATCH 3/3] Remove Extra File --- packages/apps/google-drive/src/initialize/index | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 packages/apps/google-drive/src/initialize/index diff --git a/packages/apps/google-drive/src/initialize/index b/packages/apps/google-drive/src/initialize/index deleted file mode 100644 index 23e890d9..00000000 --- a/packages/apps/google-drive/src/initialize/index +++ /dev/null @@ -1,3 +0,0 @@ -// Resolve Rate Limiter Service Here. -// Register Rate Limiter For This GoogleDrive Application -// In Service Get Rate Limiter And Use It To Call External Api's \ No newline at end of file