Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UBERF-7286: Backup upload retry #5830

Merged
merged 1 commit into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev/tool/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ export function devTool (
productId
})
const buffer = readFileSync(local)
await blobClient.upload(remote, buffer.length, contentType, buffer)
await blobClient.upload(toolCtx, remote, buffer.length, contentType, buffer)
})
})

Expand Down
41 changes: 32 additions & 9 deletions server/backup/src/backup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ export async function cloneWorkspace (
const blob = d as Blob
const blobs: Buffer[] = []
try {
await blobClientSource.writeTo(new MeasureMetricsContext('upload', {}), blob._id, blob.size, {
const ctx = new MeasureMetricsContext('upload', {})
await blobClientSource.writeTo(ctx, blob._id, blob.size, {
write: (b, cb) => {
blobs.push(b)
cb()
Expand All @@ -369,7 +370,7 @@ export async function cloneWorkspace (
}
})

await blobClientTarget.upload(blob._id, blob.size, blob.contentType, Buffer.concat(blobs))
await blobClientTarget.upload(ctx, blob._id, blob.size, blob.contentType, Buffer.concat(blobs))
} catch (err: any) {
console.error(err)
}
Expand Down Expand Up @@ -928,6 +929,22 @@ export async function restore (
domains.add(d)
}

let uploadedMb = 0
let uploaded = 0

const printUploaded = (msg: string, size: number): void => {
uploaded += size
const newDownloadedMb = Math.round(uploaded / (1024 * 1024))
const newId = Math.round(newDownloadedMb / 10)
if (uploadedMb !== newId) {
uploadedMb = newId
ctx.info('Uploaded', {
msg,
written: newDownloadedMb
})
}
}

async function processDomain (c: Domain): Promise<void> {
const changeset = await loadDigest(ctx, storage, snapshots, c, opt.date)
// We need to load full changeset from server
Expand Down Expand Up @@ -986,13 +1003,15 @@ export async function restore (
docs.push(doc)
}
sendSize = sendSize + len

if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) {
console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize)
totalSend += docs.length
await connection.upload(c, docs)
docs.length = 0
sendSize = 0
}
printUploaded('upload', len)
}
let processed = 0

Expand Down Expand Up @@ -1033,9 +1052,10 @@ export async function restore (
blobs.delete(name)
const doc = d?.doc as Blob
;(doc as any)['%hash%'] = changeset.get(doc._id)
void blobClient.upload(doc._id, doc.size, doc.contentType, bf).then(() => {
void blobClient.upload(ctx, doc._id, doc.size, doc.contentType, bf).then(() => {
void sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
printUploaded('upload', bf.length)
next()
})
})
Expand All @@ -1059,13 +1079,16 @@ export async function restore (
} else {
blobs.delete(bname)
const blob = doc as Blob
void blobClient.upload(blob._id, blob.size, blob.contentType, d.buffer as Buffer).then(() => {
;(doc as any)['%hash%'] = changeset.get(doc._id)
void sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
next()
void blobClient
.upload(ctx, blob._id, blob.size, blob.contentType, d.buffer as Buffer)
.then(() => {
;(doc as any)['%hash%'] = changeset.get(doc._id)
void sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
next()
printUploaded('upload', bf.length)
})
})
})
}
} else {
;(doc as any)['%hash%'] = changeset.get(doc._id)
Expand Down
36 changes: 25 additions & 11 deletions server/tool/src/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,17 +218,31 @@ export class BlobClient {
})
}

async upload (name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {
await fetch(
this.transactorAPIUrl + `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}`,
{
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': 'application/octet-stream'
},
body: buffer
async upload (ctx: MeasureContext, name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {
// TODO: We need to improve this logig, to allow restore of huge blobs
for (let i = 0; i < 5; i++) {
try {
await fetch(
this.transactorAPIUrl + `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}`,
{
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': 'application/octet-stream'
},
body: buffer
}
)
break
} catch (err: any) {
if (i === 4) {
ctx.error('failed to upload file', { name })
throw err
}
await new Promise<void>((resolve) => {
setTimeout(resolve, 500)
})
}
)
}
}
}