Skip to content

Commit

Permalink
Alternate impl to auto serialize globals. Only track referenced globals
Browse files Browse the repository at this point in the history
in TypeName and serialize if required. As parallel macros and remotecalls
create a 0-arg thunk, globals are automatically serialized if changed.
  • Loading branch information
amitmurthy committed Dec 20, 2016
1 parent ba65424 commit e839fc6
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 97 deletions.
141 changes: 84 additions & 57 deletions base/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import .Serializer: known_object_data, object_number, serialize_cycle, deserialize_cycle, writetag,
__deserialized_types__, serialize_typename, deserialize_typename,
TYPENAME_TAG, GLOBALREF_TAG, object_numbers,
serialize_global_from_main, deserialize_global_from_main
TYPENAME_TAG, object_numbers

type ClusterSerializer{I<:IO} <: AbstractSerializer
io::I
Expand All @@ -13,10 +12,13 @@ type ClusterSerializer{I<:IO} <: AbstractSerializer
pid::Int # Worker we are connected to.
sent_objects::Set{UInt64} # used by serialize (track objects sent)
sent_globals::Dict
glbs_in_tname::Dict # A dict tracking globals referenced in anonymous
# functions.
anonfunc_id::UInt64

ClusterSerializer(io::I) = new(io, 0, ObjectIdDict(),
Base.worker_id_from_socket(io),
Set{UInt64}(), Dict())
Set{UInt64}(), Dict(), Dict(), 0)
end
ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io)

Expand All @@ -33,6 +35,9 @@ function deserialize(s::ClusterSerializer, ::Type{TypeName})
else
tn = deserialize_typename(s, number)
end

# retrieve arrays of global syms sent if any and deserialize them all.
foreach(sym->deserialize_global_from_main(s, sym), deserialize(s))
return tn
end

Expand All @@ -45,78 +50,95 @@ function serialize(s::ClusterSerializer, t::TypeName)
serialize(s, send_whole)
write(s.io, identifier)
if send_whole
# Track globals referenced in this anonymous function.
# This information is used to resend modified globals when we
# only send the identifier.
prev = s.anonfunc_id
s.anonfunc_id = identifier
serialize_typename(s, t)
s.anonfunc_id = prev
push!(s.sent_objects, identifier)
finalizer(t, x->cleanup_tname_glbs(s, identifier))
end
# #println(t.module, ":", t.name, ", id:", identifier, send_whole ? " sent" : " NOT sent")

# Send global refs if required.
syms = syms_2b_sent(s, identifier)
serialize(s, syms)
foreach(sym->serialize_global_from_main(s, sym), syms)
nothing
end

const FLG_SER_VAL = UInt8(1)
const FLG_ISCONST_VAL = UInt8(2)
isflagged(v, flg) = (v & flg == flg)

# We will send/resend a global object if
# a) has not been sent previously, i.e., we are seeing this object_id for the
# for the first time, or,
# b) hash value has changed

function serialize_global_from_main(s::ClusterSerializer, g::GlobalRef)
v = getfield(Main, g.name)
println(g)
function serialize(s::ClusterSerializer, g::GlobalRef)
# Record if required and then invoke the default GlobalRef serializer.
sym = g.name
if g.mod === Main && isdefined(g.mod, sym)
v = getfield(Main, sym)
if !isa(v, DataType) && !isa(v, Module) &&
(sym in names(Main, false, false)) && (s.anonfunc_id != 0)
# FIXME : There must be a better way to detect if a binding has been imported
# into Main or has been primarily defined here.
push!(get!(s.glbs_in_tname, s.anonfunc_id, []), sym)
end
end

serialize(s, g.name)
invoke(serialize, (AbstractSerializer, GlobalRef), s, g)
end

flags = UInt8(0)
if isbits(v)
flags = flags | FLG_SER_VAL
else
oid = object_id(v)
if haskey(s.sent_globals, oid)
# We have sent this object before, see if it has changed.
prev_hash = s.sent_globals[oid]
new_hash = hash(v)
if new_hash != prev_hash
flags = flags | FLG_SER_VAL
s.sent_globals[oid] = new_hash

# No need to setup a new finalizer as only the hash
# value and not the object itself has changed.
end
# Send/resend a global object if
# a) has not been sent previously, i.e., we are seeing this object_id for the first time, or,
# b) hash value has changed or
# c) is a bitstype
function syms_2b_sent(s::ClusterSerializer, identifier)
lst=Symbol[]
check_syms = get(s.glbs_in_tname, identifier, [])
for sym in check_syms
v = getfield(Main, sym)

if isbits(v)
push!(lst, sym)
else
flags = flags | FLG_SER_VAL
try
finalizer(v, x->delete_global_tracker(s,x))
s.sent_globals[oid] = hash(v)
catch ex
# Do not track objects that cannot be finalized.
oid = object_id(v)
if haskey(s.sent_globals, oid)
# We have sent this object before, see if it has changed.
s.sent_globals[oid] != hash(v) && push!(lst, sym)
else
push!(lst, sym)
end
end
end
isconst(Main, g.name) && (flags = flags | FLG_ISCONST_VAL)

write(s.io, flags)
isflagged(flags, FLG_SER_VAL) && serialize(s, v)
return unique(lst)
end

function deserialize_global_from_main(s::ClusterSerializer)
sym = deserialize(s)::Symbol
flags = read(s.io, UInt8)
function serialize_global_from_main(s::ClusterSerializer, sym)
v = getfield(Main, sym)

if isflagged(flags, FLG_SER_VAL)
v = deserialize(s)
end

# create/update binding under Main only if the value has been sent
if isflagged(flags, FLG_SER_VAL)
if isflagged(flags, FLG_ISCONST_VAL)
eval(Main, :(const $sym = $v))
else
eval(Main, :($sym = $v))
oid = object_id(v)
record_v = true
if isbits(v)
record_v = false
elseif !haskey(s.sent_globals, oid)
# set up a finalizer the first time this object is sent
try
finalizer(v, x->delete_global_tracker(s,x))
catch ex
# Do not track objects that cannot be finalized.
record_v = false
end
end
record_v && (s.sent_globals[oid] = hash(v))

return GlobalRef(Main, sym)
serialize(s, isconst(Main, sym))
serialize(s, v)
end

function deserialize_global_from_main(s::ClusterSerializer, sym)
sym_isconst = deserialize(s)
v = deserialize(s)
if sym_isconst
eval(Main, :(const $sym = $v))
else
eval(Main, :($sym = $v))
end
end

function delete_global_tracker(s::ClusterSerializer, v)
Expand All @@ -128,3 +150,8 @@ function delete_global_tracker(s::ClusterSerializer, v)
# TODO: Should release memory from the remote nodes.
end

function cleanup_tname_glbs(s::ClusterSerializer, identifier)
delete!(s.glbs_in_tname, identifier)
end

# TODO: cleanup from s.sent_objects
28 changes: 3 additions & 25 deletions base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -381,36 +381,21 @@ end

function serialize(s::AbstractSerializer, g::GlobalRef)
writetag(s.io, GLOBALREF_TAG)
if g.mod === Main && isdefined(g.mod, g.name)
if g.mod === Main && isdefined(g.mod, g.name) && isconst(g.mod, g.name)
v = getfield(g.mod, g.name)
if isa(v, DataType) && v === v.name.primary && should_send_whole_type(s, v)
# handle references to types in Main by sending the whole type.
# needed to be able to send nested functions (#15451).
write(s.io, UInt8(1))
serialize(s, v)
return
elseif g.name in names(Main, false, false)
# FIXME :
# 1. There must be a better way to detect if a binding has been imported
# into Main or has been primarily defined here.
# 2. Handle bindings in Main pointing to bindings in Base, e.g., my_foo=myid.
write(s.io, UInt8(2))
serialize_global_from_main(s, g)
return
end
end

write(s.io, UInt8(0))
serialize_global_ref(s, g)
end

function serialize_global_ref(s::AbstractSerializer, g::GlobalRef)
serialize(s, g.mod)
serialize(s, g.name)
end

# default impl only serializes the symbol.
serialize_global_from_main(s::AbstractSerializer, g::GlobalRef) = serialize_global_ref(s, g)

function serialize(s::AbstractSerializer, t::TypeName)
serialize_cycle(s, t) && return
Expand Down Expand Up @@ -745,20 +730,13 @@ end
function deserialize(s::AbstractSerializer, ::Type{GlobalRef})
kind = read(s.io, UInt8)
if kind == 0
return deserialize_global_ref(s)
elseif kind == 1
return GlobalRef(deserialize(s)::Module, deserialize(s)::Symbol)
else
ty = deserialize(s)
return GlobalRef(ty.name.module, ty.name.name)
else # kind == 2
return deserialize_global_from_main(s)
end
end

deserialize_global_ref(s::AbstractSerializer) = GlobalRef(deserialize(s)::Module, deserialize(s)::Symbol)

# default impl is same as any global ref, i.e., only the module and symbol.
deserialize_global_from_main(s::AbstractSerializer) = deserialize_global_ref()

function deserialize(s::AbstractSerializer, ::Type{Union})
types = deserialize(s)
Union{types...}
Expand Down
Loading

0 comments on commit e839fc6

Please sign in to comment.