Skip to content

Commit

Permalink
Interrupt on windows! 😯 (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsp authored Sep 27, 2023
1 parent 7fa9ec3 commit 88b083f
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ on:
jobs:
test:
runs-on: ${{ matrix.os }}
timeout-minutes: 20
timeout-minutes: 25

strategy:
# Without setting this, a failing test cancels all others
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ In Distributed, only "process 1 can add or remove workers". Malt.jl does not hav
### Process isolation
Malt.jl worker processes **do not inherit** `ENV` variables, command-line arguments or the Pkg environment from their host.

### Interrupt on Windows
Malt.jl supports **interrupting a worker process on Windows**, not just on UNIX.

### Heterogeneous computing
Malt.jl does not have an API like `@everywhere` or `Distributed.procs`: Malt is **not the right tool for homogeneous computing**.

Expand Down
24 changes: 15 additions & 9 deletions src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ mutable struct Worker <: AbstractWorker
function Worker(; env=String[], exeflags=[])
# Spawn process
cmd = _get_worker_cmd(; env, exeflags)
proc = open(cmd, "w+")
proc = open(Cmd(
cmd;
detach=true,
windows_hide=true,
), "w+")

# Keep internal list
__iNtErNaL_get_running_procs()
Expand Down Expand Up @@ -612,18 +616,20 @@ Send an interrupt signal to the worker process. This will interrupt the
latest request (`remote_call*` or `remote_eval*`) that was sent to the worker.
"""
function interrupt(w::Worker)
    if !isrunning(w)
        # Nothing to signal: warn instead of throwing so callers can treat
        # interrupting a dead worker as a harmless no-op.
        @warn "Tried to interrupt a worker that has already shut down." summary(w)
    elseif Sys.iswindows()
        # SIGINT cannot be delivered to another process on Windows, so raise a
        # console control event instead. The first argument `1` is
        # CTRL_BREAK_EVENT; the second is the id of the process group that
        # should receive it, for which the worker's pid is passed.
        # NOTE(review): presumably this relies on the worker having been
        # spawned with `detach=true` so that it leads its own process group
        # and the host is not signalled as well — confirm against the spawn code.
        ccall(
            (:GenerateConsoleCtrlEvent, "Kernel32"),
            Bool,
            (UInt32, UInt32),
            UInt32(1),
            UInt32(getpid(w.proc)),
        )
    else
        Base.kill(w.proc, Base.SIGINT)
    end
    # Always return `nothing`, whichever branch ran.
    nothing
end
function interrupt(w::InProcessWorker)
    # Throw an `InterruptException` into the task that runs the latest request.
    # A task that has already finished must not be scheduled again, so it is
    # skipped; in that case the call is a no-op.
    task = w.latest_request_task
    istaskdone(task) || schedule(task, InterruptException(); error=true)
    nothing
end


Expand Down
146 changes: 146 additions & 0 deletions test/interrupt.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# NOTE(review): `win` is not referenced in this file; kept in case test files
# included later rely on it — confirm before removing.
win = Sys.iswindows()

# Interrupt tests, run once per worker type. Each iteration starts a fresh
# worker, interrupts a few long-running expressions, checks that the worker is
# still responsive afterwards, and finally stops the worker.
@testset "Interrupt: $W" for W in (m.DistributedStdlibWorker, m.InProcessWorker, m.Worker)

    # For these combinations the tests do not expect an interrupt to actually
    # cut a request short, so the corresponding assertions are skipped.
    no_interrupt_possible =
        (Sys.iswindows() && W === m.DistributedStdlibWorker) || W === m.InProcessWorker

    w = W()

    # try/finally: stop the worker even when the body throws, so that a failure
    # here does not leave a running process behind for the following test files.
    try
        @test m.isrunning(w)
        @test m.remote_call_fetch(&, w, true, true)

        ex1 = quote
            local x = 0.0
            for i = 1:4000
                k = [sqrt(abs(sin(cos(tan(x))))) ^ (1 / i) for z in 1:i]
                x += sum(k)
            end
            x
        end |> Base.remove_linenums!

        ex2 = quote
            local x = 0.0
            for i in 1:20_000_000
                x += sqrt(abs(sin(cos(tan(x)))))^(1/i)
            end
            x
        end |> Base.remove_linenums!

        ex3 = :(sleep(3)) |> Base.remove_linenums!

        # expressions in this list can be interrupted with a single Ctrl+C
        # open a terminal and try this.
        # (some expressions like `while true end` need multiple Ctrl+C in short
        # succession to force throw SIGINT)
        exs = no_interrupt_possible ? [ex1, ex3] : [
            ex1,
            ex3,
            ex1, # second time because interrupts should be reliable
            (
                VERSION > v"1.10.0-0" ? [ex2, ex2] : []
            )...,
        ]

        # Start `ex` on the worker, interrupt it shortly afterwards and wait for
        # the request to settle. Returns the elapsed time in seconds. The
        # `@test`s inside are recorded in whichever testset is active at call time.
        function timed_interrupt(ex)
            return @elapsed begin
                t = m.remote_eval(w, ex)
                @test !istaskdone(t)
                sleep(.1)
                m.interrupt(w)
                r = try
                    fetch(t)
                catch e
                    e
                end
                no_interrupt_possible || @test r isa TaskFailedException
            end
        end

        @testset "single interrupt $ex" for ex in exs

            f() = m.remote_eval(w, ex)

            # Two uninterrupted runs give the baseline duration.
            t1 = @elapsed wait(f())
            t2 = @elapsed wait(f())

            # Two interrupted runs: interrupts have to work repeatedly.
            t3 = timed_interrupt(ex)
            t4 = timed_interrupt(ex)

            @info "test run" ex t1 t2 t3 t4
            # An interrupted run must be clearly shorter than a complete one.
            no_interrupt_possible || @test t4 < min(t1, t2) * 0.8

            # still running and responsive
            @test m.isrunning(w)
            @test m.remote_call_fetch(&, w, true, true)

        end

        if !no_interrupt_possible
            @testset "hard interrupt" begin

                # Keep sending interrupts in bursts of five until either the
                # worker has shut down or a trivial follow-up request completes.
                function hard_interrupt(w)
                    finish_task = m.remote_call(&, w, true, true)

                    done() = !m.isrunning(w) || istaskdone(finish_task)

                    while !done()
                        for _ in 1:5
                            print(" 🔥 ")
                            m.interrupt(w)
                            sleep(0.18)
                            if done()
                                break
                            end
                        end
                        sleep(1.5)
                    end
                end

                # A tight loop that needs several interrupts in quick succession.
                t = m.remote_eval(w, :(while true end))

                @test !istaskdone(t)
                @test m.isrunning(w)

                hard_interrupt(w)

                @info "after hard interrupt" istaskdone(t) m.isrunning(w)

                @test try
                    fetch(t)
                catch e
                    e
                end isa TaskFailedException

                if Sys.iswindows() && VERSION < v"1.10.0-beta3"
                    # fixed by https://github.com/JuliaLang/julia/pull/51307 which will probably land in v1.10.0-beta3
                    @test_broken m.isrunning(w)
                else
                    # still running and responsive
                    @test m.isrunning(w)
                    @test m.remote_call_fetch(&, w, true, true)
                end
            end
        end
    finally
        m.stop(w)
    end

    @test !m.isrunning(w)
end
2 changes: 2 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ using Test

# Guard called between test files: asserts that the list returned by
# `m.__iNtErNaL_get_running_procs()` is empty.
function v()
    @assert isempty(m.__iNtErNaL_get_running_procs())
end

v()
include("interrupt.jl")
v()
include("basic.jl")
v()
Expand Down

0 comments on commit 88b083f

Please sign in to comment.