From 88b083ffb11a76fc22dfdadbfaf7274c550b67aa Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Wed, 27 Sep 2023 18:02:28 +0200 Subject: [PATCH] =?UTF-8?q?Interrupt=20on=20windows!=20=F0=9F=98=AF=20(#60?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/test.yml | 2 +- README.md | 3 + src/Malt.jl | 24 +++--- test/interrupt.jl | 146 +++++++++++++++++++++++++++++++++++++ test/runtests.jl | 2 + 5 files changed, 167 insertions(+), 10 deletions(-) create mode 100644 test/interrupt.jl diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f4b6a9b..a77dc2c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ on: jobs: test: runs-on: ${{ matrix.os }} - timeout-minutes: 20 + timeout-minutes: 25 strategy: # Without setting this, a failing test cancels all others diff --git a/README.md b/README.md index 7379143..1cb8676 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,9 @@ In Distributed, only "process 1 can add or remove workers". Malt.jl does not hav ### Process isolation Malt.jl worker processes **do not inherit** `ENV` variables, command-line arguments or the Pkg environment from their host. +### Interrupt on Windows +Malt.jl supports **interrupting a worker process on Windows**, not just on UNIX. + ### Heterogenous computing Malt.jl does not have API like `@everywhere` or `Distributed.procs`: Malt is **not the right tool for homogenous computing**. diff --git a/src/Malt.jl b/src/Malt.jl index 6dd21df..84d0c75 100644 --- a/src/Malt.jl +++ b/src/Malt.jl @@ -103,7 +103,11 @@ mutable struct Worker <: AbstractWorker function Worker(; env=String[], exeflags=[]) # Spawn process cmd = _get_worker_cmd(; env, exeflags) - proc = open(cmd, "w+") + proc = open(Cmd( + cmd; + detach=true, + windows_hide=true, + ), "w+") # Keep internal list __iNtErNaL_get_running_procs() @@ -612,18 +616,20 @@ Send an interrupt signal to the worker process. This will interrupt the latest request (`remote_call*` or `remote_eval*`) that was sent to the worker. """ function interrupt(w::Worker) - if Sys.iswindows() - # TODO: not yet implemented - @warn "Malt.interrupt is not yet supported on Windows" - # _assert_is_running(w) - # _send_msg(w, MsgType.from_host_fake_interrupt, (), false) - nothing + if !isrunning(w) + @warn "Tried to interrupt a worker that has already shut down." summary(w) else - Base.kill(w.proc, Base.SIGINT) + if Sys.iswindows() + ccall((:GenerateConsoleCtrlEvent,"Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc))) + else + Base.kill(w.proc, Base.SIGINT) + end end + nothing end function interrupt(w::InProcessWorker) - schedule(w.latest_request_task, InterruptException(); error=true) + istaskdone(w.latest_request_task) || schedule(w.latest_request_task, InterruptException(); error=true) + nothing end diff --git a/test/interrupt.jl b/test/interrupt.jl new file mode 100644 index 0000000..ee2efe0 --- /dev/null +++ b/test/interrupt.jl @@ -0,0 +1,146 @@ +win = Sys.iswindows() + +@testset "Interrupt: $W" for W in (m.DistributedStdlibWorker, m.InProcessWorker, m.Worker) +# @testset "Interrupt: $W" for W in (m.Worker,) + + no_interrupt_possible = (Sys.iswindows() && W === m.DistributedStdlibWorker) || W === m.InProcessWorker + + + w = W() + + @test m.isrunning(w) + @test m.remote_call_fetch(&, w, true, true) + + + ex1 = quote + local x = 0.0 + for i = 1:4000 + k = [sqrt(abs(sin(cos(tan(x))))) ^ (1 / i) for z in 1:i] + x += sum(k) + end + x + end |> Base.remove_linenums! + + ex2 = quote + local x = 0.0 + for i in 1:20_000_000 + x += sqrt(abs(sin(cos(tan(x)))))^(1/i) + end + x + end |> Base.remove_linenums! + + ex3 = :(sleep(3)) |> Base.remove_linenums! + + # expressions in this list can be interrupted with a single Ctrl+C + # open a terminal and try this. + # (some expressions like `while true end` need multiple Ctrl+C in short succession to force throw SIGINT) + exs = no_interrupt_possible ? [ex1, ex3] : [ + ex1, + ex3, + ex1, # second time because interrupts should be reliable + ( + VERSION > v"1.10.0-0" ? [ex2, ex2] : [] + )..., + ] + + + + @testset "single interrupt $ex" for ex in exs + + f() = m.remote_eval(w, ex) + + t1 = @elapsed wait(f()) + t2 = @elapsed wait(f()) + + t3 = @elapsed begin + t = f() + @test !istaskdone(t) + sleep(.1) + m.interrupt(w) + r = try + fetch(t) + catch e + e + end + no_interrupt_possible || @test r isa TaskFailedException + end + + t4 = @elapsed begin + t = f() + @test !istaskdone(t) + sleep(.1) + m.interrupt(w) + r = try + fetch(t) + catch e + e + end + no_interrupt_possible || @test r isa TaskFailedException + end + + @info "test run" ex t1 t2 t3 t4 + no_interrupt_possible || @test t4 < min(t1,t2) * 0.8 + + # still running and responsive + @test m.isrunning(w) + @test m.remote_call_fetch(&, w, true, true) + + end + + + if !no_interrupt_possible + @testset "hard interrupt" begin + + function hard_interrupt(w) + finish_task = m.remote_call(&, w, true, true) + + done() = !m.isrunning(w) || istaskdone(finish_task) + + while !done() + for _ in 1:5 + print(" 🔥 ") + m.interrupt(w) + sleep(0.18) + if done() + break + end + end + sleep(1.5) + end + end + + + t = m.remote_eval(w, :(while true end)) + + @test !istaskdone(t) + @test m.isrunning(w) + + hard_interrupt(w) + + + @info "xx" istaskdone(t) m.isrunning(w) + + @test try + fetch(t) + catch e + e + end isa TaskFailedException + + # hello + @test true + + if Sys.iswindows() && VERSION < v"1.10.0-beta3" + # fixed by https://github.com/JuliaLang/julia/pull/51307 which will probably land in v1.10.0-beta3 + @test_broken m.isrunning(w) + else + # still running and responsive + @test m.isrunning(w) + @test m.remote_call_fetch(&, w, true, true) + end + end + end + + + m.stop(w) + @test !m.isrunning(w) +end diff --git a/test/runtests.jl b/test/runtests.jl index f445673..a684edf 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -4,6 +4,8 @@ using Test v() = @assert isempty(m.__iNtErNaL_get_running_procs()) +v() +include("interrupt.jl") v() include("basic.jl") v()