Added support for non-blocking network IO
This adds support for performing non-blocking network operations, such
as reading and writing to/from a socket. The runtime API exposed is
similar to Erlang, allowing one to write code that uses non-blocking
APIs without having to resort to using callbacks. For example, in a
typical callback-based language you may write the following to read
from a socket:

    socket.create do (socket) {
      socket.read do (data) {

      }
    }

In Inko, you would instead write (more or less) the following:

    import std::net::socket::TcpStream

    let socket = try! TcpStream.new(ip: '192.0.2.0', port: 80)
    let message = try! socket.read_string(size: 4)

The VM then takes care of using the appropriate non-blocking operations,
and will reschedule processes whenever necessary.

This functionality is exposed through the following runtime modules:

* std::net::ip: used for parsing IPv4 and IPv6 addresses.
* std::net::socket: used for TCP and UDP sockets.
* std::net::unix: used for Unix domain sockets.

The VM uses the system's native polling mechanism to determine when a
file descriptor is available for a read or write. On Linux we use epoll,
and on the various BSDs and Mac OS we use kqueue. For Windows we use
wepoll (https://github.com/piscisaureus/wepoll). Wepoll exposes an API
that is compatible with the epoll API, but uses Windows IO completion
ports under the hood.

When a process attempts to perform a non-blocking operation, the process
is registered (combined with the file descriptor to poll) in a global
poller and suspended. When the file descriptor becomes available for a
read or write, the corresponding process is rescheduled. The polling
mechanism is set up in such a way that a process cannot be rescheduled
multiple times at once.
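
The register/suspend/reschedule cycle described above can be sketched as a
toy model in Ruby. This is not the VM's actual implementation: ToyProcess,
ToyPoller, and their methods are hypothetical names, and dropping the
registration on the first wake-up is only one plausible way to make sure a
process is not rescheduled more than once.

```ruby
# Toy model of the register/suspend/reschedule cycle.
# All names here are hypothetical; nothing is taken from the VM's source.
ToyProcess = Struct.new(:name, :suspended)

class ToyPoller
  attr_reader :run_queue

  def initialize
    @waiting = {}    # file descriptor => suspended process
    @run_queue = []  # processes ready to be resumed by a scheduler
  end

  # Called when an operation would block: remember which process waits on
  # which descriptor, and mark the process as suspended.
  def register(fd, process)
    process.suspended = true
    @waiting[fd] = process
  end

  # Called when the OS reports that `fd` is readable or writable.
  # The registration is deleted before rescheduling, so a duplicate event
  # for the same descriptor cannot reschedule the process a second time.
  def ready(fd)
    process = @waiting.delete(fd)
    return false unless process

    process.suspended = false
    @run_queue << process
    true
  end
end

poller = ToyPoller.new
reader = ToyProcess.new('reader', false)

poller.register(7, reader)
first = poller.ready(7)   # reschedules the process
second = poller.ready(7)  # ignored: the registration is already gone
```

In this sketch the second event is ignored, so the run queue contains the
process exactly once.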

We do not use MIO (https://github.com/tokio-rs/mio); instead we use
epoll, kqueue, and wepoll (using
https://crates.io/crates/wepoll-binding) directly. At the time of
writing, while MIO offers some form of support for Windows, it comes
with various issues:

1. tokio-rs/mio#921
2. tokio-rs/mio#919
3. tokio-rs/mio#776
4. tokio-rs/mio#913

It's not clear when these issues would be addressed, as the maintainers
of MIO appear to not have the experience and resources to resolve them
themselves. MIO is part of the Google Summer of Code 2019, with the goal
of improving Windows support. Unfortunately, this likely won't be done
before the end of 2019, and we don't want to wait that long.

Another issue with MIO is its implementation. Internally, MIO uses
various forms of synchronisation which can make it expensive to use a
single poller across multiple threads; it certainly is not a zero-cost
library. It also offers more than we need, such as being able to poll
arbitrary objects.

We are not the first to run into these issues. For example, the Amethyst
video game engine also ran into issues with MIO as detailed in
https://community.amethyst.rs/t/sorting-through-the-mio-mess/561.

With all of this in mind, I decided it was not worth the time to wait
for MIO to get fixed, and to instead spend time directly using epoll,
kqueue, and wepoll. This gives us total control over the code, and
allows us to implement what we need in the way we need it. Most
important of all: it works on Linux, BSD, Mac, and Windows.
Yorick Peterse committed May 9, 2019
1 parent 99c9f04 commit 3974f47
Showing 58 changed files with 7,443 additions and 549 deletions.
12 changes: 12 additions & 0 deletions compiler/lib/inkoc/codegen/instruction.rb
@@ -167,6 +167,18 @@ class Instruction
StringToFloat
FloatToBits
ProcessIdentifier
SocketCreate
SocketWrite
SocketRead
SocketAccept
SocketReceiveFrom
SocketSendTo
SocketAddress
SocketGetOption
SocketSetOption
SocketBind
SocketListen
SocketConnect
]
.each_with_index
.each_with_object({}) { |(value, index), hash| hash[value] = index }
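
The `each_with_index.each_with_object` chain above turns the ordered list of
instruction names into a name-to-index table. A minimal sketch, assuming a
shortened list: the three names are taken from this hunk, but the indices
shown here are not the real opcode values, since the real array starts with
many other instructions.

```ruby
# Shortened, illustrative list; the real array contains many more entries.
names = %i[SocketCreate SocketWrite SocketRead]

# Pair every name with its position, then collect the pairs into a Hash.
opcodes = names
  .each_with_index
  .each_with_object({}) { |(value, index), hash| hash[value] = index }

opcodes[:SocketRead] # the position of :SocketRead in `names`
```

Because an index is derived purely from position, appending new names at the
end of the list (as this hunk does) leaves the indices of existing entries
unchanged.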
2 changes: 2 additions & 0 deletions compiler/lib/inkoc/config.rb
@@ -59,6 +59,8 @@ class Config
FUNCTION_CONST = 'Function'
POINTER_CONST = 'Pointer'
PROCESS_CONST = 'Process'
SOCKET_CONST = 'Socket'
UNIX_SOCKET_CONST = 'UnixSocket'
ARRAY_TYPE_PARAMETER = 'T'
OPTIONAL_CONST = 'Optional'

8 changes: 5 additions & 3 deletions compiler/lib/inkoc/lexer.rb
@@ -34,6 +34,7 @@ class Lexer
).freeze

NUMBER_RANGE = '0'..'9'
NUMBER_ALLOWED_LETTERS = %w[a b c d e f A B C D E F x _]

# We allocate this once so we don't end up wasting allocations every time we
# consume a peeked value.
@@ -293,7 +294,7 @@ def number(skip_first: false)
type = :float

@position += next_char == '+' ? 2 : 1
-when NUMBER_RANGE, '_', 'x'
+when NUMBER_RANGE, *NUMBER_ALLOWED_LETTERS
@position += 1
else
break
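
The `when` clause in this hunk combines a Range with a splatted Array, so a
character matches if it is a digit or one of the listed letters. A simplified
sketch of that check follows. The two constants are copied from the diff, but
`number_char?` and `scan_number` are hypothetical helpers, and the real lexer
also handles floats and exponents, which this sketch ignores.

```ruby
NUMBER_RANGE = '0'..'9'
NUMBER_ALLOWED_LETTERS = %w[a b c d e f A B C D E F x _]

# Hypothetical helper: true if `char` may appear inside a number literal.
def number_char?(char)
  case char
  when NUMBER_RANGE, *NUMBER_ALLOWED_LETTERS
    true
  else
    false
  end
end

# Hypothetical helper: consume characters from the start of `input` for as
# long as they are allowed in a number literal.
def scan_number(input)
  position = 0
  position += 1 while position < input.length && number_char?(input[position])
  input[0, position]
end

hex = scan_number('0xFF + 1')
```

With only '_' and 'x' allowed, as before this change, scanning '0xFF' would
stop at the first 'F'.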
@@ -378,11 +379,12 @@ def string_with_quote(quote, escaped, unescape_special = false)

if has_special && unescape_special
token.value.gsub!(
-/\\t|\\r|\\n|\\e/,
+/\\t|\\r|\\n|\\e|\\0/,
'\t' => "\t",
'\n' => "\n",
'\r' => "\r",
-'\e' => "\e"
+'\e' => "\e",
+'\0' => "\0"
)
end
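
The unescaping above relies on `String#gsub` accepting a Hash as its
replacement: each match is looked up as a key and replaced by the
corresponding value. A minimal sketch using the same pattern and table, where
`ESCAPES` and `unescape` are hypothetical names introduced only for
illustration:

```ruby
# Maps the two-character source sequence (backslash + letter) to the single
# character it stands for.
ESCAPES = {
  '\t' => "\t",
  '\n' => "\n",
  '\r' => "\r",
  '\e' => "\e",
  '\0' => "\0"
}.freeze

# Hypothetical helper wrapping the gsub call from the diff.
def unescape(value)
  value.gsub(/\\t|\\r|\\n|\\e|\\0/, ESCAPES)
end

unescaped = unescape('a\0b')
```

The pattern and the table have to be extended together: a sequence that
matches the pattern but has no key in the Hash is replaced with an empty
string.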

56 changes: 56 additions & 0 deletions compiler/lib/inkoc/pass/define_type.rb
@@ -1625,6 +1625,14 @@ def on_raw_get_process_prototype(*)
typedb.process_type.new_instance
end

def on_raw_get_socket_prototype(*)
typedb.socket_type.new_instance
end

def on_raw_get_unix_socket_prototype(*)
typedb.unix_socket_type.new_instance
end

def on_raw_set_object_name(*)
typedb.string_type.new_instance
end
@@ -1745,6 +1753,54 @@ def on_raw_float_to_bits(*)
typedb.integer_type.new_instance
end

def on_raw_socket_create(*)
TypeSystem::Dynamic.new
end

def on_raw_socket_write(*)
typedb.integer_type.new_instance
end

def on_raw_socket_read(*)
typedb.integer_type.new_instance
end

def on_raw_socket_accept(*)
TypeSystem::Dynamic.new
end

def on_raw_socket_receive_from(*)
typedb.new_array_of_type(TypeSystem::Dynamic.new)
end

def on_raw_socket_send_to(*)
typedb.integer_type.new_instance
end

def on_raw_socket_address(*)
typedb.new_array_of_type(TypeSystem::Dynamic.new)
end

def on_raw_socket_get_option(*)
TypeSystem::Dynamic.new
end

def on_raw_socket_set_option(*)
TypeSystem::Dynamic.new
end

def on_raw_socket_bind(*)
typedb.nil_type.new_instance
end

def on_raw_socket_connect(*)
typedb.nil_type.new_instance
end

def on_raw_socket_listen(*)
typedb.integer_type.new_instance
end

def define_block_signature(node, scope, expected_block = nil)
define_type_parameters(node, scope)
define_argument_types(node, scope, expected_block)
56 changes: 56 additions & 0 deletions compiler/lib/inkoc/pass/generate_tir.rb
@@ -1380,6 +1380,14 @@ def on_raw_get_process_prototype(node, body)
builtin_prototype_instruction(PrototypeID::PROCESS, node, body)
end

def on_raw_get_socket_prototype(node, body)
builtin_prototype_instruction(PrototypeID::SOCKET, node, body)
end

def on_raw_get_unix_socket_prototype(node, body)
builtin_prototype_instruction(PrototypeID::UNIX_SOCKET, node, body)
end

def on_raw_set_object_name(node, body)
loc = node.location
obj = process_node(node.arguments.fetch(0), body)
@@ -1505,6 +1513,54 @@ def on_raw_float_to_bits(node, body)
raw_unary_instruction(:FloatToBits, node, body)
end

def on_raw_socket_create(node, body)
raw_binary_instruction(:SocketCreate, node, body)
end

def on_raw_socket_write(node, body)
raw_binary_instruction(:SocketWrite, node, body)
end

def on_raw_socket_read(node, body)
raw_ternary_instruction(:SocketRead, node, body)
end

def on_raw_socket_accept(node, body)
raw_unary_instruction(:SocketAccept, node, body)
end

def on_raw_socket_receive_from(node, body)
raw_ternary_instruction(:SocketReceiveFrom, node, body)
end

def on_raw_socket_send_to(node, body)
raw_quaternary_instruction(:SocketSendTo, node, body)
end

def on_raw_socket_address(node, body)
raw_binary_instruction(:SocketAddress, node, body)
end

def on_raw_socket_get_option(node, body)
raw_binary_instruction(:SocketGetOption, node, body)
end

def on_raw_socket_set_option(node, body)
raw_ternary_instruction(:SocketSetOption, node, body)
end

def on_raw_socket_bind(node, body)
raw_ternary_instruction(:SocketBind, node, body)
end

def on_raw_socket_connect(node, body)
raw_ternary_instruction(:SocketConnect, node, body)
end

def on_raw_socket_listen(node, body)
raw_binary_instruction(:SocketListen, node, body)
end

def on_return(node, body)
location = node.location
register =
2 changes: 2 additions & 0 deletions compiler/lib/inkoc/prototype_id.rb
@@ -18,5 +18,7 @@ module PrototypeID
FUNCTION = 13
POINTER = 14
PROCESS = 15
SOCKET = 16
UNIX_SOCKET = 17
end
end
5 changes: 4 additions & 1 deletion compiler/lib/inkoc/type_system/database.rb
@@ -8,7 +8,8 @@ class Database
:object_type, :hasher_type, :boolean_type,
:read_only_file_type, :write_only_file_type,
:read_write_file_type, :byte_array_type, :library_type,
-:function_type, :pointer_type, :process_type
+:function_type, :pointer_type, :process_type,
+:socket_type, :unix_socket_type

def initialize
@object_type = new_object_type(Config::OBJECT_CONST, nil)
@@ -31,6 +32,8 @@ def initialize
@function_type = new_object_type(Config::FUNCTION_CONST)
@pointer_type = new_object_type(Config::POINTER_CONST)
@process_type = new_object_type(Config::PROCESS_CONST)
@socket_type = new_object_type(Config::SOCKET_CONST)
@unix_socket_type = new_object_type(Config::UNIX_SOCKET_CONST)
@trait_id = -1
end

16 changes: 16 additions & 0 deletions compiler/spec/inkoc/lexer_spec.rb
@@ -260,6 +260,14 @@
expect(token.value).to eq('0x10')
end

it 'tokenizes a hexadecimal integer with letters' do
lexer = described_class.new('0xFF')
token = lexer.number

expect(token.type).to eq(:integer)
expect(token.value).to eq('0xFF')
end

it 'tokenizes a float using the scientific notation with a lowercase e' do
lexer = described_class.new('1e2')
token = lexer.number
@@ -368,6 +376,14 @@
expect(token.value).to eq("\n")
end

it 'tokenizes a double quoted string with a NULL byte' do
lexer = described_class.new('"\0"')
token = lexer.double_string

expect(token.type).to eq(:string)
expect(token.value).to eq("\0")
end

it 'tokenizes a double quoted string with a carriage return' do
lexer = described_class.new('"\r"')
token = lexer.double_string
1 change: 1 addition & 0 deletions runtime/src/core/prelude.inko
@@ -37,6 +37,7 @@ import std::byte_array
import std::process
import std::vm
import std::range::(Range as _Range)
import std::integer::extensions

# These constants are re-exported so they're available to all modules by
# default. Core types such as String should be exposed in std::globals instead.
73 changes: 73 additions & 0 deletions runtime/src/std/integer/extensions.inko
@@ -0,0 +1,73 @@
#! Extensions for the `Integer` type that can only be defined later on in the
#! bootstrapping process.
import std::process
import std::string_buffer::StringBuffer

## The digits to use when converting an `Integer` to a `String` using a specific
## base or radix.
##
## The order of values in this `Array` must remain as-is, as re-ordering values
## will break the code that uses this `Array`.
let INTEGER_RADIX_DIGITS = [
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
'u', 'v', 'w', 'x', 'y', 'z'
]

impl Integer {
## Formats `self` as a `String` using the given base/radix.
##
## # Panics
##
## This method will panic if `radix` is smaller than 2, or greater than 36.
##
## # Examples
##
## Formatting an integer in base 16 (hexadecimal):
##
## 0x2ff.format(radix: 16) # => '2ff'
def format(radix = 10) -> String {
radix < 2
.or { radix > 36 }
.if_true {
process.panic('The radix argument must be between 2 and 36')
}

zero?.if_true {
return '0'
}

let characters = []
let mut integer = absolute

negative?.if_true {
characters.push('-')
}

{ integer.positive? }.while_true {
characters.push(*INTEGER_RADIX_DIGITS[integer % radix])
integer /= radix
}

# The above operation pushes the digits from the back, resulting in our
# characters being in reverse order. For example, for 0x2ff the `characters`
# `Array` would be `['f', 'f', '2']`. Below we'll reverse the values
# in-place.
let start_at = negative?.if true: { 1 }, false: { 0 }
let mut old_index = characters.length - 1
let mut new_index = start_at

{ old_index > new_index }.while_true {
let old = *characters[old_index]
let new = *characters[new_index]

characters[new_index] = old
characters[old_index] = new

old_index -= 1
new_index += 1
}

StringBuffer.new(characters).to_string
}
}
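
For readers less familiar with Inko, the algorithm in `Integer.format` above
can be transliterated to Ruby roughly as follows. This is a sketch, not part
of the commit: `format_integer` and `RADIX_DIGITS` are hypothetical names, and
an `ArgumentError` stands in for the process panic. Ruby's built-in
`Integer#to_s(radix)` already does the same job and is used only as a
cross-check.

```ruby
# Digits for bases up to 36; the order matters because a digit's value is
# used as an index into this array.
RADIX_DIGITS = (('0'..'9').to_a + ('a'..'z').to_a).freeze

def format_integer(value, radix = 10)
  if radix < 2 || radix > 36
    raise ArgumentError, 'The radix argument must be between 2 and 36'
  end

  return '0' if value.zero?

  characters = []
  integer = value.abs

  characters.push('-') if value.negative?

  # Repeated division yields the least significant digit first.
  while integer.positive?
    characters.push(RADIX_DIGITS[integer % radix])
    integer /= radix
  end

  # Reverse the digits in place, leaving a leading '-' where it is.
  new_index = value.negative? ? 1 : 0
  old_index = characters.length - 1

  while old_index > new_index
    characters[new_index], characters[old_index] =
      characters[old_index], characters[new_index]

    old_index -= 1
    new_index += 1
  end

  characters.join
end

hex = format_integer(0x2ff, 16)
```

The in-place swap starts at index 1 for negative numbers so the sign stays at
the front while only the digits are reversed.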