From a84d8031fa3908e1831503cf5a8f22f54249a129 Mon Sep 17 00:00:00 2001 From: Luis Gustavo Beligante Date: Thu, 6 Oct 2022 09:17:36 -0300 Subject: [PATCH] add consume_error to stream response process for error handling --- examples/route_guide/lib/client.ex | 18 +++++++++--------- .../connection_process/connection_process.ex | 7 ++++--- .../adapters/mint/stream_response_process.ex | 14 +++++++++++++- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/examples/route_guide/lib/client.ex b/examples/route_guide/lib/client.ex index 5d33f4ba..482242b8 100644 --- a/examples/route_guide/lib/client.ex +++ b/examples/route_guide/lib/client.ex @@ -1,16 +1,16 @@ defmodule RouteGuide.Client do def main(channel) do - print_feature(channel, Routeguide.Point.new(latitude: 409_146_138, longitude: -746_188_906)) - print_feature(channel, Routeguide.Point.new(latitude: 0, longitude: 0)) + # print_feature(channel, Routeguide.Point.new(latitude: 409_146_138, longitude: -746_188_906)) + # print_feature(channel, Routeguide.Point.new(latitude: 0, longitude: 0)) # Looking for features between 40, -75 and 42, -73. - print_features( - channel, - Routeguide.Rectangle.new( - lo: Routeguide.Point.new(latitude: 400_000_000, longitude: -750_000_000), - hi: Routeguide.Point.new(latitude: 420_000_000, longitude: -730_000_000) - ) - ) + # print_features( + # channel, + # Routeguide.Rectangle.new( + # lo: Routeguide.Point.new(latitude: 400_000_000, longitude: -750_000_000), + # hi: Routeguide.Point.new(latitude: 420_000_000, longitude: -730_000_000) + # ) + # ) run_record_route(channel) diff --git a/lib/grpc/client/adapters/mint/connection_process/connection_process.ex b/lib/grpc/client/adapters/mint/connection_process/connection_process.ex index efd2cf71..97fd71c3 100644 --- a/lib/grpc/client/adapters/mint/connection_process/connection_process.ex +++ b/lib/grpc/client/adapters/mint/connection_process/connection_process.ex @@ -133,6 +133,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do dequeued_state = State.update_request_stream_queue(state, queue) cond do + # Do nothing, wait for server (on stream/2) to give us more window size window_size == 0 -> {:noreply, state} body_size > window_size -> chunk_body_and_enqueue_rest(request, dequeued_state) true -> stream_body_and_reply(request, dequeued_state) @@ -198,7 +199,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do {:noreply, new_state} {:error, conn, error} -> - if is_reference(from) do + if from != nil do GenServer.reply(from, {:error, error}) else state @@ -215,14 +216,14 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do case stream_body(state.conn, request_ref, body, send_eof?) do {:ok, conn} -> - if is_reference(from) do + if from != nil do GenServer.reply(from, :ok) end check_request_stream_queue(State.update_conn(state, conn)) {:error, conn, error} -> - if is_reference(from) do + if from != nil do GenServer.reply(from, {:error, error}) else state diff --git a/lib/grpc/client/adapters/mint/stream_response_process.ex b/lib/grpc/client/adapters/mint/stream_response_process.ex index f8f29ece..d7bfc66a 100644 --- a/lib/grpc/client/adapters/mint/stream_response_process.ex +++ b/lib/grpc/client/adapters/mint/stream_response_process.ex @@ -30,7 +30,11 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do @doc """ Cast a message to process to consume an incoming data or trailers """ - @spec consume(pid(), :data | :trailers | :headers, binary() | Mint.Types.headers()) :: :ok + @spec consume( + pid(), + :data | :trailers | :headers | :error, + binary() | Mint.Types.headers() | Mint.Types.error() + ) :: :ok def consume(pid, :data, data) do GenServer.cast(pid, {:consume_response, {:data, data}}) end @@ -43,6 +47,10 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do GenServer.cast(pid, {:consume_response, {:headers, headers}}) end + def consume(pid, :error, error) do + GenServer.cast(pid, {:consume_response, {:error, error}}) + end + # Callbacks def init({stream, send_headers_or_trailers?}) do @@ -114,6 +122,10 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do {:noreply, state, {:continue, :produce_response}} end + def handle_cast({:consume_response, {:error, _error} = error}, %{responses: responses} = state) do + {:noreply, %{state | responses: [error | responses]}, {:continue, :produce_response}} + end + def handle_cast({:consume_response, :done}, state) do {:noreply, %{state | done: true}, {:continue, :produce_response}} end