Skip to content

Upload streaming #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Jun 17, 2019
Merged

Upload streaming #37

merged 24 commits into from
Jun 17, 2019

Conversation

artemredkin
Copy link
Collaborator

Addressing comments in the client proposal discussion, I would like to propose the following change:

HTTPResponseDelegate.didReceivePart now returns an optional future that could be used to indicate to the client that reads should be stopped until this future is resolved (backpressure)

HTTPClient.Body is now a struct with optional length and a callback for upload streaming. In addition, there are static methods to keep API the same as before.

Motivation:
Users may want to optimize memory usage when executing request with big
bodies, same as streaming download

Modifications:
Added ChunkProvider typealias in HTTPHandler.swift
Added new Body enum case - stream in HTTPHandler.swift
Extracted body processing in HTTPHandler.swift to a separate method
Added .stream enum processing in HTTPHandler.swift
Added upload streaming test to SwiftNIOHTTPTests.swift

Result:
HTTPClient library now provides methods to stream Request body
Copy link
Contributor

@weissi weissi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that looks like a great start. I think we need to invest a bunch more in testing this functionality though.

@@ -322,4 +322,65 @@ class SwiftHTTPTests: XCTestCase {
let res = try httpClient.get(url: "https://test/ok").wait()
XCTAssertEqual(res.status, .ok)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a bunch of other tests here:

  • upload/download interrupted
  • failed future returned from uploading & downloading
  • a test that shows that the back-pressure is working

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added failure test and backpressure test. What do you mean by interruption? External cancellation?

@tanner0101 tanner0101 added the kind/enhancement Improvements to existing feature. label May 29, 2019
Copy link
Contributor

@ianpartridge ianpartridge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must be missing something... I thought didReceivePart() was for streaming incoming body data (e.g. GET response). Are you intending to also use this callback for streaming outgoing body data (e.g. POST)? If so, I think that is confusing to the user...

@weissi
Copy link
Contributor

weissi commented May 31, 2019

I must be missing something... I thought didReceivePart() was for streaming incoming body data (e.g. GET response). Are you intending to also use this callback for streaming outgoing body data (e.g. POST)? If so, I think that is confusing to the user...

@ianpartridge no, you're not missing anything but it's important to propagate back-pressure. Let's assume we have three different nodes A, B, and C with two active HTTP connections:

A --> B --> C

where A --> B is B download streaming a large file from A (eg. through a GET) and B --> C is B upload streaming the same file to C (eg. through a POST).

Now let's assume we're B and we prefer not to be killed by the OOM killer. We must now protect against A delivering us the bytes faster than C will accept them (or else we would need to buffer the bytes that C didn't accept yet in memory which would eventually lead to the OOM killer coming to find us).

How can we (as B) achieve this? We need to propagate the back-pressure that C is giving us back to A. For this HTTP API this would mean that we need to be able to tell the didReceivePart callback when we're ready to accept the next part. For blocking IO, back-pressure is easy by doing:

func didReceivePart(data: ...) {
    otherConnection.sendData(data) // blocks until `data` is fully sent
}

for asynchronous programming we can express the same concept as

func didReceivePart(data: ...) -> EventLoopFuture<Void> {
    return otherConnection.sendData(data) // returns a future that is fulfilled when the data has been sent
}

in other words: The whole thing totally works iff the API allows us to indicate when we're ready for the next chunk and that's done with the change to have didReceivePart return a future.

And yes, this isn't actually related to upload streaming but when I was looking at the back-pressure for the upload streaming I noticed that download streaming doesn't actually support back-pressure and therefore it's in this PR :).

@ianpartridge
Copy link
Contributor

OK, thanks so much for the awesome explanation. So, if I'm understanding correctly, the new EventLoopFuture<Void>? return value of didReceivePart() means as follows:

nil: I've finished handling the part, you can send me another whenever you like.
An EventLoopFuture<Void>: I'm still handling the part, you can send me another when this future is fulfilled.

Thoughts...

Is the reason the future is optional because it avoids the cost of creating an already succeeded future in the case where the user doesn't care about propagating backpressure? Otherwise the future could be non-optional and we just ask the user to create a succeeded future.

Would returning an enum with two cases (case done and case pending(EventLoopFuture<Void>)) be clearer to the user?

@Lukasa
Copy link
Collaborator

Lukasa commented May 31, 2019

I'd missed this, thanks for pointing this out @ianpartridge:

Is the reason the future is optional because it avoids the cost of creating an already succeeded future in the case where the user doesn't care about propagating backpressure? Otherwise the future could be non-optional and we just ask the user to create a succeeded future.

I don't think we should allow a nil return value here. Forgetting to respect back pressure is a common source of nasty bugs that can manifest as OOMs and crashes, so the easier we can make it to propagate the backpressure the better. I think we did a good job of this on NonBlockingFileIO.readChunked, and I'd like to see us continue that here, so I'd be strongly in favour of requiring a future to be returned.

@weissi
Copy link
Contributor

weissi commented May 31, 2019

Thoughts...

Is the reason the future is optional because it avoids the cost of creating an already succeeded future in the case where the user doesn't care about propagating backpressure? Otherwise the future could be non-optional and we just ask the user to create a succeeded future

Hah, glad you point that out! I had no idea it was optional, I don't think it should be :). @artemredkin was that deliberate?

Would returning an enum with two cases (case done and case pending(EventLoopFuture<Void>)) be clearer to the user?

This would be slightly cheaper at run-time (than always allocating) in the case where we don't need back-pressure (rare!) and as you point out make the API less clear. This feels like premature optimisation to me.

@artemredkin
Copy link
Collaborator Author

It was deliberate, yes. I'll change it to non-optional

@artemredkin
Copy link
Collaborator Author

Actually, reason I made it's optional is that there is no easy way to create completed future without passing in eventLoopGroup...

@Lukasa
Copy link
Collaborator

Lukasa commented May 31, 2019

In general I think that's an acceptable limitation. I'm happy with the idea that any reasonable delegate implementation will need access to an event loop.

@artemredkin
Copy link
Collaborator Author

@Lukasa @weissi do you think enum may be used here, or should I pass in a non-optional promise as an argument for example?

@artemredkin
Copy link
Collaborator Author

@Lukasa I wanted delegates to have simple inits without dependencies...

@Lukasa
Copy link
Collaborator

Lukasa commented May 31, 2019

Neither: I think both will be misused. If we really think it's unacceptable to require the delegate have access to an event loop then I prefer non-optional promise to the enum unless the empty case is .iAmHappyIfSomeoneDoSesMyServerAndPromiseNotToComplain.

@Lukasa
Copy link
Collaborator

Lukasa commented May 31, 2019

I wanted delegates to have simple inits without dependencies...

Why?

@artemredkin
Copy link
Collaborator Author

I think having something like:

let client = HTTPClient(...)
let delegate = CountingDelegate()
client.execute(request: request, delegate: delegate)

is slightly better than:

let client = HTTPClient(...)
let delegate = CountingDelegate(eventLoopGroup: client.eventLoopGroup)
client.execute(request: request, delegate: delegate)

but I see your point.

@Lukasa
Copy link
Collaborator

Lukasa commented May 31, 2019

Note that you only need EventLoop, not EventLoopGroup.

@artemredkin
Copy link
Collaborator Author

Task has an optional Channel unfortunately, but its easy to add EventLoop there

@artemredkin
Copy link
Collaborator Author

Added EventLoop to Task

Copy link
Contributor

@ianpartridge ianpartridge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, a pile more comments. I hope they are useful.

case .string(let string):
return string.utf8.count
struct Body {
typealias ChunkProvider = (@escaping (IOData) -> EventLoopFuture<Void>) -> EventLoopFuture<Void>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should call this PartProvider - for symmetry with didReceivePart().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the typealias holds its weight actually...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rewrote without typealias, seems to be fine

return data.count
case .string(let string):
return string.utf8.count
struct Body {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this is really a personal preference (although I think NIO style might mandate this) but I much prefer having explicit access modifiers, and just extension instead of public extension. I'm not clever enough to remember Swift's access control rules without having to think twice every time...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, fixed, thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes please, I think there should never be any word in front of extension. public extension for example is really bad because it means that you can add public functions without the word public appearing in a diff and that's an issue for all SemVer stuff.

@@ -207,13 +224,15 @@ internal extension URL {

public extension HTTPClient {
final class Task<Response> {
let eventLoop: EventLoop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be public. If I'm correct the tests are only compiling because you have @testable which users won't have...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, if we can it might be good to remove @testable from the test files... But I know it's not always that straightforward. Another option might be to have to test suites: One for the public API and one that does some extra internal stuff. But this is not super high priority ofc.

@artemredkin
Copy link
Collaborator Author

Almost forgot, I also wanted to add callback for every body part upload.
@weissi @Lukasa one last question, do you think we need to return EventLoopFuture from didReceiveHead for backpressure purposes as well?

README.md Outdated
@@ -116,26 +116,32 @@ class CountingDelegate: HTTPResponseDelegate {

var count = 0

func didTransmitRequestBody() {
// this is executed when request is sent, called once
func didTransmitRequestPart(task: HTTPClient.Task<Response>, _ part: IOData) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@weissi @ianpartridge @tanner0101 do you think we need a callback when request head is sent as well? Also, what do you think about naming pattern (transmit/sent)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I'd probably err on the side of 'more callbacks', @Lukasa can you think of a good use-case for a callback when the head has been sent?
  • regarding naming transmit vs. send: I don't mind, think both are fine
  • not sure about the part: IOData label, part is maybe a bit unclear? I would almost make it _ part: IOData, don't think the label adds a lot of value there given the type IOData? But this is really not super important :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but it's already _ part: IOData :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops :), sorry

@Lukasa
Copy link
Collaborator

Lukasa commented Jun 2, 2019

Yeah, more callbacks is always better, go for it.

@artemredkin
Copy link
Collaborator Author

@weissi are you happy with the backpressure/failure tests?
@Lukasa do you think there is any value in returning Future from didReceiveHead for backpressure (maybe like opening file for writing is return code is 2xx or something)?

@Lukasa
Copy link
Collaborator

Lukasa commented Jun 4, 2019

I'm in general in favour of being able to exert back pressure at as many places as possible, yeah.

@artemredkin
Copy link
Collaborator Author

@weissi @Lukasa @ianpartridge @tanner0101 do you have any additional comments on this PR?

extension HTTPClient {
public struct Body {
public var length: Int?
public var provider: (@escaping (IOData) -> EventLoopFuture<Void>) -> EventLoopFuture<Void>
Copy link
Member

@tanner0101 tanner0101 Jun 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a struct here would give us more flexibility to add / deprecate methods in the future. It would also allow users of the package to add conformances / extensions.

Something like Body.StreamWriter or Body.Provider:

extension HTTPClient.Body {
    var writer: (StreamWriter) -> EventLoopFuture<Void>

    public struct StreamWriter {
        let closure: (IOData) -> EventLoopFuture<Void>
        public func write(_ data: IOData) -> EventLoopFuture<Void> {
            return self.closure(data)
        }
    } 

    public static func stream(length: Int? = nil, _ writer: @escaping (StreamWriter) -> EventLoopFuture<Void>) -> 
       ...
    }
}
...

This also reduces the mental load to understand the code a bit.


Here's example usage:

let body: HTTPClient.Body = .stream(length: 50) { stream in
    var request = try! Request(url: "http://localhost:\(httpBin.port)/events/10/1")
    request.headers.add(name: "Accept", value: "text/event-stream")

    let delegate = CopyingDelegate { part in
        stream.write(.byteBuffer(part))
    }
    return httpClient.execute(request: request, delegate: delegate).future
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, refactored, thanks!

Copy link
Collaborator

@Lukasa Lukasa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks good @artemredkin.

Copy link
Contributor

@weissi weissi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

@artemredkin artemredkin merged commit b019e08 into master Jun 17, 2019
@artemredkin artemredkin deleted the upload_streaming branch June 17, 2019 17:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/enhancement Improvements to existing feature.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants