|
| 1 | +defmodule Realtime.Integration.DistributedRealtimeChannelTest do |
| 2 | + use RealtimeWeb.ConnCase, |
| 3 | + async: false, |
| 4 | + parameterize: [%{serializer: Phoenix.Socket.V1.JSONSerializer}, %{serializer: RealtimeWeb.Socket.V2Serializer}] |
| 5 | + |
| 6 | + alias Phoenix.Socket.Message |
| 7 | + |
| 8 | + alias Realtime.Tenants.Connect |
| 9 | + alias Realtime.Integration.WebsocketClient |
| 10 | + |
| 11 | + setup do |
| 12 | + tenant = Realtime.Api.get_tenant_by_external_id("dev_tenant") |
| 13 | + |
| 14 | + Realtime.RateCounter.stop(tenant.external_id) |
| 15 | + |
| 16 | + Connect.shutdown(tenant.external_id) |
| 17 | + # Sleeping so that syn can forget about this Connect process |
| 18 | + Process.sleep(100) |
| 19 | + |
| 20 | + on_exit(fn -> |
| 21 | + Connect.shutdown(tenant.external_id) |
| 22 | + # Sleeping so that syn can forget about this Connect process |
| 23 | + Process.sleep(100) |
| 24 | + end) |
| 25 | + |
| 26 | + on_exit(fn -> Connect.shutdown(tenant.external_id) end) |
| 27 | + {:ok, node} = Clustered.start() |
| 28 | + region = Realtime.Tenants.region(tenant) |
| 29 | + {:ok, db_conn} = :erpc.call(node, Connect, :connect, ["dev_tenant", region]) |
| 30 | + assert Connect.ready?(tenant.external_id) |
| 31 | + |
| 32 | + assert node(db_conn) == node |
| 33 | + %{tenant: tenant, topic: random_string()} |
| 34 | + end |
| 35 | + |
| 36 | + describe "distributed broadcast" do |
| 37 | + @tag mode: :distributed |
| 38 | + test "it works", %{tenant: tenant, topic: topic, serializer: serializer} do |
| 39 | + {:ok, token} = |
| 40 | + generate_token(tenant, %{exp: System.system_time(:second) + 1000, role: "authenticated", sub: random_string()}) |
| 41 | + |
| 42 | + {:ok, remote_socket} = |
| 43 | + WebsocketClient.connect(self(), uri(tenant, serializer, 4012), serializer, [{"x-api-key", token}]) |
| 44 | + |
| 45 | + {:ok, socket} = WebsocketClient.connect(self(), uri(tenant, serializer), serializer, [{"x-api-key", token}]) |
| 46 | + |
| 47 | + config = %{broadcast: %{self: false}, private: false} |
| 48 | + topic = "realtime:#{topic}" |
| 49 | + |
| 50 | + :ok = WebsocketClient.join(remote_socket, topic, %{config: config}) |
| 51 | + :ok = WebsocketClient.join(socket, topic, %{config: config}) |
| 52 | + |
| 53 | + # Send through one socket and receive through the other (self: false) |
| 54 | + payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"} |
| 55 | + :ok = WebsocketClient.send_event(remote_socket, topic, "broadcast", payload) |
| 56 | + |
| 57 | + assert_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 2000 |
| 58 | + end |
| 59 | + end |
| 60 | +end |
0 commit comments