diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl index d929d1ebfb98d2..405f6219fc1cf8 100644 --- a/stdlib/FileWatching/src/FileWatching.jl +++ b/stdlib/FileWatching/src/FileWatching.jl @@ -21,15 +21,17 @@ export import Base: @handle_as, wait, close, eventloop, notify_error, IOError, _sizeof_uv_poll, _sizeof_uv_fs_poll, _sizeof_uv_fs_event, _uv_hook_close, uv_error, _UVError, iolock_begin, iolock_end, associate_julia_struct, disassociate_julia_struct, - preserve_handle, unpreserve_handle, isreadable, iswritable, | + preserve_handle, unpreserve_handle, isreadable, iswritable, isopen, + |, getproperty, propertynames import Base.Filesystem.StatStruct if Sys.iswindows() import Base.WindowsRawSocket end + # libuv file watching event flags -const UV_RENAME = 1 -const UV_CHANGE = 2 +const UV_RENAME = Int32(1) +const UV_CHANGE = Int32(2) struct FileEvent renamed::Bool changed::Bool @@ -45,30 +47,33 @@ FileEvent(flags::Integer) = FileEvent((flags & UV_RENAME) != 0, a.changed | b.changed, a.timedout | b.timedout) +# libuv file descriptor event flags +const UV_READABLE = Int32(1) +const UV_WRITABLE = Int32(2) +const UV_DISCONNECT = Int32(4) +const UV_PRIORITIZED = Int32(8) struct FDEvent - readable::Bool - writable::Bool - disconnect::Bool - timedout::Bool - FDEvent(r::Bool, w::Bool, d::Bool, t::Bool) = new(r, w, d, t) + events::Int32 + FDEvent(flags::Integer=0) = new(flags) end -# libuv file descriptor event flags -const UV_READABLE = 1 -const UV_WRITABLE = 2 -const UV_DISCONNECT = 4 + +FDEvent(r::Bool, w::Bool, d::Bool, t::Bool) = FDEvent((UV_READABLE * r) | (UV_WRITABLE * w) | (UV_DISCONNECT * d)) # deprecated method + +function getproperty(f::FDEvent, field::Symbol) + events = getfield(f, :events) + field === :readable && return (events & UV_READABLE) != 0 + field === :writable && return (events & UV_WRITABLE) != 0 + field === :disconnect && return (events & UV_DISCONNECT) != 0 + field === :prioritized && return (events & UV_PRIORITIZED) != 0 + field === :timedout && return events == 0 + field === :events && return Int(events) + getfield(f, field)::Union{} +end +propertynames(f::FDEvent) = (:readable, :writable, :disconnect, :prioritized, :timedout, :events) isreadable(f::FDEvent) = f.readable iswritable(f::FDEvent) = f.writable -FDEvent() = FDEvent(false, false, false, true) -FDEvent(flags::Integer) = FDEvent((flags & UV_READABLE) != 0, - (flags & UV_WRITABLE) != 0, - (flags & UV_DISCONNECT) != 0, - false) -|(a::FDEvent, b::FDEvent) = - FDEvent(a.readable | b.readable, - a.writable | b.writable, - a.disconnect | b.disconnect, - a.timedout | b.timedout) +|(a::FDEvent, b::FDEvent) = FDEvent(getfield(a, :events) | getfield(b, :events)) mutable struct FileMonitor handle::Ptr{Cvoid} @@ -93,15 +98,13 @@ mutable struct FileMonitor end end - mutable struct FolderMonitor handle::Ptr{Cvoid} notify::Channel{Any} # eltype = Union{Pair{String, FileEvent}, IOError} - open::Bool FolderMonitor(folder::AbstractString) = FolderMonitor(String(folder)) function FolderMonitor(folder::String) handle = Libc.malloc(_sizeof_uv_fs_event) - this = new(handle, Channel(Inf), false) + this = new(handle, Channel(Inf)) associate_julia_struct(handle, this) iolock_begin() err = ccall(:uv_fs_event_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle) @@ -109,7 +112,6 @@ mutable struct FolderMonitor Libc.free(handle) throw(_UVError("FolderMonitor", err)) end - this.open = true finalizer(uvfinalize, this) uv_error("FolderMonitor (start)", ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32), @@ -152,9 +154,10 @@ mutable struct _FDWatcher events::Int32 active::Tuple{Bool, Bool} - let FDWatchers = Vector{Any}() + let FDWatchers = Vector{Any}() # XXX: this structure and refcount need thread-safety locks global _FDWatcher, uvfinalize @static if Sys.isunix() + _FDWatcher(fd::RawFD, mask::FDEvent) = _FDWatcher(fd, mask.readable, mask.writable) function _FDWatcher(fd::RawFD, readable::Bool, writable::Bool) if !readable && !writable throw(ArgumentError("must specify at least one of readable or writable to create a FDWatcher")) @@ -181,7 +184,7 @@ mutable struct _FDWatcher fdnum, (Int(readable), Int(writable)), Base.ThreadSynchronizer(), - 0, + Int32(0), (false, false)) associate_julia_struct(handle, this) err = ccall(:uv_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, RawFD), eventloop(), handle, fd) @@ -212,7 +215,7 @@ mutable struct _FDWatcher FDWatchers[t.fdnum] = nothing end end - notify(t.notify, FDEvent()) + notify(t.notify, Int32(0)) finally unlock(t.notify) end @@ -222,10 +225,12 @@ mutable struct _FDWatcher end @static if Sys.iswindows() + _FDWatcher(fd::RawFD, mask::FDEvent) = _FDWatcher(fd, mask.readable, mask.writable) function _FDWatcher(fd::RawFD, readable::Bool, writable::Bool) handle = Libc._get_osfhandle(fd) return _FDWatcher(handle, readable, writable) end + _FDWatcher(fd::WindowsRawSocket, mask::FDEvent) = _FDWatcher(fd, mask.readable, mask.writable) function _FDWatcher(fd::WindowsRawSocket, readable::Bool, writable::Bool) if !readable && !writable throw(ArgumentError("must specify at least one of readable or writable to create a FDWatcher")) @@ -254,30 +259,39 @@ mutable struct _FDWatcher end end -function iswaiting(fwd::_FDWatcher, t::Task) - return fwd.notify.waitq === t.queue -end - mutable struct FDWatcher - watcher::_FDWatcher - readable::Bool - writable::Bool # WARNING: make sure `close` has been manually called on this watcher before closing / destroying `fd` + watcher::_FDWatcher + mask::FDEvent function FDWatcher(fd::RawFD, readable::Bool, writable::Bool) - this = new(_FDWatcher(fd, readable, writable), readable, writable) + return FDWatcher(fd, FDEvent(readable, writable, false, false)) + end + function FDWatcher(fd::RawFD, mask::FDEvent) + this = new(_FDWatcher(fd, mask), mask) finalizer(close, this) return this end @static if Sys.iswindows() function FDWatcher(fd::WindowsRawSocket, readable::Bool, writable::Bool) - this = new(_FDWatcher(fd, readable, writable), readable, writable) + return FDWatcher(fd, FDEvent(readable, writable, false, false)) + end + function FDWatcher(fd::WindowsRawSocket, mask::FDEvent) + this = new(_FDWatcher(fd, mask), mask) finalizer(close, this) return this end end end +function getproperty(fdw::FDWatcher, s::Symbol) + # support deprecated field names + s === :readable && return fdw.mask.readable + s === :writable && return fdw.mask.writable + return getfield(fdw, s) +end + +close(t::_FDWatcher, mask::FDEvent) = close(t, mask.readable, mask.writable) function close(t::_FDWatcher, readable::Bool, writable::Bool) iolock_begin() if t.refcount != (0, 0) @@ -285,15 +299,17 @@ function close(t::_FDWatcher, readable::Bool, writable::Bool) end if t.refcount == (0, 0) uvfinalize(t) + else + @lock t.notify notify(t.notify, Int32(0)) end iolock_end() nothing end function close(t::FDWatcher) - r, w = t.readable, t.writable - t.readable = t.writable = false - close(t.watcher, r, w) + mask = t.mask + t.mask = FDEvent() + close(t.watcher, mask) end function uvfinalize(uv::Union{FileMonitor, FolderMonitor, PollingFileWatcher}) @@ -339,12 +355,17 @@ function _uv_hook_close(uv::FileMonitor) end function _uv_hook_close(uv::FolderMonitor) - uv.open = false uv.handle = C_NULL close(uv.notify) nothing end +isopen(fm::FileMonitor) = fm.handle != C_NULL +isopen(fm::FolderMonitor) = fm.handle != C_NULL +isopen(pfw::PollingFileWatcher) = pfw.handle != C_NULL +isopen(pfw::_FDWatcher) = pfw.refcount != (0, 0) +isopen(pfw::FDWatcher) = !pfw.mask.timedout + function uv_fseventscb_file(handle::Ptr{Cvoid}, filename::Ptr, events::Int32, status::Int32) t = @handle_as handle FileMonitor lock(t.notify) @@ -388,7 +409,7 @@ function uv_pollcb(handle::Ptr{Cvoid}, status::Int32, events::Int32) ccall(:uv_poll_stop, Int32, (Ptr{Cvoid},), t.handle) end end - notify(t.notify, FDEvent(events)) + notify(t.notify, events) end finally unlock(t.notify) @@ -499,67 +520,78 @@ function stop_watching(t::FileMonitor) nothing end -function wait(fdw::FDWatcher) - GC.@preserve fdw begin - return wait(fdw.watcher, readable = fdw.readable, writable = fdw.writable) - end -end - -function wait(fdw::_FDWatcher; readable=true, writable=true) - events = FDEvent(Int32(0)) +# n.b. this _wait may return spuriously early with a timedout event +function _wait(fdw::_FDWatcher, mask::FDEvent) iolock_begin() preserve_handle(fdw) lock(fdw.notify) try - while true - haveevent = false - events |= FDEvent(fdw.events) - if readable && isreadable(events) - fdw.events &= ~UV_READABLE - haveevent = true - end - if writable && iswritable(events) - fdw.events &= ~UV_WRITABLE - haveevent = true - end - if haveevent - break - end - if fdw.refcount == (0, 0) # !open - throw(EOFError()) - else - start_watching(fdw) # make sure the poll is active - iolock_end() - events = wait(fdw.notify)::FDEvent - unlock(fdw.notify) - iolock_begin() - lock(fdw.notify) - end + events = FDEvent(fdw.events & mask.events) + if !isopen(fdw) # !open + throw(EOFError()) + elseif events.timedout + start_watching(fdw) # make sure the poll is active + iolock_end() + return FDEvent(wait(fdw.notify)::Int32) + else + iolock_end() + return events end finally unlock(fdw.notify) unpreserve_handle(fdw) end - iolock_end() - return events end -function wait(fd::RawFD; readable=false, writable=false) - fdw = _FDWatcher(fd, readable, writable) +function wait(fdw::_FDWatcher; readable=true, writable=true) + return wait(fdw, FDEvent(readable, writable, false, false)) +end +function wait(fdw::_FDWatcher, mask::FDEvent) + while true + mask.timedout && return mask + events = _wait(fdw, mask) + if !events.timedout + @lock fdw.notify fdw.events &= ~events.events + return events + end + end +end + +function wait(fdw::FDWatcher) + isopen(fdw) || throw(EOFError()) + while true + events = GC.@preserve fdw _wait(fdw.watcher, fdw.mask) + isopen(fdw) || throw(EOFError()) + if !events.timedout + @lock fdw.notify fdw.events &= ~events.events + return events + end + end +end + +function wait(socket::RawFD; readable=false, writable=false) + return wait(socket, FDEvent(readable, writable, false, false)) +end +function wait(fd::RawFD, mask::FDEvent) + fdw = _FDWatcher(fd, mask) try - return wait(fdw, readable=readable, writable=writable) + return wait(fdw, mask) finally - close(fdw, readable, writable) + close(fdw, mask) end end + if Sys.iswindows() function wait(socket::WindowsRawSocket; readable=false, writable=false) - fdw = _FDWatcher(socket, readable, writable) + return wait(socket, FDEvent(readable, writable, false, false)) + end + function wait(socket::WindowsRawSocket, mask::FDEvent) + fdw = _FDWatcher(socket, mask) try - return wait(fdw, readable=readable, writable=writable) + return wait(fdw, mask) finally - close(fdw, readable, writable) + close(fdw, mask) end end end @@ -651,38 +683,44 @@ The returned value is an object with boolean fields `readable`, `writable`, and giving the result of the polling. """ function poll_fd(s::Union{RawFD, Sys.iswindows() ? WindowsRawSocket : Union{}}, timeout_s::Real=-1; readable=false, writable=false) - wt = Condition() - fdw = _FDWatcher(s, readable, writable) + mask = FDEvent(readable, writable, false, false) + mask.timedout && return mask + fdw = _FDWatcher(s, mask) local timer + timedout = false # TODO: make this atomic try if timeout_s >= 0 - result::FDEvent = FDEvent() - t = @async begin - timer = Timer(timeout_s) do t - notify(wt) - end - try - result = wait(fdw, readable=readable, writable=writable) - catch e - notify_error(wt, e) - return + # delay creating the timer until shortly before we start the poll wait + timer = Timer(timeout_s) do t + timedout && return + timedout = true + close(fdw, mask) + end + try + while true + events = _wait(fdw, mask) + if timedout || !events.timedout + @lock fdw.notify fdw.events &= ~events.events + return events + end end - notify(wt) + catch ex + ex isa EOFError() || rethrow() + return FDEvent() end - wait(wt) - # It's possible that both the timer and the poll fired on the same - # libuv loop. In that case, which event we see here first depends - # on task schedule order. If we can see that the task isn't waiting - # on the file watcher anymore, just let it finish so we can see - # the modification to `result` - iswaiting(fdw, t) || wait(t) - return result else - return wait(fdw, readable=readable, writable=writable) + return wait(fdw, mask) end finally - close(fdw, readable, writable) - @isdefined(timer) && close(timer) + if @isdefined(timer) + if !timedout + timedout = true + close(timer) + close(fdw, mask) + end + else + close(fdw, mask) + end end end diff --git a/stdlib/FileWatching/test/runtests.jl b/stdlib/FileWatching/test/runtests.jl index 345ffce07482f6..f302f28295a01c 100644 --- a/stdlib/FileWatching/test/runtests.jl +++ b/stdlib/FileWatching/test/runtests.jl @@ -22,7 +22,7 @@ for i in 1:n uv_error("pipe", ccall(:uv_pipe, Cint, (Ptr{NTuple{2, Base.OS_HANDLE}}, Cint, Cint), Ref(pipe_fds, i), 0, 0)) end Ctype = Sys.iswindows() ? Ptr{Cvoid} : Cint - FDmax = Sys.iswindows() ? 0x7fff : (n + 60) # expectations on reasonable values + FDmax = Sys.iswindows() ? 0x7fff : (n + 60 + (isdefined(Main, :Revise) * 30)) # expectations on reasonable values fd_in_limits = 0 <= Int(Base.cconvert(Ctype, pipe_fds[i][1])) <= FDmax && 0 <= Int(Base.cconvert(Ctype, pipe_fds[i][2])) <= FDmax