Skip to content

Commit

Permalink
synchronize FileWatching (#31981)
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffBezanson authored May 10, 2019
1 parent 25fc3a9 commit f0ffb29
Showing 1 changed file with 114 additions and 58 deletions.
172 changes: 114 additions & 58 deletions stdlib/FileWatching/src/FileWatching.jl
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ FDEvent(flags::Integer) = FDEvent((flags & UV_READABLE) != 0,
mutable struct FileMonitor
handle::Ptr{Cvoid}
file::String
notify::Condition
notify::Base.ThreadSynchronizer
events::Int32
active::Bool
FileMonitor(file::AbstractString) = FileMonitor(String(file))
function FileMonitor(file::String)
handle = Libc.malloc(_sizeof_uv_fs_event)
this = new(handle, file, Condition(), 0, false)
this = new(handle, file, Base.ThreadSynchronizer(), 0, false)
associate_julia_struct(handle, this)
err = ccall(:uv_fs_event_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle)
if err != 0
Expand Down Expand Up @@ -118,14 +118,14 @@ mutable struct PollingFileWatcher
handle::Ptr{Cvoid}
file::String
interval::UInt32
notify::Condition
notify::Base.ThreadSynchronizer
active::Bool
curr_error::Int32
curr_stat::StatStruct
PollingFileWatcher(file::AbstractString, interval::Float64=5.007) = PollingFileWatcher(String(file), interval)
function PollingFileWatcher(file::String, interval::Float64=5.007) # same default as nodejs
handle = Libc.malloc(_sizeof_uv_fs_poll)
this = new(handle, file, round(UInt32, interval * 1000), Condition(), false, 0, StatStruct())
this = new(handle, file, round(UInt32, interval * 1000), Base.ThreadSynchronizer(), false, 0, StatStruct())
associate_julia_struct(handle, this)
err = ccall(:uv_fs_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle)
if err != 0
Expand All @@ -141,7 +141,7 @@ mutable struct _FDWatcher
handle::Ptr{Cvoid}
fdnum::Int # this is NOT the file descriptor
refcount::Tuple{Int, Int}
notify::Condition
notify::Base.ThreadSynchronizer
events::Int32
active::Tuple{Bool, Bool}

Expand Down Expand Up @@ -171,7 +171,7 @@ mutable struct _FDWatcher
handle,
fdnum,
(Int(readable), Int(writable)),
Condition(),
Base.ThreadSynchronizer(),
0,
(false, false))
associate_julia_struct(handle, this)
Expand All @@ -187,19 +187,24 @@ mutable struct _FDWatcher
end

function uvfinalize(t::_FDWatcher)
if t.handle != C_NULL
disassociate_julia_struct(t)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
t.handle = C_NULL
end
t.refcount = (0, 0)
t.active = (false, false)
@static if Sys.isunix()
if FDWatchers[t.fdnum] == t
FDWatchers[t.fdnum] = nothing
lock(t.notify)
try
if t.handle != C_NULL
disassociate_julia_struct(t)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
t.handle = C_NULL
end
t.refcount = (0, 0)
t.active = (false, false)
@static if Sys.isunix()
if FDWatchers[t.fdnum] == t
FDWatchers[t.fdnum] = nothing
end
end
notify(t.notify, FDEvent())
finally
unlock(t.notify)
end
notify(t.notify, FDEvent())
nothing
end
end
Expand All @@ -219,7 +224,7 @@ mutable struct _FDWatcher
handle,
0,
(Int(readable), Int(writable)),
Condition(),
Base.ThreadSynchronizer(),
0,
(false, false))
associate_julia_struct(handle, this)
Expand Down Expand Up @@ -289,16 +294,26 @@ function _uv_hook_close(uv::_FDWatcher)
end

function _uv_hook_close(uv::PollingFileWatcher)
uv.handle = C_NULL
uv.active = false
notify(uv.notify, StatStruct())
lock(uv.notify)
try
uv.handle = C_NULL
uv.active = false
notify(uv.notify, StatStruct())
finally
unlock(uv.notify)
end
nothing
end

function _uv_hook_close(uv::FileMonitor)
uv.handle = C_NULL
uv.active = false
notify(uv.notify, FileEvent())
lock(uv.notify)
try
uv.handle = C_NULL
uv.active = false
notify(uv.notify, FileEvent())
finally
unlock(uv.notify)
end
nothing
end

Expand All @@ -311,11 +326,16 @@ end

function uv_fseventscb_file(handle::Ptr{Cvoid}, filename::Ptr, events::Int32, status::Int32)
t = @handle_as handle FileMonitor
if status != 0
notify_error(t.notify, _UVError("FileMonitor", status))
else
t.events |= events
notify(t.notify, FileEvent(events))
lock(t.notify)
try
if status != 0
notify_error(t.notify, _UVError("FileMonitor", status))
else
t.events |= events
notify(t.notify, FileEvent(events))
end
finally
unlock(t.notify)
end
nothing
end
Expand All @@ -333,19 +353,24 @@ end

function uv_pollcb(handle::Ptr{Cvoid}, status::Int32, events::Int32)
t = @handle_as handle _FDWatcher
if status != 0
notify_error(t.notify, _UVError("FDWatcher", status))
else
t.events |= events
if t.active[1] || t.active[2]
if isempty(t.notify)
# if we keep hearing about events when nobody appears to be listening,
# stop the poll to save cycles
t.active = (false, false)
ccall(:uv_poll_stop, Int32, (Ptr{Cvoid},), t.handle)
lock(t.notify)
try
if status != 0
notify_error(t.notify, _UVError("FDWatcher", status))
else
t.events |= events
if t.active[1] || t.active[2]
if isempty(t.notify)
# if we keep hearing about events when nobody appears to be listening,
# stop the poll to save cycles
t.active = (false, false)
ccall(:uv_poll_stop, Int32, (Ptr{Cvoid},), t.handle)
end
end
notify(t.notify, FDEvent(events))
end
notify(t.notify, FDEvent(events))
finally
unlock(t.notify)
end
nothing
end
Expand All @@ -359,7 +384,12 @@ function uv_fspollcb(handle::Ptr{Cvoid}, status::Int32, prev::Ptr, curr::Ptr)
end
if status == 0 || status != old_status
prev_stat = StatStruct(convert(Ptr{UInt8}, prev))
notify(t.notify, prev_stat)
lock(t.notify)
try
notify(t.notify, prev_stat)
finally
unlock(t.notify)
end
end
nothing
end
Expand Down Expand Up @@ -400,10 +430,15 @@ function start_watching(t::PollingFileWatcher)
end

function stop_watching(t::PollingFileWatcher)
if t.active && isempty(t.notify)
t.active = false
uv_error("PollingFileWatcher (stop)",
ccall(:uv_fs_poll_stop, Int32, (Ptr{Cvoid},), t.handle))
lock(t.notify)
try
if t.active && isempty(t.notify)
t.active = false
uv_error("PollingFileWatcher (stop)",
ccall(:uv_fs_poll_stop, Int32, (Ptr{Cvoid},), t.handle))
end
finally
unlock(t.notify)
end
nothing
end
Expand All @@ -420,10 +455,15 @@ function start_watching(t::FileMonitor)
end

function stop_watching(t::FileMonitor)
if t.active && isempty(t.notify)
t.active = false
uv_error("FileMonitor (stop)",
ccall(:uv_fs_event_stop, Int32, (Ptr{Cvoid},), t.handle))
lock(t.notify)
try
if t.active && isempty(t.notify)
t.active = false
uv_error("FileMonitor (stop)",
ccall(:uv_fs_event_stop, Int32, (Ptr{Cvoid},), t.handle))
end
finally
unlock(t.notify)
end
nothing
end
Expand Down Expand Up @@ -456,8 +496,13 @@ function wait(fdw::_FDWatcher; readable=true, writable=true)
if fdw.refcount == (0, 0) # !open
events = EOFError()
else
start_watching(fdw) # make sure the poll is active
events = stream_wait(fdw, fdw.notify)::FDEvent
lock(fdw.notify)
try
start_watching(fdw) # make sure the poll is active
events = stream_wait(fdw, fdw.notify)::FDEvent
finally
unlock(fdw.notify)
end
end
end
end
Expand All @@ -483,8 +528,14 @@ if Sys.iswindows()
end

function wait(pfw::PollingFileWatcher)
start_watching(pfw)
prevstat = stream_wait(pfw, pfw.notify)::StatStruct
lock(pfw.notify)
local prevstat
try
start_watching(pfw)
prevstat = stream_wait(pfw, pfw.notify)::StatStruct
finally
unlock(pfw.notify)
end
stop_watching(pfw)
if pfw.handle == C_NULL
return prevstat, EOFError()
Expand All @@ -496,10 +547,16 @@ function wait(pfw::PollingFileWatcher)
end

function wait(m::FileMonitor)
start_watching(m)
events = stream_wait(m, m.notify)::FileEvent
events |= FileEvent(m.events)
m.events = 0
lock(m.notify)
local events
try
start_watching(m)
events = stream_wait(m, m.notify)::FileEvent
events |= FileEvent(m.events)
m.events = 0
finally
unlock(m.notify)
end
stop_watching(m)
return events
end
Expand Down Expand Up @@ -608,7 +665,6 @@ This behavior of this function varies slightly across platforms. See
"""
watch_folder(s::AbstractString, timeout_s::Real=-1) = watch_folder(String(s), timeout_s)
function watch_folder(s::String, timeout_s::Real=-1)
wt = Condition()
fm = get!(watched_folders, s) do
return FolderMonitor(s)
end
Expand Down

0 comments on commit f0ffb29

Please sign in to comment.