
Commit

Merge cdd1d23 into 328b427
nantiamak authored Oct 30, 2023
2 parents 328b427 + cdd1d23 commit f4fa85e
Showing 3 changed files with 168 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/CloudStore.jl
@@ -6,7 +6,8 @@ import CloudBase: AWS, Azure, CloudTest
# for specific clouds
module API

export Object, PrefetchedDownloadStream, ResponseBodyType, RequestBodyType
export Object, PrefetchedDownloadStream, ResponseBodyType, RequestBodyType,
    MultipartUploadStream

using HTTP, CodecZlib, CodecZlibNG, Mmap
import WorkerUtilities: OrderedSynchronizer
119 changes: 119 additions & 0 deletions src/object.jl
@@ -372,3 +372,122 @@ function Base.read(io::PrefetchedDownloadStream, ::Type{UInt8})
end
return b
end

function _upload_task(io, part_n; kw...)
    try
        upload_buffer = take!(io.upload_queue)
        # upload the part
        partETag, wb = uploadPart(io.store, io.url, upload_buffer, part_n, io.uploadState; io.credentials, kw...)
        # add the part's eTag to our collection of eTags in the right order
        put!(io.sync, part_n) do
            push!(io.eTags, partETag)
        end
        # mark this task as done and wake any waiter in `close`
        Base.@lock io.cond_wait begin
            io.ntasks -= 1
            notify(io.cond_wait)
        end
    catch e
        isopen(io.upload_queue) && close(io.upload_queue, e)
        Base.@lock io.cond_wait begin
            io.is_error = true
            notify(io.cond_wait, e, all=true, error=true)
        end
    end
    return nothing
end
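
A brief aside on the ordering guarantee (not part of the diff): `uploadPart` calls run on separate tasks and may finish in any order, but `OrderedSynchronizer` executes the closures passed to `put!` strictly by index, which is what keeps `io.eTags` aligned with part ids. A minimal standalone sketch of that behavior, assuming only the `WorkerUtilities.OrderedSynchronizer` API this package already imports:

```
using WorkerUtilities: OrderedSynchronizer

sync = OrderedSynchronizer(1)   # next expected index is 1
order = Int[]
@sync for i in (3, 1, 2)
    # each closure waits until `sync` reaches index `i`,
    # so the pushes happen as 1, 2, 3 even though tasks start as 3, 1, 2
    Threads.@spawn put!(sync, i) do
        push!(order, i)
    end
end
@assert order == [1, 2, 3]
```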

"""
MultipartUploadStream <: IO
MultipartUploadStream(args...; kwargs...) -> MultipartUploadStream
An in-memory IO stream that uploads chunks to a URL in blob storage.
Data chunks are written to a buffer and uploaded as distinct separate parts to blob storage.
Each part is tagged with an id and we keep the tags into a vector. We also increment a
counter to keep track of the number of parts. When there are no more data chunks to upload,
we send a POST request with an id for the entire upload.
When using Minio the minimum upload size per part is 5MB according to S3 specifications:
https://github.com/minio/minio/issues/11076
I couldn't find a minimum upload size for Azure blob storage.
**Writing to this stream is not thread-safe**.
# Arguments
* `store::AbstractStore`: The S3 Bucket / Azure Container object
* `key::String`: S3 key / Azure blob resource name
* `url::String`: S3 / Azure URL to resource
# Keywords
* `credentials::Union{CloudCredentials, Nothing}=nothing`: Credentials object used in HTTP
requests
* `kwargs...`: HTTP keyword arguments are forwarded to underlying HTTP requests,
## Examples
```
# Get an IO stream for a remote CSV file `test.csv` living in your S3 bucket
io = MultipartUploadStream(bucket, "test.csv"; credentials)
```
"""
mutable struct MultipartUploadStream <: IO
    store::AbstractStore
    key::String
    url::String
    credentials::Union{Nothing, AWS.Credentials, Azure.Credentials}
    uploadState                            # returned by API.startMultipartUpload
    sync::OrderedSynchronizer              # orders eTag insertion by part id
    eTags::Vector{String}                  # one eTag per uploaded part, in part order
    @atomic cur_part_id::Int               # next part id to assign
    upload_queue::Channel{Vector{UInt8}}   # buffers waiting to be uploaded
    cond_wait::Threads.Condition           # signals task completion or errors
    ntasks::Int                            # number of in-flight upload tasks
    is_error::Bool                         # set when an upload task fails

    function MultipartUploadStream(
        store::AbstractStore,
        key::String;
        credentials::Union{CloudCredentials, Nothing}=nothing,
        kw...
    )
        url = makeURL(store, key)
        uploadState = API.startMultipartUpload(store, key; credentials, kw...)
        return new(
            store,
            key,
            url,
            credentials,
            uploadState,
            OrderedSynchronizer(1),
            Vector{String}(),
            1,
            Channel{Vector{UInt8}}(Inf),
            Threads.Condition(),
            0,
            false,
        )
    end
end

function Base.write(x::MultipartUploadStream, bytes::Vector{UInt8}; kw...)
    put!(x.upload_queue, bytes)
    # account for the new in-flight part before spawning its upload task
    Base.@lock x.cond_wait begin
        x.ntasks += 1
        notify(x.cond_wait)
    end
    part_n = x.cur_part_id
    Threads.@spawn _upload_task(x, part_n; kw...)
    # atomically increment our part counter
    @atomic x.cur_part_id += 1
    return nothing
end

function Base.close(x::MultipartUploadStream; kw...)
    # wait until all in-flight upload tasks have finished (or an error is raised)
    Base.@lock x.cond_wait begin
        while true
            x.ntasks == 0 && !x.is_error && break
            wait(x.cond_wait)
        end
    end
    close(x.upload_queue)
    return API.completeMultipartUpload(x.store, x.url, x.eTags, x.uploadState; kw...)
end
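
For reference, a minimal end-to-end sketch of how the new stream is meant to be used, mirroring the test usage below. This is a hypothetical helper, not part of the diff: `upload_in_parts`, `bucket`, `key`, `data`, `credentials`, and the 8MB `part_size` are placeholder names, and every part except the last should respect the ~5MB minimum noted in the docstring.

```
# Hypothetical sketch: upload `data` to blob storage in fixed-size parts
# using MultipartUploadStream.
using CloudStore

function upload_in_parts(bucket, key, data::Vector{UInt8}, credentials; part_size=8_000_000)
    io = CloudStore.MultipartUploadStream(bucket, key; credentials)
    i = 1
    while i <= length(data)
        stop = min(i + part_size - 1, length(data))
        # each write enqueues one part and spawns a task to upload it
        CloudStore.write(io, data[i:stop])
        i = stop + 1
    end
    # waits for all in-flight parts, then completes the multipart upload
    CloudStore.close(io; credentials)
end
```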
47 changes: 47 additions & 0 deletions test/runtests.jl
@@ -793,5 +793,52 @@ end
end
end

@testset "CloudStore.MultipartUploadStream write large bytes - S3" begin
Minio.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 5500000
mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)

i = 1
while i < sizeof(multicsv)
nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
@test view(buf, 1:nb) == view(codeunits(multicsv), i:i+nb-1)
CloudStore.write(mus_obj, buf;)
i += N
end

CloudStore.close(mus_obj; credentials)
obj = CloudStore.Object(bucket, "test.csv"; credentials)
@test length(obj) == sizeof(multicsv)
end
end

@testset "CloudStore.MultipartUploadStream write large bytes - Azure" begin
Azurite.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 2000000
mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)

i = 1
while i < sizeof(multicsv)
nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
@test view(buf, 1:nb) == view(codeunits(multicsv), i:i+nb-1)
CloudStore.write(mus_obj, buf;)
i += N
end

CloudStore.close(mus_obj; credentials)
obj = CloudStore.Object(bucket, "test.csv"; credentials)
@test length(obj) == sizeof(multicsv)
end
end

end # @testset "CloudStore.jl"
