Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blob): multipart upload #71

Merged
merged 20 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 71 additions & 6 deletions playground/pages/blob.vue
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
<script setup lang="ts">
import type { SerializeObject } from 'nitropack'

const loading = ref(false)
const newFilesValue = ref<File[]>([])
const uploadRef = ref()
Expand All @@ -15,12 +17,7 @@ async function addFile () {
loading.value = true

try {
const formData = new FormData()
newFilesValue.value.forEach((file) => formData.append('files', file))
const uploadedFiles = await $fetch('/api/blob', {
method: 'PUT',
body: formData
})
const uploadedFiles = await uploadFiles(newFilesValue.value)
files.value!.push(...uploadedFiles)
toast.add({ title: `File${uploadedFiles.length > 1 ? 's' : ''} uploaded.` })
newFilesValue.value = []
Expand All @@ -31,6 +28,74 @@ async function addFile () {
loading.value = false
}

async function uploadFiles(files: File[]) {
const bigFileLimit = 10 * 1024 * 1024 // 10MB
const chunkSize = 10 * 1024 * 1024 // 10MB

const bigFiles = files.filter((file) => file.size > bigFileLimit)
const smallFiles = files.filter((file) => file.size <= bigFileLimit)

// upload small files
const formData = new FormData()
smallFiles.forEach((file) => formData.append('files', file))

const uploadedFiles = await $fetch('/api/blob', {
method: 'PUT',
body: formData
})

// upload big files
for (const file of bigFiles) {
const chunks = Math.ceil(file.size / chunkSize)
const uploaded: BlobUploadedPart[] = []

const { pathname, uploadId } = await $fetch<{pathname: string, uploadId: string}>('/api/blob/mpu', {
method: 'POST',
query: {
action: 'create',
pathname: file.name,
}
})

for (let i = 0; i < chunks; i += 1) {
const start = i * chunkSize
const end = Math.min(start + chunkSize, file.size)
const partNumber = i + 1
const chunk = file.slice(start, end)

const part = await $fetch<BlobUploadedPart>(`/api/blob/mpu/${pathname}`, {
params: {},
method: 'PUT',
body: await chunk.arrayBuffer(),
query: {
partNumber,
uploadId,
}
})

// optional: verify the etag and reupload if not match

uploaded.push(part)
}

const complete = await $fetch<SerializeObject<BlobObject>>('/api/blob/mpu', {
method: 'POST',
query: {
action: 'complete',
pathname,
uploadId,
},
body: {
parts: uploaded,
},
})

uploadedFiles.push(complete)
}

return uploadedFiles
}

function onFileSelect (e: any) {
const target = e.target

Expand Down
22 changes: 22 additions & 0 deletions playground/server/api/blob/mpu/[...pathname].delete.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { z } from 'zod'

export default eventHandler(async (event) => {
const hub = hubBlob()

const { pathname } = await getValidatedRouterParams(event, z.object({
pathname: z.string().min(1)
}).parse)

const { uploadId } = await getValidatedQuery(event, z.object({
uploadId: z.string(),
}).parse)

const mpu = hub.resumeMultipartUpload(pathname, uploadId)

try {
return await mpu.abort()
}
catch (e: any) {
throw createError({ status: 400, message: e.message })
}
})
43 changes: 43 additions & 0 deletions playground/server/api/blob/mpu/[...pathname].put.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { z } from 'zod'

async function streamToArrayBuffer(stream: ReadableStream, streamSize: number) {
const result = new Uint8Array(streamSize)
let bytesRead = 0
const reader = stream.getReader()
// eslint-disable-next-line no-constant-condition
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
result.set(value, bytesRead)
bytesRead += value.length
}
return result
}

export default eventHandler(async (event) => {
const { pathname } = await getValidatedRouterParams(event, z.object({
pathname: z.string().min(1)
}).parse)

const { uploadId, partNumber } = await getValidatedQuery(event, z.object({
uploadId: z.string(),
partNumber: z.coerce.number(),
}).parse)

const contentLength = Number(getHeader(event, 'content-length') || '0')

const stream = getRequestWebStream(event)!
const body = await streamToArrayBuffer(stream, contentLength)


const hub = hubBlob()
const mpu = hub.resumeMultipartUpload(pathname, uploadId)

try {
return await mpu.uploadPart(partNumber, body)
} catch (e: any) {
throw createError({ status: 400, message: e.message })
}
})
55 changes: 55 additions & 0 deletions playground/server/api/blob/mpu/index.post.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { z } from 'zod'

export default eventHandler(async (event) => {
const query = await getValidatedQuery(event, z.discriminatedUnion('action', [
z.object({
action: z.literal('create'),
pathname: z.string(),
}),
z.object({
action: z.literal('complete'),
pathname: z.string(),
uploadId: z.string(),
}),
]).parse)

if (query.action === 'create') {
const options = await readValidatedBody(event, z.record(z.string(), z.any()).optional().parse)

const blob = hubBlob()

try {
const object = await blob.createMultipartUpload(query.pathname, options)
return {
uploadId: object.uploadId,
pathname: object.pathname,
}
} catch (e: any) {
throw createError({
statusCode: 400,
message: e.message
})
}
} else {
const { uploadId, pathname } = query
const { parts } = await readValidatedBody(event,z.object({
parts: z.array(z.object({
partNumber: z.number(),
etag: z.string(),
}))
}).parse)

const blob = hubBlob()

const mpu = blob.resumeMultipartUpload(pathname, uploadId)
try {
const object = await mpu.complete(parts)
return object
} catch (e: any) {
throw createError({
statusCode: 400,
message: e.message
})
}
}
})
31 changes: 31 additions & 0 deletions src/runtime/server/api/_hub/blob/mpu/[...pathname].delete.ts
Teages marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { createError, eventHandler } from 'h3'
import { z } from 'zod'
import { hubBlob } from '../../../../utils/blob'
import { requireNuxtHubAuthorization } from '../../../../utils/auth'
import { requireNuxtHubFeature } from '../../../../utils/features'

export default eventHandler(async (event) => {
await requireNuxtHubAuthorization(event)
requireNuxtHubFeature('blob')

const { pathname } = await getValidatedRouterParams(event, z.object({
pathname: z.string().min(1)
}).parse)

const { uploadId } = await getValidatedQuery(event, z.object({
uploadId: z.string(),
}).parse)


const blob = hubBlob()
const mpu = blob.resumeMultipartUpload(pathname, uploadId)

try {
await mpu.abort()
} catch (e: any) {
throw createError({
statusCode: 500,
message: `Storage error: ${e.message}`
})
}
Teages marked this conversation as resolved.
Show resolved Hide resolved
})
52 changes: 52 additions & 0 deletions src/runtime/server/api/_hub/blob/mpu/[...pathname].put.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { createError, eventHandler } from 'h3'
import { z } from 'zod'
import { hubBlob } from '../../../../utils/blob'
import { requireNuxtHubAuthorization } from '../../../../utils/auth'
import { requireNuxtHubFeature } from '../../../../utils/features'

async function streamToArrayBuffer(stream: ReadableStream, streamSize: number) {
const result = new Uint8Array(streamSize)
let bytesRead = 0
const reader = stream.getReader()
// eslint-disable-next-line no-constant-condition
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
result.set(value, bytesRead)
bytesRead += value.length
}
return result
}
Teages marked this conversation as resolved.
Show resolved Hide resolved

export default eventHandler(async (event) => {
await requireNuxtHubAuthorization(event)
requireNuxtHubFeature('blob')

const { pathname } = await getValidatedRouterParams(event, z.object({
pathname: z.string().min(1)
}).parse)

const { uploadId, partNumber } = await getValidatedQuery(event, z.object({
uploadId: z.string(),
partNumber: z.number()
}).parse)

const contentLength = Number(getHeader(event, 'content-length') || '0')

const stream = getRequestWebStream(event)!
const body = await streamToArrayBuffer(stream, contentLength)

const hub = hubBlob()
const mpu = hub.resumeMultipartUpload(pathname, uploadId)

try {
return await mpu.uploadPart(partNumber, body)
} catch (e: any) {
throw createError({
statusCode: 500,
message: `Storage error: ${e.message}`
})
}
})
59 changes: 59 additions & 0 deletions src/runtime/server/api/_hub/blob/mpu/index.post.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { createError, eventHandler } from 'h3'
import { z } from 'zod'
import { hubBlob } from '../../../../utils/blob'
import { requireNuxtHubAuthorization } from '../../../../utils/auth'
import { requireNuxtHubFeature } from '../../../../utils/features'

export default eventHandler(async (event) => {
await requireNuxtHubAuthorization(event)
requireNuxtHubFeature('blob')

const query = await getValidatedQuery(event, z.discriminatedUnion('action', [
z.object({
action: z.literal('create'),
pathname: z.string(),
Teages marked this conversation as resolved.
Show resolved Hide resolved
}),
z.object({
action: z.literal('complete'),
pathname: z.string(),
uploadId: z.string(),
}),
]).parse)

if (query.action === 'create') {
const options = await readValidatedBody(event, z.record(z.string(), z.any()).optional().parse)

const blob = hubBlob()

try {
const object = await blob.createMultipartUpload(query.pathname, options)
return object
} catch (e: any) {
throw createError({
statusCode: 500,
message: `Storage error: ${e.message}`
})
}
} else {
const { uploadId, pathname } = query
const { parts } = await readValidatedBody(event,z.object({
parts: z.array(z.object({
partNumber: z.number(),
etag: z.string(),
}))
}).parse)

const blob = hubBlob()

const mpu = blob.resumeMultipartUpload(pathname, uploadId)
try {
const object = await mpu.complete(parts)
return object
} catch (e: any) {
throw createError({
statusCode: 500,
message: `Storage error: ${e.message}`
})
}
}
})
Loading