Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Confluence page indexing #147

Merged
merged 9 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions packages/apps/confluence/src/loaders/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
import { AppNameDefinitions, RateLimiterOpts} from "@ocular/types";
import { RateLimiterService} from "@ocular/ocular";
import { AppNameDefinitions, RateLimiterOpts } from "@ocular/types";
import { RateLimiterService } from "@ocular/ocular";
import { AutoflowAiError, AutoflowAiErrorTypes } from "@ocular/utils";

export default async (container, options) => {
try {
// Register Rate Limiter For Google Drive
// Register Rate Limiter For Confluence App
if (!options.rate_limiter_opts) {
throw new Error("No options provided for rate limiter")
throw new AutoflowAiError(
AutoflowAiErrorTypes.INVALID_DATA,
"registerRateLimiter: No options provided for rate limiter for Confluence App"
);
}
const rateLimiterOpts: RateLimiterOpts = options.rate_limiter_opts
const rateLimiterService: RateLimiterService = container.resolve("rateLimiterService")
await rateLimiterService.register(AppNameDefinitions.CONFLUENCE,rateLimiterOpts.requests, rateLimiterOpts.interval);
const rateLimiterOpts: RateLimiterOpts = options.rate_limiter_opts;
const rateLimiterService: RateLimiterService =
container.resolve("rateLimiterService");
await rateLimiterService.register(
AppNameDefinitions.CONFLUENCE,
rateLimiterOpts.requests,
rateLimiterOpts.interval
);
} catch (err) {
throw(err)
throw new AutoflowAiError(
AutoflowAiErrorTypes.INVALID_DATA,
"registerRateLimiter: Failed to register rate limiter for Confluence App with error: " +
err.message
);
}
}
};
182 changes: 26 additions & 156 deletions packages/apps/confluence/src/services/confluence.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import fs from "fs";
import axios from "axios";
import { Readable } from "stream";
import {
AppAuthorizationService,
Expand All @@ -12,10 +10,11 @@ import {
Logger,
AppNameDefinitions,
DocType,
ApiConfig,
AuthStrategy,
} from "@ocular/types";
import { ConfigModule } from "@ocular/ocular/src/types";
import { RateLimiterQueue } from "rate-limiter-flexible";
import ApiTokenService from "../utils/api-token-service";

export default class ConfluenceService extends TransactionBaseService {
protected appAuthorizationService_: AppAuthorizationService;
Expand All @@ -42,10 +41,6 @@ export default class ConfluenceService extends TransactionBaseService {
async *getConfluenceSpaceAndPages(
org: Organisation
): AsyncGenerator<IndexableDocument[]> {
this.logger_.info(
`Starting oculation of Confluence for ${org.id} organisation`
);

// Get Confluence Auth for the organisation
const auth = await this.appAuthorizationService_.retrieve({
id: org.id,
Expand All @@ -57,167 +52,42 @@ export default class ConfluenceService extends TransactionBaseService {
return;
}

const config: ApiConfig = {
headers: {
Authorization: `Bearer ${auth.token}`,
Accept: "application/json",
},
};
let documents: IndexableDocument[] = [];

try {
const cloudID = await this.fetchConfluenceCloudID(config);

if (!cloudID) {
this.logger_.error("Failed to retrieve Confluence Cloud ID");
return;
}

const pages = await this.fetchConfluencePages(cloudID, config);
if (auth.auth_strategy === AuthStrategy.API_TOKEN_STRATEGY) {
const apiTokenService = new ApiTokenService(
auth.token,
auth.metadata.domain_name as string,
auth.metadata.user_name as string,
org,
this.logger_,
this.rateLimiterService_,
auth.last_sync
);

if (!pages) {
this.logger_.error("No accesible pages found in conflunece instance.");
}
const pageIndexableDocs =
await apiTokenService.confluenceIndexableDocuments();

let documents: IndexableDocument[] = [];
for (const pageID of pages) {
const pageInfo = await this.fetchPageContent(pageID, cloudID, config);
if (pageInfo.text) {
const pageDocument: IndexableDocument = {
id: pageInfo.id,
organisationId: org.id,
title: pageInfo.title,
source: AppNameDefinitions.CONFLUENCE,
sections: [
{
content: pageInfo.text,
link: pageInfo.location,
},
],
type: DocType.TXT,
updatedAt: new Date(),
metadata: {},
};
for (const doc of pageIndexableDocs) {
documents.push(doc);

documents.push(pageDocument);
if (documents.length >= 100) {
yield documents;
documents = [];
}
}

if (documents.length >= 100) {
yield documents;
documents = [];
}
yield documents;
}

yield documents;
await this.appAuthorizationService_.update(auth.id, {
last_sync: new Date(),
});
} catch (error) {
if (error.response && error.response.status === 401) {
this.logger_.info(
`Refreshing Confluence token for ${org.id} organisation`
);

const authToken = await this.container_["confluenceOauth"].refreshToken(
auth.refresh_token
);

await this.appAuthorizationService_.update(auth.id, authToken);

return this.getConfluenceSpaceAndPages(org);
} else {
console.error("Error fetching Confluence page content:", error);
}
}

this.logger_.info(
`Finished oculation of Confluence for ${org.id} organisation`
);
}

async fetchPageContent(pageID: string, cloudID: string, config: ApiConfig) {
try {
// Block Until Rate Limit Allows Request
await this.requestQueue_.removeTokens(1, AppNameDefinitions.CONFLUENCE);
const baseUrl = `https://api.atlassian.com/ex/confluence/${cloudID}/wiki/api/v2/pages`;
const pageContentUrl = `${baseUrl}/${pageID}?body-format=atlas_doc_format`;

const response = await axios.get(pageContentUrl, config);

const { title, _links, body } = response.data;
const pageLinkBase = _links.base;
const pageLinkWebUI = _links.webui;
const pageLink = `${pageLinkBase}${pageLinkWebUI}`;

const pageInformation = body.atlas_doc_format.value;
const contentArray = JSON.parse(pageInformation).content;

let pageText = "";

for (const element of contentArray) {
if (element.type === "paragraph" && element.content) {
const textArray = element.content.filter(
(textElement) => textElement.type === "text" && textElement.text
);
const paratext = textArray
.map((textElement) => textElement.text)
.join(" ");
pageText += paratext + " ";
}
}

return {
id: pageID,
title: title,
location: pageLink,
text: pageText.trim(),
};
} catch (error) {
console.error("Error in Fetching Page Content:", error.message);
throw error;
}
}

async fetchConfluenceCloudID(config: ApiConfig) {
try {
const accessibleResourcesResponse = await axios.get(
"https://api.atlassian.com/oauth/token/accessible-resources",
config
);

const accessibleResources = accessibleResourcesResponse.data;

if (!accessibleResources || accessibleResources.length === 0) {
throw new Error("No accessible resources found.");
}

const cloudId = accessibleResources[0].id;

if (!cloudId) {
throw new Error("Invalid cloud ID.");
}

return cloudId;
} catch (error) {
this.logger_.error(`Error in fetching Cloud ID of Confluence Instance.`);
throw error;
}
}

async fetchConfluencePages(cloudID: string, config: ApiConfig) {
try {
// Block Until Rate Limit Allows Request
await this.requestQueue_.removeTokens(1, AppNameDefinitions.CONFLUENCE);
const pagesEndpoint = `https://api.atlassian.com/ex/confluence/${cloudID}/wiki/rest/api/content`;

const pagesResponse = await axios.get(pagesEndpoint, config);

const pagesResults = pagesResponse.data.results;
const pages = pagesResults.map((page) => page.id);

return pages;
} catch (error) {
this.logger_.error(`Failed to retrieve Confluence pages:`, error.message);
throw error;
this.logger_.error(`
getConfluenceSpaceAndPages: Error while fetching Confluence data for org: ${org.id}
organisation: ${error}
`);
}
}
}
13 changes: 13 additions & 0 deletions packages/apps/confluence/src/services/oauth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import {
AppNameDefinitions,
AppCategoryDefinitions,
AuthToken,
AuthStrategy,
TokenTypes,
} from "@ocular/types";
import { ConfigModule } from "@ocular/ocular/src/types";

Expand All @@ -12,13 +14,15 @@ class ConfluenceOauth extends AppauthorizationService {
protected client_secret_: string;
protected configModule_: ConfigModule;
protected redirect_uri_: string;
protected auth_strategy_: AuthStrategy;

constructor(container, options) {
super(arguments[0]);
this.client_id_ = options.client_id;
this.client_secret_ = options.client_secret;
this.redirect_uri_ = options.redirect_uri;
this.configModule_ = container.configModule;
this.auth_strategy_ = options.auth_strategy;
}

static getAppDetails(projectConfig, options) {
Expand Down Expand Up @@ -79,6 +83,15 @@ class ConfluenceOauth extends AppauthorizationService {
async generateToken(code: string): Promise<AuthToken> {
console.log("***** Generating token from the code:\n");

if (this.auth_strategy_ === AuthStrategy.API_TOKEN_STRATEGY) {
return {
type: TokenTypes.BEARER,
token: code,
token_expires_at: new Date(),
refresh_token: "NO_REFRESH_TOKEN",
auth_strategy: AuthStrategy.API_TOKEN_STRATEGY,
} as AuthToken;
}
const body = {
grant_type: "authorization_code",
client_id: `${this.client_id_}`,
Expand Down
33 changes: 28 additions & 5 deletions packages/apps/confluence/src/strategies/confluence.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,55 @@
import { BatchJobService, Organisation, QueueService } from "@ocular/ocular";
import ConfluenceService from "../services/confluence";
import { INDEX_DOCUMENT_EVENT, APPS_INDEXING_TOPIC } from "@ocular/types";
import { AbstractBatchJobStrategy } from "@ocular/types";
import {
INDEX_DOCUMENT_EVENT,
APPS_INDEXING_TOPIC,
Logger,
AbstractBatchJobStrategy,
} from "@ocular/types";

export default class ConfluenceStrategy extends AbstractBatchJobStrategy {
static identifier = "confluence-indexing-strategy";
static batchType = "confluence";
protected batchJobService_: BatchJobService;
protected confluenceService_: ConfluenceService;
protected queueService_: QueueService;
protected logger_: Logger;

constructor(container) {
super(arguments[0]);
this.confluenceService_ = container.confluenceService;
this.batchJobService_ = container.batchJobService;
this.queueService_ = container.queueService;
this.logger_ = container.logger;
}

async processJob(batchJobId: string): Promise<void> {
const batchJob = await this.batchJobService_.retrieve(batchJobId);
const stream = await this.confluenceService_.getConfluenceData(
batchJob.context?.org as Organisation

const org = batchJob.context?.org as Organisation;

const oculationConfluenceActivity = this.logger_.activity(
`processJob: Oculating Confluence Data for organisation: ${org.id} name: ${org.name}`
);

const stream = await this.confluenceService_.getConfluenceData(org);

stream.on("data", (documents) => {
this.queueService_.sendBatch(APPS_INDEXING_TOPIC, documents);
});

stream.on("end", () => {
console.log("No more data");
this.logger_.success(
oculationConfluenceActivity,
`processJob:Finished oculation of Confluence Data for ${org.id} organisation`
);
});

stream.on("error", () => {
this.logger_.error(
oculationConfluenceActivity,
`processJob:Error in oculation of Confluence Data for ${org.id} organisation`
);
});
}

Expand Down
Loading