From fe9465d39700ba1e3c4f75db6d1a430ed2cf3341 Mon Sep 17 00:00:00 2001 From: msfstef Date: Tue, 26 Nov 2024 13:37:23 +0200 Subject: [PATCH 1/3] Should terminate on 400s --- .changeset/nasty-emus-rule.md | 5 +++++ packages/elixir-client/lib/electric/client/stream.ex | 7 ++----- 2 files changed, 7 insertions(+), 5 deletions(-) create mode 100644 .changeset/nasty-emus-rule.md diff --git a/.changeset/nasty-emus-rule.md b/.changeset/nasty-emus-rule.md new file mode 100644 index 0000000000..b006a0a735 --- /dev/null +++ b/.changeset/nasty-emus-rule.md @@ -0,0 +1,5 @@ +--- +"@core/elixir-client": patch +--- + +Fix mishandling of 400s - should terminate diff --git a/packages/elixir-client/lib/electric/client/stream.ex b/packages/elixir-client/lib/electric/client/stream.ex index 50c69c976c..83e846cbaf 100644 --- a/packages/elixir-client/lib/electric/client/stream.ex +++ b/packages/elixir-client/lib/electric/client/stream.ex @@ -19,7 +19,7 @@ defmodule Electric.Client.Stream do replica: :default, offset: Offset.before_all(), shape_handle: nil, - next_cursor: nil, + next_cursor: System.os_time(), state: :init, opts: %{} ] @@ -168,13 +168,10 @@ defmodule Electric.Client.Stream do |> dispatch() end - # 400: The request is invalid, most likely because the shape has been - # deleted. We should start from scratch, this will force the shape to be - # recreated # 409: Upon receiving a 409, we should start from scratch with the newly # provided shape handle defp handle_response({:error, %Fetch.Response{status: status} = resp}, stream) - when status in [400, 409] do + when status in [409] do %{value_mapper_fun: value_mapper_fun} = stream offset = last_offset(resp, stream.offset) From 94ee76a52c0c0199186f7ebe0e2085db490e9b71 Mon Sep 17 00:00:00 2001 From: msfstef Date: Tue, 26 Nov 2024 13:39:47 +0200 Subject: [PATCH 2/3] Start from nil cursor --- packages/elixir-client/lib/electric/client/stream.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/elixir-client/lib/electric/client/stream.ex b/packages/elixir-client/lib/electric/client/stream.ex index 83e846cbaf..7b5a51d176 100644 --- a/packages/elixir-client/lib/electric/client/stream.ex +++ b/packages/elixir-client/lib/electric/client/stream.ex @@ -19,7 +19,7 @@ defmodule Electric.Client.Stream do replica: :default, offset: Offset.before_all(), shape_handle: nil, - next_cursor: System.os_time(), + next_cursor: nil, state: :init, opts: %{} ] From 7dc62206a357dd4bd318e41dc15549e0c48ade9d Mon Sep 17 00:00:00 2001 From: msfstef Date: Tue, 26 Nov 2024 13:46:03 +0200 Subject: [PATCH 3/3] Remove 400 test --- .../test/electric/client_test.exs | 75 ------------------- 1 file changed, 75 deletions(-) diff --git a/packages/elixir-client/test/electric/client_test.exs b/packages/elixir-client/test/electric/client_test.exs index 0e1c632bd6..6b0205fd29 100644 --- a/packages/elixir-client/test/electric/client_test.exs +++ b/packages/elixir-client/test/electric/client_test.exs @@ -603,81 +603,6 @@ defmodule Electric.ClientTest do ] = stream(ctx, 4) end - test "resets to an empty shape id when given a 400", ctx do - body1 = [ - %{ - "headers" => %{"operation" => "insert"}, - "offset" => "1_0", - "value" => %{"id" => "1111"} - }, - %{"headers" => %{"control" => "up-to-date"}} - ] - - {:ok, responses} = - start_supervised( - {Agent, - fn -> - %{ - {"-1", nil} => [ - &bypass_resp(&1, Jason.encode!(body1), - shape_handle: "my-shape", - last_offset: "1_0", - schema: Jason.encode!(%{"id" => %{type: "text"}}) - ), - &bypass_resp(&1, Jason.encode!(body1), - shape_handle: "my-shape-2", - last_offset: "1_0", - schema: Jason.encode!(%{"id" => %{type: "text"}}) - ) - ], - {"1_0", "my-shape"} => [ - &bypass_resp(&1, Jason.encode!([%{"headers" => %{"control" => "must-refetch"}}]), - status: 400 - ), - &bypass_resp(&1, Jason.encode!(body1), - shape_handle: "my-shape", - last_offset: "2_0" - ) - ] - } - end} - ) - - parent = self() - - Bypass.expect(ctx.bypass, fn - %{ - request_path: "/v1/shape", - query_params: %{"table" => "my_table", "offset" => offset} = query_params - } = conn -> - shape_handle = Map.get(query_params, "handle", nil) - - fun = - Agent.get_and_update(responses, fn resps -> - Map.get_and_update(resps, {offset, shape_handle}, fn [fun | rest] -> {fun, rest} end) - end) - - send(parent, {:offset, offset}) - fun.(conn) - end) - - assert [ - %ChangeMessage{ - headers: @insert, - offset: offset(1, 0), - value: %{"id" => "1111"} - }, - up_to_date(1, 0), - %ControlMessage{control: :must_refetch, offset: offset(1, 0)}, - %ChangeMessage{ - headers: @insert, - offset: offset(1, 0), - value: %{"id" => "1111"} - }, - up_to_date(1, 0) - ] = stream(ctx, 5) - end - test "redirects to another shape id when given a 409", ctx do body1 = [ %{