Skip to content

Commit

Permalink
add consume_error to stream response process for error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
beligante committed Oct 6, 2022
1 parent db5ffb3 commit a84d803
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
18 changes: 9 additions & 9 deletions examples/route_guide/lib/client.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
defmodule RouteGuide.Client do
def main(channel) do
print_feature(channel, Routeguide.Point.new(latitude: 409_146_138, longitude: -746_188_906))
print_feature(channel, Routeguide.Point.new(latitude: 0, longitude: 0))
# print_feature(channel, Routeguide.Point.new(latitude: 409_146_138, longitude: -746_188_906))
# print_feature(channel, Routeguide.Point.new(latitude: 0, longitude: 0))

# Looking for features between 40, -75 and 42, -73.
print_features(
channel,
Routeguide.Rectangle.new(
lo: Routeguide.Point.new(latitude: 400_000_000, longitude: -750_000_000),
hi: Routeguide.Point.new(latitude: 420_000_000, longitude: -730_000_000)
)
)
# print_features(
# channel,
# Routeguide.Rectangle.new(
# lo: Routeguide.Point.new(latitude: 400_000_000, longitude: -750_000_000),
# hi: Routeguide.Point.new(latitude: 420_000_000, longitude: -730_000_000)
# )
# )

run_record_route(channel)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
dequeued_state = State.update_request_stream_queue(state, queue)

cond do
# Do nothing, wait for server (on stream/2) to give us more window size
window_size == 0 -> {:noreply, state}
body_size > window_size -> chunk_body_and_enqueue_rest(request, dequeued_state)
true -> stream_body_and_reply(request, dequeued_state)
Expand Down Expand Up @@ -198,7 +199,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
{:noreply, new_state}

{:error, conn, error} ->
if is_reference(from) do
if from != nil do
GenServer.reply(from, {:error, error})
else
state
Expand All @@ -215,14 +216,14 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do

case stream_body(state.conn, request_ref, body, send_eof?) do
{:ok, conn} ->
if is_reference(from) do
if from != nil do
GenServer.reply(from, :ok)
end

check_request_stream_queue(State.update_conn(state, conn))

{:error, conn, error} ->
if is_reference(from) do
if from != nil do
GenServer.reply(from, {:error, error})
else
state
Expand Down
14 changes: 13 additions & 1 deletion lib/grpc/client/adapters/mint/stream_response_process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
@doc """
Cast a message to process to consume an incoming data or trailers
"""
@spec consume(pid(), :data | :trailers | :headers, binary() | Mint.Types.headers()) :: :ok
@spec consume(
pid(),
:data | :trailers | :headers | :error,
binary() | Mint.Types.headers() | Mint.Types.error()
) :: :ok
def consume(pid, :data, data) do
GenServer.cast(pid, {:consume_response, {:data, data}})
end
Expand All @@ -43,6 +47,10 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
GenServer.cast(pid, {:consume_response, {:headers, headers}})
end

def consume(pid, :error, error) do
GenServer.cast(pid, {:consume_response, {:error, error}})
end

# Callbacks

def init({stream, send_headers_or_trailers?}) do
Expand Down Expand Up @@ -114,6 +122,10 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
{:noreply, state, {:continue, :produce_response}}
end

def handle_cast({:consume_response, {:error, _error} = error}, %{responses: responses} = state) do
{:noreply, %{state | responses: [error | responses]}, {:continue, :produce_response}}
end

def handle_cast({:consume_response, :done}, state) do
{:noreply, %{state | done: true}, {:continue, :produce_response}}
end
Expand Down

0 comments on commit a84d803

Please sign in to comment.