Skip to content

Commit

Permalink
Add workspace event send threshold and fix live query update
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
  • Loading branch information
haiodo committed Apr 4, 2023
1 parent f6755e2 commit 763a660
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 4 deletions.
7 changes: 7 additions & 0 deletions packages/presentation/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ export class LiveQuery {
private oldQuery: DocumentQuery<Doc> | undefined
private oldOptions: FindOptions<Doc> | undefined
private oldCallback: ((result: FindResult<any>) => void) | undefined
private reqId = 0
unsubscribe = () => {}
clientRecreated = false

Expand Down Expand Up @@ -186,10 +187,16 @@ export class LiveQuery {
callback: (result: FindResult<T>) => void,
options: FindOptions<T> | undefined
): Promise<void> {
const id = ++this.reqId
const piplineQuery = await pipeline.subscribe(_class, query, options, () => {
// Refresh query if pipeline decide it is required.
this.refreshClient()
})
if (id !== this.reqId) {
// If we have one more request after this one, no need to do something.
piplineQuery.unsubscribe()
return
}

this.unsubscribe()
this.oldCallback = callback
Expand Down
13 changes: 9 additions & 4 deletions server/core/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
}

broadcastClasses = new Set<Ref<Class<Doc>>>()
updateBroadcast: any = undefined

async doIndexing (): Promise<void> {
// Check model is upgraded to support indexer.

Expand All @@ -262,23 +265,25 @@ export class FullTextIndexPipeline implements FullTextPipeline {
return
}
await this.initStates()
const classes = new Set<Ref<Class<Doc>>>()
while (!this.cancelling) {
await this.initializeStages()
await this.processRemove()

console.log('Indexing:', this.indexId, this.workspace)
const _classes = await rateLimitter.exec(() => this.processIndex())
_classes.forEach((it) => classes.add(it))
_classes.forEach((it) => this.broadcastClasses.add(it))

if (this.toIndex.size === 0 || this.stageChanged === 0) {
if (this.toIndex.size === 0) {
console.log(`${this.workspace.name} Indexing complete`, this.indexId)
}
if (!this.cancelling) {
// We need to send index update event
this.broadcastUpdate(Array.from(classes.values()))
classes.clear()
clearTimeout(this.updateBroadcast)
this.updateBroadcast = setTimeout(() => {
this.broadcastUpdate(Array.from(this.broadcastClasses.values()))
this.broadcastClasses.clear()
}, 5000)

await new Promise((resolve) => {
this.triggerIndexing = () => {
Expand Down
1 change: 1 addition & 0 deletions server/core/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ export async function createServerStorage (
throw new Error('No storage adapter')
}
const stages = conf.fulltextAdapter.stages(fulltextAdapter, storage, storageAdapter, contentAdapter)

const indexer = new FullTextIndexPipeline(
defaultAdapter,
stages,
Expand Down

0 comments on commit 763a660

Please sign in to comment.