diff --git a/base/clusterserialize.jl b/base/clusterserialize.jl index d8bdc25e5151f..93d2e2e5cbe47 100644 --- a/base/clusterserialize.jl +++ b/base/clusterserialize.jl @@ -9,9 +9,15 @@ type ClusterSerializer{I<:IO} <: AbstractSerializer counter::Int table::ObjectIdDict - sent_objects::Set{UInt64} # used by serialize (track objects sent) + pid::Int # Worker we are connected to. + tn_obj_sent::Set{UInt64} # TypeName objects sent + glbs_sent::Dict{UInt64, UInt64} # (key,value) -> (object_id, hash_value) + glbs_in_tnobj::Dict{UInt64, Vector{Symbol}} # Track globals referenced in + # anonymous functions. + anonfunc_id::UInt64 - ClusterSerializer(io::I) = new(io, 0, ObjectIdDict(), Set{UInt64}()) + ClusterSerializer(io::I) = new(io, 0, ObjectIdDict(), Base.worker_id_from_socket(io), + Set{UInt64}(), Dict{UInt64, UInt64}(), Dict{UInt64, Vector{Symbol}}(), 0) end ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io) @@ -28,6 +34,9 @@ function deserialize(s::ClusterSerializer, ::Type{TypeName}) else tn = deserialize_typename(s, number) end + + # retrieve arrays of global syms sent if any and deserialize them all. + foreach(sym->deserialize_global_from_main(s, sym), deserialize(s)) return tn end @@ -36,13 +45,116 @@ function serialize(s::ClusterSerializer, t::TypeName) writetag(s.io, TYPENAME_TAG) identifier = object_number(t) - send_whole = !(identifier in s.sent_objects) + send_whole = !(identifier in s.tn_obj_sent) serialize(s, send_whole) write(s.io, identifier) if send_whole + # Track globals referenced in this anonymous function. + # This information is used to resend modified globals when we + # only send the identifier. + prev = s.anonfunc_id + s.anonfunc_id = identifier serialize_typename(s, t) - push!(s.sent_objects, identifier) + s.anonfunc_id = prev + push!(s.tn_obj_sent, identifier) + finalizer(t, x->cleanup_tname_glbs(s, identifier)) end -# println(t.module, ":", t.name, ", id:", identifier, send_whole ? " sent" : " NOT sent") + + # Send global refs if required. + syms = syms_2b_sent(s, identifier) + serialize(s, syms) + foreach(sym->serialize_global_from_main(s, sym), syms) nothing end + +function serialize(s::ClusterSerializer, g::GlobalRef) + # Record if required and then invoke the default GlobalRef serializer. + sym = g.name + if g.mod === Main && isdefined(g.mod, sym) + v = getfield(Main, sym) + if !isa(v, DataType) && !isa(v, Module) && + (binding_module(Main, sym) === Main) && (s.anonfunc_id != 0) + push!(get!(s.glbs_in_tnobj, s.anonfunc_id, []), sym) + end + end + + invoke(serialize, Tuple{AbstractSerializer, GlobalRef}, s, g) +end + +# Send/resend a global object if +# a) has not been sent previously, i.e., we are seeing this object_id for the first time, or, +# b) hash value has changed or +# c) is a bitstype +function syms_2b_sent(s::ClusterSerializer, identifier) + lst = Symbol[] + check_syms = get(s.glbs_in_tnobj, identifier, []) + for sym in check_syms + v = getfield(Main, sym) + + if isbits(v) + push!(lst, sym) + else + oid = object_id(v) + if haskey(s.glbs_sent, oid) + # We have sent this object before, see if it has changed. + s.glbs_sent[oid] != hash(v) && push!(lst, sym) + else + push!(lst, sym) + end + end + end + return unique(lst) +end + +function serialize_global_from_main(s::ClusterSerializer, sym) + v = getfield(Main, sym) + + oid = object_id(v) + record_v = true + if isbits(v) + record_v = false + elseif !haskey(s.glbs_sent, oid) + # set up a finalizer the first time this object is sent + try + finalizer(v, x->delete_global_tracker(s,x)) + catch ex + # Do not track objects that cannot be finalized. + if isa(ex, ErrorException) + record_v = false + else + rethrow(ex) + end + end + end + record_v && (s.glbs_sent[oid] = hash(v)) + + serialize(s, isconst(Main, sym)) + serialize(s, v) +end + +function deserialize_global_from_main(s::ClusterSerializer, sym) + sym_isconst = deserialize(s) + v = deserialize(s) + if sym_isconst + eval(Main, :(const $sym = $v)) + else + eval(Main, :($sym = $v)) + end +end + +function delete_global_tracker(s::ClusterSerializer, v) + oid = object_id(v) + if haskey(s.glbs_sent, oid) + delete!(s.glbs_sent, oid) + end + + # TODO: A global binding is released and gc'ed here but it continues + # to occupy memory on the remote node. Would be nice to release memory + # if possible. +end + +function cleanup_tname_glbs(s::ClusterSerializer, identifier) + delete!(s.glbs_in_tnobj, identifier) +end + +# TODO: cleanup from s.tn_obj_sent diff --git a/base/event.jl b/base/event.jl index a6a67c8074353..2b671aaa1d826 100644 --- a/base/event.jl +++ b/base/event.jl @@ -66,10 +66,12 @@ n_waiters(c::Condition) = length(c.waitq) @schedule Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue. +Similar to [`@async`](@ref) except that an enclosing `@sync` does NOT wait for tasks +started with an `@schedule`. """ macro schedule(expr) - expr = :(()->($expr)) - :(enq_work(Task($(esc(expr))))) + thunk = esc(:(()->($expr))) + :(enq_work(Task($thunk))) end ## scheduler and work queue diff --git a/base/expr.jl b/base/expr.jl index 5d255314617c9..45ca5010ab7ef 100644 --- a/base/expr.jl +++ b/base/expr.jl @@ -188,37 +188,6 @@ end ## some macro utilities ## -find_vars(e) = find_vars(e, []) -function find_vars(e, lst) - if isa(e,Symbol) - if current_module()===Main && isdefined(e) - # Main runs on process 1, so send globals from there, excluding - # things defined in Base. - if !isdefined(Base,e) || eval(Base,e)!==eval(current_module(),e) - push!(lst, e) - end - end - elseif isa(e,Expr) && e.head !== :quote && e.head !== :top && e.head !== :core - for x in e.args - find_vars(x,lst) - end - end - lst -end - -# wrap an expression in "let a=a,b=b,..." for each var it references -localize_vars(expr) = localize_vars(expr, true) -function localize_vars(expr, esca) - v = find_vars(expr) - # requires a special feature of the front end that knows how to insert - # the correct variables. the list of free variables cannot be computed - # from a macro. - if esca - v = map(esc,v) - end - Expr(:localize, expr, v...) -end - function pushmeta!(ex::Expr, sym::Symbol, args::Any...) if isempty(args) tag = sym diff --git a/base/multi.jl b/base/multi.jl index b60019ee1c5b7..1fbc440db3306 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -1937,8 +1937,8 @@ end ## higher-level functions: spawn, pmap, pfor, etc. ## let nextidx = 0 - global chooseproc - function chooseproc(thunk::Function) + global nextproc + function nextproc() p = -1 if p == -1 p = workers()[(nextidx % nworkers()) + 1] @@ -1950,16 +1950,16 @@ end spawnat(p, thunk) = sync_add(remotecall(thunk, p)) -spawn_somewhere(thunk) = spawnat(chooseproc(thunk),thunk) +spawn_somewhere(thunk) = spawnat(nextproc(),thunk) macro spawn(expr) - expr = localize_vars(esc(:(()->($expr))), false) - :(spawn_somewhere($expr)) + thunk = esc(:(()->($expr))) + :(spawn_somewhere($thunk)) end macro spawnat(p, expr) - expr = localize_vars(esc(:(()->($expr))), false) - :(spawnat($(esc(p)), $expr)) + thunk = esc(:(()->($expr))) + :(spawnat($(esc(p)), $thunk)) end """ @@ -1969,11 +1969,8 @@ Equivalent to `fetch(@spawn expr)`. See [`fetch`](@ref) and [`@spawn`](@ref). """ macro fetch(expr) - expr = localize_vars(esc(:(()->($expr))), false) - quote - thunk = $expr - remotecall_fetch(thunk, chooseproc(thunk)) - end + thunk = esc(:(()->($expr))) + :(remotecall_fetch($thunk, nextproc())) end """ @@ -1983,8 +1980,8 @@ Equivalent to `fetch(@spawnat p expr)`. See [`fetch`](@ref) and [`@spawnat`](@ref). """ macro fetchfrom(p, expr) - expr = localize_vars(esc(:(()->($expr))), false) - :(remotecall_fetch($expr, $(esc(p)))) + thunk = esc(:(()->($expr))) + :(remotecall_fetch($thunk, $(esc(p)))) end """ @@ -2140,7 +2137,7 @@ macro parallel(args...) else thecall = :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r)))) end - localize_vars(thecall) + thecall end @@ -2286,3 +2283,25 @@ function getindex(r::RemoteChannel, args...) end return remotecall_fetch(getindex, r.where, r, args...) end + +""" + clear!(syms, pids=workers(); mod=Main) + +Clears global bindings in modules by initializing them to `nothing`. +`syms` should be of type `Symbol` or a collection of `Symbol`s . `pids` and `mod` +identify the processes and the module in which global variables are to be +reinitialized. Only those names found to be defined under `mod` are cleared. + +An exception is raised if a global constant is requested to be cleared. +""" +function clear!(syms, pids=workers(); mod=Main) + @sync for p in pids + @async remotecall_wait(clear_impl!, p, syms, mod) + end +end +clear!(sym::Symbol, pid::Int; mod=Main) = clear!([sym], [pid]; mod=mod) +clear!(sym::Symbol, pids=workers(); mod=Main) = clear!([sym], pids; mod=mod) +clear!(syms, pid::Int; mod=Main) = clear!(syms, [pid]; mod=mod) + +clear_impl!(syms, mod::Module) = foreach(x->clear_impl!(x,mod), syms) +clear_impl!(sym::Symbol, mod::Module) = isdefined(mod, sym) && eval(mod, :(global $sym = nothing)) diff --git a/base/task.jl b/base/task.jl index ff40447b205e4..050a68c475692 100644 --- a/base/task.jl +++ b/base/task.jl @@ -325,10 +325,9 @@ end Like `@schedule`, `@async` wraps an expression in a `Task` and adds it to the local machine's scheduler queue. Additionally it adds the task to the set of items that the -nearest enclosing `@sync` waits for. `@async` also wraps the expression in a `let x=x, y=y, ...` -block to create a new scope with copies of all variables referenced in the expression. +nearest enclosing `@sync` waits for. """ macro async(expr) - expr = localize_vars(esc(:(()->($expr))), false) - :(async_run_thunk($expr)) + thunk = esc(:(()->($expr))) + :(async_run_thunk($thunk)) end diff --git a/doc/src/manual/faq.md b/doc/src/manual/faq.md index a8cd3c1436e29..2caf8a9dfdc21 100644 --- a/doc/src/manual/faq.md +++ b/doc/src/manual/faq.md @@ -472,6 +472,79 @@ have, it ends up having a substantial cost due to compilers (LLVM and GCC) not g around the added overflow checks. If this improves in the future, we could consider defaulting to checked integer arithmetic in Julia, but for now, we have to live with the possibility of overflow. +### What are the possible causes of an `UndefVarError` during remote execution? + +As the error states, an immediate cause of an `UndefVarError` on a remote node is that a binding +by that name does not exist. Let us explore some of the possible causes. + +```julia +julia> module Foo + foo() = remotecall_fetch(x->x, 2, "Hello") + end + +julia> Foo.foo() +ERROR: On worker 2: +UndefVarError: Foo not defined +``` + +The closure `x->x` carries a reference to `Foo`, and since `Foo` is unavailable on node 2, +an `UndefVarError` is thrown. + +Globals under modules other than `Main` are not serialized by value to the remote node. Only a reference is sent. +Functions which create global bindings (except under `Main`) may cause an `UndefVarError` to be thrown later. + +```julia +julia> @everywhere module Foo + function foo() + global gvar = "Hello" + remotecall_fetch(()->gvar, 2) + end + end + +julia> Foo.foo() +ERROR: On worker 2: +UndefVarError: gvar not defined +``` + +In the above example, `@everywhere module Foo` defined `Foo` on all nodes. However the call to `Foo.foo()` created +a new global binding `gvar` on the local node, but this was not found on node 2 resulting in an `UndefVarError` error. + +Note that this does not apply to globals created under module `Main`. Globals under module `Main` are serialized +and new bindings created under `Main` on the remote node. + +```julia +julia> gvar_self = "Node1" +"Node1" + +julia> remotecall_fetch(()->gvar_self, 2) +"Node1" + +julia> remotecall_fetch(whos, 2) + From worker 2: Base 41762 KB Module + From worker 2: Core 27337 KB Module + From worker 2: Foo 2477 bytes Module + From worker 2: Main 46191 KB Module + From worker 2: gvar_self 13 bytes String +``` + +This does not apply to `function` or `type` declarations. However, anonymous functions bound to global +variables are serialized as can be seen below. + +```julia +julia> bar() = 1 +bar (generic function with 1 method) + +julia> remotecall_fetch(bar, 2) +ERROR: On worker 2: +UndefVarError: #bar not defined + +julia> anon_bar = ()->1 +(::#21) (generic function with 1 method) + +julia> remotecall_fetch(anon_bar, 2) +1 +``` + ## Packages and Modules ### What is the difference between "using" and "importall"? diff --git a/doc/src/manual/parallel-computing.md b/doc/src/manual/parallel-computing.md index 011afbeb3992a..c9731bc61e38b 100644 --- a/doc/src/manual/parallel-computing.md +++ b/doc/src/manual/parallel-computing.md @@ -240,6 +240,76 @@ and `fetch(Bref)`, it might be better to eliminate the parallelism altogether. O is replaced with a more expensive operation. Then it might make sense to add another [`@spawn`](@ref) statement just for this step. +# Global variables +Expressions executed remotely via `@spawn`, or closures specified for remote execution using +`remotecall` may refer to global variables. Global bindings under module `Main` are treated +a little differently compared to global bindings in other modules. Consider the following code +snippet: + +```julia +A = rand(10,10) +remotecall_fetch(()->foo(A), 2) +``` + +Note that `A` is a global variable defined in the local workspace. Worker 2 does not have a variable called +`A` under `Main`. The act of shipping the closure `()->foo(A)` to worker 2 results in `Main.A` being defined +on 2. `Main.A` continues to exist on worker 2 even after the call `remotecall_fetch` returns. Remote calls +with embedded global references (under `Main` module only) manage globals as follows: + +- New global bindings are created on destination workers if they are referenced as part of a remote call. + +- Global constants are declared as constants on remote nodes too. + +- Globals are re-sent to a destination worker only in the context of a remote call, and then only + if its value has changed. Also, the cluster does not synchronize global bindings across nodes. + For example: + +```julia +A = rand(10,10) +remotecall_fetch(()->foo(A), 2) # worker 2 +A = rand(10,10) +remotecall_fetch(()->foo(A), 3) # worker 3 +A = nothing +``` + + Executing the above snippet results in `Main.A` on worker 2 having a different value from + `Main.A` on worker 3, while the value of `Main.A` on node 1 is set to `nothing`. + +As you may have realized, while memory associated with globals may be collected when they are reassigned +on the master, no such action is taken on the workers as the bindings continue to be valid. +[`clear!`](@ref) can be used to manually reassign specific globals on remote nodes to `nothing` once +they are no longer required. This will release any memory associated with them as part of a regular garbage +collection cycle. + +Thus programs should be careful referencing globals in remote calls. In fact, it is preferable to avoid them +altogether if possible. If you must reference globals, consider using `let` blocks to localize global variables. + +For example: + +```julia +julia> A = rand(10,10); + +julia> remotecall_fetch(()->A, 2); + +julia> B = rand(10,10); + +julia> let B = B + remotecall_fetch(()->B, 2) + end; + +julia> @spawnat 2 whos(); + +julia> From worker 2: A 800 bytes 10×10 Array{Float64,2} + From worker 2: Base Module + From worker 2: Core Module + From worker 2: Main Module + +``` + +As can be seen, global variable `A` is defined on worker 2, but `B` is captured as a local variable +and hence a binding for `B` does not exist on worker 2. + + ## Parallel Map and Loops Fortunately, many useful parallel computations do not require data movement. A common example diff --git a/doc/src/stdlib/parallel.md b/doc/src/stdlib/parallel.md index 89986a99a66de..433c5730f9b3d 100644 --- a/doc/src/stdlib/parallel.md +++ b/doc/src/stdlib/parallel.md @@ -60,6 +60,7 @@ Base.isready(::Future) Base.WorkerPool Base.CachingPool Base.default_worker_pool +Base.clear!(::CachingPool) Base.remote Base.remotecall(::Any, ::Base.AbstractWorkerPool, ::Any...) Base.remotecall_wait(::Any, ::Base.AbstractWorkerPool, ::Any...) @@ -74,7 +75,7 @@ Base.@async Base.@sync Base.@parallel Base.@everywhere -Base.clear! +Base.clear!(::Any, ::Any; ::Any) Base.remoteref_id Base.channel_from_id Base.worker_id_from_socket diff --git a/src/macroexpand.scm b/src/macroexpand.scm index 430ac31d12c07..0f05868f2560b 100644 --- a/src/macroexpand.scm +++ b/src/macroexpand.scm @@ -278,17 +278,6 @@ (cadr e)) ,(resolve-expansion-vars- (caddr e) env m inarg)))) - ((localize) - (let ((expr (cadr e)) - (lvars (map unescape (cddr e)))) - (let ((vs (delete-duplicates - (expr-find-all (lambda (v) - (and (symbol? v) (or (memq v lvars) - (assq v env)))) - expr identity))) - (e2 (resolve-expansion-vars-with-new-env expr env m inarg))) - `(call (-> (tuple ,@vs) ,e2) ,@vs)))) - ((let) (let* ((newenv (new-expansion-env-for e env)) (body (resolve-expansion-vars- (cadr e) newenv m inarg))) @@ -329,7 +318,6 @@ (define (find-declared-vars-in-expansion e decl (outer #t)) (cond ((or (not (pair? e)) (quoted? e)) '()) ((eq? (car e) 'escape) '()) - ((eq? (car e) 'localize) '()) ((eq? (car e) decl) (map decl-var* (cdr e))) ((and (not outer) (function-def? e)) '()) (else @@ -340,7 +328,6 @@ (define (find-assigned-vars-in-expansion e (outer #t)) (cond ((or (not (pair? e)) (quoted? e)) '()) ((eq? (car e) 'escape) '()) - ((eq? (car e) 'localize) '()) ((and (not outer) (function-def? e)) ;; pick up only function name (let ((fname (cond ((eq? (car e) '=) (cadr (cadr e))) diff --git a/test/core.jl b/test/core.jl index ee76350adc613..cee8edac22a62 100644 --- a/test/core.jl +++ b/test/core.jl @@ -3222,16 +3222,6 @@ abstract B11327 <: A11327 f11327{T}(::Type{T},x::T) = x @test_throws MethodError f11327(Type{A11327},B11327) -# issue 13855 -macro m13855() - Expr(:localize, :(() -> $(esc(:x)))) -end -@noinline function foo13855(x) - @m13855() -end -@test foo13855(+)() == + -@test foo13855(*)() == * - # issue #8487 @test [x for x in 1:3] == [x for x ∈ 1:3] == [x for x = 1:3] let A = Array{Int}(4,3) diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index 1684110332cd3..9fe3144a2547a 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -212,8 +212,14 @@ sleep(0.5) # to ensure that wid2 messages have been executed on wid1 @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == false @test fetch(@spawnat id_other myid()) == id_other -@test @fetchfrom id_other begin myid() end == id_other -@fetch begin myid() end +@test (@fetchfrom id_other myid()) == id_other + +pids=[] +for i in 1:nworkers() + push!(pids, @fetch myid()) +end +@test sort(pids) == sort(workers()) + # test getindex on Futures and RemoteChannels function test_indexing(rr) @@ -533,13 +539,14 @@ num_small_requests = 10000 # test parallel sends of large arrays from multiple tasks to the same remote worker ntasks = 10 -rr_list = [Channel(32) for x in 1:ntasks] -a = ones(2*10^5) +rr_list = [Channel(1) for x in 1:ntasks] + for rr in rr_list - @async let rr=rr - try + let rr=rr + @async try for i in 1:10 - @test a == remotecall_fetch((x)->x, id_other, a) + a = rand(2*10^5) + @test a == remotecall_fetch(x->x, id_other, a) yield() end put!(rr, :OK) @@ -1092,8 +1099,21 @@ remotecall_fetch(()->eval(:(f16091a() = 2)), wid) f16091b = () -> 1 remotecall_fetch(()->eval(:(f16091b = () -> 2)), wid) @test remotecall_fetch(f16091b, 2) === 1 -@test remotecall_fetch((myid)->remotecall_fetch(f16091b, myid), wid, myid()) === 2 +# Global anonymous functions are over-written... +@test remotecall_fetch((myid)->remotecall_fetch(f16091b, myid), wid, myid()) === 1 +# ...while local anonymous functions are by definition, local. +let + f16091c = () -> 1 + @test remotecall_fetch(f16091c, 2) === 1 + @test remotecall_fetch( + myid -> begin + let + f16091c = () -> 2 + remotecall_fetch(f16091c, myid) + end + end, wid, myid()) === 2 +end # issue #16451 rng=RandomDevice() @@ -1250,3 +1270,196 @@ if DoFullTest pids=addprocs(4); @test_throws ErrorException rmprocs(pids; waitfor=0.001); end + +# Auto serialization of globals from Main. +# bitstypes +global v1 = 1 +@test remotecall_fetch(()->v1, id_other) == v1 +@test remotecall_fetch(()->isdefined(Main, :v1), id_other) +for i in 2:5 + global v1 = i + @test remotecall_fetch(()->v1, id_other) == i +end + +# non-bitstypes +global v2 = zeros(10) +for i in 1:5 + v2[i] = i + @test remotecall_fetch(()->v2, id_other) == v2 +end + +# Test that a global is not being repeatedly serialized when +# a) referenced multiple times in the closure +# b) hash value has not changed. + +@everywhere begin + global testsercnt_d = Dict() + type TestSerCnt + v + end + import Base.hash, Base.== + hash(x::TestSerCnt, h::UInt) = hash(hash(x.v), h) + ==(x1::TestSerCnt, x2::TestSerCnt) = (x1.v == x2.v) + + function Base.serialize(s::AbstractSerializer, t::TestSerCnt) + Base.Serializer.serialize_type(s, TestSerCnt) + serialize(s, t.v) + global testsercnt_d + cnt = get!(testsercnt_d, object_id(t), 0) + testsercnt_d[object_id(t)] = cnt+1 + end + + Base.deserialize(s::AbstractSerializer, ::Type{TestSerCnt}) = TestSerCnt(deserialize(s)) +end + +# hash value of tsc is not changed +global tsc = TestSerCnt(zeros(10)) +for i in 1:5 + remotecall_fetch(()->tsc, id_other) +end +# should have been serialized only once +@test testsercnt_d[object_id(tsc)] == 1 + +# hash values are changed +n=5 +testsercnt_d[object_id(tsc)] = 0 +for i in 1:n + tsc.v[i] = i + remotecall_fetch(()->tsc, id_other) +end +# should have been serialized as many times as the loop +@test testsercnt_d[object_id(tsc)] == n + +# Multiple references in a closure should be serialized only once. +global mrefs = TestSerCnt(ones(10)) +@test remotecall_fetch(()->(mrefs.v, 2*mrefs.v, 3*mrefs.v), id_other) == (ones(10), 2*ones(10), 3*ones(10)) +@test testsercnt_d[object_id(mrefs)] == 1 + + +# nested anon functions +global f1 = x->x +global f2 = x->f1(x) +v = rand() +@test remotecall_fetch(f2, id_other, v) == v +@test remotecall_fetch(x->f2(x), id_other, v) == v + +# consts +const c1 = ones(10) +@test remotecall_fetch(()->c1, id_other) == c1 +@test remotecall_fetch(()->isconst(Main, :c1), id_other) + +# Test same calls with local vars +function wrapped_var_ser_tests() + # bitstypes + local lv1 = 1 + @test remotecall_fetch(()->lv1, id_other) == lv1 + @test !remotecall_fetch(()->isdefined(Main, :lv1), id_other) + for i in 2:5 + lv1 = i + @test remotecall_fetch(()->lv1, id_other) == i + end + + # non-bitstypes + local lv2 = zeros(10) + for i in 1:5 + lv2[i] = i + @test remotecall_fetch(()->lv2, id_other) == lv2 + end + + # nested anon functions + local lf1 = x->x + local lf2 = x->lf1(x) + v = rand() + @test remotecall_fetch(lf2, id_other, v) == v + @test remotecall_fetch(x->lf2(x), id_other, v) == v +end + +wrapped_var_ser_tests() + +# Test internal data structures being cleaned up upon gc. +global ids_cleanup = ones(6) +global ids_func = ()->ids_cleanup + +clust_ser = (Base.worker_from_id(id_other)).w_serializer +@test remotecall_fetch(ids_func, id_other) == ids_cleanup + +@test haskey(clust_ser.glbs_sent, object_id(ids_cleanup)) +finalize(ids_cleanup) +@test !haskey(clust_ser.glbs_sent, object_id(ids_cleanup)) + +# TODO Add test for cleanup from `clust_ser.glbs_in_tnobj` + +# reported github issues - Mostly tests with globals and various parallel macros +#2669, #5390 +v2669=10 +@test fetch(@spawn (1+v2669)) == 11 + +#12367 +refs = [] +if true + n = 10 + for p in procs() + push!(refs, @spawnat p begin + @sync for i in 1:n + nothing + end + end) + end +end +foreach(wait, refs) + +#14399 +s = convert(SharedArray, [1,2,3,4]) +@test pmap(i->length(s), 1:2) == [4,4] + +#6760 +if true + a = 2 + x = @parallel (vcat) for k=1:2 + sin(a) + end +end +@test x == map(_->sin(2), 1:2) + +# Testing clear! +function setup_syms(n, pids) + syms = [] + for i in 1:n + symstr = string("clrtest", randstring()) + sym = Symbol(symstr) + eval(:(global $sym = rand())) + for p in pids + eval(:(@test $sym == remotecall_fetch(()->$sym, $p))) + eval(:(@test remotecall_fetch(isdefined, $p, Symbol($symstr)))) + end + push!(syms, sym) + end + syms +end + +function test_clear(syms, pids) + for p in pids + for sym in syms + remote_val = remotecall_fetch(()->getfield(Main, sym), p) + @test remote_val === nothing + @test remote_val != getfield(Main, sym) + end + end +end + +syms = setup_syms(1, [id_other]) +clear!(syms[1], id_other) +test_clear(syms, [id_other]) + +syms = setup_syms(1, workers()) +clear!(syms[1], workers()) +test_clear(syms, workers()) + +syms = setup_syms(3, [id_other]) +clear!(syms, id_other) +test_clear(syms, [id_other]) + +syms = setup_syms(3, workers()) +clear!(syms, workers()) +test_clear(syms, workers()) +