Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a MultipartUploadStream IO object #46

Merged
merged 30 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e6173ab
WIP MultipartUploadStream
nantiamak Oct 11, 2023
57dfc36
more debugging and testing
nantiamak Oct 12, 2023
4eabf65
added test writing large bytes to S3 with MultipleUploadStream
nantiamak Oct 16, 2023
57c2d16
cleanup, added comments and a test with Azurite
nantiamak Oct 17, 2023
b711349
WIP - spawn multiple tasks for uploading
nantiamak Oct 23, 2023
2273210
WIP - upload with multiple threads
nantiamak Oct 24, 2023
1f16ffc
2nd attempt - upload with multiple threads
nantiamak Oct 26, 2023
174464c
fixed Azure test, S3 test still fails with error 400
nantiamak Oct 27, 2023
1d7ef83
Julia scheduling error debugging
nantiamak Oct 27, 2023
7d7ef7b
fixed S3 test failures
nantiamak Oct 30, 2023
5f903f1
added error flag to avoid deadlocks
nantiamak Oct 30, 2023
2ed517d
put ntasks increment behind lock
nantiamak Oct 30, 2023
a9232f4
put back tests
nantiamak Oct 30, 2023
cdd1d23
cleanup
nantiamak Oct 30, 2023
86f8ece
fixed julia 1.6 incompatibility issue with @atomic
nantiamak Oct 31, 2023
456c32c
fixed type, cleanup
nantiamak Oct 31, 2023
b5a7ad5
fixed initialization for Julia 1.6
nantiamak Oct 31, 2023
97c9ca6
fixed type again
nantiamak Oct 31, 2023
737c408
another attempt for Julia 1.6
nantiamak Oct 31, 2023
b2cf37d
atomic_add! for Julia 1.6
nantiamak Nov 1, 2023
0458ef5
Tomas' changes
nantiamak Nov 6, 2023
6072e72
cleanup
nantiamak Nov 6, 2023
e6ba825
addressed feedback
nantiamak Nov 14, 2023
0214836
replaced acquire macro with acquire function for Julia 1.6 compatibility
nantiamak Nov 14, 2023
6439a3e
small refactoring of MultipartUploadStream based on feedback
nantiamak Nov 16, 2023
8910fb1
added tests for failures
nantiamak Nov 16, 2023
ac71f14
alternative syntax, fixed semaphore, cleanup
nantiamak Nov 16, 2023
f72716d
added specialized constructors
nantiamak Nov 16, 2023
0790f79
comments and cleanup
nantiamak Nov 16, 2023
c18952b
more comments and cleanup
nantiamak Nov 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/CloudStore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import CloudBase: AWS, Azure, CloudTest
# for specific clouds
module API

export Object, PrefetchedDownloadStream, ResponseBodyType, RequestBodyType
export Object, PrefetchedDownloadStream, ResponseBodyType, RequestBodyType,
MultipartUploadStream

using HTTP, CodecZlib, CodecZlibNG, Mmap
import WorkerUtilities: OrderedSynchronizer
Expand Down
206 changes: 206 additions & 0 deletions src/object.jl
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,209 @@ function Base.read(io::PrefetchedDownloadStream, ::Type{UInt8})
end
return b
end

function _upload_task(io; kw...)
try
(part_n, upload_buffer) = take!(io.upload_queue)
# upload the part
parteTag, wb = uploadPart(io.store, io.url, upload_buffer, part_n, io.uploadState; io.credentials, kw...)
Base.release(io.sem)
# add part eTag to our collection of eTags
Base.@lock io.cond_wait begin
if length(io.eTags) < part_n
resize!(io.eTags, part_n)
end
io.eTags[part_n] = parteTag
io.ntasks -= 1
notify(io.cond_wait)
end
catch e
isopen(io.upload_queue) && close(io.upload_queue, e)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Drvi Could you help me with adding a test case that triggers an error during uploading and covers these lines? I tried with uploading a part that was smaller than the minimum S3 size and got the following error, but it didn't exercise this catch block. 🤔

HTTP.Exceptions.StatusError(400, "POST", "/jl-minio-23869/test.csv?uploadId=7e57586f-e75d-4cf6-a14e-f480ebe655cd", HTTP.Messages.Response:
  """
  HTTP/1.1 400 Bad Request
  Accept-Ranges: bytes
  Cache-Control: no-cache
  Content-Length: 364
  Content-Security-Policy: block-all-mixed-content
  Content-Type: application/xml
  Server: MinIO
  Strict-Transport-Security: max-age=31536000; includeSubDomains
  Vary: Origin, Accept-Encoding
  X-Accel-Buffering: no
  X-Amz-Request-Id: 179736FE62556FB8
  X-Content-Type-Options: nosniff
  X-Xss-Protection: 1; mode=block
  Date: Mon, 13 Nov 2023 15:04:10 GMT
 
  <?xml version="1.0" encoding="UTF-8"?>
  <Error><Code>EntityTooSmall</Code><Message>Your proposed upload is smaller than the minimum allowed object size.</Message><Key>test.csv</Key><BucketName>jl-minio-23869</BucketName><Resource>/jl-minio-23869/test.csv</Resource><RequestId>179736FE62556FB8</RequestId><HostId>7b2b4a12-8baf-4d22-bc30-e4c3f2f12bff</HostId></Error>""")
  Stacktrace:
    [1] (::HTTP.ConnectionRequest.var"#connections#4"{HTTP.ConnectionRequest.var"#connections#1#5"{HTTP.TimeoutRequest.var"#timeouts#3"{HTTP.TimeoutRequest.var"#timeouts#1#4"{HTTP.ExceptionRequest.var"#exceptions#2"
...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you had a good idea, try writing a small file and then testing, that the channel is closed and that the exp field is populated.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that, but when the errors happens I'm getting Exiting on signal: TERMINATED and the test terminates before I get to check the channel and the exception.

Base.@lock io.cond_wait begin
io.exc = e
notify(io.cond_wait, e, all=true, error=true)
end
end
return nothing
end

"""
This is an *experimental* API.

MultipartUploadStream <: IO
MultipartUploadStream(args...; kwargs...) -> MultipartUploadStream

An in-memory IO stream that uploads chunks to a URL in blob storage.

For every data chunk we call write(io, data;) to write it to a channel. We spawn one task
per chunk to read data from this channel and uploads it as a distinct part to blob storage
to the same remote object.
We expect the chunks to be written in order.
For cases where there is no need to upload data in parts or the data size is too small, `put`
can be used instead.

# Arguments
* `store::AbstractStore`: The S3 Bucket / Azure Container object
* `key::String`: S3 key / Azure blob resource name

# Keywords
* `credentials::Union{CloudCredentials, Nothing}=nothing`: Credentials object used in HTTP
requests
* `concurrent_writes_to_channel::Int=(4 * Threads.nthreads())`: represents the max number of
chunks in flight. Defaults to 4 times the number of threads. We use this value to
initialize a semaphore to perform throttling in case the writing to the channel is much
faster to uploading to blob storage, i.e. `write` will block as a result of this limit
being reached.
* `kwargs...`: HTTP keyword arguments are forwarded to underlying HTTP requests,

## Examples
```
# Get an IO stream for a remote CSV file `test.csv` living in your S3 bucket
io = MultipartUploadStream(bucket, "test.csv"; credentials)

# Write a chunk of data (Vector{UInt8}) to the stream
write(io, data;)

# Wait for all chunks to be uploaded
wait(io)

# Close the stream
close(io; credentials)

# Alternative syntax that encapsulates all these steps
MultipartUploadStream(bucket, "test.csv"; credentials) do io
write(io, data;)
end
```

## Note on upload size
```
Some cloud storage providers might have a lower limit on the size of the uploaded object.
For example it seems that S3 requires at minimum an upload of 5MB:
https://github.com/minio/minio/issues/11076.
We haven't found a similar setting for Azure.
For such cases where the size of the data is too small, one can use `put`
```
"""
mutable struct MultipartUploadStream{T <: AbstractStore} <: IO
store::T
url::String
credentials::Union{Nothing, AWS.Credentials, Azure.Credentials}
uploadState::Union{String, Nothing}
eTags::Vector{String}
upload_queue::Channel{Tuple{Int, Vector{UInt8}}}
cond_wait::Threads.Condition
cur_part_id::Int
ntasks::Int
exc::Union{Exception, Nothing}
sem::Base.Semaphore

function MultipartUploadStream(
Drvi marked this conversation as resolved.
Show resolved Hide resolved
store::AWS.Bucket,
key::String;
credentials::Union{Nothing, AWS.Credentials}=nothing,
concurrent_writes_to_channel::Int=(4 * Threads.nthreads()),
kw...
)
url = makeURL(store, key)
uploadState = API.startMultipartUpload(store, key; credentials, kw...)
io = new{AWS.Bucket}(
store,
url,
credentials,
uploadState,
Vector{String}(),
Channel{Tuple{Int, Vector{UInt8}}}(Inf),
Threads.Condition(),
0,
0,
nothing,
Base.Semaphore(concurrent_writes_to_channel)
)
return io
end

function MultipartUploadStream(
store::Azure.Container,
key::String;
credentials::Union{Nothing, Azure.Credentials}=nothing,
concurrent_writes_to_channel::Int=(4 * Threads.nthreads()),
kw...
)
url = makeURL(store, key)
uploadState = API.startMultipartUpload(store, key; credentials, kw...)
io = new{Azure.Container}(
store,
url,
credentials,
uploadState,
Vector{String}(),
Channel{Tuple{Int, Vector{UInt8}}}(Inf),
Threads.Condition(),
0,
0,
nothing,
Base.Semaphore(concurrent_writes_to_channel)
)
return io
end

# Alternative syntax that applies the function `f` to the result of
# `MultipartUploadStream(args...; kwargs...)`, waits for all parts to be uploaded and
# and closes the stream.
function MultipartUploadStream(f::Function, args...; kw...)
io = MultipartUploadStream(args...; kw...)
try
f(io)
wait(io)
close(io; kw...)
catch e
# todo, we need a function here to signal abort to S3/Blobs. We don't have that
# yet in CloudStore.jl
rethrow()
end
end
end

# Writes a data chunk to the channel and spawn
function Base.write(io::MultipartUploadStream, bytes::Vector{UInt8}; kw...)
local part_n
Base.@lock io.cond_wait begin
io.ntasks += 1
io.cur_part_id += 1
part_n = io.cur_part_id
notify(io.cond_wait)
end
Base.acquire(io.sem)
Drvi marked this conversation as resolved.
Show resolved Hide resolved
# We expect the data chunks to be written in order in the channel.
put!(io.upload_queue, (part_n, bytes))
Threads.@spawn _upload_task($io; $(kw)...)
return nothing
end

# Waits for all parts to be uploaded
function Base.wait(io::MultipartUploadStream)
try
Base.@lock io.cond_wait begin
while true
!isnothing(io.exc) && throw(io.exc)
io.ntasks == 0 && break
wait(io.cond_wait)
end
end
catch e
rethrow()
Copy link
Member

@Drvi Drvi Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are missing a way to signal to S3/Blobs that we're aborting the multi-part upload a la AbortMultipartUpload... can you open an issue about it?

Also, I think we should consider the following syntax to the user

MultipartUploadStream(...) do io 
    ...
end

something like

function MultipartUploadStream(f::Function, args...; kwargs...)
    io = MultipartUploadStream(args...; kwargs...)
    try
        f(io)
        wait(io)
        close(io)
    catch e
        abort(io, e) # todo, we don't have this yet
        rethrow()
    end
end

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already an issue for adding more low-level functionality regarding multipart uploads including list parts and and list parts that are in progress, which are related to aborting: #3. Should I add a comment there about abort?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the alternative syntax, f would need to encapsulate a call to MultipartUploadStream.write() though, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3

ah, sorry, I didn't realize there was an issue already

For the alternative syntax, f would need to encapsulate a call to MultipartUploadStream.write() though, right?

Yes, similar to what one would do with a local file

open("file") do io
    write(io, "my stuff")
end

end
end

# When there are no more data chunks to upload, this function closes the channel and sends
# a POST request with a single id for the entire upload.
function Base.close(io::MultipartUploadStream; kw...)
try
close(io.upload_queue)
return API.completeMultipartUpload(io.store, io.url, io.eTags, io.uploadState; kw...)
catch e
io.exc = e
rethrow()
end
end
119 changes: 119 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -793,5 +793,124 @@ end
end
end

# When using Minio, the minimum upload size per part is 5MB according to
# S3 specifications: https://github.com/minio/minio/issues/11076
# I couldn't find a minimum upload size for Azure blob storage.
@testset "CloudStore.MultipartUploadStream write large bytes - S3" begin
Minio.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 5500000
mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)

i = 1
while i < sizeof(multicsv)
nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
CloudStore.write(mus_obj, buf;)
i += N
end

CloudStore.wait(mus_obj)
CloudStore.close(mus_obj; credentials)
obj = CloudStore.Object(bucket, "test.csv"; credentials)
@test length(obj) == sizeof(multicsv)
end
end

@testset "CloudStore.MultipartUploadStream failure due to too small upload size - S3" begin
Minio.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 55000
mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)
try
i = 1
nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
CloudStore.write(mus_obj, buf;)
CloudStore.wait(mus_obj)
CloudStore.close(mus_obj; credentials) # This should fail
catch e
@test isnothing(mus_obj.exc) == false
end
end
end

@testset "CloudStore.MultipartUploadStream failure due to changed url - S3" begin
Minio.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 5500000
mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)
try
i = 1
nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
# Changing the url after the MultipartUploadStream object was created
mus_obj.url = "http://127.0.0.1:23252/jl-minio-22377/test_nantia.csv"
CloudStore.write(mus_obj, buf;) # This should fail
CloudStore.wait(mus_obj)
catch e
@test isnothing(mus_obj.exc) == false
end
end
end

@testset "CloudStore.MultipartUploadStream write large bytes - Azure" begin
Azurite.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 2000000
mus_obj = CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials)

i = 1
while i < sizeof(multicsv)
nb = i + N > length(multicsv) ? length(multicsv)-i+1 : N
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
CloudStore.write(mus_obj, buf;)
i += N
end

CloudStore.wait(mus_obj)
CloudStore.close(mus_obj; credentials)
obj = CloudStore.Object(bucket, "test.csv"; credentials)
@test length(obj) == sizeof(multicsv)
end
end

@testset "CloudStore.MultipartUploadStream test alternative syntax - Azure" begin
Azurite.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB

N = 2000000
function uploading_loop(multicsv, batch_size, mus_obj)
i = 1
while i < sizeof(multicsv)
nb = i + batch_size > length(multicsv) ? length(multicsv)-i+1 : batch_size
buf = Vector{UInt8}(undef, nb)
copyto!(buf, 1, codeunits(multicsv), i, nb)
CloudStore.write(mus_obj, buf;)
i += batch_size
end
end

CloudStore.MultipartUploadStream(bucket, "test.csv"; credentials) do mus_obj
uploading_loop(multicsv, N, mus_obj)
end

obj = CloudStore.Object(bucket, "test.csv"; credentials)
@test length(obj) == sizeof(multicsv)
end
end

end # @testset "CloudStore.jl"
Loading