Rewrite streamUpload #22
Conversation
Force-pushed from 79a50ac to 59bc3b4.
cc @axman6

Something is wrong, it's leaking memory.
Thanks for the PR @domenkozar, despite the mostly negative comments below, I do very much like the restructuring of this - splitting things into conduits is something I probably should have already done, and the use of concurrentMapM_ is quite nice; I like the idea of being able to build up Parts while others are being uploaded.

It looks like there's a very serious issue with vectorBuilder - unless I'm really not understanding the code, it looks like it completely avoids streaming the data, which will lead to huge memory problems and defeat the purpose of this library.
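(For context, a minimal sketch of how `vectorBuilder` is intended to be used for re-chunking, assuming conduit >= 1.3; `byteChunks` is a made-up name, not code from this PR:)

```haskell
import Control.Monad.Primitive (PrimMonad)
import Data.Conduit (ConduitT)
import Data.Conduit.Combinators (vectorBuilder, mapM_E)
import qualified Data.ByteString as B
import qualified Data.Vector.Storable as VS
import Data.Word (Word8)

-- Re-chunk a stream of ByteStrings into storable vectors of 4096 bytes each.
-- vectorBuilder hands the inner conduit a per-element "push" callback and
-- yields a full vector downstream whenever the buffer fills, so it does
-- stream rather than accumulating the whole input.
byteChunks :: PrimMonad m => ConduitT B.ByteString (VS.Vector Word8) m ()
byteChunks = vectorBuilder 4096 mapM_E
```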
```haskell
import qualified Data.Vector.Storable as ScopedTypeVariables
import Conduit ( MonadUnliftIO(..), mapC, PrimMonad )
import Data.Conduit ( ConduitT, Void, await, catchC, handleC, (.|), leftover, yield, awaitForever )
import Data.Conduit.Combinators ( vectorBuilder, sinkList )
import Data.Conduit.List ( sourceList )
```
Can we keep the formatting of imports consistent? (Looks like I wasn't very consistent, but I thought this repo had a stylish-haskell config, so it should be fixed automatically if you run that.)
Done (I hope; they weren't consistent beforehand).
```haskell
startUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m) =>
  ConduitT
    (Int, ByteString)
    Void
    m
    (Either
       (AbortMultipartUploadResponse, SomeException)
       CompleteMultipartUploadResponse)
```
Suggested change:

```haskell
startUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m)
            => ConduitT (Int, ByteString) Void m
                 (Either (AbortMultipartUploadResponse, SomeException)
                         CompleteMultipartUploadResponse)
```
```diff
@@ -85,74 +92,107 @@ See the AWS documentation for more details.

 May throw 'Network.AWS.Error'
 -}
-streamUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m)
+streamUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m, PrimMonad m)
```
We have been talking about replacing the `MonadAWS m` constraint with `AWSConstraint r m`; if possible, it'd be good to do that here too. That would bring in the MonadResource constraint, IIRC. Can any of these constraints be removed easily enough? MonadFail is a bit of a pain; I had thought about making the return type of this conduit a sum type instead of the Either, which could include a constructor for failures.
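(For reference, a sketch of what that could look like, assuming amazonka 1.6 where `AWSConstraint` lives in `Control.Monad.Trans.AWS`; the body is deliberately elided, this only illustrates the constraints:)

```haskell
{-# LANGUAGE ConstraintKinds, FlexibleContexts #-}

import Control.Exception (SomeException)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Primitive (PrimMonad)
import Control.Monad.Trans.AWS (AWSConstraint)
import Data.ByteString (ByteString)
import Data.Conduit (ConduitT, Void)
import Network.AWS.S3 (AbortMultipartUploadResponse, CompleteMultipartUploadResponse)

-- AWSConstraint r m bundles MonadResource m, MonadReader r m, HasEnv r,
-- MonadThrow m and MonadCatch m, so the separate MonadAWS and
-- MonadResource constraints would both go away.
startUpload :: (AWSConstraint r m, MonadUnliftIO m, MonadFail m, PrimMonad m)
            => ConduitT (Int, ByteString) Void m
                 (Either (AbortMultipartUploadResponse, SomeException)
                         CompleteMultipartUploadResponse)
startUpload = undefined  -- sketch only; not an implementation
```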
I would appreciate it if that stayed out of scope here.
- more performant chunking
- concurrent upload
- a tiny bit better flow
@axman6 I've updated the PR to use a raw memory buffer that should become part of the conduit API at some point (snoyberg/conduit#438). The last blocking thing is an amazonka release, as it needs some extra monad instances for AWST; see brendanhay/amazonka#574.
I can confirm that with this code I can upload to an S3 bucket at ~80MB/s.
@axman6 ping?
Sorry, I hadn't seen the updates here. One of the goals I'd had for the conduit side of things was to avoid copying data if possible, which is why I was using DLists of bytestrings. I'm not sure that is a great idea though, and the use of a preallocated buffer is nice, but it makes me think that maybe we should just be using bytestring builders if we're going to go that route, and be more sure that we're not going to have dangerous memory accesses. I might make a fork of this branch and play with it to see if I can come up with something I'm happier with.
I'm having a go at rewriting this at the moment to use Bytestring.Builder. Initial results are very promising, but I'm worried something is going wrong with my benchmarks - I'm unable to achieve the 80MB/s speeds you mentioned; for some reason I'm only getting 8MB/s, and there's definitely a memory leak - it seems to be holding onto all of the input data.
With the Builder version, I'm achieving 80MB/s from an EC2 host in the same region as the bucket. I'm also seeing a similar memory leak though.
```haskell
processChunk :: ChunkSize -> ByteString -> S -> IO ([ByteString], S)
processChunk chunkSize input =
    loop id 0
  where
    loop front idxIn s@(S fptr ptr idxOut)
      | idxIn >= B.length input = return (front [], s)
      | otherwise = do
          pokeByteOff ptr idxOut (unsafeIndex input idxIn)
          let idxOut' = idxOut + 1
              idxIn'  = idxIn + 1
          if idxOut' >= chunkSize
            then do
              let bs = PS fptr 0 idxOut'
              s' <- newS chunkSize
              loop (front . (bs:)) idxIn' s'
            else loop front idxIn' (S fptr ptr idxOut')
```
This looks like quite a convoluted way to implement `memcpy`, which already exists as `copyBytes`. Also, does line 316 mean that the buffer might end up being filled to less than `chunkSize`? The multipart upload API requires that parts are at least 5MB, so we need to be sure that we always send at least `chunkSize` bytes in each request.
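(A minimal sketch of the `copyBytes` alternative, assuming the destination buffer has at least `B.length input` bytes free; `copyInto` is a made-up name:)

```haskell
import qualified Data.ByteString as B
import Data.ByteString.Unsafe (unsafeUseAsCStringLen)
import Foreign.Marshal.Utils (copyBytes)
import Foreign.Ptr (Ptr, castPtr)
import Data.Word (Word8)

-- Copy a whole ByteString into a raw buffer with one memcpy, instead of
-- poking it into place byte by byte as in the loop above.
copyInto :: Ptr Word8 -> B.ByteString -> IO ()
copyInto dest input =
  unsafeUseAsCStringLen input $ \(src, len) ->
    copyBytes dest (castPtr src) len
```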
I'm struggling to assess the correctness of this code; for example, I can't tell without thinking very hard whether it's possible that chunks might not be included in the output - what happens to `input` in the `idxIn >= B.length input` case? I think this is another good reason to use Builders: we can accumulate input bytestrings until we have allocated at least `chunkSize` bytes, and then output the builder and its length, so we can use https://hackage.haskell.org/package/bytestring-0.10.10.0/docs/Data-ByteString-Builder-Extra.html to produce a lazy byte string with a single chunk. This avoids doing all the pointer mangling by hand.
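(A minimal sketch of that Builder-based accumulation; `accumChunks` is a made-up name and the allocation strategy sizes are illustrative:)

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Data.ByteString.Builder (byteString)
import Data.ByteString.Builder.Extra (toLazyByteStringWith, untrimmedStrategy)
import Data.Conduit (ConduitT, await, yield)

-- Accumulate incoming ByteStrings in a Builder until at least chunkSize
-- bytes have arrived, then emit them as a lazy ByteString whose buffers
-- are allocated at chunkSize, with no per-byte pointer fiddling.
accumChunks :: Monad m => Int -> ConduitT B.ByteString BL.ByteString m ()
accumChunks chunkSize = go mempty 0
  where
    go acc len = do
      mbs <- await
      case mbs of
        Nothing -> if len > 0 then yield (flush acc) else pure ()
        Just bs -> do
          let acc' = acc <> byteString bs
              len' = len + B.length bs
          if len' >= chunkSize            -- emit only once we have a full part
            then yield (flush acc') >> go mempty 0
            else go acc' len'
    flush = toLazyByteStringWith (untrimmedStrategy chunkSize chunkSize) BL.empty
```

Each emitted part is at least `chunkSize` bytes except the last, matching S3's rule that only the final part of a multipart upload may be under the minimum size.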
@axman6 what would you like to do with this?
I'm using this in production for http://cachix.org/. I understand the code is not ideal, but it improves throughput and CPU/memory usage over the current code. I also understand you want to keep the code tidy and correct. Closing, as I don't see a way forward. That said, thanks a lot for this library!
Hey @domenkozar, would you be interested in taking over this package? I don't have a use for it or time to maintain it (not that I ever really did). If so, I'm happy to add/make you the maintainer on Hackage and you can keep it going with releases if you like. It's great to hear it's being used in production for something so useful, and I'm even happier to know it's not using my atrocious code! 😅
Hey @axman6 / @axman6-da, I gave this some thought and I'm not sure I'm in a good place to maintain this library either - I already have too many packages on my plate :( Maybe you could make a call for maintainers and see if someone steps up?
If the memory leak is in request bodies, like the Glacier issue linked upthread, snoyberg/http-client#538 seems pretty suspicious as a cause.
I wanted to rewrite the streaming to improve the performance of chunking. `DList` is quite inefficient compared to `vectorBuilder`; see https://www.fpcomplete.com/blog/2014/07/vectorbuilder-packed-conduit-yielding

A few improvements along the way:

Needs testing.

Out of scope: rewriting `concurrentUpload` in terms of `streamUpload`.