diff --git a/lib/mut.ex b/lib/mut.ex index 20e1684..f6653ec 100644 --- a/lib/mut.ex +++ b/lib/mut.ex @@ -191,16 +191,16 @@ defmodule Mutex do """ @spec release(mutex :: name, lock :: Lock.t()) :: :ok def release(mutex, %Lock{type: :single, key: key}), - do: release_key(mutex, key) + do: release_key_async(mutex, key, self()) def release(mutex, %Lock{type: :multi, keys: keys}) do - Enum.each(keys, &release_key(mutex, &1)) + # TODO send all at once + Enum.each(keys, &release_key_async(mutex, &1, self())) :ok end - defp release_key(mutex, key) do - GenServer.cast(mutex, {:release, key, self()}) - :ok + defp release_key_async(mutex, key, pid) do + :ok = GenServer.cast(mutex, {:release, key, pid}) end @doc """ @@ -278,7 +278,7 @@ defmodule Mutex do end @doc false - @spec whereis_name({mutex :: name, key :: key}) :: :yes | :no + @spec register_name({mutex :: name, key :: key}, pid) :: :yes | :no def register_name({mutex, key}, pid) do # Just support registering self for now true = self() == pid @@ -289,16 +289,27 @@ defmodule Mutex do end end + @doc false + @spec unregister_name({mutex :: name, key :: key}) :: :ok + def unregister_name({mutex, key}) do + release_key_async(mutex, key, :"$force_release") + end + # -- Server Callbacks ------------------------------------------------------- defmodule S do @moduledoc false - defstruct locks: %{}, - # owner's pids - owns: %{}, - # waiters's gen_server from value - waiters: %{}, - meta: nil + defstruct [ + # %{lock_key => owner_pid} + locks: %{}, + + # %{owner_pid => [{owned_key, monitor_ref}]} + owns: %{}, + + # %{key => [gen_server.from()]} + waiters: %{}, + meta: nil + ] end @impl true @@ -326,7 +337,7 @@ defmodule Mutex do end end - def handle_call({:where, key}, from, state) do + def handle_call({:where, key}, _from, state) do reply = case Map.fetch(state.locks, key) do {:ok, pid} -> pid @@ -342,6 +353,9 @@ defmodule Mutex do {:ok, ^pid} -> {:noreply, rm_lock(state, key, pid)} + {:ok, owner_pid} when pid == :"$force_release" -> + {:noreply, rm_lock(state, key, owner_pid)} + {:ok, other_pid} -> Logger.error("Could not release #{inspect(key)}, bad owner", key: key, @@ -361,6 +375,7 @@ defmodule Mutex do {:noreply, clear_owner(state, pid, :goodbye)} end + @impl true def handle_info(_info = {:DOWN, _ref, :process, pid, _}, state) do {:noreply, clear_owner(state, pid, :DOWN)} end @@ -379,16 +394,12 @@ defmodule Mutex do defp set_lock(state = %S{locks: locks, owns: owns}, key, pid) do # Logger.debug "LOCK #{inspect key}" - new_locks = - locks - |> Map.put(key, pid) + new_locks = Map.put(locks, key, pid) ref = Process.monitor(pid) keyref = {key, ref} - new_owns = - owns - |> Map.update(pid, [keyref], fn keyrefs -> [keyref | keyrefs] end) + new_owns = Map.update(owns, pid, [keyref], fn keyrefs -> [keyref | keyrefs] end) %S{state | locks: new_locks, owns: new_owns} end @@ -399,8 +410,7 @@ defmodule Mutex do new_locks = Map.delete(locks, key) new_owns = - owns - |> Map.update(pid, [], fn keyrefs -> + Map.update(owns, pid, [], fn keyrefs -> {{_key, ref}, new_keyrefs} = List.keytake(keyrefs, key, 0) Process.demonitor(ref, [:flush]) new_keyrefs @@ -444,9 +454,7 @@ defmodule Mutex do defp set_waiter(state = %S{waiters: waiters}, key, from) do # Maybe we should monitor the waiter to not send useless message when the # key is available if the waiter is down ? - new_waiters = - waiters - |> Map.update(key, [from], fn waiters -> [from | waiters] end) + new_waiters = Map.update(waiters, key, [from], fn waiters -> [from | waiters] end) %S{state | waiters: new_waiters} end diff --git a/mix.exs b/mix.exs index 223a6d1..ede2fd7 100644 --- a/mix.exs +++ b/mix.exs @@ -28,7 +28,8 @@ defmodule Mutex.Mixfile do [ {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, "~> 1.2", only: :dev, runtime: false}, - {:credo, "~> 1.7", only: :dev, runtime: false} + {:credo, "~> 1.7", only: :dev, runtime: false}, + {:mox, "~> 1.2", only: :test} ] end diff --git a/mix.lock b/mix.lock index 8725177..2b50136 100644 --- a/mix.lock +++ b/mix.lock @@ -12,6 +12,8 @@ "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, + "mox": {:hex, :mox, "1.2.0", "a2cd96b4b80a3883e3100a221e8adc1b98e4c3a332a8fc434c39526babafd5b3", [:mix], [{:nimble_ownership, "~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}], "hexpm", "c7b92b3cc69ee24a7eeeaf944cd7be22013c52fcb580c1f33f50845ec821089a"}, + "nimble_ownership": {:hex, :nimble_ownership, "1.0.0", "3f87744d42c21b2042a0aa1d48c83c77e6dd9dd357e425a038dd4b49ba8b79a1", [:mix], [], "hexpm", "7c16cc74f4e952464220a73055b557a273e8b1b7ace8489ec9d86e9ad56cb2cc"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "poison": {:hex, :poison, "4.0.1", "bcb755a16fac91cad79bfe9fc3585bb07b9331e50cfe3420a24bcc2d735709ae", [:mix], [], "hexpm", "ba8836feea4b394bb718a161fc59a288fe0109b5006d6bdf97b6badfcf6f0f25"}, } diff --git a/test/give_away_test.exs b/test/give_away_test.exs index b43e0b5..73bc1c3 100644 --- a/test/give_away_test.exs +++ b/test/give_away_test.exs @@ -1,5 +1,4 @@ defmodule Mutex.GiveAwayTest do - alias Mutex.Lock import Mutex.Test.Utils require Logger use ExUnit.Case, async: true diff --git a/test/mutex_test.exs b/test/mutex_test.exs index 21b2d0b..e58f79e 100644 --- a/test/mutex_test.exs +++ b/test/mutex_test.exs @@ -1,6 +1,6 @@ defmodule MutexTest do - alias Mutex.LockError alias Mutex.Lock + alias Mutex.LockError import Mutex.Test.Utils require Logger use ExUnit.Case, async: true diff --git a/test/name_registration_test.exs b/test/name_registration_test.exs new file mode 100644 index 0000000..adfdba0 --- /dev/null +++ b/test/name_registration_test.exs @@ -0,0 +1,93 @@ +defmodule Mutex.NameRegistrationTest do + alias Mutex.Lock + import Mox + import Mutex.Test.Utils + require Logger + use ExUnit.Case, async: false + + @moduletag :capture_log + @mut rand_mod() + + setup :verify_on_exit! + setup :set_mox_global + + setup do + {:ok, _pid} = Mutex.start_link(name: @mut, meta: :test_mutex) + :ok + end + + test "can find a process by it's lock" do + {ack, wack} = vawack() + + tspawn(fn -> + _lock = Mutex.lock!(@mut, :i_am_here) + ack.(self()) + hang() + end) + + pid = wack.() + + assert pid == GenServer.whereis({:via, Mutex, {@mut, :i_am_here}}) + assert pid == Mutex.whereis_name({@mut, :i_am_here}) + end + + test "cannot find process after exit" do + task = Task.async(fn -> Mutex.lock!(@mut, :i_will_exit) end) + assert %Lock{} = Task.await(task) + assert nil == GenServer.whereis({:via, Mutex, {@mut, :i_will_exit}}) + end + + test "can start a proces with a name" do + mod = + rand_mod() + |> defmock(for: GenServer) + |> expect(:init, 1, fn init_arg -> {:ok, init_arg} end) + |> expect(:terminate, 1, fn :normal, _ -> :ok end) + + key = :gs + via = {:via, Mutex, {@mut, key}} + + # server will start + assert {:ok, pid} = GenServer.start_link(mod, [], name: via) + + # server will have a lock + assert {:error, :busy} = Mutex.lock(@mut, :gs) + assert {:error, {:already_started, ^pid}} = GenServer.start_link(mod, [], name: via) + + # on stop the lock is freed (this does not need uregister_name impl, it's + # done by the monitor in the mutex anyway). + assert :ok = GenServer.stop(pid) + assert {:ok, _} = Mutex.lock(@mut, key) + end + + test "unregister name on gen server init :error tuple" do + sync = syncher(2) + + key = :bad_gs + via = {:via, Mutex, {@mut, key}} + + mod = rand_mod() + + defmodule mod do + def init(sync) do + sync.(:initializing) + sync.(:tested_busy) + {:error, :changed_my_mind} + end + end + + # Start the gen server from another pid since it will be blocking + starter = Task.async(fn -> GenServer.start_link(mod, sync, name: via) end) + + # when initializing the process has been registered + sync.(:initializing) + assert {:error, :busy} = Mutex.lock(@mut, key) + + # now we will let the gen server init return stop + sync.(:tested_busy) + + # Gen server has stopped + assert {:error, :changed_my_mind} = Task.await(starter) + assert {:ok, _} = Mutex.lock(@mut, key) + end +end diff --git a/test/support/utils.ex b/test/support/utils.ex index 24d337d..cc2543d 100644 --- a/test/support/utils.ex +++ b/test/support/utils.ex @@ -29,6 +29,44 @@ defmodule Mutex.Test.Utils do {ack, wack} end + # synchronizes several processes on a label, that is, when a participant + # process calls the returned fun with a label, they'll block until all other + # participants called the same label. + def syncher(n_procs) when n_procs >= 2 do + parent = self() + n_clients = n_procs - 1 + ref = make_ref() + + fn key -> + case self() do + ^parent -> + syncher_parent_loop(n_clients, key, ref, %{}) + + from_pid -> + send(parent, {:ack, ref, key, from_pid}) + + receive do + {:continue, ^ref, ^key} -> :ok + after + 5000 -> exit(:timeout) + end + end + end + end + + defp syncher_parent_loop(n_clients, key, ref, others) when map_size(others) == n_clients do + others |> Map.keys() |> Enum.each(&send(&1, {:continue, ref, key})) + end + + defp syncher_parent_loop(n_clients, key, ref, others) do + receive do + {:ack, ^ref, ^key, from_pid} -> + syncher_parent_loop(n_clients, key, ref, Map.put(others, from_pid, true)) + after + 5000 -> exit(:timeout) + end + end + def spawn_hang(link? \\ true, fun) do executor = fn -> fun.() diff --git a/test/whereis_test.exs b/test/whereis_test.exs deleted file mode 100644 index 51de4c8..0000000 --- a/test/whereis_test.exs +++ /dev/null @@ -1,74 +0,0 @@ -defmodule Mutex.WhereisTest do - alias Mutex.Lock - import Mutex.Test.Utils - require Logger - use ExUnit.Case, async: true - - @moduletag :capture_log - @mut rand_mod() - - setup do - {:ok, _pid} = start_supervised({Mutex, name: @mut, meta: :test_mutex}) - :ok - end - - test "can find a process by it's lock" do - {ack, wack} = vawack() - - tspawn(fn -> - lock = Mutex.lock!(@mut, :i_am_here) - ack.(self()) - hang() - end) - - pid = wack.() - - assert pid == GenServer.whereis({:via, Mutex, {@mut, :i_am_here}}) - end - - test "cannot find process after exit" do - task = Task.async(fn -> Mutex.lock!(@mut, :i_will_exit) end) - assert %Lock{} = Task.await(task) - assert nil == GenServer.whereis({:via, Mutex, {@mut, :i_will_exit}}) - end - - test "can start a proces with a name" do - mod = rand_mod() - via = {:via, Mutex, {@mut, :gs}} - - defmodule mod do - use GenServer - @via via - - def start_link do - GenServer.start_link(__MODULE__, [], name: @via) - end - - def init(init_arg) do - {:ok, init_arg} - end - def stop(pid) do - mref = Process.monitor(pid) - GenServer.cast(pid, :stop) - assert_receive {:DOWN, ^mref, :process, ^pid, :bye} - :ok - end - - def handle_cast(:stop, state ) do - {:stop, :bye, state} - end - end - - # server will start - assert {:ok, pid} = mod.start_link - - # server will have a lock - assert {:error, :busy} = Mutex.lock(@mut, :gs) - assert {:error, {:already_started, ^pid}} = mod.start_link - - # on stop the lock is freed (this does not need uregister_name impl, it's - # done by the monitor in the mutex anyway). - assert :ok = GenServer.stop(pid) - assert {:ok, _} = Mutex.lock(@mut, :gs) - end -end