Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(elixir-client): Should terminate on 400s #2042

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/nasty-emus-rule.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/elixir-client": patch
---

Fix mishandling of 400s - should terminate
5 changes: 1 addition & 4 deletions packages/elixir-client/lib/electric/client/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,10 @@ defmodule Electric.Client.Stream do
|> dispatch()
end

# 400: The request is invalid, most likely because the shape has been
# deleted. We should start from scratch, this will force the shape to be
# recreated
# 409: Upon receiving a 409, we should start from scratch with the newly
# provided shape handle
defp handle_response({:error, %Fetch.Response{status: status} = resp}, stream)
when status in [400, 409] do
when status in [409] do
%{value_mapper_fun: value_mapper_fun} = stream
offset = last_offset(resp, stream.offset)

Expand Down
75 changes: 0 additions & 75 deletions packages/elixir-client/test/electric/client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -603,81 +603,6 @@ defmodule Electric.ClientTest do
] = stream(ctx, 4)
end

test "resets to an empty shape id when given a 400", ctx do
body1 = [
%{
"headers" => %{"operation" => "insert"},
"offset" => "1_0",
"value" => %{"id" => "1111"}
},
%{"headers" => %{"control" => "up-to-date"}}
]

{:ok, responses} =
start_supervised(
{Agent,
fn ->
%{
{"-1", nil} => [
&bypass_resp(&1, Jason.encode!(body1),
shape_handle: "my-shape",
last_offset: "1_0",
schema: Jason.encode!(%{"id" => %{type: "text"}})
),
&bypass_resp(&1, Jason.encode!(body1),
shape_handle: "my-shape-2",
last_offset: "1_0",
schema: Jason.encode!(%{"id" => %{type: "text"}})
)
],
{"1_0", "my-shape"} => [
&bypass_resp(&1, Jason.encode!([%{"headers" => %{"control" => "must-refetch"}}]),
status: 400
),
&bypass_resp(&1, Jason.encode!(body1),
shape_handle: "my-shape",
last_offset: "2_0"
)
]
}
end}
)

parent = self()

Bypass.expect(ctx.bypass, fn
%{
request_path: "/v1/shape",
query_params: %{"table" => "my_table", "offset" => offset} = query_params
} = conn ->
shape_handle = Map.get(query_params, "handle", nil)

fun =
Agent.get_and_update(responses, fn resps ->
Map.get_and_update(resps, {offset, shape_handle}, fn [fun | rest] -> {fun, rest} end)
end)

send(parent, {:offset, offset})
fun.(conn)
end)

assert [
%ChangeMessage{
headers: @insert,
offset: offset(1, 0),
value: %{"id" => "1111"}
},
up_to_date(1, 0),
%ControlMessage{control: :must_refetch, offset: offset(1, 0)},
%ChangeMessage{
headers: @insert,
offset: offset(1, 0),
value: %{"id" => "1111"}
},
up_to_date(1, 0)
] = stream(ctx, 5)
end

test "redirects to another shape id when given a 409", ctx do
body1 = [
%{
Expand Down
Loading