Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Commit

Permalink
Kafka indexing (#60)
Browse files Browse the repository at this point in the history
* Add Functionality To Enqueue Indexable Documents To Kafka For Processing

* Clean Up Scheduling of Indexing Jobs

* Send Batch Documents To Kafka Queue Service
  • Loading branch information
louismurerwa authored Apr 28, 2024
1 parent 2095b06 commit 9053200
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 30 deletions.
4 changes: 1 addition & 3 deletions packages/apps/asana/src/strategies/asana.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ export default class AsanaStrategy extends AbstractBatchJobStrategy {
const batchJob = await this.batchJobService_.retrieve(batchJobId)
const stream = await this.asanaService_.getAsanaData(batchJob.context?.org as Organisation)
stream.on('data', (documents) => {
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
this.queueService_.sendBatch(SEARCH_INDEXING_TOPIC, documents)
});
stream.on('end', () => {
console.log('No more data');
Expand Down
4 changes: 1 addition & 3 deletions packages/apps/confluence/src/strategies/confluence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ export default class ConfluenceStrategy extends AbstractBatchJobStrategy {
batchJob.context?.org as Organisation
);
stream.on('data', (documents) => {
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
this.queueService_.sendBatch(SEARCH_INDEXING_TOPIC, documents)
});
stream.on('end', () => {
console.log('No more data');
Expand Down
6 changes: 2 additions & 4 deletions packages/apps/github/src/strategies/github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ class GithubStrategy extends AbstractBatchJobStrategy {

const stream = await this.githubService_.getRepositoriesOcular(batchJob.context?.org as Organisation)

stream.on('data', (documents) => {
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
stream.on('data', (documents) => {
this.queueService_.sendBatch(SEARCH_INDEXING_TOPIC, documents)
});


Expand Down
4 changes: 1 addition & 3 deletions packages/apps/gmail/src/strategies/gmail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ class GmailStrategy extends AbstractBatchJobStrategy {
const batchJob = await this.batchJobService_.retrieve(batchJobId)
const stream = await this.gmailService_.getGmailData(batchJob.context?.org as Organisation)
stream.on('data', (documents) => {
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
this.queueService_.sendBatch(SEARCH_INDEXING_TOPIC, documents)
});
stream.on('end', () => {
console.log('No more data');
Expand Down
6 changes: 1 addition & 5 deletions packages/apps/google-drive/src/strategies/google-drive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,20 @@ class GoogleDriveStrategy extends AbstractBatchJobStrategy {
static batchType = "google-drive"
protected batchJobService_: BatchJobService
protected googleDriveService_: GoogleDriveService
protected eventBusService_: EventBusService
protected queueService_: QueueService

// Builds the strategy by reading the services it depends on off the injected container.
constructor(container) {
// Forwards the first constructor argument to the AbstractBatchJobStrategy base constructor.
super(arguments[0])
this.googleDriveService_ = container.googleDriveService
this.batchJobService_ = container.batchJobService
this.eventBusService_ = container.eventBusService
this.queueService_ = container.queueService
}

async processJob(batchJobId: string): Promise<void> {
const batchJob = await this.batchJobService_.retrieve(batchJobId)
const stream = await this.googleDriveService_.getGoogleDriveData(batchJob.context?.org as Organisation)
stream.on('data', (documents) => {
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
this.queueService_.sendBatch(SEARCH_INDEXING_TOPIC, documents)
});
stream.on('end', () => {
console.log('No more data');
Expand Down
6 changes: 2 additions & 4 deletions packages/apps/jira/src/strategies/jira.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ export default class JiraStrategy extends AbstractBatchJobStrategy {
// Confluence method needs to be implemented
batchJob.context?.org as Organisation
);
stream.on('data', (documents) => {
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
stream.on('data', (documents) => {
this.queueService_.sendBatch(SEARCH_INDEXING_TOPIC, documents)
});
stream.on('end', () => {
console.log('No more data');
Expand Down
6 changes: 2 additions & 4 deletions packages/apps/notion/src/strategies/notion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ export default class AsanaStrategy extends AbstractBatchJobStrategy {
const stream = await this.asanaService_.getNotionPagesData(
batchJob.context?.org as Organisation
);
stream.on('data', (documents) => {
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
stream.on('data', (documents) => {
this.queueService_.sendBatch(SEARCH_INDEXING_TOPIC, documents)
});
stream.on('end', () => {
console.log('No more data');
Expand Down
4 changes: 1 addition & 3 deletions packages/apps/slack/src/strategies/slack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ export default class SlackStrategy extends AbstractBatchJobStrategy {
batchJob.context?.org as Organisation
);
stream.on('data', (documents) => {
for (const document of documents) {
this.queueService_.send(SEARCH_INDEXING_TOPIC, document)
}
this.queueService_.sendBatch(SEARCH_INDEXING_TOPIC, documents)
});
stream.on('end', () => {
console.log('No more data');
Expand Down
14 changes: 13 additions & 1 deletion packages/ocular/src/services/__tests__/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ describe('queueService', () => {
clientId: 'ocular',
brokers: ['localhost:9092'],
})

const moduleDeps = {
logger: loggerMock,
eventBusRedisConnection: {},
Expand All @@ -51,4 +52,15 @@ describe('queueService', () => {
done()
}, {groupId: "ocular-group"});
});
})


// Publishes four messages to the "ocular" topic in one batch, then subscribes to that
// topic and finishes the test from inside the subscription handler.
// NOTE(review): the promises returned by sendBatch and subscribe are neither awaited nor
// returned, so a rejection from either call would not fail this test directly — confirm intent.
it('should send batch messages to a topic', (done) => {
queueService.sendBatch("ocular", [{message: "Hello World 1"}, {message: "Hello World 2"}, {message: "Hello World 3"}, {message: "Hello World 4"}]);
queueService.subscribe("ocular", async (message, topic) => {
console.log("Message Received", message)
// NOTE(review): none of the payloads sent above is the string "expectedMessage";
// verify what subscribe actually passes to the handler and what this assertion should compare against.
expect(message).toEqual("expectedMessage");
// NOTE(review): four messages are sent; if the handler runs once per message, done()
// may be invoked more than once — TODO confirm.
done()
}, {groupId: "ocular-group"});
});
}
)
25 changes: 25 additions & 0 deletions packages/ocular/src/services/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,31 @@ export default class QueueService extends AbstractQueueService {
}
}

/**
 * Publishes a batch of items to a single Kafka topic in one producer request.
 *
 * Every element of `data` is serialized with `JSON.stringify` and becomes the
 * `value` of one message in the batch.
 *
 * @param topicName - Topic the messages are published to.
 * @param data - Items to publish. An empty array is a no-op.
 * @param options - Accepted for interface compatibility; not used by this implementation.
 * @throws Rethrows any error raised while connecting or sending, after logging it.
 */
async sendBatch<T>(
  topicName: string,
  data: T[],
  options?: Record<string, unknown>
): Promise<void> {
  // Nothing to publish: skip the connect/disconnect round trip entirely.
  if (Array.isArray(data) && data.length === 0) {
    return
  }

  try {
    await this.producer_.connect()
    const messages = data.map((doc) => ({ value: JSON.stringify(doc) }))
    const topicMessages: TopicMessages = { topic: topicName, messages }
    const batch: ProducerBatch = { topicMessages: [topicMessages] }
    await this.producer_.sendBatch(batch)
  } catch (error) {
    // The catch variable is `unknown` under strict settings, so narrow before reading `.message`.
    const reason = error instanceof Error ? error.message : String(error)
    this.logger_.error(`Error sending batch message to Kafka: ${reason}`)
    throw error
  } finally {
    // Release the producer connection on both the success and the failure path;
    // previously a failed send left the producer connected.
    await this.producer_.disconnect()
  }
}

async subscribe<T>(topicName: string, consumer: Consumer, context: ConsumerContext ): Promise<void> {
// Check if the consumer is a function
if (typeof consumer !== `function`) {
Expand Down
11 changes: 11 additions & 0 deletions packages/types/src/interfaces/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ export interface IQueueService {
data: T,
options?: Record<string, unknown>
): Promise<void>
/**
 * Sends several items to one topic in a single call.
 *
 * @param topicName - Name of the topic the items are sent to.
 * @param data - The items to send.
 * @param options - Optional extra settings; their meaning is left to the implementation.
 * @returns A promise that resolves with no value once the implementation has finished the call.
 */
sendBatch<T>(
topicName: string,
data: T[],
options?: Record<string, unknown>
) : Promise<void>
subscribe<T>(topicName: string, consumer: Consumer, context:ConsumerContext): Promise<void>
}

Expand Down Expand Up @@ -59,6 +64,12 @@ export abstract class AbstractQueueService implements IQueueService {
options?: Record<string, unknown>
): Promise<void>

/**
 * Sends several items to one topic in a single call. Concrete queue services
 * must provide the implementation.
 *
 * @param topicName - Name of the topic the items are sent to.
 * @param data - The items to send.
 * @param options - Optional extra settings; their meaning is left to the implementation.
 * @returns A promise that resolves with no value once the implementation has finished the call.
 */
abstract sendBatch<T>(
topicName: string,
data: T[],
options?: Record<string, unknown>
): Promise<void>

abstract subscribe<T>(topicName: string, consumer:Consumer, context:ConsumerContext): Promise<void>
}

0 comments on commit 9053200

Please sign in to comment.