-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
wait()
with timeout
#36217
Comments
Maybe something like function wait_timeout(c, timeout)
ch = Channel(2)
@async begin
res = wait(c)
push!(ch, Some(res))
end
@async begin
sleep(timeout)
push!(ch, nothing)
end
res = take!(ch)
return res
end That way if the return value is |
The two implementations suggested here seem to leak tasks. It might be possible to avoid this by using In general, it'd be nice if we can take concurrency API seriously #33248 and avoid unstructured concurrency piling up. Timeout is one of the biggest success story of structured concurrency, after all: Timeouts and cancellation for humans — njs blog |
What does that mean? |
It means that tasks spawn by the function are still running after the function call ends. This violates the black box rule; i.e., all the relevant computations end at the end of the function (see the Julep and #33248 (comment)). This is bad because the errors occur in the leaked tasks are silently ignored and there is no backpressure for flooding the task scheduler. |
Can't that be solved by wrapping the entire function in a |
Don't we need to cancel the other task for that? That's why the black box rule and task cancellation are the two must-have properties of the structured concurrency. (But it might be possible to solve this particular issue by just using |
Ah, you're right of course. I got confused and thought we were notifying @JeffBezanson is flooding the task scheduler with tasks all sitting and waiting on things an issue for the runtime? For instance, if I am timing out a lot, I could imagine adding a new task to the scheduler every couple of seconds, and for a long-running instance (like a StorageServer) this could add up to thousands of tasks. |
Yeah, notifying |
Even without closing the timer, the timer will eventually elapse, and the task will go away. It may throw an error, but that's fine, because we're not catching them. ;) Cleaner to close the timer, for sure. |
What happens if you (Edit: edited twice; first I thought it was stupid but then realized that it might happen for re- |
Concretely: c :: Channel
wait_timeout(c, 0.1) === nothing && return
x = take!(c)
wait_timeout(c, 0.1) === nothing && return # the timer from the first `wait_timeout` can fire here
y = take!(c) |
The timer within the first |
It's taking two things from the same channel |
Oh, I understand now. Your concern is that if Perhaps instead we can instead using Test
function wait_timeout(c, timeout)
ch = Channel{Bool}(2)
happy_task = @async begin
wait(c)
push!(ch, true)
end
timeout_task = @async begin
sleep(timeout)
push!(ch, false)
# Stop the other task from waiting anymore
Base.throwto(happy_task, InterruptException())
end
return take!(ch)
end
# First, test no wait, wait
@testset "no wait, wait" begin
c = Channel(10)
put!(c, 1)
@test wait_timeout(c, 1.0) == true
@test take!(c) == 1
@test wait_timeout(c, 1.0) == false
end
# Next, test wait, no wait
@testset "wait, no wait" begin
c = Channel(10)
@test wait_timeout(c, 1.0) == false
put!(c, 1)
@test wait_timeout(c, 1.0) == true
@test take!(c) == 1
end
# Next, test no wait, no wait, after sleeping for more than timeout:
@testset "no wait, no wait" begin
c = Channel(10)
put!(c, 1)
@test wait_timeout(c, 1.0) == true
@test take!(c) == 1
sleep(2.0)
put!(c, 2)
@test wait_timeout(c, 1.0) == true
@test take!(c) == 2
end
# Finally, test wait, wait
@testset "wait, wait" begin
c = Channel(10)
@test wait_timeout(c, 1.0) == false
@test wait_timeout(c, 1.0) == false
end |
I guess the question is whether we are absolutely positive that Is it so bad to continue to just wait in the happy task? The downside are that the task and the channel live longer than strictly necessary, I guess? |
IIUC it's unsafe to call Reading the code briefly, my impression is that these functions assume the ownership of the thread-local queue. So, I guess it's unsafe to run them outside the scheduler code. But |
OK, so here is what I meant by function wait_timeout(c::Channel, timeout::Real)
cancel = Atomic{Bool}(false)
isready(c) && return true
@sync begin
timer = Timer(timeout)
try
@async begin
try
wait(timer)
catch
return
end
cancel[] = true
lock(c.cond_wait) do
notify(c.cond_wait)
end
end
lock(c)
try
while !isready(c)
check_channel_state(c)
wait(c.cond_wait)
cancel[] && return false
end
finally
unlock(c)
end
finally
close(timer)
end
end
return true
end I don't think it leaks a task (or more precisely a leaked task would be cleaned up soon enough; it'd be nice if It passes @staticfloat's tests (thanks for sharing those, BTW!). |
Alright, this all inspired me to read up on the various links that @tkf posted. All the structured concurrency stuff is interesting, but in terms of a short term solution that can be added incrementally to packages, I liked .Net's cancellation framework best, by far. We also need something like that for the language server rather sooner than later, and that is the same framework that the VS Code team is using across VS Code. Sooo, I created https://github.com/davidanthoff/CancellationTokens.jl :) For now it is not thread safe, so can only be used with single threaded tasks, but the .Net original is thread safe and the source MIT licensed, so we just need to copy their design over. But the public API of the package should be more or less done. Feedback and help welcome! |
Mission accomplished! 😆
So my initial response was "why not just use |
There is also support for waiting on a cancel token, which I don't think would work with a pure atomic, right? And the .Net implementation supports callback handlers, which I haven't added, but I think also need more than just an atomic. |
Ah, I see. That makes sense. |
My implementation I wonder if it makes sense to define internal interface something like wait′(waitable, cancel) -> success::Bool
_notify(waitable) where struct NoCancel end
Base.getindex(::NoCancel) = false It's pretty straightforward to implement this for using Base.Threads: Atomic
using Base: check_channel_state
function _wait(t::Task, cancel)
if !istaskdone(t)
lock(t.donenotify)
try
while !istaskdone(t)
wait(t.donenotify)
cancel[] && return false # added this line
end
finally
unlock(t.donenotify)
end
end
return true # returning `true` instead
end
function _notify(t::Task)
lock(t.donenotify) do
notify(t.donenotify)
end
end
function wait′(t::Task, cancel = NoCancel())
t === current_task() && error("deadlock detected: cannot wait on current task")
ok = _wait(t, cancel)
if istaskfailed(t)
throw(TaskFailedException(t))
end
return ok
end
function _notify(c::Channel)
lock(c.cond_wait) do
notify(c.cond_wait)
end
end
function wait′(c::Channel, cancel = NoCancel())
isready(c) && return true # returning `true` instead
lock(c)
try
while !isready(c)
check_channel_state(c)
wait(c.cond_wait)
cancel[] && return false # added this line
end
finally
unlock(c)
end
return true # returning `true` instead
end With this implementation, we can now define function waitfirst(waitables...)
winner = Ref{Any}()
cancel = Atomic{Bool}(false)
@sync for w in waitables
@async begin
if wait′($w, $cancel)
$cancel[] = true
foreach(_notify, $waitables)
$winner[] = $w
end
end
end
return winner[]
end I think it's possible to define function wait_timeout(w, timeout::Real)
t = Timer()
try
return waitfirst(w, t) === w
finally
close(T)
end
end |
I don't think a function like one of these should be implemented by notifying the waited-for object. Others might be waiting for the same objects, and they would get spurious wakeups. |
What are the other implementation strategies? Maybe we can define |
That seems rather complex, like you're trying to put a round peg (old multiplexing event systems) into a square hole (our Task-based ownership design). Our objects are intended to essentially already have both capabilities through the existing Task system, which is why (unlike Go and C#) we have been trying not to additionally bolt things like CancellationTokens and function wait_until(c, timeout::Real) # `c` is any object that is both wait-able and cancel-able (e.g. any IO or a Channel, etc.)
timer = Timer(timeout) do t
isready(c) || close(c)
end
try
return wait(c)
finally
close(timer)
end
end Short, sweet, simple, no minor resource leaks—and essentially how the timeouts are implemented for the various resources in the FileWatching module. |
So essentially you I don't see, though, how that would make something like a cancellation token framework entirely unnecessary, given that you can only really close a thing, not an operation? |
Because you can also only |
Don't you need function wait_until(c::Channel, timeout::Real)
w = @async wait(c)
timer = Timer(timeout) do t
isready(c) || close(w)
end
try
return wait(w)
finally
close(timer)
end
end I'd be very happy if we have |
That code isn't the pattern that I posted for your use case: using
I'm going to need a reference for that. I tried google, but the titles of the results I got back for "why is a state machine bad" seemed to follow the general pattern "why I thought they were {bad | hard to write | maintain | complicated | use | debug | antiquated}, but came to realize they led to far better code" |
Let's step away from the example of retransmitting on failure and think of the more general case; I want to |
That seems like poor evidence, given that we use a platform abstraction layer (libuv) to improve upon the timeout capabilities from those APIs. As we talked about offline, there are good design patterns from only using the few primitives we have already. |
These are two examples of "structured programming" approach vs state machine:
As a more specific example in concurrent programming, Deadlocks in non-hierarchical CSP - Roman Elizarov - Medium is a nice post explaining how tricky communicating finite state machines can be. There are a few links to relevant discussions on |
Arguably, the entire point of both closures and coroutines is to express what is ultimately just a state machine in a more convenient, less error prone way. After going to that much language design trouble to not force users to explicitly write out state machines, it seems like a bit of weak sauce to say that here it's suddenly just fine. |
Thanks for the links. The conclusion in the trio link particularly seems be exactly the same as mine. That first post (Medium) includes a CFSM—it's not an attempt to avoid them, only to demonstrate problems with getting them wrong. And the second one (trio) shows ways to simplify most FSM into linear form (by breaking them up into their independent parts), but hypothesizes that they can't be simplified if any cleanup, timeout, or retry actions are present aside from I also appreciated this conclusion to the trio posts:
|
Following my own advice at JuliaLang#36217. Avoids a minor temporary resource leak.
Following my own advice at JuliaLang#36217. Avoids a minor temporary resource leak.
I had also asked about this on Discourse, and I agree that there should be a way to timeout on a wait/read. I would love to be able to use julia to run my instruments, but this has proven impossible without this feature. In my particular case, I want to work with a SCPI instrument over TCP. This involves sending text commands and reading back the response. If I make a typo in the command, I would want the read to time out so that I can try again. I find the given solution of needing to close the socket (to abort the read) and recreate everything over again to be pretty poor. As an example (this could be interactive or not, but I might not be able to send an interrupt): tcp = connect(host,port)
println(tcp, ":MEASURE:VOLT?")
voltage = readline(tcp) # all good here
println(tcp, ":MEASURE:VILT?") # oh no!
voltage2 = readline(tcp) # blocks forever Note that I tried the following: function read_timeout(tcp::TCPSocket, timeout=10)
# read asynchronously
ch = Channel{String}(c -> put!(c,readline(tcp)), 0)
# set timeout timer
tm = Timer(t -> isready(ch) || close(ch), timeout)
# wait until value is read or timeout
out = nothing
try
out = take!(ch)
close(ch)
catch
@info "Command timed out!"
finally
close(tm)
end
return out
end But it stops working when a timeout does happen, I can't seem to read from the socket anymore, even though it is still open. |
For what it's worth, inspired by PR #39027, I was able to write a function to read from a function read_timeout(tcp::TCPSocket, timeout=5)
# create timeout timer
status_before = tcp.status
tm = Timer(timeout) do t
lock(tcp.cond)
tcp.status = Base.StatusEOF
notify(tcp.cond)
unlock(tcp.cond)
end
# read synchronously
out = readline(tcp)
# check timer
if isopen(tm) # readline worked
close(tm)
return out
else # got EOF because of timer
tcp.status = status_before
@info "Read timed out!"
return nothing
end
end |
You should not mutate |
It's not that the connection is unreliable, it's that sometimes there is nothing to read and I want to get control back (being able to write again) without having to close the socket. If there is a better way to do this (having a timeout read), I would very much appreciate knowing how. |
|
An example where I want this: # Attempt to gracefully kill the process, because force-killing it
# will get reported as a crash in our test suite.
please_die_now(process)
# If the process failed to die after 5 seconds, forcibly kill it, since it's possibly stuck,
# which indicates a bug, but we do want to move on with the rest of our tests.
wait_until(process, 5)
if !process_exited(process)
kill(process, 9)
end
# ...
run_more_test(...) |
Just adding that I think that spelling this as |
That would be awesome, I missed having that many times in the past. Would also be super helpful with channels, for interaction with hardware, in scenarios with distributed workers (remotecall) and so on. |
@NHDaly's example could be written as: t = @spawn begin
# Attempt to gracefully kill the process, because force-killing it
# will get reported as a crash in our test suite.
please_die_now(process)
sleep(5)
# If the process failed to die after 5 seconds, forcibly kill it, since it's possibly stuck,
# which indicates a bug, but we do want to move on with the rest of our tests.
if !process_exited(process)
kill(process, 9)
end
end
Base.errormonitor(t)
wait(process) # hang if kill fails or if the task errors
# ...
run_more_test(...) Which notably hangs if the process does not terminate after kill |
Triage talked about this extensively and didn't come to any conclusions. |
See below. |
Julia now has |
Oh, I hasn't seen that - thanks @gbaraldi ! |
oh, me neither! Yeah, just adding a timer in there seems like it would address this well! Can we add support for that? |
If you're using Fundamentally if you cannot interrupt tasks safely, you have to release the object it's waiting on to end the task. The user just has to manage the waitable object to prevent leaks. As previous posts have said, |
I now have a workarounds in ParallelProcessingTools now ( |
Triage wants this (adding support for |
The one drawback with |
It appears that we do not support timeouts in
wait()
. To motivate the need for this, it's a pretty critical basic functionality for writing resilient servers; blocking forever due to a dropped packet is unacceptable, so servers must instead be written in an explicitly polling style, which is bad for latency and for CPU usage.Implementation-wise,
wait(c)
waits upon a condition for anotify()
to push it back onto the scheduler's work queue. It seems to me that timeouts can be implemented by starting a second task thatsleep()
's for a certain amount of time, then unconditionallynotify()
's that condition. The consumer of thewait()
call would then need to disambiguate a timeout from an actual event.Taking a look at the different things that can be
wait()
'ed upon, most are implemented in terms ofCondition
objects, so a very simple@async (sleep(timeout); notify(c))
should work, and the_FDWatcher
fromFileWatching
also notifies a condition in the end, therefore I believe a uniform API is possible here.The text was updated successfully, but these errors were encountered: