Skip to content

Commit

Permalink
threads: expand thread-safe region for I/O (#32309)
Browse files Browse the repository at this point in the history
This should hopefully cover most I/O operations which go through libuv to make them thread-safe.

There is no scaling here, just one big global lock, so expect
worse-than-single-threaded performance if doing I/O on multiple threads (as
compared to doing the same work on one thread). The intention is to handle
performance improvement incrementally later.

It also necessarily redesigns parts of the UDPSocket implementation
to properly handle concurrent (single-threaded) usage, as a necessary
part of making it handle parallel (thread-safe) usage.
  • Loading branch information
vtjnash authored and JeffBezanson committed Jun 21, 2019
1 parent 6a77884 commit fd7afa0
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
hash, ==, kill, close, isopen, showerror

# imports for use
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected,
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes,
VERSION_STRING, binding_module, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error, something, notnothing, isbuffered
using Base.Threads: Event

using Serialization, Sockets
import Serialization: serialize, deserialize
import Sockets: connect
import Sockets: connect, wait_connected

# NOTE: clusterserialize.jl imports additional symbols from Serialization for use

Expand Down
13 changes: 9 additions & 4 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -484,10 +484,15 @@ function socket_reuse_port()
end
end

function bind_client_port(s)
err = ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, UInt32, Cuint),
s.handle, hton(client_port[]), hton(UInt32(0)), 0)
uv_error("bind() failed", err)
# TODO: this doesn't belong here, it belongs in Sockets
function bind_client_port(s::TCPSocket)
Sockets.iolock_begin()
@assert s.status == Sockets.StatusInit
host_in = Ref(hton(UInt32(0))) # IPv4 0.0.0.0
err = ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint),
s, hton(client_port[]), host_in, 0, false)
Sockets.iolock_end()
uv_error("tcp_bind", err)

_addr, port = getsockname(s)
client_port[] = port
Expand Down

0 comments on commit fd7afa0

Please sign in to comment.