Skip to content

Upload streaming #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8db481c
Provide a way to stream Request.body
artemredkin Apr 18, 2019
be5594f
use IOData instead of ByteBuffer to allow for more efficient transfer…
artemredkin Apr 20, 2019
b255af6
Merge branch 'master' into upload_streaming
artemredkin May 8, 2019
b97a24a
address some discussion comments
artemredkin May 18, 2019
97a1fc2
implement backpressure
artemredkin May 18, 2019
4ae7f70
Merge branch 'master' into upload_streaming
artemredkin May 18, 2019
39542ab
Update README
artemredkin May 21, 2019
131fea1
add missing tests for linux
artemredkin May 22, 2019
b661f08
review fix: do not use unsafe
artemredkin May 30, 2019
c5a93a8
address review comments
artemredkin May 30, 2019
2906c43
review fix: make returned future non-optional
artemredkin May 31, 2019
88cfca2
update readme
artemredkin May 31, 2019
00cb07e
review fix, namespace ChunkProvider inside Body
artemredkin May 31, 2019
6d23788
review fix: fail on future failure and encapsulate eventloop inside Task
artemredkin May 31, 2019
51b821a
review fixes
artemredkin Jun 1, 2019
3b0e3f9
add upload streaming failure cases
artemredkin Jun 1, 2019
e0fd437
add upload streaming backpressure test
artemredkin Jun 1, 2019
4a35b34
fix missing linux tests
artemredkin Jun 1, 2019
96d303d
add callbacks for upload body parts
artemredkin Jun 1, 2019
06d2f2c
update readme
artemredkin Jun 1, 2019
634bbd4
rename didTransim to didSend, added didSendHead callback
artemredkin Jun 4, 2019
15d11a8
add backpressure to response head callback
artemredkin Jun 4, 2019
e1b427e
review fix: extract provider to a separate struct
artemredkin Jun 12, 2019
9fcf8f0
rename property
artemredkin Jun 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,26 +116,38 @@ class CountingDelegate: HTTPResponseDelegate {

var count = 0

func didTransmitRequestBody() {
// this is executed when request is sent, called once
func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {
// this is executed right after request head was sent, called once
}

func didReceiveHead(_ head: HTTPResponseHead) {
func didSendRequestPart(task: HTTPClient.Task<Response>, _ part: IOData) {
// this is executed when request body part is sent, could be called zero or more times
}

func didSendRequest(task: HTTPClient.Task<Response>) {
// this is executed when request is fully sent, called once
}

func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
// this is executed when we receive HTTP Reponse head part of the request (it contains response code and headers), called once
// in case backpressure is needed, all reads will be paused until returned future is resolved
return task.eventLoop.makeSucceededFuture(())
}

func didReceivePart(_ buffer: ByteBuffer) {
func didReceivePart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
// this is executed when we receive parts of the response body, could be called zero or more times
count += buffer.readableBytes
// in case backpressure is needed, all reads will be paused until returned future is resolved
return task.eventLoop.makeSucceededFuture(())
}

func didFinishRequest() throws -> Int {
func didFinishRequest(task: HTTPClient.Task<Response>) throws -> Int {
// this is called when the request is fully read, called once
// this is where you return a result or throw any errors you require to propagate to the client
return count
}

func didReceiveError(_ error: Error) {
func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
// this is called when we receive any network-related error, called once
}
}
Expand Down
182 changes: 127 additions & 55 deletions Sources/NIOHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,48 @@ import NIOConcurrencyHelpers
import NIOHTTP1
import NIOSSL

public extension HTTPClient {
enum Body: Equatable {
case byteBuffer(ByteBuffer)
case data(Data)
case string(String)

var length: Int {
switch self {
case .byteBuffer(let buffer):
return buffer.readableBytes
case .data(let data):
return data.count
case .string(let string):
return string.utf8.count
extension HTTPClient {

public struct Body {
public struct StreamWriter {
let closure: (IOData) -> EventLoopFuture<Void>

public func write(_ data: IOData) -> EventLoopFuture<Void> {
return self.closure(data)
}
}

public var length: Int?
public var stream: (StreamWriter) -> EventLoopFuture<Void>

public static func byteBuffer(_ buffer: ByteBuffer) -> Body {
return Body(length: buffer.readableBytes) { writer in
writer.write(.byteBuffer(buffer))
}
}

public static func stream(length: Int? = nil, _ stream: @escaping (StreamWriter) -> EventLoopFuture<Void>) -> Body {
return Body(length: length, stream: stream)
}

public static func data(_ data: Data) -> Body {
return Body(length: data.count) { writer in
var buffer = ByteBufferAllocator().buffer(capacity: data.count)
buffer.writeBytes(data)
return writer.write(.byteBuffer(buffer))
}
}

public static func string(_ string: String) -> Body {
return Body(length: string.utf8.count) { writer in
var buffer = ByteBufferAllocator().buffer(capacity: string.utf8.count)
buffer.writeString(string)
return writer.write(.byteBuffer(buffer))
}
}
}

struct Request: Equatable {
public struct Request {
public var version: HTTPVersion
public var method: HTTPMethod
public var url: URL
Expand All @@ -53,7 +76,7 @@ public extension HTTPClient {
try self.init(url: url, version: version, method: method, headers: headers, body: body)
}

public init(url: URL, version: HTTPVersion, method: HTTPMethod = .GET, headers: HTTPHeaders = HTTPHeaders(), body: Body? = nil) throws {
public init(url: URL, version: HTTPVersion = HTTPVersion(major: 1, minor: 1), method: HTTPMethod = .GET, headers: HTTPHeaders = HTTPHeaders(), body: Body? = nil) throws {
guard let scheme = url.scheme else {
throw HTTPClientError.emptyScheme
}
Expand Down Expand Up @@ -88,7 +111,7 @@ public extension HTTPClient {
}
}

struct Response: Equatable {
public struct Response {
public var host: String
public var status: HTTPResponseStatus
public var headers: HTTPHeaders
Expand All @@ -114,9 +137,7 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate {
self.request = request
}

func didTransmitRequestBody(task: HTTPClient.Task<Response>) {}

func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) {
func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
switch self.state {
case .idle:
self.state = .head(head)
Expand All @@ -129,9 +150,10 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate {
case .error:
break
}
return task.eventLoop.makeSucceededFuture(())
}

func didReceivePart(task: HTTPClient.Task<Response>, _ part: ByteBuffer) {
func didReceivePart(task: HTTPClient.Task<Response>, _ part: ByteBuffer) -> EventLoopFuture<Void> {
switch self.state {
case .idle:
preconditionFailure("no head received before body")
Expand All @@ -146,6 +168,7 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate {
case .error:
break
}
return task.eventLoop.makeSucceededFuture(())
}

func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
Expand Down Expand Up @@ -174,25 +197,33 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate {
public protocol HTTPClientResponseDelegate: AnyObject {
associatedtype Response

func didTransmitRequestBody(task: HTTPClient.Task<Response>)
func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead)

func didSendRequestPart(task: HTTPClient.Task<Response>, _ part: IOData)

func didSendRequest(task: HTTPClient.Task<Response>)

func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead)
func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void>

func didReceivePart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer)
func didReceivePart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void>

func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error)

func didFinishRequest(task: HTTPClient.Task<Response>) throws -> Response
}

extension HTTPClientResponseDelegate {
func didTransmitRequestBody(task: HTTPClient.Task<Response>) {}
public func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {}

func didReceiveHead(task: HTTPClient.Task<Response>, _: HTTPResponseHead) {}
public func didSendRequestPart(task: HTTPClient.Task<Response>, _ part: IOData) {}

func didReceivePart(task: HTTPClient.Task<Response>, _: ByteBuffer) {}
public func didSendRequest(task: HTTPClient.Task<Response>) {}

func didReceiveError(task: HTTPClient.Task<Response>, _: Error) {}
public func didReceiveHead(task: HTTPClient.Task<Response>, _: HTTPResponseHead) -> EventLoopFuture<Void> { return task.eventLoop.makeSucceededFuture(()) }

public func didReceivePart(task: HTTPClient.Task<Response>, _: ByteBuffer) -> EventLoopFuture<Void> { return task.eventLoop.makeSucceededFuture(()) }

public func didReceiveError(task: HTTPClient.Task<Response>, _: Error) {}
}

internal extension URL {
Expand All @@ -207,13 +238,15 @@ internal extension URL {

public extension HTTPClient {
final class Task<Response> {
public let eventLoop: EventLoop
let future: EventLoopFuture<Response>

private var channel: Channel?
private var cancelled: Bool
private let lock: Lock

init(future: EventLoopFuture<Response>) {
init(eventLoop: EventLoop, future: EventLoopFuture<Response>) {
self.eventLoop = eventLoop
self.future = future
self.cancelled = false
self.lock = Lock()
Expand Down Expand Up @@ -267,6 +300,8 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
let redirectHandler: RedirectHandler<T.Response>?

var state: State = .idle
var pendingRead = false
var mayRead = true

init(task: HTTPClient.Task<T.Response>, delegate: T, promise: EventLoopPromise<T.Response>, redirectHandler: RedirectHandler<T.Response>?) {
self.task = task
Expand Down Expand Up @@ -298,35 +333,52 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler

head.headers = headers

context.write(wrapOutboundOut(.head(head)), promise: nil)
context.write(wrapOutboundOut(.head(head))).whenSuccess {
self.delegate.didSendRequestHead(task: self.task, head)
}

if let body = request.body {
let part: HTTPClientRequestPart
switch body {
case .byteBuffer(let buffer):
part = HTTPClientRequestPart.body(.byteBuffer(buffer))
case .data(let data):
var buffer = context.channel.allocator.buffer(capacity: data.count)
buffer.writeBytes(data)
part = HTTPClientRequestPart.body(.byteBuffer(buffer))
case .string(let string):
var buffer = context.channel.allocator.buffer(capacity: string.utf8.count)
buffer.writeString(string)
part = HTTPClientRequestPart.body(.byteBuffer(buffer))
}
self.writeBody(request: request, context: context).whenComplete { result in
switch result {
case .success:
context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
context.flush()

context.write(wrapOutboundOut(part), promise: nil)
}
self.state = .sent
self.delegate.didSendRequest(task: self.task)

context.write(wrapOutboundOut(.end(nil)), promise: promise)
context.flush()
let channel = context.channel
self.promise.futureResult.whenComplete { _ in
channel.close(promise: nil)
}
case .failure(let error):
self.state = .end
self.delegate.didReceiveError(task: self.task, error)
self.promise.fail(error)
context.close(promise: nil)
}
}
}

self.state = .sent
self.delegate.didTransmitRequestBody(task: self.task)
private func writeBody(request: HTTPClient.Request, context: ChannelHandlerContext) -> EventLoopFuture<Void> {
if let body = request.body {
return body.stream(HTTPClient.Body.StreamWriter { part in
let future = context.writeAndFlush(self.wrapOutboundOut(.body(part)))
future.whenSuccess { _ in
self.delegate.didSendRequestPart(task: self.task, part)
}
return future
})
} else {
return context.eventLoop.makeSucceededFuture(())
}
}

let channel = context.channel
self.promise.futureResult.whenComplete { _ in
channel.close(promise: nil)
public func read(context: ChannelHandlerContext) {
if self.mayRead {
self.pendingRead = false
context.read()
} else {
self.pendingRead = true
}
}

Expand All @@ -338,15 +390,21 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
self.state = .redirected(head, redirectURL)
} else {
self.state = .head
self.delegate.didReceiveHead(task: self.task, head)
self.mayRead = false
self.delegate.didReceiveHead(task: self.task, head).whenComplete { result in
self.handleBackpressureResult(context: context, result: result)
}
}
case .body(let body):
switch self.state {
case .redirected:
break
default:
self.state = .body
self.delegate.didReceivePart(task: self.task, body)
self.mayRead = false
self.delegate.didReceivePart(task: self.task, body).whenComplete { result in
self.handleBackpressureResult(context: context, result: result)
}
}
case .end:
switch self.state {
Expand All @@ -365,6 +423,20 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
}
}

private func handleBackpressureResult(context: ChannelHandlerContext, result: Result<Void, Error>) {
switch result {
case .success:
self.mayRead = true
if self.pendingRead {
context.read()
}
case .failure(let error):
self.state = .end
self.delegate.didReceiveError(task: self.task, error)
self.promise.fail(error)
}
}

func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
if (event as? IdleStateHandler.IdleStateEvent) == .read {
self.state = .end
Expand Down
10 changes: 8 additions & 2 deletions Sources/NIOHTTPClient/RequestValidation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,17 @@ extension HTTPHeaders {
}

if encodings.isEmpty {
contentLength = body.length
guard let length = body.length else {
throw HTTPClientError.contentLengthMissing
}
contentLength = length
} else {
transferEncoding = encodings.joined(separator: ", ")
if !encodings.contains("chunked") {
contentLength = body.length
guard let length = body.length else {
throw HTTPClientError.contentLengthMissing
}
contentLength = length
}
}
} else {
Expand Down
7 changes: 5 additions & 2 deletions Sources/NIOHTTPClient/SwiftNIOHTTP.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public class HTTPClient {

public func execute<T: HTTPClientResponseDelegate>(request: Request, delegate: T, timeout: Timeout? = nil) -> Task<T.Response> {
let timeout = timeout ?? configuration.timeout
let promise: EventLoopPromise<T.Response> = self.eventLoopGroup.next().makePromise()
let eventLoop = self.eventLoopGroup.next()
let promise: EventLoopPromise<T.Response> = eventLoop.makePromise()

let redirectHandler: RedirectHandler<T.Response>?
if self.configuration.followRedirects {
Expand All @@ -126,7 +127,7 @@ public class HTTPClient {
redirectHandler = nil
}

let task = Task(future: promise.futureResult)
let task = Task(eventLoop: eventLoop, future: promise.futureResult)

var bootstrap = ClientBootstrap(group: self.eventLoopGroup)
.channelOption(ChannelOptions.socket(SocketOptionLevel(IPPROTO_TCP), TCP_NODELAY), value: 1)
Expand Down Expand Up @@ -256,6 +257,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
case identityCodingIncorrectlyPresent
case chunkedSpecifiedMultipleTimes
case invalidProxyResponse
case contentLengthMissing
}

private var code: Code
Expand All @@ -279,4 +281,5 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
public static let identityCodingIncorrectlyPresent = HTTPClientError(code: .identityCodingIncorrectlyPresent)
public static let chunkedSpecifiedMultipleTimes = HTTPClientError(code: .chunkedSpecifiedMultipleTimes)
public static let invalidProxyResponse = HTTPClientError(code: .invalidProxyResponse)
public static let contentLengthMissing = HTTPClientError(code: .contentLengthMissing)
}
Loading