Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sync-service): Set display settings before querying initial data #232

Merged
merged 4 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/swift-meals-repair.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Enforce the use of consistent display formats for both the initial snapshot and the live replication stream.
4 changes: 4 additions & 0 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ defmodule Electric.ShapeCache do
%{rows: [[xmin]]} =
Postgrex.query!(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())", [])

# Enforce display settings *before* querying initial data to maintain consistent
# formatting between snapshot and live log entries.
Enum.each(Electric.Postgres.display_settings(), &Postgrex.query!(conn, &1, []))

GenServer.cast(parent, {:snapshot_xmin_known, shape_id, xmin})
{query, stream} = Querying.stream_initial_data(conn, shape)

Expand Down
15 changes: 7 additions & 8 deletions packages/sync-service/test/electric/plug/router_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ defmodule Electric.Plug.RouterTest do
"""
use ExUnit.Case

alias Electric.Replication.LogOffset
alias Support.DbStructureSetup
alias Electric.Plug.Router
alias Support.DbSetup
alias Electric.Replication.Changes
import Support.ComponentSetup
import Support.DbSetup
import Support.DbStructureSetup
import Plug.Test

alias Electric.Plug.Router
alias Electric.Replication.Changes
alias Electric.Replication.LogOffset

@moduletag :tmp_dir
@moduletag :capture_log

Expand All @@ -32,9 +33,7 @@ defmodule Electric.Plug.RouterTest do
end

describe "/v1/shapes" do
setup {DbSetup, :with_unique_db}
setup {DbStructureSetup, :with_basic_tables}
setup {DbStructureSetup, :with_sql_execute}
setup [:with_unique_db, :with_basic_tables, :with_sql_execute]

setup do
%{publication_name: "electric_test_publication", slot_name: "electric_test_slot"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
defmodule Electric.Postgres.ReplicationClientTest do
use ExUnit.Case, async: true

import Support.DbSetup, except: [with_publication: 1]
import Support.DbStructureSetup

alias Electric.Postgres.Lsn
alias Electric.Postgres.ReplicationClient

Expand All @@ -16,8 +19,7 @@ defmodule Electric.Postgres.ReplicationClientTest do
@slot_name "test_electric_slot"

describe "ReplicationClient init" do
setup {Support.DbSetup, :with_unique_db}
setup {Support.DbStructureSetup, :with_basic_tables}
setup [:with_unique_db, :with_basic_tables]

test "creates an empty publication on startup if requested", %{
db_config: config,
Expand All @@ -40,11 +42,7 @@ defmodule Electric.Postgres.ReplicationClientTest do
end

describe "ReplicationClient against real db" do
setup [
{Support.DbSetup, :with_unique_db},
{Support.DbStructureSetup, :with_basic_tables},
:setup_publication_and_replication_opts
]
setup [:with_unique_db, :with_basic_tables, :with_publication, :with_replication_opts]

test "calls a provided function when receiving it from the PG",
%{db_config: config, replication_opts: replication_opts, db_conn: conn} do
Expand Down Expand Up @@ -168,70 +166,66 @@ defmodule Electric.Postgres.ReplicationClientTest do
refute_receive _
end

# Set the DB's display settings to something else than Electric.Postgres.display_settings
@tag database_settings: [
"DateStyle='Postgres, DMY'",
"TimeZone='CET'",
"extra_float_digits=-1",
"bytea_output='escape'",
"IntervalStyle='postgres'"
]
@tag additional_fields:
"date DATE, timestamptz TIMESTAMPTZ, float FLOAT8, bytea BYTEA, interval INTERVAL"
test "returns data formatted according to display settings", %{
db_config: config,
replication_opts: replication_opts,
db_conn: conn
} do
replication_opts = Keyword.put(replication_opts, :try_creating_publication?, true)
db_name = Keyword.get(config, :database)

# Set the DB's display settings to something else than Electric.Postgres.display_settings
Postgrex.query!(conn, "ALTER DATABASE \"#{db_name}\" SET DateStyle='Postgres, DMY';", [])
Postgrex.query!(conn, "ALTER DATABASE \"#{db_name}\" SET TimeZone='CET';", [])
Postgrex.query!(conn, "ALTER DATABASE \"#{db_name}\" SET extra_float_digits=-1;", [])
Postgrex.query!(conn, "ALTER DATABASE \"#{db_name}\" SET bytea_output='escape';", [])
Postgrex.query!(conn, "ALTER DATABASE \"#{db_name}\" SET IntervalStyle='postgres';", [])

assert {:ok, _} = ReplicationClient.start_link(config, replication_opts)

{:ok, _} =
Postgrex.query(
conn,
"INSERT INTO items (id, value, date, timestamptz, float, bytea, interval) VALUES ($1, $2, $3, $4, $5, $6, $7)",
[
Ecto.UUID.bingenerate(),
"test value",
~D[2022-05-17],
~U[2022-01-12 00:01:00.00Z],
1.234567890123456,
# 5 in hex
"0x5",
%Postgrex.Interval{
days: 1,
months: 0,
# 12 hours, 59 minutes, 10 seconds
secs: 46750,
microsecs: 0
}
]
Postgrex.query!(
conn,
"""
INSERT INTO items (
id, value, date, timestamptz, float, bytea, interval
) VALUES (
$1, $2, $3, $4, $5, $6, $7
)
""",
[
Ecto.UUID.bingenerate(),
"test value",
~D[2022-05-17],
~U[2022-01-12 00:01:00.00Z],
1.234567890123456,
<<0x5, 0x10, 0xFA>>,
%Postgrex.Interval{
days: 1,
months: 0,
# 12 hours, 59 minutes, 10 seconds
secs: 46750,
microsecs: 0
}
]
)

# Check that the incoming data is formatted according to Electric.Postgres.display_settings
assert_receive {:from_replication, %Transaction{changes: [change]}}

assert %NewRecord{
record: %{
"date" => "2022-05-17",
"timestamptz" => timestamp,
"timestamptz" => "2022-01-12 00:01:00+00",
"float" => "1.234567890123456",
"bytea" => "\\x307835",
"bytea" => "\\x0510fa",
"interval" => "P1DT12H59M10S"
}
} = change

assert String.ends_with?(timestamp, "+00")
end
end

describe "ReplicationClient against real db (toast)" do
setup [
{Support.DbSetup, :with_unique_db},
{Support.DbStructureSetup, :with_basic_tables},
:setup_publication_and_replication_opts
]
setup [:with_unique_db, :with_basic_tables, :with_publication, :with_replication_opts]

setup %{db_config: config, replication_opts: replication_opts, db_conn: conn} do
Postgrex.query!(
Expand Down Expand Up @@ -341,9 +335,12 @@ defmodule Electric.Postgres.ReplicationClientTest do
assert app_wal == state.applied_wal
end

defp setup_publication_and_replication_opts(%{db_conn: conn}) do
create_publication_for_all_tables(conn)
defp with_publication(%{db_conn: conn}) do
Postgrex.query!(conn, "CREATE PUBLICATION #{@publication_name} FOR ALL TABLES", [])
:ok
end

defp with_replication_opts(_) do
%{
replication_opts: [
publication_name: @publication_name,
Expand Down Expand Up @@ -385,9 +382,6 @@ defmodule Electric.Postgres.ReplicationClientTest do
:ok
end

defp create_publication_for_all_tables(conn),
do: Postgrex.query!(conn, "CREATE PUBLICATION #{@publication_name} FOR ALL TABLES", [])

defp gen_random_string(length) do
Stream.repeatedly(fn -> :rand.uniform(125 - 32) + 32 end)
|> Enum.take(length)
Expand Down
117 changes: 94 additions & 23 deletions packages/sync-service/test/electric/shape_cache_test.exs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
defmodule Electric.ShapeCacheTest do
use ExUnit.Case, async: true

import ExUnit.CaptureLog
import Support.ComponentSetup
import Support.DbSetup
import Support.DbStructureSetup
import Support.TestUtils

alias Electric.ShapeCache.Storage
alias Electric.ShapeCache
alias Electric.Shapes.Shape
alias Electric.Replication.Changes
alias Electric.Replication.LogOffset
alias Electric.ShapeCache
alias Electric.ShapeCache.Storage
alias Electric.Shapes.Shape

@shape %Shape{
root_table: {"public", "items"},
Expand Down Expand Up @@ -41,17 +42,14 @@ defmodule Electric.ShapeCacheTest do
@prepare_tables_noop {__MODULE__, :prepare_tables_noop, []}

describe "get_or_create_shape_id/2" do
setup :with_in_memory_storage
setup [:with_in_memory_storage, :with_no_pool]

setup(do: %{pool: :no_pool})

setup(ctx,
do:
with_shape_cache(ctx,
create_snapshot_fn: fn _, _, _, _, _ -> nil end,
prepare_tables_fn: @prepare_tables_noop
)
)
setup ctx do
with_shape_cache(ctx,
create_snapshot_fn: fn _, _, _, _, _ -> nil end,
prepare_tables_fn: @prepare_tables_noop
)
end

test "creates a new shape_id", %{shape_cache_opts: opts} do
{shape_id, @zero_offset} = ShapeCache.get_or_create_shape_id(@shape, opts)
Expand Down Expand Up @@ -183,11 +181,13 @@ defmodule Electric.ShapeCacheTest do
end

describe "get_or_create_shape_id/2 against real db" do
setup :with_in_memory_storage
setup :with_unique_db
setup :with_publication
setup :with_basic_tables
setup :with_shape_cache
setup [
:with_in_memory_storage,
:with_unique_db,
:with_publication,
:with_basic_tables,
:with_shape_cache
]

setup %{pool: pool} do
Postgrex.query!(pool, "INSERT INTO items (id, value) VALUES ($1, $2), ($3, $4)", [
Expand All @@ -200,15 +200,84 @@ defmodule Electric.ShapeCacheTest do
:ok
end

test "creates initial snapshot from DB data",
%{storage: storage, shape_cache_opts: opts} do
test "creates initial snapshot from DB data", %{storage: storage, shape_cache_opts: opts} do
{shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts)
assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id)
assert Storage.snapshot_exists?(shape_id, storage)
assert {@zero_offset, stream} = Storage.get_snapshot(shape_id, storage)

assert [%{value: %{"value" => "test1"}}, %{value: %{"value" => "test2"}}] =
Enum.to_list(stream)
stream_to_list(stream)
end

# Set the DB's display settings to something else than Electric.Postgres.display_settings
@tag database_settings: [
"DateStyle='Postgres, DMY'",
"TimeZone='CET'",
"extra_float_digits=-1",
"bytea_output='escape'",
"IntervalStyle='postgres'"
]
@tag additional_fields:
"date DATE, timestamptz TIMESTAMPTZ, float FLOAT8, bytea BYTEA, interval INTERVAL"
test "uses correct display settings when querying initial data", %{
pool: pool,
storage: storage,
shape_cache_opts: opts
} do
shape =
update_in(
@shape.table_info[{"public", "items"}].columns,
&(&1 ++
[
%{name: "date", type: :date},
%{name: "timestamptz", type: :timestamptz},
%{name: "float", type: :float8},
%{name: "bytea", type: :bytea},
%{name: "interval", type: :interval}
])
)

Postgrex.query!(
pool,
"""
INSERT INTO items (
id, value, date, timestamptz, float, bytea, interval
) VALUES (
$1, $2, $3, $4, $5, $6, $7
)
""",
[
Ecto.UUID.bingenerate(),
"test value",
~D[2022-05-17],
~U[2022-01-12 00:01:00.00Z],
1.234567890123456,
<<0x5, 0x10, 0xFA>>,
%Postgrex.Interval{
days: 1,
months: 0,
# 12 hours, 59 minutes, 10 seconds
secs: 46750,
microsecs: 0
}
]
)

{shape_id, _} = ShapeCache.get_or_create_shape_id(shape, opts)
assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id)
assert {@zero_offset, stream} = Storage.get_snapshot(shape_id, storage)

assert [%{value: map}, %{value: %{"value" => "test1"}}, %{value: %{"value" => "test2"}}] =
stream_to_list(stream)

assert %{
"date" => "2022-05-17",
"timestamptz" => "2022-01-12 00:01:00+00",
"float" => "1.234567890123456",
"bytea" => "\\x0510fa",
"interval" => "P1DT12H59M10S"
} = map
end

test "updates latest offset correctly",
Expand Down Expand Up @@ -543,8 +612,7 @@ defmodule Electric.ShapeCacheTest do
@describetag :tmp_dir
@snapshot_xmin 10

setup :with_cub_db_storage
setup(do: %{pool: :no_pool})
setup [:with_cub_db_storage, :with_no_pool]

setup(ctx,
do:
Expand Down Expand Up @@ -631,4 +699,7 @@ defmodule Electric.ShapeCacheTest do
end

def prepare_tables_noop(_, _), do: :ok

defp stream_to_list(stream),
do: Enum.sort_by(stream, fn %{value: %{"value" => val}} -> val end)
end
4 changes: 4 additions & 0 deletions packages/sync-service/test/support/component_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ defmodule Support.ComponentSetup do
%{storage: {InMemoryStorage, storage_opts}}
end

def with_no_pool(_ctx) do
%{pool: :no_pool}
end

def with_cub_db_storage(ctx) do
{:ok, storage_opts} =
CubDbStorage.shared_opts(
Expand Down
Loading
Loading