Skip to content

Remove gun-specific code from stub to improve client adapter flexibility #267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
14 changes: 5 additions & 9 deletions lib/grpc/client/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@ defmodule GRPC.Client.Adapter do
@callback send_request(stream :: Stream.t(), contents :: binary(), opts :: keyword()) ::
Stream.t()

@callback recv_headers(stream :: map(), headers :: map(), opts :: keyword()) ::
{:ok, %{String.t() => String.t()}, fin()} | {:error, GRPC.RPCError.t()}

@callback recv_data_or_trailers(
stream :: map(),
trailers_or_metadata :: map(),
opts :: keyword()
) ::
{:data, binary()} | {:trailers, binary()} | {:error, GRPC.RPCError.t()}
@doc """
Check `GRPC.Stub.recv/2` for more context about the return types
"""
@callback receive_data(stream :: Stream.t(), opts :: keyword()) ::
GRPC.Stub.receive_data_return() | {:error, any()}
end
175 changes: 172 additions & 3 deletions lib/grpc/client/adapters/gun.ex
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,38 @@ defmodule GRPC.Client.Adapters.Gun do
end

@impl true
def recv_headers(%{conn_pid: conn_pid}, %{stream_ref: stream_ref}, opts) do
def receive_data(
%{server_stream: true} = stream,
opts
) do
%{channel: %{adapter_payload: adapter_payload}, payload: payload} = stream

with {:ok, headers, is_fin} <- recv_headers(adapter_payload, payload, opts) do
response = response_stream(is_fin, stream, opts)

if(opts[:return_headers]) do
{:ok, response, %{headers: headers}}
else
{:ok, response}
end
end
end

def receive_data(stream, opts) do
%{payload: payload, channel: %{adapter_payload: adapter_payload}} = stream

with {:ok, headers, _is_fin} <- recv_headers(adapter_payload, payload, opts),
{:ok, body, trailers} <- recv_body(adapter_payload, payload, opts),
{:ok, response} <- parse_response(stream, headers, body, trailers) do
if(opts[:return_headers]) do
{:ok, response, %{headers: headers, trailers: trailers}}
else
{:ok, response}
end
end
end

defp recv_headers(%{conn_pid: conn_pid}, %{stream_ref: stream_ref}, opts) do
case await(conn_pid, stream_ref, opts[:timeout]) do
{:response, headers, fin} ->
{:ok, headers, fin}
Expand All @@ -152,8 +183,7 @@ defmodule GRPC.Client.Adapters.Gun do
end
end

@impl true
def recv_data_or_trailers(%{conn_pid: conn_pid}, %{stream_ref: stream_ref}, opts) do
defp recv_data_or_trailers(%{conn_pid: conn_pid}, %{stream_ref: stream_ref}, opts) do
case await(conn_pid, stream_ref, opts[:timeout]) do
data = {:data, _} ->
data
Expand Down Expand Up @@ -287,4 +317,143 @@ defmodule GRPC.Client.Adapters.Gun do
timeout = round(timeout + jitter * timeout)
%{retries: retries - 1, timeout: timeout}
end

defp recv_body(conn_payload, stream_payload, opts) do
recv_body(conn_payload, stream_payload, "", opts)
end

defp recv_body(conn_payload, stream_payload, acc, opts) do
case recv_data_or_trailers(conn_payload, stream_payload, opts) do
{:data, data} ->
recv_body(conn_payload, stream_payload, <<acc::binary, data::binary>>, opts)

{:trailers, trailers} ->
{:ok, acc, GRPC.Transport.HTTP2.decode_headers(trailers)}

err ->
err
end
end

defp response_stream(:fin, _stream, _opts), do: []

defp response_stream(
:nofin,
%{
channel: %{adapter_payload: ap},
response_mod: res_mod,
codec: codec,
payload: payload
},
opts
) do
state = %{
adapter_payload: ap,
payload: payload,
buffer: <<>>,
fin: false,
need_more: true,
opts: opts,
response_mod: res_mod,
codec: codec
}

Stream.unfold(state, fn s -> read_stream(s) end)
end

defp read_stream(%{buffer: <<>>, fin: true, fin_resp: nil}), do: nil

defp read_stream(%{buffer: <<>>, fin: true, fin_resp: fin_resp} = s),
do: {fin_resp, Map.put(s, :fin_resp, nil)}

defp read_stream(
%{
adapter_payload: ap,
payload: payload,
buffer: buffer,
need_more: true,
opts: opts
} = stream
) do
case recv_data_or_trailers(ap, payload, opts) do
{:data, data} ->
stream
|> Map.put(:need_more, false)
|> Map.put(:buffer, buffer <> data)
|> read_stream()

{:trailers, trailers} ->
update_stream_with_trailers(stream, trailers, opts[:return_headers])

error = {:error, _} ->
{error, %{buffer: <<>>, fin: true, fin_resp: nil}}
end
end

defp read_stream(%{buffer: buffer, need_more: false, response_mod: res_mod, codec: codec} = s) do
case GRPC.Message.get_message(buffer) do
{{_, message}, rest} ->
reply = codec.decode(message, res_mod)
new_s = Map.put(s, :buffer, rest)
{{:ok, reply}, new_s}

_ ->
read_stream(Map.put(s, :need_more, true))
end
end

defp parse_response(
%{response_mod: res_mod, codec: codec, accepted_compressors: accepted_compressors},
headers,
body,
trailers
) do
with :ok <- parse_trailers(trailers),
compressor <- get_compressor(headers, accepted_compressors),
body <- get_body(codec, body),
{:ok, msg} <- GRPC.Message.from_data(%{compressor: compressor}, body) do
{:ok, codec.decode(msg, res_mod)}
end
end

defp update_stream_with_trailers(stream, trailers, return_headers?) do
trailers = GRPC.Transport.HTTP2.decode_headers(trailers)

case parse_trailers(trailers) do
:ok ->
fin_resp = if return_headers?, do: {:trailers, trailers}

stream
|> Map.put(:fin, true)
|> Map.put(:fin_resp, fin_resp)
|> read_stream()

error ->
{error, %{buffer: <<>>, fin: true, fin_resp: nil}}
end
end

defp parse_trailers(trailers) do
status = String.to_integer(trailers["grpc-status"])

if status == GRPC.Status.ok() do
:ok
else
{:error, %GRPC.RPCError{status: status, message: trailers["grpc-message"]}}
end
end

defp get_compressor(%{"grpc-encoding" => encoding} = _headers, accepted_compressors) do
Enum.find(accepted_compressors, nil, fn c -> c.name() == encoding end)
end

defp get_compressor(_headers, _accepted_compressors), do: nil

defp get_body(codec, body) do
if function_exported?(codec, :unpack_from_channel, 1) do
codec.unpack_from_channel(body)
else
body
end
end
end
9 changes: 8 additions & 1 deletion lib/grpc/client/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ defmodule GRPC.Client.Stream do
compressor: nil,
accepted_compressors: [],
headers: %{},
__interface__: %{send_request: &__MODULE__.send_request/3, recv: &GRPC.Stub.do_recv/2}
__interface__: %{
send_request: &__MODULE__.send_request/3,
receive_data: &__MODULE__.receive_data/2
}

@doc false
def put_payload(%{payload: payload} = stream, key, val) do
Expand Down Expand Up @@ -90,4 +93,8 @@ defmodule GRPC.Client.Stream do
compressor: compressor
)
end

def receive_data(%{channel: %{adapter: adapter}} = stream, opts) do
adapter.receive_data(stream, opts)
end
end
Loading