Skip to content

Commit

Permalink
reduce allocations & test we don't regress (#316)
Browse files Browse the repository at this point in the history
Motivation:

The ping/pong test should only do 4 allocations:
- readFromSocket does unconditionally allocate one ByteBuffer
  (allocating one ByteBuffer does 2 allocations)
- we have a ping and a pong side, each call readFromSocket

so 2 * 2 = 4 allocations per message. Before adding the `@_inlineable`
to the read/write/get/set integer methods in ByteBuffer however I was
seeing way more allocations. So looked at them and fixed them.

Turns out that massively reduces the number of allocations for HTTP too.

On Linux I was seeing this change:

1000_reqs_1_conn: 282000 -> 70000
1_reqs_1000_conn: 1287000 -> 907000

Modifications:

- made the ByteBuffer integer methods inlineable
- added a test that makes sure we don't regress on the simplest possible
  use of NIO: a ping/pong server

Result:

After your change, what will change.
  • Loading branch information
weissi authored and Lukasa committed Apr 17, 2018
1 parent 4e4890d commit c3706cd
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ cd ..
"$swift_bin" run -c release | tee "$tmp/output"
)

for test in 1000_reqs_1_conn 1_reqs_1000_conn; do
for test in 1000_reqs_1_conn 1_reqs_1000_conn ping_pong_1000_reqs_1_conn; do
cat "$tmp/output" # helps debugging
total_allocations=$(grep "^$test.total_allocations:" "$tmp/output" | cut -d: -f2 | sed 's/ //g')
not_freed_allocations=$(grep "^$test.remaining_allocations:" "$tmp/output" | cut -d: -f2 | sed 's/ //g')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,81 @@ private final class SimpleHTTPServer: ChannelInboundHandler {
}
}

private struct PingPongFailure: Error, CustomStringConvertible {
public var description: String

init(problem: String) {
self.description = problem
}
}

private final class PingHandler: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer

private var pingBuffer: ByteBuffer!
private let numberOfRequests: Int
private var remainingNumberOfRequests: Int
private let allDone: EventLoopPromise<Void>
public static let pingCode: UInt8 = 0xbe

public init(numberOfRequests: Int, eventLoop: EventLoop) {
self.numberOfRequests = numberOfRequests
self.remainingNumberOfRequests = numberOfRequests
self.allDone = eventLoop.newPromise()
}

public func channelActive(ctx: ChannelHandlerContext) {
self.pingBuffer = ctx.channel.allocator.buffer(capacity: 1)
self.pingBuffer.write(integer: PingHandler.pingCode)

ctx.writeAndFlush(self.wrapOutboundOut(self.pingBuffer), promise: nil)
}

public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
var buf = self.unwrapInboundIn(data)
if buf.readableBytes == 1 &&
buf.readInteger(as: UInt8.self) == PongHandler.pongCode {
if self.remainingNumberOfRequests > 0 {
self.remainingNumberOfRequests -= 1
ctx.writeAndFlush(self.wrapOutboundOut(self.pingBuffer), promise: nil)
} else {
ctx.close(promise: self.allDone)
}
} else {
ctx.close(promise: nil)
self.allDone.fail(error: PingPongFailure(problem: "wrong buffer received: \(buf.debugDescription)"))
}
}

public func wait() throws {
try self.allDone.futureResult.wait()
}
}

private final class PongHandler: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer

private var pongBuffer: ByteBuffer!
public static let pongCode: UInt8 = 0xef

public func channelActive(ctx: ChannelHandlerContext) {
self.pongBuffer = ctx.channel.allocator.buffer(capacity: 1)
self.pongBuffer.write(integer: PongHandler.pongCode)
}

public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
var buf = self.unwrapInboundIn(data)
if buf.readableBytes == 1 &&
buf.readInteger(as: UInt8.self) == PingHandler.pingCode {
ctx.writeAndFlush(self.wrapOutboundOut(self.pongBuffer), promise: nil)
} else {
ctx.close(promise: nil)
}
}
}

@_cdecl("swift_main")
public func swiftMain() -> Int {
final class RepeatedRequests: ChannelInboundHandler {
Expand Down Expand Up @@ -109,6 +184,7 @@ public func swiftMain() -> Int {
}

_ = measureOne(fn) /* pre-heat and throw away */
usleep(100_000) // allocs/frees happen on multiple threads, allow some cool down time
var measurements: [[String: Int]] = []
for _ in 0..<10 {
measurements.append(measureOne(fn))
Expand All @@ -128,6 +204,7 @@ public func swiftMain() -> Int {
func doRequests(group: EventLoopGroup, number numberOfRequests: Int) throws -> Int {
let serverChannel = try ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true).then {
channel.pipeline.add(handler: SimpleHTTPServer())
Expand All @@ -147,6 +224,7 @@ public func swiftMain() -> Int {
channel.pipeline.add(handler: repeatedRequestsHandler)
}
}
.channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.connect(to: serverChannel.localAddress!)
.wait()

Expand All @@ -155,6 +233,33 @@ public func swiftMain() -> Int {
return try repeatedRequestsHandler.wait()
}

func doPingPongRequests(group: EventLoopGroup, number numberOfRequests: Int) throws -> Int {
let serverChannel = try ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.childChannelOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 4))
.childChannelInitializer { channel in
channel.pipeline.add(handler: PongHandler())
}.bind(host: "127.0.0.1", port: 0).wait()

defer {
try! serverChannel.close().wait()
}

let pingHandler = PingHandler(numberOfRequests: numberOfRequests, eventLoop: group.next())
_ = try ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.add(handler: pingHandler)
}
.channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.channelOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 4))
.connect(to: serverChannel.localAddress!)
.wait()

try pingHandler.wait()
return numberOfRequests
}

let group = MultiThreadedEventLoopGroup(numThreads: System.coreCount)
defer {
try! group.syncShutdownGracefully()
Expand All @@ -176,5 +281,11 @@ public func swiftMain() -> Int {
return numberDone
}

measureAndPrint(desc: "ping_pong_1000_reqs_1_conn") {
let numberDone = try! doPingPongRequests(group: group, number: 1000)
precondition(numberDone == 1000)
return numberDone
}

return 0
}
11 changes: 8 additions & 3 deletions Sources/NIO/ByteBuffer-int.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
//===----------------------------------------------------------------------===//

extension ByteBuffer {
private func toEndianness<T: FixedWidthInteger> (value: T, endianness: Endianness) -> T {
@_inlineable @_versioned
func _toEndianness<T: FixedWidthInteger> (value: T, endianness: Endianness) -> T {
switch endianness {
case .little:
return value.littleEndian
Expand All @@ -28,6 +29,7 @@ extension ByteBuffer {
/// - endianness: The endianness of the integer in this `ByteBuffer` (defaults to big endian).
/// - as: the desired `FixedWidthInteger` type (optional parameter)
/// - returns: An integer value deserialized from this `ByteBuffer` or `nil` if there aren't enough bytes readable.
@_inlineable
public mutating func readInteger<T: FixedWidthInteger>(endianness: Endianness = .big, as: T.Type = T.self) -> T? {
guard self.readableBytes >= MemoryLayout<T>.size else {
return nil
Expand All @@ -45,6 +47,7 @@ extension ByteBuffer {
/// - endianness: The endianness of the integer in this `ByteBuffer` (defaults to big endian).
/// - as: the desired `FixedWidthInteger` type (optional parameter)
/// - returns: An integer value deserialized from this `ByteBuffer` or `nil` if the bytes of interest aren't contained in the `ByteBuffer`.
@_inlineable
public func getInteger<T: FixedWidthInteger>(at index: Int, endianness: Endianness = Endianness.big, as: T.Type = T.self) -> T? {
precondition(index >= 0, "index must not be negative")
return self.withVeryUnsafeBytes { ptr in
Expand All @@ -56,7 +59,7 @@ extension ByteBuffer {
valuePtr.copyMemory(from: UnsafeRawBufferPointer(start: ptr.baseAddress!.advanced(by: index),
count: MemoryLayout<T>.size))
}
return toEndianness(value: value, endianness: endianness)
return _toEndianness(value: value, endianness: endianness)
}
}

Expand All @@ -67,6 +70,7 @@ extension ByteBuffer {
/// - endianness: The endianness to use, defaults to big endian.
/// - returns: The number of bytes written.
@discardableResult
@_inlineable
public mutating func write<T: FixedWidthInteger>(integer: T, endianness: Endianness = .big, as: T.Type = T.self) -> Int {
let bytesWritten = self.set(integer: integer, at: self.writerIndex, endianness: endianness)
self.moveWriterIndex(forwardBy: bytesWritten)
Expand All @@ -81,8 +85,9 @@ extension ByteBuffer {
/// - endianness: The endianness to use, defaults to big endian.
/// - returns: The number of bytes written.
@discardableResult
@_inlineable
public mutating func set<T: FixedWidthInteger>(integer: T, at index: Int, endianness: Endianness = .big, as: T.Type = T.self) -> Int {
var value = toEndianness(value: integer, endianness: endianness)
var value = _toEndianness(value: integer, endianness: endianness)
return Swift.withUnsafeBytes(of: &value) { ptr in
self.set(bytes: ptr, at: index)
}
Expand Down

0 comments on commit c3706cd

Please sign in to comment.