Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Add Rate limiter Architecture To Ocular #62

Merged
merged 3 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion packages/ocular/.gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
env.local
.env.dev
.env.local
.env
.env
*.DS_Store
2 changes: 2 additions & 0 deletions packages/ocular/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"express-session": "^1.18.0",
"fast-glob": "^3.3.2",
"fs-exists-cached": "^1.0.0",
"ioredis": "^5.4.1",
"ioredis-mock": "^8.9.0",
"kafka-js": "^0.0.0",
"kafkajs": "^2.2.4",
Expand All @@ -48,6 +49,7 @@
"passport-custom": "^1.1.1",
"passport-jwt": "^4.0.1",
"passport-local": "^1.0.0",
"rate-limiter-flexible": "^5.0.3",
"typeorm": "^0.3.20"
}
}
58 changes: 58 additions & 0 deletions packages/ocular/src/services/__tests__/rate-limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { after } from 'node:test';
import RateLimiterService from '../rate-limiter'
import Redis from 'ioredis';

describe('queueService', () => {
let rateLimiterService: RateLimiterService;
let redis: Redis;
beforeAll(async () => {
try{
redis = new Redis("redis://localhost:6379", {
// Lazy connect to properly handle connection errors
lazyConnect: true,
maxRetriesPerRequest: null, // Add this line
})

try {
await redis.connect()
} catch (err) {
console.log(`An error occurred while connecting to Redis:${err}`)
}


const moduleDeps = {
redisClient: redis,
}
rateLimiterService = new RateLimiterService(moduleDeps);
} catch (error) {
console.log('Error Instantiating The Rate Limiter Service', error)
}
});

afterAll(async () => {
await redis.disconnect();
})

it('it should rate limit an api', async () => {
// Register a rate limiter for an ocular api to allow 5 requests per second
await rateLimiterService.register("ocular", 5, 1);
const requestQueue = await rateLimiterService.getRequestQueue("ocular");
expect(requestQueue).toBeDefined();

// Consume 14 tokens from the rate limiter queue at a rate of 5 tokens per second
const promises: Promise<void>[] = [];
for (let i = 0; i < 14; i++) {
const promise = requestQueue.removeTokens(1,"ocular")
.then((rateLimiterRes) => {
console.log('Rate Limit Remaining', rateLimiterRes)
}).catch((error) => {
console.log('Error Consuming Rate Limit', error)
});
promises.push(promise);
}

// Wait for all promises to complete
await Promise.all(promises)
});
}
)
52 changes: 52 additions & 0 deletions packages/ocular/src/services/rate-limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import Redis from "ioredis"
import { RateLimiterRedis, RateLimiterQueue } from "rate-limiter-flexible"
import { RateLimiterOpts } from "../types"
import { TransactionBaseService } from "@ocular/types"
import { AutoflowAiError } from "@ocular/utils"

type RateLimiterServiceProps = {
redisClient: Redis
}

/**
* Stores Rate Limiters For Apps In Ocular
*/
class RateLimiterService extends TransactionBaseService {
protected apiToRateLimiterMap_: Map <string, RateLimiterQueue> = new Map()
protected redisClient_: Redis

constructor({ redisClient }: RateLimiterServiceProps) {
super(arguments[0])
this.redisClient_ = redisClient
}

protected storeRateLimiterQueues({
apiName,
limiterQueue,
}: {
apiName: string
limiterQueue: RateLimiterQueue
}) {
this.apiToRateLimiterMap_.set(apiName, limiterQueue)
}


async register(apiName:string , points: number, duration: number): Promise<void> {
try {
const rateLimiter = new RateLimiterRedis({storeClient: this.redisClient_, keyPrefix: apiName, points: points, duration: duration})
const limiterQueue = new RateLimiterQueue(rateLimiter);
this.storeRateLimiterQueues({apiName, limiterQueue})
} catch(e) {
throw new AutoflowAiError (
AutoflowAiError.Types.INVALID_DATA,
`Failed to rate limiter for ${apiName} with ${e}`
)
}
}

async getRequestQueue(apiName: string): Promise<RateLimiterQueue> {
return this.apiToRateLimiterMap_.get(apiName)
}
}

export default RateLimiterService
3 changes: 2 additions & 1 deletion packages/ocular/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export * from "./config-module"
export * from "./chat"
export * from "../../../types/src/logger/logger"
export * from "./message"
export * from "./search"
export * from "./search"
export * from "./rate-limiter"
4 changes: 4 additions & 0 deletions packages/ocular/src/types/rate-limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface RateLimiterOpts{
points: number // Number of points
duration: number, // Per second(s)
}