Skip to content

Commit

Permalink
elixir-client: Fix race condition (#2157)
Browse files Browse the repository at this point in the history
  • Loading branch information
magnetised authored Dec 12, 2024
1 parent 0e1a7fd commit 01c63ae
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 22 deletions.
5 changes: 5 additions & 0 deletions .changeset/lucky-boats-lie.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/elixir-client": patch
---

Fix race condition in elixir client when multiple simultaneous clients are streaming the same shape
5 changes: 3 additions & 2 deletions packages/elixir-client/lib/electric/client/fetch/monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ defmodule Electric.Client.Fetch.Monitor do
@impl true
def handle_continue({:start_request, request_id, request, client}, state) do
{:ok, _pid} = Fetch.Request.start_link({request_id, request, client, self()})

{:noreply, state}
end

Expand Down Expand Up @@ -127,7 +128,7 @@ defmodule Electric.Client.Fetch.Monitor do
send(pid, {:response, ref, response})
end

{:stop, :normal, :ok, state}
{:stop, {:shutdown, :normal}, :ok, state}
end

@impl true
Expand Down Expand Up @@ -157,6 +158,6 @@ defmodule Electric.Client.Fetch.Monitor do
send(pid, {:response, ref, {:error, reason}})
end

{:stop, :normal, state}
{:stop, {:shutdown, :normal}, state}
end
end
20 changes: 11 additions & 9 deletions packages/elixir-client/lib/electric/client/fetch/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ defmodule Electric.Client.Fetch.Pool do
def request(%Client{} = client, %Fetch.Request{} = request, opts) do
request_id = request_id(client, request)

# register this pid before making the request to avoid race conditions for
# very fast responses
# The monitor process is unique to the request and launches the actual
# request as a linked process.
#
# This coalesces requests, so no matter how many simultaneous
# clients we have, we only ever make one request to the backend.
{:ok, monitor_pid} = start_monitor(request_id, request, client)

try do
Expand Down Expand Up @@ -48,13 +51,12 @@ defmodule Electric.Client.Fetch.Pool do
defp return_existing({:error, {:already_started, pid}}), do: {:ok, pid}
defp return_existing(error), do: error

defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{shape_handle: nil} = request) do
%{endpoint: endpoint, shape: shape_definition} = request
{fetch_impl, URI.to_string(endpoint), shape_definition}
end

defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{} = request) do
%{endpoint: endpoint, offset: offset, live: live, shape_handle: shape_handle} = request
{fetch_impl, URI.to_string(endpoint), shape_handle, Client.Offset.to_tuple(offset), live}
{
fetch_impl,
URI.to_string(request.endpoint),
request.headers,
Fetch.Request.params(request)
}
end
end
25 changes: 14 additions & 11 deletions packages/elixir-client/lib/electric/client/fetch/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ defmodule Electric.Client.Fetch.Request do
|> Util.map_put_if("replica", to_string(replica), replica != :default)
|> Util.map_put_if("handle", shape_handle, is_binary(shape_handle))
|> Util.map_put_if("live", "true", live?)
|> Util.map_put_if("cursor", cursor, !is_nil(cursor))
|> Util.map_put_if("cursor", to_string(cursor), !is_nil(cursor))
|> Util.map_put_if("database_id", database_id, !is_nil(database_id))
end

Expand All @@ -133,9 +133,7 @@ defmodule Electric.Client.Fetch.Request do

@doc false
def start_link({request_id, request, client, monitor_pid}) do
GenServer.start_link(__MODULE__, {request_id, request, client, monitor_pid},
name: name(request_id)
)
GenServer.start_link(__MODULE__, {request_id, request, client, monitor_pid})
end

@impl true
Expand All @@ -161,16 +159,21 @@ defmodule Electric.Client.Fetch.Request do

authenticated_request = Client.authenticate_request(client, request)

case fetcher.fetch(authenticated_request, fetcher_opts) do
{:ok, %Fetch.Response{status: status} = response} when status in 200..299 ->
reply(response, state)
try do
case fetcher.fetch(authenticated_request, fetcher_opts) do
{:ok, %Fetch.Response{status: status} = response} when status in 200..299 ->
reply(response, state)

{:ok, %Fetch.Response{} = response} ->
# Turn HTTP errors into errors
reply({:error, response}, state)
{:ok, %Fetch.Response{} = response} ->
# Turn HTTP errors into errors
reply({:error, response}, state)

error ->
reply(error, state)
end
rescue
error ->
reply(error, state)
reply({:error, error}, state)
end

{:stop, :normal, state}
Expand Down

0 comments on commit 01c63ae

Please sign in to comment.