From 456c32c19614dfaf0b38ea2890a0cb318084df5e Mon Sep 17 00:00:00 2001
From: nantiamak
Date: Tue, 31 Oct 2023 15:35:52 +0100
Subject: [PATCH] fixed type, cleanup

---
 src/object.jl    | 44 +++++++++++++++++++-------------------------
 test/runtests.jl |  3 +++
 2 files changed, 22 insertions(+), 25 deletions(-)

diff --git a/src/object.jl b/src/object.jl
index 1e94b15..035d48b 100644
--- a/src/object.jl
+++ b/src/object.jl
@@ -402,21 +402,15 @@ end
 
 An in-memory IO stream that uploads chunks to a URL in blob storage.
 
-Data chunks are written to a buffer and uploaded as distinct separate parts to blob storage.
+Data chunks are written to a channel and spawned tasks read buffers from this channel.
+Each task uploads a distinct part to blob storage.
 Each part is tagged with an id and we keep the tags into a vector. We also
 increment a counter to keep track of the number of parts. When there are no more data chunks to upload,
-we send a POST request with an id for the entire upload.
-
-When using Minio the minimum upload size per part is 5MB according to S3 specifications:
-https://github.com/minio/minio/issues/11076
-I couldn't find a minimum upload size for Azure blob storage.
-
-**Writing to this stream is not thread-safe**.
+we send a POST request with a single id for the entire upload.
 
 # Arguments
 * `store::AbstractStore`: The S3 Bucket / Azure Container object
 * `key::String`: S3 key / Azure blob resource name
-* `url::String`: S3 / Azure URL to resource
 
 # Keywords
 * `credentials::Union{CloudCredentials, Nothing}=nothing`: Credentials object used in HTTP
@@ -438,7 +432,7 @@ mutable struct MultipartUploadStream <: IO
     sync::OrderedSynchronizer
     eTags::Vector{String}
     @static if VERSION < v"1.7"
-        cur_part_id::Threads.Atomic{Bool}
+        cur_part_id::Threads.Atomic{Int}
     else
         @atomic cur_part_id::Int
     end
@@ -472,30 +466,30 @@ end
     end
 end
 
-function Base.write(x::MultipartUploadStream, bytes::Vector{UInt8}; kw...)
-    put!(x.upload_queue, bytes)
-    Base.@lock x.cond_wait begin
-        x.ntasks += 1
-        notify(x.cond_wait)
+function Base.write(io::MultipartUploadStream, bytes::Vector{UInt8}; kw...)
+    put!(io.upload_queue, bytes)
+    Base.@lock io.cond_wait begin
+        io.ntasks += 1
+        notify(io.cond_wait)
     end
-    part_n = x.cur_part_id
-    Threads.@spawn _upload_task(x, part_n; kw...)
+    part_n = io.cur_part_id
+    Threads.@spawn _upload_task(io, part_n; kw...)
     # atomically increment our part counter
     @static if VERSION < v"1.7"
-        x.cur_part_id += 1
+        io.cur_part_id += 1
     else
-        @atomic x.cur_part_id += 1
+        @atomic io.cur_part_id += 1
     end
     return nothing
 end
 
-function Base.close(x::MultipartUploadStream; kw...)
-    Base.@lock x.cond_wait begin
+function Base.close(io::MultipartUploadStream; kw...)
+    Base.@lock io.cond_wait begin
         while true
-            x.ntasks == 0 && !x.is_error && break
-            wait(x.cond_wait)
+            io.ntasks == 0 && !io.is_error && break
+            wait(io.cond_wait)
         end
     end
-    close(x.upload_queue)
-    return API.completeMultipartUpload(x.store, x.url, x.eTags, x.uploadState; kw...)
+    close(io.upload_queue)
+    return API.completeMultipartUpload(io.store, io.url, io.eTags, io.uploadState; kw...)
 end
diff --git a/test/runtests.jl b/test/runtests.jl
index 81c62d9..5577642 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -793,6 +793,9 @@ end
     end
 end
 
+# When using Minio, the minimum upload size per part is 5MB according to
+# S3 specifications: https://github.com/minio/minio/issues/11076
+# I couldn't find a minimum upload size for Azure blob storage.
 @testset "CloudStore.MultipartUploadStream write large bytes - S3" begin
     Minio.with(; debug=true) do conf
         credentials, bucket = conf