Skip to content

Commit

Permalink
Do not update tracker clock with registry entry clocks
Browse files Browse the repository at this point in the history
This is a follow-up to bitwalker#85

I forgot to change the handle_topology_change method, which is,
as far as I can tell, also the root cause of
bitwalker#106
  • Loading branch information
tschmittni committed Nov 14, 2018
1 parent 738decb commit 0b2c5a5
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions lib/swarm/tracker/tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -823,21 +823,21 @@ defmodule Swarm.Tracker do
debug("topology change (#{type} for #{remote_node})")
current_node = state.self

new_clock =
Registry.reduce(state.clock, fn
entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj, lclock
new_state =
Registry.reduce(state, fn
entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj, state
when node(pid) == current_node ->
case Strategy.key_to_node(state.strategy, name) do
:undefined ->
# No node available to host process, it must be stopped
debug("#{inspect(pid)} must be stopped as no node is available to host it")
{:ok, new_state} = remove_registration(obj, %{state | clock: lclock})
{:ok, new_state} = remove_registration(obj, state)
send(pid, {:swarm, :die})
new_state.clock
new_state

^current_node ->
# This process is correct
lclock
state

other_node ->
debug("#{inspect(pid)} belongs on #{other_node}")
Expand All @@ -846,86 +846,86 @@ defmodule Swarm.Tracker do
case GenServer.call(pid, {:swarm, :begin_handoff}) do
:ignore ->
debug("#{inspect(name)} has requested to be ignored")
lclock
state

{:resume, handoff_state} ->
debug("#{inspect(name)} has requested to be resumed")
{:ok, state} = remove_registration(obj, %{state | clock: lclock})
{:ok, new_state} = remove_registration(obj, state)
send(pid, {:swarm, :die})
debug("sending handoff for #{inspect(name)} to #{other_node}")

GenStateMachine.cast(
{__MODULE__, other_node},
{:handoff, self(), {name, meta, handoff_state, Clock.peek(state.clock)}}
{:handoff, self(), {name, meta, handoff_state, Clock.peek(new_state.clock)}}
)

state.clock
new_state

:restart ->
debug("#{inspect(name)} has requested to be restarted")
{:ok, new_state} = remove_registration(obj, %{state | clock: lclock})
{:ok, new_state} = remove_registration(obj, state)
send(pid, {:swarm, :die})

case do_track(%Tracking{name: name, meta: meta}, new_state) do
:keep_state_and_data -> new_state.clock
{:keep_state, new_state} -> new_state.clock
:keep_state_and_data -> new_state
{:keep_state, new_state} -> new_state
end
end
catch
_, err ->
warn("handoff failed for #{inspect(name)}: #{inspect(err)}")
lclock
state
end
end

entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj, lclock when is_map(meta) ->
entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj, state when is_map(meta) ->
cond do
Enum.member?(state.nodes, node(pid)) ->
# the parent node is still up
lclock
state

:else ->
# pid is dead, we're going to restart it
case Strategy.key_to_node(state.strategy, name) do
:undefined ->
# No node available to restart process on, so remove registration
warn("no node available to restart #{inspect(name)}")
{:ok, new_state} = remove_registration(obj, %{state | clock: lclock})
new_state.clock
{:ok, new_state} = remove_registration(obj, state)
new_state

^current_node ->
debug("restarting #{inspect(name)} on #{current_node}")
{:ok, new_state} = remove_registration(obj, %{state | clock: lclock})
{:ok, new_state} = remove_registration(obj, state)

case do_track(%Tracking{name: name, meta: meta}, new_state) do
:keep_state_and_data -> new_state.clock
{:keep_state, new_state} -> new_state.clock
:keep_state_and_data -> new_state
{:keep_state, new_state} -> new_state
end

_other_node ->
# other_node will tell us to unregister/register the restarted pid
lclock
state
end
end

entry(name: name, pid: pid) = obj, lclock ->
entry(name: name, pid: pid) = obj, state ->
pid_node = node(pid)

cond do
pid_node == current_node or Enum.member?(state.nodes, pid_node) ->
# the parent node is still up
lclock
state

:else ->
# the parent node is down, but we cannot restart this pid, so unregister it
debug("removing registration for #{inspect(name)}, #{pid_node} is down")
{:ok, new_state} = remove_registration(obj, %{state | clock: lclock})
new_state.clock
{:ok, new_state} = remove_registration(obj, state)
new_state
end
end)

info("topology change complete")
{:keep_state, %{state | clock: new_clock}}
{:keep_state, new_state}
end

# This is the callback for tracker events which are being replicated from other nodes in the cluster
Expand Down Expand Up @@ -994,7 +994,7 @@ defmodule Swarm.Tracker do
end
end

defp handle_replica_event(_from, {:untrack, pid}, rclock, state) do
defp handle_replica_event(_from, {:untrack, pid}, rclock, _state) do
debug("replica event: untrack #{inspect(pid)}")

case Registry.get_by_pid(pid) do
Expand Down Expand Up @@ -1174,7 +1174,7 @@ defmodule Swarm.Tracker do
debug "Cannot handoff #{inspect name} because there is no other node left"
other_node ->
debug "#{inspect name} has requested to be terminated and resumed on another node"
{:ok, state} = remove_registration(obj, %{state | clock: state.clock})
{:ok, state} = remove_registration(obj, state)
send(pid, {:swarm, :die})
debug "sending handoff for #{inspect name} to #{other_node}"
GenStateMachine.cast({__MODULE__, other_node},
Expand Down

0 comments on commit 0b2c5a5

Please sign in to comment.