Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/beige-ladybugs-give.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@core/elixir-client': patch
'@core/sync-service': patch
---

Add Move-Out Support for Subqueries in Elixir Client
1 change: 1 addition & 0 deletions .github/workflows/sync_service_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
--health-retries 5
ports:
- 54321:5432
command: postgres -c max_connections=200

pgbouncer:
image: bitnamilegacy/pgbouncer:latest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,92 +1,94 @@
defmodule Electric.Client.EctoAdapter.ArrayDecoder do
alias Electric.Client.EctoAdapter
if Code.ensure_loaded?(Ecto) do
defmodule Electric.Client.EctoAdapter.ArrayDecoder do
alias Electric.Client.EctoAdapter

def decode!("{}", _type), do: []
def decode!("{}", _type), do: []

def decode!(encoded_array, type) when is_binary(encoded_array) do
{"", [result]} =
decode_array(encoded_array, [], {EctoAdapter.cast_to(type), type, encoded_array})
def decode!(encoded_array, type) when is_binary(encoded_array) do
{"", [result]} =
decode_array(encoded_array, [], {EctoAdapter.cast_to(type), type, encoded_array})

result
end
result
end

##############################
##############################

defp decode_array("", acc, _state) do
{"", :lists.reverse(acc)}
end
defp decode_array("", acc, _state) do
{"", :lists.reverse(acc)}
end

defp decode_array(<<"{}", rest::bitstring>>, acc, state) do
decode_array(rest, [[] | acc], state)
end
defp decode_array(<<"{}", rest::bitstring>>, acc, state) do
decode_array(rest, [[] | acc], state)
end

defp decode_array(<<"{", rest::bitstring>>, acc, state) do
{rest, array} = decode_array(rest, [], state)
decode_array(rest, [array | acc], state)
end
defp decode_array(<<"{", rest::bitstring>>, acc, state) do
{rest, array} = decode_array(rest, [], state)
decode_array(rest, [array | acc], state)
end

defp decode_array(<<"}", rest::bitstring>>, acc, _state) do
{rest, :lists.reverse(acc)}
end
defp decode_array(<<"}", rest::bitstring>>, acc, _state) do
{rest, :lists.reverse(acc)}
end

defp decode_array(<<?,, rest::bitstring>>, acc, state) do
decode_array(rest, acc, state)
end
defp decode_array(<<?,, rest::bitstring>>, acc, state) do
decode_array(rest, acc, state)
end

defp decode_array(<<?", rest::bitstring>>, acc, state) do
{rest, elem} = decode_quoted_elem(rest, [], state)
decode_array(rest, [elem | acc], state)
end
defp decode_array(<<?", rest::bitstring>>, acc, state) do
{rest, elem} = decode_quoted_elem(rest, [], state)
decode_array(rest, [elem | acc], state)
end

defp decode_array(rest, acc, state) do
{rest, elem} = decode_elem(rest, [], state)
decode_array(rest, [elem | acc], state)
end
defp decode_array(rest, acc, state) do
{rest, elem} = decode_elem(rest, [], state)
decode_array(rest, [elem | acc], state)
end

##############################
##############################

defp decode_elem(<<",", _::bitstring>> = rest, acc, state),
do: {rest, cast(acc, state)}
defp decode_elem(<<",", _::bitstring>> = rest, acc, state),
do: {rest, cast(acc, state)}

defp decode_elem(<<"}", _::bitstring>> = rest, acc, state),
do: {rest, cast(acc, state)}
defp decode_elem(<<"}", _::bitstring>> = rest, acc, state),
do: {rest, cast(acc, state)}

defp decode_elem(<<c::utf8, rest::bitstring>>, acc, state),
do: decode_elem(rest, [acc | <<c::utf8>>], state)
defp decode_elem(<<c::utf8, rest::bitstring>>, acc, state),
do: decode_elem(rest, [acc | <<c::utf8>>], state)

defp decode_elem("", _acc, {_cast_fun, type, source}) do
raise Ecto.CastError, type: {:array, type}, value: source
end
defp decode_elem("", _acc, {_cast_fun, type, source}) do
raise Ecto.CastError, type: {:array, type}, value: source
end

##############################
##############################

defp decode_quoted_elem(<<?", rest::bitstring>>, acc, state),
do: {rest, cast_quoted(acc, state)}
defp decode_quoted_elem(<<?", rest::bitstring>>, acc, state),
do: {rest, cast_quoted(acc, state)}

defp decode_quoted_elem(<<"\\\"", rest::bitstring>>, acc, state),
do: decode_quoted_elem(rest, [acc | [?"]], state)
defp decode_quoted_elem(<<"\\\"", rest::bitstring>>, acc, state),
do: decode_quoted_elem(rest, [acc | [?"]], state)

defp decode_quoted_elem(<<c::utf8, rest::bitstring>>, acc, state),
do: decode_quoted_elem(rest, [acc | <<c::utf8>>], state)
defp decode_quoted_elem(<<c::utf8, rest::bitstring>>, acc, state),
do: decode_quoted_elem(rest, [acc | <<c::utf8>>], state)

defp decode_quoted_elem("", _acc, {_cast_fun, type, source}) do
raise Ecto.CastError, type: {:array, type}, value: source
end
defp decode_quoted_elem("", _acc, {_cast_fun, type, source}) do
raise Ecto.CastError, type: {:array, type}, value: source
end

##############################
##############################

defp cast(iodata, {cast_fun, _type, _source}) do
iodata
|> IO.iodata_to_binary()
|> case do
"NULL" -> nil
value -> cast_fun.(value)
defp cast(iodata, {cast_fun, _type, _source}) do
iodata
|> IO.iodata_to_binary()
|> case do
"NULL" -> nil
value -> cast_fun.(value)
end
end
end

defp cast_quoted(iodata, {cast_fun, _type, _source}) do
iodata
|> IO.iodata_to_binary()
|> cast_fun.()
defp cast_quoted(iodata, {cast_fun, _type, _source}) do
iodata
|> IO.iodata_to_binary()
|> cast_fun.()
end
end
end
133 changes: 115 additions & 18 deletions packages/elixir-client/lib/electric/client/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,31 @@ defmodule Electric.Client.Message do
alias Electric.Client.Offset

defmodule Headers do
defstruct [:operation, :relation, :handle, :lsn, txids: [], op_position: 0]
defstruct [
:operation,
:relation,
:handle,
:lsn,
txids: [],
op_position: 0,
tags: [],
removed_tags: []
]

@type operation :: :insert | :update | :delete
@type relation :: [String.t(), ...]
@type lsn :: binary()
@type txids :: [pos_integer(), ...] | nil
@type tag :: String.t()
@type t :: %__MODULE__{
operation: operation(),
relation: relation(),
handle: Client.shape_handle(),
lsn: lsn(),
txids: txids(),
op_position: non_neg_integer()
op_position: non_neg_integer(),
tags: [tag()],
removed_tags: [tag()]
}

@doc false
Expand All @@ -30,7 +42,9 @@ defmodule Electric.Client.Message do
handle: handle,
txids: Map.get(msg, "txids", []),
lsn: Map.get(msg, "lsn", nil),
op_position: Map.get(msg, "op_position", 0)
op_position: Map.get(msg, "op_position", 0),
tags: Map.get(msg, "tags", []),
removed_tags: Map.get(msg, "removed_tags", [])
}
end

Expand All @@ -56,23 +70,27 @@ defmodule Electric.Client.Message do

def from_message(
%{"headers" => %{"control" => control} = headers},
handle
handle,
request_timestamp
) do
%__MODULE__{
control: control_atom(control),
global_last_seen_lsn: global_last_seen_lsn(headers),
handle: handle
handle: handle,
request_timestamp: request_timestamp
}
end

def from_message(
%{headers: %{control: control} = headers},
handle
handle,
request_timestamp
) do
%__MODULE__{
control: control_atom(control),
global_last_seen_lsn: global_last_seen_lsn(headers),
handle: handle
handle: handle,
request_timestamp: request_timestamp
}
end

Expand Down Expand Up @@ -108,7 +126,7 @@ defmodule Electric.Client.Message do

require Logger

def from_message(msg, handle, value_mapping_fun) do
def from_message(msg, handle, value_mapping_fun, request_timestamp) do
%{
"headers" => headers,
"value" => raw_value
Expand All @@ -120,7 +138,8 @@ defmodule Electric.Client.Message do
key: msg["key"],
headers: Headers.from_message(headers, handle),
value: value,
old_value: old_value(msg, value_mapping_fun)
old_value: old_value(msg, value_mapping_fun),
request_timestamp: request_timestamp
}
end

Expand Down Expand Up @@ -168,30 +187,108 @@ defmodule Electric.Client.Message do

@enforce_keys [:shape_handle, :offset, :schema]

defstruct [:shape_handle, :offset, :schema]
defstruct [:shape_handle, :offset, :schema, tag_to_keys: %{}, key_data: %{}]

@type t :: %__MODULE__{
shape_handle: Client.shape_handle(),
offset: Offset.t(),
schema: Client.schema()
schema: Client.schema(),
tag_to_keys: %{String.t() => MapSet.t(String.t())},
key_data: %{String.t() => %{tags: MapSet.t(String.t()), msg: ChangeMessage.t()}}
}
end

defmodule MoveOutMessage do
@moduledoc """
Represents a move-out event from the server.

Move-out events are sent when rows should be removed from the client's view
because they no longer match the shape's subquery filter. The `patterns` field
contains tag hashes that identify which rows should be removed.

The client should use these patterns to generate synthetic delete messages
for any tracked rows that have matching tags.
"""

defstruct [:patterns, :handle, :request_timestamp]

@type pattern :: %{pos: non_neg_integer(), value: String.t()}
@type t :: %__MODULE__{
patterns: [pattern()],
handle: Client.shape_handle(),
request_timestamp: DateTime.t()
}

def from_message(
%{"headers" => %{"event" => "move-out", "patterns" => patterns}},
handle,
request_timestamp
) do
%__MODULE__{
patterns: normalize_patterns(patterns),
handle: handle,
request_timestamp: request_timestamp
}
end

def from_message(
%{headers: %{event: "move-out", patterns: patterns}},
handle,
request_timestamp
) do
%__MODULE__{
patterns: normalize_patterns(patterns),
handle: handle,
request_timestamp: request_timestamp
}
end

defp normalize_patterns(patterns) do
Enum.map(patterns, fn
%{"pos" => pos, "value" => value} -> %{pos: pos, value: value}
%{pos: _, value: _} = pattern -> pattern
end)
end
end

defguard is_insert(msg) when is_struct(msg, ChangeMessage) and msg.headers.operation == :insert

def parse(%{"value" => _} = msg, shape_handle, value_mapper_fun) do
[ChangeMessage.from_message(msg, shape_handle, value_mapper_fun)]
def parse(%{"value" => _} = msg, shape_handle, value_mapper_fun, request_timestamp) do
[ChangeMessage.from_message(msg, shape_handle, value_mapper_fun, request_timestamp)]
end

def parse(
%{"headers" => %{"control" => _}} = msg,
shape_handle,
_value_mapper_fun,
request_timestamp
) do
[ControlMessage.from_message(msg, shape_handle, request_timestamp)]
end

def parse(%{headers: %{control: _}} = msg, shape_handle, _value_mapper_fun, request_timestamp) do
[ControlMessage.from_message(msg, shape_handle, request_timestamp)]
end

def parse(%{"headers" => %{"control" => _}} = msg, shape_handle, _value_mapper_fun) do
[ControlMessage.from_message(msg, shape_handle)]
def parse(
%{"headers" => %{"event" => "move-out"}} = msg,
shape_handle,
_value_mapper_fun,
request_timestamp
) do
[MoveOutMessage.from_message(msg, shape_handle, request_timestamp)]
end

def parse(%{headers: %{control: _}} = msg, shape_handle, _value_mapper_fun) do
[ControlMessage.from_message(msg, shape_handle)]
def parse(
%{headers: %{event: "move-out"}} = msg,
shape_handle,
_value_mapper_fun,
request_timestamp
) do
[MoveOutMessage.from_message(msg, shape_handle, request_timestamp)]
end

def parse("", _handle, _value_mapper_fun) do
def parse("", _handle, _value_mapper_fun, _request_timestamp) do
[]
end
end
Loading