Skip to content

Commit

Permalink
Refactor ReplicationClient into a state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
alco committed Aug 2, 2024
1 parent 5522305 commit dadddc4
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 117 deletions.
158 changes: 41 additions & 117 deletions packages/sync-service/lib/electric/postgres/replication_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,19 @@ defmodule Electric.Postgres.ReplicationClient do

alias Electric.Postgres.LogicalReplication.Decoder
alias Electric.Postgres.ReplicationClient.Collector
alias Electric.Postgres.ReplicationClient.ConnectionSetup

require Logger

@type step ::
:disconnected
| :connected
| :create_publication
| :create_slot
| :set_display_setting
| :ready_to_stream
| :streaming

defmodule State do
@enforce_keys [:transaction_received, :publication_name]
defstruct [
Expand Down Expand Up @@ -40,13 +50,7 @@ defmodule Electric.Postgres.ReplicationClient do
slot_name: String.t(),
origin: String.t(),
txn_collector: Collector.t(),
step:
:disconnected
| :create_publication
| :create_slot
| :ready_to_stream
| :streaming
| :set_display_setting,
step: Electric.Postgres.ReplicationClient.step(),
display_settings: [String.t()],
received_wal: non_neg_integer,
applied_wal: non_neg_integer
Expand All @@ -69,6 +73,8 @@ defmodule Electric.Postgres.ReplicationClient do
end
end

# @type state :: State.t()

@repl_msg_x_log_data ?w
@repl_msg_primary_keepalive ?k
@repl_msg_standby_status_update ?r
Expand All @@ -81,93 +87,54 @@ defmodule Electric.Postgres.ReplicationClient do
send(client, :start_streaming)
end

# The Postgrex.ReplicationConnection behaviour does not adhere to gen server conventions and
# establishes its own. Unless the `sync_connet: true` option is passed to `start_link()`, the
# The `Postgrex.ReplicationConnection` behaviour does not adhere to gen server conventions and
# establishes its own. Unless the `sync_connet: false` option is passed to `start_link()`, the
# connection process will try opening a replication connection to Postgres before returning
# from its `init()` callback.
#
# The callbacks `init()`, `handle_connect()` and `handle_result()` defined in this module
# below are all invoked inside the connection process' `init()` callback. Once any of our
# callbacks returns `{:stream, ...}`, the connection process finishes its initialization and
# starts receiving replication messages from Postgres, invoking the `handle_data()` callback
# for each one.
# switches into the logical streaming mode to start receiving logical messages from Postgres,
# invoking the `handle_data()` callback for each one.
@impl true
def init(replication_opts) do
{:ok, State.new(replication_opts)}
end

@impl true
def handle_connect(%State{display_settings: [query | rest]} = state) do
{:query, query, %{state | display_settings: rest, step: :set_display_setting}}
end
# `Postgrex.ReplicationConnection` opens a new replication connection to Postgres and then
# gives us a chance to execute one or more queries before switching into the logical
# streaming mode. It doesn't give us the connection socket but instead takes the query returned
# by one of our `handle_connect/1`, `handle_result/2` or `handle_info/2` callbacks, executes
# it, invokes the `handle_result/2` callback on the result which may return another query to
# execute, executes that, and so it goes on and on, recursively, until a callback returns
# `{:noreply, ...}` or `{:streaming, ...}`.
#
# To execute a series of queries one after the other, we define an ad-hoc state
# machine that starts from the :connected state in `handle_connect/1`, then transitions to
# the next step and returns the appropriate query to `Postgrex.ReplicationConnection` for execution,
# This is all implemented in a separate module named `Electric.Postgres.ReplicationClient.ConnectionSetup`
# to separate the connection setup logic from logical streaming.

@impl true
def handle_connect(state) do
if state.try_creating_publication? do
create_publication_step(state)
else
create_replication_slot_step(state)
end
%{state | step: :connected}
|> ConnectionSetup.start()
end

# Successful creation of the replication slot.
@impl true
def handle_result(
[%Postgrex.Result{command: :create_publication}],
%State{step: :create_publication} = state
) do
create_replication_slot_step(state)
end

def handle_result(result, %State{step: :set_display_setting} = state) do
if is_struct(result, Postgrex.Error) do
Logger.error("Failed to set display setting: #{inspect(result)}")
end

handle_connect(state)
end

def handle_result(%Postgrex.Error{} = error, %State{step: :create_publication} = state) do
error_message = "publication \"#{state.publication_name}\" already exists"

case error.postgres do
%{code: :duplicate_object, pg_code: "42710", message: ^error_message} ->
create_replication_slot_step(state)

other ->
{:disconnect, other}
end
end

def handle_result([%Postgrex.Result{} = result], %State{step: :create_slot} = state) do
log_slot_creation_result(result)

maybe_start_streaming(state)
def handle_result(result_list_or_error, state) do
ConnectionSetup.process_query_result(result_list_or_error, state)
end

# Error while trying to create the replication slot.
def handle_result(%Postgrex.Error{} = error, %State{step: :create_slot} = state) do
error_msg = "replication slot \"#{state.slot_name}\" already exists"

case error.postgres do
%{code: :duplicate_object, pg_code: "42710", message: ^error_msg} ->
# Slot already exists, proceed nominally.
Logger.debug("Found existing replication slot")
maybe_start_streaming(state)

_ ->
# Unexpected error, fail loudly.
raise error
end
@impl true
def handle_info(:start_streaming, %State{step: :ready_to_stream} = state) do
ConnectionSetup.start_streaming(state)
end

@impl true
def handle_info(:start_streaming, state) do
if state.step == :ready_to_stream do
start_streaming_step(state)
else
Logger.debug("Replication client requested to start streaming while step=#{state.step}")
{:noreply, state}
end
def handle_info(:start_streaming, %State{step: step} = state) do
Logger.debug("Replication client requested to start streaming while step=#{step}")
{:noreply, state}
end

@impl true
Expand Down Expand Up @@ -248,49 +215,6 @@ defmodule Electric.Postgres.ReplicationClient do
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
defp current_time(), do: System.os_time(:microsecond) - @epoch

defp create_publication_step(state) do
# We're creating an "empty" publication because first snapshot creation should add the table
query = "CREATE PUBLICATION #{state.publication_name}"
{:query, query, %{state | step: :create_publication}}
end

defp create_replication_slot_step(state) do
query = "CREATE_REPLICATION_SLOT #{state.slot_name} LOGICAL pgoutput NOEXPORT_SNAPSHOT"
{:query, query, %{state | step: :create_slot}}
end

defp maybe_start_streaming(state) do
if state.start_streaming? do
start_streaming_step(state)
else
{:noreply, %{state | step: :ready_to_stream}}
end
end

defp start_streaming_step(state) do
query =
"START_REPLICATION SLOT #{state.slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication_name}')"

Logger.info("Started replication from postgres")

{:stream, query, [], %{state | step: :streaming}}
end

defp log_slot_creation_result(result) do
Logger.debug(fn ->
%Postgrex.Result{
command: :create,
columns: ["slot_name", "consistent_point", "snapshot_name", "output_plugin"],
rows: [[_, lsn_str, nil, _]],
num_rows: 1,
connection_id: _,
messages: []
} = result

"Created new slot at lsn=#{lsn_str}"
end)
end

# This is an edge case that seems to be caused by the documented requirement to respond to `Primary
# keepalive message`[1] with a `Standby status update`[2] message that has all of the WAL byte
# offset values incremented by 1. Perhaps, it is a bug in Postgres: when Electric opens a new
Expand Down
Loading

0 comments on commit dadddc4

Please sign in to comment.