Skip to content

Commit

Permalink
Add batch uploading
Browse files Browse the repository at this point in the history
  • Loading branch information
whalemare committed May 2, 2021
1 parent 622bd84 commit 8b8aeba
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 11 deletions.
24 changes: 20 additions & 4 deletions __tests__/DropboxUploader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import * as fsRaw from 'fs'

const fs = fsRaw.promises

import globby from 'globby'

import { DropboxUploader } from '../src/upload/dropbox/DropboxUploader'
import { uploadBatch } from '../src/upload/uploadBatch'

Expand All @@ -13,7 +15,7 @@ import { uploadBatch } from '../src/upload/uploadBatch'
*/
const ACCESS_TOKEN = 'Cms2dEbdMIsAAAAAAAAAAavdNFDJ0yalT_GQcbY5GWcXghNm-4rikfZmfycs8lL7'

describe.skip('upload', () => {
describe('upload', () => {
const file = '__tests__/shouldRetry.test.ts'

test(
Expand Down Expand Up @@ -108,7 +110,21 @@ describe.skip('upload', () => {
60 * 1000,
)

// test('download', () => {
// new Dropbox({accessToken}).
// })
test(
'upload files',
async () => {
const uploader = DropboxUploader.create({
accessToken: ACCESS_TOKEN,
logger: console,
})

const files = await globby('src/**/*')
try {
await uploader.uploadFiles(files, '/test')
} catch (e) {
console.error(e.error)
}
},
60 * 1000,
)
})
99 changes: 96 additions & 3 deletions src/upload/dropbox/DropboxUploader.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as fsRaw from 'fs'
const fs = fsRaw.promises
import { join } from 'path'

import { Dropbox } from 'dropbox'

Expand All @@ -9,6 +9,8 @@ import { UploadArgs, Uploader } from '../Uploader'
import { DropboxUploaderArgs } from './types/DropboxUploaderArgs'
import { StreamUploader } from './types/StreamUploader'

const fs = fsRaw.promises

/**
* 8Mb - Dropbox JavaScript API suggested max file / chunk size
*/
Expand Down Expand Up @@ -78,14 +80,19 @@ export class DropboxUploader implements Uploader {
}

/**
* Upload file larger than 150Mb or for batch uploading
* Upload file larger than 150Mb
*
* @param buffer file content
* @param destination file name on dropbox. Should be started from /
* @param onProgress optional function that indicate progress
* @returns
*/
uploadStream = async ({ file, destination, partSizeBytes = DROPBOX_MAX_BLOB_SIZE, onProgress }: StreamUploader) => {
uploadStream = async ({
file,
destination,
partSizeBytes = DROPBOX_MAX_BLOB_SIZE,
onProgress,
}: StreamUploader & UploadArgs) => {
const fileStream = fsRaw.createReadStream(file, { highWaterMark: partSizeBytes })

let sessionId: string | undefined = undefined
Expand All @@ -104,6 +111,7 @@ export class DropboxUploader implements Uploader {
// @ts-ignore incorrect cursor typings here, that required `contents`, but crashed in runtime
cursor: { session_id: sessionId, offset: uploaded },
contents: chunk,
close: false,
})
}
onProgress?.(uploaded, size)
Expand All @@ -126,5 +134,90 @@ export class DropboxUploader implements Uploader {
}
}

/**
* Batch file uploading
*
* @param files
* @param destination
*/
uploadFiles = async (files: string[], destination: string) => {
const sessions: {
[k in string]: {
sessionId: string
fileSize: number
}
} = {}
const promises = files.map(async (file) => {
const fileStat = fsRaw.statSync(file)
const fileSize = fileStat.size
if (fileSize > 0) {
const fileStream = fsRaw.createReadStream(file, { highWaterMark: 1024 })

let sessionId: string | undefined = undefined
let uploaded = 0

for await (const chunk of fileStream) {
const isLastChunk = uploaded + chunk.length === fileSize

this.logger?.debug(`
File ${file}
filesize: ${fileSize}
sessionId: ${sessionId}
uploaded: ${uploaded}
chunk: ${chunk.length}
isLastChunk: ${isLastChunk}
`)

if (sessionId === undefined) {
sessionId = (await this.dropbox.filesUploadSessionStart({ contents: chunk, close: isLastChunk })).result
.session_id
sessions[file] = {
sessionId: sessionId,
fileSize: fileSize,
}
} else {
await this.dropbox.filesUploadSessionAppendV2({
// @ts-ignore incorrect cursor typings here, that required `contents`, but crashed in runtime
cursor: { session_id: sessionId, offset: uploaded },
contents: chunk,
close: isLastChunk,
})
}
uploaded += chunk.length
}
}
})
await Promise.all(promises)

await this.dropbox.filesUploadSessionFinishBatch({
// @ts-ignore incorrect typing with contents
entries: files.map((file) => {
return {
commit: {
path: join(destination, file),
mode: { '.tag': 'overwrite' },
},
cursor: {
session_id: sessions[file].sessionId,
offset: sessions[file].fileSize,
},
}
}),
})

// TODO: wait processing flag
// console.log('response', response.result)

// let repeat = 5
// while (repeat-- >= 0) {
// const respose = await this.dropbox.filesUploadSessionFinishBatchCheck({
// // @ts-ignore
// async_job_id: response.result.async_job_id,
// })
// console.log('response checking', JSON.stringify(respose.result))
// await new Promise((r) => setTimeout(r, 2500))
// }
}

constructor(private dropbox: Dropbox, private logger?: Logger) {}
}
6 changes: 2 additions & 4 deletions src/upload/dropbox/types/StreamUploader.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { Uploader } from './Uploader'

export type StreamUploader = {
export interface StreamUploader {
onProgress?: (uploaded: number, total: number) => void

/**
Expand All @@ -13,4 +11,4 @@ export type StreamUploader = {
* @default 8Mb
*/
partSizeBytes?: number
} & Uploader
}

0 comments on commit 8b8aeba

Please sign in to comment.