Skip to content

Commit 681ae1a

Browse files
vchuravymaleadt
andauthored
Use Malt instead of Distributed.jl (#48)
Co-authored-by: Tim Besard <tim.besard@gmail.com>
1 parent 2cebb2a commit 681ae1a

File tree

3 files changed

+33
-37
lines changed

3 files changed

+33
-37
lines changed

Project.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ version = "1.0.2"
55

66
[deps]
77
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
8-
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
98
IOCapture = "b5f81e59-6552-4d32-b1f0-c071b021bf89"
9+
Malt = "36869731-bdee-424d-aa32-cab38c994e3b"
1010
Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7"
1111
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
1212
Scratch = "6c6a2e73-6563-6170-7368-637461726353"
@@ -16,8 +16,8 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
1616

1717
[compat]
1818
Dates = "1"
19-
Distributed = "1"
2019
IOCapture = "0.2.5"
20+
Malt = "1.2.1"
2121
Printf = "1"
2222
Random = "1"
2323
Scratch = "1.3.0"

src/ParallelTestRunner.jl

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module ParallelTestRunner
22

33
export runtests, addworkers, addworker
44

5-
using Distributed
5+
using Malt
66
using Dates
77
using Printf: @sprintf
88
using Base.Filesystem: path_separator
@@ -375,26 +375,28 @@ function test_exe()
375375
return test_exeflags
376376
end
377377

378+
# Map PIDs to logical worker IDs
379+
# Malt doesn't have a global worker ID, and PID make printing ugly
380+
const WORKER_IDS = Dict{Int32, Int32}()
381+
worker_id(wrkr) = WORKER_IDS[wrkr.proc_pid]
382+
378383
"""
379384
addworkers(X; kwargs...)
380385
381-
Add `X` worker processes, with additional keyword arguments passed to `Distributed.addprocs`.
386+
Add `X` worker processes.
382387
"""
383-
function addworkers(X; kwargs...)
388+
addworkers(X; kwargs...) = [addworker(; kwargs...) for _ in 1:X]
389+
function addworker(; env=Vector{Pair{String, String}}())
384390
exe = test_exe()
385-
exename = exe[1]
386391
exeflags = exe[2:end]
387392

388-
return withenv("JULIA_NUM_THREADS" => 1, "OPENBLAS_NUM_THREADS" => 1) do
389-
addprocs(X; exename, exeflags, kwargs...)
390-
end
391-
end
392-
addworker(; kwargs...) = addworkers(1; kwargs...)[1]
393+
push!(env, "JULIA_NUM_THREADS" => "1")
394+
# Malt already sets OPENBLAS_NUM_THREADS to 1
395+
push!(env, "OPENBLAS_NUM_THREADS" => "1")
393396

394-
function recycle_worker(p)
395-
rmprocs(p, waitfor = 30)
396-
397-
return nothing
397+
wrkr = Malt.Worker(;exeflags, env)
398+
WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1
399+
return wrkr
398400
end
399401

400402
"""
@@ -572,7 +574,8 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
572574
end
573575
jobs = clamp(jobs, 1, length(tests))
574576
println(stdout, "Running $jobs tests in parallel. If this is too many, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.")
575-
addworkers(min(jobs, length(tests)))
577+
workers = addworkers(min(jobs, length(tests)))
578+
nworkers = length(workers)
576579

577580
t0 = time()
578581
results = []
@@ -601,11 +604,8 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
601604
workerheader = "(Worker)"
602605
name_align = maximum(
603606
[
604-
textwidth(testgroupheader) + textwidth(" ") +
605-
textwidth(workerheader); map(
606-
x -> textwidth(x) +
607-
3 + ndigits(nworkers()), tests
608-
)
607+
textwidth(testgroupheader) + textwidth(" ") + textwidth(workerheader);
608+
map(x -> textwidth(x) + 5, tests)
609609
]
610610
)
611611

@@ -765,11 +765,11 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
765765
#
766766

767767
worker_tasks = Task[]
768-
for p in workers()
768+
for p in workers
769769
push!(worker_tasks, @async begin
770770
while !done
771771
# if a worker failed, spawn a new one
772-
if p === nothing
772+
if !Malt.isrunning(p)
773773
p = addworker()
774774
end
775775

@@ -780,16 +780,16 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
780780
wrkr = something(test_worker(test), p)
781781

782782
test_t0 = time()
783-
running_tests[test] = (wrkr, test_t0)
783+
running_tests[test] = (worker_id(wrkr), test_t0)
784784

785785
test, wrkr, test_t0
786786
end
787787

788788
# run the test
789-
put!(printer_channel, (:started, test, wrkr))
789+
put!(printer_channel, (:started, test, worker_id(wrkr)))
790790
result = try
791-
Distributed.remotecall_eval(Main, wrkr, :(import ParallelTestRunner))
792-
remotecall_fetch(runtest, wrkr, RecordType, test_runners[test], test,
791+
Malt.remote_eval_wait(Main, wrkr, :(import ParallelTestRunner))
792+
Malt.remote_call_fetch(invokelatest, wrkr, runtest, RecordType, test_runners[test], test,
793793
init_code, io_ctx.color)
794794
catch ex
795795
if isa(ex, InterruptException)
@@ -806,27 +806,27 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
806806
# act on the results
807807
if result isa AbstractTestRecord
808808
@assert result isa RecordType
809-
put!(printer_channel, (:finished, test, wrkr, result))
809+
put!(printer_channel, (:finished, test, worker_id(wrkr), result))
810810

811811
if memory_usage(result) > max_worker_rss
812812
# the worker has reached the max-rss limit, recycle it
813813
# so future tests start with a smaller working set
814-
p = recycle_worker(p)
814+
Malt.stop(wrkr)
815815
end
816816
else
817817
@assert result isa Exception
818-
put!(printer_channel, (:crashed, test, wrkr))
818+
put!(printer_channel, (:crashed, test, worker_id(wrkr)))
819819
if do_quickfail
820820
stop_work()
821821
end
822822

823823
# the worker encountered some serious failure, recycle it
824-
p = recycle_worker(p)
824+
Malt.stop(wrkr)
825825
end
826826

827827
# get rid of the custom worker
828828
if wrkr != p
829-
recycle_worker(wrkr)
829+
Malt.stop(wrkr)
830830
end
831831

832832
delete!(running_tests, test)

test/runtests.jl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,21 +127,17 @@ end
127127
end
128128
)
129129

130-
print("""NOTE: The next test is expected to crash a worker process,
131-
which may print some output to the terminal.
132-
""")
133130
io = IOBuffer()
134131
@test_throws Test.FallbackTestSetException("Test run finished with errors") begin
135132
runtests(ParallelTestRunner, ["--verbose"]; custom_tests, stdout=io, stderr=io)
136133
end
137-
println()
138134

139135
str = String(take!(io))
140136
@test contains(str, r"abort .+ started at")
141137
@test contains(str, r"abort .+ crashed at")
142138
@test contains(str, "FAILURE")
143139
@test contains(str, "Error During Test")
144-
@test contains(str, "ProcessExitedException")
140+
@test contains(str, "Malt.TerminatedWorkerException")
145141
end
146142

147143
@testset "test output" begin

0 commit comments

Comments
 (0)