From 5ca7f6bb0be17cbd60c6c9951386030bded673ba Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Fri, 5 Jul 2024 17:33:26 +0700 Subject: [PATCH] UBERF-7520: Use Bulk for index query updates Signed-off-by: Andrey Sobolev --- server/core/src/indexer/indexer.ts | 43 +++++++--- server/server/src/backup.ts | 81 ++++++++++++------- .../sanity/tests/model/tracker/issues-page.ts | 11 +-- .../tests/tracker/issues-duplicate.spec.ts | 5 +- 4 files changed, 94 insertions(+), 46 deletions(-) diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index 37712cfb970..61ebcd92ffb 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -79,6 +79,10 @@ export class FullTextIndexPipeline implements FullTextPipeline { indexId = indexCounter++ + updateTriggerTimer: any + updateOps = new Map, DocumentUpdate>() + uploadOps: DocIndexState[] = [] + constructor ( private readonly storage: DbAdapter, private readonly stages: FullTextPipelineStage[], @@ -96,6 +100,9 @@ export class FullTextIndexPipeline implements FullTextPipeline { this.cancelling = true clearTimeout(this.updateBroadcast) clearTimeout(this.skippedReiterationTimeout) + clearInterval(this.updateTriggerTimer) + // We need to upload all bulk changes. + await this.processUpload(this.metrics) this.triggerIndexing() await this.indexing await this.flush(true) @@ -168,6 +175,26 @@ export class FullTextIndexPipeline implements FullTextPipeline { return doc } + async processUpload (ctx: MeasureContext): Promise { + const ops = this.updateOps + this.updateOps = new Map() + const toUpload = this.uploadOps + this.uploadOps = [] + if (toUpload.length > 0) { + await ctx.with('upload', {}, async () => { + await this.storage.upload(this.metrics, DOMAIN_DOC_INDEX_STATE, toUpload) + }) + } + if (ops.size > 0) { + await ctx.with('update', {}, async () => { + await this.storage.update(this.metrics, DOMAIN_DOC_INDEX_STATE, ops) + }) + } + if (toUpload.length > 0 || ops.size > 0) { + this.triggerIndexing() + } + } + async queue ( ctx: MeasureContext, updates: Map, { create?: DocIndexState, updated: boolean, removed: boolean }> @@ -175,27 +202,20 @@ export class FullTextIndexPipeline implements FullTextPipeline { const entries = Array.from(updates.entries()) const uploads = entries.filter((it) => it[1].create !== undefined).map((it) => it[1].create) as DocIndexState[] if (uploads.length > 0) { - await ctx.with('upload', {}, async () => { - await this.storage.upload(this.metrics, DOMAIN_DOC_INDEX_STATE, uploads) - }) + this.uploadOps.push(...uploads) } const onlyUpdates = entries.filter((it) => it[1].create === undefined) if (onlyUpdates.length > 0) { - const ops = new Map, DocumentUpdate>() for (const u of onlyUpdates) { const upd: DocumentUpdate = { removed: u[1].removed } // We need to clear only first state, to prevent multiple index operations to happen. ;(upd as any)['stages.' + this.stages[0].stageId] = false - ops.set(u[0], upd) + this.updateOps.set(u[0], upd) } - await ctx.with('upload', {}, async () => { - await this.storage.update(this.metrics, DOMAIN_DOC_INDEX_STATE, ops) - }) } - this.triggerIndexing() } add (doc: DocIndexState): void { @@ -288,6 +308,11 @@ export class FullTextIndexPipeline implements FullTextPipeline { async startIndexing (): Promise { this.indexing = this.doIndexing() + + clearTimeout(this.updateTriggerTimer) + this.updateTriggerTimer = setInterval(() => { + void this.processUpload(this.metrics) + }, 250) } async initializeStages (): Promise { diff --git a/server/server/src/backup.ts b/server/server/src/backup.ts index 1d2d539f8a4..7de61b26988 100644 --- a/server/server/src/backup.ts +++ b/server/server/src/backup.ts @@ -41,40 +41,44 @@ export class BackupClientSession extends ClientSession implements BackupSession async loadChunk (_ctx: ClientSessionCtx, domain: Domain, idx?: number, recheck?: boolean): Promise { this.lastRequest = Date.now() await _ctx.ctx.with('load-chunk', { domain }, async (ctx) => { - idx = idx ?? this.idIndex++ - let chunk: ChunkInfo | undefined = this.chunkInfo.get(idx) - if (chunk !== undefined) { - chunk.index++ - if (chunk.finished === undefined) { - return { - idx, - docs: [], - finished: true + try { + idx = idx ?? this.idIndex++ + let chunk: ChunkInfo | undefined = this.chunkInfo.get(idx) + if (chunk !== undefined) { + chunk.index++ + if (chunk.finished === undefined) { + return { + idx, + docs: [], + finished: true + } } + } else { + chunk = { idx, iterator: this._pipeline.storage.find(ctx, domain, recheck), finished: false, index: 0 } + this.chunkInfo.set(idx, chunk) } - } else { - chunk = { idx, iterator: this._pipeline.storage.find(ctx, domain, recheck), finished: false, index: 0 } - this.chunkInfo.set(idx, chunk) - } - let size = 0 - const docs: DocInfo[] = [] + let size = 0 + const docs: DocInfo[] = [] + + while (size < chunkSize) { + const doc = await chunk.iterator.next(ctx) + if (doc === undefined) { + chunk.finished = true + break + } - while (size < chunkSize) { - const doc = await chunk.iterator.next(ctx) - if (doc === undefined) { - chunk.finished = true - break + size += estimateDocSize(doc) + docs.push(doc) } - size += estimateDocSize(doc) - docs.push(doc) + await _ctx.sendResponse({ + idx, + docs, + finished: chunk.finished + }) + } catch (err: any) { + await _ctx.sendResponse({ error: err.message }) } - - await _ctx.sendResponse({ - idx, - docs, - finished: chunk.finished - }) }) } @@ -92,18 +96,33 @@ export class BackupClientSession extends ClientSession implements BackupSession async loadDocs (ctx: ClientSessionCtx, domain: Domain, docs: Ref[]): Promise { this.lastRequest = Date.now() - await ctx.sendResponse(await this._pipeline.storage.load(ctx.ctx, domain, docs)) + try { + const result = await this._pipeline.storage.load(ctx.ctx, domain, docs) + await ctx.sendResponse(result) + } catch (err: any) { + await ctx.sendResponse({ error: err.message }) + } } async upload (ctx: ClientSessionCtx, domain: Domain, docs: Doc[]): Promise { this.lastRequest = Date.now() - await this._pipeline.storage.upload(ctx.ctx, domain, docs) + try { + await this._pipeline.storage.upload(ctx.ctx, domain, docs) + } catch (err: any) { + await ctx.sendResponse({ error: err.message }) + return + } await ctx.sendResponse({}) } async clean (ctx: ClientSessionCtx, domain: Domain, docs: Ref[]): Promise { this.lastRequest = Date.now() - await this._pipeline.storage.clean(ctx.ctx, domain, docs) + try { + await this._pipeline.storage.clean(ctx.ctx, domain, docs) + } catch (err: any) { + await ctx.sendResponse({ error: err.message }) + return + } await ctx.sendResponse({}) } } diff --git a/tests/sanity/tests/model/tracker/issues-page.ts b/tests/sanity/tests/model/tracker/issues-page.ts index 8a56ce4786d..576671d5bfd 100644 --- a/tests/sanity/tests/model/tracker/issues-page.ts +++ b/tests/sanity/tests/model/tracker/issues-page.ts @@ -61,7 +61,6 @@ export class IssuesPage extends CommonTrackerPage { textPopupAddAttachmentsFile = (): Locator => this.page.locator('div.popup-tooltip div.item div.name') buttonCollapsedCategories = (): Locator => this.page.locator('div.categoryHeader.collapsed') pupupTagsPopup = (): Locator => this.page.locator('.popup#TagsPopup') - issupeByName = (issueName: string): Locator => this.page.locator('a', { hasText: issueName }) issueNotExist = (issueName: string): Locator => this.page.locator('tr', { hasText: issueName }) filterRowExists = (issueName: string): Locator => this.page.locator('div.row span', { hasText: issueName }) issueListGrid = (): Locator => this.page.locator('div.listGrid') @@ -491,7 +490,7 @@ export class IssuesPage extends CommonTrackerPage { } async openIssueByName (issueName: string): Promise { - await this.issupeByName(issueName).click() + await this.issueByName(issueName).click() } async checkIssueNotExist (issueName: string): Promise { @@ -519,7 +518,7 @@ export class IssuesPage extends CommonTrackerPage { } async doActionOnIssue (issueName: string, action: string): Promise { - await this.issupeByName(issueName).click({ button: 'right' }) + await this.issueByName(issueName).click({ button: 'right' }) await this.selectFromDropdown(this.page, action) } @@ -543,8 +542,10 @@ export class IssuesPage extends CommonTrackerPage { await this.issueAnchorById(issueId).click() } - async checkIssuesCount (issueName: string, count: number): Promise { - await expect(this.issueAnchorByName(issueName)).toHaveCount(count) + async checkIssuesCount (issueName: string, count: number, timeout?: number): Promise { + await expect(this.issueAnchorByName(issueName)).toHaveCount(count, { + timeout: timeout !== undefined ? timeout * 1000 : undefined + }) } async selectTemplate (templateName: string): Promise { diff --git a/tests/sanity/tests/tracker/issues-duplicate.spec.ts b/tests/sanity/tests/tracker/issues-duplicate.spec.ts index d08185fec5d..e1b08bc1765 100644 --- a/tests/sanity/tests/tracker/issues-duplicate.spec.ts +++ b/tests/sanity/tests/tracker/issues-duplicate.spec.ts @@ -52,10 +52,13 @@ test.describe('Tracker duplicate issue tests', () => { await trackerNavigationMenuPage.openTemplateForProject('Default') await trackerNavigationMenuPage.openIssuesForProject('Default') await issuesPage.searchIssueByName(secondIssue.title) + + await issuesPage.checkIssuesCount(secondIssue.title, 2, 30) + const secondIssueId = await issuesPage.getIssueId(secondIssue.title, 0) expect(firstIssueId).not.toEqual(secondIssueId) - await issuesPage.checkIssuesCount(firstIssue.title, 2) + await issuesPage.checkIssuesCount(firstIssue.title, 2, 30) await test.step('Update the first issue title', async () => { const newIssueTitle = `Duplicate Update issue-${generateId()}`