diff --git a/packages/apps/asana/src/loaders/index.ts b/packages/apps/asana/src/loaders/index.ts new file mode 100644 index 00000000..bee33f7b --- /dev/null +++ b/packages/apps/asana/src/loaders/index.ts @@ -0,0 +1,16 @@ +import { AppNameDefinitions, RateLimiterOpts} from "@ocular/types"; +import { RateLimiterService} from "@ocular/ocular"; + +export default async (container, options) => { + try { + // Register Rate Limiter For Google Drive + if (!options.rate_limiter_opts) { + throw new Error("No options provided for rate limiter") + } + const rateLimiterOpts: RateLimiterOpts = options.rate_limiter_opts + const rateLimiterService: RateLimiterService = container.resolve("rateLimiterService") + await rateLimiterService.register(AppNameDefinitions.ASANA,rateLimiterOpts.requests, rateLimiterOpts.interval); + } catch (err) { + throw(err) + } +} \ No newline at end of file diff --git a/packages/apps/asana/src/services/asana.ts b/packages/apps/asana/src/services/asana.ts index 8d5a28fa..322843b5 100644 --- a/packages/apps/asana/src/services/asana.ts +++ b/packages/apps/asana/src/services/asana.ts @@ -1,21 +1,25 @@ import fs from 'fs'; import axios from 'axios'; import { Readable } from 'stream'; -import { OAuthService, Organisation } from "@ocular/ocular"; +import { OAuthService, Organisation, RateLimiterService } from "@ocular/ocular"; import { IndexableDocument, DocType, TransactionBaseService, Logger, AppNameDefinitions } from "@ocular/types"; import { ConfigModule } from "@ocular/ocular/src/types"; - +import { RateLimiterQueue } from "rate-limiter-flexible" export default class AsanaService extends TransactionBaseService { protected oauthService_: OAuthService; protected logger_: Logger; protected container_: ConfigModule; + protected rateLimiterService_: RateLimiterService; + protected requestQueue_: RateLimiterQueue constructor(container) { super(arguments[0]); this.oauthService_ = container.oauthService; this.logger_ = container.logger; this.container_ = container; + this.rateLimiterService_ = container.rateLimiterService; + this.requestQueue_ = this.rateLimiterService_.getRequestQueue(AppNameDefinitions.ASANA); } @@ -93,6 +97,8 @@ export default class AsanaService extends TransactionBaseService { // Get Asana Projects async getAsanaProjects (accessToken: string, datetime: string) { + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.ASANA) const response = await axios.get(`https://app.asana.com/api/1.0/projects?opt_expand=name,description,notes,completed,created_at`, { headers: { 'Authorization': `Bearer ${accessToken}` @@ -103,6 +109,8 @@ export default class AsanaService extends TransactionBaseService { // Get Asana Tasks async getAsanaTasks(accessToken: string, projectId: string, datetime: string){ + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.ASANA) let url = `https://app.asana.com/api/1.0/projects/${projectId}/tasks?opt_expand=name,description,notes,completed,created_at`; if(datetime){ url += `&modified_since=${datetime}`; diff --git a/packages/apps/confluence/src/loaders/index.ts b/packages/apps/confluence/src/loaders/index.ts new file mode 100644 index 00000000..a79f6b22 --- /dev/null +++ b/packages/apps/confluence/src/loaders/index.ts @@ -0,0 +1,16 @@ +import { AppNameDefinitions, RateLimiterOpts} from "@ocular/types"; +import { RateLimiterService} from "@ocular/ocular"; + +export default async (container, options) => { + try { + // Register Rate Limiter For Google Drive + if (!options.rate_limiter_opts) { + throw new Error("No options provided for rate limiter") + } + const rateLimiterOpts: RateLimiterOpts = options.rate_limiter_opts + const rateLimiterService: RateLimiterService = container.resolve("rateLimiterService") + await rateLimiterService.register(AppNameDefinitions.CONFLUENCE,rateLimiterOpts.requests, rateLimiterOpts.interval); + } catch (err) { + throw(err) + } +} \ No newline at end of file diff --git a/packages/apps/confluence/src/services/confluence.ts b/packages/apps/confluence/src/services/confluence.ts index 1c061937..5f830251 100644 --- a/packages/apps/confluence/src/services/confluence.ts +++ b/packages/apps/confluence/src/services/confluence.ts @@ -1,7 +1,7 @@ import fs from "fs"; import axios from "axios"; import { Readable } from "stream"; -import { OAuthService, Organisation } from "@ocular/ocular"; +import { OAuthService, Organisation,RateLimiterService } from "@ocular/ocular"; import { IndexableDocument, TransactionBaseService, @@ -10,6 +10,7 @@ import { DocType, } from "@ocular/types"; import { ConfigModule } from "@ocular/ocular/src/types"; +import { RateLimiterQueue } from "rate-limiter-flexible" interface Config { headers: { @@ -22,12 +23,16 @@ export default class ConfluenceService extends TransactionBaseService { protected oauthService_: OAuthService; protected logger_: Logger; protected container_: ConfigModule; + protected rateLimiterService_: RateLimiterService; + protected requestQueue_: RateLimiterQueue constructor(container) { super(arguments[0]); this.oauthService_ = container.oauthService; this.logger_ = container.logger; this.container_ = container; + this.rateLimiterService_ = container.rateLimiterService; + this.requestQueue_ = this.rateLimiterService_.getRequestQueue(AppNameDefinitions.CONFLUENCE); } async getConfluenceData(org: Organisation) { @@ -129,6 +134,8 @@ export default class ConfluenceService extends TransactionBaseService { async fetchPageContent(pageID: string, cloudID: string, config: Config) { try { + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.CONFLUENCE) const baseUrl = `https://api.atlassian.com/ex/confluence/${cloudID}/wiki/api/v2/pages`; const pageContentUrl = `${baseUrl}/${pageID}?body-format=atlas_doc_format`; @@ -196,6 +203,8 @@ export default class ConfluenceService extends TransactionBaseService { async fetchConfluencePages(cloudID: string, config: Config) { try { + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.CONFLUENCE) const pagesEndpoint = `https://api.atlassian.com/ex/confluence/${cloudID}/wiki/rest/api/content`; const pagesResponse = await axios.get(pagesEndpoint, config); diff --git a/packages/apps/github/src/loaders/index.ts b/packages/apps/github/src/loaders/index.ts new file mode 100644 index 00000000..45a22cd2 --- /dev/null +++ b/packages/apps/github/src/loaders/index.ts @@ -0,0 +1,16 @@ +import { AppNameDefinitions, RateLimiterOpts} from "@ocular/types"; +import { RateLimiterService} from "@ocular/ocular"; + +export default async (container, options) => { + try { + // Register Rate Limiter For Google Drive + if (!options.rate_limiter_opts) { + throw new Error("No options provided for rate limiter") + } + const rateLimiterOpts: RateLimiterOpts = options.rate_limiter_opts + const rateLimiterService: RateLimiterService = container.resolve("rateLimiterService") + await rateLimiterService.register(AppNameDefinitions.GITHUB, rateLimiterOpts.requests, rateLimiterOpts.interval); + } catch (err) { + throw(err) + } +} \ No newline at end of file diff --git a/packages/apps/github/src/services/github.ts b/packages/apps/github/src/services/github.ts index aafd39e2..66be3fee 100644 --- a/packages/apps/github/src/services/github.ts +++ b/packages/apps/github/src/services/github.ts @@ -1,22 +1,27 @@ import { Readable } from 'stream'; import { EntityManager } from "typeorm"; import { App } from "octokit"; -import { OAuthService, Organisation } from "@ocular/ocular"; +import { OAuthService, Organisation, RateLimiterService } from "@ocular/ocular"; import { IndexableDocument, TransactionBaseService, Logger, AppNameDefinitions, DocType } from "@ocular/types"; import { ConfigModule } from "@ocular/ocular/src/types";; import fs from 'fs'; import e from 'express'; +import { RateLimiterQueue } from "rate-limiter-flexible" export default class GitHubService extends TransactionBaseService { protected oauthService_: OAuthService; protected logger_: Logger; protected container_: ConfigModule; + protected rateLimiterService_: RateLimiterService; + protected requestQueue_: RateLimiterQueue constructor(container) { super(arguments[0]); this.oauthService_ = container.oauthService; this.logger_ = container.logger; this.container_ = container; + this.rateLimiterService_ = container.rateLimiterService; + this.requestQueue_ = this.rateLimiterService_.getRequestQueue(AppNameDefinitions.GITHUB); } @@ -33,7 +38,6 @@ export default class GitHubService extends TransactionBaseService { this.logger_.error(`No Github OAuth Cred found for ${org.id} organisation`); return; } - // Get the last sync date - this is the time the latest document that was synced from Gmail. let last_sync = '' @@ -49,12 +53,16 @@ export default class GitHubService extends TransactionBaseService { const octokit = await app.getInstallationOctokit(Number(oauth.metadata.installation_id)); try { + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.GITHUB) const { data } = await octokit.rest.apps.listReposAccessibleToInstallation(); for (const repo of data.repositories) { console.log(repo.name); console.log(repo.owner); // Get Commits For This Repository + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.GITHUB) const prs = await octokit.rest.pulls.list({ owner: repo.owner.login, repo: repo.name, @@ -84,6 +92,8 @@ export default class GitHubService extends TransactionBaseService { } } + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.GITHUB) // Get Issues For This Repository const issues = await octokit.rest.issues.listForRepo({ owner: repo.owner.login, diff --git a/packages/apps/jira/src/loaders/index.ts b/packages/apps/jira/src/loaders/index.ts new file mode 100644 index 00000000..97fa60d1 --- /dev/null +++ b/packages/apps/jira/src/loaders/index.ts @@ -0,0 +1,17 @@ +import { AppNameDefinitions, RateLimiterOpts} from "@ocular/types"; +import { RateLimiterService} from "@ocular/ocular"; + +export default async (container, options) => { + try { + // Register Rate Limiter For Google Drive + if (!options.rate_limiter_opts) { + throw new Error("No options provided for rate limiter") + } + const rateLimiterOpts: RateLimiterOpts = options.rate_limiter_opts + const rateLimiterService: RateLimiterService = container.resolve("rateLimiterService") + console.log("Gmail: Rate Limiter Options", rateLimiterOpts) + await rateLimiterService.register(AppNameDefinitions.JIRA,rateLimiterOpts.requests, rateLimiterOpts.interval); + } catch (err) { + console.log(err) + } +} \ No newline at end of file diff --git a/packages/apps/jira/src/services/jira.ts b/packages/apps/jira/src/services/jira.ts index 77c13bca..897cc821 100644 --- a/packages/apps/jira/src/services/jira.ts +++ b/packages/apps/jira/src/services/jira.ts @@ -1,7 +1,7 @@ import fs from "fs"; import axios from "axios"; import { Readable } from "stream"; -import { OAuthService, Organisation } from "@ocular/ocular"; +import { OAuthService, Organisation, RateLimiterService } from "@ocular/ocular"; import { IndexableDocument, TransactionBaseService, @@ -10,6 +10,7 @@ import { DocType, } from "@ocular/types"; import { ConfigModule } from "@ocular/ocular/src/types"; +import { RateLimiterQueue } from "rate-limiter-flexible" interface Config { headers: { @@ -22,12 +23,16 @@ export default class JiraService extends TransactionBaseService { protected oauthService_: OAuthService; protected logger_: Logger; protected container_: ConfigModule; + protected rateLimiterService_: RateLimiterService; + protected requestQueue_: RateLimiterQueue constructor(container) { super(arguments[0]); this.oauthService_ = container.oauthService; this.logger_ = container.logger; this.container_ = container; + this.rateLimiterService_ = container.rateLimiterService; + this.requestQueue_ = this.rateLimiterService_.getRequestQueue(AppNameDefinitions.JIRA); } async getJiraData(org: Organisation) { @@ -173,6 +178,8 @@ export default class JiraService extends TransactionBaseService { * @returns {Promise} A promise that resolves to an array of project objects. */ async fetchJiraProjects(cloudID: string, config: Config) { + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.JIRA) // Ensure the variable names are case-sensitive and consistent. const projectEndpoint = `https://api.atlassian.com/ex/jira/${cloudID}/rest/api/3/project/search`; @@ -219,6 +226,8 @@ export default class JiraService extends TransactionBaseService { */ async fetchProjectIssues(projectID, cloudID, config) { try { + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.JIRA) // Construct base URL and issue endpoint const baseUrl = `https://api.atlassian.com/ex/jira/${cloudID}`; const issueEndpoint = `${baseUrl}/rest/api/3/search?jql=project=${projectID}&maxResults=1000`; @@ -253,6 +262,8 @@ export default class JiraService extends TransactionBaseService { */ async fetchIssueDetails(issueID: string, cloudID: string, config: Config) { try { + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.JIRA) // Construct the issue endpoint URL const baseUrl = `https://api.atlassian.com/ex/jira/${cloudID}`; const issueEndpoint = `${baseUrl}/rest/api/3/issue/${issueID}`; diff --git a/packages/apps/slack/src/loaders/index.ts b/packages/apps/slack/src/loaders/index.ts new file mode 100644 index 00000000..86abe103 --- /dev/null +++ b/packages/apps/slack/src/loaders/index.ts @@ -0,0 +1,16 @@ +import { AppNameDefinitions, RateLimiterOpts} from "@ocular/types"; +import { RateLimiterService} from "@ocular/ocular"; + +export default async (container, options) => { + try { + // Register Rate Limiter For Google Drive + if (!options.rate_limiter_opts) { + throw new Error("No options provided for rate limiter") + } + const rateLimiterOpts: RateLimiterOpts = options.rate_limiter_opts + const rateLimiterService: RateLimiterService = container.resolve("rateLimiterService") + await rateLimiterService.register(AppNameDefinitions.SLACK,rateLimiterOpts.requests, rateLimiterOpts.interval); + } catch (err) { + throw(err) + } +} \ No newline at end of file diff --git a/packages/apps/slack/src/services/slack.ts b/packages/apps/slack/src/services/slack.ts index 7155af6c..18f5b6d0 100644 --- a/packages/apps/slack/src/services/slack.ts +++ b/packages/apps/slack/src/services/slack.ts @@ -1,7 +1,7 @@ import fs from "fs"; import axios from "axios"; import { Readable } from "stream"; -import { App, OAuthService, Organisation } from "@ocular/ocular"; +import { App, OAuthService, Organisation, RateLimiterService } from "@ocular/ocular"; import { IndexableDocument, TransactionBaseService, @@ -11,6 +11,7 @@ import { } from "@ocular/types"; import { ConfigModule } from "@ocular/ocular/src/types"; import { DocType } from "@ocular/types"; +import { RateLimiterQueue } from "rate-limiter-flexible" interface Config { headers: { @@ -23,12 +24,16 @@ export default class SlackService extends TransactionBaseService { protected oauthService_: OAuthService; protected logger_: Logger; protected container_: ConfigModule; + protected rateLimiterService_: RateLimiterService; + protected requestQueue_: RateLimiterQueue constructor(container) { super(arguments[0]); this.oauthService_ = container.oauthService; this.logger_ = container.logger; this.container_ = container; + this.rateLimiterService_ = container.rateLimiterService; + this.requestQueue_ = this.rateLimiterService_.getRequestQueue(AppNameDefinitions.SLACK); } async getSlackData(org: Organisation) { @@ -121,6 +126,8 @@ export default class SlackService extends TransactionBaseService { } async fetchSlackChannels(config:Config){ + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.SLACK) try{ const response = await axios.get( "https://slack.com/api/conversations.list", @@ -144,6 +151,8 @@ export default class SlackService extends TransactionBaseService { async fetchChannelConversations(channelID:string ,config:Config){ try{ + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.SLACK) const conversationsEndpoint = `https://slack.com/api/conversations.history?channel=${channelID}` const response = await axios.get(conversationsEndpoint,config) const conversationsArray = response.data.messages || [] @@ -160,6 +169,8 @@ export default class SlackService extends TransactionBaseService { async fetchThreadForConversation(channelID:string, tsID:string, config:Config){ try{ + // Block Until Rate Limit Allows Request + await this.requestQueue_.removeTokens(1,AppNameDefinitions.SLACK) const threadsEndpoint = `https://slack.com/api/conversations.replies?channel_id=${channelID}&ts=${tsID}` const response = await axios.get(threadsEndpoint,config) return response.data.messages diff --git a/packages/ocular/core-config.js b/packages/ocular/core-config.js index edd07c84..5791b7d7 100644 --- a/packages/ocular/core-config.js +++ b/packages/ocular/core-config.js @@ -53,6 +53,10 @@ module.exports = { client_secret: process.env.ASANA_CLIENT_SECRET, scope: "openid email profile", redirect_uri: `${UI_CORS}/dashboard/marketplace/asana`, + rate_limiter_opts: { + requests: 1500, // Number of Requests + interval: 60, // Interval in Seconds + }, }, }, { @@ -61,6 +65,10 @@ module.exports = { client_id: process.env.CONFLUENCE_CLIENT_ID, client_secret: process.env.CONFLUENCE_CLIENT_SECRET, redirect_uri: `${UI_CORS}/dashboard/marketplace/confluence`, + rate_limiter_opts: { + requests: 10, // Number of Requests + interval: 1, // Interval in Seconds + }, }, }, { @@ -69,6 +77,10 @@ module.exports = { client_id: process.env.JIRA_CLIENT_ID, client_secret: process.env.JIRA_CLIENT_SECRET, redirect_uri: `${UI_CORS}/dashboard/marketplace/jira`, + rate_limiter_opts: { + requests: 10, // Number of Requests + interval: 1, // Interval in Seconds + }, }, }, { @@ -85,6 +97,10 @@ module.exports = { client_id: process.env.SLACK_CLIENT_ID, client_secret: process.env.SLACK_CLIENT_SECRET, redirect_uri: `${UI_CORS}/dashboard/marketplace/slack`, + rate_limiter_opts: { + requests: 60, // Number of Requests + interval: 60, // Interval in Seconds + }, }, }, {