Skip to content

Commit

Permalink
✨ Rate Limit
Browse files Browse the repository at this point in the history
  • Loading branch information
naelob committed Oct 1, 2024
1 parent ad5afc9 commit dd4d011
Show file tree
Hide file tree
Showing 187 changed files with 2,031 additions and 3,119 deletions.
8 changes: 8 additions & 0 deletions packages/api/scripts/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ CREATE TABLE webhooks_payloads
CONSTRAINT PK_webhooks_payload PRIMARY KEY ( id_webhooks_payload )
);

-- ************************************** rate_limit_state
CREATE TABLE rate_limit_state (
id_rate_limit_state uuid NOT NULL,
id_connection NOT NULL,
last_request_timestamp TIMESTAMP NOT NULL,
request_count INTEGER NOT NULL
CONSTRAINT PK_rate_limit_state PRIMARY KEY ( id_rate_limit_state )
);

-- ************************************** webhook_endpoints
CREATE TABLE webhook_endpoints
Expand Down
3 changes: 3 additions & 0 deletions packages/api/src/@core/@core-services/queues/queue.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import { Queues } from './types';
{
name: Queues.RAG_DOCUMENT_PROCESSING,
},
{
name: Queues.RATE_LIMIT_FAILED_JOBS,
},
),
],
providers: [BullQueueService],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export class BullQueueService {
public readonly failedPassthroughRequestsQueue: Queue,
@InjectQueue(Queues.RAG_DOCUMENT_PROCESSING)
private ragDocumentQueue: Queue,
@InjectQueue(Queues.RATE_LIMIT_FAILED_JOBS)
private rlFailedJobsQueue: Queue,
) {}

// getters
Expand All @@ -35,6 +37,9 @@ export class BullQueueService {
getRagDocumentQueue() {
return this.ragDocumentQueue;
}
getRlFailedJobsQueue() {
return this.rlFailedJobsQueue;
}

async removeRepeatableJob(jobName: string) {
const jobs = await this.syncJobsQueue.getRepeatableJobs();
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/@core/@core-services/queues/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export enum Queues {
SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties
FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff
RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING',
RATE_LIMIT_FAILED_JOBS = 'RATE_LIMIT_FAILED_JOBS',
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
import { Injectable } from '@nestjs/common';
import { CoreSyncRegistry } from '../registries/core-sync.registry';
import { CoreUnification } from './core-unification.service';
import { v4 as uuidv4 } from 'uuid';
import { PrismaService } from '../prisma/prisma.service';
import { ConnectionUtils } from '@@core/connections/@utils';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { RagService } from '@@core/rag/rag.service';
import { FileInfo } from '@@core/rag/types';
import { RateLimitError } from '@@core/rate-limit/error';
import {
ApiResponse,
getFileExtensionFromMimeType,
TargetObject,
} from '@@core/utils/types';
import { UnifySourceType } from '@@core/utils/types/unify.output';
import { WebhookService } from '../webhooks/panora-webhooks/webhook.service';
import { ConnectionUtils } from '@@core/connections/@utils';
import { IBaseObjectService, SyncParam } from '@@core/utils/types/interface';
import { FieldMappingService } from '@@core/field-mapping/field-mapping.service';
import { LoggerService } from '../logger/logger.service';
import { RagService } from '@@core/rag/rag.service';
import { FileInfo } from '@@core/rag/types';
import { UnifySourceType } from '@@core/utils/types/unify.output';
import { Injectable } from '@nestjs/common';
import { fs_files as FileStorageFile } from '@prisma/client';
import { v4 as uuidv4 } from 'uuid';
import { LoggerService } from '../logger/logger.service';
import { PrismaService } from '../prisma/prisma.service';
import { BullQueueService } from '../queues/shared.service';
import { CoreSyncRegistry } from '../registries/core-sync.registry';
import { WebhookService } from '../webhooks/panora-webhooks/webhook.service';
import { CoreUnification } from './core-unification.service';

@Injectable()
export class IngestDataService {
Expand All @@ -29,6 +31,7 @@ export class IngestDataService {
private logger: LoggerService,
private fieldMappingService: FieldMappingService,
private ragService: RagService,
private queues: BullQueueService,
) {}

async syncForLinkedUser<T, U, V extends IBaseObjectService>(
Expand Down Expand Up @@ -87,7 +90,7 @@ export class IngestDataService {

// Construct the syncParam object dynamically
const syncParam: SyncParam = {
linkedUserId,
connection,
custom_properties: remoteProperties,
};

Expand Down Expand Up @@ -144,6 +147,32 @@ export class IngestDataService {
`Error syncing ${integrationId} ${commonObject}: ${syncError.message}`,
syncError,
);
// handle the case where ratelimit is throwed
if (syncError instanceof RateLimitError) {
this.logger.warn(
`Rate limit exceeded for ${integrationId} ${commonObject}. Retry after ${syncError.retryAfter}ms.`,
);
// You might want to add some logic here to handle the rate limit,
// such as scheduling a retry or notifying the user.
const rlFailedJobsQueue = this.queues.getRlFailedJobsQueue();
await rlFailedJobsQueue.add(
'rate-limit-sync',
{
method: 'syncForLinkedUser',
args: [
integrationId,
linkedUserId,
vertical,
commonObject,
service,
params,
wh_real_time_trigger,
],
},
{ delay: syncError.retryAfter },
);
return;
}
// Optionally, you could create an event to log this error
/*await this.prisma.events.create({
data: {
Expand Down
5 changes: 4 additions & 1 deletion packages/api/src/@core/core.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import { OrganisationsModule } from './organisations/organisations.module';
import { PassthroughModule } from './passthrough/passthrough.module';
import { ProjectConnectorsModule } from './project-connectors/project-connectors.module';
import { ProjectsModule } from './projects/projects.module';
import { SyncModule } from './sync/sync.module';
import { RagModule } from './rag/rag.module';
import { RateLimitModule } from './rate-limit/rate-limit.module';
import { SyncModule } from './sync/sync.module';

@Module({
imports: [
Expand All @@ -36,6 +37,7 @@ import { RagModule } from './rag/rag.module';
SyncModule,
ProjectConnectorsModule,
BullQueueModule,
RateLimitModule,
RagModule,
],
exports: [
Expand All @@ -55,6 +57,7 @@ import { RagModule } from './rag/rag.module';
SyncModule,
ProjectConnectorsModule,
IngestDataService,
RateLimitModule,
BullQueueModule,
RagModule,
],
Expand Down
6 changes: 6 additions & 0 deletions packages/api/src/@core/rate-limit/error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export class RateLimitError extends Error {
constructor(message: string, public retryAfter: number) {
super(message);
this.name = 'RateLimitError';
}
}
33 changes: 33 additions & 0 deletions packages/api/src/@core/rate-limit/rate-limit.consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Queues } from '@@core/@core-services/queues/types';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';

@Processor(Queues.RATE_LIMIT_FAILED_JOBS)
export class RateLimitJobProcessor {
constructor(private ingestDataService: IngestDataService) {}

@Process('rate-limit-sync')
async processRateLimitedJob(job: Job<{ method: string; args: any[] }>) {
const { method, args } = job.data;
try {
if (method === 'syncForLinkedUser') {
await this.ingestDataService.syncForLinkedUser(
...(args as Parameters<
typeof this.ingestDataService.syncForLinkedUser
>),
);
}

// Fallback for other methods (if any)
/*const targetInstance = this.moduleRef.get(target, { strict: false });
if (targetInstance && typeof targetInstance[method] === 'function') {
await targetInstance[method](...args);
return;
}*/
} catch (error) {
console.error(`Error processing rate-limited job: ${error.message}`);
throw error;
}
}
}
33 changes: 33 additions & 0 deletions packages/api/src/@core/rate-limit/rate-limit.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { RateLimitService } from './rate-limit.service';

export function RateLimit() {
return function (
target: any,
propertyKey: string,
descriptor: PropertyDescriptor,
) {
const originalMethod = descriptor.value;

descriptor.value = async function (...args: any[]) {
const rateLimitService: RateLimitService = (this as any).rateLimitService;
const { connection } = args[0];

if (!rateLimitService) {
console.error('RateLimitService not found in the class instance');
return originalMethod.apply(this, args);
}

try {
await rateLimitService.checkRateLimit(
connection.id_connection,
connection.provider_slug,
);
return await originalMethod.apply(this, args);
} catch (error) {
throw error;
}
};

return descriptor;
};
}
9 changes: 9 additions & 0 deletions packages/api/src/@core/rate-limit/rate-limit.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { RateLimitJobProcessor } from './rate-limit.consumer';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { WebhookService } from '@@core/@core-services/webhooks/panora-webhooks/webhook.service';

@Module({
providers: [RateLimitJobProcessor, WebhookService, PrismaService],
})
export class RateLimitModule {}
137 changes: 137 additions & 0 deletions packages/api/src/@core/rate-limit/rate-limit.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { Injectable } from '@nestjs/common';
import { RateLimitError } from './error';

interface RateLimitPolicy {
timeWindow: number;
maxRequests: number;
}

@Injectable()
export class RateLimitService {
constructor(private prisma: PrismaService) {}

async checkRateLimit(
connectionId: string,
providerSlug: string,
): Promise<boolean> {
const policies = await this.getRateLimitPolicies(providerSlug);

for (const policy of policies) {
const { timeWindow, maxRequests } = policy;
const windowStart = new Date(Date.now() - timeWindow * 1000);

const requestCount = await this.prisma.events.count({
where: {
id_connection: connectionId,
timestamp: { gte: windowStart },
type: { endsWith: '.pulled' },
},
});

if (requestCount >= maxRequests) {
const retryAfter = await this.getRetryAfter(connectionId);
throw new RateLimitError('Rate limit exceeded', retryAfter);
}
}

return true; // All checks passed
}

private async getRateLimitPolicies(
providerSlug: string,
): Promise<RateLimitPolicy[]> {
const policies: Record<string, RateLimitPolicy[]> = {
hubspot: [
{ timeWindow: 10, maxRequests: 110 }, // 110 calls per 10 seconds
{ timeWindow: 86400, maxRequests: 250000 }, // 250k calls per day
],
};
return policies[providerSlug] || [];
}

async getRetryAfter(connectionId: string): Promise<number> {
const connection = await this.prisma.connections.findUnique({
where: { id_connection: connectionId },
});

if (!connection) {
throw new Error(`Connection not found for id: ${connectionId}`);
}

const policies = await this.getRateLimitPolicies(connection.provider_slug);

if (policies.length === 0) {
return 10000; // 10 seconds default delay if no policies
}

let maxTimeUntilReset = 0;

for (const policy of policies) {
const windowStart = new Date(Date.now() - policy.timeWindow * 1000);
const requestCount = await this.prisma.events.count({
where: {
id_connection: connectionId,
timestamp: { gte: windowStart },
type: { endsWith: '.pulled' },
},
});

if (requestCount >= policy.maxRequests) {
const latestEvent = await this.prisma.events.findFirst({
where: {
id_connection: connectionId,
type: { endsWith: '.pulled' },
},
orderBy: { timestamp: 'desc' },
});

if (latestEvent) {
const timeSinceLastRequest =
Date.now() - latestEvent.timestamp.getTime();
const timeUntilReset =
policy.timeWindow * 1000 - timeSinceLastRequest;
maxTimeUntilReset = Math.max(maxTimeUntilReset, timeUntilReset);
}
}
}

if (maxTimeUntilReset <= 0) {
return 0;
}

const buffer = 1000; // 1 second buffer
return maxTimeUntilReset + buffer;
}

/*async retryWithBackoff(config: any): Promise<AxiosResponse> {
return backOff(
async () => {
try {
const response = await axios(config);
return response;
} catch (error) {
if (error.response && error.response.status === 429) {
const retryAfter = await this.getRetryAfter(config.connectionId);
if (retryAfter) {
await new Promise((resolve) => setTimeout(resolve, retryAfter));
}
throw error; // Rethrow to trigger backoff
}
throw error; // Rethrow non-rate-limit errors
}
},
{
numOfAttempts: 10,
startingDelay: 1000,
timeMultiple: 2,
maxDelay: 60000,
jitter: 'full',
retry: (e: Error, attemptNumber: number) => {
console.log(`Retry attempt ${attemptNumber} due to: ${e.message}`);
return true;
},
},
);
}*/
}
Loading

0 comments on commit dd4d011

Please sign in to comment.