Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reland "IO: tie lifetime of handle field to container" #44883

Merged
merged 2 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ Use [`isopen`](@ref) to check whether it is still active.
This provides an implicit acquire & release memory ordering between the sending and waiting threads.
"""
mutable struct AsyncCondition
handle::Ptr{Cvoid}
@atomic handle::Ptr{Cvoid}
cond::ThreadSynchronizer
isopen::Bool
@atomic isopen::Bool
@atomic set::Bool

function AsyncCondition()
Expand Down Expand Up @@ -86,9 +86,9 @@ once. When the timer is closed (by [`close`](@ref)) waiting tasks are woken with

"""
mutable struct Timer
handle::Ptr{Cvoid}
@atomic handle::Ptr{Cvoid}
cond::ThreadSynchronizer
isopen::Bool
@atomic isopen::Bool
@atomic set::Bool

function Timer(timeout::Real; interval::Real = 0.0)
Expand Down Expand Up @@ -157,12 +157,12 @@ function wait(t::Union{Timer, AsyncCondition})
end


isopen(t::Union{Timer, AsyncCondition}) = t.isopen
isopen(t::Union{Timer, AsyncCondition}) = t.isopen && t.handle != C_NULL

function close(t::Union{Timer, AsyncCondition})
iolock_begin()
if t.handle != C_NULL && isopen(t)
t.isopen = false
if isopen(t)
@atomic :monotonic t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
iolock_end()
Expand All @@ -174,12 +174,12 @@ function uvfinalize(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
if t.handle != C_NULL
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
disassociate_julia_struct(t.handle) # not going to call the usual close hooks anymore
if t.isopen
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
@atomic :monotonic t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
end
t.handle = C_NULL
@atomic :monotonic t.handle = C_NULL
notify(t.cond, false)
end
finally
Expand All @@ -192,9 +192,9 @@ end
function _uv_hook_close(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
t.isopen = false
t.handle = C_NULL
notify(t.cond, t.set)
@atomic :monotonic t.isopen = false
Libc.free(@atomicswap :monotonic t.handle = C_NULL)
notify(t.cond, false)
finally
unlock(t.cond)
end
Expand Down
7 changes: 5 additions & 2 deletions base/libuv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ function preserve_handle(x)
end
function unpreserve_handle(x)
lock(preserve_handle_lock)
v = uvhandles[x]::Int
if v == 1
v = get(uvhandles, x, 0)::Int
if v == 0
unlock(preserve_handle_lock)
error("unbalanced call to unpreserve_handle for $(typeof(x))")
elseif v == 1
pop!(uvhandles, x)
else
uvhandles[x] = v - 1
Expand Down
8 changes: 4 additions & 4 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32)
proc = unsafe_pointer_to_objref(data)::Process
proc.exitcode = exit_status
proc.termsignal = termsignal
disassociate_julia_struct(proc) # ensure that data field is set to C_NULL
disassociate_julia_struct(proc.handle) # ensure that data field is set to C_NULL
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), proc.handle)
proc.handle = C_NULL
lock(proc.exitnotify)
Expand All @@ -70,7 +70,7 @@ end

# called when the libuv handle is destroyed
function _uv_hook_close(proc::Process)
proc.handle = C_NULL
Libc.free(@atomicswap :not_atomic proc.handle = C_NULL)
nothing
end

Expand Down Expand Up @@ -607,10 +607,10 @@ Get the child process ID, if it still exists.
This function requires at least Julia 1.1.
"""
function Libc.getpid(p::Process)
# TODO: due to threading, this method is no longer synchronized with the user application
# TODO: due to threading, this method is only weakly synchronized with the user application
iolock_begin()
ppid = Int32(0)
if p.handle != C_NULL
if p.handle != C_NULL # e.g. process_running
ppid = ccall(:jl_uv_process_pid, Int32, (Ptr{Cvoid},), p.handle)
end
iolock_end()
Expand Down
24 changes: 13 additions & 11 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ if OS_HANDLE != RawFD
end

function isopen(x::Union{LibuvStream, LibuvServer})
if x.status == StatusUninit || x.status == StatusInit
if x.status == StatusUninit || x.status == StatusInit || x.handle === C_NULL
throw(ArgumentError("$x is not initialized"))
end
return x.status != StatusClosed
Expand Down Expand Up @@ -496,34 +496,37 @@ end

function close(stream::Union{LibuvStream, LibuvServer})
iolock_begin()
should_wait = false
if stream.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
elseif isopen(stream)
should_wait = uv_handle_data(stream) != C_NULL
if stream.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
end
iolock_end()
should_wait && wait_close(stream)
wait_close(stream)
nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
uv.handle == C_NULL && return
iolock_begin()
if uv.handle != C_NULL
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
if uv.status != StatusUninit
close(uv)
else
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks (so preserve_handle is not needed)
if uv.status == StatusUninit
Libc.free(uv.handle)
elseif uv.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
elseif isopen(uv)
if uv.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
end
elseif uv.status == StatusClosed
Libc.free(uv.handle)
end
uv.status = StatusClosed
uv.handle = C_NULL
uv.status = StatusClosed
end
iolock_end()
nothing
Expand Down Expand Up @@ -713,7 +716,6 @@ end
function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
lock(uv.cond)
try
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.cond)
Expand Down
3 changes: 1 addition & 2 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ static void jl_close_item_atexit(uv_handle_t *handle)
switch(handle->type) {
case UV_PROCESS:
// cause Julia to forget about the Process object
if (handle->data)
jl_uv_call_close_callback((jl_value_t*)handle->data);
handle->data = NULL;
// and make libuv think it is already dead
((uv_process_t*)handle)->pid = 0;
// fall-through
Expand Down
54 changes: 27 additions & 27 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,16 @@ JL_DLLEXPORT void jl_iolock_end(void)
}


void jl_uv_call_close_callback(jl_value_t *val)
static void jl_uv_call_close_callback(jl_value_t *val)
{
jl_value_t *args[2];
jl_value_t **args;
JL_GC_PUSHARGS(args, 2); // val is "rooted" in the finalizer list only right now
args[0] = jl_get_global(jl_base_relative_to(((jl_datatype_t*)jl_typeof(val))->name->module),
jl_symbol("_uv_hook_close")); // topmod(typeof(val))._uv_hook_close
args[1] = val;
assert(args[0]);
jl_apply(args, 2); // TODO: wrap in try-catch?
JL_GC_POP();
}

static void jl_uv_closeHandle(uv_handle_t *handle)
Expand All @@ -105,6 +107,7 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
ct->world_age = jl_atomic_load_acquire(&jl_world_counter);
jl_uv_call_close_callback((jl_value_t*)handle->data);
ct->world_age = last_age;
return;
}
if (handle == (uv_handle_t*)&signal_async)
return;
Expand All @@ -125,6 +128,10 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
free(req);
return;
}
if (uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
free(req);
return;
}
if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) {
// new data was written, wait for it to flush too
uv_buf_t buf;
Expand All @@ -134,12 +141,10 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0)
return; // success
}
if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}
free(req);
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}

static void uv_flush_callback(uv_write_t *req, int status)
Expand Down Expand Up @@ -224,47 +229,42 @@ static void jl_proc_exit_cleanup_cb(uv_process_t *process, int64_t exit_status,

JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
{
JL_UV_LOCK();
if (handle->type == UV_PROCESS && ((uv_process_t*)handle)->pid != 0) {
// take ownership of this handle,
// so we can waitpid for the resource to exit and avoid leaving zombies
assert(handle->data == NULL); // make sure Julia has forgotten about it already
((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup_cb;
return;
uv_unref(handle);
}
JL_UV_LOCK();
if (handle->type == UV_FILE) {
else if (handle->type == UV_FILE) {
uv_fs_t req;
jl_uv_file_t *fd = (jl_uv_file_t*)handle;
if ((ssize_t)fd->file != -1) {
uv_fs_close(handle->loop, &req, fd->file, NULL);
fd->file = (uv_os_fd_t)(ssize_t)-1;
}
jl_uv_closeHandle(handle); // synchronous (ok since the callback is known to not interact with any global state)
JL_UV_UNLOCK();
return;
}

if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
JL_UV_UNLOCK();
return;
}

// avoid double-closing the stream
if (!uv_is_closing(handle)) {
uv_close(handle, &jl_uv_closeHandle);
else if (!uv_is_closing(handle)) { // avoid double-closing the stream
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
// flush the stream write-queue first
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
}
else {
uv_close(handle, &jl_uv_closeHandle);
}
}
JL_UV_UNLOCK();
}

JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle)
{
// avoid double-closing the stream
if (!uv_is_closing(handle)) {
if (!uv_is_closing(handle)) { // avoid double-closing the stream
JL_UV_LOCK();
if (!uv_is_closing(handle)) {
if (!uv_is_closing(handle)) { // double-check
uv_close(handle, &jl_uv_closeHandle);
}
JL_UV_UNLOCK();
Expand Down
1 change: 0 additions & 1 deletion src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,6 @@ JL_DLLEXPORT jl_fptr_args_t jl_get_builtin_fptr(jl_value_t *b);

extern uv_loop_t *jl_io_loop;
void jl_uv_flush(uv_stream_t *stream);
void jl_uv_call_close_callback(jl_value_t *val);

typedef struct jl_typeenv_t {
jl_tvar_t *var;
Expand Down
Loading