Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule Electric.Config do

@build_env Mix.env()

@known_feature_flags ~w[allow_subqueries suspend_consumers]
@known_feature_flags ~w[allow_subqueries suspend_consumers tagged_subqueries]

@defaults [
## Database
Expand Down
31 changes: 23 additions & 8 deletions packages/sync-service/lib/electric/log_items.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ defmodule Electric.LogItems do
if value, do: Map.put(map, key, value), else: map
end

defp put_if_true(map, key, condition, value) do
if condition, do: Map.put(map, key, value), else: map
end

@type log_item ::
{LogOffset.t(),
%{
Expand Down Expand Up @@ -44,6 +48,7 @@ defmodule Electric.LogItems do
op_position: change.log_offset.op_offset
}
|> put_if_true(:last, change.last?)
|> put_if_true(:tags, change.move_tags != [], change.move_tags)
}}
]
end
Expand All @@ -63,6 +68,7 @@ defmodule Electric.LogItems do
op_position: change.log_offset.op_offset
}
|> put_if_true(:last, change.last?)
|> put_if_true(:tags, change.move_tags != [], change.move_tags)
}}
]
end
Expand All @@ -82,6 +88,8 @@ defmodule Electric.LogItems do
op_position: change.log_offset.op_offset
}
|> put_if_true(:last, change.last?)
|> put_if_true(:tags, change.move_tags != [], change.move_tags)
|> put_if_true(:removed_tags, change.move_tags != [], change.removed_move_tags)
}
|> Map.merge(put_update_values(change, pk_cols, replica))}
]
Expand All @@ -95,14 +103,20 @@ defmodule Electric.LogItems do
%{
key: change.old_key,
value: take_pks_or_all(change.old_record, pk_cols, replica),
headers: %{
operation: :delete,
txids: List.wrap(txids),
relation: Tuple.to_list(change.relation),
key_change_to: change.key,
lsn: to_string(change.log_offset.tx_offset),
op_position: change.log_offset.op_offset
}
headers:
%{
operation: :delete,
txids: List.wrap(txids),
relation: Tuple.to_list(change.relation),
key_change_to: change.key,
lsn: to_string(change.log_offset.tx_offset),
op_position: change.log_offset.op_offset
}
|> put_if_true(
:tags,
change.move_tags != [],
change.move_tags ++ change.removed_move_tags
)
}},
{new_offset,
%{
Expand All @@ -118,6 +132,7 @@ defmodule Electric.LogItems do
op_position: new_offset.op_offset
}
|> put_if_true(:last, change.last?)
|> put_if_true(:tags, change.move_tags != [], change.move_tags)
}}
]
end
Expand Down
106 changes: 106 additions & 0 deletions packages/sync-service/lib/electric/postgres/snapshot_query.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
defmodule Electric.Postgres.SnapshotQuery do
alias Electric.SnapshotError
alias Electric.Shapes.Shape
alias Electric.Telemetry.OpenTelemetry

@type pg_snapshot() ::
{xmin :: pos_integer(), xmax :: pos_integer(), xip_list :: [pos_integer()]}

@doc """
Execute a snapshot query for a shape in a isolated readonly transaction.

This function operates on two callbacks: `snapshot_info_fn` and `query_fn`.

`snapshot_info_fn` is called with the shape handle, the pg_snapshot, and the lsn as soon
as the snapshot information for the started transaction is available.

`query_fn` is called with the connection, the pg_snapshot, and the lsn and
is expected to do all the work querying and dealing with the results.

The query function is executed within a transaction, so it shouldn't return
a stream (as it will fail to be read after the transaction is committed), but
rather should execute all desired side-effects or materialize the results.

Query will be executed within a REPEATABLE READ READ ONLY transaction, with
correct display settings set.

Options:
- `:query_fn` - the function to execute the query.
- `:snapshot_info_fn` - the function to call with the snapshot information.
- `:stack_id` - the stack id for this shape.
"""
@spec execute_for_shape(Postgrex.conn(), Shape.handle(), Shape.t(), [option]) ::
{:ok, result} | {:error, any()}
when result: term(),
option:
{:snapshot_info_fn, (Shape.handle(), pg_snapshot, pos_integer() -> any())}
| {:query_fn, (Postgrex.conn(), pg_snapshot, pos_integer() -> result)}
| {:stack_id, Electric.stack_id()},
pg_snapshot:
{xmin :: pos_integer(), xmax :: pos_integer(), xip_list :: [pos_integer()]}
def execute_for_shape(pool, shape_handle, shape, opts) do
query_fn = Access.fetch!(opts, :query_fn)
snapshot_info_fn = Access.fetch!(opts, :snapshot_info_fn)
shape_attrs = shape_attrs(shape_handle, shape)
stack_id = Access.fetch!(opts, :stack_id)

Postgrex.transaction(
pool,
fn conn ->
ctx = %{
conn: conn,
stack_id: stack_id,
span_attrs: shape_attrs,
query_reason: Access.get(opts, :query_reason, "initial_snapshot")
}

query!(ctx, "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY",
span_name: "shape_snapshot.start_readonly_txn"
)

[%{rows: [[pg_snapshot, lsn]]}] =
query!(ctx, "SELECT pg_current_snapshot(), pg_current_wal_lsn()",
span_name: "shape_snapshot.get_pg_snapshot"
)

snapshot_info_fn.(shape_handle, pg_snapshot, lsn)

query!(ctx, Electric.Postgres.display_settings(),
span_name: "shape_snapshot.set_display_settings"
)

query_fn.(conn, pg_snapshot, lsn)
end,
timeout: :infinity
)
catch
:exit, {_, {DBConnection.Holder, :checkout, _}} ->
raise SnapshotError.connection_not_available()
end

defp shape_attrs(shape_handle, shape) do
[
"shape.handle": shape_handle,
"shape.root_table": shape.root_table,
"shape.where": shape.where
]
end

@spec query!(map(), String.t() | [String.t()], Keyword.t()) :: [Postgrex.Result.t(), ...]
defp query!(
%{conn: conn, stack_id: stack_id, span_attrs: span_attrs},
query_or_queries,
opts
) do
OpenTelemetry.with_span(
Keyword.fetch!(opts, :span_name),
span_attrs,
stack_id,
fn ->
query_or_queries
|> List.wrap()
|> Enum.map(fn query -> Postgrex.query!(conn, query, Keyword.get(opts, :params, [])) end)
end
)
end
end
192 changes: 170 additions & 22 deletions packages/sync-service/lib/electric/postgres/xid.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,166 @@ defmodule Electric.Postgres.Xid do
# We don't include 0 in the definition of uint32 because it is not a valid transaction ID.
defguardp uint32?(num) when num > 0 and num <= @uint32_max

@doc """
Guard function to check if xid_l < xid_r using the same wraparound logic as compare/2.

This can be used in guard clauses. Since guards don't allow bit-casting, we manually
handle the modulo-2^32 arithmetic:
- For 64-bit XIDs (both > 32-bit max), use regular comparison
- For 32-bit XIDs, we compute the unsigned 32-bit difference and check if it's > 2^31

## Examples

iex> is_lt(3, 3)
false

iex> is_lt(2, 1)
false

iex> is_lt(2, 2)
false

iex> is_lt(2, 3)
true

iex> is_lt(#{@uint32_max}, #{@uint32_max})
false

iex> is_lt(1, #{@uint32_half_max})
true

iex> is_lt(1, #{@uint32_half_max + 1})
true

iex> is_lt(1, #{@uint32_half_max + 2})
false

iex> is_lt(1, #{@uint32_max})
false

iex> is_lt(#{@uint32_max}, 1)
true

iex> is_lt(#{@uint32_half_max}, 1)
false

iex> is_lt(#{@uint32_half_max + 1}, 1)
true

iex> is_lt(#{@uint32_half_max}, #{@uint32_max})
true

iex> is_lt(#{@uint32_half_max - 1}, #{@uint32_max})
true

iex> is_lt(#{@uint32_half_max - 2}, #{@uint32_max})
false

Any of the two arguments can be 64-bit, the order doesn't matter:

iex> is_lt(1, #{@uint64_xid})
true

iex> is_lt(1, #{@uint64_xid + 1})
true

iex> is_lt(1, #{@uint64_xid + 2})
false

iex> is_lt(#{@uint64_xid}, 1)
false

iex> is_lt(#{@uint64_xid + 1}, 1)
true

# When both numbers are 64-bit, regular comparison rules apply:

iex> is_lt(#{@uint64_xid + 2}, #{@uint64_xid + 1})
false

iex> is_lt(#{@uint64_xid}, #{@uint64_xid + @uint32_half_max + 2})
true
"""
# This produces equivalent results to the following C code:
#
# uint32 xid_l, xid_r;
# int32 signed_diff = (int32)(xid_l - xid_r);
# return signed_diff < 0;
#
defguard is_lt(xid_l, xid_r)
# Both are 64-bit XIDs - use regular comparison
# At least one is 32-bit - use modulo-2^32 comparison
# Case 1: xid_l >= xid_r (difference is non-negative)
# The unsigned 32-bit difference is >= 2^31, meaning wraparound makes xid_l < xid_r
# Case 2: xid_l < xid_r (difference is negative)
# rem() returns a negative value, so we add 2^32 to get the unsigned result
# Then check if it's >= 2^31 (values 0x80000000-0xFFFFFFFF represent negative signed ints)
when (not uint32?(xid_l) and not uint32?(xid_r) and xid_l > 0 and xid_r > 0 and
xid_l < xid_r) or
((uint32?(xid_l) or uint32?(xid_r)) and xid_l > 0 and xid_r > 0 and
((xid_l - xid_r >= 0 and
rem(xid_l - xid_r, @uint32_max + 1) >= @uint32_half_max) or
(xid_l - xid_r < 0 and
rem(xid_l - xid_r, @uint32_max + 1) + @uint32_max + 1 >=
@uint32_half_max)))

@doc """
Guard function to check if xid_l == xid_r using the same wraparound logic as compare/2.

This can be used in guard clauses. For equality, two XIDs are equal if their difference
is zero modulo 2^32.

## Examples

iex> is_eq(3, 3)
true

iex> is_eq(2, 1)
false

iex> is_eq(2, 2)
true

iex> is_eq(2, 3)
false

iex> is_eq(#{@uint32_max}, #{@uint32_max})
true

iex> is_eq(1, #{@uint32_half_max})
false

iex> is_eq(#{@uint32_max}, 1)
false

Any of the two arguments can be 64-bit, the order doesn't matter:

iex> is_eq(1, #{@uint64_xid})
false

iex> is_eq(#{@uint64_xid}, 1)
false

# When both numbers are 64-bit, regular comparison rules apply:

iex> is_eq(#{@uint64_xid}, #{@uint64_xid})
true

iex> is_eq(#{@uint64_xid + 2}, #{@uint64_xid + 1})
false
"""
# This produces equivalent results to the following C code:
#
# uint32 xid_l, xid_r;
# int32 signed_diff = (int32)(xid_l - xid_r);
# return signed_diff == 0;
#
defguard is_eq(xid_l, xid_r)
when (not uint32?(xid_l) and not uint32?(xid_r) and xid_l > 0 and xid_r > 0 and
xid_l == xid_r) or
((uint32?(xid_l) or uint32?(xid_r)) and xid_l > 0 and xid_r > 0 and
rem(xid_l - xid_r, @uint32_max + 1) == 0)

@doc """
In Postgres, any 32-bit xid has ~2 billion values preceding it and ~2 billion values following it.
Regular autovacuuming maintains this invariant. When we see a difference between two
Expand Down Expand Up @@ -102,26 +262,14 @@ defmodule Electric.Postgres.Xid do

# If both numbers do not fit into 32 bits, then both are of type xid8 and we compare them
# using regular comparison.
def compare(xid8_l, xid8_r)
when not uint32?(xid8_l) and not uint32?(xid8_r) and xid8_l > 0 and xid8_r > 0 do
cmp(xid8_l, xid8_r)
end

# If one of the numbers is a 32-bit unsigned integer, we compare the two numbers using
# modulo-2^32 arithmetic.
def compare(xid_l, xid_r) when (uint32?(xid_l) or uint32?(xid_r)) and xid_l > 0 and xid_r > 0 do
# This produces equivalent results to the following C code:
#
# uint32 xid_l, xid_r;
# int32 signed_diff = (int32)(xid_l - xid_r);
#
<<signed_diff::signed-32>> = <<xid_l - xid_r::unsigned-32>>

# If signed_diff is a negative number, xid_l precedes xid_r.
cmp(signed_diff, 0)
end

defp cmp(a, b) when a == b, do: :eq
defp cmp(a, b) when a < b, do: :lt
defp cmp(a, b) when a > b, do: :gt
def compare(xid8_l, xid8_r) when is_eq(xid8_l, xid8_r), do: :eq
def compare(xid8_l, xid8_r) when is_lt(xid8_l, xid8_r), do: :lt
def compare(_, _), do: :gt

@doc """
Check if a transaction is after the end of a snapshot - if it's xid is over xmax
"""
@spec after_snapshot?(anyxid, {anyxid, anyxid, [anyxid]}) :: boolean()
def after_snapshot?(xid, {_, xmax, _}) when not is_lt(xid, xmax), do: true
def after_snapshot?(_, _), do: false
end
Loading