Skip to content

Commit d3472b6

Browse files
committed
chore: update BroadcastHandler
1 parent 8dd94ff commit d3472b6

File tree

3 files changed

+127
-38
lines changed

3 files changed

+127
-38
lines changed

lib/realtime_web/channels/realtime_channel/broadcast_handler.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
123123
end
124124
end
125125

126-
# Message payload was built by V2 Serializer which was originally UserBroadcast
126+
# Message payload was built by V2 Serializer which was originally UserBroadcastPush
127127
defp build_broadcast(topic, {user_event, user_payload_encoding, user_payload}) do
128128
%RealtimeWeb.Socket.UserBroadcast{
129129
topic: topic,

test/integration/rt_channel_test.exs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ defmodule Realtime.Integration.RtChannelTest do
22
# async: false due to the fact that multiple operations against the same tenant and usage of mocks
33
# Also using dev_tenant due to distributed test
44
alias Realtime.Api
5-
alias Phoenix.Socket.V1
6-
alias RealtimeWeb.Socket.V2Serializer
7-
use RealtimeWeb.ConnCase, async: false, parameterize: [%{serializer: V1.JSONSerializer}, %{serializer: V2Serializer}]
5+
6+
use RealtimeWeb.ConnCase,
7+
async: false,
8+
parameterize: [%{serializer: Phoenix.Socket.V1.JSONSerializer}, %{serializer: RealtimeWeb.Socket.V2Serializer}]
9+
810
use Mimic
911
import ExUnit.CaptureLog
1012
import Generators

test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs

Lines changed: 121 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
2-
use Realtime.DataCase, async: true
2+
use Realtime.DataCase,
3+
async: true,
4+
parameterize: [%{serializer: Phoenix.Socket.V1.JSONSerializer}, %{serializer: RealtimeWeb.Socket.V2Serializer}]
5+
36
use Mimic
47

58
import Generators
@@ -17,21 +20,24 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
1720

1821
setup [:initiate_tenant]
1922

23+
@payload %{"a" => "b"}
24+
2025
describe "handle/3" do
21-
test "with write true policy, user is able to send message", %{topic: topic, tenant: tenant, db_conn: db_conn} do
26+
test "with write true policy, user is able to send message",
27+
%{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do
2228
socket = socket_fixture(tenant, topic, policies: %Policies{broadcast: %BroadcastPolicies{write: true}})
2329

2430
for _ <- 1..100, reduce: socket do
2531
socket ->
26-
{:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket)
32+
{:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket)
2733
socket
2834
end
2935

3036
for _ <- 1..100 do
3137
topic = "realtime:#{topic}"
3238
assert_receive {:socket_push, :text, data}
33-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
34-
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
39+
40+
assert Jason.decode!(data) == message(serializer, topic, @payload)
3541
end
3642

3743
Process.sleep(120)
@@ -57,20 +63,20 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
5763
end
5864

5965
@tag policies: [:authenticated_read_broadcast, :authenticated_write_broadcast]
60-
test "with nil policy but valid user, is able to send message", %{topic: topic, tenant: tenant, db_conn: db_conn} do
66+
test "with nil policy but valid user, is able to send message",
67+
%{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do
6168
socket = socket_fixture(tenant, topic)
6269

6370
for _ <- 1..100, reduce: socket do
6471
socket ->
65-
{:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket)
72+
{:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket)
6673
socket
6774
end
6875

6976
for _ <- 1..100 do
7077
topic = "realtime:#{topic}"
7178
assert_received {:socket_push, :text, data}
72-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
73-
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
79+
assert Jason.decode!(data) == message(serializer, topic, @payload)
7480
end
7581

7682
Process.sleep(120)
@@ -80,7 +86,8 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
8086
end
8187

8288
@tag policies: [:authenticated_read_matching_user_sub, :authenticated_write_matching_user_sub], sub: UUID.generate()
83-
test "with valid sub, is able to send message", %{topic: topic, tenant: tenant, db_conn: db_conn, sub: sub} do
89+
test "with valid sub, is able to send message",
90+
%{topic: topic, tenant: tenant, db_conn: db_conn, sub: sub, serializer: serializer} do
8491
socket =
8592
socket_fixture(tenant, topic,
8693
policies: %Policies{broadcast: %BroadcastPolicies{write: nil, read: true}},
@@ -89,15 +96,14 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
8996

9097
for _ <- 1..100, reduce: socket do
9198
socket ->
92-
{:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket)
99+
{:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket)
93100
socket
94101
end
95102

96103
for _ <- 1..100 do
97104
topic = "realtime:#{topic}"
98105
assert_received {:socket_push, :text, data}
99-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
100-
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
106+
assert Jason.decode!(data) == message(serializer, topic, @payload)
101107
end
102108
end
103109

@@ -119,7 +125,8 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
119125
end
120126

121127
@tag policies: [:read_matching_user_role, :write_matching_user_role], role: "anon"
122-
test "with valid role, is able to send message", %{topic: topic, tenant: tenant, db_conn: db_conn} do
128+
test "with valid role, is able to send message",
129+
%{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do
123130
socket =
124131
socket_fixture(tenant, topic,
125132
policies: %Policies{broadcast: %BroadcastPolicies{write: nil, read: true}},
@@ -128,15 +135,14 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
128135

129136
for _ <- 1..100, reduce: socket do
130137
socket ->
131-
{:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket)
138+
{:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket)
132139
socket
133140
end
134141

135142
for _ <- 1..100 do
136143
topic = "realtime:#{topic}"
137144
assert_received {:socket_push, :text, data}
138-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
139-
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
145+
assert Jason.decode!(data) == message(serializer, topic, @payload)
140146
end
141147
end
142148

@@ -173,7 +179,8 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
173179
end
174180

175181
@tag policies: [:authenticated_read_broadcast, :authenticated_write_broadcast]
176-
test "validation only runs once on nil and valid policies", %{topic: topic, tenant: tenant, db_conn: db_conn} do
182+
test "validation only runs once on nil and valid policies",
183+
%{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do
177184
socket = socket_fixture(tenant, topic)
178185

179186
expect(Authorization, :get_write_authorizations, 1, fn conn, db_conn, auth_context ->
@@ -184,15 +191,14 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
184191

185192
for _ <- 1..100, reduce: socket do
186193
socket ->
187-
{:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket)
194+
{:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket)
188195
socket
189196
end
190197

191198
for _ <- 1..100 do
192199
topic = "realtime:#{topic}"
193200
assert_receive {:socket_push, :text, data}
194-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
195-
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
201+
assert Jason.decode!(data) == message(serializer, topic, @payload)
196202
end
197203
end
198204

@@ -212,7 +218,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
212218
refute_receive _, 100
213219
end
214220

215-
test "no ack still sends message", %{topic: topic, tenant: tenant, db_conn: db_conn} do
221+
test "no ack still sends message", %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do
216222
socket =
217223
socket_fixture(tenant, topic,
218224
policies: %Policies{broadcast: %BroadcastPolicies{write: true}},
@@ -221,7 +227,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
221227

222228
for _ <- 1..100, reduce: socket do
223229
socket ->
224-
{:noreply, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket)
230+
{:noreply, socket} = BroadcastHandler.handle(@payload, db_conn, socket)
225231
socket
226232
end
227233

@@ -230,25 +236,24 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
230236
for _ <- 1..100 do
231237
topic = "realtime:#{topic}"
232238
assert_received {:socket_push, :text, data}
233-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
234-
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
239+
assert Jason.decode!(data) == message(serializer, topic, @payload)
235240
end
236241
end
237242

238-
test "public channels are able to send messages", %{topic: topic, tenant: tenant, db_conn: db_conn} do
243+
test "public channels are able to send messages",
244+
%{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do
239245
socket = socket_fixture(tenant, topic, private?: false, policies: nil)
240246

241247
for _ <- 1..100, reduce: socket do
242248
socket ->
243-
{:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket)
249+
{:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket)
244250
socket
245251
end
246252

247253
for _ <- 1..100 do
248254
topic = "realtime:#{topic}"
249255
assert_received {:socket_push, :text, data}
250-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
251-
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
256+
assert Jason.decode!(data) == message(serializer, topic, @payload)
252257
end
253258

254259
Process.sleep(120)
@@ -257,20 +262,20 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
257262
assert avg > 0.0
258263
end
259264

260-
test "public channels are able to send messages and ack", %{topic: topic, tenant: tenant, db_conn: db_conn} do
265+
test "public channels are able to send messages and ack",
266+
%{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do
261267
socket = socket_fixture(tenant, topic, private?: false, policies: nil)
262268

263269
for _ <- 1..100, reduce: socket do
264270
socket ->
265-
{:reply, :ok, socket} = BroadcastHandler.handle(%{"a" => "b"}, db_conn, socket)
271+
{:reply, :ok, socket} = BroadcastHandler.handle(@payload, db_conn, socket)
266272
socket
267273
end
268274

269275
for _ <- 1..100 do
270276
topic = "realtime:#{topic}"
271277
assert_receive {:socket_push, :text, data}
272-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
273-
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
278+
assert Jason.decode!(data) == message(serializer, topic, @payload)
274279
end
275280

276281
Process.sleep(120)
@@ -280,6 +285,82 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
280285
assert avg > 0.0
281286
end
282287

288+
test "V2 json UserBroadcastPush", %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do
289+
socket = socket_fixture(tenant, topic, private?: false, policies: nil)
290+
291+
user_broadcast_payload = %{"a" => "b"}
292+
json_encoded_user_broadcast_payload = Jason.encode!(user_broadcast_payload)
293+
294+
{:reply, :ok, socket} =
295+
BroadcastHandler.handle({"event123", :json, json_encoded_user_broadcast_payload}, db_conn, socket)
296+
297+
topic = "realtime:#{topic}"
298+
assert_receive {:socket_push, code, data}
299+
300+
if serializer == RealtimeWeb.Socket.V2Serializer do
301+
assert code == :binary
302+
303+
assert data ==
304+
<<
305+
# user broadcast = 4
306+
4::size(8),
307+
# topic_size
308+
byte_size(topic),
309+
# user_event_size
310+
byte_size("event123"),
311+
# metadata_size
312+
0,
313+
# json encoding
314+
1::size(8),
315+
topic::binary,
316+
"event123"
317+
>> <> json_encoded_user_broadcast_payload
318+
else
319+
assert code == :text
320+
321+
assert Jason.decode!(data) ==
322+
message(serializer, topic, %{
323+
"event" => "event123",
324+
"payload" => user_broadcast_payload,
325+
"type" => "broadcast"
326+
})
327+
end
328+
end
329+
330+
test "V2 binary UserBroadcastPush", %{topic: topic, tenant: tenant, db_conn: db_conn, serializer: serializer} do
331+
socket = socket_fixture(tenant, topic, private?: false, policies: nil)
332+
333+
user_broadcast_payload = <<123, 456, 789>>
334+
335+
{:reply, :ok, socket} =
336+
BroadcastHandler.handle({"event123", :binary, user_broadcast_payload}, db_conn, socket)
337+
338+
topic = "realtime:#{topic}"
339+
340+
if serializer == RealtimeWeb.Socket.V2Serializer do
341+
assert_receive {:socket_push, :binary, data}
342+
343+
assert data ==
344+
<<
345+
# user broadcast = 4
346+
4::size(8),
347+
# topic_size
348+
byte_size(topic),
349+
# user_event_size
350+
byte_size("event123"),
351+
# metadata_size
352+
0,
353+
# binary encoding
354+
0::size(8),
355+
topic::binary,
356+
"event123"
357+
>> <> user_broadcast_payload
358+
else
359+
# Can't receive binary payloads on V1 serializer
360+
refute_receive {:socket_push, _code, _data}
361+
end
362+
end
363+
283364
@tag policies: [:broken_write_presence]
284365
test "handle failing rls policy", %{topic: topic, tenant: tenant, db_conn: db_conn} do
285366
socket = socket_fixture(tenant, topic)
@@ -384,7 +465,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
384465
fastlane =
385466
RealtimeWeb.RealtimeChannel.MessageDispatcher.fastlane_metadata(
386467
self(),
387-
Phoenix.Socket.V1.JSONSerializer,
468+
context.serializer,
388469
"realtime:#{topic}",
389470
:warning,
390471
"tenant_id"
@@ -442,4 +523,10 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
442523
}
443524
}
444525
end
526+
527+
defp message(RealtimeWeb.Socket.V2Serializer, topic, payload), do: [nil, nil, topic, "broadcast", payload]
528+
529+
defp message(Phoenix.Socket.V1.JSONSerializer, topic, payload) do
530+
%{"event" => "broadcast", "payload" => payload, "ref" => nil, "topic" => topic}
531+
end
445532
end

0 commit comments

Comments
 (0)