From dd5834024735ca68b5acf833992593e6ba8b2f80 Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 15 Jan 2026 12:24:28 +0000 Subject: [PATCH 1/7] Fix empty error array handling in stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename unwrap to unwrap_error and add handling for empty error arrays from the server. Previously an empty array would return [] as the error message; now it returns "Unknown error". 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- packages/elixir-client/lib/electric/client/stream.ex | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/elixir-client/lib/electric/client/stream.ex b/packages/elixir-client/lib/electric/client/stream.ex index 5da032a990..6f9f898ec6 100644 --- a/packages/elixir-client/lib/electric/client/stream.ex +++ b/packages/elixir-client/lib/electric/client/stream.ex @@ -205,7 +205,7 @@ defmodule Electric.Client.Stream do defp handle_response({:error, %Fetch.Response{} = resp}, stream) do %Fetch.Response{body: body} = resp - handle_error(%Client.Error{message: unwrap(body), resp: resp}, stream) + handle_error(%Client.Error{message: unwrap_error(body), resp: resp}, stream) end defp handle_response({:error, error}, stream) do @@ -238,9 +238,10 @@ defmodule Electric.Client.Stream do {:halt, %{stream | buffer: :queue.in(resume_message, stream.buffer), state: :done}} end - defp unwrap([msg]), do: msg - defp unwrap([_ | _] = msgs), do: msgs - defp unwrap(msg), do: msg + defp unwrap_error([]), do: "Unknown error" + defp unwrap_error([msg]), do: msg + defp unwrap_error([_ | _] = msgs), do: msgs + defp unwrap_error(msg), do: msg defp handle_error(error, %{opts: %{errors: :stream}} = stream) do %{stream | buffer: :queue.in(error, stream.buffer), state: :done} From a6355054e5e0ec0e55cc8f8d4d076587c0f6a25d Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 15 Jan 2026 12:24:47 +0000 Subject: [PATCH 2/7] Fix resume to use live requests for long-polling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When resuming a stream, the client was making non-live requests (live: false) because up_to_date? defaulted to false. This caused the server to return immediately with up-to-date instead of long-polling for new data. Since a ResumeMessage is only created when the client is already up-to-date, resuming should set up_to_date?: true so the first request uses long-polling to wait for any changes that occurred while disconnected. 
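For illustration, this is the consumer pattern the fix affects: capturing the
ResumeMessage that a non-live stream emits as its final element and handing it
back via the :resume option (a sketch; handle_message/1 is a placeholder):

    # a finite, non-live read leaves a ResumeMessage as its last element
    resume =
      client
      |> Electric.Client.stream("items", live: false)
      |> Enum.to_list()
      |> List.last()

    # with this change the first request of the resumed stream long-polls,
    # picking up changes made while the consumer was disconnected
    client
    |> Electric.Client.stream("items", resume: resume)
    |> Enum.each(&handle_message/1)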
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- packages/elixir-client/lib/electric/client/stream.ex | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/elixir-client/lib/electric/client/stream.ex b/packages/elixir-client/lib/electric/client/stream.ex index 6f9f898ec6..2d00be3c42 100644 --- a/packages/elixir-client/lib/electric/client/stream.ex +++ b/packages/elixir-client/lib/electric/client/stream.ex @@ -357,10 +357,12 @@ defmodule Electric.Client.Stream do defp resume(%{opts: %{resume: %Message.ResumeMessage{} = resume}} = stream) do %{shape_handle: shape_handle, offset: offset, schema: schema} = resume + stream = %{stream | shape_handle: shape_handle, offset: offset, up_to_date?: true} + if schema do - generate_value_mapper(schema, %{stream | shape_handle: shape_handle, offset: offset}) + generate_value_mapper(schema, stream) else - %{stream | shape_handle: shape_handle, offset: offset} + stream end end From 0bf0f39283c64f85e8bd194f6a513466ad5dd7aa Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 15 Jan 2026 12:25:13 +0000 Subject: [PATCH 3/7] Increase Postgres max_connections for tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Set max_connections = 200 in local dev configs and CI workflow to support integration tests that open multiple database connections. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .github/workflows/sync_service_tests.yml | 1 + packages/sync-service/dev/postgres.conf | 1 + packages/sync-service/dev/postgres2.conf | 1 + 3 files changed, 3 insertions(+) diff --git a/.github/workflows/sync_service_tests.yml b/.github/workflows/sync_service_tests.yml index b1559de77f..9fa77be1d2 100644 --- a/.github/workflows/sync_service_tests.yml +++ b/.github/workflows/sync_service_tests.yml @@ -42,6 +42,7 @@ jobs: --health-retries 5 ports: - 54321:5432 + command: postgres -c max_connections=200 pgbouncer: image: bitnamilegacy/pgbouncer:latest diff --git a/packages/sync-service/dev/postgres.conf b/packages/sync-service/dev/postgres.conf index e18a913260..b320e296e5 100644 --- a/packages/sync-service/dev/postgres.conf +++ b/packages/sync-service/dev/postgres.conf @@ -1,3 +1,4 @@ listen_addresses = '*' wal_level = logical # minimal, replica, or logical max_replication_slots = 100 +max_connections = 200 diff --git a/packages/sync-service/dev/postgres2.conf b/packages/sync-service/dev/postgres2.conf index 3f5b60c308..e6f72319e6 100644 --- a/packages/sync-service/dev/postgres2.conf +++ b/packages/sync-service/dev/postgres2.conf @@ -1,4 +1,5 @@ listen_addresses = '*' wal_level = logical # minimal, replica, or logical max_replication_slots = 100 +max_connections = 200 port = 5433 \ No newline at end of file From 6943438a193eb80004938690a34588ec80720631 Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 15 Jan 2026 12:25:44 +0000 Subject: [PATCH 4/7] Reduce default connection pool size and add override tags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reduce default pool_size from 2 to 1 in test setup to minimize connection usage. Tests that require multiple connections now use explicit @tag connection_opt_overrides to request a larger pool. Also set partitioned_tables_test to async: false to avoid connection contention. 
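For reference, the opt-in looks like this in a test module (the test name and
body here are illustrative; the tag is presumably read by Support.DbSetup when
it builds the Postgrex start options):

    @tag connection_opt_overrides: [pool_size: 2]
    test "opens a second database connection", %{pool: pool} do
      # test body elided
    end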
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../sync-service/test/electric/postgres/configuration_test.exs | 1 + .../test/electric/postgres/lock_breaker_connection_test.exs | 1 + packages/sync-service/test/support/db_setup.ex | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/test/electric/postgres/configuration_test.exs b/packages/sync-service/test/electric/postgres/configuration_test.exs index 9c3716bdfb..ea336eca19 100644 --- a/packages/sync-service/test/electric/postgres/configuration_test.exs +++ b/packages/sync-service/test/electric/postgres/configuration_test.exs @@ -104,6 +104,7 @@ defmodule Electric.Postgres.ConfigurationTest do Configuration.drop_table_from_publication(conn, "nonexistent", oid_rel) end + @tag connection_opt_overrides: [pool_size: 2] test "fails relation configuration if timing out on lock", %{ pool: conn, publication_name: publication diff --git a/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs b/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs index 18fb168f20..02a6e66bda 100644 --- a/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs +++ b/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs @@ -13,6 +13,7 @@ defmodule Electric.Postgres.LockBreakerConnectionTest do :with_slot_name ] + @tag connection_opt_overrides: [pool_size: 2] test "should break an abandoned lock if slot is inactive", ctx do Postgrex.query!( ctx.db_conn, diff --git a/packages/sync-service/test/support/db_setup.ex b/packages/sync-service/test/support/db_setup.ex index 7e5dfae482..46c305e840 100644 --- a/packages/sync-service/test/support/db_setup.ex +++ b/packages/sync-service/test/support/db_setup.ex @@ -4,7 +4,7 @@ defmodule Support.DbSetup do @postgrex_start_opts [ backoff_type: :stop, max_restarts: 0, - pool_size: 2, + pool_size: 1, types: PgInterop.Postgrex.Types ] From fa890fa1e16419950a9e204d603d104ee48f9ab7 Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 15 Jan 2026 12:27:20 +0000 Subject: [PATCH 5/7] Add integration test infrastructure and basic streaming tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add test helpers for running integration tests with Electric.Client against a real HTTP server: - Add electric_client as test dependency in mix.exs - Start electric_client application in test_helper.exs - Add Support.IntegrationSetup with with_electric_client/2 - Add Support.StreamConsumer for consuming streams in tests - Add basic streaming integration tests to demonstrate usage 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- packages/sync-service/mix.exs | 3 +- .../test/integration/streaming_test.exs | 71 +++++ .../test/support/integration_setup.ex | 42 +++ .../test/support/stream_consumer.ex | 290 ++++++++++++++++++ packages/sync-service/test/test_helper.exs | 6 + 5 files changed, 411 insertions(+), 1 deletion(-) create mode 100644 packages/sync-service/test/integration/streaming_test.exs create mode 100644 packages/sync-service/test/support/integration_setup.ex create mode 100644 packages/sync-service/test/support/stream_consumer.ex diff --git a/packages/sync-service/mix.exs b/packages/sync-service/mix.exs index 8d0fed733b..d81e7f2d5e 100644 --- a/packages/sync-service/mix.exs +++ b/packages/sync-service/mix.exs @@ -119,7 +119,8 @@ defmodule Electric.MixProject do {:junit_formatter, 
"~> 3.4", only: [:test], runtime: false}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:stream_data, "~> 1.2", only: [:dev, :test]}, - {:repatch, "~> 1.0", only: [:test]} + {:repatch, "~> 1.0", only: [:test]}, + {:electric_client, path: "../elixir-client", only: [:test], runtime: false} ] end diff --git a/packages/sync-service/test/integration/streaming_test.exs b/packages/sync-service/test/integration/streaming_test.exs new file mode 100644 index 0000000000..08c0f85bb7 --- /dev/null +++ b/packages/sync-service/test/integration/streaming_test.exs @@ -0,0 +1,71 @@ +defmodule Electric.Integration.StreamingTest do + @moduledoc """ + Integration tests that spin up an Electric HTTP API + stack for a unique test DB, + then use Electric.Client to stream a shape over HTTP. + """ + use ExUnit.Case, async: false + + import Support.ComponentSetup + import Support.DbSetup + import Support.DbStructureSetup + import Support.IntegrationSetup + import Support.StreamConsumer + + alias Electric.Client + + @moduletag :tmp_dir + + describe "Electric.Client streaming over HTTP" do + setup [:with_unique_db, :with_basic_tables, :with_sql_execute] + setup :with_complete_stack + + setup :with_electric_client + + @tag with_sql: [ + "INSERT INTO items VALUES ('00000000-0000-0000-0000-000000000001', 'initial value')" + ] + test "initial snapshot contains pre-existing row", %{client: client} do + stream = Client.stream(client, "items", live: false) + + with_consumer stream do + assert_insert(consumer, %{ + "id" => "00000000-0000-0000-0000-000000000001", + "value" => "initial value" + }) + + assert_up_to_date(consumer) + end + end + + @tag with_sql: [ + "INSERT INTO items VALUES ('00000000-0000-0000-0000-000000000001', 'initial value')" + ] + test "receives live changes after initial snapshot", %{client: client, db_conn: db_conn} do + stream = Client.stream(client, "items", live: true) + + with_consumer stream do + assert_insert(consumer, %{"value" => "initial value"}) + assert_up_to_date(consumer) + + Postgrex.query!( + db_conn, + "INSERT INTO items VALUES ('00000000-0000-0000-0000-000000000002', 'new value')", + [] + ) + + assert_insert(consumer, %{ + "id" => "00000000-0000-0000-0000-000000000002", + "value" => "new value" + }) + end + end + + test "streaming empty table returns up-to-date", %{client: client} do + stream = Client.stream(client, "items", live: false) + + with_consumer stream do + assert_up_to_date(consumer) + end + end + end +end diff --git a/packages/sync-service/test/support/integration_setup.ex b/packages/sync-service/test/support/integration_setup.ex new file mode 100644 index 0000000000..1122bbc41d --- /dev/null +++ b/packages/sync-service/test/support/integration_setup.ex @@ -0,0 +1,42 @@ +defmodule Support.IntegrationSetup do + @moduledoc """ + Helper functions for setting up integration tests that need an HTTP server. + """ + + import Support.ComponentSetup, only: [build_router_opts: 2] + + @doc """ + Starts a Bandit HTTP server and creates an Electric.Client. 
+ + Returns a map with: + - `client` - Electric.Client configured to connect to the server + - `base_url` - The base URL of the server + - `server_pid` - The Bandit server process + - `port` - The port the server is listening on + """ + def with_electric_client(ctx, opts \\ []) do + :ok = Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: 2000) + + router_opts = build_router_opts(ctx, Keyword.get(opts, :router_opts, [])) + + {:ok, server_pid} = + ExUnit.Callbacks.start_supervised( + {Bandit, + plug: {Electric.Plug.Router, router_opts}, + port: 0, + ip: :loopback, + thousand_island_options: [num_acceptors: 1]} + ) + + {:ok, {_ip, port}} = ThousandIsland.listener_info(server_pid) + base_url = "http://localhost:#{port}" + {:ok, client} = Electric.Client.new(base_url: base_url) + + Map.merge(ctx, %{ + client: client, + base_url: base_url, + server_pid: server_pid, + port: port + }) + end +end diff --git a/packages/sync-service/test/support/stream_consumer.ex b/packages/sync-service/test/support/stream_consumer.ex new file mode 100644 index 0000000000..d06ee96cdf --- /dev/null +++ b/packages/sync-service/test/support/stream_consumer.ex @@ -0,0 +1,290 @@ +defmodule Support.StreamConsumer do + @moduledoc """ + Test helper for consuming Electric.Client streams in integration tests. + """ + + alias Electric.Client.Message.ChangeMessage + alias Electric.Client.Message.ControlMessage + alias Electric.Client.Message.ResumeMessage + + import ExUnit.Assertions + + @default_timeout 5_000 + + defstruct [:task, :task_pid, :timeout] + + @doc """ + Start consuming a stream, forwarding messages to the test process. + + ## Options + + * `:timeout` - default timeout for assertions (default: 5000ms) + """ + def start(stream, opts \\ []) do + test_pid = self() + timeout = Keyword.get(opts, :timeout, @default_timeout) + + # Start the streaming task + task = + Task.async(fn -> + stream + |> Stream.each(fn msg -> + send(test_pid, {:stream_message, self(), msg}) + end) + |> Stream.run() + end) + + {:ok, + %__MODULE__{ + task: task, + task_pid: task.pid, + timeout: timeout + }} + end + + @doc """ + Stop the consumer and cleanup resources. + """ + def stop(%__MODULE__{task: task}) do + Task.shutdown(task, :brutal_kill) + :ok + end + + @doc """ + Assert an insert message is received with matching value fields. + """ + def assert_insert(%__MODULE__{} = consumer, value_pattern, timeout \\ nil) do + timeout = timeout || consumer.timeout + + assert_receive_message( + consumer, + fn + %ChangeMessage{headers: %{operation: :insert}, value: value} -> + pattern_matches?(value, value_pattern) + + _ -> + false + end, + timeout + ) + end + + @doc """ + Assert an update message is received with matching value fields. + Optionally match old_value (only present in :full replica mode). + """ + def assert_update( + %__MODULE__{} = consumer, + value_pattern, + old_value_pattern \\ nil, + timeout \\ nil + ) do + timeout = timeout || consumer.timeout + + assert_receive_message( + consumer, + fn + %ChangeMessage{headers: %{operation: :update}, value: value, old_value: old_value} -> + value_matches = pattern_matches?(value, value_pattern) + + old_value_matches = + old_value_pattern == nil or pattern_matches?(old_value || %{}, old_value_pattern) + + value_matches and old_value_matches + + _ -> + false + end, + timeout + ) + end + + @doc """ + Assert a delete message is received with matching value fields. 
+ """ + def assert_delete(%__MODULE__{} = consumer, value_pattern, timeout \\ nil) do + timeout = timeout || consumer.timeout + + assert_receive_message( + consumer, + fn + %ChangeMessage{headers: %{operation: :delete}, value: value} -> + pattern_matches?(value, value_pattern) + + _ -> + false + end, + timeout + ) + end + + @doc """ + Assert an up_to_date control message is received. + """ + def assert_up_to_date(%__MODULE__{} = consumer, timeout \\ nil) do + timeout = timeout || consumer.timeout + + assert_receive_message( + consumer, + fn + %ControlMessage{control: :up_to_date} -> true + _ -> false + end, + timeout + ) + end + + @doc """ + Assert a resume message is received. + """ + def assert_resume(%__MODULE__{} = consumer, timeout \\ nil) do + timeout = timeout || consumer.timeout + + assert_receive_message( + consumer, + fn + %ResumeMessage{} -> true + _ -> false + end, + timeout + ) + end + + @doc """ + Wait for N messages matching a condition. + + ## Options + + * `:match` - function to filter messages (default: matches all) + * `:timeout` - timeout in ms (default: consumer timeout) + """ + def await_count(%__MODULE__{} = consumer, count, opts \\ []) do + matcher = Keyword.get(opts, :match, fn _ -> true end) + timeout = Keyword.get(opts, :timeout, consumer.timeout) + + do_await_count(consumer, count, matcher, [], timeout, System.monotonic_time(:millisecond)) + end + + defp do_await_count(_consumer, 0, _matcher, collected, _timeout, _start) do + {:ok, Enum.reverse(collected)} + end + + defp do_await_count( + %{task_pid: task_pid} = consumer, + count, + matcher, + collected, + timeout, + start_time + ) do + elapsed = System.monotonic_time(:millisecond) - start_time + remaining = max(0, timeout - elapsed) + + receive do + {:stream_message, ^task_pid, msg} -> + if matcher.(msg) do + do_await_count(consumer, count - 1, matcher, [msg | collected], timeout, start_time) + else + do_await_count(consumer, count, matcher, collected, timeout, start_time) + end + after + remaining -> {:error, :timeout} + end + end + + # Private helpers + + defp assert_receive_message(%__MODULE__{task_pid: task_pid}, matcher, timeout) do + do_assert_receive_message(task_pid, matcher, timeout, System.monotonic_time(:millisecond)) + end + + defp do_assert_receive_message(task_pid, matcher, timeout, start_time) do + elapsed = System.monotonic_time(:millisecond) - start_time + remaining = max(0, timeout - elapsed) + + receive do + {:stream_message, ^task_pid, msg} -> + if matcher.(msg) do + msg + else + flunk("Received unexpected message: #{inspect(msg)}") + end + after + remaining -> + flunk("Expected to receive matching message within #{timeout}ms") + end + end + + defp pattern_matches?(map, pattern) when is_map(map) and is_map(pattern) do + Enum.all?(pattern, fn {key, expected} -> + case Map.fetch(map, key) do + {:ok, actual} when is_map(expected) and is_map(actual) -> + pattern_matches?(actual, expected) + + {:ok, actual} -> + actual == expected + + :error -> + false + end + end) + end + + @doc """ + Collect all messages received within the timeout period. + Returns a list of messages (possibly empty). 
+ + ## Options + + * `:timeout` - how long to wait for messages (default: 100ms) + * `:match` - optional function to filter messages + """ + def collect_messages(%__MODULE__{task_pid: task_pid}, opts \\ []) do + timeout = Keyword.get(opts, :timeout, 100) + matcher = Keyword.get(opts, :match, fn _ -> true end) + + do_collect_messages(task_pid, matcher, [], timeout, System.monotonic_time(:millisecond)) + end + + defp do_collect_messages(task_pid, matcher, collected, timeout, start_time) do + elapsed = System.monotonic_time(:millisecond) - start_time + remaining = max(0, timeout - elapsed) + + receive do + {:stream_message, ^task_pid, msg} -> + if matcher.(msg) do + do_collect_messages(task_pid, matcher, [msg | collected], timeout, start_time) + else + do_collect_messages(task_pid, matcher, collected, timeout, start_time) + end + after + remaining -> Enum.reverse(collected) + end + end + + @doc """ + Macro for cleaner test blocks - auto start/stop. + + ## Example + + import Support.StreamConsumer + + stream = Client.stream(client, "items", live: true) + + with_consumer stream do + assert_insert(consumer, %{"id" => "123"}) + assert_up_to_date(consumer) + end + """ + defmacro with_consumer(stream, opts \\ [], do: block) do + quote do + {:ok, var!(consumer)} = Support.StreamConsumer.start(unquote(stream), unquote(opts)) + + try do + unquote(block) + after + Support.StreamConsumer.stop(var!(consumer)) + end + end + end +end diff --git a/packages/sync-service/test/test_helper.exs b/packages/sync-service/test/test_helper.exs index 375392b593..9648fd7caf 100644 --- a/packages/sync-service/test/test_helper.exs +++ b/packages/sync-service/test/test_helper.exs @@ -7,6 +7,12 @@ ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter]) ExUnit.start(assert_receive_timeout: 400, exclude: [:slow], capture_log: true) +# Start electric_client application directly, bypassing OTP's dependency resolution. +# This avoids a circular dependency: electric_client has :electric as an optional dep, +# which gets added to its applications list when compiled in sync-service context, +# causing a deadlock when OTP tries to start both applications. +{:ok, _} = Electric.Client.Application.start(:normal, []) + # Repatch in async tests has lazy recompilation issues, so as a temporary fix # we force recompilation in the setup. The issue is tracked here: # https://github.com/hissssst/repatch/issues/2 From e40b491a8f6b9302192a251f3176688c7176bc50 Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 15 Jan 2026 12:27:52 +0000 Subject: [PATCH 6/7] Guard Ecto-dependent code with Code.ensure_loaded? MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wrap ArrayDecoder module in Code.ensure_loaded?(Ecto) guard to prevent compile errors when the elixir-client is used as a dependency in projects that don't have Ecto available. 
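The change is purely structural; the module body is unchanged and only wrapped
and re-indented under the guard:

    if Code.ensure_loaded?(Ecto) do
      defmodule Electric.Client.EctoAdapter.ArrayDecoder do
        # existing decoder implementation, unchanged
      end
    end

When Ecto is not present in the host project, the whole module is skipped at
compile time, so none of its Ecto references are ever evaluated.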
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../client/ecto_adapter/array_decoder.ex | 132 +++++++++--------- 1 file changed, 67 insertions(+), 65 deletions(-) diff --git a/packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex b/packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex index 788c677c62..0bbf6d273b 100644 --- a/packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex +++ b/packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex @@ -1,92 +1,94 @@ -defmodule Electric.Client.EctoAdapter.ArrayDecoder do - alias Electric.Client.EctoAdapter +if Code.ensure_loaded?(Ecto) do + defmodule Electric.Client.EctoAdapter.ArrayDecoder do + alias Electric.Client.EctoAdapter - def decode!("{}", _type), do: [] + def decode!("{}", _type), do: [] - def decode!(encoded_array, type) when is_binary(encoded_array) do - {"", [result]} = - decode_array(encoded_array, [], {EctoAdapter.cast_to(type), type, encoded_array}) + def decode!(encoded_array, type) when is_binary(encoded_array) do + {"", [result]} = + decode_array(encoded_array, [], {EctoAdapter.cast_to(type), type, encoded_array}) - result - end + result + end - ############################## + ############################## - defp decode_array("", acc, _state) do - {"", :lists.reverse(acc)} - end + defp decode_array("", acc, _state) do + {"", :lists.reverse(acc)} + end - defp decode_array(<<"{}", rest::bitstring>>, acc, state) do - decode_array(rest, [[] | acc], state) - end + defp decode_array(<<"{}", rest::bitstring>>, acc, state) do + decode_array(rest, [[] | acc], state) + end - defp decode_array(<<"{", rest::bitstring>>, acc, state) do - {rest, array} = decode_array(rest, [], state) - decode_array(rest, [array | acc], state) - end + defp decode_array(<<"{", rest::bitstring>>, acc, state) do + {rest, array} = decode_array(rest, [], state) + decode_array(rest, [array | acc], state) + end - defp decode_array(<<"}", rest::bitstring>>, acc, _state) do - {rest, :lists.reverse(acc)} - end + defp decode_array(<<"}", rest::bitstring>>, acc, _state) do + {rest, :lists.reverse(acc)} + end - defp decode_array(<>, acc, state) do - decode_array(rest, acc, state) - end + defp decode_array(<>, acc, state) do + decode_array(rest, acc, state) + end - defp decode_array(<>, acc, state) do - {rest, elem} = decode_quoted_elem(rest, [], state) - decode_array(rest, [elem | acc], state) - end + defp decode_array(<>, acc, state) do + {rest, elem} = decode_quoted_elem(rest, [], state) + decode_array(rest, [elem | acc], state) + end - defp decode_array(rest, acc, state) do - {rest, elem} = decode_elem(rest, [], state) - decode_array(rest, [elem | acc], state) - end + defp decode_array(rest, acc, state) do + {rest, elem} = decode_elem(rest, [], state) + decode_array(rest, [elem | acc], state) + end - ############################## + ############################## - defp decode_elem(<<",", _::bitstring>> = rest, acc, state), - do: {rest, cast(acc, state)} + defp decode_elem(<<",", _::bitstring>> = rest, acc, state), + do: {rest, cast(acc, state)} - defp decode_elem(<<"}", _::bitstring>> = rest, acc, state), - do: {rest, cast(acc, state)} + defp decode_elem(<<"}", _::bitstring>> = rest, acc, state), + do: {rest, cast(acc, state)} - defp decode_elem(<>, acc, state), - do: decode_elem(rest, [acc | <>], state) + defp decode_elem(<>, acc, state), + do: decode_elem(rest, [acc | <>], state) - defp decode_elem("", _acc, {_cast_fun, type, source}) do - 
raise Ecto.CastError, type: {:array, type}, value: source - end + defp decode_elem("", _acc, {_cast_fun, type, source}) do + raise Ecto.CastError, type: {:array, type}, value: source + end - ############################## + ############################## - defp decode_quoted_elem(<>, acc, state), - do: {rest, cast_quoted(acc, state)} + defp decode_quoted_elem(<>, acc, state), + do: {rest, cast_quoted(acc, state)} - defp decode_quoted_elem(<<"\\\"", rest::bitstring>>, acc, state), - do: decode_quoted_elem(rest, [acc | [?"]], state) + defp decode_quoted_elem(<<"\\\"", rest::bitstring>>, acc, state), + do: decode_quoted_elem(rest, [acc | [?"]], state) - defp decode_quoted_elem(<>, acc, state), - do: decode_quoted_elem(rest, [acc | <>], state) + defp decode_quoted_elem(<>, acc, state), + do: decode_quoted_elem(rest, [acc | <>], state) - defp decode_quoted_elem("", _acc, {_cast_fun, type, source}) do - raise Ecto.CastError, type: {:array, type}, value: source - end + defp decode_quoted_elem("", _acc, {_cast_fun, type, source}) do + raise Ecto.CastError, type: {:array, type}, value: source + end - ############################## + ############################## - defp cast(iodata, {cast_fun, _type, _source}) do - iodata - |> IO.iodata_to_binary() - |> case do - "NULL" -> nil - value -> cast_fun.(value) + defp cast(iodata, {cast_fun, _type, _source}) do + iodata + |> IO.iodata_to_binary() + |> case do + "NULL" -> nil + value -> cast_fun.(value) + end end - end - defp cast_quoted(iodata, {cast_fun, _type, _source}) do - iodata - |> IO.iodata_to_binary() - |> cast_fun.() + defp cast_quoted(iodata, {cast_fun, _type, _source}) do + iodata + |> IO.iodata_to_binary() + |> cast_fun.() + end end end From 0631be3c4d24aee9aa375b2cb428a8fa9795664f Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 15 Jan 2026 12:32:09 +0000 Subject: [PATCH 7/7] Implement move-out support for subqueries in Elixir client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add client-side support for handling move-out events from the server when using shapes with subquery filters. Changes to message handling: - Add tags and removed_tags fields to Headers struct - Add request_timestamp parameter to Message.parse/4 - Add MoveOutMessage type for parsing move-out control messages - Add tag_to_keys and key_data fields to ResumeMessage for state preservation Changes to stream processing: - Track row tags in tag_to_keys and key_data indices - Handle MoveOutMessage by generating synthetic delete messages - Reset tag indices on shape reset - Preserve and restore tag state when resuming streams The move-out feature enables the server to notify clients when rows should be removed from their view because they no longer match the shape's subquery filter. The client generates synthetic delete messages to cleanly remove these rows from local state. 
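For illustration, the message shapes involved (taken from the message parsing
and client tests added below):

    # a tagged insert is tracked in the client-side tag index
    %{"key" => "row-1",
      "headers" => %{"operation" => "insert", "tags" => ["tag-abc"]},
      "value" => %{"id" => "1111", "name" => "test"}}

    # a later move-out event naming that tag ...
    %{"headers" => %{"event" => "move-out",
                     "patterns" => [%{"pos" => 0, "value" => "tag-abc"}]}}

    # ... surfaces to the consumer as a synthetic delete
    %Electric.Client.Message.ChangeMessage{
      headers: %Electric.Client.Message.Headers{operation: :delete},
      value: %{"id" => "1111", "name" => "test"}}

A row is only deleted once all of its tags are gone (via removed_tags on
updates or matching move-out patterns); rows that still carry other tags are
left in place.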
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .changeset/beige-ladybugs-give.md | 6 + .../lib/electric/client/message.ex | 133 ++- .../lib/electric/client/stream.ex | 228 ++++- .../test/electric/client/message_test.exs | 157 ++- .../test/electric/client_test.exs | 892 +++++++++++++++++- .../integration/subquery_move_out_test.exs | 382 ++++++++ 6 files changed, 1764 insertions(+), 34 deletions(-) create mode 100644 .changeset/beige-ladybugs-give.md create mode 100644 packages/sync-service/test/integration/subquery_move_out_test.exs diff --git a/.changeset/beige-ladybugs-give.md b/.changeset/beige-ladybugs-give.md new file mode 100644 index 0000000000..9457104261 --- /dev/null +++ b/.changeset/beige-ladybugs-give.md @@ -0,0 +1,6 @@ +--- +'@core/elixir-client': patch +'@core/sync-service': patch +--- + +Add Move-Out Support for Subqueries in Elixir Client diff --git a/packages/elixir-client/lib/electric/client/message.ex b/packages/elixir-client/lib/electric/client/message.ex index 6fcfeb2dd0..1be05998aa 100644 --- a/packages/elixir-client/lib/electric/client/message.ex +++ b/packages/elixir-client/lib/electric/client/message.ex @@ -5,19 +5,31 @@ defmodule Electric.Client.Message do alias Electric.Client.Offset defmodule Headers do - defstruct [:operation, :relation, :handle, :lsn, txids: [], op_position: 0] + defstruct [ + :operation, + :relation, + :handle, + :lsn, + txids: [], + op_position: 0, + tags: [], + removed_tags: [] + ] @type operation :: :insert | :update | :delete @type relation :: [String.t(), ...] @type lsn :: binary() @type txids :: [pos_integer(), ...] | nil + @type tag :: String.t() @type t :: %__MODULE__{ operation: operation(), relation: relation(), handle: Client.shape_handle(), lsn: lsn(), txids: txids(), - op_position: non_neg_integer() + op_position: non_neg_integer(), + tags: [tag()], + removed_tags: [tag()] } @doc false @@ -30,7 +42,9 @@ defmodule Electric.Client.Message do handle: handle, txids: Map.get(msg, "txids", []), lsn: Map.get(msg, "lsn", nil), - op_position: Map.get(msg, "op_position", 0) + op_position: Map.get(msg, "op_position", 0), + tags: Map.get(msg, "tags", []), + removed_tags: Map.get(msg, "removed_tags", []) } end @@ -56,23 +70,27 @@ defmodule Electric.Client.Message do def from_message( %{"headers" => %{"control" => control} = headers}, - handle + handle, + request_timestamp ) do %__MODULE__{ control: control_atom(control), global_last_seen_lsn: global_last_seen_lsn(headers), - handle: handle + handle: handle, + request_timestamp: request_timestamp } end def from_message( %{headers: %{control: control} = headers}, - handle + handle, + request_timestamp ) do %__MODULE__{ control: control_atom(control), global_last_seen_lsn: global_last_seen_lsn(headers), - handle: handle + handle: handle, + request_timestamp: request_timestamp } end @@ -108,7 +126,7 @@ defmodule Electric.Client.Message do require Logger - def from_message(msg, handle, value_mapping_fun) do + def from_message(msg, handle, value_mapping_fun, request_timestamp) do %{ "headers" => headers, "value" => raw_value @@ -120,7 +138,8 @@ defmodule Electric.Client.Message do key: msg["key"], headers: Headers.from_message(headers, handle), value: value, - old_value: old_value(msg, value_mapping_fun) + old_value: old_value(msg, value_mapping_fun), + request_timestamp: request_timestamp } end @@ -168,30 +187,108 @@ defmodule Electric.Client.Message do @enforce_keys [:shape_handle, :offset, :schema] - defstruct [:shape_handle, :offset, :schema] + 
defstruct [:shape_handle, :offset, :schema, tag_to_keys: %{}, key_data: %{}] @type t :: %__MODULE__{ shape_handle: Client.shape_handle(), offset: Offset.t(), - schema: Client.schema() + schema: Client.schema(), + tag_to_keys: %{String.t() => MapSet.t(String.t())}, + key_data: %{String.t() => %{tags: MapSet.t(String.t()), msg: ChangeMessage.t()}} } end + defmodule MoveOutMessage do + @moduledoc """ + Represents a move-out event from the server. + + Move-out events are sent when rows should be removed from the client's view + because they no longer match the shape's subquery filter. The `patterns` field + contains tag hashes that identify which rows should be removed. + + The client should use these patterns to generate synthetic delete messages + for any tracked rows that have matching tags. + """ + + defstruct [:patterns, :handle, :request_timestamp] + + @type pattern :: %{pos: non_neg_integer(), value: String.t()} + @type t :: %__MODULE__{ + patterns: [pattern()], + handle: Client.shape_handle(), + request_timestamp: DateTime.t() + } + + def from_message( + %{"headers" => %{"event" => "move-out", "patterns" => patterns}}, + handle, + request_timestamp + ) do + %__MODULE__{ + patterns: normalize_patterns(patterns), + handle: handle, + request_timestamp: request_timestamp + } + end + + def from_message( + %{headers: %{event: "move-out", patterns: patterns}}, + handle, + request_timestamp + ) do + %__MODULE__{ + patterns: normalize_patterns(patterns), + handle: handle, + request_timestamp: request_timestamp + } + end + + defp normalize_patterns(patterns) do + Enum.map(patterns, fn + %{"pos" => pos, "value" => value} -> %{pos: pos, value: value} + %{pos: _, value: _} = pattern -> pattern + end) + end + end + defguard is_insert(msg) when is_struct(msg, ChangeMessage) and msg.headers.operation == :insert - def parse(%{"value" => _} = msg, shape_handle, value_mapper_fun) do - [ChangeMessage.from_message(msg, shape_handle, value_mapper_fun)] + def parse(%{"value" => _} = msg, shape_handle, value_mapper_fun, request_timestamp) do + [ChangeMessage.from_message(msg, shape_handle, value_mapper_fun, request_timestamp)] + end + + def parse( + %{"headers" => %{"control" => _}} = msg, + shape_handle, + _value_mapper_fun, + request_timestamp + ) do + [ControlMessage.from_message(msg, shape_handle, request_timestamp)] + end + + def parse(%{headers: %{control: _}} = msg, shape_handle, _value_mapper_fun, request_timestamp) do + [ControlMessage.from_message(msg, shape_handle, request_timestamp)] end - def parse(%{"headers" => %{"control" => _}} = msg, shape_handle, _value_mapper_fun) do - [ControlMessage.from_message(msg, shape_handle)] + def parse( + %{"headers" => %{"event" => "move-out"}} = msg, + shape_handle, + _value_mapper_fun, + request_timestamp + ) do + [MoveOutMessage.from_message(msg, shape_handle, request_timestamp)] end - def parse(%{headers: %{control: _}} = msg, shape_handle, _value_mapper_fun) do - [ControlMessage.from_message(msg, shape_handle)] + def parse( + %{headers: %{event: "move-out"}} = msg, + shape_handle, + _value_mapper_fun, + request_timestamp + ) do + [MoveOutMessage.from_message(msg, shape_handle, request_timestamp)] end - def parse("", _handle, _value_mapper_fun) do + def parse("", _handle, _value_mapper_fun, _request_timestamp) do [] end end diff --git a/packages/elixir-client/lib/electric/client/stream.ex b/packages/elixir-client/lib/electric/client/stream.ex index 2d00be3c42..8bb818fbac 100644 --- a/packages/elixir-client/lib/electric/client/stream.ex +++ 
b/packages/elixir-client/lib/electric/client/stream.ex @@ -19,7 +19,12 @@ defmodule Electric.Client.Stream do shape_handle: nil, next_cursor: nil, state: :init, - opts: %{} + opts: %{}, + # Move-out support: tracks tags per key and keys per tag + # tag_to_keys: %{tag_value => MapSet} - which keys have this tag + # key_data: %{key => %{tags: MapSet, msg: msg}} - each key's tags and latest message + tag_to_keys: %{}, + key_data: %{} ] @external_options [ @@ -89,7 +94,9 @@ defmodule Electric.Client.Stream do } @type t :: %__MODULE__{ + id: integer(), client: Client.t(), + shape: Client.shape() | nil, schema: Client.schema(), value_mapper_fun: Client.ValueMapper.mapper_fun(), parser: nil | {module(), term()}, @@ -98,8 +105,11 @@ defmodule Electric.Client.Stream do offset: Client.offset(), replica: Client.replica(), shape_handle: nil | Client.shape_handle(), + next_cursor: binary() | nil, state: :init | :stream | :done, - opts: opts() + opts: opts(), + tag_to_keys: %{optional(term()) => MapSet.t()}, + key_data: %{optional(term()) => %{tags: MapSet.t(), msg: Message.ChangeMessage.t()}} } alias __MODULE__, as: S @@ -182,8 +192,7 @@ defmodule Electric.Client.Stream do resp.body |> ensure_enum() - |> Enum.flat_map(&Message.parse(&1, shape_handle, value_mapper_fun)) - |> Enum.map(&Map.put(&1, :request_timestamp, resp.request_timestamp)) + |> Enum.flat_map(&Message.parse(&1, shape_handle, value_mapper_fun, resp.request_timestamp)) |> Enum.reduce_while(stream, &handle_msg/2) |> dispatch() end @@ -198,7 +207,12 @@ defmodule Electric.Client.Stream do stream |> reset(handle) - |> buffer(Enum.flat_map(resp.body, &Message.parse(&1, handle, value_mapper_fun))) + |> buffer( + Enum.flat_map( + resp.body, + &Message.parse(&1, handle, value_mapper_fun, resp.request_timestamp) + ) + ) |> dispatch() end @@ -221,9 +235,31 @@ defmodule Electric.Client.Stream do end defp handle_msg(%Message.ChangeMessage{} = msg, stream) do + stream = update_tag_index(stream, msg) {:cont, %{stream | buffer: :queue.in(msg, stream.buffer)}} end + defp handle_msg( + %Message.MoveOutMessage{patterns: patterns, request_timestamp: request_timestamp} = _msg, + stream + ) do + # Assumption: move-out events are only emitted after the initial snapshot is complete. + # We therefore apply them immediately and do not buffer for later inserts. 
+ + # Generate synthetic deletes for rows matching the move-out patterns + {synthetic_deletes, updated_tag_to_keys, updated_key_data} = + generate_synthetic_deletes(stream, patterns, request_timestamp) + + # Add synthetic deletes to the buffer + buffer = + Enum.reduce(synthetic_deletes, stream.buffer, fn delete_msg, buf -> + :queue.in(delete_msg, buf) + end) + + {:cont, + %{stream | buffer: buffer, tag_to_keys: updated_tag_to_keys, key_data: updated_key_data}} + end + defp handle_up_to_date(%{opts: %{live: true}} = stream) do {:cont, stream} end @@ -232,7 +268,9 @@ defmodule Electric.Client.Stream do resume_message = %Message.ResumeMessage{ schema: stream.schema, offset: stream.offset, - shape_handle: stream.shape_handle + shape_handle: stream.shape_handle, + tag_to_keys: stream.tag_to_keys, + key_data: stream.key_data } {:halt, %{stream | buffer: :queue.in(resume_message, stream.buffer), state: :done}} @@ -302,7 +340,9 @@ defmodule Electric.Client.Stream do up_to_date?: false, buffer: :queue.new(), schema: nil, - value_mapper_fun: nil + value_mapper_fun: nil, + tag_to_keys: %{}, + key_data: %{} } end @@ -356,8 +396,17 @@ defmodule Electric.Client.Stream do defp resume(%{opts: %{resume: %Message.ResumeMessage{} = resume}} = stream) do %{shape_handle: shape_handle, offset: offset, schema: schema} = resume + tag_to_keys = Map.get(resume, :tag_to_keys, %{}) + key_data = Map.get(resume, :key_data, %{}) - stream = %{stream | shape_handle: shape_handle, offset: offset, up_to_date?: true} + stream = %{ + stream + | shape_handle: shape_handle, + offset: offset, + tag_to_keys: tag_to_keys, + key_data: key_data, + up_to_date?: true + } if schema do generate_value_mapper(schema, stream) @@ -374,6 +423,169 @@ defmodule Electric.Client.Stream do %{stream | state: :stream} end + # Tag index management for move-out support + # + # We maintain two data structures: + # - tag_to_keys: %{tag_value => MapSet} - which keys have each tag + # - key_data: %{key => %{tags: MapSet, msg: msg}} - each key's current tags and latest message + # + # This allows us to: + # 1. Avoid duplicate entries when a row is updated (we update the msg, not add a new entry) + # 2. 
Check if a row still has other tags before generating a synthetic delete + + defp update_tag_index(stream, %Message.ChangeMessage{headers: headers, key: key} = msg) do + %{tag_to_keys: tag_to_keys, key_data: key_data} = stream + new_tags = headers.tags || [] + removed_tags = headers.removed_tags || [] + + # Get current data for this key + current_data = Map.get(key_data, key) + current_tags = if current_data, do: current_data.tags, else: MapSet.new() + + # Calculate the new set of tags for this key + updated_tags = + current_tags + |> MapSet.difference(MapSet.new(removed_tags)) + |> MapSet.union(MapSet.new(new_tags)) + + # For deletes, remove the key entirely + {_final_tags, final_key_data, final_tag_to_keys} = + case headers.operation do + :delete -> + # Remove key from all its tags in tag_to_keys + updated_tag_to_keys = + Enum.reduce(updated_tags, tag_to_keys, fn tag, acc -> + remove_key_from_tag(acc, tag, key) + end) + + # Remove key from key_data + {MapSet.new(), Map.delete(key_data, key), updated_tag_to_keys} + + _ -> + # If no tags (current or new), don't track this key + if MapSet.size(updated_tags) == 0 do + # Remove key from all its previous tags in tag_to_keys + updated_tag_to_keys = + Enum.reduce(current_tags, tag_to_keys, fn tag, acc -> + remove_key_from_tag(acc, tag, key) + end) + + # Remove key from key_data + {MapSet.new(), Map.delete(key_data, key), updated_tag_to_keys} + else + # Update tag_to_keys: remove from old tags, add to new tags + tags_to_remove = MapSet.difference(current_tags, updated_tags) + tags_to_add = MapSet.difference(updated_tags, current_tags) + + updated_tag_to_keys = + tag_to_keys + |> remove_key_from_tags(tags_to_remove, key) + |> add_key_to_tags(tags_to_add, key) + + # Update key_data with new tags and latest message + updated_key_data = Map.put(key_data, key, %{tags: updated_tags, msg: msg}) + + {updated_tags, updated_key_data, updated_tag_to_keys} + end + end + + %{stream | tag_to_keys: final_tag_to_keys, key_data: final_key_data} + end + + defp remove_key_from_tags(tag_to_keys, tags, key) do + Enum.reduce(tags, tag_to_keys, fn tag, acc -> + remove_key_from_tag(acc, tag, key) + end) + end + + defp remove_key_from_tag(tag_to_keys, tag, key) do + case Map.get(tag_to_keys, tag) do + nil -> + tag_to_keys + + keys -> + updated_keys = MapSet.delete(keys, key) + + if MapSet.size(updated_keys) == 0 do + Map.delete(tag_to_keys, tag) + else + Map.put(tag_to_keys, tag, updated_keys) + end + end + end + + defp add_key_to_tags(tag_to_keys, tags, key) do + Enum.reduce(tags, tag_to_keys, fn tag, acc -> + keys = Map.get(acc, tag, MapSet.new()) + Map.put(acc, tag, MapSet.put(keys, key)) + end) + end + + defp generate_synthetic_deletes(stream, patterns, request_timestamp) do + %{tag_to_keys: tag_to_keys, key_data: key_data} = stream + + # Assumption: move-out patterns only include simple tag values; positional matching + # for composite tags is not needed with the current server behavior. 
+ + # First pass: collect all keys that match any pattern and remove those tags + {matched_keys_with_tags, updated_tag_to_keys} = + Enum.reduce(patterns, {%{}, tag_to_keys}, fn %{value: tag_value}, {keys_acc, ttk_acc} -> + case Map.pop(ttk_acc, tag_value) do + {nil, ttk_acc} -> + {keys_acc, ttk_acc} + + {keys_in_tag, ttk_acc} -> + # Track which tags were removed for each key + updated_keys_acc = + Enum.reduce(keys_in_tag, keys_acc, fn key, acc -> + removed_tags = Map.get(acc, key, MapSet.new()) + Map.put(acc, key, MapSet.put(removed_tags, tag_value)) + end) + + {updated_keys_acc, ttk_acc} + end + end) + + # Second pass: for each matched key, update its tags and check if it should be deleted + {keys_to_delete, updated_key_data} = + Enum.reduce(matched_keys_with_tags, {[], key_data}, fn {key, removed_tags}, + {deletes, kd_acc} -> + case Map.get(kd_acc, key) do + nil -> + {deletes, kd_acc} + + %{tags: current_tags, msg: msg} -> + remaining_tags = MapSet.difference(current_tags, removed_tags) + + if MapSet.size(remaining_tags) == 0 do + # No remaining tags - key should be deleted + {[{key, msg} | deletes], Map.delete(kd_acc, key)} + else + # Still has other tags - update key_data but don't delete + {deletes, Map.put(kd_acc, key, %{tags: remaining_tags, msg: msg})} + end + end + end) + + # Generate synthetic delete messages + synthetic_deletes = + Enum.map(keys_to_delete, fn {key, original_msg} -> + %Message.ChangeMessage{ + key: key, + value: original_msg.value, + old_value: nil, + headers: + Message.Headers.delete( + relation: original_msg.headers.relation, + handle: original_msg.headers.handle + ), + request_timestamp: request_timestamp + } + end) + + {synthetic_deletes, updated_tag_to_keys, updated_key_data} + end + defimpl Enumerable do alias Electric.Client diff --git a/packages/elixir-client/test/electric/client/message_test.exs b/packages/elixir-client/test/electric/client/message_test.exs index 3c4d32203f..8fbd16c8fc 100644 --- a/packages/elixir-client/test/electric/client/message_test.exs +++ b/packages/elixir-client/test/electric/client/message_test.exs @@ -2,31 +2,174 @@ defmodule Electric.Client.MessageTest do use ExUnit.Case, async: true alias Electric.Client.Message - alias Electric.Client.Message.ControlMessage + alias Electric.Client.Message.{ChangeMessage, ControlMessage, MoveOutMessage} + + # request_timestamp + @ts ~U[2024-01-15 10:30:00Z] describe "ControlMessage" do test "up-to-date" do assert [%ControlMessage{control: :up_to_date}] = - Message.parse(%{"headers" => %{"control" => "up-to-date"}}, "handle", & &1) + Message.parse(%{"headers" => %{"control" => "up-to-date"}}, "handle", & &1, @ts) assert [%ControlMessage{control: :up_to_date}] = - Message.parse(%{headers: %{control: "up-to-date"}}, "handle", & &1) + Message.parse(%{headers: %{control: "up-to-date"}}, "handle", & &1, @ts) end test "must-refetch" do assert [%ControlMessage{control: :must_refetch}] = - Message.parse(%{"headers" => %{"control" => "must-refetch"}}, "handle", & &1) + Message.parse(%{"headers" => %{"control" => "must-refetch"}}, "handle", & &1, @ts) assert [%ControlMessage{control: :must_refetch}] = - Message.parse(%{headers: %{control: "must-refetch"}}, "handle", & &1) + Message.parse(%{headers: %{control: "must-refetch"}}, "handle", & &1, @ts) end test "snapshot-end" do assert [%ControlMessage{control: :snapshot_end}] = - Message.parse(%{"headers" => %{"control" => "snapshot-end"}}, "handle", & &1) + Message.parse(%{"headers" => %{"control" => "snapshot-end"}}, "handle", & &1, @ts) assert 
[%ControlMessage{control: :snapshot_end}] = - Message.parse(%{headers: %{control: "snapshot-end"}}, "handle", & &1) + Message.parse(%{headers: %{control: "snapshot-end"}}, "handle", & &1, @ts) + end + end + + describe "MoveOutMessage" do + test "parses move-out with string keys" do + msg = %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "tag-hash-abc"}] + } + } + + assert [%MoveOutMessage{patterns: [%{pos: 0, value: "tag-hash-abc"}], handle: "my-handle"}] = + Message.parse(msg, "my-handle", & &1, @ts) + end + + test "parses move-out with atom keys" do + msg = %{ + headers: %{ + event: "move-out", + patterns: [%{pos: 0, value: "tag-hash-xyz"}] + } + } + + assert [%MoveOutMessage{patterns: [%{pos: 0, value: "tag-hash-xyz"}], handle: "my-handle"}] = + Message.parse(msg, "my-handle", & &1, @ts) + end + + test "parses move-out with multiple patterns" do + msg = %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [ + %{"pos" => 0, "value" => "tag-1"}, + %{"pos" => 1, "value" => "tag-2"}, + %{"pos" => 2, "value" => "tag-3"} + ] + } + } + + assert [ + %MoveOutMessage{ + patterns: [ + %{pos: 0, value: "tag-1"}, + %{pos: 1, value: "tag-2"}, + %{pos: 2, value: "tag-3"} + ] + } + ] = Message.parse(msg, "handle", & &1, @ts) + end + end + + describe "Headers with tags" do + test "parses headers with tags" do + msg = %{ + "headers" => %{"operation" => "insert", "tags" => ["tag-a", "tag-b"]}, + "value" => %{"id" => "1"} + } + + assert [%ChangeMessage{headers: headers}] = Message.parse(msg, "handle", & &1, @ts) + assert headers.tags == ["tag-a", "tag-b"] + end + + test "parses headers with removed_tags" do + msg = %{ + "headers" => %{"operation" => "update", "removed_tags" => ["old-tag"]}, + "value" => %{"id" => "1"} + } + + assert [%ChangeMessage{headers: headers}] = Message.parse(msg, "handle", & &1, @ts) + assert headers.removed_tags == ["old-tag"] + end + + test "parses headers with both tags and removed_tags" do + msg = %{ + "headers" => %{ + "operation" => "update", + "tags" => ["new-tag"], + "removed_tags" => ["old-tag"] + }, + "value" => %{"id" => "1"} + } + + assert [%ChangeMessage{headers: headers}] = Message.parse(msg, "handle", & &1, @ts) + assert headers.tags == ["new-tag"] + assert headers.removed_tags == ["old-tag"] + end + + test "defaults tags and removed_tags to empty lists" do + msg = %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"id" => "1"} + } + + assert [%ChangeMessage{headers: headers}] = Message.parse(msg, "handle", & &1, @ts) + assert headers.tags == [] + assert headers.removed_tags == [] + end + end + + describe "ChangeMessage" do + test "parses insert with tags in headers" do + msg = %{ + "key" => "row-key", + "headers" => %{"operation" => "insert", "tags" => ["my-tag"]}, + "value" => %{"id" => "123", "name" => "test"} + } + + assert [%ChangeMessage{} = change] = Message.parse(msg, "my-handle", & &1, @ts) + assert change.key == "row-key" + assert change.value == %{"id" => "123", "name" => "test"} + assert change.headers.operation == :insert + assert change.headers.tags == ["my-tag"] + assert change.headers.handle == "my-handle" + end + + test "parses update with old_value" do + msg = %{ + "key" => "row-key", + "headers" => %{"operation" => "update", "tags" => ["tag-1"]}, + "value" => %{"id" => "123", "name" => "updated"}, + "old_value" => %{"name" => "original"} + } + + assert [%ChangeMessage{} = change] = Message.parse(msg, "handle", & &1, @ts) + assert change.headers.operation == :update + assert change.value == 
%{"id" => "123", "name" => "updated"} + assert change.old_value == %{"name" => "original"} + end + + test "parses delete" do + msg = %{ + "key" => "row-key", + "headers" => %{"operation" => "delete", "tags" => ["tag-1"]}, + "value" => %{"id" => "123"} + } + + assert [%ChangeMessage{} = change] = Message.parse(msg, "handle", & &1, @ts) + assert change.headers.operation == :delete + assert change.value == %{"id" => "123"} end end end diff --git a/packages/elixir-client/test/electric/client_test.exs b/packages/elixir-client/test/electric/client_test.exs index 71a8c4e2b4..bdbfa3ff17 100644 --- a/packages/elixir-client/test/electric/client_test.exs +++ b/packages/elixir-client/test/electric/client_test.exs @@ -35,6 +35,17 @@ defmodule Electric.ClientTest do [bypass: Bypass.open()] end + # Drains the stream by repeatedly calling next/1 until :halt is returned. + # Returns the final stream struct so we can inspect internal state. + defp drain_stream(stream) do + alias Electric.Client.Stream + + case Stream.next(stream) do + {:halt, stream} -> stream + {_msgs, stream} -> drain_stream(stream) + end + end + describe "new" do test ":base_url is used as the base of the endpoint" do endpoint = URI.new!("http://localhost:3000/v1/shape") @@ -884,7 +895,7 @@ defmodule Electric.ClientTest do connect_options: [timeout: 100, protocols: [:http1]], retry_delay: fn _n -> 50 end, retry_log_level: false, - max_retries: 10 + max_retries: 15 ] ]} ) @@ -1361,6 +1372,885 @@ defmodule Electric.ClientTest do end end + describe "move-out handling" do + setup [:start_bypass, :bypass_client, :shape_definition] + + test "tag index stays empty when messages have no tags", ctx do + # When insert messages have no "tags" header, the tag index should remain empty. + # This is important because we shouldn't track rows that don't participate in + # move-out semantics. 
+ body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert"}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{ + "key" => "row-2", + "headers" => %{"operation" => "insert"}, + "offset" => "1_1", + "value" => %{"id" => "2222"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_1", + schema: schema + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # Use live: false so the stream halts after up-to-date + stream = Client.stream(ctx.client, ctx.shape, live: false) + final_stream = drain_stream(stream) + + # The tag index should be empty since no messages had tags + assert final_stream.tag_to_keys == %{}, + "tag_to_keys should be empty when no tags present, got: #{inspect(final_stream.tag_to_keys)}" + + assert final_stream.key_data == %{}, + "key_data should be empty when no tags present, got: #{inspect(final_stream.key_data)}" + end + + test "receives move-out and generates synthetic deletes", ctx do + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["tag-abc"]}, + "offset" => "1_0", + "value" => %{"id" => "1111", "name" => "test"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "tag-abc"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}, "name" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # Collect 4 messages: insert, up-to-date, synthetic delete, up-to-date + msgs = stream(ctx, 4) + + assert [ + %ChangeMessage{headers: %{operation: :insert}, value: %{"id" => "1111"}}, + up_to_date(9998), + %ChangeMessage{ + headers: %{operation: :delete}, + value: %{"id" => "1111", "name" => "test"} + }, + up_to_date(9999) + ] = msgs + end + + test "move-out with multiple matching rows generates multiple deletes", ctx do + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["shared-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{ + "key" => "row-2", + "headers" => %{"operation" => "insert", "tags" => ["shared-tag"]}, + "offset" => "1_1", + "value" => %{"id" => "2222"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "shared-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_1", + schema: schema + ) + ], + {"1_1", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + 
last_offset: "2_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # Collect messages: 2 inserts, up-to-date, 2 synthetic deletes, up-to-date + msgs = stream(ctx, 6) + + insert_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :insert}}, &1)) + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + assert length(insert_msgs) == 2 + assert length(delete_msgs) == 2 + + delete_ids = Enum.map(delete_msgs, & &1.value["id"]) |> Enum.sort() + assert delete_ids == ["1111", "2222"] + end + + test "move-out with non-matching pattern is no-op", ctx do + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["tag-A"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "tag-B"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # Should only get: insert, up-to-date, up-to-date (no synthetic delete) + msgs = stream(ctx, 3) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + assert delete_msgs == [] + + assert [ + %ChangeMessage{headers: %{operation: :insert}}, + up_to_date(9998), + up_to_date(9999) + ] = msgs + end + + test "tag index tracks updates with removed_tags", ctx do + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["old-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9997}} + ]) + + body2 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{ + "operation" => "update", + "tags" => ["new-tag"], + "removed_tags" => ["old-tag"] + }, + "offset" => "2_0", + "value" => %{"id" => "1111", "name" => "updated"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body3 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "old-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}, "name" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ], + {"2_0", "my-shape"} => [ + &bypass_resp(&1, body3, + shape_handle: "my-shape", + last_offset: "3_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # insert, up-to-date, update, up-to-date, up-to-date (no delete because row moved to new tag) + msgs = stream(ctx, 5) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + assert delete_msgs == [] + end + + test "delete removes row from tag 
index", ctx do + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["my-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9997}} + ]) + + body2 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "delete", "tags" => ["my-tag"]}, + "offset" => "2_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body3 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "my-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ], + {"2_0", "my-shape"} => [ + &bypass_resp(&1, body3, + shape_handle: "my-shape", + last_offset: "3_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # insert, up-to-date, delete, up-to-date, up-to-date (no synthetic delete) + msgs = stream(ctx, 5) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # Only 1 delete (the real one), not 2 (no synthetic delete after move-out) + assert length(delete_msgs) == 1 + assert hd(delete_msgs).value["id"] == "1111" + end + + test "update without removed_tags deduplicates synthetic deletes", ctx do + # Edge case: when a row is updated with the same tag but no removed_tags, + # the tag_index should deduplicate entries so only one synthetic delete is generated + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["my-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111", "version" => "1"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9997}} + ]) + + body2 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{ + "operation" => "update", + # Same tag, but no removed_tags - this is the problematic case + "tags" => ["my-tag"] + }, + "offset" => "2_0", + "value" => %{"id" => "1111", "version" => "2"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body3 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "my-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}, "version" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ], + {"2_0", "my-shape"} => [ + &bypass_resp(&1, body3, + shape_handle: "my-shape", + last_offset: "3_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # insert, up-to-date, update, up-to-date, synthetic delete, up-to-date + # Should generate only 1 synthetic delete (deduplicated by key) + msgs = stream(ctx, 6) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # Verifies 
deduplication: only 1 synthetic delete despite multiple tag_index entries + assert length(delete_msgs) == 1, + "Expected 1 synthetic delete but got #{length(delete_msgs)} - duplicate entries in tag_index" + end + + test "row with multiple tags - partial move-out should not delete if other tags remain", + ctx do + # Edge case: row has multiple tags, move-out for one tag shouldn't delete + # if the row still belongs to the shape via another tag + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["tag-a", "tag-b"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + # Only moving out tag-a, row still has tag-b + "patterns" => [%{"pos" => 0, "value" => "tag-a"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # insert, up-to-date, up-to-date + # BUG: Currently generates a synthetic delete even though row still has tag-b + # EXPECTED: No synthetic delete since row still belongs via tag-b + msgs = stream(ctx, 3) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # This documents expected behavior - row should NOT be deleted + # If this fails, it confirms the bug that partial move-out incorrectly deletes + assert delete_msgs == [], + "Row with multiple tags should not be deleted when only one tag is moved out" + end + + test "synthetic delete uses latest value after update", ctx do + # Edge case: synthetic delete should use the most recent value, not stale data + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["my-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111", "name" => "original"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9997}} + ]) + + body2 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{ + "operation" => "update", + "tags" => ["my-tag"], + "removed_tags" => ["my-tag"] + }, + "offset" => "2_0", + "value" => %{"id" => "1111", "name" => "updated"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body3 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "my-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}, "name" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ], + {"2_0", "my-shape"} => [ + &bypass_resp(&1, body3, + shape_handle: "my-shape", + last_offset: "3_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + msgs = stream(ctx, 6) + + delete_msgs = Enum.filter(msgs, 
&match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + assert length(delete_msgs) == 1 + [delete] = delete_msgs + + # Synthetic delete should have the latest value, not the original + assert delete.value["name"] == "updated", + "Synthetic delete should use latest value, got: #{inspect(delete.value)}" + end + + test "multiple patterns matching same row generates single delete", ctx do + # Edge case: move-out with multiple patterns that both match the same row + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["tag-a", "tag-b"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + # Both patterns match the same row + "patterns" => [ + %{"pos" => 0, "value" => "tag-a"}, + %{"pos" => 1, "value" => "tag-b"} + ] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # insert, up-to-date, synthetic delete, up-to-date + msgs = stream(ctx, 4) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # Should only generate 1 delete, not 2 + assert length(delete_msgs) == 1, + "Multiple patterns matching same row should generate single delete, got #{length(delete_msgs)}" + end + + test "update removing all tags should clear tag index so move-out is a no-op", ctx do + # This test demonstrates the stale tag-index entry bug: + # When a row is updated to remove ALL its tags (with removed_tags but no new tags), + # the tag-index entry should be cleared. A subsequent move-out for the old tag + # should NOT generate a synthetic delete since the row is no longer in the tag index. + # + # EXPECTED: move-out should be a no-op (0 deletes) + # CURRENT BUG: move-out generates 1 synthetic delete because the tag-index + # entry wasn't properly cleared when all tags were removed. 
+ + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["tag-A"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9997}} + ]) + + body2 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{ + "operation" => "update", + # Remove the old tag but add NO new tags + "removed_tags" => ["tag-A"] + # Note: no "tags" field, meaning this row now has zero tags + }, + "offset" => "2_0", + "value" => %{"id" => "1111", "name" => "updated"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body3 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "tag-A"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}, "name" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ], + {"2_0", "my-shape"} => [ + &bypass_resp(&1, body3, + shape_handle: "my-shape", + last_offset: "3_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # Collect messages: insert, up-to-date, update, up-to-date, up-to-date (no delete expected) + msgs = stream(ctx, 5) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # The move-out should NOT generate a synthetic delete because: + # 1. The row was originally inserted with "tag-A" + # 2. The update removed "tag-A" (via removed_tags) and added NO new tags + # 3. The tag-index should have been cleared for this row + # 4. The move-out for "tag-A" should find no matching rows + assert delete_msgs == [], + "Move-out should be a no-op when all tags were removed from row, but got #{length(delete_msgs)} delete(s)" + end + + test "resume preserves move-out state - move-out after resume generates synthetic delete", + ctx do + # When resuming a stream, the tag index state should be preserved so that + # move-out events after resume can still generate synthetic deletes. 
+ body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["my-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # First, stream with live: false to get a ResumeMessage + msgs = stream(ctx, live: false) |> Enum.to_list() + + resume_msg = Enum.find(msgs, &match?(%ResumeMessage{}, &1)) + assert resume_msg != nil + + # Now set up for resumed stream with a move-out + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "my-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + {:ok, responses2} = + start_supervised( + {Agent, + fn -> + %{ + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ] + } + end}, + id: :responses2 + ) + + bypass_response(ctx, responses2) + + # Resume the stream - with proper move-out support, the tag index should be restored + resumed_msgs = stream(ctx, resume: resume_msg, live: false) |> Enum.to_list() + + # The move-out SHOULD generate a synthetic delete for the row + delete_msgs = + Enum.filter(resumed_msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # After resume, move-out should still generate synthetic deletes + assert length(delete_msgs) == 1, + "After resume, move-out should generate synthetic delete, got: #{inspect(resumed_msgs)}" + + [delete] = delete_msgs + assert delete.value["id"] == "1111" + end + end + defp bypass_response_endpoint(ctx, responses, opts) do path = Keyword.get(opts, :path, "/v1/shape") parent = self() diff --git a/packages/sync-service/test/integration/subquery_move_out_test.exs b/packages/sync-service/test/integration/subquery_move_out_test.exs new file mode 100644 index 0000000000..ffacbbe1ea --- /dev/null +++ b/packages/sync-service/test/integration/subquery_move_out_test.exs @@ -0,0 +1,382 @@ +defmodule Electric.Integration.SubqueryMoveOutTest do + @moduledoc """ + Integration tests for subquery move-out functionality. + + These tests verify that the Elixir client correctly handles: + 1. Tags on change messages (indicating why a row belongs to the shape) + 2. Move-out control messages (when dependency values are removed) + 3. 
Synthetic delete generation from move-out patterns + """ + use ExUnit.Case, async: false + + import Support.ComponentSetup + import Support.DbSetup + import Support.DbStructureSetup + import Support.IntegrationSetup + import Support.StreamConsumer + + alias Electric.Client + alias Electric.Client.ShapeDefinition + alias Electric.Client.Message.ChangeMessage + + @moduletag :tmp_dir + + # Shape definition for child table filtered by active parents + @subquery_where "parent_id IN (SELECT id FROM parent WHERE active = true)" + + describe "subquery move-out with parent/child tables" do + setup [:with_unique_db, :with_parent_child_tables, :with_sql_execute] + setup :with_complete_stack + + setup :with_electric_client + + setup _ctx do + shape = ShapeDefinition.new!("child", where: @subquery_where) + %{shape: shape} + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'test value')" + ] + test "change messages include tags for subquery-matched rows", %{client: client, shape: shape} do + stream = Client.stream(client, shape, live: false) + + with_consumer stream do + insert = assert_insert(consumer, %{"id" => "child-1"}) + assert %{headers: %{tags: [_]}} = insert + end + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'test value')" + ] + test "receives move-out control message when parent is deactivated", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + stream = Client.stream(client, shape, live: true) + + with_consumer stream do + # Wait for initial snapshot + assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Deactivate the parent - this should trigger a move-out + Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", []) + + # Should receive a synthetic delete for child-1 + assert_delete(consumer, %{"id" => "child-1"}) + end + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'value 1')", + "INSERT INTO child (id, parent_id, value) VALUES ('child-2', 'parent-1', 'value 2')" + ] + test "move-out generates synthetic deletes for all affected child rows", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + stream = Client.stream(client, shape, live: true) + + with_consumer stream do + # Wait for initial snapshot + assert_insert(consumer, %{"id" => "child-1"}) + assert_insert(consumer, %{"id" => "child-2"}) + assert_up_to_date(consumer) + + # Deactivate the parent + Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", []) + + # Wait for both synthetic deletes + {:ok, deletes} = + await_count(consumer, 2, + match: fn msg -> + match?(%ChangeMessage{headers: %{operation: :delete}}, msg) + end + ) + + delete_ids = Enum.map(deletes, & &1.value["id"]) |> Enum.sort() + assert delete_ids == ["child-1", "child-2"] + end + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'test value')" + ] + test "deleting parent row triggers move-out", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + stream = Client.stream(client, shape, live: true) + + with_consumer stream do + # Wait for initial snapshot + assert_insert(consumer, %{"id" => 
"child-1"}) + assert_up_to_date(consumer) + + # Delete the parent row + Postgrex.query!(db_conn, "DELETE FROM parent WHERE id = 'parent-1'", []) + + # Should receive a synthetic delete for the child + assert_delete(consumer, %{"id" => "child-1"}) + end + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO parent (id, active) VALUES ('parent-2', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'belongs to parent-1')" + ] + test "move-in after row becomes visible through different parent", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + stream = Client.stream(client, shape, live: true) + + with_consumer stream do + # Wait for initial snapshot + assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Deactivate parent-1 + Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", []) + + # Wait for the move-out (synthetic delete) + assert_delete(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Change child to reference parent-2 (which is still active) + Postgrex.query!( + db_conn, + "UPDATE child SET parent_id = 'parent-2' WHERE id = 'child-1'", + [] + ) + + # Should receive a new insert (move-in) for the child + assert_insert(consumer, %{"id" => "child-1", "parent_id" => "parent-2"}) + end + end + end + + describe "tag handling during updates" do + setup [:with_unique_db, :with_parent_child_tables, :with_sql_execute] + setup :with_complete_stack + + setup :with_electric_client + + setup _ctx do + shape = ShapeDefinition.new!("child", where: @subquery_where) + %{shape: shape} + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO parent (id, active) VALUES ('parent-2', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'initial')" + ] + test "update that changes parent reference updates tags", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + stream = Client.stream(client, shape, live: true) + + with_consumer stream do + # Wait for initial snapshot + initial = assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Store initial tags + initial_tags = Map.get(initial.headers, :tags, []) + + # Change child to reference parent-2 + Postgrex.query!( + db_conn, + "UPDATE child SET parent_id = 'parent-2' WHERE id = 'child-1'", + [] + ) + + # Should receive an update with new tags + update_msg = assert_update(consumer, %{"id" => "child-1"}) + + # The headers should include both removed_tags and new tags + new_tags = Map.get(update_msg.headers, :tags, []) + removed_tags = Map.get(update_msg.headers, :removed_tags, []) + + # Either we have explicit removed_tags, or tags have changed + assert new_tags != initial_tags or length(removed_tags) > 0 + end + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO parent (id, active) VALUES ('parent-2', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'initial')" + ] + test "deactivating old parent after child changed parents should not generate delete", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + # This tests that the tag index is properly updated when a row's tags change. + # If the client doesn't clear stale tag entries, deactivating the OLD parent + # would incorrectly generate a synthetic delete even though the child is + # still in the shape (via the new parent). 
+ + stream = Client.stream(client, shape, live: true) + + with_consumer stream do + # Wait for initial snapshot - child-1 is in shape via parent-1 + assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Change child to reference parent-2 (also active) + # This should update the tag index: remove parent-1's tag, add parent-2's tag + Postgrex.query!( + db_conn, + "UPDATE child SET parent_id = 'parent-2' WHERE id = 'child-1'", + [] + ) + + # Should receive an update (child is still in shape, just via different parent) + assert_update(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Now deactivate parent-1 - this triggers a move-out for parent-1's tag + # Since child-1 no longer has parent-1's tag (it was removed in the update), + # this should NOT generate a synthetic delete + Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", []) + + # Collect any messages that arrive - we should NOT see a delete for child-1 + messages = collect_messages(consumer, timeout: 500) + + delete_msgs = + Enum.filter(messages, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + assert delete_msgs == [], + "Should not generate synthetic delete for child-1 after old parent deactivated. " <> + "The tag index should have been updated when child changed parents. " <> + "Got: #{inspect(delete_msgs)}" + end + end + end + + describe "tag consistency across streams" do + setup [:with_unique_db, :with_parent_child_tables, :with_sql_execute] + setup :with_complete_stack + + setup :with_electric_client + + setup _ctx do + shape = ShapeDefinition.new!("child", where: @subquery_where) + %{shape: shape} + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'test')" + ] + test "fresh streams have consistent tags", %{client: client, shape: shape} do + # Get initial data with tags from first stream + stream1 = Client.stream(client, shape, live: false) + + msg1 = + with_consumer stream1 do + assert_insert(consumer, %{"id" => "child-1"}) + end + + # Get data from a second fresh stream + stream2 = Client.stream(client, shape, live: false) + + msg2 = + with_consumer stream2 do + assert_insert(consumer, %{"id" => "child-1"}) + end + + # Both streams should report consistent tags for the same row + tags1 = Map.get(msg1.headers, :tags) + tags2 = Map.get(msg2.headers, :tags) + + # Tags must be present and consistent + assert tags1 != nil + assert tags1 == tags2 + end + end + + describe "resume preserves move-out state" do + setup [:with_unique_db, :with_parent_child_tables, :with_sql_execute] + setup :with_complete_stack + + setup :with_electric_client + + setup _ctx do + shape = ShapeDefinition.new!("child", where: @subquery_where) + %{shape: shape} + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'test value')" + ] + test "move-out after resume generates synthetic delete", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + # First, stream with live: false to get a ResumeMessage + # This simulates a client that synced initial data and then disconnected + stream1 = Client.stream(client, shape, live: false) + + resume_msg = + with_consumer stream1 do + assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + assert_resume(consumer) + end + + # Now deactivate the parent while "disconnected" + # 
This should trigger a move-out on the server side + Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", []) + + # Resume the stream - with proper move-out support, the client should + # receive a synthetic delete for child-1 because its parent was deactivated + stream2 = Client.stream(client, shape, live: false, resume: resume_msg) + + with_consumer stream2 do + assert_delete(consumer, %{"id" => "child-1"}) + end + end + end + + # Helper to set up parent/child tables for subquery tests + def with_parent_child_tables(%{db_conn: conn} = _context) do + statements = [ + """ + CREATE TABLE parent ( + id TEXT PRIMARY KEY, + active BOOLEAN NOT NULL DEFAULT true + ) + """, + """ + CREATE TABLE child ( + id TEXT PRIMARY KEY, + parent_id TEXT NOT NULL REFERENCES parent(id) ON DELETE CASCADE, + value TEXT NOT NULL + ) + """ + ] + + Enum.each(statements, &Postgrex.query!(conn, &1, [])) + + %{tables: [{"public", "parent"}, {"public", "child"}]} + end +end