Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

require explicitly specifying node name in subscription pubsub #607

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ Alpha 0 note: 1.5.0 alpha is safe to use on existing schemas. However, there are
- Feature: SDL support
- Feature: Schema decorators
- Breaking Change: `default_value: DateTime.utc_now()` will have its time set at compile time. IE: DON'T DO THIS. It only worked by accident before anyway, and now it no longer works, which is correct.
- Breaking change: added `node_name/0` callback to `Absinthe.Subscription.PubSub` behaviour. To retain old behaviour, implement this callback to return `Kernel.node/0`.
8 changes: 5 additions & 3 deletions lib/absinthe/subscription/proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ defmodule Absinthe.Subscription.Proxy do
use GenServer

defstruct [
:pubsub
:pubsub,
:node
]

alias Absinthe.Subscription
Expand All @@ -16,11 +17,12 @@ defmodule Absinthe.Subscription.Proxy do
def topic(shard), do: "__absinthe__:proxy:#{shard}"

def init({pubsub, shard}) do
node_name = pubsub.node_name()
:ok = pubsub.subscribe(topic(shard))
{:ok, %__MODULE__{pubsub: pubsub}}
{:ok, %__MODULE__{pubsub: pubsub, node: node_name}}
end

def handle_info(%{node: src_node}, state) when src_node == node() do
def handle_info(%{node: src_node}, %{node: node} = state) when src_node == node do
{:noreply, state}
end

Expand Down
17 changes: 14 additions & 3 deletions lib/absinthe/subscription/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,19 @@ defmodule Absinthe.Subscription.Pubsub do
@callback subscribe(topic :: binary) :: term

@doc """
An Absinthe.Subscription.Pubsub system may extend across multiple nodes in a
cluster. Processes need only subscribe to the pubsub process that
An Absinthe.Subscription.Pubsub system may extend across multiple nodes
connected by some mechanism. Regardless of this mechanism, all nodes should
have unique names.

Absinthe invokes `node_name` function to get current node's name. If you
are running inside erlang cluster, you can use `Kernel.node/0` as a node
name.
"""
@callback node_name() :: binary

@doc """
An Absinthe.Subscription.Pubsub system may extend across multiple nodes.
Processes need only subscribe to the pubsub process that
is running on their own node.

However, mutations can happen on any node in the custer and must to be
Expand All @@ -38,7 +49,7 @@ defmodule Absinthe.Subscription.Pubsub do
The message broadcast should be a map that contains, at least

%{
node: node_id, # probably from Kernel.node/0
node: node_name, # should be equal to `node_name/0`
mutation_result: …, # from arguments
subscribed_fields: … # from arguments

Expand Down
4 changes: 4 additions & 0 deletions test/absinthe/execution/subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ defmodule Absinthe.Execution.SubscriptionTest do
Registry.start_link(keys: :unique, name: __MODULE__)
end

def node_name() do
node()
end

def subscribe(topic) do
Registry.register(__MODULE__, topic, [])
:ok
Expand Down