Skip to content

Commit

Permalink
UBERF-7062: Fix backup memory usage and support missing blobs (#5665)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
  • Loading branch information
haiodo authored May 27, 2024
1 parent ee04637 commit 2e6b0f3
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 68 deletions.
1 change: 1 addition & 0 deletions server/backup-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"tar-stream": "^2.2.0",
"@hcengineering/server-tool": "^0.6.0",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server-storage": "^0.6.0",
"@hcengineering/server-backup": "^0.6.0",
"@hcengineering/minio": "^0.6.0",
"@hcengineering/server-token": "^0.6.7"
Expand Down
27 changes: 6 additions & 21 deletions server/backup-service/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ interface Config extends Omit<BackupConfig, 'Token'> {
Timeout: number // Timeout in seconds
BucketName: string

MinioEndpoint: string
MinioAccessKey: string
MinioSecretKey: string
MongoURL: string
}

const envMap: { [key in keyof Config]: string } = {
Expand All @@ -37,22 +35,11 @@ const envMap: { [key in keyof Config]: string } = {
Secret: 'SECRET',
BucketName: 'BUCKET_NAME',
Interval: 'INTERVAL',
MinioEndpoint: 'MINIO_ENDPOINT',
MinioAccessKey: 'MINIO_ACCESS_KEY',
MinioSecretKey: 'MINIO_SECRET_KEY',
Timeout: 'TIMEOUT'
Timeout: 'TIMEOUT',
MongoURL: 'MONGO_URL'
}

const required: Array<keyof Config> = [
'TransactorURL',
'AccountsURL',
'Secret',
'ServiceID',
'BucketName',
'MinioEndpoint',
'MinioAccessKey',
'MinioSecretKey'
]
const required: Array<keyof Config> = ['TransactorURL', 'AccountsURL', 'Secret', 'ServiceID', 'BucketName', 'MongoURL']

const config: Config = (() => {
const params: Partial<Config> = {
Expand All @@ -62,10 +49,8 @@ const config: Config = (() => {
BucketName: process.env[envMap.BucketName] ?? 'backups',
ServiceID: process.env[envMap.ServiceID] ?? 'backup-service',
Interval: parseInt(process.env[envMap.Interval] ?? '3600'),
MinioEndpoint: process.env[envMap.MinioEndpoint],
MinioAccessKey: process.env[envMap.MinioAccessKey],
MinioSecretKey: process.env[envMap.MinioSecretKey],
Timeout: parseInt(process.env[envMap.Timeout] ?? '3600')
Timeout: parseInt(process.env[envMap.Timeout] ?? '3600'),
MongoURL: process.env[envMap.MongoURL]
}

const missingEnv = required.filter((key) => params[key] === undefined).map((key) => envMap[key])
Expand Down
20 changes: 3 additions & 17 deletions server/backup-service/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,18 @@
//

import { MeasureContext, systemAccountEmail } from '@hcengineering/core'
import { MinioService } from '@hcengineering/minio'
import { setMetadata } from '@hcengineering/platform'
import { backupService } from '@hcengineering/server-backup'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { generateToken } from '@hcengineering/server-token'
import toolPlugin from '@hcengineering/server-tool'
import config from './config'
import { StorageAdapter } from '@hcengineering/server-core'

export function startBackup (ctx: MeasureContext): void {
setMetadata(serverToken.metadata.Secret, config.Secret)

let minioPort = 9000
let minioEndpoint = config.MinioEndpoint
const sp = minioEndpoint.split(':')
if (sp.length > 1) {
minioEndpoint = sp[0]
minioPort = parseInt(sp[1])
}

const storageAdapter: StorageAdapter = new MinioService({
endpoint: minioEndpoint,
port: minioPort,
useSSL: 'false',
accessKey: config.MinioAccessKey,
secretKey: config.MinioSecretKey
})
const storageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfiguration, config.MongoURL)

setMetadata(toolPlugin.metadata.UserAgent, config.ServiceID)

Expand Down
49 changes: 38 additions & 11 deletions server/backup/src/backup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -664,32 +664,59 @@ export async function backup (
break
}

// Move processed document to processedChanges
if (changes.added.has(d._id)) {
processedChanges.added.set(d._id, changes.added.get(d._id) ?? '')
changes.added.delete(d._id)
} else {
processedChanges.updated.set(d._id, changes.updated.get(d._id) ?? '')
changes.updated.delete(d._id)
function processChanges (d: Doc, error: boolean = false): void {
// Move processed document to processedChanges
if (changes.added.has(d._id)) {
if (!error) {
processedChanges.added.set(d._id, changes.added.get(d._id) ?? '')
}
changes.added.delete(d._id)
} else {
if (!error) {
processedChanges.updated.set(d._id, changes.updated.get(d._id) ?? '')
}
changes.updated.delete(d._id)
}
}
if (d._class === core.class.Blob) {
const blob = d as Blob
const descrJson = JSON.stringify(d)
addedDocuments += descrJson.length
addedDocuments += blob.size
_pack.entry({ name: d._id + '.json' }, descrJson, function (err) {
if (err != null) throw err
})

_pack.entry({ name: d._id }, await blobClient.pipeFromStorage(blob._id, blob.size), function (err) {
let blobFiled = false
if (!(await blobClient.checkFile(ctx, blob._id))) {
ctx.error('failed to download blob', { blob: blob._id, provider: blob.provider })
processChanges(d, true)
continue
}

_pack.entry({ name: d._id + '.json' }, descrJson, function (err) {
if (err != null) throw err
})
try {
const entry = _pack?.entry({ name: d._id, size: blob.size }, (err) => {
if (err != null) {
ctx.error('error packing file', err)
}
})
await blobClient.writeTo(ctx, blob._id, blob.size, entry)
} catch (err: any) {
if (err.message?.startsWith('No file for') === true) {
ctx.error('failed to download blob', { message: err.message })
} else {
ctx.error('failed to download blob', { err })
}
blobFiled = true
}
processChanges(d, blobFiled)
} else {
const data = JSON.stringify(d)
addedDocuments += data.length
_pack.entry({ name: d._id + '.json' }, data, function (err) {
if (err != null) throw err
})
processChanges(d)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/core/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,5 @@ export type DbAdapterFactory = (
url: string,
workspaceId: WorkspaceId,
modelDb: ModelDb,
storage?: StorageAdapter
storage: StorageAdapter
) => Promise<DbAdapter>
6 changes: 2 additions & 4 deletions server/server-storage/src/blobStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,13 @@ export async function createStorageDataAdapter (
url: string,
workspaceId: WorkspaceId,
modelDb: ModelDb,
storage?: StorageAdapter
storage: StorageAdapter
): Promise<DbAdapter> {
if (storage === undefined) {
throw new Error('minio storage adapter require minio')
}
// We need to create bucket if it doesn't exist
if (storage !== undefined) {
await storage.make(ctx, workspaceId)
}
await storage.make(ctx, workspaceId)
const blobAdapter = await createMongoAdapter(ctx, hierarchy, url, workspaceId, modelDb, undefined, {
calculateHash: (d) => {
return (d as Blob).etag
Expand Down
62 changes: 53 additions & 9 deletions server/tool/src/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,19 @@
//

import client, { clientId } from '@hcengineering/client'
import { Client, LoadModelResponse, systemAccountEmail, Tx, WorkspaceId } from '@hcengineering/core'
import {
Client,
LoadModelResponse,
systemAccountEmail,
Tx,
WorkspaceId,
type MeasureContext
} from '@hcengineering/core'
import { addLocation, getMetadata, getResource, setMetadata } from '@hcengineering/platform'
import { generateToken } from '@hcengineering/server-token'
import { mkdtempSync } from 'fs'
import crypto from 'node:crypto'
import { type Writable } from 'stream'
import plugin from './plugin'

/**
Expand Down Expand Up @@ -89,38 +97,74 @@ export class BlobClient {
this.tmpDir = mkdtempSync('blobs')
}

async pipeFromStorage (name: string, size: number): Promise<Buffer> {
async checkFile (ctx: MeasureContext, name: string): Promise<boolean> {
try {
const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
headers: {
Authorization: 'Bearer ' + this.token,
Range: 'bytes=0-1'
}
})
if (response.status === 404) {
return false
}
const buff = await response.arrayBuffer()
return buff.byteLength > 0
} catch (err: any) {
ctx.error('Failed to check file', { name, error: err })
return false
}
}

async writeTo (ctx: MeasureContext, name: string, size: number, writable: Writable): Promise<void> {
let written = 0
const chunkSize = 1024 * 1024
const chunks: Buffer[] = []

// Use ranges to iterave through file with retry if required.
while (written < size) {
for (let i = 0; i < 5; i++) {
let i = 0
for (; i < 5; i++) {
try {
const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
headers: {
Authorization: 'Bearer ' + this.token,
Range: `bytes=${written}-${Math.min(size - 1, written + chunkSize)}`
}
})
if (response.status === 404) {
i = 5
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${name}`)
}
const chunk = Buffer.from(await response.arrayBuffer())
chunks.push(chunk)
await new Promise<void>((resolve, reject) => {
writable.write(chunk, (err) => {
if (err != null) {
reject(err)
}
resolve()
})
})

written += chunk.length
if (size > 1024 * 1024) {
console.log('Downloaded', Math.round(written / (1024 * 1024)), 'Mb of', Math.round(size / (1024 * 1024)))
ctx.info('Downloaded', {
name,
written: Math.round(written / (1024 * 1024)),
of: Math.round(size / (1024 * 1024))
})
}
break
} catch (err: any) {
if (i === 4) {
console.error(err)
if (i > 4) {
writable.end()
throw err
}
// retry
}
}
}
return Buffer.concat(chunks)
writable.end()
}

async upload (name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {
Expand Down
16 changes: 11 additions & 5 deletions server/ws/src/server_http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,12 +419,18 @@ function createWebsocketClientSocket (
setImmediate(doSend)
return
}
ws.send(smsg, { binary: true, compress: compression }, (err) => {
if (err != null) {
reject(err)
try {
ws.send(smsg, { binary: true, compress: compression }, (err) => {
if (err != null) {
reject(err)
}
resolve()
})
} catch (err: any) {
if (err.message !== 'WebSocket is not open') {
ctx.error('send error', { err })
}
resolve()
})
}
}
doSend()
})
Expand Down

0 comments on commit 2e6b0f3

Please sign in to comment.