Skip to content

Commit

Permalink
fix(elixir-client): Should terminate on 400s (#2042)
Browse files Browse the repository at this point in the history
Ignore the branch name - 400s are terminal errors indicating a malformed
request and should not trigger restarts.
  • Loading branch information
msfstef authored Nov 27, 2024
1 parent 9c50e8f commit ea5d03f
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 79 deletions.
5 changes: 5 additions & 0 deletions .changeset/nasty-emus-rule.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/elixir-client": patch
---

Fix mishandling of 400s - should terminate
5 changes: 1 addition & 4 deletions packages/elixir-client/lib/electric/client/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,10 @@ defmodule Electric.Client.Stream do
|> dispatch()
end

# 400: The request is invalid, most likely because the shape has been
# deleted. We should start from scratch, this will force the shape to be
# recreated
# 409: Upon receiving a 409, we should start from scratch with the newly
# provided shape handle
defp handle_response({:error, %Fetch.Response{status: status} = resp}, stream)
when status in [400, 409] do
when status in [409] do
%{value_mapper_fun: value_mapper_fun} = stream
offset = last_offset(resp, stream.offset)

Expand Down
75 changes: 0 additions & 75 deletions packages/elixir-client/test/electric/client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -603,81 +603,6 @@ defmodule Electric.ClientTest do
] = stream(ctx, 4)
end

test "resets to an empty shape id when given a 400", ctx do
body1 = [
%{
"headers" => %{"operation" => "insert"},
"offset" => "1_0",
"value" => %{"id" => "1111"}
},
%{"headers" => %{"control" => "up-to-date"}}
]

{:ok, responses} =
start_supervised(
{Agent,
fn ->
%{
{"-1", nil} => [
&bypass_resp(&1, Jason.encode!(body1),
shape_handle: "my-shape",
last_offset: "1_0",
schema: Jason.encode!(%{"id" => %{type: "text"}})
),
&bypass_resp(&1, Jason.encode!(body1),
shape_handle: "my-shape-2",
last_offset: "1_0",
schema: Jason.encode!(%{"id" => %{type: "text"}})
)
],
{"1_0", "my-shape"} => [
&bypass_resp(&1, Jason.encode!([%{"headers" => %{"control" => "must-refetch"}}]),
status: 400
),
&bypass_resp(&1, Jason.encode!(body1),
shape_handle: "my-shape",
last_offset: "2_0"
)
]
}
end}
)

parent = self()

Bypass.expect(ctx.bypass, fn
%{
request_path: "/v1/shape",
query_params: %{"table" => "my_table", "offset" => offset} = query_params
} = conn ->
shape_handle = Map.get(query_params, "handle", nil)

fun =
Agent.get_and_update(responses, fn resps ->
Map.get_and_update(resps, {offset, shape_handle}, fn [fun | rest] -> {fun, rest} end)
end)

send(parent, {:offset, offset})
fun.(conn)
end)

assert [
%ChangeMessage{
headers: @insert,
offset: offset(1, 0),
value: %{"id" => "1111"}
},
up_to_date(1, 0),
%ControlMessage{control: :must_refetch, offset: offset(1, 0)},
%ChangeMessage{
headers: @insert,
offset: offset(1, 0),
value: %{"id" => "1111"}
},
up_to_date(1, 0)
] = stream(ctx, 5)
end

test "redirects to another shape id when given a 409", ctx do
body1 = [
%{
Expand Down

0 comments on commit ea5d03f

Please sign in to comment.