Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UBERF-7501: Copy few blobs in parallel #5995

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/model/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"@hcengineering/core": "^0.6.32",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/storage": "^0.6.0",
"@hcengineering/analytics": "^0.6.0",
"toposort": "^2.0.2",
"fast-equals": "^5.0.1"
},
Expand Down
17 changes: 15 additions & 2 deletions packages/model/src/migration.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Analytics } from '@hcengineering/analytics'
import core, {
Class,
Client,
Expand Down Expand Up @@ -154,7 +155,13 @@ export async function tryMigrate (client: MigrationClient, plugin: string, migra
const states = client.migrateState.get(plugin) ?? new Set()
for (const migration of migrations) {
if (states.has(migration.state)) continue
await migration.func(client)
try {
await migration.func(client)
} catch (err: any) {
console.error(err)
Analytics.handleError(err)
continue
}
const st: MigrationState = {
plugin,
state: migration.state,
Expand All @@ -181,7 +188,13 @@ export async function tryUpgrade (
for (const migration of migrations) {
if (states.has(migration.state)) continue
const _client = await client()
await migration.func(_client)
try {
await migration.func(_client)
} catch (err: any) {
console.error(err)
Analytics.handleError(err)
continue
}
const st: Data<MigrationState> = {
plugin,
state: migration.state
Expand Down
8 changes: 4 additions & 4 deletions server/account/src/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -968,13 +968,13 @@ export async function createWorkspace (
getWorkspaceId(workspaceInfo.workspace, productId),
true,
async (value) => {
await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 30) })
await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 70) })
},
true,
getStorageAdapter()
)
const modelVersion = getModelVersion()
await updateInfo({ createProgress: 50 })
await updateInfo({ createProgress: 90 })

// Skip tx update if version of init workspace are proper one.
const skipTxUpdate =
Expand All @@ -992,11 +992,11 @@ export async function createWorkspace (
ctxModellogger,
skipTxUpdate,
async (value) => {
await updateInfo({ createProgress: Math.round(50 + (Math.min(value, 100) / 100) * 40) })
await updateInfo({ createProgress: Math.round(90 + (Math.min(value, 100) / 100) * 10) })
}
)
)
await updateInfo({ createProgress: 90 })
await updateInfo({ createProgress: 99 })
} else {
await childLogger.withLog('init-workspace', {}, async (ctx) => {
await initModel(ctx, getTransactor(), wsId, txes, migrationOperation, ctxModellogger, async (value) => {
Expand Down
1 change: 1 addition & 0 deletions server/backup/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"@hcengineering/client-resources": "^0.6.27",
"@hcengineering/client": "^0.6.18",
"@hcengineering/model": "^0.6.11",
"@hcengineering/analytics": "^0.6.0",
"tar-stream": "^2.2.0",
"@hcengineering/server-tool": "^0.6.0",
"@hcengineering/server-core": "^0.6.1"
Expand Down
46 changes: 18 additions & 28 deletions server/backup/src/backup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { Writable } from 'stream'
import { extract, Pack, pack } from 'tar-stream'
import { createGunzip, gunzipSync, gzipSync } from 'zlib'
import { BackupStorage } from './storage'
import { Analytics } from '@hcengineering/analytics'
export * from './storage'

const dataBlobSize = 50 * 1024 * 1024
Expand Down Expand Up @@ -231,7 +232,7 @@ export async function cloneWorkspace (
clearTime: boolean = true,
progress: (value: number) => Promise<void>,
skipFullText: boolean,
storageAdapter?: StorageAdapter
storageAdapter: StorageAdapter
): Promise<void> {
await ctx.with(
'clone-workspace',
Expand All @@ -255,10 +256,6 @@ export async function cloneWorkspace (
admin: 'true'
})) as unknown as CoreClient & BackupClient
)

const blobClientSource = new BlobClient(transactorUrl, sourceWorkspaceId)
const blobClientTarget = new BlobClient(transactorUrl, targetWorkspaceId)

try {
const domains = sourceConnection
.getHierarchy()
Expand Down Expand Up @@ -290,6 +287,7 @@ export async function cloneWorkspace (
const needRetrieveChunks: Ref<Doc>[][] = []

let processed = 0
let domainProgress = 0
let st = Date.now()
// Load all digest from collection.
await ctx.with('retrieve-domain-info', { domain: c }, async (ctx) => {
Expand Down Expand Up @@ -351,12 +349,12 @@ export async function cloneWorkspace (
if (clearTime) {
docs = prepareClonedDocuments(docs, sourceConnection, skipFullText)
}
const executor = new RateLimiter(10)
for (const d of docs) {
if (d._class === core.class.Blob) {
const blob = d as Blob
const blobs: Buffer[] = []
try {
if (storageAdapter !== undefined) {
await executor.exec(async () => {
try {
ctx.info('clone blob', { name: blob._id, contentType: blob.contentType })
const readable = await storageAdapter.get(ctx, sourceWorkspaceId, blob._id)
const passThrue = new PassThrough()
Expand All @@ -369,29 +367,18 @@ export async function cloneWorkspace (
blob.contentType,
blob.size
)
} else {
ctx.info('clone blob', { name: blob._id, contentType: blob.contentType })
await ctx.with('download-blob', { contentType: blob.contentType }, async (ctx) => {
await blobClientSource.writeTo(ctx, blob._id, blob.size, {
write: (b, cb) => {
blobs.push(b)
cb()
},
end: (cb) => {
cb()
}
})
})
await ctx.with('upload-blob', { contentType: blob.contentType }, async (ctx) => {
const buffer = Buffer.concat(blobs)
await blobClientTarget.upload(ctx, blob._id, buffer.length, blob.contentType, buffer)
})
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
}
} catch (err: any) {
console.error(err)
}
domainProgress++
await progress((100 / domains.length) * i + (100 / domains.length / processed) * domainProgress)
})
} else {
domainProgress++
}
}
await executor.waitProcessing()
await ctx.with(
'upload-docs',
{},
Expand All @@ -400,8 +387,10 @@ export async function cloneWorkspace (
},
{ length: docs.length }
)
await progress((100 / domains.length) * i + (100 / domains.length / processed) * domainProgress)
} catch (err: any) {
console.log(err)
Analytics.handleError(err)
// Put back.
needRetrieveChunks.push(needRetrieve)
continue
Expand All @@ -414,6 +403,7 @@ export async function cloneWorkspace (
}
} catch (err: any) {
console.error(err)
Analytics.handleError(err)
} finally {
ctx.info('end clone')
await ctx.with('close-source', {}, async (ctx) => {
Expand Down
21 changes: 15 additions & 6 deletions server/core/src/server/domainHelper.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
import { Analytics } from '@hcengineering/analytics'
import type {
Doc,
Domain,
DomainIndexConfiguration,
FieldIndex,
Hierarchy,
MeasureContext,
ModelDb
ModelDb,
WorkspaceId
} from '@hcengineering/core'
import core, { DOMAIN_MODEL, IndexKind, IndexOrder } from '@hcengineering/core'
import { deepEqual } from 'fast-equals'
import type { DomainHelper, DomainHelperOperations } from '../adapter'
import { Analytics } from '@hcengineering/analytics'

export class DomainIndexHelperImpl implements DomainHelper {
domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
domainConfigurations: DomainIndexConfiguration[] = []
constructor (
readonly ctx: MeasureContext,
readonly hierarchy: Hierarchy,
readonly model: ModelDb
readonly model: ModelDb,
readonly workspaceId: WorkspaceId
) {
const classes = model.findAllSync(core.class.Class, {})

this.domainConfigurations =
model.findAllSync<DomainIndexConfiguration>(core.class.DomainIndexConfiguration, {}) ?? []
try {
this.domainConfigurations =
model.findAllSync<DomainIndexConfiguration>(core.class.DomainIndexConfiguration, {}) ?? []
} catch (err: any) {
this.domainConfigurations = []
Analytics.handleError(err)
ctx.error('failed to find domain index configuration', { err })
}

this.domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
// Find all domains and indexed fields inside
Expand Down Expand Up @@ -81,7 +90,7 @@ export class DomainIndexHelperImpl implements DomainHelper {

if (forceCreate && !exists) {
await operations.create(domain)
console.log('collection will be created', domain)
ctx.info('collection will be created', domain)
exists = true
}
if (!exists) {
Expand Down
2 changes: 1 addition & 1 deletion server/core/src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ export async function createServerStorage (
)
}

const domainHelper = new DomainIndexHelperImpl(hierarchy, modelDb)
const domainHelper = new DomainIndexHelperImpl(metrics, hierarchy, modelDb, conf.workspace)

return new TServerStorage(
conf.domains,
Expand Down
18 changes: 13 additions & 5 deletions server/tool/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,16 @@ export async function initModel (
await progress(30)

// Create update indexes
await createUpdateIndexes(ctx, connection, db, logger, async (value) => {
await progress(30 + (Math.min(value, 100) / 100) * 70)
})
await createUpdateIndexes(
ctx,
connection,
db,
logger,
async (value) => {
await progress(30 + (Math.min(value, 100) / 100) * 70)
},
workspaceId
)
await progress(100)
} catch (e: any) {
logger.error('error', { error: e })
Expand Down Expand Up @@ -403,9 +410,10 @@ async function createUpdateIndexes (
connection: CoreClient,
db: Db,
logger: ModelLogger,
progress: (value: number) => Promise<void>
progress: (value: number) => Promise<void>,
workspaceId: WorkspaceId
): Promise<void> {
const domainHelper = new DomainIndexHelperImpl(connection.getHierarchy(), connection.getModel())
const domainHelper = new DomainIndexHelperImpl(ctx, connection.getHierarchy(), connection.getModel(), workspaceId)
const dbHelper = new DBCollectionHelper(db)
await dbHelper.init()
let completed = 0
Expand Down