Skip to content

Commit

Permalink
Simplify buffering in TCPConnection
Browse files Browse the repository at this point in the history
Prior to this commit, the buffering in TCPConnection was overly
complicated. This was a carry over from the old style of buffer
handling. However, after Dipin Hora's commits to bring over the "faster
tcp" changes from Wallaroo, the buffer handling no longer makes sense.

Prior to this commit, a TCPConnection would be provided two variables to
control read buffering: init_size, max_size. The read buffer would start
at init_size and would grow to max_size.

Before "faster tcp", the buffer would only grow in size if we read the
entire buffer's worth of data in one recv call. After the data was read,
the buffer would be truncated and then handed off to the
TCPConnectionNotify. This was very wasteful. If you had a buffer of 128
allocated bytes and only read 4, then the 124 allocated bytes would be
truncated away and the buffer would be reallocated.

With "faster tcp", we allocate a large buffer and when reading data, we
chop it off the buffer as needed. This means, that "grow the buffer"
doesn't make much sense anymore. The buffer will continually shrink in
size as we slowly chop more and more off. This meant that with the code
as it is prior to change, we would always grow to max buffer size.

If we are always going to grow to max buffer size, it makes sense to
simplify the code and our interfaces and only allow for setting the
buffer size. This commit makes that simplication. Now we will:

- allocate a buffer of size `read_buffer_size`
- reallocate when the buffer is empty
- reallocate if the caller is using `expect` and the buffer is too small
to hold the expected payload

This is a breaking change for TPCConnection and TCPListener as it
changes the constructor interfaces.
  • Loading branch information
SeanTAllen committed Jun 18, 2019
1 parent 9dc8ffc commit 68a82a3
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ All notable changes to the Pony compiler and standard library will be documented
- Make TCPConnection yield on writes to not hog cpu ([PR #3176](https://github.com/ponylang/ponyc/pull/3176))
- Change how TCP connection reads data to improve performance ([PR #3178](https://github.com/ponylang/ponyc/pull/3178))
- Allow use of runtime TCP connect without ASIO one shot ([PR #3171](https://github.com/ponylang/ponyc/pull/3171))
- Simplify buffering in TCPConnection ([PR #3185](https://github.com/ponylang/ponyc/pull/3185))

## [0.28.1] - 2019-06-01

Expand Down
64 changes: 21 additions & 43 deletions packages/net/tcp_connection.pony
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ actor TCPConnection
var _read_buf_offset: USize = 0
var _max_received_called: USize = 50

var _next_size: USize
let _max_size: USize
let _read_buffer_size: USize

var _expect: USize = 0

Expand All @@ -227,16 +226,14 @@ actor TCPConnection
host: String,
service: String,
from: String = "",
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Connect via IPv4 or IPv6. If `from` is a non-empty string, the connection
will be made from the specified interface.
"""
_read_buf = recover Array[U8] .> undefined(init_size) end
_next_size = init_size
_max_size = max_size
_read_buf = recover Array[U8] .> undefined(read_buffer_size) end
_read_buffer_size = read_buffer_size
_notify = consume notify
let asio_flags =
ifdef not windows then
Expand All @@ -255,15 +252,13 @@ actor TCPConnection
host: String,
service: String,
from: String = "",
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Connect via IPv4.
"""
_read_buf = recover Array[U8] .> undefined(init_size) end
_next_size = init_size
_max_size = max_size
_read_buf = recover Array[U8] .> undefined(read_buffer_size) end
_read_buffer_size = read_buffer_size
_notify = consume notify
let asio_flags =
ifdef not windows then
Expand All @@ -282,15 +277,13 @@ actor TCPConnection
host: String,
service: String,
from: String = "",
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Connect via IPv6.
"""
_read_buf = recover Array[U8] .> undefined(init_size) end
_next_size = init_size
_max_size = max_size
_read_buf = recover Array[U8] .> undefined(read_buffer_size) end
_read_buffer_size = read_buffer_size
_notify = consume notify
let asio_flags =
ifdef not windows then
Expand All @@ -307,8 +300,7 @@ actor TCPConnection
listen: TCPListener,
notify: TCPConnectionNotify iso,
fd: U32,
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
A new connection accepted on a server.
Expand All @@ -329,9 +321,8 @@ actor TCPConnection
@pony_asio_event_set_writeable(_event, true)
end
_writeable = true
_read_buf = recover Array[U8] .> undefined(init_size) end
_next_size = init_size
_max_size = max_size
_read_buf = recover Array[U8] .> undefined(read_buffer_size) end
_read_buffer_size = read_buffer_size

_notify.accepted(this)

Expand Down Expand Up @@ -657,7 +648,7 @@ actor TCPConnection
var bytes_to_send: USize = 0
var bytes_sent: USize = 0
while _writeable and (_pending_writev_total > 0) do
if bytes_sent >= _max_size then
if bytes_sent >= _read_buffer_size then
_write_again()
return false
end
Expand Down Expand Up @@ -775,16 +766,13 @@ actor TCPConnection
This occurs only with IOCP on Windows.
"""
ifdef windows then
match len.usize()
| 0 =>
if len.usize() == 0 then
// The socket has been closed from the other side, or a hard close has
// cancelled the queued read.
_readable = false
_shutdown_peer = true
close()
return
| _next_size =>
_next_size = _max_size.min(_next_size * 2)
end

_read_buf_offset = _read_buf_offset + len.usize()
Expand All @@ -806,16 +794,10 @@ actor TCPConnection

fun ref _read_buf_size() =>
"""
Resize the read buffer.
Resize the read buffer if it is empty or smaller than the next payload size
"""
if _expect != 0 then
// We know how much data to expect. If _read_buf is too small,
// then resize it.
if (_read_buf.size() - _read_buf_offset) <= _expect then
_read_buf.undefined(_expect.next_pow2().max(_next_size))
end
else
_read_buf.undefined(_next_size)
if _read_buf.size() <= _expect then
_read_buf.undefined(_read_buffer_size)
end

fun ref _queue_read() =>
Expand Down Expand Up @@ -879,8 +861,8 @@ actor TCPConnection
end
end

if sum >= _max_size then
// If we've read _max_size, yield and read again later.
if sum >= _read_buffer_size then
// If we've read _read_buffer_size, yield and read again later.
_read_buf_size()
_read_again()
_reading = false
Expand All @@ -898,8 +880,7 @@ actor TCPConnection
_read_buf.cpointer(_read_buf_offset),
_read_buf.size() - _read_buf_offset) ?

match len
| 0 =>
if len == 0 then
// Would block, try again later.
// this is safe because asio thread isn't currently subscribed
// for a read event so will not be writing to the readable flag
Expand All @@ -908,9 +889,6 @@ actor TCPConnection
_reading = false
@pony_asio_event_resubscribe_read(_event)
return
| (_read_buf.size() - _read_buf_offset) =>
// Increase the read buffer size.
_next_size = _max_size.min(_next_size * 2)
end

_read_buf_offset = _read_buf_offset + len
Expand Down
25 changes: 9 additions & 16 deletions packages/net/tcp_listener.pony
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,15 @@ actor TCPListener
let _limit: USize
var _count: USize = 0
var _paused: Bool = false
var _init_size: USize
var _max_size: USize
var _read_buffer_size: USize

new create(
auth: TCPListenerAuth,
notify: TCPListenNotify iso,
host: String = "",
service: String = "0",
limit: USize = 0,
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Listens for both IPv4 and IPv6 connections.
Expand All @@ -65,8 +63,7 @@ actor TCPListener
_event =
@pony_os_listen_tcp[AsioEventID](this,
host.cstring(), service.cstring())
_init_size = init_size
_max_size = max_size
_read_buffer_size = read_buffer_size
_fd = @pony_asio_event_fd(_event)
_notify_listening()

Expand All @@ -76,8 +73,7 @@ actor TCPListener
host: String = "",
service: String = "0",
limit: USize = 0,
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Listens for IPv4 connections.
Expand All @@ -87,8 +83,7 @@ actor TCPListener
_event =
@pony_os_listen_tcp4[AsioEventID](this, host.cstring(),
service.cstring())
_init_size = init_size
_max_size = max_size
_read_buffer_size = read_buffer_size
_fd = @pony_asio_event_fd(_event)
_notify_listening()

Expand All @@ -98,8 +93,7 @@ actor TCPListener
host: String = "",
service: String = "0",
limit: USize = 0,
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Listens for IPv6 connections.
Expand All @@ -109,8 +103,7 @@ actor TCPListener
_event =
@pony_os_listen_tcp6[AsioEventID](this, host.cstring(),
service.cstring())
_init_size = init_size
_max_size = max_size
_read_buffer_size = read_buffer_size
_fd = @pony_asio_event_fd(_event)
_notify_listening()

Expand Down Expand Up @@ -217,8 +210,8 @@ actor TCPListener
Spawn a new connection.
"""
try
TCPConnection._accept(this, _notify.connected(this)?, ns, _init_size,
_max_size)
TCPConnection._accept(this, _notify.connected(this)?, ns,
_read_buffer_size)
_count = _count + 1
else
@pony_os_socket_close[None](ns)
Expand Down

0 comments on commit 68a82a3

Please sign in to comment.