Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify buffering in TCPConnection #3185

Merged
merged 1 commit into from
Jun 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ All notable changes to the Pony compiler and standard library will be documented
- Make TCPConnection yield on writes to not hog cpu ([PR #3176](https://github.com/ponylang/ponyc/pull/3176))
- Change how TCP connection reads data to improve performance ([PR #3178](https://github.com/ponylang/ponyc/pull/3178))
- Allow use of runtime TCP connect without ASIO one shot ([PR #3171](https://github.com/ponylang/ponyc/pull/3171))
- Simplify buffering in TCPConnection ([PR #3185](https://github.com/ponylang/ponyc/pull/3185))

## [0.28.1] - 2019-06-01

Expand Down
64 changes: 21 additions & 43 deletions packages/net/tcp_connection.pony
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ actor TCPConnection
var _read_buf_offset: USize = 0
var _max_received_called: USize = 50

var _next_size: USize
let _max_size: USize
let _read_buffer_size: USize

var _expect: USize = 0

Expand All @@ -227,16 +226,14 @@ actor TCPConnection
host: String,
service: String,
from: String = "",
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Connect via IPv4 or IPv6. If `from` is a non-empty string, the connection
will be made from the specified interface.
"""
_read_buf = recover Array[U8] .> undefined(init_size) end
_next_size = init_size
_max_size = max_size
_read_buf = recover Array[U8] .> undefined(read_buffer_size) end
_read_buffer_size = read_buffer_size
_notify = consume notify
let asio_flags =
ifdef not windows then
Expand All @@ -255,15 +252,13 @@ actor TCPConnection
host: String,
service: String,
from: String = "",
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Connect via IPv4.
"""
_read_buf = recover Array[U8] .> undefined(init_size) end
_next_size = init_size
_max_size = max_size
_read_buf = recover Array[U8] .> undefined(read_buffer_size) end
_read_buffer_size = read_buffer_size
_notify = consume notify
let asio_flags =
ifdef not windows then
Expand All @@ -282,15 +277,13 @@ actor TCPConnection
host: String,
service: String,
from: String = "",
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Connect via IPv6.
"""
_read_buf = recover Array[U8] .> undefined(init_size) end
_next_size = init_size
_max_size = max_size
_read_buf = recover Array[U8] .> undefined(read_buffer_size) end
_read_buffer_size = read_buffer_size
_notify = consume notify
let asio_flags =
ifdef not windows then
Expand All @@ -307,8 +300,7 @@ actor TCPConnection
listen: TCPListener,
notify: TCPConnectionNotify iso,
fd: U32,
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
A new connection accepted on a server.
Expand All @@ -329,9 +321,8 @@ actor TCPConnection
@pony_asio_event_set_writeable(_event, true)
end
_writeable = true
_read_buf = recover Array[U8] .> undefined(init_size) end
_next_size = init_size
_max_size = max_size
_read_buf = recover Array[U8] .> undefined(read_buffer_size) end
_read_buffer_size = read_buffer_size

_notify.accepted(this)

Expand Down Expand Up @@ -657,7 +648,7 @@ actor TCPConnection
var bytes_to_send: USize = 0
var bytes_sent: USize = 0
while _writeable and (_pending_writev_total > 0) do
if bytes_sent >= _max_size then
if bytes_sent >= _read_buffer_size then
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened an issue #3186 to address this issue. I think it should be a different PR.

_write_again()
return false
end
Expand Down Expand Up @@ -775,16 +766,13 @@ actor TCPConnection
This occurs only with IOCP on Windows.
"""
ifdef windows then
match len.usize()
| 0 =>
if len.usize() == 0 then
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this usize() probably isn't needed but I wanted to do the minimal change.

// The socket has been closed from the other side, or a hard close has
// cancelled the queued read.
_readable = false
_shutdown_peer = true
close()
return
| _next_size =>
_next_size = _max_size.min(_next_size * 2)
end

_read_buf_offset = _read_buf_offset + len.usize()
Expand All @@ -806,16 +794,10 @@ actor TCPConnection

fun ref _read_buf_size() =>
"""
Resize the read buffer.
Resize the read buffer if it is empty or smaller than the next payload size
"""
if _expect != 0 then
// We know how much data to expect. If _read_buf is too small,
// then resize it.
if (_read_buf.size() - _read_buf_offset) <= _expect then
_read_buf.undefined(_expect.next_pow2().max(_next_size))
end
else
_read_buf.undefined(_next_size)
if _read_buf.size() <= _expect then
_read_buf.undefined(_read_buffer_size)
end

fun ref _queue_read() =>
Expand Down Expand Up @@ -879,8 +861,8 @@ actor TCPConnection
end
end

if sum >= _max_size then
// If we've read _max_size, yield and read again later.
if sum >= _read_buffer_size then
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should consider freeing "yield after" from buffer size in which case #3186 should be updated.

// If we've read _read_buffer_size, yield and read again later.
_read_buf_size()
_read_again()
_reading = false
Expand All @@ -898,8 +880,7 @@ actor TCPConnection
_read_buf.cpointer(_read_buf_offset),
_read_buf.size() - _read_buf_offset) ?

match len
| 0 =>
if len == 0 then
// Would block, try again later.
// this is safe because asio thread isn't currently subscribed
// for a read event so will not be writing to the readable flag
Expand All @@ -908,9 +889,6 @@ actor TCPConnection
_reading = false
@pony_asio_event_resubscribe_read(_event)
return
| (_read_buf.size() - _read_buf_offset) =>
// Increase the read buffer size.
_next_size = _max_size.min(_next_size * 2)
end

_read_buf_offset = _read_buf_offset + len
Expand Down
25 changes: 9 additions & 16 deletions packages/net/tcp_listener.pony
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,15 @@ actor TCPListener
let _limit: USize
var _count: USize = 0
var _paused: Bool = false
var _init_size: USize
var _max_size: USize
var _read_buffer_size: USize

new create(
auth: TCPListenerAuth,
notify: TCPListenNotify iso,
host: String = "",
service: String = "0",
limit: USize = 0,
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Listens for both IPv4 and IPv6 connections.
Expand All @@ -65,8 +63,7 @@ actor TCPListener
_event =
@pony_os_listen_tcp[AsioEventID](this,
host.cstring(), service.cstring())
_init_size = init_size
_max_size = max_size
_read_buffer_size = read_buffer_size
_fd = @pony_asio_event_fd(_event)
_notify_listening()

Expand All @@ -76,8 +73,7 @@ actor TCPListener
host: String = "",
service: String = "0",
limit: USize = 0,
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Listens for IPv4 connections.
Expand All @@ -87,8 +83,7 @@ actor TCPListener
_event =
@pony_os_listen_tcp4[AsioEventID](this, host.cstring(),
service.cstring())
_init_size = init_size
_max_size = max_size
_read_buffer_size = read_buffer_size
_fd = @pony_asio_event_fd(_event)
_notify_listening()

Expand All @@ -98,8 +93,7 @@ actor TCPListener
host: String = "",
service: String = "0",
limit: USize = 0,
init_size: USize = 64,
max_size: USize = 16384)
read_buffer_size: USize = 16384)
=>
"""
Listens for IPv6 connections.
Expand All @@ -109,8 +103,7 @@ actor TCPListener
_event =
@pony_os_listen_tcp6[AsioEventID](this, host.cstring(),
service.cstring())
_init_size = init_size
_max_size = max_size
_read_buffer_size = read_buffer_size
_fd = @pony_asio_event_fd(_event)
_notify_listening()

Expand Down Expand Up @@ -217,8 +210,8 @@ actor TCPListener
Spawn a new connection.
"""
try
TCPConnection._accept(this, _notify.connected(this)?, ns, _init_size,
_max_size)
TCPConnection._accept(this, _notify.connected(this)?, ns,
_read_buffer_size)
_count = _count + 1
else
@pony_os_socket_close[None](ns)
Expand Down