diff --git a/lib/radiator/application.ex b/lib/radiator/application.ex index b5e5bde0..7920efbb 100644 --- a/lib/radiator/application.ex +++ b/lib/radiator/application.ex @@ -5,6 +5,7 @@ defmodule Radiator.Application do use Application + alias Radiator.Outline.EventConsumer alias Radiator.Outline.EventProducer @impl true @@ -20,7 +21,8 @@ defmodule Radiator.Application do # {Radiator.Worker, arg}, # Start to serve requests, typically the last entry RadiatorWeb.Endpoint, - {EventProducer, name: EventProducer} + {EventProducer, name: EventProducer}, + {EventConsumer, name: EventConsumer} ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/radiator/outline/dispatch.ex b/lib/radiator/outline/dispatch.ex new file mode 100644 index 00000000..b6dc0404 --- /dev/null +++ b/lib/radiator/outline/dispatch.ex @@ -0,0 +1,19 @@ +defmodule Radiator.Outline.Dispatch do + @moduledoc false + + alias Radiator.Outline.Event + alias Radiator.Outline.EventProducer + + def insert_node(attributes, user_id, event_id) do + "insert_node" + |> Event.build(attributes, user_id, event_id) + |> EventProducer.enqueue() + end + + # TODO + # update_node + # delete_node + # move_node + + # list_node different case, sync call +end diff --git a/lib/radiator/outline/event.ex b/lib/radiator/outline/event.ex index c9a3fb79..9201896a 100644 --- a/lib/radiator/outline/event.ex +++ b/lib/radiator/outline/event.ex @@ -1,10 +1,11 @@ defmodule Radiator.Outline.Event do @moduledoc false - def build(event_id, event_type, user_id, payload) do - %{ + alias Radiator.Outline.Event.InsertNodeEvent + + def build("insert_node", payload, user_id, event_id) do + %InsertNodeEvent{ event_id: event_id, - event_type: event_type, user_id: user_id, payload: payload } diff --git a/lib/radiator/outline/event/insert_node_event.ex b/lib/radiator/outline/event/insert_node_event.ex new file mode 100644 index 00000000..21f08394 --- /dev/null +++ b/lib/radiator/outline/event/insert_node_event.ex @@ -0,0 +1,5 @@ +defmodule Radiator.Outline.Event.InsertNodeEvent do + @moduledoc false + + defstruct [:event_id, :user_id, :payload] +end diff --git a/lib/radiator/outline/event_consumer.ex b/lib/radiator/outline/event_consumer.ex new file mode 100644 index 00000000..4d7d71a7 --- /dev/null +++ b/lib/radiator/outline/event_consumer.ex @@ -0,0 +1,48 @@ +defmodule Radiator.Outline.EventConsumer do + @moduledoc false + + use GenStage + + alias Radiator.Outline + alias Radiator.Outline.Event.InsertNodeEvent + alias Radiator.Outline.EventProducer + + def start_link(opts \\ []) do + GenStage.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(opts \\ [max_demand: 1]) do + {:consumer, :event_producer, subscribe_to: [{EventProducer, opts}]} + end + + def handle_events([event], _from, state) do + process_event(event) + + {:noreply, [], state} + end + + defp process_event(%InsertNodeEvent{payload: payload} = _event) do + payload + |> Outline.create_node() + |> handle_insert_result() + + # validate + # true-> + # database action: insert node() + # create && persist event (event contains all attributes, user, event_id, timestamps) + # broadcast event (topic: episode_id) + # broadcast node (topic: episode_id) + # false-> + # log error and return error (audit log) + end + + defp handle_insert_result({:ok, node}) do + {:ok, node} + end + + defp handle_insert_result({:error, _error}) do + # log_error_please :-) + + :error + end +end diff --git a/lib/radiator/outline/event_producer.ex b/lib/radiator/outline/event_producer.ex index 5b37b91d..b322c8ca 100644 --- a/lib/radiator/outline/event_producer.ex +++ b/lib/radiator/outline/event_producer.ex @@ -8,7 +8,7 @@ defmodule Radiator.Outline.EventProducer do end def init(_opts) do - {:producer, []} + {:producer, {:queue.new(), 0}} end def enqueue(event) do @@ -16,7 +16,23 @@ defmodule Radiator.Outline.EventProducer do :ok end - def handle_cast({:enqueue, event}, state) do - {:noreply, [event], state} + def handle_cast({:enqueue, event}, {queue, 0}) do + queue = :queue.in(event, queue) + {:noreply, [], {queue, 0}} + end + + def handle_cast({:enqueue, event}, {queue, demand}) do + queue = :queue.in(event, queue) + {{:value, event}, queue} = :queue.out(queue) + {:noreply, [event], {queue, demand - 1}} + end + + def handle_demand(_incoming, {queue, demand}) do + with {item, queue} <- :queue.out(queue), + {:value, event} <- item do + {:noreply, [event], {queue, demand}} + else + _ -> {:noreply, [], {queue, demand + 1}} + end end end diff --git a/lib/radiator/outline/server.ex b/lib/radiator/outline/server.ex deleted file mode 100644 index 46df4477..00000000 --- a/lib/radiator/outline/server.ex +++ /dev/null @@ -1,31 +0,0 @@ -defmodule Radiator.Outline.Server do - @moduledoc false - - alias Radiator.Outline.Event - alias Radiator.Outline.EventProducer - - def insert_node(attributes, user_id, event_id) do - "insert_node" - |> Event.build(attributes, user_id, event_id) - |> EventProducer.enqueue() - - # generate event - # send to Eventserver - # validate - # true-> - # database action: insert node() - # create && persist event (event contains all attributes, user, event_id, timestamps) - # broadcast event (topic: episode_id) - # broadcast node (topic: episode_id) - # false-> - # log error and return error (audit log) - :ok - end - - # TODO - # update_node - # delete_node - # move_node - - # list_node different case, sync call -end diff --git a/test/radiator/outline/dispatch_test.exs b/test/radiator/outline/dispatch_test.exs new file mode 100644 index 00000000..550a088c --- /dev/null +++ b/test/radiator/outline/dispatch_test.exs @@ -0,0 +1,34 @@ +defmodule Radiator.Outline.DispatchTest do + use Radiator.DataCase + + import Radiator.AccountsFixtures + import Radiator.PodcastFixtures + + # alias Radiator.Outline + alias Radiator.Outline.Dispatch + alias Radiator.Outline.Node + + describe "outline dispatch" do + setup do + %{episode: episode_fixture()} + end + + test "insert_node does WHAT?", %{episode: episode} do + user = user_fixture() + + node = %Node{episode_id: episode.id, content: "something very special 1!1"} + attributes = Map.from_struct(node) + + event_id = Ecto.UUID.generate() + + Dispatch.insert_node(attributes, user.id, event_id) + + # _inserted_node = + # Outline.list_nodes() + # |> Enum.find(&(&1.content == "something very special 1!1")) + + # assert inserted_node.episode_id == node.episode_id + # assert inserted_node.content == node.content + end + end +end