Commit 456c32c
fixed type, cleanup
nantiamak committed Oct 31, 2023
1 parent 86f8ece commit 456c32c
Showing 2 changed files with 22 additions and 25 deletions.
src/object.jl (44 changes: 19 additions & 25 deletions)
@@ -402,21 +402,15 @@ end
 An in-memory IO stream that uploads chunks to a URL in blob storage.
-Data chunks are written to a buffer and uploaded as distinct separate parts to blob storage.
+Data chunks are written to a channel and spawned tasks read buffers from this channel.
+Each task uploads a distinct part to blob storage.
 Each part is tagged with an id and we keep the tags in a vector. We also increment a
 counter to keep track of the number of parts. When there are no more data chunks to upload,
-we send a POST request with an id for the entire upload.
-When using Minio the minimum upload size per part is 5MB according to S3 specifications:
-https://github.com/minio/minio/issues/11076
-I couldn't find a minimum upload size for Azure blob storage.
+we send a POST request with a single id for the entire upload.
 **Writing to this stream is not thread-safe**.
 # Arguments
 * `store::AbstractStore`: The S3 Bucket / Azure Container object
 * `key::String`: S3 key / Azure blob resource name
 * `url::String`: S3 / Azure URL to resource
 # Keywords
 * `credentials::Union{CloudCredentials, Nothing}=nothing`: Credentials object used in HTTP
@@ -438,7 +432,7 @@ mutable struct MultipartUploadStream <: IO
     sync::OrderedSynchronizer
     eTags::Vector{String}
     @static if VERSION < v"1.7"
-        cur_part_id::Threads.Atomic{Bool}
+        cur_part_id::Threads.Atomic{Int}
     else
         @atomic cur_part_id::Int
     end
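
An aside on the field being fixed here (not part of the commit): with `Threads.Atomic{Int}` on pre-1.7 Julia, the cell is conventionally read with `[]` and bumped with `Threads.atomic_add!`, while 1.7+ uses the `@atomic` field annotation. A minimal standalone sketch of the version-gated idiom:

```julia
# Standalone sketch (illustrative `Counter`, not the package's struct):
# one source file that compiles on both old and new Julia versions.
mutable struct Counter
    @static if VERSION < v"1.7"
        n::Threads.Atomic{Int}
    else
        @atomic n::Int
    end
end

@static if VERSION < v"1.7"
    Counter() = Counter(Threads.Atomic{Int}(1))
    # atomic_add! returns the value *before* the increment
    next!(c::Counter) = Threads.atomic_add!(c.n, 1)
else
    Counter() = Counter(1)
    # `@atomic +=` returns the value *after* the increment
    next!(c::Counter) = (@atomic c.n += 1) - 1
end
```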
@@ -472,30 +466,30 @@ end
     end
 end
 
-function Base.write(x::MultipartUploadStream, bytes::Vector{UInt8}; kw...)
-    put!(x.upload_queue, bytes)
-    Base.@lock x.cond_wait begin
-        x.ntasks += 1
-        notify(x.cond_wait)
+function Base.write(io::MultipartUploadStream, bytes::Vector{UInt8}; kw...)
+    put!(io.upload_queue, bytes)
+    Base.@lock io.cond_wait begin
+        io.ntasks += 1
+        notify(io.cond_wait)
     end
-    part_n = x.cur_part_id
-    Threads.@spawn _upload_task(x, part_n; kw...)
+    part_n = io.cur_part_id
+    Threads.@spawn _upload_task(io, part_n; kw...)
     # atomically increment our part counter
     @static if VERSION < v"1.7"
-        x.cur_part_id += 1
+        io.cur_part_id += 1
     else
-        @atomic x.cur_part_id += 1
+        @atomic io.cur_part_id += 1
     end
     return nothing
 end
 
-function Base.close(x::MultipartUploadStream; kw...)
-    Base.@lock x.cond_wait begin
+function Base.close(io::MultipartUploadStream; kw...)
+    Base.@lock io.cond_wait begin
         while true
-            x.ntasks == 0 && !x.is_error && break
-            wait(x.cond_wait)
+            io.ntasks == 0 && !io.is_error && break
+            wait(io.cond_wait)
         end
     end
-    close(x.upload_queue)
-    return API.completeMultipartUpload(x.store, x.url, x.eTags, x.uploadState; kw...)
+    close(io.upload_queue)
+    return API.completeMultipartUpload(io.store, io.url, io.eTags, io.uploadState; kw...)
 end
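
For orientation, a hedged usage sketch of the stream the diff above modifies. The constructor signature and part size are assumptions drawn from the docstring (`store`, `key`, `credentials`), not from this commit:

```julia
using CloudStore  # the package this commit belongs to

# Hypothetical caller: write() enqueues each chunk for a spawned upload
# task; close() blocks until all tasks finish, then sends the completion
# POST for the whole multipart upload.
function upload_in_parts(store, key, data::Vector{UInt8}; credentials=nothing)
    io = CloudStore.MultipartUploadStream(store, key; credentials)  # signature assumed
    partsize = 8 * 1024 * 1024  # 8MB chunks, above Minio's 5MB S3 minimum
    for r in Iterators.partition(1:length(data), partsize)
        write(io, data[r])
    end
    close(io)
end
```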
test/runtests.jl (3 changes: 3 additions & 0 deletions)
@@ -793,6 +793,9 @@ end
     end
 end
 
+# When using Minio, the minimum upload size per part is 5MB according to
+# S3 specifications: https://github.com/minio/minio/issues/11076
+# I couldn't find a minimum upload size for Azure blob storage.
 @testset "CloudStore.MultipartUploadStream write large bytes - S3" begin
     Minio.with(; debug=true) do conf
         credentials, bucket = conf
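
As a sketch of the constraint the new comment documents (sizes illustrative, not taken from the test): every part except the last must be at least 5MB when Minio enforces the S3 rules, so a payload splits like this:

```julia
# 12MB payload split at the 5MB floor: 5MB + 5MB + 2MB, where only the
# final part is allowed to fall below the minimum.
data = rand(UInt8, 12 * 1024 * 1024)
partsize = 5 * 1024 * 1024
parts = [data[i:min(i + partsize - 1, end)] for i in 1:partsize:length(data)]
@assert length(parts) == 3
@assert length(parts[end]) == 2 * 1024 * 1024
```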
