Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make HTTP2StreamChannel non-generic #242

Merged
merged 1 commit into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 32 additions & 21 deletions Sources/NIOHTTP2/HTTP2Frame.swift
Original file line number Diff line number Diff line change
Expand Up @@ -252,27 +252,38 @@ public struct HTTP2Frame {
}
}

extension HTTP2Frame: HTTP2FrameConvertible, HTTP2FramePayloadConvertible {
init(http2Frame: HTTP2Frame) {
self = http2Frame
}
extension HTTP2Frame.FramePayload {
/// A shorthand heuristic for how many bytes we assume a frame consumes on the wire.
///
/// Here we concern ourselves only with per-stream frames: that is, `HEADERS`, `DATA`,
/// `WINDOW_UDPATE`, `RST_STREAM`, and I guess `PRIORITY`. As a simple heuristic we
/// hard code fixed lengths for fixed length frames, use a calculated length for
/// variable length frames, and just ignore encoded headers because it's not worth doing a better
/// job.
var estimatedFrameSize: Int {
let frameHeaderSize = 9

func makeHTTP2Frame(streamID: HTTP2StreamID) -> HTTP2Frame {
assert(self.streamID == streamID, "streamID does not match")
return self
}
}

extension HTTP2Frame.FramePayload: HTTP2FrameConvertible, HTTP2FramePayloadConvertible {
var payload: HTTP2Frame.FramePayload {
return self
}

init(http2Frame: HTTP2Frame) {
self = http2Frame.payload
}

func makeHTTP2Frame(streamID: HTTP2StreamID) -> HTTP2Frame {
return HTTP2Frame(streamID: streamID, payload: self)
switch self {
case .data(let d):
let paddingBytes = d.paddingBytes.map { $0 + 1 } ?? 0
return d.data.readableBytes + paddingBytes + frameHeaderSize
case .headers(let h):
let paddingBytes = h.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .priority:
return frameHeaderSize + 5
case .pushPromise(let p):
// Like headers, this is variably size, and we just ignore the encoded headers because
// it's not worth having a heuristic.
let paddingBytes = p.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .rstStream:
return frameHeaderSize + 4
case .windowUpdate:
return frameHeaderSize + 4
default:
// Unknown or unexpected control frame: say 9 bytes.
return frameHeaderSize
}
}
}
64 changes: 0 additions & 64 deletions Sources/NIOHTTP2/HTTP2FrameConvertible.swift

This file was deleted.

81 changes: 62 additions & 19 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,40 @@ private enum StreamChannelState {
}
}

/// An `HTTP2StreamChannel` which deals in `HTTPFrame`s.
typealias HTTP2FrameBasedStreamChannel = HTTP2StreamChannel<HTTP2Frame>
// The type of data read from and written to the channel.
enum HTTP2StreamDataType {
/// `HTTP2Frame`
case frame
/// `HTTP2Frame.FramePayload`
case framePayload
}

private enum HTTP2StreamData {
case frame(HTTP2Frame)
case framePayload(HTTP2Frame.FramePayload)

var estimatedFrameSize: Int {
switch self {
case .frame(let frame):
return frame.payload.estimatedFrameSize
case .framePayload(let payload):
return payload.estimatedFrameSize
}
}
}

/// An `HTTP2StreamChannel` which reads and writes `HTTPFrame.FramePayload`s.
typealias HTTP2PayloadBasedStreamChannel = HTTP2StreamChannel<HTTP2Frame.FramePayload>
final class HTTP2StreamChannel: Channel, ChannelCore {
/// The stream data type of the channel.
private let streamDataType: HTTP2StreamDataType

final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2FrameConvertible>: Channel, ChannelCore {
internal init(allocator: ByteBufferAllocator,
parent: Channel,
multiplexer: HTTP2StreamMultiplexer,
streamID: HTTP2StreamID?,
targetWindowSize: Int32,
outboundBytesHighWatermark: Int,
outboundBytesLowWatermark: Int) {
outboundBytesLowWatermark: Int,
streamDataType: HTTP2StreamDataType) {
self.allocator = allocator
self.closePromise = parent.eventLoop.makePromise()
self.localAddress = parent.localAddress
Expand All @@ -147,6 +167,7 @@ final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2Fram
self._isActiveAtomic = .makeAtomic(value: false)
self._isWritable = .makeAtomic(value: true)
self.state = .idle
self.streamDataType = streamDataType
self.writabilityManager = StreamChannelFlowController(highWatermark: outboundBytesHighWatermark,
lowWatermark: outboundBytesLowWatermark,
parentIsWritable: parent.isWritable)
Expand All @@ -158,6 +179,7 @@ final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2Fram
}

internal func configure(initializer: ((Channel, HTTP2StreamID) -> EventLoopFuture<Void>)?, userPromise promise: EventLoopPromise<Channel>?){
assert(self.streamDataType == .frame)
// We need to configure this channel. This involves doing four things:
// 1. Setting our autoRead state from the parent
// 2. Calling the initializer, if provided.
Expand All @@ -176,6 +198,7 @@ final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2Fram
}

internal func configure(initializer: ((Channel) -> EventLoopFuture<Void>)?, userPromise promise: EventLoopPromise<Channel>?){
assert(self.streamDataType == .framePayload)
// We need to configure this channel. This involves doing four things:
// 1. Setting our autoRead state from the parent
// 2. Calling the initializer, if provided.
Expand Down Expand Up @@ -385,7 +408,7 @@ final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2Fram
/// In the future this buffer will be used to manage interactions with read() and even, one day,
/// with flow control. For now, though, all this does is hold frames until we have set the
/// channel up.
private var pendingReads: CircularBuffer<Message> = CircularBuffer(initialCapacity: 8)
private var pendingReads: CircularBuffer<HTTP2Frame> = CircularBuffer(initialCapacity: 8)

/// Whether `autoRead` is enabled. By default, all `HTTP2StreamChannel` objects inherit their `autoRead`
/// state from their parent.
Expand All @@ -399,7 +422,7 @@ final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2Fram
///
/// To correctly respect flushes, we deliberately withold frames from the parent channel until this
/// stream is flushed, at which time we deliver them all. This buffer holds the pending ones.
private var pendingWrites: MarkedCircularBuffer<(Message, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)
private var pendingWrites: MarkedCircularBuffer<(HTTP2StreamData, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)

/// A list node used to hold stream channels.
internal var streamChannelListNode: StreamChannelListNode = StreamChannelListNode()
Expand All @@ -425,13 +448,19 @@ final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2Fram
return
}

let outbound = self.unwrapData(data, as: Message.self)
let streamData: HTTP2StreamData
switch self.streamDataType {
case .frame:
streamData = .frame(self.unwrapData(data))
case .framePayload:
streamData = .framePayload(self.unwrapData(data))
}

// We need a promise to attach our flow control callback to.
// Regardless of whether the write succeeded or failed, we don't count
// the bytes any longer.
let promise = promise ?? self.eventLoop.makePromise()
let writeSize = outbound.estimatedFrameSize
let writeSize = streamData.estimatedFrameSize

// Right now we deal with this math by just attaching a callback to all promises. This is going
// to be annoyingly expensive, but for now it's the most straightforward approach.
Expand All @@ -440,7 +469,7 @@ final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2Fram
self.changeWritability(to: value)
}
}
self.pendingWrites.append((outbound, promise))
self.pendingWrites.append((streamData, promise))

// Ok, we can make an outcall now, which means we can safely deal with the flow control.
if case .changed(newValue: let value) = self.writabilityManager.bufferedBytes(writeSize) {
Expand Down Expand Up @@ -630,16 +659,24 @@ private extension HTTP2StreamChannel {
while self.pendingReads.count > 0 {
let frame = self.pendingReads.removeFirst()

let anyStreamData: NIOAny
let dataLength: Int?

switch self.streamDataType {
case .frame:
anyStreamData = NIOAny(frame)
case .framePayload:
anyStreamData = NIOAny(frame.payload)
}

switch frame.payload {
case .data(let data):
dataLength = data.data.readableBytes
default:
dataLength = nil
}

self.pipeline.fireChannelRead(NIOAny(frame))
self.pipeline.fireChannelRead(anyStreamData)

if let size = dataLength, let increment = self.windowManager.bufferedFrameEmitted(size: size) {
// To have a pending read, we must have a stream ID.
Expand All @@ -665,9 +702,17 @@ private extension HTTP2StreamChannel {
}

while self.pendingWrites.hasMark {
let (outbound, promise) = self.pendingWrites.removeFirst()
// This unwrap is okay: we just ensured that `self.streamID` was set above.
let frame = outbound.makeHTTP2Frame(streamID: self.streamID!)
let (streamData, promise) = self.pendingWrites.removeFirst()
let frame: HTTP2Frame

switch streamData {
case .frame(let f):
frame = f
case .framePayload(let payload):
// This unwrap is okay: we just ensured that `self.streamID` was set above.
frame = HTTP2Frame(streamID: self.streamID!, payload: payload)
}

self.receiveOutboundFrame(frame, promise: promise)
}
self.multiplexer.childChannelFlush()
Expand All @@ -694,8 +739,6 @@ internal extension HTTP2StreamChannel {
return
}

let message = Message(http2Frame: frame)

// Record the size of the frame so that when we receive a window update event our
// calculation on whether we emit a WINDOW_UPDATE frame is based on the bytes we have
// actually delivered into the pipeline.
Expand All @@ -710,9 +753,9 @@ internal extension HTTP2StreamChannel {
// No further window update frames should be sent.
self.windowManager.closed = true
}
self.pendingReads.append(message)
}

self.pendingReads.append(frame)
}

/// Called when a frame is sent to the network.
///
Expand Down
Loading