From 441bb98bd1ea5fad6626575be39ef3b7a903c9ed Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Tue, 4 Apr 2023 01:44:33 +0700 Subject: [PATCH] Fix concurrency and full text refresh event Signed-off-by: Andrey Sobolev --- packages/core/src/component.ts | 4 +- packages/core/src/tx.ts | 26 ++++- packages/presentation/src/utils.ts | 9 +- packages/query/src/index.ts | 97 ++++++++++++++----- plugins/client-resources/src/connection.ts | 9 +- .../src/components/list/ListCategories.svelte | 2 +- server/core/src/indexer/indexer.ts | 18 +++- server/core/src/storage.ts | 22 ++++- 8 files changed, 148 insertions(+), 39 deletions(-) diff --git a/packages/core/src/component.ts b/packages/core/src/component.ts index 0872e215c6d..d9fd4442391 100644 --- a/packages/core/src/component.ts +++ b/packages/core/src/component.ts @@ -55,7 +55,8 @@ import type { TxMixin, TxModelUpgrade, TxRemoveDoc, - TxUpdateDoc + TxUpdateDoc, + TxWorkspaceEvent } from './tx' /** @@ -79,6 +80,7 @@ export default plugin(coreId, { Attribute: '' as Ref>, Tx: '' as Ref>, TxModelUpgrade: '' as Ref>, + TxWorkspaceEvent: '' as Ref>, TxApplyIf: '' as Ref>, TxCUD: '' as Ref>>, TxCreateDoc: '' as Ref>>, diff --git a/packages/core/src/tx.ts b/packages/core/src/tx.ts index 86d8e25fabd..e7fa57f1806 100644 --- a/packages/core/src/tx.ts +++ b/packages/core/src/tx.ts @@ -13,6 +13,7 @@ // limitations under the License. // +import justClone from 'just-clone' import type { KeysByType } from 'simplytyped' import type { Account, @@ -34,7 +35,6 @@ import { _getOperator } from './operator' import { _toDoc } from './proxy' import type { DocumentQuery, TxResult } from './storage' import { generateId } from './utils' -import justClone from 'just-clone' /** * @public @@ -43,10 +43,34 @@ export interface Tx extends Doc { objectSpace: Ref // space where transaction will operate } +/** + * @public + */ +export enum WorkspaceEvent { + UpgradeScheduled, + Upgrade, + IndexingUpdate +} + /** * Event to be send by server during model upgrade procedure. * @public */ +export interface TxWorkspaceEvent extends Tx { + event: WorkspaceEvent + params: any +} + +/** + * @public + */ +export interface IndexingUpdateEvent { + _class: Ref>[] +} + +/** + * @public + */ export interface TxModelUpgrade extends Tx {} /** diff --git a/packages/presentation/src/utils.ts b/packages/presentation/src/utils.ts index bbbe2b0a165..dee4001bf8d 100644 --- a/packages/presentation/src/utils.ts +++ b/packages/presentation/src/utils.ts @@ -186,16 +186,17 @@ export class LiveQuery { callback: (result: FindResult) => void, options: FindOptions | undefined ): Promise { + const piplineQuery = await pipeline.subscribe(_class, query, options, () => { + // Refresh query if pipeline decide it is required. + this.refreshClient() + }) + this.unsubscribe() this.oldCallback = callback this.oldClass = _class this.oldOptions = options this.oldQuery = query - const piplineQuery = await pipeline.subscribe(_class, query, options, () => { - // Refresh query if pipeline decide it is required. - this.refreshClient() - }) const unsub = liveQuery.query(_class, piplineQuery.query ?? query, callback, piplineQuery.options ?? options) this.unsubscribe = () => { unsub() diff --git a/packages/query/src/index.ts b/packages/query/src/index.ts index 9afe9c002b5..ddfe68ab0ed 100644 --- a/packages/query/src/index.ts +++ b/packages/query/src/index.ts @@ -24,8 +24,10 @@ import core, { FindOptions, findProperty, FindResult, + generateId, getObjectValue, Hierarchy, + IndexingUpdateEvent, Lookup, LookupData, matchQuery, @@ -43,7 +45,9 @@ import core, { TxRemoveDoc, TxResult, TxUpdateDoc, - WithLookup + TxWorkspaceEvent, + WithLookup, + WorkspaceEvent } from '@hcengineering/core' import { deepEqual } from 'fast-equals' @@ -57,7 +61,7 @@ interface Query { result: Doc[] | Promise options?: FindOptions total: number - callbacks: Callback[] + callbacks: Map } /** @@ -138,12 +142,10 @@ export class LiveQuery extends TxProcessor implements Client { options?: FindOptions ): Query { const callback: () => void = () => {} - const q = this.createQuery(_class, query, callback, options) - const index = q.callbacks.indexOf(callback as (result: Doc[]) => void) - if (index !== -1) { - q.callbacks.splice(index, 1) - } - if (q.callbacks.length === 0) { + const callbackId = generateId() + const q = this.createQuery(_class, query, { callback, callbackId }, options) + q.callbacks.delete(callbackId) + if (q.callbacks.size === 0) { this.queue.push(q) } return q @@ -213,7 +215,7 @@ export class LiveQuery extends TxProcessor implements Client { } private removeFromQueue (q: Query): boolean { - if (q.callbacks.length === 0) { + if (q.callbacks.size === 0) { const queueIndex = this.queue.indexOf(q) if (queueIndex !== -1) { this.queue.splice(queueIndex, 1) @@ -223,8 +225,14 @@ export class LiveQuery extends TxProcessor implements Client { return false } - private pushCallback (q: Query, callback: (result: Doc[]) => void): void { - q.callbacks.push(callback) + private pushCallback ( + q: Query, + callback: { + callback: (result: Doc[]) => void + callbackId: string + } + ): void { + q.callbacks.set(callback.callbackId, callback.callback) setTimeout(async () => { if (q !== undefined) { await this.callback(q) @@ -235,7 +243,10 @@ export class LiveQuery extends TxProcessor implements Client { private getQuery( _class: Ref>, query: DocumentQuery, - callback: (result: Doc[]) => void, + callback: { + callback: (result: Doc[]) => void + callbackId: string + }, options?: FindOptions ): Query | undefined { const current = this.findQuery(_class, query, options) @@ -250,7 +261,7 @@ export class LiveQuery extends TxProcessor implements Client { private createQuery( _class: Ref>, query: DocumentQuery, - callback: (result: FindResult) => void, + callback: { callback: (result: FindResult) => void, callbackId: string }, options?: FindOptions ): Query { const queries = this.queries.get(_class) ?? [] @@ -261,8 +272,9 @@ export class LiveQuery extends TxProcessor implements Client { result, total: 0, options: options as FindOptions, - callbacks: [callback as (result: Doc[]) => void] + callbacks: new Map() } + q.callbacks.set(callback.callbackId, callback.callback as unknown as Callback) queries.push(q) result .then(async (result) => { @@ -303,16 +315,14 @@ export class LiveQuery extends TxProcessor implements Client { modifiedOn: 1 } } + const callbackId = generateId() const q = - this.getQuery(_class, query, callback as (result: Doc[]) => void, options) ?? - this.createQuery(_class, query, callback, options) + this.getQuery(_class, query, { callback: callback as (result: Doc[]) => void, callbackId }, options) ?? + this.createQuery(_class, query, { callback, callbackId }, options) return () => { - const index = q.callbacks.indexOf(callback as (result: Doc[]) => void) - if (index !== -1) { - q.callbacks.splice(index, 1) - } - if (q.callbacks.length === 0) { + q.callbacks.delete(callbackId) + if (q.callbacks.size === 0) { this.queue.push(q) } } @@ -329,12 +339,16 @@ export class LiveQuery extends TxProcessor implements Client { return true } else { const pos = q.result.findIndex((p) => p._id === _id) - q.result.splice(pos, 1) - q.total-- + if (pos !== -1) { + q.result.splice(pos, 1) + q.total-- + } } } else { const pos = q.result.findIndex((p) => p._id === _id) - q.result[pos] = match + if (pos !== -1) { + q.result[pos] = match + } } return false } @@ -772,7 +786,7 @@ export class LiveQuery extends TxProcessor implements Client { q.result = await q.result } const result = q.result - q.callbacks.forEach((callback) => { + Array.from(q.callbacks.values()).forEach((callback) => { callback(toFindResult(this.clone(result), q.total)) }) } @@ -938,9 +952,42 @@ export class LiveQuery extends TxProcessor implements Client { } async tx (tx: Tx): Promise { + if (tx._class === core.class.TxWorkspaceEvent) { + await this.checkUpdateFulltextQueries(tx) + return {} + } return await super.tx(tx) } + private async checkUpdateFulltextQueries (tx: Tx): Promise { + const evt = tx as TxWorkspaceEvent + if (evt.event === WorkspaceEvent.IndexingUpdate) { + const indexingParam = evt.params as IndexingUpdateEvent + for (const q of [...this.queue]) { + if (indexingParam._class.includes(q._class) && q.query.$search !== undefined) { + if (!(await this.removeFromQueue(q))) { + try { + await this.refresh(q) + } catch (err) { + console.error(err) + } + } + } + } + for (const v of this.queries.values()) { + for (const q of v) { + if (indexingParam._class.includes(q._class) && q.query.$search !== undefined) { + try { + await this.refresh(q) + } catch (err) { + console.error(err) + } + } + } + } + } + } + private async __updateLookup (q: Query, updatedDoc: WithLookup, ops: any): Promise { for (const key in ops) { if (!key.startsWith('$')) { diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index ddd11fa0d29..f665bb16de5 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -29,7 +29,9 @@ import core, { Tx, TxApplyIf, TxHandler, - TxResult + TxResult, + TxWorkspaceEvent, + WorkspaceEvent } from '@hcengineering/core' import { getMetadata, @@ -167,7 +169,10 @@ class Connection implements ClientConnection { } } else { const tx = resp.result as Tx - if (tx?._class === core.class.TxModelUpgrade) { + if ( + (tx?._class === core.class.TxWorkspaceEvent && (tx as TxWorkspaceEvent).event === WorkspaceEvent.Upgrade) || + tx?._class === core.class.TxModelUpgrade + ) { console.log('Processing upgrade') websocket.send( serialize({ diff --git a/plugins/view-resources/src/components/list/ListCategories.svelte b/plugins/view-resources/src/components/list/ListCategories.svelte index 6c9125bf46b..f0a3576a2d2 100644 --- a/plugins/view-resources/src/components/list/ListCategories.svelte +++ b/plugins/view-resources/src/components/list/ListCategories.svelte @@ -122,7 +122,7 @@ const dispatch = createEventDispatcher() -{#each categories as category, i (category)} +{#each categories as category, i (typeof category === 'object' ? category.name : category)} {@const items = groupByKey === noCategory || category === undefined ? docs : getGroupByValues(groupByDocs, category)} >[]) => void ) { this.readyStages = stages.map((it) => it.stageId) this.readyStages.sort() @@ -261,18 +262,24 @@ export class FullTextIndexPipeline implements FullTextPipeline { return } await this.initStates() + const classes = new Set>>() while (!this.cancelling) { await this.initializeStages() await this.processRemove() console.log('Indexing:', this.indexId, this.workspace) - await rateLimitter.exec(() => this.processIndex()) + const _classes = await rateLimitter.exec(() => this.processIndex()) + _classes.forEach((it) => classes.add(it)) if (this.toIndex.size === 0 || this.stageChanged === 0) { if (this.toIndex.size === 0) { console.log(`${this.workspace.name} Indexing complete`, this.indexId) } if (!this.cancelling) { + // We need to send index update event + this.broadcastUpdate(Array.from(classes.values())) + classes.clear() + await new Promise((resolve) => { this.triggerIndexing = () => { resolve(null) @@ -291,14 +298,15 @@ export class FullTextIndexPipeline implements FullTextPipeline { console.log('Exit indexer', this.indexId, this.workspace) } - private async processIndex (): Promise { + private async processIndex (): Promise>[]> { let idx = 0 + const _classUpdate = new Set>>() for (const st of this.stages) { idx++ while (true) { try { if (this.cancelling) { - return + return Array.from(_classUpdate.values()) } if (!st.enabled) { break @@ -347,6 +355,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { // Do Indexing this.currentStage = st await st.collect(toIndex, this) + toIndex.forEach((it) => _classUpdate.add(it.objectClass)) // go with next stages if they accept it @@ -374,6 +383,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { } } } + return Array.from(_classUpdate.values()) } private async processRemove (): Promise { diff --git a/server/core/src/storage.ts b/server/core/src/storage.ts index 6eacd6770af..105a92ee317 100644 --- a/server/core/src/storage.ts +++ b/server/core/src/storage.ts @@ -29,7 +29,9 @@ import core, { DOMAIN_TX, FindOptions, FindResult, + generateId, Hierarchy, + IndexingUpdateEvent, MeasureContext, Mixin, ModelDb, @@ -45,6 +47,8 @@ import core, { TxRemoveDoc, TxResult, TxUpdateDoc, + TxWorkspaceEvent, + WorkspaceEvent, WorkspaceId } from '@hcengineering/core' import { MinioService } from '@hcengineering/minio' @@ -818,7 +822,23 @@ export async function createServerStorage ( hierarchy, conf.workspace, metrics.newChild('fulltext', {}), - modelDb + modelDb, + (classes: Ref>[]) => { + const evt: IndexingUpdateEvent = { + _class: classes + } + const tx: TxWorkspaceEvent = { + _class: core.class.TxWorkspaceEvent, + _id: generateId(), + event: WorkspaceEvent.IndexingUpdate, + modifiedBy: core.account.System, + modifiedOn: Date.now(), + objectSpace: core.space.DerivedTx, + space: core.space.DerivedTx, + params: evt + } + options.broadcast?.([tx]) + } ) return new FullTextIndex( hierarchy,