UBERF-7510: add logging and catch errors on cleanup
1. UBERF-7510: add logging and catch errors on cleanup.
2. Remove the Elastic backup and trigger a re-push to Elastic for changed documents on restore (the stage-reset mechanism is sketched after the file summary below).

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
haiodo committed Jul 5, 2024
1 parent 6e3917d commit 60a54e2
Showing 13 changed files with 99 additions and 321 deletions.
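The Elastic re-push in change 2 works by dropping the fulltext-blob domain from backups entirely and, on restore, stripping the full-text push stages from changed DocIndexState documents so the indexer re-sends them to Elastic. A minimal sketch of that stage reset, reusing DocIndexState from @hcengineering/core and fullTextPushStagePrefix from @hcengineering/server-core (both appear in the diff below); the helper name is ours, not from the commit:

import { type DocIndexState } from '@hcengineering/core'
import { fullTextPushStagePrefix } from '@hcengineering/server-core'

// Remove all full-text push stage markers so the indexer treats the
// document as not yet pushed and re-sends it to Elastic.
function resetFullTextPushStages (doc: DocIndexState): void {
  for (const stage of Object.keys(doc.stages)) {
    if (stage.startsWith(fullTextPushStagePrefix)) {
      // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
      delete doc.stages[stage]
    }
  }
}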
6 changes: 0 additions & 6 deletions models/core/src/core.ts
@@ -17,7 +17,6 @@ import {
DOMAIN_BLOB,
DOMAIN_CONFIGURATION,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_FULLTEXT_BLOB,
DOMAIN_MIGRATION,
DOMAIN_MODEL,
IndexKind,
@@ -38,7 +37,6 @@ import {
type Enum,
type EnumOf,
type FieldIndex,
type FullTextData,
type FullTextSearchContext,
type IndexStageState,
type IndexingConfiguration,
@@ -309,10 +307,6 @@ export class TPluginConfiguration extends TDoc implements PluginConfiguration {
enabled!: boolean
beta!: boolean
}
@Model(core.class.FulltextData, core.class.Doc, DOMAIN_FULLTEXT_BLOB)
export class TFulltextData extends TDoc implements FullTextData {
data!: any
}

@Model(core.class.DocIndexState, core.class.Doc, DOMAIN_DOC_INDEX_STATE)
export class TDocIndexState extends TDoc implements DocIndexState {
2 changes: 0 additions & 2 deletions models/core/src/index.ts
@@ -49,7 +49,6 @@ import {
TEnum,
TEnumOf,
TFullTextSearchContext,
TFulltextData,
TIndexConfiguration,
TIndexStageState,
TInterface,
@@ -164,7 +163,6 @@ export function createModel (builder: Builder): void {
TUserStatus,
TEnum,
TTypeAny,
TFulltextData,
TTypeRelatedDocument,
TDocIndexState,
TIndexStageState,
8 changes: 0 additions & 8 deletions packages/core/src/classes.ts
@@ -494,14 +494,6 @@ export function versionToString (version: Version | Data<Version>): string {
return `${version?.major}.${version?.minor}.${version?.patch}`
}

/**
* Blob data from s3 storage
* @public
*/
export interface FullTextData extends Doc {
data: any
}

/**
* @public
*
2 changes: 0 additions & 2 deletions packages/core/src/component.ts
@@ -30,7 +30,6 @@ import type {
DomainIndexConfiguration,
Enum,
EnumOf,
FullTextData,
FullTextSearchContext,
Hyperlink,
IndexStageState,
@@ -134,7 +133,6 @@ export default plugin(coreId, {
Version: '' as Ref<Class<Version>>,
PluginConfiguration: '' as Ref<Class<PluginConfiguration>>,
UserStatus: '' as Ref<Class<UserStatus>>,
FulltextData: '' as Ref<Class<FullTextData>>,
TypeRelatedDocument: '' as Ref<Class<Type<RelatedDocument>>>,
DocIndexState: '' as Ref<Class<DocIndexState>>,
IndexStageState: '' as Ref<Class<IndexStageState>>,
2 changes: 0 additions & 2 deletions packages/core/src/utils.ts
@@ -28,7 +28,6 @@ import {
DocIndexState,
DOMAIN_BLOB,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_FULLTEXT_BLOB,
DOMAIN_MODEL,
DOMAIN_TRANSIENT,
FullTextSearchContext,
@@ -713,7 +712,6 @@ export function isClassIndexable (hierarchy: Hierarchy, c: Ref<Class<Doc>>): boolean {
domain === DOMAIN_TX ||
domain === DOMAIN_MODEL ||
domain === DOMAIN_BLOB ||
domain === DOMAIN_FULLTEXT_BLOB ||
domain === DOMAIN_TRANSIENT
) {
hierarchy.setClassifierProp(c, 'class_indexed', false)
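With the fulltext-blob domain gone, isClassIndexable now excludes only the tx, model, blob, and transient domains. A condensed sketch of just that exclusion check; the helper below is our own condensation, not the library function, which also inspects the class's attributes before caching the result:

import { DOMAIN_BLOB, DOMAIN_MODEL, DOMAIN_TRANSIENT, DOMAIN_TX, type Domain } from '@hcengineering/core'

// Domains whose documents never enter the full-text index.
const nonIndexableDomains = new Set<Domain>([DOMAIN_TX, DOMAIN_MODEL, DOMAIN_BLOB, DOMAIN_TRANSIENT])

function isDomainIndexable (domain: Domain | undefined): boolean {
  return domain !== undefined && !nonIndexableDomains.has(domain)
}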
5 changes: 2 additions & 3 deletions packages/query/src/index.ts
@@ -1276,9 +1276,8 @@ export class LiveQuery implements WithTx, Client {
for (const tx of txes) {
if (tx._class === core.class.TxWorkspaceEvent) {
const evt = tx as TxWorkspaceEvent
console.info('checking workspace event', evt._id, evt.params)
await this.checkUpdateEvents(tx as TxWorkspaceEvent)
await this.changePrivateHandler(tx as TxWorkspaceEvent)
await this.checkUpdateEvents(evt)
await this.changePrivateHandler(evt)
}
result.push(await this._tx(tx, docCache))
}
1 change: 0 additions & 1 deletion server/account/src/operations.ts
@@ -970,7 +970,6 @@ export async function createWorkspace (
async (value) => {
await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 70) })
},
true,
getStorageAdapter()
)
const modelVersion = getModelVersion()
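With the skipFullText flag removed from cloneWorkspace, callers drop the boolean that used to sit between the progress callback and the storage adapter. A hedged sketch of the new call shape; the leading parameters and the import path are assumptions, since this hunk only shows the trailing arguments:

import { type MeasureContext, type WorkspaceId } from '@hcengineering/core'
import { type StorageAdapter } from '@hcengineering/server-core'
import { cloneWorkspace } from '@hcengineering/server-backup' // package path assumed

// Sketch only: parameter order reconstructed from the visible diff context.
async function cloneWithoutFullText (
  ctx: MeasureContext,
  transactorUrl: string,
  source: WorkspaceId,
  target: WorkspaceId,
  onProgress: (value: number) => Promise<void>,
  storage: StorageAdapter
): Promise<void> {
  // The former `skipFullText: true` argument between the progress
  // callback and the storage adapter is simply gone.
  await cloneWorkspace(ctx, transactorUrl, source, target, true, onProgress, storage)
}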
93 changes: 62 additions & 31 deletions server/backup/src/backup.ts
@@ -14,12 +14,15 @@
// limitations under the License.
//

import { Analytics } from '@hcengineering/analytics'
import core, {
AttachedDoc,
BackupClient,
Client as CoreClient,
Doc,
Domain,
DOMAIN_BLOB,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_FULLTEXT_BLOB,
DOMAIN_MODEL,
DOMAIN_TRANSIENT,
@@ -33,7 +36,7 @@ import core, {
type Blob,
type DocIndexState
} from '@hcengineering/core'
import type { StorageAdapter } from '@hcengineering/server-core'
import { fullTextPushStagePrefix, type StorageAdapter } from '@hcengineering/server-core'
import { BlobClient, connect } from '@hcengineering/server-tool'
import { mkdtemp, writeFile } from 'node:fs/promises'
import { PassThrough } from 'node:stream'
@@ -43,7 +46,6 @@ import { Writable } from 'stream'
import { extract, Pack, pack } from 'tar-stream'
import { createGunzip, gunzipSync, gzipSync } from 'zlib'
import { BackupStorage } from './storage'
import { Analytics } from '@hcengineering/analytics'
export * from './storage'

const dataBlobSize = 50 * 1024 * 1024
@@ -231,7 +233,6 @@ export async function cloneWorkspace (
targetWorkspaceId: WorkspaceId,
clearTime: boolean = true,
progress: (value: number) => Promise<void>,
skipFullText: boolean,
storageAdapter: StorageAdapter
): Promise<void> {
await ctx.with(
@@ -264,10 +265,6 @@

let i = 0
for (const c of domains) {
if (skipFullText && c === DOMAIN_FULLTEXT_BLOB) {
ctx.info('clone skip domain...', { domain: c, workspace: targetWorkspaceId.name })
continue
}
ctx.info('clone domain...', { domain: c, workspace: targetWorkspaceId.name })

// We need to clean target connection before copying something.
@@ -347,7 +344,7 @@
try {
docs = await ctx.with('load-docs', {}, async (ctx) => await sourceConnection.loadDocs(c, needRetrieve))
if (clearTime) {
docs = prepareClonedDocuments(docs, sourceConnection, skipFullText)
docs = prepareClonedDocuments(docs, sourceConnection)
}
const executor = new RateLimiter(10)
for (const d of docs) {
@@ -422,11 +419,7 @@
)
}

function prepareClonedDocuments (
docs: Doc[],
sourceConnection: CoreClient & BackupClient,
skipFullText: boolean
): Doc[] {
function prepareClonedDocuments (docs: Doc[], sourceConnection: CoreClient & BackupClient): Doc[] {
docs = docs.map((p) => {
let collectionCud = false
try {
@@ -436,8 +429,13 @@ function prepareClonedDocuments (
}

// Full text is no longer cloned, so clean the push stages to trigger re-indexing.
if (p._class === core.class.DocIndexState && skipFullText) {
;(p as DocIndexState).stages = {}
if (p._class === core.class.DocIndexState) {
for (const k of Object.keys((p as DocIndexState).stages)) {
if (k.startsWith(fullTextPushStagePrefix)) {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete (p as DocIndexState).stages[k]
}
}
}

if (collectionCud) {
@@ -556,6 +554,7 @@ export async function backup (
(it) =>
it !== DOMAIN_TRANSIENT &&
it !== DOMAIN_MODEL &&
it !== ('fulltext-blob' as Domain) &&
!options.skipDomains.includes(it) &&
(options.include === undefined || options.include.has(it))
)
@@ -1090,20 +1089,22 @@ export async function restore (
const infoFile = 'backup.json.gz'

if (!(await storage.exists(infoFile))) {
ctx.error('file not present', { file: infoFile })
throw new Error(`${infoFile} should be present to restore`)
}
const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
let snapshots = backupInfo.snapshots
if (opt.date !== -1) {
const bk = backupInfo.snapshots.findIndex((it) => it.date === opt.date)
if (bk === -1) {
ctx.error('could not restore to', { date: opt.date, file: infoFile, workspaceId: workspaceId.name })
throw new Error(`${infoFile} could not restore to ${opt.date}. Snapshot is missing.`)
}
snapshots = backupInfo.snapshots.slice(0, bk + 1)
} else {
opt.date = snapshots[snapshots.length - 1].date
}
console.log('restore to ', opt.date, new Date(opt.date))
ctx.info('restore to ', { id: opt.date, date: new Date(opt.date).toDateString() })
const rsnapshots = Array.from(snapshots).reverse()

// Collect all possible domains
Expand All @@ -1112,7 +1113,7 @@ export async function restore (
Object.keys(s.domains).forEach((it) => domains.add(it as Domain))
}

console.log('connecting:', transactorUrl, workspaceId.name)
ctx.info('connecting:', { transactorUrl, workspace: workspaceId.name })
const connection = (await connect(transactorUrl, workspaceId, undefined, {
mode: 'backup',
model: 'upgrade'
@@ -1127,6 +1128,9 @@
domains.add(d)
}

// We do not back up Elastic anymore
domains.delete('fulltext-blob' as Domain)

let uploadedMb = 0
let uploaded = 0

@@ -1138,7 +1142,8 @@
uploadedMb = newId
ctx.info('Uploaded', {
msg,
written: newDownloadedMb
written: newDownloadedMb,
workspace: workspaceId.name
})
}
}
@@ -1167,7 +1172,7 @@
}

if (el > 2500) {
console.log(c, ' loaded from server', loaded, el, chunks)
ctx.info('loaded from server', { domain: c, loaded, el, chunks, workspace: workspaceId.name })
el = 0
chunks = 0
}
@@ -1180,8 +1185,12 @@
await connection.closeChunk(idx)
}
}
console.log(' loaded', loaded)
console.log('\tcompare documents', changeset.size, serverChangeset.size)
ctx.info('loaded', { loaded, workspace: workspaceId.name })
ctx.info('\tcompare documents', {
size: changeset.size,
serverSize: serverChangeset.size,
workspace: workspaceId.name
})

// Let's find difference
const docsToAdd = new Map(
@@ -1208,7 +1217,13 @@

if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) {
totalSend += docs.length
console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize)
ctx.info('upload', {
docs: docs.length,
totalSend,
from: docsToAdd.size + totalSend,
sendSize,
workspace: workspaceId.name
})
await connection.upload(c, docs)
docs.length = 0
sendSize = 0
@@ -1224,13 +1239,13 @@
const sDigest = await loadDigest(ctx, storage, [s], c)
const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it)))
if (requiredDocs.size > 0) {
console.log('updating', c, requiredDocs.size)
ctx.info('updating', { domain: c, requiredDocs: requiredDocs.size, workspace: workspaceId.name })
// We have required documents here.
for (const sf of d.storage ?? []) {
if (docsToAdd.size === 0) {
break
}
console.log('processing', sf, processed)
ctx.info('processing', { storageFile: sf, processed, workspace: workspaceId.name })

const readStream = await storage.load(sf)
const ex = extract()
@@ -1332,11 +1347,27 @@
}

await sendChunk(undefined, 0)
if (docsToRemove.length > 0 && opt.merge !== true) {
console.log('cleanup', docsToRemove.length)
async function performCleanOfDomain (docsToRemove: Ref<Doc>[], c: Domain): Promise<void> {
ctx.info('cleanup', { toRemove: docsToRemove.length, workspace: workspaceId.name, domain: c })
while (docsToRemove.length > 0) {
const part = docsToRemove.splice(0, 10000)
await connection.clean(c, part)
try {
await connection.clean(c, part)
} catch (err: any) {
ctx.error('failed to clean, will retry', { error: err, workspaceId: workspaceId.name })
docsToRemove.push(...part)
}
}
}
if (c !== DOMAIN_BLOB) {
// Clean domain documents if not blob
if (docsToRemove.length > 0 && opt.merge !== true) {
if (c === DOMAIN_DOC_INDEX_STATE) {
// We need to clean the FULLTEXT domain as well
await performCleanOfDomain([...docsToRemove], DOMAIN_FULLTEXT_BLOB)
}

await performCleanOfDomain(docsToRemove, c)
}
}
}
@@ -1352,21 +1383,21 @@ export async function restore (
continue
}
await limiter.exec(async () => {
console.log('processing domain', c)
ctx.info('processing domain', { domain: c, workspaceId: workspaceId.name })
let retry = 5
let delay = 1
while (retry > 0) {
retry--
try {
await processDomain(c)
if (delay > 1) {
console.log('retry-success')
ctx.warn('retry-success', { retry, delay, workspaceId: workspaceId.name })
}
break
} catch (err: any) {
console.error('error', err)
ctx.error('failed to process domain', { err, domain: c, workspaceId: workspaceId.name })
if (retry !== 0) {
console.log('cool-down to retry', delay)
ctx.warn('cool-down to retry', { delay, domain: c, workspaceId: workspaceId.name })
await new Promise((resolve) => setTimeout(resolve, delay * 1000))
delay++
}
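The restore path now wraps each domain in a retry loop whose delay grows by one second per attempt. A generic sketch of the same pattern, with a helper name of our own choosing (not from the commit):

// Retry an async operation, waiting 1s, 2s, 3s, ... between attempts,
// mirroring the loop added around processDomain in restore().
async function withRetry<T> (op: () => Promise<T>, retries: number = 5): Promise<T> {
  let delay = 1
  for (;;) {
    try {
      return await op()
    } catch (err: any) {
      retries--
      if (retries === 0) throw err
      await new Promise((resolve) => setTimeout(resolve, delay * 1000))
      delay++
    }
  }
}

// Usage, mirroring the restore loop: await withRetry(async () => { await processDomain(c) })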