Feature request - Swarm.send_after/3 #115

Open
devonestes opened this issue Nov 13, 2018 · 6 comments

Comments

@devonestes

First off, thanks for the awesome library! It sure makes clustering easy when running on Kubernetes.

I ran into an issue today. I want to send a message, after a given delay, to a process that's registered with Swarm (using Swarm.register_name/4). However, I want Swarm to look up the PID when the message is actually sent, not when I call Process.send_after/4. That way, if the PID and node for that process change during the delay, the message still arrives instead of being dropped because the original process no longer exists.

I tried the following:

Process.send_after(
  {:via, :swarm, process_name(device_id)},
  :do_a_thing,
  @one_hour
)

But Process.send_after/4 doesn't accept a {:via, :swarm, name} tuple the way name registration does; its destination has to be a PID or a locally registered atom.
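To see the difference concretely, here is a standalone sketch. Swarm isn't available without a cluster, so it uses Elixir's built-in Registry as a stand-in (an assumption; both plug into the same {:via, module, name} convention), and the registry name DemoRegistry is hypothetical. It shows Process.send_after/4 rejecting the via tuple, while GenServer.whereis/1 resolves the same tuple to a PID at the moment it is called.

```elixir
# Standalone sketch: Registry stands in for Swarm here (assumption), so this
# runs without a cluster. DemoRegistry is a hypothetical name.
{:ok, _} = Registry.start_link(keys: :unique, name: DemoRegistry)

via = {:via, Registry, {DemoRegistry, :device_42}}

# Register the current process under the key behind that via tuple.
{:ok, _} = Registry.register(DemoRegistry, :device_42, nil)

# Process.send_after/4 only accepts a pid or an atom as its destination,
# so passing the via tuple raises instead of scheduling anything.
rejected =
  try do
    Process.send_after(via, :do_a_thing, 10)
    false
  rescue
    _ -> true
  end

# GenServer.whereis/1 does understand via tuples: it resolves the name
# to whichever pid holds it right now.
resolved = GenServer.whereis(via)

IO.inspect(rejected, label: "via tuple rejected")
IO.inspect(resolved == self(), label: "resolved to this process")
```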

I'm probably going to build this on my own anyway since I need this functionality, but if you're interested in having this feature as part of Swarm, let me know and I'll gladly extract it so it can be pushed upstream.

Or, if I'm going about this all wrong and there's a smarter way to do this, just let me know!

Thanks again for the great library!

@devonestes
Author

So I have a little update on this.

While implementing this yesterday, I found that I preferred something a little different from send_after, because it let me go through my GenServer's public API instead of sending it a bare message (which made testing that GenServer much clearer).

How about Swarm.execute_after/3, which looks kind of like this:

Swarm.execute_after(
  :process_name_registered_with_swarm,
  fn pid -> MyServer.perform_a_cast_instead_of_bare_send(pid, :do_a_thing) end,
  1000
)

Now, after the given timeout, we look up the current PID for the given registered process name and call the function we were given, passing it that PID.
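A minimal single-node sketch of those execute_after/3 semantics, assuming Registry as a stand-in for Swarm; LocalExecuteAfter and ExecRegistry are hypothetical names, and this is not Swarm's API. The point it shows is that the name is resolved only after the delay, so the function receives whichever process owns the name at that moment.

```elixir
defmodule LocalExecuteAfter do
  @moduledoc """
  Hypothetical single-node sketch of the execute_after/3 idea. After
  `delay_ms`, look up whichever pid currently holds `via` and call
  `fun.(pid)`. Nothing here survives the calling node going down.
  """

  @spec execute_after({:via, module(), term()}, (pid() -> term()), non_neg_integer()) :: pid()
  def execute_after({:via, _mod, _name} = via, fun, delay_ms) when is_function(fun, 1) do
    spawn(fn ->
      Process.sleep(delay_ms)

      # The lookup happens here, after the delay, not when execute_after/3 is called.
      case GenServer.whereis(via) do
        pid when is_pid(pid) -> fun.(pid)
        # Nothing registered under the name when the delay expired: skip the call.
        nil -> :noop
      end
    end)
  end
end

# Registry stands in for Swarm (assumption); ExecRegistry is a hypothetical name.
{:ok, _} = Registry.start_link(keys: :unique, name: ExecRegistry)
via = {:via, Registry, {ExecRegistry, :device_42}}
caller = self()

# The first owner of the name.
{:ok, old} = Agent.start(fn -> :old end, name: via)

LocalExecuteAfter.execute_after(via, fn pid -> send(caller, {:executed_with, pid}) end, 100)

# Before the delay expires, the name moves to a different process.
:ok = Agent.stop(old)
{:ok, new} = Agent.start(fn -> :new end, name: via)

result =
  receive do
    {:executed_with, pid} -> pid
  after
    1_000 -> :timeout
  end

IO.inspect(result == new, label: "called with the current owner")
```

Because the pending call lives only in a spawned process on the calling node, this version loses the call if that node goes down during the delay.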

The implementation turned out not to be very difficult, so I can understand if y'all would rather not add this to the library, and instead have it as a standalone module or let users implement it themselves if they need it. To me, though, it seems like a really useful thing to have, so I'm happy to push either send_after/3 or execute_after/3 upstream if y'all are interested.

@arjan
Collaborator

arjan commented Nov 14, 2018

This looks nice, Devon. However, I'm not sure whether this has a place in the library... let me explain:

  • A basic implementation of this is pretty trivial to write yourself using an Agent/GenServer combined with Process.send_after. Or even a bare spawn with a Process.sleep in it... ;-)
  • What happens when the node goes down before the timeout? If this is part of the library, people might expect the registered execute_after to still be executed, which means its state would have to be stored inside the tracker. That suddenly makes it a whole lot more complicated.
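Both points can be illustrated with a small sketch of the "GenServer + Process.send_after" approach. TimerScheduler and SchedRegistry are hypothetical names, Registry again stands in for Swarm (assumption), and killing the scheduler process stands in for its node going down. The first message arrives normally; the second is lost because the pending timer existed only inside the killed process.

```elixir
defmodule TimerScheduler do
  @moduledoc """
  Hypothetical sketch of the "GenServer + Process.send_after" approach.
  Pending timers exist only inside this process, so if it (or its node)
  goes away, they are gone too.
  """
  use GenServer

  def start, do: GenServer.start(__MODULE__, :ok)

  # Deliver `message` to whatever process holds `via` once `delay_ms` has passed.
  def send_after(scheduler, via, message, delay_ms) do
    GenServer.cast(scheduler, {:schedule, via, message, delay_ms})
  end

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_cast({:schedule, via, message, delay_ms}, state) do
    # The timer is owned by this scheduler process, not by the target.
    Process.send_after(self(), {:fire, via, message}, delay_ms)
    {:noreply, state}
  end

  @impl true
  def handle_info({:fire, via, message}, state) do
    # Resolve the name only when the timer fires.
    case GenServer.whereis(via) do
      pid when is_pid(pid) -> send(pid, message)
      nil -> :ok
    end

    {:noreply, state}
  end
end

# Registry stands in for Swarm (assumption); SchedRegistry is a hypothetical name.
{:ok, _} = Registry.start_link(keys: :unique, name: SchedRegistry)
via = {:via, Registry, {SchedRegistry, :device_42}}
{:ok, _} = Registry.register(SchedRegistry, :device_42, nil)

{:ok, scheduler} = TimerScheduler.start()

# Normal case: the message arrives after the delay.
TimerScheduler.send_after(scheduler, via, :first, 20)

first =
  receive do
    :first -> :delivered
  after
    500 -> :timeout
  end

# Simulate the scheduler's node going down before the timer fires.
TimerScheduler.send_after(scheduler, via, :second, 50)
Process.exit(scheduler, :kill)

second =
  receive do
    :second -> :delivered
  after
    200 -> :lost
  end

IO.inspect({first, second})
```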

Maybe the others have different opinions, though. What do you think, @bitwalker @beardedeagle?

@devonestes
Author

Hey @arjan!

So, yeah, it isn't super difficult to implement, but there is some complexity involved in keeping these timer processes alive across topology changes. Here's the basic implementation I have so far, which should give you an idea of what I'm thinking of.

defmodule SendAfter do
  use GenServer

  @type timer_ref :: reference()

  # How often (in ms) to re-check the deadline; this interval could be configured by users
  @poll_interval 10

  ## PUBLIC API

  @spec send_after(term(), term(), non_neg_integer()) :: timer_ref()
  def send_after(name, message, delay_in_milliseconds) do
    start_timer({:message, name, message, delay_in_milliseconds})
  end

  @spec execute_after(term(), (pid() -> term()), non_neg_integer()) :: timer_ref()
  def execute_after(name, function, delay_in_milliseconds) when is_function(function, 1) do
    start_timer({:function, name, function, delay_in_milliseconds})
  end

  @spec cancel_timer(timer_ref()) :: :ok
  def cancel_timer(timer_ref) do
    case Swarm.whereis_name(timer_ref) do
      :undefined -> :ok
      pid -> GenServer.cast(pid, :cancel_timer)
    end
  end

  # Each timer is its own process, registered with Swarm under a unique ref,
  # so Swarm can move it (and hand off its state) when the topology changes.
  defp start_timer(args) do
    ref = make_ref()
    {:ok, _pid} = Swarm.register_name(ref, GenServer, :start_link, [__MODULE__, args])
    ref
  end

  ## CALLBACKS

  # Tagged tuples keep a 1-arity function passed as a send_after/3 message
  # from being mistaken for an execute_after/3 callback.
  @impl true
  def init({:function, name, function, delay}) do
    state = %{end_time: end_time(delay), function: function, process_name: name}
    {:ok, state, {:continue, :after_init}}
  end

  def init({:message, name, message, delay}) do
    state = %{end_time: end_time(delay), message: message, process_name: name}
    {:ok, state, {:continue, :after_init}}
  end

  @impl true
  def handle_continue(:after_init, state) do
    send(self(), :check_end_time)
    {:noreply, state}
  end

  # Swarm handoff: pass the state (including the absolute end_time) to the new process
  @impl true
  def handle_call({:swarm, :begin_handoff}, _from, state) do
    {:reply, {:resume, state}, state}
  end

  @impl true
  def handle_cast({:swarm, :end_handoff, handed_off_state}, _state) do
    {:noreply, handed_off_state}
  end

  def handle_cast({:swarm, :resolve_conflict, _other_state}, state) do
    {:noreply, state}
  end

  def handle_cast(:cancel_timer, _state) do
    {:stop, :normal, nil}
  end

  @impl true
  def handle_info(:check_end_time, state) do
    if System.system_time(:millisecond) >= state.end_time do
      deliver_message(state)
    else
      wait(state)
    end
  end

  # Swarm sends this when the process should shut down because it is being moved;
  # without it, the old copy would keep polling and could deliver a duplicate
  def handle_info({:swarm, :die}, state) do
    {:stop, :shutdown, state}
  end

  defp deliver_message(state) do
    case Swarm.whereis_name(state.process_name) do
      # The semantics of handling this case could be totally different, but it works for me for now
      :undefined ->
        wait(state)

      pid ->
        deliver_or_execute(pid, state)
        {:stop, :normal, nil}
    end
  end

  defp deliver_or_execute(pid, %{function: function}), do: function.(pid)
  defp deliver_or_execute(pid, %{message: message}), do: send(pid, message)

  # Schedule the next check instead of blocking with Process.sleep/1,
  # so handoff calls and cancel_timer casts are handled promptly
  defp wait(state) do
    Process.send_after(self(), :check_end_time, @poll_interval)
    {:noreply, state}
  end

  defp end_time(delay), do: System.system_time(:millisecond) + delay
end

It's rough and certainly imperfect, but (as far as I can tell) it won't drop queued messages when there's a topology change, which is a big step up from the current state of things.
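The reason the timer survives a handoff is that SendAfter stores an absolute end_time, and the begin_handoff/end_handoff callbacks carry that value across. Assuming Swarm starts the replacement process from the registered MFA (so its init/1 computes a fresh end_time from the original delay) and then casts {:swarm, :end_handoff, state} to it, the arithmetic works out as in this hypothetical illustration. DeadlineDemo is an illustrative name, and times are plain integers in milliseconds rather than real clock readings.

```elixir
defmodule DeadlineDemo do
  @moduledoc """
  Hypothetical illustration of why an absolute end_time survives a handoff
  while recomputing from the delay would restart the timer.
  """

  # end_time as computed in init/1: "now" plus the requested delay
  def end_time(now_ms, delay_ms), do: now_ms + delay_ms

  # How long a process still has to wait at time `now_ms`
  def remaining(end_time_ms, now_ms), do: max(end_time_ms - now_ms, 0)
end

one_hour = 3_600_000

# Timer created at t = 0 with a one-hour delay.
original = DeadlineDemo.end_time(0, one_hour)

# A handoff happens at t = 1_800_000 (30 minutes in). The replacement's
# init/1 computes a fresh end_time from the same delay...
fresh = DeadlineDemo.end_time(1_800_000, one_hour)

# ...but the handed-off state restores the original deadline.
handed_off_state = %{end_time: original}

IO.inspect(DeadlineDemo.remaining(fresh, 1_800_000))                    # prints 3600000
IO.inspect(DeadlineDemo.remaining(handed_off_state.end_time, 1_800_000)) # prints 1800000
```

If the old node dies abruptly instead, no handoff state exists, so a restarted timer would presumably start counting from its full delay again. The comparison also assumes the nodes' system clocks roughly agree.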

@arjan
Collaborator

arjan commented Dec 14, 2018

This looks great, @devonestes. However, wouldn't it be better to create a separate library for this? Just trying to keep the code base small. :-)

@beardedeagle
Collaborator

I'm wondering whether, once we model this lib after libcluster's app structure, we could have some sort of pluggable system where code like this could be loaded. @bitwalker?

@devonestes
Author

Yeah, I could publish it as a library, but I probably won’t. It’s quite small, and it only works as an extension to Swarm. If folks are curious about this kind of thing, the code is already here and they can take it if they want. It doesn’t have to actually be part of Swarm.


3 participants