Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt the coalescing writer for clients #1539

Merged
merged 2 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 57 additions & 23 deletions Sources/GRPC/GRPCClientChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -529,29 +529,13 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
// Feed the request message into the state machine:
let result = self.stateMachine.sendRequest(
request.message,
compressed: request.compressed
compressed: request.compressed,
promise: promise
)
switch result {
case let .success((buffer, maybeBuffer)):
let frame1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
MetadataKey.h2EndStream: "false",
])
// If there's a second buffer, attach the promise to the second write.
let promise1 = maybeBuffer == nil ? promise : nil
context.write(self.wrapOutboundOut(frame1), promise: promise1)

if let actuallyBuffer = maybeBuffer {
let frame2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer)))
self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "\(actuallyBuffer.readableBytes)",
MetadataKey.h2EndStream: "false",
])
context.write(self.wrapOutboundOut(frame2), promise: promise)
}
switch result {
case .success:
()

case let .failure(writeError):
switch writeError {
Expand All @@ -572,13 +556,37 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
}

case .end:
// About to send end: write any outbound messages first.
while let (result, promise) = self.stateMachine.nextRequest() {
switch result {
case let .success(buffer):
let framePayload: HTTP2Frame.FramePayload = .data(
.init(data: .byteBuffer(buffer), endStream: false)
)

self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
MetadataKey.h2EndStream: "false",
])
context.write(self.wrapOutboundOut(framePayload), promise: promise)

case let .failure(error):
context.fireErrorCaught(error)
promise?.fail(error)
return
}
}

// Okay: can we close the request stream?
switch self.stateMachine.sendEndOfRequestStream() {
case .success:
// We can. Send an empty DATA frame with end-stream set.
let empty = context.channel.allocator.buffer(capacity: 0)
let framePayload = HTTP2Frame.FramePayload
.data(.init(data: .byteBuffer(empty), endStream: true))
let framePayload: HTTP2Frame.FramePayload = .data(
.init(data: .byteBuffer(empty), endStream: true)
)

self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "0",
Expand All @@ -605,4 +613,30 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
}
}
}

func flush(context: ChannelHandlerContext) {
// Drain any requests.
while let (result, promise) = self.stateMachine.nextRequest() {
switch result {
case let .success(buffer):
let framePayload: HTTP2Frame.FramePayload = .data(
.init(data: .byteBuffer(buffer), endStream: false)
)

self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
MetadataKey.h2EndStream: "false",
])
context.write(self.wrapOutboundOut(framePayload), promise: promise)

case let .failure(error):
context.fireErrorCaught(error)
promise?.fail(error)
return
}
}

context.flush()
}
}
49 changes: 41 additions & 8 deletions Sources/GRPC/GRPCClientStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,18 @@ struct GRPCClientStateMachine {
/// request will be written.
mutating func sendRequest(
_ message: ByteBuffer,
compressed: Bool
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
compressed: Bool,
promise: EventLoopPromise<Void>? = nil
) -> Result<Void, MessageWriteError> {
return self.withStateAvoidingCoWs { state in
state.sendRequest(message, compressed: compressed)
state.sendRequest(message, compressed: compressed, promise: promise)
}
}

mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
return self.state.nextRequest()
}

/// Closes the request stream.
///
/// The client must be streaming requests in order to terminate the request stream. Valid
Expand Down Expand Up @@ -394,18 +399,21 @@ extension GRPCClientStateMachine.State {
/// See `GRPCClientStateMachine.sendRequest(_:allocator:)`.
mutating func sendRequest(
_ message: ByteBuffer,
compressed: Bool
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
let result: Result<(ByteBuffer, ByteBuffer?), MessageWriteError>
compressed: Bool,
promise: EventLoopPromise<Void>?
) -> Result<Void, MessageWriteError> {
let result: Result<Void, MessageWriteError>

switch self {
case .clientActiveServerIdle(var writeState, let pendingReadState):
result = writeState.write(message, compressed: compressed)
let result = writeState.write(message, compressed: compressed, promise: promise)
self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
return result

case .clientActiveServerActive(var writeState, let readState):
result = writeState.write(message, compressed: compressed)
let result = writeState.write(message, compressed: compressed, promise: promise)
self = .clientActiveServerActive(writeState: writeState, readState: readState)
return result

case .clientClosedServerIdle,
.clientClosedServerActive,
Expand All @@ -422,6 +430,31 @@ extension GRPCClientStateMachine.State {
return result
}

mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
switch self {
case .clientActiveServerIdle(var writeState, let pendingReadState):
self = .modifying
let result = writeState.next()
self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
return result

case .clientActiveServerActive(var writeState, let readState):
self = .modifying
let result = writeState.next()
self = .clientActiveServerActive(writeState: writeState, readState: readState)
return result

case .clientIdleServerIdle,
.clientClosedServerIdle,
.clientClosedServerActive,
.clientClosedServerClosed:
return nil

case .modifying:
preconditionFailure("State left as 'modifying'")
}
}

/// See `GRPCClientStateMachine.sendEndOfRequestStream()`.
mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
let result: Result<Void, SendEndOfRequestStreamError>
Expand Down
1 change: 1 addition & 0 deletions Sources/GRPC/LengthPrefixedMessageWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
import Foundation
import NIOCore
import NIOHPACK

internal struct LengthPrefixedMessageWriter {
static let metadataLength = 5
Expand Down
69 changes: 40 additions & 29 deletions Sources/GRPC/ReadWriteStates.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,50 +42,61 @@ struct PendingWriteState {
compression = nil
}

let writer = LengthPrefixedMessageWriter(compression: compression, allocator: allocator)
return .writing(self.arity, self.contentType, writer)
let writer = CoalescingLengthPrefixedMessageWriter(
compression: compression,
allocator: allocator
)
return .init(arity: self.arity, contentType: self.contentType, writer: writer)
}
}

/// The write state of a stream.
enum WriteState {
/// Writing may be attempted using the given writer.
case writing(MessageArity, ContentType, LengthPrefixedMessageWriter)

/// Writing may not be attempted: either a write previously failed or it is not valid for any
/// more messages to be written.
case notWriting
struct WriteState {
private var arity: MessageArity
private var contentType: ContentType
private var writer: CoalescingLengthPrefixedMessageWriter
private var canWrite: Bool

init(
arity: MessageArity,
contentType: ContentType,
writer: CoalescingLengthPrefixedMessageWriter
) {
self.arity = arity
self.contentType = contentType
self.writer = writer
self.canWrite = true
}

/// Writes a message into a buffer using the `writer`.
///
/// - Parameter message: The `Message` to write.
mutating func write(
_ message: ByteBuffer,
compressed: Bool
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
switch self {
case .notWriting:
compressed: Bool,
promise: EventLoopPromise<Void>?
) -> Result<Void, MessageWriteError> {
guard self.canWrite else {
return .failure(.cardinalityViolation)
}

case .writing(let writeArity, let contentType, var writer):
self = .notWriting
let buffers: (ByteBuffer, ByteBuffer?)
self.writer.append(buffer: message, compress: compressed, promise: promise)

do {
buffers = try writer.write(buffer: message, compressed: compressed)
} catch {
self = .notWriting
return .failure(.serializationFailed)
}
switch self.arity {
case .one:
self.canWrite = false
case .many:
()
}

// If we only expect to write one message then we're no longer writable.
if case .one = writeArity {
self = .notWriting
} else {
self = .writing(writeArity, contentType, writer)
}
return .success(())
}

return .success(buffers)
mutating func next() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
if let next = self.writer.next() {
return (next.0.mapError { _ in .serializationFailed }, next.1)
} else {
return nil
}
}
}
Expand Down
21 changes: 11 additions & 10 deletions Tests/GRPCTests/GRPCClientStateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,7 @@ extension GRPCClientStateMachineTests {
stateMachine.sendRequest(
ByteBuffer(string: request),
compressed: false
).assertSuccess { buffers in
var buffer = buffers.0
XCTAssertNil(buffers.1)
// Remove the length and compression flag prefix.
buffer.moveReaderIndex(forwardBy: 5)
let data = buffer.readString(length: buffer.readableBytes)!
XCTAssertEqual(request, data)
}
).assertSuccess()
}

func testSendRequestFromIdle() {
Expand Down Expand Up @@ -1299,10 +1292,18 @@ extension PendingWriteState {

extension WriteState {
static func one() -> WriteState {
return .writing(.one, .protobuf, LengthPrefixedMessageWriter(compression: .none))
return .init(
arity: .one,
contentType: .protobuf,
writer: .init(compression: .none, allocator: .init())
)
}

static func many() -> WriteState {
return .writing(.many, .protobuf, LengthPrefixedMessageWriter(compression: .none))
return .init(
arity: .many,
contentType: .protobuf,
writer: .init(compression: .none, allocator: .init())
)
}
}