Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Add Functionality To Enqueue Indexable Documents To Kafka For Processing By Search Indexers and Other Consumers #59

Merged
merged 2 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,264 changes: 1,257 additions & 7 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions packages/apps/asana/src/strategies/asana.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@

import { BatchJobService, Organisation, EventBusService } from "@ocular/ocular"
import { BatchJobService, Organisation, QueueService} from "@ocular/ocular"
import AsanaService from "../services/asana"
import { INDEX_DOCUMENT_EVENT } from "@ocular/types"
import { INDEX_DOCUMENT_EVENT, SEARCH_INDEXING_TOPIC } from "@ocular/types"
import { AbstractBatchJobStrategy } from "@ocular/types"

export default class AsanaStrategy extends AbstractBatchJobStrategy {
static identifier = "asana-indexing-strategy"
static batchType = "asana"
protected batchJobService_: BatchJobService
protected asanaService_: AsanaService
protected eventBusService_: EventBusService
protected queueService_: QueueService

constructor(container) {
super(arguments[0])
this.asanaService_ = container.asanaService
this.batchJobService_ = container.batchJobService
this.eventBusService_ = container.eventBusService
this.queueService_ = container.queueService
}

async processJob(batchJobId: string): Promise<void> {
const batchJob = await this.batchJobService_.retrieve(batchJobId)
const stream = await this.asanaService_.getAsanaData(batchJob.context?.org as Organisation)
stream.on('data', (documents) => {
this.eventBusService_.emit(INDEX_DOCUMENT_EVENT, documents)
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
});
stream.on('end', () => {
console.log('No more data');
Expand Down
12 changes: 7 additions & 5 deletions packages/apps/confluence/src/strategies/confluence.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import { BatchJobService, Organisation, EventBusService } from '@ocular/ocular';
import { BatchJobService, Organisation, QueueService } from '@ocular/ocular';
import ConfluenceService from '../services/confluence';
import { INDEX_DOCUMENT_EVENT } from '@ocular/types';
import { INDEX_DOCUMENT_EVENT, SEARCH_INDEXING_TOPIC } from '@ocular/types';
import { AbstractBatchJobStrategy } from '@ocular/types';

export default class ConfluenceStrategy extends AbstractBatchJobStrategy {
static identifier = 'confluence-indexing-strategy';
static batchType = 'confluence';
protected batchJobService_: BatchJobService;
protected confluenceService_: ConfluenceService;
protected eventBusService_: EventBusService;
protected queueService_: QueueService

constructor(container) {
super(arguments[0]);
this.confluenceService_ = container.confluenceService;
this.batchJobService_ = container.batchJobService;
this.eventBusService_ = container.eventBusService;
this.queueService_ = container.queueService;
}

async processJob(batchJobId: string): Promise<void> {
Expand All @@ -23,7 +23,9 @@ export default class ConfluenceStrategy extends AbstractBatchJobStrategy {
batchJob.context?.org as Organisation
);
stream.on('data', (documents) => {
this.eventBusService_.emit(INDEX_DOCUMENT_EVENT, documents);
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
});
stream.on('end', () => {
console.log('No more data');
Expand Down
12 changes: 7 additions & 5 deletions packages/apps/github/src/strategies/github.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {AbstractBatchJobStrategy } from "@ocular/types"
import { BatchJobService, Organisation, EventBusService } from "@ocular/ocular"
import {AbstractBatchJobStrategy, SEARCH_INDEXING_TOPIC } from "@ocular/types"
import { BatchJobService, Organisation, QueueService} from "@ocular/ocular"
import { EntityManager } from "typeorm"
import GitHubService from "../services/github"
import JobSchedulerService from "@ocular/ocular"
Expand All @@ -11,13 +11,13 @@ class GithubStrategy extends AbstractBatchJobStrategy {
static batchType = "github"
protected batchJobService_: BatchJobService
protected githubService_: GitHubService
protected eventBusService_: EventBusService
protected queueService_: QueueService

constructor(container) {
super(arguments[0])
this.githubService_ = container.githubService
this.batchJobService_ = container.batchJobService
this.eventBusService_ = container.eventBusService
this.queueService_ = container.queueService
}

async processJob(batchJobId: string): Promise<void> {
Expand All @@ -28,7 +28,9 @@ class GithubStrategy extends AbstractBatchJobStrategy {
const stream = await this.githubService_.getRepositoriesOcular(batchJob.context?.org as Organisation)

stream.on('data', (documents) => {
this.eventBusService_.emit(INDEX_DOCUMENT_EVENT, documents)
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
});


Expand Down
12 changes: 7 additions & 5 deletions packages/apps/gmail/src/strategies/gmail.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@

import { EntityManager } from "typeorm"
import { BatchJobService, Organisation, EventBusService } from "@ocular/ocular"
import { BatchJobService, Organisation, EventBusService, QueueService } from "@ocular/ocular"
import GmailService from "../services/gmail"
import JobSchedulerService from "@ocular/ocular"
import e from "express"
import { INDEX_DOCUMENT_EVENT, AbstractBatchJobStrategy } from "@ocular/types"
import { INDEX_DOCUMENT_EVENT, AbstractBatchJobStrategy, SEARCH_INDEXING_TOPIC } from "@ocular/types"

class GmailStrategy extends AbstractBatchJobStrategy {
static identifier = "gmail-indexing-strategy"
static batchType = "gmail"
protected batchJobService_: BatchJobService
protected gmailService_: GmailService
protected eventBusService_: EventBusService
protected queueService_: QueueService

constructor(container) {
super(arguments[0])
this.gmailService_ = container.gmailService
this.batchJobService_ = container.batchJobService
this.eventBusService_ = container.eventBusService
this.queueService_ = container.queueService
}

async processJob(batchJobId: string): Promise<void> {
const batchJob = await this.batchJobService_.retrieve(batchJobId)
const stream = await this.gmailService_.getGmailData(batchJob.context?.org as Organisation)
stream.on('data', (documents) => {
this.eventBusService_.emit(INDEX_DOCUMENT_EVENT, documents)
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
});
stream.on('end', () => {
console.log('No more data');
Expand Down
10 changes: 7 additions & 3 deletions packages/apps/google-drive/src/strategies/google-drive.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@

import { BatchJobService, Organisation, EventBusService } from "@ocular/ocular"
import { BatchJobService, Organisation, EventBusService, QueueService } from "@ocular/ocular"
import { EntityManager } from "typeorm"
import GoogleDriveService from "../services/google-drive"
import JobSchedulerService from "@ocular/ocular"
import e from "express"
import { INDEX_DOCUMENT_EVENT, AbstractBatchJobStrategy } from "@ocular/types"
import { INDEX_DOCUMENT_EVENT, AbstractBatchJobStrategy, SEARCH_INDEXING_TOPIC } from "@ocular/types"

class GoogleDriveStrategy extends AbstractBatchJobStrategy {
static identifier = "google-drive-indexing-strategy"
static batchType = "google-drive"
protected batchJobService_: BatchJobService
protected googleDriveService_: GoogleDriveService
protected eventBusService_: EventBusService
protected queueService_: QueueService

constructor(container) {
super(arguments[0])
this.googleDriveService_ = container.googleDriveService
this.batchJobService_ = container.batchJobService
this.eventBusService_ = container.eventBusService
this.queueService_ = container.queueService
}

async processJob(batchJobId: string): Promise<void> {
const batchJob = await this.batchJobService_.retrieve(batchJobId)
const stream = await this.googleDriveService_.getGoogleDriveData(batchJob.context?.org as Organisation)
stream.on('data', (documents) => {
this.eventBusService_.emit(INDEX_DOCUMENT_EVENT, documents)
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
});
stream.on('end', () => {
console.log('No more data');
Expand Down
12 changes: 7 additions & 5 deletions packages/apps/jira/src/strategies/jira.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import { BatchJobService, Organisation, EventBusService } from '@ocular/ocular';
import { BatchJobService, Organisation, QueueService } from '@ocular/ocular';
import JiraService from '../services/jira';
import { INDEX_DOCUMENT_EVENT } from '@ocular/types';
import { INDEX_DOCUMENT_EVENT, SEARCH_INDEXING_TOPIC } from '@ocular/types';
import { AbstractBatchJobStrategy } from '@ocular/types';

export default class JiraStrategy extends AbstractBatchJobStrategy {
static identifier = 'jira-indexing-strategy';
static batchType = 'jira';
protected batchJobService_: BatchJobService;
protected jiraService_: JiraService;
protected eventBusService_: EventBusService;
protected queueService_: QueueService

constructor(container) {
super(arguments[0]);
this.jiraService_ = container.jiraService;
this.batchJobService_ = container.batchJobService;
this.eventBusService_ = container.eventBusService;
this.queueService_ = container.queueService;
}

async processJob(batchJobId: string): Promise<void> {
Expand All @@ -24,7 +24,9 @@ export default class JiraStrategy extends AbstractBatchJobStrategy {
batchJob.context?.org as Organisation
);
stream.on('data', (documents) => {
this.eventBusService_.emit(INDEX_DOCUMENT_EVENT, documents);
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
});
stream.on('end', () => {
console.log('No more data');
Expand Down
12 changes: 7 additions & 5 deletions packages/apps/notion/src/strategies/notion.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import { BatchJobService, Organisation, EventBusService } from '@ocular/ocular';
import { BatchJobService, Organisation, QueueService } from '@ocular/ocular';
import AsanaService from '../services/notion';
import { INDEX_DOCUMENT_EVENT } from '@ocular/types';
import { INDEX_DOCUMENT_EVENT, SEARCH_INDEXING_TOPIC } from '@ocular/types';
import { AbstractBatchJobStrategy } from '@ocular/types';

export default class AsanaStrategy extends AbstractBatchJobStrategy {
static identifier = 'notion-indexing-strategy';
static batchType = 'notion';
protected batchJobService_: BatchJobService;
protected asanaService_: AsanaService;
protected eventBusService_: EventBusService;
protected queueService_: QueueService;

constructor(container) {
super(arguments[0]);
this.asanaService_ = container.asanaService;
this.batchJobService_ = container.batchJobService;
this.eventBusService_ = container.eventBusService;
this.queueService_ = container.queueService;
}

async processJob(batchJobId: string): Promise<void> {
Expand All @@ -23,7 +23,9 @@ export default class AsanaStrategy extends AbstractBatchJobStrategy {
batchJob.context?.org as Organisation
);
stream.on('data', (documents) => {
this.eventBusService_.emit(INDEX_DOCUMENT_EVENT, documents);
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
});
stream.on('end', () => {
console.log('No more data');
Expand Down
12 changes: 7 additions & 5 deletions packages/apps/slack/src/strategies/slack.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import { BatchJobService, Organisation, EventBusService } from '@ocular/ocular';
import { BatchJobService, Organisation, QueueService } from '@ocular/ocular';
import SlackService from '../services/slack';
import { INDEX_DOCUMENT_EVENT } from '@ocular/types';
import { INDEX_DOCUMENT_EVENT, SEARCH_INDEXING_TOPIC } from '@ocular/types';
import { AbstractBatchJobStrategy } from '@ocular/types';

export default class SlackStrategy extends AbstractBatchJobStrategy {
static identifier = 'slack-indexing-strategy';
static batchType = 'slack';
protected batchJobService_: BatchJobService;
protected slackService_: SlackService;
protected eventBusService_: EventBusService;
protected queueService_: QueueService;

constructor(container) {
super(arguments[0]);
this.slackService_ = container.slackService;
this.batchJobService_ = container.batchJobService;
this.eventBusService_ = container.eventBusService;
this.queueService_ = container.queueService;
}

async processJob(batchJobId: string): Promise<void> {
Expand All @@ -23,7 +23,9 @@ export default class SlackStrategy extends AbstractBatchJobStrategy {
batchJob.context?.org as Organisation
);
stream.on('data', (documents) => {
this.eventBusService_.emit(INDEX_DOCUMENT_EVENT, documents);
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
});
stream.on('end', () => {
console.log('No more data');
Expand Down
1 change: 1 addition & 0 deletions packages/ocular/core-config.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module.exports = {
database_database: process.env.DATABASE_NAME,
database_type: "postgres",
redis_url: process.env.REDIS_URL,
kafka_url: process.env.KAFKA_URL,
ui_cors: UI_CORS,
azure_open_ai_options: {
apiKey: process.env.AZURE_OPENAI_API_KEY,
Expand Down
2 changes: 2 additions & 0 deletions packages/ocular/src/loaders/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import repositoriesLoader from "./repositories"
import searchIndexLoader from "./search"
import servicesLoader from "./services.js"
import redisLoader from './redis';
import kafkaLoader from './kafka';
import subscribersLoader from "./subscribers"

type Options = {
Expand Down Expand Up @@ -58,6 +59,7 @@ export default async ({
})

await redisLoader({ container, configModule, logger: Logger })
await kafkaLoader({ container, configModule, logger: Logger })

const modelsActivity = Logger.activity(`Initializing models${EOL}`)
modelsLoader({ container})
Expand Down
45 changes: 45 additions & 0 deletions packages/ocular/src/loaders/kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { asValue } from "awilix"
import Redis from "ioredis"
import FakeRedis from "ioredis-mock"
import { EOL } from "os"
import { Logger, AutoflowContainer } from "@ocular/types"
import {ConfigModule} from "../types/config-module"
import { Kafka } from "kafkajs"

type Options = {
container: AutoflowContainer
configModule: ConfigModule
logger: Logger
}

async function kafkaLoader({
container,
configModule,
logger,
}: Options): Promise<void> {
if (configModule.projectConfig.kafka_url){
const kafkaClient = new Kafka({
clientId: 'ocular',
brokers: [configModule.projectConfig.kafka_url],
})

try {
await kafkaClient.admin().connect()
logger?.info(`Connection to Kafka established`)
} catch (err) {
throw new Error(
`An error occurred while connecting to Kafka:${EOL} ${err}`
)
}

container.register({
kafkaClient: asValue(kafkaClient),
})
} else {
throw new Error(
`No Kafka url was provided - using Ocular without a proper Kafka instance is allowed`
)
}
}

export default kafkaLoader
1 change: 0 additions & 1 deletion packages/ocular/src/services/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ export default class JobSchedulerService {
handler: ScheduledJobHandler,
options?: CreateJobOptions
): Promise<Job> {
this.logger_.info(`Registering ${eventName}`)
this.registerHandler(eventName, handler)

const jobToCreate = {
Expand Down
7 changes: 5 additions & 2 deletions packages/ocular/src/services/queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Kafka, Message, Producer,ProducerBatch, TopicMessages } from "kafkajs"
import { Consumer, ConsumerContext, TransactionBaseService, AbstractQueueService, Logger } from "@ocular/types"
import { ulid } from "ulid"
import { IndexableDocument } from "@ocular/types"

type InjectedDependencies = {
logger: Logger
Expand Down Expand Up @@ -29,7 +30,8 @@ export default class QueueService extends AbstractQueueService {
await this.clearConsumers()
});
}


// TODO: Implement Send Batch Documents to Queue Service
async send<T>(
topicName: string,
data: T,
Expand Down Expand Up @@ -59,7 +61,8 @@ export default class QueueService extends AbstractQueueService {
kafkaConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
await consumer(message.value.toString(), topic)
const doc:IndexableDocument = JSON.parse(message.value.toString())
await consumer(doc, topic)
} catch (error) {
this.logger_.error(`Error processing message: ${error.message}`)
}
Expand Down
Loading