diff --git a/IntegrationTests/tests_04_performance/test_01_allocation_counts_for_http1.sh b/IntegrationTests/tests_04_performance/test_01_allocation_counts_for_http1.sh index 9b015ef423..87f4182496 100644 --- a/IntegrationTests/tests_04_performance/test_01_allocation_counts_for_http1.sh +++ b/IntegrationTests/tests_04_performance/test_01_allocation_counts_for_http1.sh @@ -58,7 +58,7 @@ cd .. "$swift_bin" run -c release | tee "$tmp/output" ) -for test in 1000_reqs_1_conn 1_reqs_1000_conn; do +for test in 1000_reqs_1_conn 1_reqs_1000_conn ping_pong_1000_reqs_1_conn; do cat "$tmp/output" # helps debugging total_allocations=$(grep "^$test.total_allocations:" "$tmp/output" | cut -d: -f2 | sed 's/ //g') not_freed_allocations=$(grep "^$test.remaining_allocations:" "$tmp/output" | cut -d: -f2 | sed 's/ //g') diff --git a/IntegrationTests/tests_04_performance/test_01_resources/template/Sources/SwiftBootstrap/SwiftMain.swift b/IntegrationTests/tests_04_performance/test_01_resources/template/Sources/SwiftBootstrap/SwiftMain.swift index 4edd2f5fb9..5a5c6a94ac 100644 --- a/IntegrationTests/tests_04_performance/test_01_resources/template/Sources/SwiftBootstrap/SwiftMain.swift +++ b/IntegrationTests/tests_04_performance/test_01_resources/template/Sources/SwiftBootstrap/SwiftMain.swift @@ -50,6 +50,81 @@ private final class SimpleHTTPServer: ChannelInboundHandler { } } +private struct PingPongFailure: Error, CustomStringConvertible { + public var description: String + + init(problem: String) { + self.description = problem + } +} + +private final class PingHandler: ChannelInboundHandler { + typealias InboundIn = ByteBuffer + typealias OutboundOut = ByteBuffer + + private var pingBuffer: ByteBuffer! + private let numberOfRequests: Int + private var remainingNumberOfRequests: Int + private let allDone: EventLoopPromise + public static let pingCode: UInt8 = 0xbe + + public init(numberOfRequests: Int, eventLoop: EventLoop) { + self.numberOfRequests = numberOfRequests + self.remainingNumberOfRequests = numberOfRequests + self.allDone = eventLoop.newPromise() + } + + public func channelActive(ctx: ChannelHandlerContext) { + self.pingBuffer = ctx.channel.allocator.buffer(capacity: 1) + self.pingBuffer.write(integer: PingHandler.pingCode) + + ctx.writeAndFlush(self.wrapOutboundOut(self.pingBuffer), promise: nil) + } + + public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + var buf = self.unwrapInboundIn(data) + if buf.readableBytes == 1 && + buf.readInteger(as: UInt8.self) == PongHandler.pongCode { + if self.remainingNumberOfRequests > 0 { + self.remainingNumberOfRequests -= 1 + ctx.writeAndFlush(self.wrapOutboundOut(self.pingBuffer), promise: nil) + } else { + ctx.close(promise: self.allDone) + } + } else { + ctx.close(promise: nil) + self.allDone.fail(error: PingPongFailure(problem: "wrong buffer received: \(buf.debugDescription)")) + } + } + + public func wait() throws { + try self.allDone.futureResult.wait() + } +} + +private final class PongHandler: ChannelInboundHandler { + typealias InboundIn = ByteBuffer + typealias OutboundOut = ByteBuffer + + private var pongBuffer: ByteBuffer! + public static let pongCode: UInt8 = 0xef + + public func channelActive(ctx: ChannelHandlerContext) { + self.pongBuffer = ctx.channel.allocator.buffer(capacity: 1) + self.pongBuffer.write(integer: PongHandler.pongCode) + } + + public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + var buf = self.unwrapInboundIn(data) + if buf.readableBytes == 1 && + buf.readInteger(as: UInt8.self) == PingHandler.pingCode { + ctx.writeAndFlush(self.wrapOutboundOut(self.pongBuffer), promise: nil) + } else { + ctx.close(promise: nil) + } + } +} + @_cdecl("swift_main") public func swiftMain() -> Int { final class RepeatedRequests: ChannelInboundHandler { @@ -109,6 +184,7 @@ public func swiftMain() -> Int { } _ = measureOne(fn) /* pre-heat and throw away */ + usleep(100_000) // allocs/frees happen on multiple threads, allow some cool down time var measurements: [[String: Int]] = [] for _ in 0..<10 { measurements.append(measureOne(fn)) @@ -128,6 +204,7 @@ public func swiftMain() -> Int { func doRequests(group: EventLoopGroup, number numberOfRequests: Int) throws -> Int { let serverChannel = try ServerBootstrap(group: group) .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) + .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1) .childChannelInitializer { channel in channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true).then { channel.pipeline.add(handler: SimpleHTTPServer()) @@ -147,6 +224,7 @@ public func swiftMain() -> Int { channel.pipeline.add(handler: repeatedRequestsHandler) } } + .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1) .connect(to: serverChannel.localAddress!) .wait() @@ -155,6 +233,33 @@ public func swiftMain() -> Int { return try repeatedRequestsHandler.wait() } + func doPingPongRequests(group: EventLoopGroup, number numberOfRequests: Int) throws -> Int { + let serverChannel = try ServerBootstrap(group: group) + .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) + .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1) + .childChannelOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 4)) + .childChannelInitializer { channel in + channel.pipeline.add(handler: PongHandler()) + }.bind(host: "127.0.0.1", port: 0).wait() + + defer { + try! serverChannel.close().wait() + } + + let pingHandler = PingHandler(numberOfRequests: numberOfRequests, eventLoop: group.next()) + _ = try ClientBootstrap(group: group) + .channelInitializer { channel in + channel.pipeline.add(handler: pingHandler) + } + .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1) + .channelOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 4)) + .connect(to: serverChannel.localAddress!) + .wait() + + try pingHandler.wait() + return numberOfRequests + } + let group = MultiThreadedEventLoopGroup(numThreads: System.coreCount) defer { try! group.syncShutdownGracefully() @@ -176,5 +281,11 @@ public func swiftMain() -> Int { return numberDone } + measureAndPrint(desc: "ping_pong_1000_reqs_1_conn") { + let numberDone = try! doPingPongRequests(group: group, number: 1000) + precondition(numberDone == 1000) + return numberDone + } + return 0 } diff --git a/Sources/NIO/ByteBuffer-int.swift b/Sources/NIO/ByteBuffer-int.swift index 99c4fa02c0..9f9e251746 100644 --- a/Sources/NIO/ByteBuffer-int.swift +++ b/Sources/NIO/ByteBuffer-int.swift @@ -13,7 +13,8 @@ //===----------------------------------------------------------------------===// extension ByteBuffer { - private func toEndianness (value: T, endianness: Endianness) -> T { + @_inlineable @_versioned + func _toEndianness (value: T, endianness: Endianness) -> T { switch endianness { case .little: return value.littleEndian @@ -28,6 +29,7 @@ extension ByteBuffer { /// - endianness: The endianness of the integer in this `ByteBuffer` (defaults to big endian). /// - as: the desired `FixedWidthInteger` type (optional parameter) /// - returns: An integer value deserialized from this `ByteBuffer` or `nil` if there aren't enough bytes readable. + @_inlineable public mutating func readInteger(endianness: Endianness = .big, as: T.Type = T.self) -> T? { guard self.readableBytes >= MemoryLayout.size else { return nil @@ -45,6 +47,7 @@ extension ByteBuffer { /// - endianness: The endianness of the integer in this `ByteBuffer` (defaults to big endian). /// - as: the desired `FixedWidthInteger` type (optional parameter) /// - returns: An integer value deserialized from this `ByteBuffer` or `nil` if the bytes of interest aren't contained in the `ByteBuffer`. + @_inlineable public func getInteger(at index: Int, endianness: Endianness = Endianness.big, as: T.Type = T.self) -> T? { precondition(index >= 0, "index must not be negative") return self.withVeryUnsafeBytes { ptr in @@ -56,7 +59,7 @@ extension ByteBuffer { valuePtr.copyMemory(from: UnsafeRawBufferPointer(start: ptr.baseAddress!.advanced(by: index), count: MemoryLayout.size)) } - return toEndianness(value: value, endianness: endianness) + return _toEndianness(value: value, endianness: endianness) } } @@ -67,6 +70,7 @@ extension ByteBuffer { /// - endianness: The endianness to use, defaults to big endian. /// - returns: The number of bytes written. @discardableResult + @_inlineable public mutating func write(integer: T, endianness: Endianness = .big, as: T.Type = T.self) -> Int { let bytesWritten = self.set(integer: integer, at: self.writerIndex, endianness: endianness) self.moveWriterIndex(forwardBy: bytesWritten) @@ -81,8 +85,9 @@ extension ByteBuffer { /// - endianness: The endianness to use, defaults to big endian. /// - returns: The number of bytes written. @discardableResult + @_inlineable public mutating func set(integer: T, at index: Int, endianness: Endianness = .big, as: T.Type = T.self) -> Int { - var value = toEndianness(value: integer, endianness: endianness) + var value = _toEndianness(value: integer, endianness: endianness) return Swift.withUnsafeBytes(of: &value) { ptr in self.set(bytes: ptr, at: index) }