Skip to content

Commit

Permalink
[FileWatching] fix FileMonitor similarly and improve pidfile reliability
Browse files Browse the repository at this point in the history
Previously, pidfile passed the same poll_interval that it uses for sleep to
watch_file as a timeout, as a way to detect whether this code made any
concurrency mistakes. We do not really need to do that once FileMonitor is
fixed to be reliable in the presence of parallel concurrency, so pidfile now
uses FileMonitor directly (instead of using watch_file).
  • Loading branch information
vtjnash committed Sep 30, 2024
1 parent e500754 commit b6e0136
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 81 deletions.
108 changes: 43 additions & 65 deletions stdlib/FileWatching/src/FileWatching.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ const UV_CHANGE = Int32(2)
struct FileEvent
renamed::Bool
changed::Bool
timedout::Bool
timedout::Bool # aka canceled
FileEvent(r::Bool, c::Bool, t::Bool) = new(r, c, t)
end
FileEvent() = FileEvent(false, false, true)
FileEvent(flags::Integer) = FileEvent((flags & UV_RENAME) != 0,
(flags & UV_CHANGE) != 0,
false)
iszero(flags))
|(a::FileEvent, b::FileEvent) =
FileEvent(a.renamed | b.renamed,
a.changed | b.changed,
Expand Down Expand Up @@ -80,32 +80,35 @@ iswritable(f::FDEvent) = f.writable

mutable struct FileMonitor
@atomic handle::Ptr{Cvoid}
file::String
notify::Base.ThreadSynchronizer
events::Int32
active::Bool
const file::String
const notify::Base.ThreadSynchronizer
events::Int32 # accumulator for events that occurred since the last wait call, similar to Event with autoreset
ioerrno::Int32 # record the error, if any occurs (unlikely)
FileMonitor(file::AbstractString) = FileMonitor(String(file))
function FileMonitor(file::String)
handle = Libc.malloc(_sizeof_uv_fs_event)
this = new(handle, file, Base.ThreadSynchronizer(), 0, false)
this = new(handle, file, Base.ThreadSynchronizer(), 0, 0)
associate_julia_struct(handle, this)
iolock_begin()
err = ccall(:uv_fs_event_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle)
if err != 0
Libc.free(handle)
throw(_UVError("FileMonitor", err))
uv_error("FileMonitor", err)
end
iolock_end()
finalizer(uvfinalize, this)
uv_error("FileMonitor (start)",
ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32),
this.handle, uv_jl_fseventscb_file::Ptr{Cvoid}, file, 0))
iolock_end()
return this
end
end

mutable struct FolderMonitor
@atomic handle::Ptr{Cvoid}
# notify::Channel{Any} # eltype = Union{Pair{String, FileEvent}, IOError}
notify::Base.ThreadSynchronizer
channel::Vector{Any} # eltype = Pair{String, FileEvent}
const notify::Base.ThreadSynchronizer
const channel::Vector{Any} # eltype = Pair{String, FileEvent}
FolderMonitor(folder::AbstractString) = FolderMonitor(String(folder))
function FolderMonitor(folder::String)
handle = Libc.malloc(_sizeof_uv_fs_event)
Expand Down Expand Up @@ -152,9 +155,9 @@ Base.stat(pfw::PollingFileWatcher) = Base.checkstat(@lock pfw.notify pfw.prev_st

mutable struct _FDWatcher
@atomic handle::Ptr{Cvoid}
fdnum::Int # this is NOT the file descriptor
const fdnum::Int # this is NOT the file descriptor
refcount::Tuple{Int, Int}
notify::Base.ThreadSynchronizer
const notify::Base.ThreadSynchronizer
events::Int32
active::Tuple{Bool, Bool}

Expand Down Expand Up @@ -275,7 +278,7 @@ end

mutable struct FDWatcher
# WARNING: make sure `close` has been manually called on this watcher before closing / destroying `fd`
watcher::_FDWatcher
const watcher::_FDWatcher
mask::FDEvent
function FDWatcher(fd::RawFD, readable::Bool, writable::Bool)
return FDWatcher(fd, FDEvent(readable, writable, false, false))
Expand Down Expand Up @@ -368,9 +371,8 @@ end
function _uv_hook_close(uv::FileMonitor)
lock(uv.notify)
try
uv.active = false
Libc.free(@atomicswap :monotonic uv.handle = C_NULL)
notify(uv.notify, FileEvent())
notify(uv.notify)
finally
unlock(uv.notify)
end
Expand Down Expand Up @@ -399,10 +401,12 @@ function uv_fseventscb_file(handle::Ptr{Cvoid}, filename::Ptr, events::Int32, st
lock(t.notify)
try
if status != 0
t.ioerrno = status
notify_error(t.notify, _UVError("FileMonitor", status))
else
t.events |= events
notify(t.notify, FileEvent(events))
uvfinalize(t)
elseif events != t.events
events = t.events |= events
notify(t.notify, all=false)
end
finally
unlock(t.notify)
Expand Down Expand Up @@ -535,35 +539,6 @@ function start_watching(t::_FDWatcher)
nothing
end

# Begin watching `t.file` for changes unless `t` is already marked active.
# Throws an `ArgumentError` when `t.handle` is `C_NULL` (reported as "FileMonitor is closed").
# Always returns `nothing`.
function start_watching(t::FileMonitor)
iolock_begin()
t.handle == C_NULL && throw(ArgumentError("FileMonitor is closed"))
if !t.active
# register the `uv_jl_fseventscb_file` callback for `t.file` on the libuv handle;
# the return code of the call is handed to `uv_error` (presumably raising on failure — confirm)
uv_error("FileMonitor (start)",
ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32),
t.handle, uv_jl_fseventscb_file::Ptr{Cvoid}, t.file, 0))
# only mark active after the start call has been issued
t.active = true
end
iolock_end()
nothing
end

# Stop the libuv watch for `t`, but only when `t` is marked active and `t.notify` is empty
# (presumably meaning no task is still waiting on it — TODO confirm `isempty` semantics).
# Always returns `nothing`.
function stop_watching(t::FileMonitor)
iolock_begin()
lock(t.notify)
try
if t.active && isempty(t.notify)
# the flag is cleared before the call into libuv
t.active = false
uv_error("FileMonitor (stop)",
ccall(:uv_fs_event_stop, Int32, (Ptr{Cvoid},), t.handle))
end
finally
# release `t.notify` whether or not the block above raised
unlock(t.notify)
end
iolock_end()
nothing
end

# n.b. this _wait may return spuriously early with a timedout event
function _wait(fdw::_FDWatcher, mask::FDEvent)
iolock_begin()
Expand Down Expand Up @@ -705,26 +680,23 @@ function wait(pfw::PollingFileWatcher)
end

function wait(m::FileMonitor)
iolock_begin()
m.handle == C_NULL && throw(EOFError())
preserve_handle(m)
lock(m.notify)
local events
try
start_watching(m)
iolock_end()
events = wait(m.notify)::FileEvent
events |= FileEvent(m.events)
m.events = 0
unlock(m.notify)
iolock_begin()
lock(m.notify)
while true
m.handle == C_NULL && throw(EOFError())
events = @atomicswap :not_atomic m.events = 0
events == 0 || return FileEvent(events)
if m.ioerrno != 0
uv_error("FileMonitor", m.ioerrno)
end
wait(m.notify)
end
finally
unlock(m.notify)
unpreserve_handle(m)
end
stop_watching(m)
iolock_end()
return events
end

function wait(m::FolderMonitor)
Expand All @@ -743,6 +715,7 @@ function wait(m::FolderMonitor)
end
return evt::Pair{String, FileEvent}
end
Base.take!(m::FolderMonitor) = wait(m) # Channel-like API


"""
Expand Down Expand Up @@ -823,7 +796,12 @@ function watch_file(s::String, timeout_s::Float64=-1.0)
close(fm)
end
end
return wait(fm)
try
return wait(fm)
catch ex
ex isa EOFError && return FileEvent()
rethrow()
end
finally
close(fm)
@isdefined(timer) && close(timer)
Expand Down Expand Up @@ -851,7 +829,7 @@ This behavior of this function varies slightly across platforms. See
"""
watch_folder(s::AbstractString, timeout_s::Real=-1) = watch_folder(String(s), timeout_s)
function watch_folder(s::String, timeout_s::Real=-1)
fm = get!(watched_folders, s) do
fm = @lock watched_folders get!(watched_folders[], s) do
return FolderMonitor(s)
end
local timer
Expand Down Expand Up @@ -898,12 +876,12 @@ It is not recommended to do this while another task is waiting for
"""
unwatch_folder(s::AbstractString) = unwatch_folder(String(s))
function unwatch_folder(s::String)
fm = pop!(watched_folders, s, nothing)
fm = @lock watched_folders pop!(watched_folders[], s, nothing)
fm === nothing || close(fm)
nothing
end

const watched_folders = Dict{String, FolderMonitor}()
const watched_folders = Lockable(Dict{String, FolderMonitor}())

"""
poll_file(path::AbstractString, interval_s::Real=5.007, timeout_s::Real=-1) -> (previous::StatStruct, current)
Expand Down
46 changes: 35 additions & 11 deletions stdlib/FileWatching/src/pidfile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ module Pidfile
export mkpidlock, trymkpidlock

using Base:
IOError, UV_EEXIST, UV_ESRCH,
IOError, UV_EEXIST, UV_ESRCH, UV_ENOENT,
Process

using Base.Filesystem:
File, open, JL_O_CREAT, JL_O_RDWR, JL_O_RDONLY, JL_O_EXCL,
rename, samefile, path_separator

using ..FileWatching: watch_file
using ..FileWatching: FileMonitor
using Base.Sys: iswindows

"""
Expand Down Expand Up @@ -256,19 +256,43 @@ function open_exclusive(path::String;
end
end
# fall-back: wait for the lock

watch = Lockable(Core.Box(nothing))
while true
# start the file-watcher prior to checking for the pidfile existence
t = @async try
watch_file(path, poll_interval)
# now try again to create it
# try to start the file-watcher prior to checking for the pidfile existence
watch = try
FileMonitor(path)
catch ex
isa(ex, IOError) || rethrow(ex)
sleep(poll_interval) # if the watch failed, convert to just doing a sleep
ex.code != UV_ENOENT # if the file was deleted in the meantime, don't sleep at all, even if the lock fails
end
timeout = nothing
if watch isa FileMonitor && stale_age > 0
let watch = watch
timeout = Timer(stale_age) do t
close(watch)
end
end
end
try
file = tryopen_exclusive(path, mode)
file === nothing || return file
if watch isa FileMonitor
try
Base.wait(watch) # will time-out after stale_age passes
catch ex
isa(ex, EOFError) || isa(ex, IOError) || rethrow(ex)
end
end
if watch === true # if the watch failed, convert to just doing a sleep
sleep(poll_interval)
end
finally
# something changed about the path, so watch is now possibly monitoring the wrong file handle
# it will need to be recreated just before the next tryopen_exclusive attempt
timeout isa Timer && close(timeout)
watch isa FileMonitor && close(watch)
end
# now try again to create it
file = tryopen_exclusive(path, mode)
file === nothing || return file
Base.wait(t) # sleep for a bit before trying again
if stale_age > 0 && stale_pidfile(path, stale_age, refresh)
# if the file seems stale, try to remove it before attempting again
# set stale_age to zero so we won't attempt again, even if the attempt fails
Expand Down
11 changes: 6 additions & 5 deletions stdlib/FileWatching/test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,13 @@ file = joinpath(dir, "afile.txt")

# initialize a watch_folder instance and create afile.txt
function test_init_afile()
@test isempty(FileWatching.watched_folders)
watched_folders = FileWatching.watched_folders
@test @lock watched_folders isempty(watched_folders[])
@test(watch_folder(dir, 0) == ("" => FileWatching.FileEvent()))
@test @elapsed(@test(watch_folder(dir, 0) == ("" => FileWatching.FileEvent()))) <= 0.5
@test length(FileWatching.watched_folders) == 1
@test @lock(watched_folders, length(FileWatching.watched_folders[])) == 1
@test unwatch_folder(dir) === nothing
@test isempty(FileWatching.watched_folders)
@test @lock watched_folders isempty(watched_folders[])
@test 0.002 <= @elapsed(@test(watch_folder(dir, 0.004) == ("" => FileWatching.FileEvent())))
@test 0.002 <= @elapsed(@test(watch_folder(dir, 0.004) == ("" => FileWatching.FileEvent()))) <= 0.5
@test unwatch_folder(dir) === nothing
Expand Down Expand Up @@ -204,7 +205,7 @@ function test_init_afile()
@test unwatch_folder(dir) === nothing
@test(watch_folder(dir, 0) == ("" => FileWatching.FileEvent()))
@test 0.9 <= @elapsed(@test(watch_folder(dir, 1) == ("" => FileWatching.FileEvent())))
@test length(FileWatching.watched_folders) == 1
@test @lock(watched_folders, length(FileWatching.watched_folders[])) == 1
nothing
end

Expand Down Expand Up @@ -440,7 +441,7 @@ end
(StatStruct(), EOFError()))) > 3)

unwatch_folder(dir)
@test isempty(FileWatching.watched_folders)
@test @lock FileWatching.watched_folders isempty(FileWatching.watched_folders[])
rm(file)
rm(dir)

Expand Down

0 comments on commit b6e0136

Please sign in to comment.