Skip to content

Commit

Permalink
4017 improve queries on messages write (#4207)
Browse files Browse the repository at this point in the history
* modify code to reduce nested loops and improve performances

* is working

* fix lastSyncHistoryId

* create new service to share it betweent partial sync and full sync

* update partial sync

* update batch limit

* renaming

* adding logs

* update logs

* update logs

* update logs

* delete messages if error while saving the participants

* refactoring

* improving logs

* update logs

* delete historyId if outdated
  • Loading branch information
bosiraphael authored Feb 27, 2024
1 parent 16fe79b commit a19de71
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity';
import { CreateCompaniesAndContactsModule } from 'src/workspace/messaging/services/create-companies-and-contacts/create-companies-and-contacts.module';
import { CompanyModule } from 'src/workspace/messaging/repositories/company/company.module';
import { PersonModule } from 'src/workspace/messaging/repositories/person/person.module';
import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service';
import { MessagingConnectedAccountListener } from 'src/workspace/messaging/listeners/messaging-connected-account.listener';
@Module({
imports: [
Expand Down Expand Up @@ -54,6 +55,7 @@ import { MessagingConnectedAccountListener } from 'src/workspace/messaging/liste
MessagingWorkspaceMemberListener,
MessagingMessageChannelListener,
MessageService,
SaveMessagesAndCreateContactsService,
MessagingConnectedAccountListener,
],
exports: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,41 @@ export class ConnectedAccountService {
);
}

public async updateLastSyncHistoryIdIfHigher(
historyId: string,
connectedAccountId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."connectedAccount" SET "lastSyncHistoryId" = $1
WHERE "id" = $2
AND ("lastSyncHistoryId" < $1 OR "lastSyncHistoryId" = '')`,
[historyId, connectedAccountId],
workspaceId,
transactionManager,
);
}

public async deleteHistoryId(
connectedAccountId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."connectedAccount" SET "lastSyncHistoryId" = '' WHERE "id" = $1`,
[connectedAccountId],
workspaceId,
transactionManager,
);
}

public async updateAccessToken(
accessToken: string,
connectedAccountId: string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { MessageParticipantObjectMetadata } from 'src/workspace/workspace-sync-m
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
import {
ParticipantWithId,
Participant,
ParticipantWithMessageId,
} from 'src/workspace/messaging/types/gmail-message';
import { PersonService } from 'src/workspace/messaging/repositories/person/person.service';

Expand Down Expand Up @@ -138,8 +138,7 @@ export class MessageParticipantService {
}

public async saveMessageParticipants(
participants: Participant[],
messageId: string,
participants: ParticipantWithMessageId[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
Expand Down Expand Up @@ -169,7 +168,7 @@ export class MessageParticipantService {
);

const messageParticipantsToSave = participants.map((participant) => [
messageId,
participant.messageId,
participant.role,
participant.handle,
participant.displayName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ export class MessageService {
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
gmailMessageChannelId: string,
workspaceId: string,
) {
): Promise<Map<string, string>> {
const messageExternalIdsAndIdsMap = new Map<string, string>();

for (const message of messages) {
if (this.shouldSkipImport(message)) {
continue;
Expand Down Expand Up @@ -159,6 +161,11 @@ export class MessageService {
manager,
);

messageExternalIdsAndIdsMap.set(
message.externalId,
savedOrExistingMessageId,
);

await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`,
[
Expand All @@ -171,6 +178,8 @@ export class MessageService {
);
});
}

return messageExternalIdsAndIdsMap;
}

private shouldSkipImport(message: GmailMessage): boolean {
Expand Down Expand Up @@ -216,53 +225,19 @@ export class MessageService {
],
);

const isContactAutoCreationEnabled =
await this.messageChannelService.getIsContactAutoCreationEnabledByConnectedAccountIdOrFail(
connectedAccount.id,
workspaceId,
);

if (isContactAutoCreationEnabled && messageDirection === 'outgoing') {
await this.createCompaniesAndContactsService.createCompaniesAndContacts(
connectedAccount.handle,
message.participants,
workspaceId,
manager,
);

const handles = message.participants.map(
(participant) => participant.handle,
);

const messageParticipantsWithoutPersonIdAndWorkspaceMemberId =
await this.messageParticipantService.getByHandlesWithoutPersonIdAndWorkspaceMemberId(
handles,
workspaceId,
);

await this.messageParticipantService.updateMessageParticipantsAfterPeopleCreation(
messageParticipantsWithoutPersonIdAndWorkspaceMemberId,
workspaceId,
manager,
);
}

await this.messageParticipantService.saveMessageParticipants(
message.participants,
newMessageId,
workspaceId,
manager,
);

return Promise.resolve(newMessageId);
}

public async deleteMessages(
workspaceDataSource: DataSource,
messagesDeletedMessageExternalIds: string[],
gmailMessageChannelId: string,
workspaceId: string,
) {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);

await workspaceDataSource?.transaction(async (manager: EntityManager) => {
const messageChannelMessageAssociationsToDelete =
await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';

import axios, { AxiosInstance, AxiosResponse } from 'axios';
import { simpleParser, AddressObject } from 'mailparser';
Expand All @@ -14,6 +14,7 @@ import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmail-
@Injectable()
export class FetchMessagesByBatchesService {
private readonly httpService: AxiosInstance;
private readonly logger = new Logger(FetchMessagesByBatchesService.name);

constructor() {
this.httpService = axios.create({
Expand All @@ -24,22 +25,46 @@ export class FetchMessagesByBatchesService {
async fetchAllMessages(
queries: MessageQuery[],
accessToken: string,
jobName?: string,
workspaceId?: string,
connectedAccountId?: string,
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
let startTime = Date.now();
const batchResponses = await this.fetchAllByBatches(
queries,
accessToken,
'batch_gmail_messages',
);
let endTime = Date.now();

return this.formatBatchResponsesAsGmailMessages(batchResponses);
this.logger.log(
`${jobName} for workspace ${workspaceId} and account ${connectedAccountId} fetching ${
queries.length
} messages in ${endTime - startTime}ms`,
);

startTime = Date.now();

const formattedResponse =
await this.formatBatchResponsesAsGmailMessages(batchResponses);

endTime = Date.now();

this.logger.log(
`${jobName} for workspace ${workspaceId} and account ${connectedAccountId} formatting ${
queries.length
} messages in ${endTime - startTime}ms`,
);

return formattedResponse;
}

async fetchAllByBatches(
queries: MessageQuery[],
accessToken: string,
boundary: string,
): Promise<AxiosResponse<any, any>[]> {
const batchLimit = 100;
const batchLimit = 50;

let batchOffset = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import {
import { ConnectedAccountService } from 'src/workspace/messaging/repositories/connected-account/connected-account.service';
import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service';
import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageService } from 'src/workspace/messaging/repositories/message/message.service';
import { createQueriesFromMessageIds } from 'src/workspace/messaging/utils/create-queries-from-message-ids.util';
import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service';

@Injectable()
export class GmailFullSyncService {
Expand All @@ -24,23 +23,17 @@ export class GmailFullSyncService {
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly connectedAccountService: ConnectedAccountService,
private readonly messageChannelService: MessageChannelService,
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
private readonly messageService: MessageService,
private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService,
) {}

public async fetchConnectedAccountThreads(
workspaceId: string,
connectedAccountId: string,
nextPageToken?: string,
): Promise<void> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);

const connectedAccount = await this.connectedAccountService.getByIdOrFail(
connectedAccountId,
workspaceId,
Expand All @@ -64,12 +57,22 @@ export class GmailFullSyncService {
const gmailClient =
await this.gmailClientProvider.getGmailClient(refreshToken);

let startTime = Date.now();

const messages = await gmailClient.users.messages.list({
userId: 'me',
maxResults: 500,
pageToken: nextPageToken,
});

let endTime = Date.now();

this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} getting messages list in ${
endTime - startTime
}ms.`,
);

const messagesData = messages.data.messages;

const messageExternalIds = messagesData
Expand All @@ -80,13 +83,23 @@ export class GmailFullSyncService {
return;
}

startTime = Date.now();

const existingMessageChannelMessageAssociations =
await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId(
messageExternalIds,
gmailMessageChannelId,
workspaceId,
);

endTime = Date.now();

this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: getting existing message channel message associations in ${
endTime - startTime
}ms.`,
);

const existingMessageChannelMessageAssociationsExternalIds =
existingMessageChannelMessageAssociations.map(
(messageChannelMessageAssociation) =>
Expand All @@ -102,27 +115,41 @@ export class GmailFullSyncService {

const messageQueries = createQueriesFromMessageIds(messagesToFetch);

startTime = Date.now();

const { messages: messagesToSave, errors } =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
'gmail full-sync',
workspaceId,
connectedAccountId,
);

endTime = Date.now();

this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: fetching all messages in ${
endTime - startTime
}ms.`,
);

if (messagesToSave.length === 0) {
if (errors.length) throw new Error('Error fetching messages');

this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`,
);

return;
}

await this.messageService.saveMessages(
this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts(
messagesToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount,
gmailMessageChannelId,
workspaceId,
gmailMessageChannelId,
'gmail full-sync',
);

if (errors.length) throw new Error('Error fetching messages');
Expand All @@ -135,12 +162,22 @@ export class GmailFullSyncService {

if (!historyId) throw new Error('No history id found');

await this.connectedAccountService.updateLastSyncHistoryId(
startTime = Date.now();

await this.connectedAccountService.updateLastSyncHistoryIdIfHigher(
historyId,
connectedAccount.id,
workspaceId,
);

endTime = Date.now();

this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating last sync history id in ${
endTime - startTime
}ms.`,
);

this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} ${
nextPageToken ? `and ${nextPageToken} pageToken` : ''
Expand Down
Loading

0 comments on commit a19de71

Please sign in to comment.