From 8075bbef633ea060d71a248007687e8293cd590f Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Fri, 5 Jul 2024 14:31:29 +0700 Subject: [PATCH] UBERF-7510: add logging and catch errors on cleanup (#6003) Signed-off-by: Andrey Sobolev --- models/core/src/core.ts | 6 - models/core/src/index.ts | 2 - packages/core/src/classes.ts | 8 - packages/core/src/component.ts | 2 - packages/core/src/utils.ts | 2 - packages/query/src/index.ts | 5 +- server/account/src/operations.ts | 1 - server/backup/src/backup.ts | 93 ++++++--- server/core/src/indexer/indexer.ts | 10 +- server/core/src/indexer/types.ts | 5 + server/elastic/src/__tests__/backup.test.ts | 63 ------ server/elastic/src/adapter.ts | 7 +- server/elastic/src/backup.ts | 216 ++------------------ tests/sanity/tests/chat/chat.spec.ts | 8 + tests/sanity/tests/inbox/inbox.spec.ts | 6 + tests/sanity/tests/workspace/create.spec.ts | 3 + 16 files changed, 116 insertions(+), 321 deletions(-) delete mode 100644 server/elastic/src/__tests__/backup.test.ts diff --git a/models/core/src/core.ts b/models/core/src/core.ts index 3debc7ac83d..dffd58af235 100644 --- a/models/core/src/core.ts +++ b/models/core/src/core.ts @@ -17,7 +17,6 @@ import { DOMAIN_BLOB, DOMAIN_CONFIGURATION, DOMAIN_DOC_INDEX_STATE, - DOMAIN_FULLTEXT_BLOB, DOMAIN_MIGRATION, DOMAIN_MODEL, IndexKind, @@ -38,7 +37,6 @@ import { type Enum, type EnumOf, type FieldIndex, - type FullTextData, type FullTextSearchContext, type IndexStageState, type IndexingConfiguration, @@ -309,10 +307,6 @@ export class TPluginConfiguration extends TDoc implements PluginConfiguration { enabled!: boolean beta!: boolean } -@Model(core.class.FulltextData, core.class.Doc, DOMAIN_FULLTEXT_BLOB) -export class TFulltextData extends TDoc implements FullTextData { - data!: any -} @Model(core.class.DocIndexState, core.class.Doc, DOMAIN_DOC_INDEX_STATE) export class TDocIndexState extends TDoc implements DocIndexState { diff --git a/models/core/src/index.ts b/models/core/src/index.ts index 382bb7180ca..c4df62fdb69 100644 --- a/models/core/src/index.ts +++ b/models/core/src/index.ts @@ -49,7 +49,6 @@ import { TEnum, TEnumOf, TFullTextSearchContext, - TFulltextData, TIndexConfiguration, TIndexStageState, TInterface, @@ -164,7 +163,6 @@ export function createModel (builder: Builder): void { TUserStatus, TEnum, TTypeAny, - TFulltextData, TTypeRelatedDocument, TDocIndexState, TIndexStageState, diff --git a/packages/core/src/classes.ts b/packages/core/src/classes.ts index da3af8dd9ab..6db82fc8eb5 100644 --- a/packages/core/src/classes.ts +++ b/packages/core/src/classes.ts @@ -494,14 +494,6 @@ export function versionToString (version: Version | Data): string { return `${version?.major}.${version?.minor}.${version?.patch}` } -/** - * Blob data from s3 storage - * @public - */ -export interface FullTextData extends Doc { - data: any -} - /** * @public * diff --git a/packages/core/src/component.ts b/packages/core/src/component.ts index 92ec1ad5b63..2af98232608 100644 --- a/packages/core/src/component.ts +++ b/packages/core/src/component.ts @@ -30,7 +30,6 @@ import type { DomainIndexConfiguration, Enum, EnumOf, - FullTextData, FullTextSearchContext, Hyperlink, IndexStageState, @@ -134,7 +133,6 @@ export default plugin(coreId, { Version: '' as Ref>, PluginConfiguration: '' as Ref>, UserStatus: '' as Ref>, - FulltextData: '' as Ref>, TypeRelatedDocument: '' as Ref>>, DocIndexState: '' as Ref>, IndexStageState: '' as Ref>, diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index 299f6fd9851..4d185dc29c9 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -28,7 +28,6 @@ import { DocIndexState, DOMAIN_BLOB, DOMAIN_DOC_INDEX_STATE, - DOMAIN_FULLTEXT_BLOB, DOMAIN_MODEL, DOMAIN_TRANSIENT, FullTextSearchContext, @@ -713,7 +712,6 @@ export function isClassIndexable (hierarchy: Hierarchy, c: Ref>): boo domain === DOMAIN_TX || domain === DOMAIN_MODEL || domain === DOMAIN_BLOB || - domain === DOMAIN_FULLTEXT_BLOB || domain === DOMAIN_TRANSIENT ) { hierarchy.setClassifierProp(c, 'class_indexed', false) diff --git a/packages/query/src/index.ts b/packages/query/src/index.ts index 42b1e4728c1..d7621db5442 100644 --- a/packages/query/src/index.ts +++ b/packages/query/src/index.ts @@ -1276,9 +1276,8 @@ export class LiveQuery implements WithTx, Client { for (const tx of txes) { if (tx._class === core.class.TxWorkspaceEvent) { const evt = tx as TxWorkspaceEvent - console.info('checking workspace event', evt._id, evt.params) - await this.checkUpdateEvents(tx as TxWorkspaceEvent) - await this.changePrivateHandler(tx as TxWorkspaceEvent) + await this.checkUpdateEvents(evt) + await this.changePrivateHandler(evt) } result.push(await this._tx(tx, docCache)) } diff --git a/server/account/src/operations.ts b/server/account/src/operations.ts index 2adc5424612..50e1774b3a6 100644 --- a/server/account/src/operations.ts +++ b/server/account/src/operations.ts @@ -970,7 +970,6 @@ export async function createWorkspace ( async (value) => { await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 70) }) }, - true, getStorageAdapter() ) const modelVersion = getModelVersion() diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts index 2676fda96ad..dfd6498dff5 100644 --- a/server/backup/src/backup.ts +++ b/server/backup/src/backup.ts @@ -14,12 +14,15 @@ // limitations under the License. // +import { Analytics } from '@hcengineering/analytics' import core, { AttachedDoc, BackupClient, Client as CoreClient, Doc, Domain, + DOMAIN_BLOB, + DOMAIN_DOC_INDEX_STATE, DOMAIN_FULLTEXT_BLOB, DOMAIN_MODEL, DOMAIN_TRANSIENT, @@ -33,7 +36,7 @@ import core, { type Blob, type DocIndexState } from '@hcengineering/core' -import type { StorageAdapter } from '@hcengineering/server-core' +import { fullTextPushStagePrefix, type StorageAdapter } from '@hcengineering/server-core' import { BlobClient, connect } from '@hcengineering/server-tool' import { mkdtemp, writeFile } from 'node:fs/promises' import { PassThrough } from 'node:stream' @@ -43,7 +46,6 @@ import { Writable } from 'stream' import { extract, Pack, pack } from 'tar-stream' import { createGunzip, gunzipSync, gzipSync } from 'zlib' import { BackupStorage } from './storage' -import { Analytics } from '@hcengineering/analytics' export * from './storage' const dataBlobSize = 50 * 1024 * 1024 @@ -231,7 +233,6 @@ export async function cloneWorkspace ( targetWorkspaceId: WorkspaceId, clearTime: boolean = true, progress: (value: number) => Promise, - skipFullText: boolean, storageAdapter: StorageAdapter ): Promise { await ctx.with( @@ -264,10 +265,6 @@ export async function cloneWorkspace ( let i = 0 for (const c of domains) { - if (skipFullText && c === DOMAIN_FULLTEXT_BLOB) { - ctx.info('clone skip domain...', { domain: c, workspace: targetWorkspaceId.name }) - continue - } ctx.info('clone domain...', { domain: c, workspace: targetWorkspaceId.name }) // We need to clean target connection before copying something. @@ -347,7 +344,7 @@ export async function cloneWorkspace ( try { docs = await ctx.with('load-docs', {}, async (ctx) => await sourceConnection.loadDocs(c, needRetrieve)) if (clearTime) { - docs = prepareClonedDocuments(docs, sourceConnection, skipFullText) + docs = prepareClonedDocuments(docs, sourceConnection) } const executor = new RateLimiter(10) for (const d of docs) { @@ -422,11 +419,7 @@ export async function cloneWorkspace ( ) } -function prepareClonedDocuments ( - docs: Doc[], - sourceConnection: CoreClient & BackupClient, - skipFullText: boolean -): Doc[] { +function prepareClonedDocuments (docs: Doc[], sourceConnection: CoreClient & BackupClient): Doc[] { docs = docs.map((p) => { let collectionCud = false try { @@ -436,8 +429,13 @@ function prepareClonedDocuments ( } // if full text is skipped, we need to clean stages for indexes. - if (p._class === core.class.DocIndexState && skipFullText) { - ;(p as DocIndexState).stages = {} + if (p._class === core.class.DocIndexState) { + for (const k of Object.keys((p as DocIndexState).stages)) { + if (k.startsWith(fullTextPushStagePrefix)) { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete + delete (p as DocIndexState).stages[k] + } + } } if (collectionCud) { @@ -556,6 +554,7 @@ export async function backup ( (it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL && + it !== ('fulltext-blob' as Domain) && !options.skipDomains.includes(it) && (options.include === undefined || options.include.has(it)) ) @@ -1090,6 +1089,7 @@ export async function restore ( const infoFile = 'backup.json.gz' if (!(await storage.exists(infoFile))) { + ctx.error('file not pressent', { file: infoFile }) throw new Error(`${infoFile} should present to restore`) } const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString()) @@ -1097,13 +1097,14 @@ export async function restore ( if (opt.date !== -1) { const bk = backupInfo.snapshots.findIndex((it) => it.date === opt.date) if (bk === -1) { + ctx.error('could not restore to', { date: opt.date, file: infoFile, workspaceId: workspaceId.name }) throw new Error(`${infoFile} could not restore to ${opt.date}. Snapshot is missing.`) } snapshots = backupInfo.snapshots.slice(0, bk + 1) } else { opt.date = snapshots[snapshots.length - 1].date } - console.log('restore to ', opt.date, new Date(opt.date)) + ctx.info('restore to ', { id: opt.date, date: new Date(opt.date).toDateString() }) const rsnapshots = Array.from(snapshots).reverse() // Collect all possible domains @@ -1112,7 +1113,7 @@ export async function restore ( Object.keys(s.domains).forEach((it) => domains.add(it as Domain)) } - console.log('connecting:', transactorUrl, workspaceId.name) + ctx.info('connecting:', { transactorUrl, workspace: workspaceId.name }) const connection = (await connect(transactorUrl, workspaceId, undefined, { mode: 'backup', model: 'upgrade' @@ -1127,6 +1128,9 @@ export async function restore ( domains.add(d) } + // We do not backup elastic anymore + domains.delete('fulltext-blob' as Domain) + let uploadedMb = 0 let uploaded = 0 @@ -1138,7 +1142,8 @@ export async function restore ( uploadedMb = newId ctx.info('Uploaded', { msg, - written: newDownloadedMb + written: newDownloadedMb, + workspace: workspaceId.name }) } } @@ -1167,7 +1172,7 @@ export async function restore ( } if (el > 2500) { - console.log(c, ' loaded from server', loaded, el, chunks) + ctx.info('loaded from server', { domain: c, loaded, el, chunks, workspace: workspaceId.name }) el = 0 chunks = 0 } @@ -1180,8 +1185,12 @@ export async function restore ( await connection.closeChunk(idx) } } - console.log(' loaded', loaded) - console.log('\tcompare documents', changeset.size, serverChangeset.size) + ctx.info('loaded', { loaded, workspace: workspaceId.name }) + ctx.info('\tcompare documents', { + size: changeset.size, + serverSize: serverChangeset.size, + workspace: workspaceId.name + }) // Let's find difference const docsToAdd = new Map( @@ -1208,7 +1217,13 @@ export async function restore ( if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) { totalSend += docs.length - console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize) + ctx.info('upload', { + docs: docs.length, + totalSend, + from: docsToAdd.size + totalSend, + sendSize, + workspace: workspaceId.name + }) await connection.upload(c, docs) docs.length = 0 sendSize = 0 @@ -1224,13 +1239,13 @@ export async function restore ( const sDigest = await loadDigest(ctx, storage, [s], c) const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it))) if (requiredDocs.size > 0) { - console.log('updating', c, requiredDocs.size) + ctx.info('updating', { domain: c, requiredDocs: requiredDocs.size, workspace: workspaceId.name }) // We have required documents here. for (const sf of d.storage ?? []) { if (docsToAdd.size === 0) { break } - console.log('processing', sf, processed) + ctx.info('processing', { storageFile: sf, processed, workspace: workspaceId.name }) const readStream = await storage.load(sf) const ex = extract() @@ -1332,11 +1347,27 @@ export async function restore ( } await sendChunk(undefined, 0) - if (docsToRemove.length > 0 && opt.merge !== true) { - console.log('cleanup', docsToRemove.length) + async function performCleanOfDomain (docsToRemove: Ref[], c: Domain): Promise { + ctx.info('cleanup', { toRemove: docsToRemove.length, workspace: workspaceId.name, domain: c }) while (docsToRemove.length > 0) { const part = docsToRemove.splice(0, 10000) - await connection.clean(c, part) + try { + await connection.clean(c, part) + } catch (err: any) { + ctx.error('failed to clean, will retry', { error: err, workspaceId: workspaceId.name }) + docsToRemove.push(...part) + } + } + } + if (c !== DOMAIN_BLOB) { + // Clean domain documents if not blob + if (docsToRemove.length > 0 && opt.merge !== true) { + if (c === DOMAIN_DOC_INDEX_STATE) { + // We need o clean a FULLTEXT domain as well + await performCleanOfDomain([...docsToRemove], DOMAIN_FULLTEXT_BLOB) + } + + await performCleanOfDomain(docsToRemove, c) } } } @@ -1352,7 +1383,7 @@ export async function restore ( continue } await limiter.exec(async () => { - console.log('processing domain', c) + ctx.info('processing domain', { domain: c, workspaceId: workspaceId.name }) let retry = 5 let delay = 1 while (retry > 0) { @@ -1360,13 +1391,13 @@ export async function restore ( try { await processDomain(c) if (delay > 1) { - console.log('retry-success') + ctx.warn('retry-success', { retry, delay, workspaceId: workspaceId.name }) } break } catch (err: any) { - console.error('error', err) + ctx.error('failed to process domain', { err, domain: c, workspaceId: workspaceId.name }) if (retry !== 0) { - console.log('cool-down to retry', delay) + ctx.warn('cool-down to retry', { delay, domain: c, workspaceId: workspaceId.name }) await new Promise((resolve) => setTimeout(resolve, delay * 1000)) delay++ } diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index ce110941714..37712cfb970 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -13,6 +13,7 @@ // limitations under the License. // +import { Analytics } from '@hcengineering/analytics' import core, { type Class, DOMAIN_DOC_INDEX_STATE, @@ -37,7 +38,6 @@ import { type DbAdapter } from '../adapter' import { RateLimiter } from '../limitter' import type { IndexedDoc } from '../types' import { type FullTextPipeline, type FullTextPipelineStage } from './types' -import { Analytics } from '@hcengineering/analytics' export * from './content' export * from './field' @@ -383,8 +383,12 @@ export class FullTextIndexPipeline implements FullTextPipeline { // We need to send index update event clearTimeout(this.updateBroadcast) this.updateBroadcast = setTimeout(() => { - this.broadcastUpdate(Array.from(this.broadcastClasses.values())) - this.broadcastClasses.clear() + this.broadcastClasses.delete(core.class.DocIndexState) + if (this.broadcastClasses.size > 0) { + const toSend = Array.from(this.broadcastClasses.values()) + this.broadcastClasses.clear() + this.broadcastUpdate(toSend) + } }, 5000) await new Promise((resolve) => { diff --git a/server/core/src/indexer/types.ts b/server/core/src/indexer/types.ts index fa1deb3ee79..3ce2f3c9524 100644 --- a/server/core/src/indexer/types.ts +++ b/server/core/src/indexer/types.ts @@ -108,3 +108,8 @@ export const fieldStateId = 'fld-v15' * @public */ export const fullTextPushStageId = 'fts-v17' + +/** + * @public + */ +export const fullTextPushStagePrefix = 'fts-' diff --git a/server/elastic/src/__tests__/backup.test.ts b/server/elastic/src/__tests__/backup.test.ts deleted file mode 100644 index 7013346b52c..00000000000 --- a/server/elastic/src/__tests__/backup.test.ts +++ /dev/null @@ -1,63 +0,0 @@ -import { DbAdapter } from '@hcengineering/server-core' -import { Domain, getWorkspaceId, Hierarchy, MeasureMetricsContext } from '@hcengineering/core' -import { createElasticBackupDataAdapter } from '../backup' - -import { Client } from '@elastic/elasticsearch' - -describe('Elastic Data Adapter', () => { - const url = process.env.ELASTIC_URL ?? 'http://localhost:9200/' - const domain = 'test' as Domain - - let adapter: DbAdapter - - beforeEach(async () => { - adapter = await createElasticBackupDataAdapter( - new MeasureMetricsContext('test', {}), - new Hierarchy(), - url, - getWorkspaceId('ws1', '') - ) - }) - - afterEach(async () => { - await adapter.close() - }) - - it('should init', () => { - expect(adapter).toBeTruthy() - }) - - describe('Scroll Contexts', () => { - let client: Client - - beforeEach(async () => { - client = new Client({ node: url }) - await client.cluster.putSettings({ - body: { - persistent: { 'search.max_open_scroll_context': '2' }, - transient: { 'search.max_open_scroll_context': '2' } - } - }) - }) - - // Use afterEach() to make sure we clean up even if test fail - afterEach(async () => { - await client.cluster.putSettings({ - body: { - persistent: { 'search.max_open_scroll_context': null }, - transient: { 'search.max_open_scroll_context': null } - } - }) - await client.close() - }) - - it('should get properly closed', async () => { - const ctx = new MeasureMetricsContext('test', {}) - for (let i = 0; i <= 3; i++) { - const cursor = adapter.find(ctx, domain) - await cursor.next(ctx) - await cursor.close(ctx) - } - }) - }) -}) diff --git a/server/elastic/src/adapter.ts b/server/elastic/src/adapter.ts index af7e64304a6..2ae65b372c5 100644 --- a/server/elastic/src/adapter.ts +++ b/server/elastic/src/adapter.ts @@ -18,7 +18,6 @@ import { Class, Doc, DocumentQuery, - FullTextData, IndexingConfiguration, MeasureContext, Ref, @@ -54,8 +53,8 @@ function getIndexVersion (): string { class ElasticAdapter implements FullTextAdapter { private readonly workspaceString: string - private readonly getFulltextDocId: (doc: Ref) => Ref - private readonly getDocId: (fulltext: Ref) => Ref + private readonly getFulltextDocId: (doc: Ref) => Ref + private readonly getDocId: (fulltext: Ref) => Ref private readonly indexName: string constructor ( @@ -67,7 +66,7 @@ class ElasticAdapter implements FullTextAdapter { ) { this.indexName = `${indexBaseName}_${indexVersion}` this.workspaceString = toWorkspaceString(workspaceId) - this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref + this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref this.getDocId = (fulltext) => fulltext.slice(0, -1 * (this.workspaceString.length + 1)) as Ref } diff --git a/server/elastic/src/backup.ts b/server/elastic/src/backup.ts index 04643663c22..ccc80f8a811 100644 --- a/server/elastic/src/backup.ts +++ b/server/elastic/src/backup.ts @@ -13,9 +13,8 @@ // limitations under the License. // -import { ApiResponse, Client } from '@elastic/elasticsearch' -import { SearchResponse } from '@elastic/elasticsearch/api/types' -import core, { +import { Client } from '@elastic/elasticsearch' +import { Class, Doc, DocumentQuery, @@ -23,21 +22,18 @@ import core, { Domain, FindOptions, FindResult, - FullTextData, Hierarchy, IndexingConfiguration, MeasureContext, Ref, - Space, StorageIterator, + toWorkspaceString, Tx, TxResult, - WorkspaceId, - toWorkspaceString + WorkspaceId } from '@hcengineering/core' -import { getMetadata, PlatformError, unknownStatus } from '@hcengineering/platform' -import serverCore, { DbAdapter, IndexedDoc } from '@hcengineering/server-core' -import { createHash } from 'node:crypto' +import { getMetadata } from '@hcengineering/platform' +import serverCore, { DbAdapter } from '@hcengineering/server-core' function getIndexName (): string { return getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index' @@ -49,8 +45,8 @@ function getIndexVersion (): string { class ElasticDataAdapter implements DbAdapter { private readonly workspaceString: string - private readonly getFulltextDocId: (doc: Ref) => Ref - private readonly getDocId: (fulltext: Ref) => Ref + private readonly getFulltextDocId: (doc: Ref) => Ref + private readonly getDocId: (fulltext: Ref) => Ref private readonly indexName: string constructor ( @@ -61,7 +57,7 @@ class ElasticDataAdapter implements DbAdapter { ) { this.indexName = `${indexBaseName}_${indexVersion}` this.workspaceString = toWorkspaceString(workspaceId) - this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref + this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref this.getDocId = (fulltext) => fulltext.slice(0, -1 * (this.workspaceString.length + 1)) as Ref } @@ -86,195 +82,15 @@ class ElasticDataAdapter implements DbAdapter { } find (ctx: MeasureContext, domain: Domain): StorageIterator { - let listRecieved = false - let pos = 0 - let buffer: { _id: string, data: IndexedDoc }[] = [] - let resp: ApiResponse | null = null - let finished = false - // eslint-disable-next-line @typescript-eslint/naming-convention - let scroll_id: string | undefined - - const stIterator = { - next: async () => { - try { - if (!listRecieved) { - const q = { - index: this.indexName, - type: '_doc', - scroll: '23h', - // search_type: 'scan', //if I use search_type then it requires size otherwise it shows 0 result - size: 100, - body: { - query: { - bool: { - must: { - match: { - workspaceId: { query: this.workspaceString, operator: 'and' } - } - } - } - } - } - } - resp = await this.client.search(q) - if (resp.statusCode !== 200) { - if (resp.body?.error?.type === 'index_not_found_exception') { - return undefined - } - console.error('failed elastic query', q, resp) - throw new PlatformError(unknownStatus(`failed to elastic query ${JSON.stringify(resp)}`)) - } - buffer = resp.body.hits.hits.map((hit: any) => ({ _id: hit._id, data: hit._source })) - if (buffer.length === 0) { - finished = true - await stIterator.close() - } - scroll_id = (resp.body as SearchResponse)._scroll_id - listRecieved = true - } - if (resp !== null && pos === buffer.length && !finished) { - const params = { - scroll_id, - scroll: '23h' - } - resp = await this.client.scroll(params, { maxRetries: 5 }) - - if (resp.statusCode !== 200) { - console.error('failed elastic query scroll', scroll_id, resp) - throw new PlatformError(unknownStatus(`failed to elastic query ${JSON.stringify(resp)}`)) - } - buffer = resp.body.hits.hits.map((hit: any) => ({ _id: hit._id, data: hit._source })) - if (buffer.length === 0) { - finished = true - await stIterator.close() - } - pos = 0 - } - if (pos < buffer.length) { - const item = buffer[pos] - const hash = createHash('sha256') - const json = JSON.stringify(item.data) - hash.update(json) - const digest = hash.digest('base64') - const result = { - id: this.getDocId(item._id as Ref), - hash: digest, - size: json.length - } - pos++ - return result - } - } catch (e: any) { - if (e?.meta?.body?.error?.type === 'index_not_found_exception') { - return undefined - } - await stIterator.close() - console.error('elastic error:', e) - throw new PlatformError(e) - } - }, - close: async () => { - if (scroll_id != null) { - await this.client.clearScroll({ scroll_id }) - scroll_id = undefined - } - } - } - return stIterator + throw new Error('Method not implemented.') } async load (ctx: MeasureContext, domain: Domain, docs: Ref[]): Promise { - const result: Doc[] = [] - const toLoad = [...docs] - - while (toLoad.length > 0) { - const part = toLoad.splice(0, 5000) - const resp = await this.client.search({ - index: this.indexName, - type: '_doc', - body: { - query: { - bool: { - must: [ - { - terms: { - _id: part.map(this.getFulltextDocId), - boost: 1.0 - } - }, - { - match: { - workspaceId: { query: this.workspaceString, operator: 'and' } - } - } - ] - } - }, - size: part.length - } - }) - const buffer = resp.body.hits.hits.map((hit: any) => ({ _id: hit._id, data: hit._source })) - - for (const item of buffer) { - const dta: FullTextData = { - _id: this.getDocId(item._id) as Ref, // Export without workspace portion of ID - _class: core.class.FulltextData, - space: 'fulltext-blob' as Ref, - modifiedOn: item.data.modifiedOn, - modifiedBy: item.data.modifiedBy, - data: item.data - } - result.push(dta) - } - } - return result + throw new Error('Method not implemented.') } async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise { - while (docs.length > 0) { - const part = docs.splice(0, 10000) - try { - await this.client.deleteByQuery( - { - type: '_doc', - index: this.indexName, - body: { - query: { - bool: { - must: [ - { - terms: { - _id: part.map((it) => this.getFulltextDocId(it._id)), - boost: 1.0 - } - }, - { - match: { - workspaceId: { query: this.workspaceString, operator: 'and' } - } - } - ] - } - }, - size: part.length - } - }, - undefined - ) - } catch (err: any) { - console.error(err) - } - - const operations = part.flatMap((doc) => [ - { index: { _index: this.indexName, _id: this.getFulltextDocId(doc._id) } }, - { - ...(doc as FullTextData).data, - workspaceId: this.workspaceString - } - ]) - - await this.client.bulk({ refresh: true, body: operations }) - } + throw new Error('Method not implemented.') } async update (ctx: MeasureContext, domain: Domain, operations: Map, DocumentUpdate>): Promise { @@ -282,6 +98,14 @@ class ElasticDataAdapter implements DbAdapter { } async clean (ctx: MeasureContext, domain: Domain, docs: Ref[]): Promise { + const indexExists = await this.client.indices.exists({ + index: this.indexName + }) + if (!indexExists.body) { + // No need to clean, no index exists. + return + } + while (docs.length > 0) { const part = docs.splice(0, 10000) await this.client.deleteByQuery( diff --git a/tests/sanity/tests/chat/chat.spec.ts b/tests/sanity/tests/chat/chat.spec.ts index 5b10abea87c..4e29c88d349 100644 --- a/tests/sanity/tests/chat/chat.spec.ts +++ b/tests/sanity/tests/chat/chat.spec.ts @@ -95,6 +95,7 @@ test.describe('channel tests', () => { await channelPageSecond.checkIfChannelDefaultExist(false, data.channelName) await channelPageSecond.clickChannelTab() await channelPageSecond.checkIfChannelTableExist(data.channelName, false) + await page2.close() }) test('create new public channel tests and check if the new user have access to it by default', async ({ @@ -124,6 +125,7 @@ test.describe('channel tests', () => { await channelPageSecond.checkIfChannelDefaultExist(false, data.channelName) await channelPageSecond.clickChannelTab() await channelPageSecond.checkIfChannelTableExist(data.channelName, true) + await page2.close() }) test('create new private channel and test if the user can exchange the messages', async ({ browser, page }) => { @@ -158,6 +160,7 @@ test.describe('channel tests', () => { await channelPageSecond.checkMessageExist('My dream is to fly', true, 'My dream is to fly') await channelPage.clickOnClosePopupButton() await channelPage.checkMessageExist('My dream is to fly', true, 'My dream is to fly') + await page2.close() }) test('create new private channel add user to it', async ({ browser, page }) => { @@ -195,6 +198,7 @@ test.describe('channel tests', () => { await channelPageSecond.checkMessageExist('One two', true, 'One two') await channelPage.clickChooseChannel(data.channelName) await channelPage.checkMessageExist('One two', true, 'One two') + await page2.close() }) test('go to general channel add user to it', async ({ browser, page }) => { @@ -225,6 +229,7 @@ test.describe('channel tests', () => { await channelPage.clickOnClosePopupButton() await channelPage.clickChannel('general') await channelPage.checkMessageExist('One two', true, 'One two') + await page2.close() }) test('go to random channel add user to it', async ({ browser, page }) => { @@ -255,6 +260,7 @@ test.describe('channel tests', () => { await channelPage.clickOnClosePopupButton() await channelPage.clickChannel('random') await channelPage.checkMessageExist('One two', true, 'One two') + await page2.close() }) test('check if user can add emoji', async () => { @@ -374,6 +380,7 @@ test.describe('channel tests', () => { await channelPageSecond.clickChannel('general') await channelPageSecond.clickOnOpenChannelDetails() await channelPageSecond.checkIfUserIsAdded(data.lastName + ' ' + data.firstName, false) + await page2.close() }) test('Check if we can create new public channel tests and check if the new user have can be added through preview', async ({ @@ -400,5 +407,6 @@ test.describe('channel tests', () => { await channelPage.clickChannel(data.channelName) await channelPage.clickOnOpenChannelDetails() await channelPage.addMemberToChannelPreview(newUser2.lastName + ' ' + newUser2.firstName) + await page2.close() }) }) diff --git a/tests/sanity/tests/inbox/inbox.spec.ts b/tests/sanity/tests/inbox/inbox.spec.ts index 2d826e3a181..8e7128a3964 100644 --- a/tests/sanity/tests/inbox/inbox.spec.ts +++ b/tests/sanity/tests/inbox/inbox.spec.ts @@ -125,6 +125,7 @@ test.describe('Inbox tests', () => { await leftSideMenuPageSecond.clickTracker() await leftSideMenuPageSecond.clickNotification() await inboxPageSecond.checkIfTaskIsPresentInInbox(newIssue.title) + await page2.close() }) test('User is able to assign someone else and he should be able to open the task', async ({ page, browser }) => { @@ -158,6 +159,7 @@ test.describe('Inbox tests', () => { milestone: 'Milestone', estimation: '2h' }) + await page2.close() }) test.skip('User is able to create a task, assign a other user and close it from inbox', async ({ page, browser }) => { await leftSideMenuPage.openProfileMenu() @@ -192,6 +194,7 @@ test.describe('Inbox tests', () => { }) await inboxPage.clickCloseLeftSidePanel() // ADD ASSERT ONCE THE ISSUE IS FIXED + await page2.close() }) test('User is able to send message to other user and he should see it in inbox', async ({ page, browser }) => { @@ -221,6 +224,7 @@ test.describe('Inbox tests', () => { await inboxPageSecond.checkIfInboxChatExists('Channel general', true) await inboxPageSecond.clickOnInboxChat('Channel general') await inboxPageSecond.checkIfTextInChatIsPresent('Test message') + await page2.close() }) test('User is able to turn off notification and he should not receive messages to inbox', async ({ @@ -254,6 +258,7 @@ test.describe('Inbox tests', () => { await channelPage.checkMessageExist('Test message', true, 'Test message') await leftSideMenuPageSecond.clickNotification() await inboxPageSecond.checkIfInboxChatExists('Channel general', false) + await page2.close() }) test('User is able to change filter in inbox', async ({ page, browser }) => { @@ -290,5 +295,6 @@ test.describe('Inbox tests', () => { await inboxPageSecond.clickOnInboxFilter('Issues') await inboxPageSecond.checkIfIssueIsPresentInInbox(newIssue.title) await inboxPageSecond.checkIfInboxChatExists('Channel general', false) + await page2.close() }) }) diff --git a/tests/sanity/tests/workspace/create.spec.ts b/tests/sanity/tests/workspace/create.spec.ts index 5034b864bb5..4e4d2a59360 100644 --- a/tests/sanity/tests/workspace/create.spec.ts +++ b/tests/sanity/tests/workspace/create.spec.ts @@ -154,6 +154,7 @@ test.describe('Workspace tests', () => { const leftSideMenuPage2 = new LeftSideMenuPage(page2) await leftSideMenuPage2.clickTracker() + await page2.close() }) test('Create a workspace with join link - existing account', async ({ page, browser }) => { @@ -198,6 +199,7 @@ test.describe('Workspace tests', () => { const leftSideMenuPage2 = new LeftSideMenuPage(page2) await leftSideMenuPage2.clickTracker() + await page2.close() }) test('Create workspace with LastToken in the localStorage', async ({ page, browser }) => { @@ -225,6 +227,7 @@ test.describe('Workspace tests', () => { // Use the tracker in the second context const leftSideMenuPageSecond = new LeftSideMenuPage(pageSecond) await leftSideMenuPageSecond.clickTracker() + await pageSecond.close() }) })