diff --git a/.changeset/beige-ladybugs-give.md b/.changeset/beige-ladybugs-give.md new file mode 100644 index 0000000000..9457104261 --- /dev/null +++ b/.changeset/beige-ladybugs-give.md @@ -0,0 +1,6 @@ +--- +'@core/elixir-client': patch +'@core/sync-service': patch +--- + +Add Move-Out Support for Subqueries in Elixir Client diff --git a/.github/workflows/sync_service_tests.yml b/.github/workflows/sync_service_tests.yml index b1559de77f..9fa77be1d2 100644 --- a/.github/workflows/sync_service_tests.yml +++ b/.github/workflows/sync_service_tests.yml @@ -42,6 +42,7 @@ jobs: --health-retries 5 ports: - 54321:5432 + command: postgres -c max_connections=200 pgbouncer: image: bitnamilegacy/pgbouncer:latest diff --git a/packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex b/packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex index 788c677c62..0bbf6d273b 100644 --- a/packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex +++ b/packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex @@ -1,92 +1,94 @@ -defmodule Electric.Client.EctoAdapter.ArrayDecoder do - alias Electric.Client.EctoAdapter +if Code.ensure_loaded?(Ecto) do + defmodule Electric.Client.EctoAdapter.ArrayDecoder do + alias Electric.Client.EctoAdapter - def decode!("{}", _type), do: [] + def decode!("{}", _type), do: [] - def decode!(encoded_array, type) when is_binary(encoded_array) do - {"", [result]} = - decode_array(encoded_array, [], {EctoAdapter.cast_to(type), type, encoded_array}) + def decode!(encoded_array, type) when is_binary(encoded_array) do + {"", [result]} = + decode_array(encoded_array, [], {EctoAdapter.cast_to(type), type, encoded_array}) - result - end + result + end - ############################## + ############################## - defp decode_array("", acc, _state) do - {"", :lists.reverse(acc)} - end + defp decode_array("", acc, _state) do + {"", :lists.reverse(acc)} + end - defp decode_array(<<"{}", 
rest::bitstring>>, acc, state) do - decode_array(rest, [[] | acc], state) - end + defp decode_array(<<"{}", rest::bitstring>>, acc, state) do + decode_array(rest, [[] | acc], state) + end - defp decode_array(<<"{", rest::bitstring>>, acc, state) do - {rest, array} = decode_array(rest, [], state) - decode_array(rest, [array | acc], state) - end + defp decode_array(<<"{", rest::bitstring>>, acc, state) do + {rest, array} = decode_array(rest, [], state) + decode_array(rest, [array | acc], state) + end - defp decode_array(<<"}", rest::bitstring>>, acc, _state) do - {rest, :lists.reverse(acc)} - end + defp decode_array(<<"}", rest::bitstring>>, acc, _state) do + {rest, :lists.reverse(acc)} + end - defp decode_array(<>, acc, state) do - decode_array(rest, acc, state) - end + defp decode_array(<>, acc, state) do + decode_array(rest, acc, state) + end - defp decode_array(<>, acc, state) do - {rest, elem} = decode_quoted_elem(rest, [], state) - decode_array(rest, [elem | acc], state) - end + defp decode_array(<>, acc, state) do + {rest, elem} = decode_quoted_elem(rest, [], state) + decode_array(rest, [elem | acc], state) + end - defp decode_array(rest, acc, state) do - {rest, elem} = decode_elem(rest, [], state) - decode_array(rest, [elem | acc], state) - end + defp decode_array(rest, acc, state) do + {rest, elem} = decode_elem(rest, [], state) + decode_array(rest, [elem | acc], state) + end - ############################## + ############################## - defp decode_elem(<<",", _::bitstring>> = rest, acc, state), - do: {rest, cast(acc, state)} + defp decode_elem(<<",", _::bitstring>> = rest, acc, state), + do: {rest, cast(acc, state)} - defp decode_elem(<<"}", _::bitstring>> = rest, acc, state), - do: {rest, cast(acc, state)} + defp decode_elem(<<"}", _::bitstring>> = rest, acc, state), + do: {rest, cast(acc, state)} - defp decode_elem(<>, acc, state), - do: decode_elem(rest, [acc | <>], state) + defp decode_elem(<>, acc, state), + do: decode_elem(rest, [acc | <>], 
state) - defp decode_elem("", _acc, {_cast_fun, type, source}) do - raise Ecto.CastError, type: {:array, type}, value: source - end + defp decode_elem("", _acc, {_cast_fun, type, source}) do + raise Ecto.CastError, type: {:array, type}, value: source + end - ############################## + ############################## - defp decode_quoted_elem(<>, acc, state), - do: {rest, cast_quoted(acc, state)} + defp decode_quoted_elem(<>, acc, state), + do: {rest, cast_quoted(acc, state)} - defp decode_quoted_elem(<<"\\\"", rest::bitstring>>, acc, state), - do: decode_quoted_elem(rest, [acc | [?"]], state) + defp decode_quoted_elem(<<"\\\"", rest::bitstring>>, acc, state), + do: decode_quoted_elem(rest, [acc | [?"]], state) - defp decode_quoted_elem(<>, acc, state), - do: decode_quoted_elem(rest, [acc | <>], state) + defp decode_quoted_elem(<>, acc, state), + do: decode_quoted_elem(rest, [acc | <>], state) - defp decode_quoted_elem("", _acc, {_cast_fun, type, source}) do - raise Ecto.CastError, type: {:array, type}, value: source - end + defp decode_quoted_elem("", _acc, {_cast_fun, type, source}) do + raise Ecto.CastError, type: {:array, type}, value: source + end - ############################## + ############################## - defp cast(iodata, {cast_fun, _type, _source}) do - iodata - |> IO.iodata_to_binary() - |> case do - "NULL" -> nil - value -> cast_fun.(value) + defp cast(iodata, {cast_fun, _type, _source}) do + iodata + |> IO.iodata_to_binary() + |> case do + "NULL" -> nil + value -> cast_fun.(value) + end end - end - defp cast_quoted(iodata, {cast_fun, _type, _source}) do - iodata - |> IO.iodata_to_binary() - |> cast_fun.() + defp cast_quoted(iodata, {cast_fun, _type, _source}) do + iodata + |> IO.iodata_to_binary() + |> cast_fun.() + end end end diff --git a/packages/elixir-client/lib/electric/client/message.ex b/packages/elixir-client/lib/electric/client/message.ex index 6fcfeb2dd0..1be05998aa 100644 --- a/packages/elixir-client/lib/electric/client/message.ex 
+++ b/packages/elixir-client/lib/electric/client/message.ex @@ -5,19 +5,31 @@ defmodule Electric.Client.Message do alias Electric.Client.Offset defmodule Headers do - defstruct [:operation, :relation, :handle, :lsn, txids: [], op_position: 0] + defstruct [ + :operation, + :relation, + :handle, + :lsn, + txids: [], + op_position: 0, + tags: [], + removed_tags: [] + ] @type operation :: :insert | :update | :delete @type relation :: [String.t(), ...] @type lsn :: binary() @type txids :: [pos_integer(), ...] | nil + @type tag :: String.t() @type t :: %__MODULE__{ operation: operation(), relation: relation(), handle: Client.shape_handle(), lsn: lsn(), txids: txids(), - op_position: non_neg_integer() + op_position: non_neg_integer(), + tags: [tag()], + removed_tags: [tag()] } @doc false @@ -30,7 +42,9 @@ defmodule Electric.Client.Message do handle: handle, txids: Map.get(msg, "txids", []), lsn: Map.get(msg, "lsn", nil), - op_position: Map.get(msg, "op_position", 0) + op_position: Map.get(msg, "op_position", 0), + tags: Map.get(msg, "tags", []), + removed_tags: Map.get(msg, "removed_tags", []) } end @@ -56,23 +70,27 @@ defmodule Electric.Client.Message do def from_message( %{"headers" => %{"control" => control} = headers}, - handle + handle, + request_timestamp ) do %__MODULE__{ control: control_atom(control), global_last_seen_lsn: global_last_seen_lsn(headers), - handle: handle + handle: handle, + request_timestamp: request_timestamp } end def from_message( %{headers: %{control: control} = headers}, - handle + handle, + request_timestamp ) do %__MODULE__{ control: control_atom(control), global_last_seen_lsn: global_last_seen_lsn(headers), - handle: handle + handle: handle, + request_timestamp: request_timestamp } end @@ -108,7 +126,7 @@ defmodule Electric.Client.Message do require Logger - def from_message(msg, handle, value_mapping_fun) do + def from_message(msg, handle, value_mapping_fun, request_timestamp) do %{ "headers" => headers, "value" => raw_value @@ -120,7 
+138,8 @@ defmodule Electric.Client.Message do key: msg["key"], headers: Headers.from_message(headers, handle), value: value, - old_value: old_value(msg, value_mapping_fun) + old_value: old_value(msg, value_mapping_fun), + request_timestamp: request_timestamp } end @@ -168,30 +187,108 @@ defmodule Electric.Client.Message do @enforce_keys [:shape_handle, :offset, :schema] - defstruct [:shape_handle, :offset, :schema] + defstruct [:shape_handle, :offset, :schema, tag_to_keys: %{}, key_data: %{}] @type t :: %__MODULE__{ shape_handle: Client.shape_handle(), offset: Offset.t(), - schema: Client.schema() + schema: Client.schema(), + tag_to_keys: %{String.t() => MapSet.t(String.t())}, + key_data: %{String.t() => %{tags: MapSet.t(String.t()), msg: ChangeMessage.t()}} } end + defmodule MoveOutMessage do + @moduledoc """ + Represents a move-out event from the server. + + Move-out events are sent when rows should be removed from the client's view + because they no longer match the shape's subquery filter. The `patterns` field + contains tag hashes that identify which rows should be removed. + + The client should use these patterns to generate synthetic delete messages + for any tracked rows that have matching tags. 
+ """ + + defstruct [:patterns, :handle, :request_timestamp] + + @type pattern :: %{pos: non_neg_integer(), value: String.t()} + @type t :: %__MODULE__{ + patterns: [pattern()], + handle: Client.shape_handle(), + request_timestamp: DateTime.t() + } + + def from_message( + %{"headers" => %{"event" => "move-out", "patterns" => patterns}}, + handle, + request_timestamp + ) do + %__MODULE__{ + patterns: normalize_patterns(patterns), + handle: handle, + request_timestamp: request_timestamp + } + end + + def from_message( + %{headers: %{event: "move-out", patterns: patterns}}, + handle, + request_timestamp + ) do + %__MODULE__{ + patterns: normalize_patterns(patterns), + handle: handle, + request_timestamp: request_timestamp + } + end + + defp normalize_patterns(patterns) do + Enum.map(patterns, fn + %{"pos" => pos, "value" => value} -> %{pos: pos, value: value} + %{pos: _, value: _} = pattern -> pattern + end) + end + end + defguard is_insert(msg) when is_struct(msg, ChangeMessage) and msg.headers.operation == :insert - def parse(%{"value" => _} = msg, shape_handle, value_mapper_fun) do - [ChangeMessage.from_message(msg, shape_handle, value_mapper_fun)] + def parse(%{"value" => _} = msg, shape_handle, value_mapper_fun, request_timestamp) do + [ChangeMessage.from_message(msg, shape_handle, value_mapper_fun, request_timestamp)] + end + + def parse( + %{"headers" => %{"control" => _}} = msg, + shape_handle, + _value_mapper_fun, + request_timestamp + ) do + [ControlMessage.from_message(msg, shape_handle, request_timestamp)] + end + + def parse(%{headers: %{control: _}} = msg, shape_handle, _value_mapper_fun, request_timestamp) do + [ControlMessage.from_message(msg, shape_handle, request_timestamp)] end - def parse(%{"headers" => %{"control" => _}} = msg, shape_handle, _value_mapper_fun) do - [ControlMessage.from_message(msg, shape_handle)] + def parse( + %{"headers" => %{"event" => "move-out"}} = msg, + shape_handle, + _value_mapper_fun, + request_timestamp + ) do + 
[MoveOutMessage.from_message(msg, shape_handle, request_timestamp)] end - def parse(%{headers: %{control: _}} = msg, shape_handle, _value_mapper_fun) do - [ControlMessage.from_message(msg, shape_handle)] + def parse( + %{headers: %{event: "move-out"}} = msg, + shape_handle, + _value_mapper_fun, + request_timestamp + ) do + [MoveOutMessage.from_message(msg, shape_handle, request_timestamp)] end - def parse("", _handle, _value_mapper_fun) do + def parse("", _handle, _value_mapper_fun, _request_timestamp) do [] end end diff --git a/packages/elixir-client/lib/electric/client/stream.ex b/packages/elixir-client/lib/electric/client/stream.ex index 5da032a990..8bb818fbac 100644 --- a/packages/elixir-client/lib/electric/client/stream.ex +++ b/packages/elixir-client/lib/electric/client/stream.ex @@ -19,7 +19,12 @@ defmodule Electric.Client.Stream do shape_handle: nil, next_cursor: nil, state: :init, - opts: %{} + opts: %{}, + # Move-out support: tracks tags per key and keys per tag + # tag_to_keys: %{tag_value => MapSet} - which keys have this tag + # key_data: %{key => %{tags: MapSet, msg: msg}} - each key's tags and latest message + tag_to_keys: %{}, + key_data: %{} ] @external_options [ @@ -89,7 +94,9 @@ defmodule Electric.Client.Stream do } @type t :: %__MODULE__{ + id: integer(), client: Client.t(), + shape: Client.shape() | nil, schema: Client.schema(), value_mapper_fun: Client.ValueMapper.mapper_fun(), parser: nil | {module(), term()}, @@ -98,8 +105,11 @@ defmodule Electric.Client.Stream do offset: Client.offset(), replica: Client.replica(), shape_handle: nil | Client.shape_handle(), + next_cursor: binary() | nil, state: :init | :stream | :done, - opts: opts() + opts: opts(), + tag_to_keys: %{optional(term()) => MapSet.t()}, + key_data: %{optional(term()) => %{tags: MapSet.t(), msg: Message.ChangeMessage.t()}} } alias __MODULE__, as: S @@ -182,8 +192,7 @@ defmodule Electric.Client.Stream do resp.body |> ensure_enum() - |> Enum.flat_map(&Message.parse(&1, shape_handle, 
value_mapper_fun)) - |> Enum.map(&Map.put(&1, :request_timestamp, resp.request_timestamp)) + |> Enum.flat_map(&Message.parse(&1, shape_handle, value_mapper_fun, resp.request_timestamp)) |> Enum.reduce_while(stream, &handle_msg/2) |> dispatch() end @@ -198,14 +207,19 @@ defmodule Electric.Client.Stream do stream |> reset(handle) - |> buffer(Enum.flat_map(resp.body, &Message.parse(&1, handle, value_mapper_fun))) + |> buffer( + Enum.flat_map( + resp.body, + &Message.parse(&1, handle, value_mapper_fun, resp.request_timestamp) + ) + ) |> dispatch() end defp handle_response({:error, %Fetch.Response{} = resp}, stream) do %Fetch.Response{body: body} = resp - handle_error(%Client.Error{message: unwrap(body), resp: resp}, stream) + handle_error(%Client.Error{message: unwrap_error(body), resp: resp}, stream) end defp handle_response({:error, error}, stream) do @@ -221,9 +235,31 @@ defmodule Electric.Client.Stream do end defp handle_msg(%Message.ChangeMessage{} = msg, stream) do + stream = update_tag_index(stream, msg) {:cont, %{stream | buffer: :queue.in(msg, stream.buffer)}} end + defp handle_msg( + %Message.MoveOutMessage{patterns: patterns, request_timestamp: request_timestamp} = _msg, + stream + ) do + # Assumption: move-out events are only emitted after the initial snapshot is complete. + # We therefore apply them immediately and do not buffer for later inserts. 
+ + # Generate synthetic deletes for rows matching the move-out patterns + {synthetic_deletes, updated_tag_to_keys, updated_key_data} = + generate_synthetic_deletes(stream, patterns, request_timestamp) + + # Add synthetic deletes to the buffer + buffer = + Enum.reduce(synthetic_deletes, stream.buffer, fn delete_msg, buf -> + :queue.in(delete_msg, buf) + end) + + {:cont, + %{stream | buffer: buffer, tag_to_keys: updated_tag_to_keys, key_data: updated_key_data}} + end + defp handle_up_to_date(%{opts: %{live: true}} = stream) do {:cont, stream} end @@ -232,15 +268,18 @@ defmodule Electric.Client.Stream do resume_message = %Message.ResumeMessage{ schema: stream.schema, offset: stream.offset, - shape_handle: stream.shape_handle + shape_handle: stream.shape_handle, + tag_to_keys: stream.tag_to_keys, + key_data: stream.key_data } {:halt, %{stream | buffer: :queue.in(resume_message, stream.buffer), state: :done}} end - defp unwrap([msg]), do: msg - defp unwrap([_ | _] = msgs), do: msgs - defp unwrap(msg), do: msg + defp unwrap_error([]), do: "Unknown error" + defp unwrap_error([msg]), do: msg + defp unwrap_error([_ | _] = msgs), do: msgs + defp unwrap_error(msg), do: msg defp handle_error(error, %{opts: %{errors: :stream}} = stream) do %{stream | buffer: :queue.in(error, stream.buffer), state: :done} @@ -301,7 +340,9 @@ defmodule Electric.Client.Stream do up_to_date?: false, buffer: :queue.new(), schema: nil, - value_mapper_fun: nil + value_mapper_fun: nil, + tag_to_keys: %{}, + key_data: %{} } end @@ -355,11 +396,22 @@ defmodule Electric.Client.Stream do defp resume(%{opts: %{resume: %Message.ResumeMessage{} = resume}} = stream) do %{shape_handle: shape_handle, offset: offset, schema: schema} = resume + tag_to_keys = Map.get(resume, :tag_to_keys, %{}) + key_data = Map.get(resume, :key_data, %{}) + + stream = %{ + stream + | shape_handle: shape_handle, + offset: offset, + tag_to_keys: tag_to_keys, + key_data: key_data, + up_to_date?: true + } if schema do - 
generate_value_mapper(schema, %{stream | shape_handle: shape_handle, offset: offset}) + generate_value_mapper(schema, stream) else - %{stream | shape_handle: shape_handle, offset: offset} + stream end end @@ -371,6 +423,169 @@ defmodule Electric.Client.Stream do %{stream | state: :stream} end + # Tag index management for move-out support + # + # We maintain two data structures: + # - tag_to_keys: %{tag_value => MapSet} - which keys have each tag + # - key_data: %{key => %{tags: MapSet, msg: msg}} - each key's current tags and latest message + # + # This allows us to: + # 1. Avoid duplicate entries when a row is updated (we update the msg, not add a new entry) + # 2. Check if a row still has other tags before generating a synthetic delete + + defp update_tag_index(stream, %Message.ChangeMessage{headers: headers, key: key} = msg) do + %{tag_to_keys: tag_to_keys, key_data: key_data} = stream + new_tags = headers.tags || [] + removed_tags = headers.removed_tags || [] + + # Get current data for this key + current_data = Map.get(key_data, key) + current_tags = if current_data, do: current_data.tags, else: MapSet.new() + + # Calculate the new set of tags for this key + updated_tags = + current_tags + |> MapSet.difference(MapSet.new(removed_tags)) + |> MapSet.union(MapSet.new(new_tags)) + + # For deletes, remove the key entirely + {_final_tags, final_key_data, final_tag_to_keys} = + case headers.operation do + :delete -> + # Remove key from all its tags in tag_to_keys + updated_tag_to_keys = + Enum.reduce(updated_tags, tag_to_keys, fn tag, acc -> + remove_key_from_tag(acc, tag, key) + end) + + # Remove key from key_data + {MapSet.new(), Map.delete(key_data, key), updated_tag_to_keys} + + _ -> + # If no tags (current or new), don't track this key + if MapSet.size(updated_tags) == 0 do + # Remove key from all its previous tags in tag_to_keys + updated_tag_to_keys = + Enum.reduce(current_tags, tag_to_keys, fn tag, acc -> + remove_key_from_tag(acc, tag, key) + end) + + # 
Remove key from key_data + {MapSet.new(), Map.delete(key_data, key), updated_tag_to_keys} + else + # Update tag_to_keys: remove from old tags, add to new tags + tags_to_remove = MapSet.difference(current_tags, updated_tags) + tags_to_add = MapSet.difference(updated_tags, current_tags) + + updated_tag_to_keys = + tag_to_keys + |> remove_key_from_tags(tags_to_remove, key) + |> add_key_to_tags(tags_to_add, key) + + # Update key_data with new tags and latest message + updated_key_data = Map.put(key_data, key, %{tags: updated_tags, msg: msg}) + + {updated_tags, updated_key_data, updated_tag_to_keys} + end + end + + %{stream | tag_to_keys: final_tag_to_keys, key_data: final_key_data} + end + + defp remove_key_from_tags(tag_to_keys, tags, key) do + Enum.reduce(tags, tag_to_keys, fn tag, acc -> + remove_key_from_tag(acc, tag, key) + end) + end + + defp remove_key_from_tag(tag_to_keys, tag, key) do + case Map.get(tag_to_keys, tag) do + nil -> + tag_to_keys + + keys -> + updated_keys = MapSet.delete(keys, key) + + if MapSet.size(updated_keys) == 0 do + Map.delete(tag_to_keys, tag) + else + Map.put(tag_to_keys, tag, updated_keys) + end + end + end + + defp add_key_to_tags(tag_to_keys, tags, key) do + Enum.reduce(tags, tag_to_keys, fn tag, acc -> + keys = Map.get(acc, tag, MapSet.new()) + Map.put(acc, tag, MapSet.put(keys, key)) + end) + end + + defp generate_synthetic_deletes(stream, patterns, request_timestamp) do + %{tag_to_keys: tag_to_keys, key_data: key_data} = stream + + # Assumption: move-out patterns only include simple tag values; positional matching + # for composite tags is not needed with the current server behavior. 
+ + # First pass: collect all keys that match any pattern and remove those tags + {matched_keys_with_tags, updated_tag_to_keys} = + Enum.reduce(patterns, {%{}, tag_to_keys}, fn %{value: tag_value}, {keys_acc, ttk_acc} -> + case Map.pop(ttk_acc, tag_value) do + {nil, ttk_acc} -> + {keys_acc, ttk_acc} + + {keys_in_tag, ttk_acc} -> + # Track which tags were removed for each key + updated_keys_acc = + Enum.reduce(keys_in_tag, keys_acc, fn key, acc -> + removed_tags = Map.get(acc, key, MapSet.new()) + Map.put(acc, key, MapSet.put(removed_tags, tag_value)) + end) + + {updated_keys_acc, ttk_acc} + end + end) + + # Second pass: for each matched key, update its tags and check if it should be deleted + {keys_to_delete, updated_key_data} = + Enum.reduce(matched_keys_with_tags, {[], key_data}, fn {key, removed_tags}, + {deletes, kd_acc} -> + case Map.get(kd_acc, key) do + nil -> + {deletes, kd_acc} + + %{tags: current_tags, msg: msg} -> + remaining_tags = MapSet.difference(current_tags, removed_tags) + + if MapSet.size(remaining_tags) == 0 do + # No remaining tags - key should be deleted + {[{key, msg} | deletes], Map.delete(kd_acc, key)} + else + # Still has other tags - update key_data but don't delete + {deletes, Map.put(kd_acc, key, %{tags: remaining_tags, msg: msg})} + end + end + end) + + # Generate synthetic delete messages + synthetic_deletes = + Enum.map(keys_to_delete, fn {key, original_msg} -> + %Message.ChangeMessage{ + key: key, + value: original_msg.value, + old_value: nil, + headers: + Message.Headers.delete( + relation: original_msg.headers.relation, + handle: original_msg.headers.handle + ), + request_timestamp: request_timestamp + } + end) + + {synthetic_deletes, updated_tag_to_keys, updated_key_data} + end + defimpl Enumerable do alias Electric.Client diff --git a/packages/elixir-client/test/electric/client/message_test.exs b/packages/elixir-client/test/electric/client/message_test.exs index 3c4d32203f..8fbd16c8fc 100644 --- 
a/packages/elixir-client/test/electric/client/message_test.exs +++ b/packages/elixir-client/test/electric/client/message_test.exs @@ -2,31 +2,174 @@ defmodule Electric.Client.MessageTest do use ExUnit.Case, async: true alias Electric.Client.Message - alias Electric.Client.Message.ControlMessage + alias Electric.Client.Message.{ChangeMessage, ControlMessage, MoveOutMessage} + + # request_timestamp + @ts ~U[2024-01-15 10:30:00Z] describe "ControlMessage" do test "up-to-date" do assert [%ControlMessage{control: :up_to_date}] = - Message.parse(%{"headers" => %{"control" => "up-to-date"}}, "handle", & &1) + Message.parse(%{"headers" => %{"control" => "up-to-date"}}, "handle", & &1, @ts) assert [%ControlMessage{control: :up_to_date}] = - Message.parse(%{headers: %{control: "up-to-date"}}, "handle", & &1) + Message.parse(%{headers: %{control: "up-to-date"}}, "handle", & &1, @ts) end test "must-refetch" do assert [%ControlMessage{control: :must_refetch}] = - Message.parse(%{"headers" => %{"control" => "must-refetch"}}, "handle", & &1) + Message.parse(%{"headers" => %{"control" => "must-refetch"}}, "handle", & &1, @ts) assert [%ControlMessage{control: :must_refetch}] = - Message.parse(%{headers: %{control: "must-refetch"}}, "handle", & &1) + Message.parse(%{headers: %{control: "must-refetch"}}, "handle", & &1, @ts) end test "snapshot-end" do assert [%ControlMessage{control: :snapshot_end}] = - Message.parse(%{"headers" => %{"control" => "snapshot-end"}}, "handle", & &1) + Message.parse(%{"headers" => %{"control" => "snapshot-end"}}, "handle", & &1, @ts) assert [%ControlMessage{control: :snapshot_end}] = - Message.parse(%{headers: %{control: "snapshot-end"}}, "handle", & &1) + Message.parse(%{headers: %{control: "snapshot-end"}}, "handle", & &1, @ts) + end + end + + describe "MoveOutMessage" do + test "parses move-out with string keys" do + msg = %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "tag-hash-abc"}] + } + } + + assert 
[%MoveOutMessage{patterns: [%{pos: 0, value: "tag-hash-abc"}], handle: "my-handle"}] = + Message.parse(msg, "my-handle", & &1, @ts) + end + + test "parses move-out with atom keys" do + msg = %{ + headers: %{ + event: "move-out", + patterns: [%{pos: 0, value: "tag-hash-xyz"}] + } + } + + assert [%MoveOutMessage{patterns: [%{pos: 0, value: "tag-hash-xyz"}], handle: "my-handle"}] = + Message.parse(msg, "my-handle", & &1, @ts) + end + + test "parses move-out with multiple patterns" do + msg = %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [ + %{"pos" => 0, "value" => "tag-1"}, + %{"pos" => 1, "value" => "tag-2"}, + %{"pos" => 2, "value" => "tag-3"} + ] + } + } + + assert [ + %MoveOutMessage{ + patterns: [ + %{pos: 0, value: "tag-1"}, + %{pos: 1, value: "tag-2"}, + %{pos: 2, value: "tag-3"} + ] + } + ] = Message.parse(msg, "handle", & &1, @ts) + end + end + + describe "Headers with tags" do + test "parses headers with tags" do + msg = %{ + "headers" => %{"operation" => "insert", "tags" => ["tag-a", "tag-b"]}, + "value" => %{"id" => "1"} + } + + assert [%ChangeMessage{headers: headers}] = Message.parse(msg, "handle", & &1, @ts) + assert headers.tags == ["tag-a", "tag-b"] + end + + test "parses headers with removed_tags" do + msg = %{ + "headers" => %{"operation" => "update", "removed_tags" => ["old-tag"]}, + "value" => %{"id" => "1"} + } + + assert [%ChangeMessage{headers: headers}] = Message.parse(msg, "handle", & &1, @ts) + assert headers.removed_tags == ["old-tag"] + end + + test "parses headers with both tags and removed_tags" do + msg = %{ + "headers" => %{ + "operation" => "update", + "tags" => ["new-tag"], + "removed_tags" => ["old-tag"] + }, + "value" => %{"id" => "1"} + } + + assert [%ChangeMessage{headers: headers}] = Message.parse(msg, "handle", & &1, @ts) + assert headers.tags == ["new-tag"] + assert headers.removed_tags == ["old-tag"] + end + + test "defaults tags and removed_tags to empty lists" do + msg = %{ + "headers" => %{"operation" => 
"insert"}, + "value" => %{"id" => "1"} + } + + assert [%ChangeMessage{headers: headers}] = Message.parse(msg, "handle", & &1, @ts) + assert headers.tags == [] + assert headers.removed_tags == [] + end + end + + describe "ChangeMessage" do + test "parses insert with tags in headers" do + msg = %{ + "key" => "row-key", + "headers" => %{"operation" => "insert", "tags" => ["my-tag"]}, + "value" => %{"id" => "123", "name" => "test"} + } + + assert [%ChangeMessage{} = change] = Message.parse(msg, "my-handle", & &1, @ts) + assert change.key == "row-key" + assert change.value == %{"id" => "123", "name" => "test"} + assert change.headers.operation == :insert + assert change.headers.tags == ["my-tag"] + assert change.headers.handle == "my-handle" + end + + test "parses update with old_value" do + msg = %{ + "key" => "row-key", + "headers" => %{"operation" => "update", "tags" => ["tag-1"]}, + "value" => %{"id" => "123", "name" => "updated"}, + "old_value" => %{"name" => "original"} + } + + assert [%ChangeMessage{} = change] = Message.parse(msg, "handle", & &1, @ts) + assert change.headers.operation == :update + assert change.value == %{"id" => "123", "name" => "updated"} + assert change.old_value == %{"name" => "original"} + end + + test "parses delete" do + msg = %{ + "key" => "row-key", + "headers" => %{"operation" => "delete", "tags" => ["tag-1"]}, + "value" => %{"id" => "123"} + } + + assert [%ChangeMessage{} = change] = Message.parse(msg, "handle", & &1, @ts) + assert change.headers.operation == :delete + assert change.value == %{"id" => "123"} end end end diff --git a/packages/elixir-client/test/electric/client_test.exs b/packages/elixir-client/test/electric/client_test.exs index 71a8c4e2b4..bdbfa3ff17 100644 --- a/packages/elixir-client/test/electric/client_test.exs +++ b/packages/elixir-client/test/electric/client_test.exs @@ -35,6 +35,17 @@ defmodule Electric.ClientTest do [bypass: Bypass.open()] end + # Drains the stream by repeatedly calling next/1 until :halt is 
returned. + # Returns the final stream struct so we can inspect internal state. + defp drain_stream(stream) do + alias Electric.Client.Stream + + case Stream.next(stream) do + {:halt, stream} -> stream + {_msgs, stream} -> drain_stream(stream) + end + end + describe "new" do test ":base_url is used as the base of the endpoint" do endpoint = URI.new!("http://localhost:3000/v1/shape") @@ -884,7 +895,7 @@ defmodule Electric.ClientTest do connect_options: [timeout: 100, protocols: [:http1]], retry_delay: fn _n -> 50 end, retry_log_level: false, - max_retries: 10 + max_retries: 15 ] ]} ) @@ -1361,6 +1372,885 @@ defmodule Electric.ClientTest do end end + describe "move-out handling" do + setup [:start_bypass, :bypass_client, :shape_definition] + + test "tag index stays empty when messages have no tags", ctx do + # When insert messages have no "tags" header, the tag index should remain empty. + # This is important because we shouldn't track rows that don't participate in + # move-out semantics. 
+ body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert"}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{ + "key" => "row-2", + "headers" => %{"operation" => "insert"}, + "offset" => "1_1", + "value" => %{"id" => "2222"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_1", + schema: schema + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # Use live: false so the stream halts after up-to-date + stream = Client.stream(ctx.client, ctx.shape, live: false) + final_stream = drain_stream(stream) + + # The tag index should be empty since no messages had tags + assert final_stream.tag_to_keys == %{}, + "tag_to_keys should be empty when no tags present, got: #{inspect(final_stream.tag_to_keys)}" + + assert final_stream.key_data == %{}, + "key_data should be empty when no tags present, got: #{inspect(final_stream.key_data)}" + end + + test "receives move-out and generates synthetic deletes", ctx do + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["tag-abc"]}, + "offset" => "1_0", + "value" => %{"id" => "1111", "name" => "test"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "tag-abc"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}, "name" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + 
{"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # Collect 4 messages: insert, up-to-date, synthetic delete, up-to-date + msgs = stream(ctx, 4) + + assert [ + %ChangeMessage{headers: %{operation: :insert}, value: %{"id" => "1111"}}, + up_to_date(9998), + %ChangeMessage{ + headers: %{operation: :delete}, + value: %{"id" => "1111", "name" => "test"} + }, + up_to_date(9999) + ] = msgs + end + + test "move-out with multiple matching rows generates multiple deletes", ctx do + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["shared-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{ + "key" => "row-2", + "headers" => %{"operation" => "insert", "tags" => ["shared-tag"]}, + "offset" => "1_1", + "value" => %{"id" => "2222"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "shared-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_1", + schema: schema + ) + ], + {"1_1", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # Collect messages: 2 inserts, up-to-date, 2 synthetic deletes, up-to-date + msgs = stream(ctx, 6) + + insert_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :insert}}, &1)) + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + assert length(insert_msgs) == 2 + assert length(delete_msgs) == 2 + 
+ delete_ids = Enum.map(delete_msgs, & &1.value["id"]) |> Enum.sort() + assert delete_ids == ["1111", "2222"] + end + + test "move-out with non-matching pattern is no-op", ctx do + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["tag-A"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "tag-B"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # Should only get: insert, up-to-date, up-to-date (no synthetic delete) + msgs = stream(ctx, 3) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + assert delete_msgs == [] + + assert [ + %ChangeMessage{headers: %{operation: :insert}}, + up_to_date(9998), + up_to_date(9999) + ] = msgs + end + + test "tag index tracks updates with removed_tags", ctx do + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["old-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9997}} + ]) + + body2 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{ + "operation" => "update", + "tags" => ["new-tag"], + "removed_tags" => ["old-tag"] + }, + "offset" => "2_0", + "value" => %{"id" => "1111", "name" => "updated"} + }, + %{"headers" => %{"control" => 
"up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body3 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "old-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}, "name" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ], + {"2_0", "my-shape"} => [ + &bypass_resp(&1, body3, + shape_handle: "my-shape", + last_offset: "3_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # insert, up-to-date, update, up-to-date, up-to-date (no delete because row moved to new tag) + msgs = stream(ctx, 5) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + assert delete_msgs == [] + end + + test "delete removes row from tag index", ctx do + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["my-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9997}} + ]) + + body2 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "delete", "tags" => ["my-tag"]}, + "offset" => "2_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body3 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "my-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", 
nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ], + {"2_0", "my-shape"} => [ + &bypass_resp(&1, body3, + shape_handle: "my-shape", + last_offset: "3_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # insert, up-to-date, delete, up-to-date, up-to-date (no synthetic delete) + msgs = stream(ctx, 5) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # Only 1 delete (the real one), not 2 (no synthetic delete after move-out) + assert length(delete_msgs) == 1 + assert hd(delete_msgs).value["id"] == "1111" + end + + test "update without removed_tags deduplicates synthetic deletes", ctx do + # Edge case: when a row is updated with the same tag but no removed_tags, + # the tag_index should deduplicate entries so only one synthetic delete is generated + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["my-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111", "version" => "1"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9997}} + ]) + + body2 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{ + "operation" => "update", + # Same tag, but no removed_tags - this is the problematic case + "tags" => ["my-tag"] + }, + "offset" => "2_0", + "value" => %{"id" => "1111", "version" => "2"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body3 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "my-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}, "version" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + 
{"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ], + {"2_0", "my-shape"} => [ + &bypass_resp(&1, body3, + shape_handle: "my-shape", + last_offset: "3_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # insert, up-to-date, update, up-to-date, synthetic delete, up-to-date + # Should generate only 1 synthetic delete (deduplicated by key) + msgs = stream(ctx, 6) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # Verifies deduplication: only 1 synthetic delete despite multiple tag_index entries + assert length(delete_msgs) == 1, + "Expected 1 synthetic delete but got #{length(delete_msgs)} - duplicate entries in tag_index" + end + + test "row with multiple tags - partial move-out should not delete if other tags remain", + ctx do + # Edge case: row has multiple tags, move-out for one tag shouldn't delete + # if the row still belongs to the shape via another tag + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["tag-a", "tag-b"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + # Only moving out tag-a, row still has tag-b + "patterns" => [%{"pos" => 0, "value" => "tag-a"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) 
+ ] + } + end} + ) + + bypass_response(ctx, responses) + + # insert, up-to-date, up-to-date + # BUG: Currently generates a synthetic delete even though row still has tag-b + # EXPECTED: No synthetic delete since row still belongs via tag-b + msgs = stream(ctx, 3) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # This documents expected behavior - row should NOT be deleted + # If this fails, it confirms the bug that partial move-out incorrectly deletes + assert delete_msgs == [], + "Row with multiple tags should not be deleted when only one tag is moved out" + end + + test "synthetic delete uses latest value after update", ctx do + # Edge case: synthetic delete should use the most recent value, not stale data + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["my-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111", "name" => "original"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9997}} + ]) + + body2 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{ + "operation" => "update", + "tags" => ["my-tag"], + "removed_tags" => ["my-tag"] + }, + "offset" => "2_0", + "value" => %{"id" => "1111", "name" => "updated"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body3 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "my-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}, "name" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ], + {"2_0", "my-shape"} 
=> [ + &bypass_resp(&1, body3, + shape_handle: "my-shape", + last_offset: "3_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + msgs = stream(ctx, 6) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + assert length(delete_msgs) == 1 + [delete] = delete_msgs + + # Synthetic delete should have the latest value, not the original + assert delete.value["name"] == "updated", + "Synthetic delete should use latest value, got: #{inspect(delete.value)}" + end + + test "multiple patterns matching same row generates single delete", ctx do + # Edge case: move-out with multiple patterns that both match the same row + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["tag-a", "tag-b"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + # Both patterns match the same row + "patterns" => [ + %{"pos" => 0, "value" => "tag-a"}, + %{"pos" => 1, "value" => "tag-b"} + ] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # insert, up-to-date, synthetic delete, up-to-date + msgs = stream(ctx, 4) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # Should only generate 1 delete, not 2 + assert length(delete_msgs) == 1, + "Multiple patterns matching same row should generate single delete, got #{length(delete_msgs)}" 
+ end + + test "update removing all tags should clear tag index so move-out is a no-op", ctx do + # This test demonstrates the stale tag-index entry bug: + # When a row is updated to remove ALL its tags (with removed_tags but no new tags), + # the tag-index entry should be cleared. A subsequent move-out for the old tag + # should NOT generate a synthetic delete since the row is no longer in the tag index. + # + # EXPECTED: move-out should be a no-op (0 deletes) + # CURRENT BUG: move-out generates 1 synthetic delete because the tag-index + # entry wasn't properly cleared when all tags were removed. + + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["tag-A"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9997}} + ]) + + body2 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{ + "operation" => "update", + # Remove the old tag but add NO new tags + "removed_tags" => ["tag-A"] + # Note: no "tags" field, meaning this row now has zero tags + }, + "offset" => "2_0", + "value" => %{"id" => "1111", "name" => "updated"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + body3 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "tag-A"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}, "name" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ], + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ], + {"2_0", "my-shape"} => [ + &bypass_resp(&1, body3, + shape_handle: "my-shape", + last_offset: "3_0" + ) + ] + } + end} + ) + + 
bypass_response(ctx, responses) + + # Collect messages: insert, up-to-date, update, up-to-date, up-to-date (no delete expected) + msgs = stream(ctx, 5) + + delete_msgs = Enum.filter(msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # The move-out should NOT generate a synthetic delete because: + # 1. The row was originally inserted with "tag-A" + # 2. The update removed "tag-A" (via removed_tags) and added NO new tags + # 3. The tag-index should have been cleared for this row + # 4. The move-out for "tag-A" should find no matching rows + assert delete_msgs == [], + "Move-out should be a no-op when all tags were removed from row, but got #{length(delete_msgs)} delete(s)" + end + + test "resume preserves move-out state - move-out after resume generates synthetic delete", + ctx do + # When resuming a stream, the tag index state should be preserved so that + # move-out events after resume can still generate synthetic deletes. + body1 = + Jason.encode!([ + %{ + "key" => "row-1", + "headers" => %{"operation" => "insert", "tags" => ["my-tag"]}, + "offset" => "1_0", + "value" => %{"id" => "1111"} + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9998}} + ]) + + schema = Jason.encode!(%{"id" => %{type: "text"}}) + + {:ok, responses} = + start_supervised( + {Agent, + fn -> + %{ + {"-1", nil} => [ + &bypass_resp(&1, body1, + shape_handle: "my-shape", + last_offset: "1_0", + schema: schema + ) + ] + } + end} + ) + + bypass_response(ctx, responses) + + # First, stream with live: false to get a ResumeMessage + msgs = stream(ctx, live: false) |> Enum.to_list() + + resume_msg = Enum.find(msgs, &match?(%ResumeMessage{}, &1)) + assert resume_msg != nil + + # Now set up for resumed stream with a move-out + body2 = + Jason.encode!([ + %{ + "headers" => %{ + "event" => "move-out", + "patterns" => [%{"pos" => 0, "value" => "my-tag"}] + } + }, + %{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => 9999}} + ]) + + {:ok, 
responses2} = + start_supervised( + {Agent, + fn -> + %{ + {"1_0", "my-shape"} => [ + &bypass_resp(&1, body2, + shape_handle: "my-shape", + last_offset: "2_0" + ) + ] + } + end}, + id: :responses2 + ) + + bypass_response(ctx, responses2) + + # Resume the stream - with proper move-out support, the tag index should be restored + resumed_msgs = stream(ctx, resume: resume_msg, live: false) |> Enum.to_list() + + # The move-out SHOULD generate a synthetic delete for the row + delete_msgs = + Enum.filter(resumed_msgs, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + # After resume, move-out should still generate synthetic deletes + assert length(delete_msgs) == 1, + "After resume, move-out should generate synthetic delete, got: #{inspect(resumed_msgs)}" + + [delete] = delete_msgs + assert delete.value["id"] == "1111" + end + end + defp bypass_response_endpoint(ctx, responses, opts) do path = Keyword.get(opts, :path, "/v1/shape") parent = self() diff --git a/packages/sync-service/dev/postgres.conf b/packages/sync-service/dev/postgres.conf index e18a913260..b320e296e5 100644 --- a/packages/sync-service/dev/postgres.conf +++ b/packages/sync-service/dev/postgres.conf @@ -1,3 +1,4 @@ listen_addresses = '*' wal_level = logical # minimal, replica, or logical max_replication_slots = 100 +max_connections = 200 diff --git a/packages/sync-service/dev/postgres2.conf b/packages/sync-service/dev/postgres2.conf index 3f5b60c308..e6f72319e6 100644 --- a/packages/sync-service/dev/postgres2.conf +++ b/packages/sync-service/dev/postgres2.conf @@ -1,4 +1,5 @@ listen_addresses = '*' wal_level = logical # minimal, replica, or logical max_replication_slots = 100 +max_connections = 200 port = 5433 \ No newline at end of file diff --git a/packages/sync-service/mix.exs b/packages/sync-service/mix.exs index 8d0fed733b..d81e7f2d5e 100644 --- a/packages/sync-service/mix.exs +++ b/packages/sync-service/mix.exs @@ -119,7 +119,8 @@ defmodule Electric.MixProject do {:junit_formatter, "~> 
3.4", only: [:test], runtime: false}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:stream_data, "~> 1.2", only: [:dev, :test]}, - {:repatch, "~> 1.0", only: [:test]} + {:repatch, "~> 1.0", only: [:test]}, + {:electric_client, path: "../elixir-client", only: [:test], runtime: false} ] end diff --git a/packages/sync-service/test/electric/postgres/configuration_test.exs b/packages/sync-service/test/electric/postgres/configuration_test.exs index 9c3716bdfb..ea336eca19 100644 --- a/packages/sync-service/test/electric/postgres/configuration_test.exs +++ b/packages/sync-service/test/electric/postgres/configuration_test.exs @@ -104,6 +104,7 @@ defmodule Electric.Postgres.ConfigurationTest do Configuration.drop_table_from_publication(conn, "nonexistent", oid_rel) end + @tag connection_opt_overrides: [pool_size: 2] test "fails relation configuration if timing out on lock", %{ pool: conn, publication_name: publication diff --git a/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs b/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs index 18fb168f20..02a6e66bda 100644 --- a/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs +++ b/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs @@ -13,6 +13,7 @@ defmodule Electric.Postgres.LockBreakerConnectionTest do :with_slot_name ] + @tag connection_opt_overrides: [pool_size: 2] test "should break an abandoned lock if slot is inactive", ctx do Postgrex.query!( ctx.db_conn, diff --git a/packages/sync-service/test/integration/streaming_test.exs b/packages/sync-service/test/integration/streaming_test.exs new file mode 100644 index 0000000000..08c0f85bb7 --- /dev/null +++ b/packages/sync-service/test/integration/streaming_test.exs @@ -0,0 +1,71 @@ +defmodule Electric.Integration.StreamingTest do + @moduledoc """ + Integration tests that spin up an Electric HTTP API + stack for a unique test DB, + then use Electric.Client to 
stream a shape over HTTP. + """ + use ExUnit.Case, async: false + + import Support.ComponentSetup + import Support.DbSetup + import Support.DbStructureSetup + import Support.IntegrationSetup + import Support.StreamConsumer + + alias Electric.Client + + @moduletag :tmp_dir + + describe "Electric.Client streaming over HTTP" do + setup [:with_unique_db, :with_basic_tables, :with_sql_execute] + setup :with_complete_stack + + setup :with_electric_client + + @tag with_sql: [ + "INSERT INTO items VALUES ('00000000-0000-0000-0000-000000000001', 'initial value')" + ] + test "initial snapshot contains pre-existing row", %{client: client} do + stream = Client.stream(client, "items", live: false) + + with_consumer stream do + assert_insert(consumer, %{ + "id" => "00000000-0000-0000-0000-000000000001", + "value" => "initial value" + }) + + assert_up_to_date(consumer) + end + end + + @tag with_sql: [ + "INSERT INTO items VALUES ('00000000-0000-0000-0000-000000000001', 'initial value')" + ] + test "receives live changes after initial snapshot", %{client: client, db_conn: db_conn} do + stream = Client.stream(client, "items", live: true) + + with_consumer stream do + assert_insert(consumer, %{"value" => "initial value"}) + assert_up_to_date(consumer) + + Postgrex.query!( + db_conn, + "INSERT INTO items VALUES ('00000000-0000-0000-0000-000000000002', 'new value')", + [] + ) + + assert_insert(consumer, %{ + "id" => "00000000-0000-0000-0000-000000000002", + "value" => "new value" + }) + end + end + + test "streaming empty table returns up-to-date", %{client: client} do + stream = Client.stream(client, "items", live: false) + + with_consumer stream do + assert_up_to_date(consumer) + end + end + end +end diff --git a/packages/sync-service/test/integration/subquery_move_out_test.exs b/packages/sync-service/test/integration/subquery_move_out_test.exs new file mode 100644 index 0000000000..ffacbbe1ea --- /dev/null +++ b/packages/sync-service/test/integration/subquery_move_out_test.exs @@ 
-0,0 +1,382 @@ +defmodule Electric.Integration.SubqueryMoveOutTest do + @moduledoc """ + Integration tests for subquery move-out functionality. + + These tests verify that the Elixir client correctly handles: + 1. Tags on change messages (indicating why a row belongs to the shape) + 2. Move-out control messages (when dependency values are removed) + 3. Synthetic delete generation from move-out patterns + """ + use ExUnit.Case, async: false + + import Support.ComponentSetup + import Support.DbSetup + import Support.DbStructureSetup + import Support.IntegrationSetup + import Support.StreamConsumer + + alias Electric.Client + alias Electric.Client.ShapeDefinition + alias Electric.Client.Message.ChangeMessage + + @moduletag :tmp_dir + + # Shape definition for child table filtered by active parents + @subquery_where "parent_id IN (SELECT id FROM parent WHERE active = true)" + + describe "subquery move-out with parent/child tables" do + setup [:with_unique_db, :with_parent_child_tables, :with_sql_execute] + setup :with_complete_stack + + setup :with_electric_client + + setup _ctx do + shape = ShapeDefinition.new!("child", where: @subquery_where) + %{shape: shape} + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'test value')" + ] + test "change messages include tags for subquery-matched rows", %{client: client, shape: shape} do + stream = Client.stream(client, shape, live: false) + + with_consumer stream do + insert = assert_insert(consumer, %{"id" => "child-1"}) + assert %{headers: %{tags: [_]}} = insert + end + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'test value')" + ] + test "receives move-out control message when parent is deactivated", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + stream = Client.stream(client, 
shape, live: true) + + with_consumer stream do + # Wait for initial snapshot + assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Deactivate the parent - this should trigger a move-out + Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", []) + + # Should receive a synthetic delete for child-1 + assert_delete(consumer, %{"id" => "child-1"}) + end + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'value 1')", + "INSERT INTO child (id, parent_id, value) VALUES ('child-2', 'parent-1', 'value 2')" + ] + test "move-out generates synthetic deletes for all affected child rows", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + stream = Client.stream(client, shape, live: true) + + with_consumer stream do + # Wait for initial snapshot + assert_insert(consumer, %{"id" => "child-1"}) + assert_insert(consumer, %{"id" => "child-2"}) + assert_up_to_date(consumer) + + # Deactivate the parent + Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", []) + + # Wait for both synthetic deletes + {:ok, deletes} = + await_count(consumer, 2, + match: fn msg -> + match?(%ChangeMessage{headers: %{operation: :delete}}, msg) + end + ) + + delete_ids = Enum.map(deletes, & &1.value["id"]) |> Enum.sort() + assert delete_ids == ["child-1", "child-2"] + end + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'test value')" + ] + test "deleting parent row triggers move-out", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + stream = Client.stream(client, shape, live: true) + + with_consumer stream do + # Wait for initial snapshot + assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Delete the parent row + 
Postgrex.query!(db_conn, "DELETE FROM parent WHERE id = 'parent-1'", []) + + # Should receive a synthetic delete for the child + assert_delete(consumer, %{"id" => "child-1"}) + end + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO parent (id, active) VALUES ('parent-2', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'belongs to parent-1')" + ] + test "move-in after row becomes visible through different parent", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + stream = Client.stream(client, shape, live: true) + + with_consumer stream do + # Wait for initial snapshot + assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Deactivate parent-1 + Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", []) + + # Wait for the move-out (synthetic delete) + assert_delete(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Change child to reference parent-2 (which is still active) + Postgrex.query!( + db_conn, + "UPDATE child SET parent_id = 'parent-2' WHERE id = 'child-1'", + [] + ) + + # Should receive a new insert (move-in) for the child + assert_insert(consumer, %{"id" => "child-1", "parent_id" => "parent-2"}) + end + end + end + + describe "tag handling during updates" do + setup [:with_unique_db, :with_parent_child_tables, :with_sql_execute] + setup :with_complete_stack + + setup :with_electric_client + + setup _ctx do + shape = ShapeDefinition.new!("child", where: @subquery_where) + %{shape: shape} + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO parent (id, active) VALUES ('parent-2', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'initial')" + ] + test "update that changes parent reference updates tags", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + stream = Client.stream(client, 
shape, live: true) + + with_consumer stream do + # Wait for initial snapshot + initial = assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Store initial tags + initial_tags = Map.get(initial.headers, :tags, []) + + # Change child to reference parent-2 + Postgrex.query!( + db_conn, + "UPDATE child SET parent_id = 'parent-2' WHERE id = 'child-1'", + [] + ) + + # Should receive an update with new tags + update_msg = assert_update(consumer, %{"id" => "child-1"}) + + # The headers should include both removed_tags and new tags + new_tags = Map.get(update_msg.headers, :tags, []) + removed_tags = Map.get(update_msg.headers, :removed_tags, []) + + # Either we have explicit removed_tags, or tags have changed + assert new_tags != initial_tags or length(removed_tags) > 0 + end + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO parent (id, active) VALUES ('parent-2', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'initial')" + ] + test "deactivating old parent after child changed parents should not generate delete", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + # This tests that the tag index is properly updated when a row's tags change. + # If the client doesn't clear stale tag entries, deactivating the OLD parent + # would incorrectly generate a synthetic delete even though the child is + # still in the shape (via the new parent). 
+ + stream = Client.stream(client, shape, live: true) + + with_consumer stream do + # Wait for initial snapshot - child-1 is in shape via parent-1 + assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Change child to reference parent-2 (also active) + # This should update the tag index: remove parent-1's tag, add parent-2's tag + Postgrex.query!( + db_conn, + "UPDATE child SET parent_id = 'parent-2' WHERE id = 'child-1'", + [] + ) + + # Should receive an update (child is still in shape, just via different parent) + assert_update(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + + # Now deactivate parent-1 - this triggers a move-out for parent-1's tag + # Since child-1 no longer has parent-1's tag (it was removed in the update), + # this should NOT generate a synthetic delete + Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", []) + + # Collect any messages that arrive - we should NOT see a delete for child-1 + messages = collect_messages(consumer, timeout: 500) + + delete_msgs = + Enum.filter(messages, &match?(%ChangeMessage{headers: %{operation: :delete}}, &1)) + + assert delete_msgs == [], + "Should not generate synthetic delete for child-1 after old parent deactivated. " <> + "The tag index should have been updated when child changed parents. 
" <> + "Got: #{inspect(delete_msgs)}" + end + end + end + + describe "tag consistency across streams" do + setup [:with_unique_db, :with_parent_child_tables, :with_sql_execute] + setup :with_complete_stack + + setup :with_electric_client + + setup _ctx do + shape = ShapeDefinition.new!("child", where: @subquery_where) + %{shape: shape} + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'test')" + ] + test "fresh streams have consistent tags", %{client: client, shape: shape} do + # Get initial data with tags from first stream + stream1 = Client.stream(client, shape, live: false) + + msg1 = + with_consumer stream1 do + assert_insert(consumer, %{"id" => "child-1"}) + end + + # Get data from a second fresh stream + stream2 = Client.stream(client, shape, live: false) + + msg2 = + with_consumer stream2 do + assert_insert(consumer, %{"id" => "child-1"}) + end + + # Both streams should report consistent tags for the same row + tags1 = Map.get(msg1.headers, :tags) + tags2 = Map.get(msg2.headers, :tags) + + # Tags must be present and consistent + assert tags1 != nil + assert tags1 == tags2 + end + end + + describe "resume preserves move-out state" do + setup [:with_unique_db, :with_parent_child_tables, :with_sql_execute] + setup :with_complete_stack + + setup :with_electric_client + + setup _ctx do + shape = ShapeDefinition.new!("child", where: @subquery_where) + %{shape: shape} + end + + @tag with_sql: [ + "INSERT INTO parent (id, active) VALUES ('parent-1', true)", + "INSERT INTO child (id, parent_id, value) VALUES ('child-1', 'parent-1', 'test value')" + ] + test "move-out after resume generates synthetic delete", %{ + client: client, + shape: shape, + db_conn: db_conn + } do + # First, stream with live: false to get a ResumeMessage + # This simulates a client that synced initial data and then disconnected + stream1 = Client.stream(client, shape, live: false) 
+ + resume_msg = + with_consumer stream1 do + assert_insert(consumer, %{"id" => "child-1"}) + assert_up_to_date(consumer) + assert_resume(consumer) + end + + # Now deactivate the parent while "disconnected" + # This should trigger a move-out on the server side + Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", []) + + # Resume the stream - with proper move-out support, the client should + # receive a synthetic delete for child-1 because its parent was deactivated + stream2 = Client.stream(client, shape, live: false, resume: resume_msg) + + with_consumer stream2 do + assert_delete(consumer, %{"id" => "child-1"}) + end + end + end + + # Helper to set up parent/child tables for subquery tests + def with_parent_child_tables(%{db_conn: conn} = _context) do + statements = [ + """ + CREATE TABLE parent ( + id TEXT PRIMARY KEY, + active BOOLEAN NOT NULL DEFAULT true + ) + """, + """ + CREATE TABLE child ( + id TEXT PRIMARY KEY, + parent_id TEXT NOT NULL REFERENCES parent(id) ON DELETE CASCADE, + value TEXT NOT NULL + ) + """ + ] + + Enum.each(statements, &Postgrex.query!(conn, &1, [])) + + %{tables: [{"public", "parent"}, {"public", "child"}]} + end +end diff --git a/packages/sync-service/test/support/db_setup.ex b/packages/sync-service/test/support/db_setup.ex index 7e5dfae482..46c305e840 100644 --- a/packages/sync-service/test/support/db_setup.ex +++ b/packages/sync-service/test/support/db_setup.ex @@ -4,7 +4,7 @@ defmodule Support.DbSetup do @postgrex_start_opts [ backoff_type: :stop, max_restarts: 0, - pool_size: 2, + pool_size: 1, types: PgInterop.Postgrex.Types ] diff --git a/packages/sync-service/test/support/integration_setup.ex b/packages/sync-service/test/support/integration_setup.ex new file mode 100644 index 0000000000..1122bbc41d --- /dev/null +++ b/packages/sync-service/test/support/integration_setup.ex @@ -0,0 +1,42 @@ +defmodule Support.IntegrationSetup do + @moduledoc """ + Helper functions for setting up integration tests 
that need an HTTP server. + """ + + import Support.ComponentSetup, only: [build_router_opts: 2] + + @doc """ + Starts a Bandit HTTP server and creates an Electric.Client. + + Returns a map with: + - `client` - Electric.Client configured to connect to the server + - `base_url` - The base URL of the server + - `server_pid` - The Bandit server process + - `port` - The port the server is listening on + """ + def with_electric_client(ctx, opts \\ []) do + :ok = Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: 2000) + + router_opts = build_router_opts(ctx, Keyword.get(opts, :router_opts, [])) + + {:ok, server_pid} = + ExUnit.Callbacks.start_supervised( + {Bandit, + plug: {Electric.Plug.Router, router_opts}, + port: 0, + ip: :loopback, + thousand_island_options: [num_acceptors: 1]} + ) + + {:ok, {_ip, port}} = ThousandIsland.listener_info(server_pid) + base_url = "http://localhost:#{port}" + {:ok, client} = Electric.Client.new(base_url: base_url) + + Map.merge(ctx, %{ + client: client, + base_url: base_url, + server_pid: server_pid, + port: port + }) + end +end diff --git a/packages/sync-service/test/support/stream_consumer.ex b/packages/sync-service/test/support/stream_consumer.ex new file mode 100644 index 0000000000..d06ee96cdf --- /dev/null +++ b/packages/sync-service/test/support/stream_consumer.ex @@ -0,0 +1,290 @@ +defmodule Support.StreamConsumer do + @moduledoc """ + Test helper for consuming Electric.Client streams in integration tests. + """ + + alias Electric.Client.Message.ChangeMessage + alias Electric.Client.Message.ControlMessage + alias Electric.Client.Message.ResumeMessage + + import ExUnit.Assertions + + @default_timeout 5_000 + + defstruct [:task, :task_pid, :timeout] + + @doc """ + Start consuming a stream, forwarding messages to the test process. 
+ + ## Options + + * `:timeout` - default timeout for assertions (default: 5000ms) + """ + def start(stream, opts \\ []) do + test_pid = self() + timeout = Keyword.get(opts, :timeout, @default_timeout) + + # Start the streaming task + task = + Task.async(fn -> + stream + |> Stream.each(fn msg -> + send(test_pid, {:stream_message, self(), msg}) + end) + |> Stream.run() + end) + + {:ok, + %__MODULE__{ + task: task, + task_pid: task.pid, + timeout: timeout + }} + end + + @doc """ + Stop the consumer and cleanup resources. + """ + def stop(%__MODULE__{task: task}) do + Task.shutdown(task, :brutal_kill) + :ok + end + + @doc """ + Assert an insert message is received with matching value fields. + """ + def assert_insert(%__MODULE__{} = consumer, value_pattern, timeout \\ nil) do + timeout = timeout || consumer.timeout + + assert_receive_message( + consumer, + fn + %ChangeMessage{headers: %{operation: :insert}, value: value} -> + pattern_matches?(value, value_pattern) + + _ -> + false + end, + timeout + ) + end + + @doc """ + Assert an update message is received with matching value fields. + Optionally match old_value (only present in :full replica mode). + """ + def assert_update( + %__MODULE__{} = consumer, + value_pattern, + old_value_pattern \\ nil, + timeout \\ nil + ) do + timeout = timeout || consumer.timeout + + assert_receive_message( + consumer, + fn + %ChangeMessage{headers: %{operation: :update}, value: value, old_value: old_value} -> + value_matches = pattern_matches?(value, value_pattern) + + old_value_matches = + old_value_pattern == nil or pattern_matches?(old_value || %{}, old_value_pattern) + + value_matches and old_value_matches + + _ -> + false + end, + timeout + ) + end + + @doc """ + Assert a delete message is received with matching value fields. 
+ """ + def assert_delete(%__MODULE__{} = consumer, value_pattern, timeout \\ nil) do + timeout = timeout || consumer.timeout + + assert_receive_message( + consumer, + fn + %ChangeMessage{headers: %{operation: :delete}, value: value} -> + pattern_matches?(value, value_pattern) + + _ -> + false + end, + timeout + ) + end + + @doc """ + Assert an up_to_date control message is received. + """ + def assert_up_to_date(%__MODULE__{} = consumer, timeout \\ nil) do + timeout = timeout || consumer.timeout + + assert_receive_message( + consumer, + fn + %ControlMessage{control: :up_to_date} -> true + _ -> false + end, + timeout + ) + end + + @doc """ + Assert a resume message is received. + """ + def assert_resume(%__MODULE__{} = consumer, timeout \\ nil) do + timeout = timeout || consumer.timeout + + assert_receive_message( + consumer, + fn + %ResumeMessage{} -> true + _ -> false + end, + timeout + ) + end + + @doc """ + Wait for N messages matching a condition. + + ## Options + + * `:match` - function to filter messages (default: matches all) + * `:timeout` - timeout in ms (default: consumer timeout) + """ + def await_count(%__MODULE__{} = consumer, count, opts \\ []) do + matcher = Keyword.get(opts, :match, fn _ -> true end) + timeout = Keyword.get(opts, :timeout, consumer.timeout) + + do_await_count(consumer, count, matcher, [], timeout, System.monotonic_time(:millisecond)) + end + + defp do_await_count(_consumer, 0, _matcher, collected, _timeout, _start) do + {:ok, Enum.reverse(collected)} + end + + defp do_await_count( + %{task_pid: task_pid} = consumer, + count, + matcher, + collected, + timeout, + start_time + ) do + elapsed = System.monotonic_time(:millisecond) - start_time + remaining = max(0, timeout - elapsed) + + receive do + {:stream_message, ^task_pid, msg} -> + if matcher.(msg) do + do_await_count(consumer, count - 1, matcher, [msg | collected], timeout, start_time) + else + do_await_count(consumer, count, matcher, collected, timeout, start_time) + end + after 
+ remaining -> {:error, :timeout} + end + end + + # Private helpers + + defp assert_receive_message(%__MODULE__{task_pid: task_pid}, matcher, timeout) do + do_assert_receive_message(task_pid, matcher, timeout, System.monotonic_time(:millisecond)) + end + + defp do_assert_receive_message(task_pid, matcher, timeout, start_time) do + elapsed = System.monotonic_time(:millisecond) - start_time + remaining = max(0, timeout - elapsed) + + receive do + {:stream_message, ^task_pid, msg} -> + if matcher.(msg) do + msg + else + flunk("Received unexpected message: #{inspect(msg)}") + end + after + remaining -> + flunk("Expected to receive matching message within #{timeout}ms") + end + end + + defp pattern_matches?(map, pattern) when is_map(map) and is_map(pattern) do + Enum.all?(pattern, fn {key, expected} -> + case Map.fetch(map, key) do + {:ok, actual} when is_map(expected) and is_map(actual) -> + pattern_matches?(actual, expected) + + {:ok, actual} -> + actual == expected + + :error -> + false + end + end) + end + + @doc """ + Collect all messages received within the timeout period. + Returns a list of messages (possibly empty). 
+ + ## Options + + * `:timeout` - how long to wait for messages (default: 100ms) + * `:match` - optional function to filter messages + """ + def collect_messages(%__MODULE__{task_pid: task_pid}, opts \\ []) do + timeout = Keyword.get(opts, :timeout, 100) + matcher = Keyword.get(opts, :match, fn _ -> true end) + + do_collect_messages(task_pid, matcher, [], timeout, System.monotonic_time(:millisecond)) + end + + defp do_collect_messages(task_pid, matcher, collected, timeout, start_time) do + elapsed = System.monotonic_time(:millisecond) - start_time + remaining = max(0, timeout - elapsed) + + receive do + {:stream_message, ^task_pid, msg} -> + if matcher.(msg) do + do_collect_messages(task_pid, matcher, [msg | collected], timeout, start_time) + else + do_collect_messages(task_pid, matcher, collected, timeout, start_time) + end + after + remaining -> Enum.reverse(collected) + end + end + + @doc """ + Macro for cleaner test blocks - auto start/stop. + + ## Example + + import Support.StreamConsumer + + stream = Client.stream(client, "items", live: true) + + with_consumer stream do + assert_insert(consumer, %{"id" => "123"}) + assert_up_to_date(consumer) + end + """ + defmacro with_consumer(stream, opts \\ [], do: block) do + quote do + {:ok, var!(consumer)} = Support.StreamConsumer.start(unquote(stream), unquote(opts)) + + try do + unquote(block) + after + Support.StreamConsumer.stop(var!(consumer)) + end + end + end +end diff --git a/packages/sync-service/test/test_helper.exs b/packages/sync-service/test/test_helper.exs index 375392b593..9648fd7caf 100644 --- a/packages/sync-service/test/test_helper.exs +++ b/packages/sync-service/test/test_helper.exs @@ -7,6 +7,12 @@ ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter]) ExUnit.start(assert_receive_timeout: 400, exclude: [:slow], capture_log: true) +# Start electric_client application directly, bypassing OTP's dependency resolution. 
+# This avoids a circular dependency: electric_client declares :electric as an optional dependency,
+# which gets added to its applications list when compiled in the sync-service context,
+# causing a deadlock when OTP tries to start both applications.
+{:ok, _} = Electric.Client.Application.start(:normal, [])
+
+# Repatch in async tests has lazy recompilation issues, so as a temporary fix
+# we force recompilation in the setup. The issue is tracked here:
+# https://github.com/hissssst/repatch/issues/2