Skip to content

Commit

Permalink
feat: Add Mint adapter (#272)
Browse files Browse the repository at this point in the history
* remove gun specific code from stub to improve adapter flexilibity

* remove maybe_return_headers in gun adapter for clairty

* refactor parse_response from gun adapter to improve readability

* refactor read_stream from gun adapter to improve readability

* remove unecessary response parse from inside with condition

* add mint adapter and implement draft for open connection

* add mint adapter and implement draft for open connection

* add simple request and response handlers

* add server stream connection handling

* refactor server stream consuption to use a short lived genserver

* fix disconnect code for connection_process when brutal killing

* refactor: use stream response module to handle unary calls

* add todo for compressor headers

* remove mint adapter code from hello world

* add client streaming api for mint adapter

* uncomment route_guide test client requests

* raise exception for unhandled error cases

* refact(client/mint_adapter): add cond to handle the four rpc types to receive data and improve readability

* refact(client/mint_adapter): use keyword.get to return headers and trailers

* chunk and enqueue requests inside connection process

* remove unused functions from state module

* fix reference check for response

* add consume_error to stream response process for error handling

* remove comments from route guide examples

* replace case in favor of if/else

* fix disconnect match on mint adapter

* improve error handling for response stream

* refact response module to use guards to return headers or trailers

* add factories for channel and stream, remove old factory code to use ex_machina, add tests to cast calls for stream_response

* add tests for error and done cast calls

* remove empty test

* add tests to stream producer for response process

* fix dialyzer issue

* improve documentation for connection process and add tests for disconnect and handle_call for requests

* add tests to stream_body handle_call cases on connection process module

* add test for process_request_stream_queue

* add error test cases for connection process

* add mint adapter to interop tests

* add tests to end_stream and cancel callbacks, also improved documentation

* fix dialyzer issues

* improve documentation and error handling

* remove mint from example projects

* remove ex_machina dependency

* fix: remove ex-machina from test helper

* add connection close handling

* add connection close checks and tests

* remove :queue.fold reference to avoid break on old versions of erlang

* send error tuple instead of raise for connect

* refact: extract interop test runnet to its module runner

* fix: fix code review suggestions

* Improve doc on `GRPC.Client.Adapters.Mint.ConnectionProcess`.
* Remove `Map.keys/1` in `GRPC.Client.Adapters.Mint.ConnectionProcess.finish_all_pending_requests/1`
* Remove if parenthesises.

* fix: fix code review suggestions

* fix: fix code review suggestions

* fix: fix code review suggestions

* Expand alias group in `GRPC.Client.Adapters.Mint`.

* fix: fix code review suggestions

* Expand alias group in interop test.

* fix: fix code review suggestions

* refactor: extract ref at the function argument

* remove unecessary sorts

* rename disconnect to remove brutal term

* rename bidi_stream atom to bidirectional_stream for clarity

* refactor response checks for mint adapter to have a single funtion

* refactor response process to use call instead of cast

* add pattern match for response calls

* unify return headers behavior to match gun behavior

* simplify code in favor of use enum module

* refactor variable names for clarity

* add todo for error handling on response process

* add bit lengh for binary manipulation for clarity

* rollback cowboy config

* unify factories to a single module

* add case for chunk_body to avoid raise

* Update interop/lib/interop/client.ex

Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>

* Update interop/lib/interop/client.ex

Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>

* improve documentation for GRPC.Stub.recv

* remove doc for GRPC.Stub.call/5

* make connection status check a private function

* apply PR comments

* apply missing PR comments

* Update lib/grpc/stub.ex

Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>

* anotate stream response process for genserver callback functions

* Update lib/grpc/client/adapters/mint/connection_process/connection_process.ex

Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>

* improve error messages

* Apply suggestions from code review

Co-authored-by: Thanabodee Charoenpiriyakij <thanabodee.c@linecorp.com>
Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 10, 2023
1 parent 71012b3 commit f5f5fab
Show file tree
Hide file tree
Showing 27 changed files with 1,951 additions and 97 deletions.
52 changes: 33 additions & 19 deletions interop/lib/interop/client.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
defmodule Interop.Client do

import ExUnit.Assertions, only: [refute: 1]

require Logger

# To better understand the behavior of streams used in this module
# we suggest you to check the documentation for `GRPC.Stub.recv/2`
# there is some unusual behavior that can be observed.

def connect(host, port, opts \\ []) do
{:ok, ch} = GRPC.Stub.connect(host, port, opts)
ch
Expand Down Expand Up @@ -103,12 +108,9 @@ defmodule Interop.Client do
params = Enum.map([31415, 9, 2653, 58979], &res_param(&1))
req = Grpc.Testing.StreamingOutputCallRequest.new(response_parameters: params)
{:ok, res_enum} = ch |> Grpc.Testing.TestService.Stub.streaming_output_call(req)
result = Enum.map([31415, 9, 2653, 58979], &String.duplicate(<<0>>, &1))
result = Enum.map([9, 2653, 31415, 58979], &String.duplicate(<<0>>, &1))

^result =
Enum.map(res_enum, fn {:ok, res} ->
res.payload.body
end)
^result = res_enum |> Enum.map(fn {:ok, res} -> res.payload.body end) |> Enum.sort()
end

def server_compressed_streaming!(ch) do
Expand All @@ -122,10 +124,7 @@ defmodule Interop.Client do
{:ok, res_enum} = ch |> Grpc.Testing.TestService.Stub.streaming_output_call(req)
result = Enum.map([31415, 92653], &String.duplicate(<<0>>, &1))

^result =
Enum.map(res_enum, fn {:ok, res} ->
res.payload.body
end)
^result = res_enum |> Enum.map(fn {:ok, res} -> res.payload.body end) |> Enum.sort()
end

def ping_pong!(ch) do
Expand All @@ -143,15 +142,13 @@ defmodule Interop.Client do
{:ok, res_enum} = GRPC.Stub.recv(stream)
reply = String.duplicate(<<0>>, 31415)

{:ok, %{payload: %{body: ^reply}}} =
Stream.take(res_enum, 1) |> Enum.to_list() |> List.first()
{:ok, %{payload: %{body: ^reply}}} = Enum.at(res_enum, 0)

Enum.each([{9, 8}, {2653, 1828}, {58979, 45904}], fn {res, payload} ->
GRPC.Stub.send_request(stream, req.(res, payload))
reply = String.duplicate(<<0>>, res)

{:ok, %{payload: %{body: ^reply}}} =
Stream.take(res_enum, 1) |> Enum.to_list() |> List.first()
{:ok, %{payload: %{body: ^reply}}} = Enum.at(res_enum, 0)
end)

GRPC.Stub.end_stream(stream)
Expand Down Expand Up @@ -191,19 +188,32 @@ defmodule Interop.Client do
payload: payload(271_828)
)

{:ok, res_enum, %{headers: new_headers}} =
{headers, data, trailers} =
ch
|> Grpc.Testing.TestService.Stub.full_duplex_call(metadata: metadata)
|> GRPC.Stub.send_request(req, end_stream: true)
|> GRPC.Stub.recv(return_headers: true)
|> process_full_duplex_response()

reply = String.duplicate(<<0>>, 314_159)

{:ok, %{payload: %{body: ^reply}}} =
Stream.take(res_enum, 1) |> Enum.to_list() |> List.first()
%{payload: %{body: ^reply}} = data

{:trailers, new_trailers} = Stream.take(res_enum, 1) |> Enum.to_list() |> List.first()
validate_headers!(new_headers, new_trailers)
validate_headers!(headers, trailers)
end

defp process_full_duplex_response({:ok, res_enum, %{headers: new_headers}}) do
{:ok, data} = Enum.at(res_enum, 0)
{:trailers, new_trailers} = Enum.at(res_enum, 0)
{new_headers, data, new_trailers}
end


defp process_full_duplex_response({:ok, res_enum}) do
{:headers, headers} = Enum.at(res_enum, 0)
{:ok, data} = Enum.at(res_enum, 0)
{:trailers, trailers} = Enum.at(res_enum, 0)
{headers, data, trailers}
end

def status_code_and_message!(ch) do
Expand All @@ -226,6 +236,10 @@ defmodule Interop.Client do
|> Grpc.Testing.TestService.Stub.full_duplex_call()
|> GRPC.Stub.send_request(req, end_stream: true)
|> GRPC.Stub.recv()
|> case do
{:ok, stream} -> Enum.at(stream, 0)
error -> error
end
end

def unimplemented_service!(ch) do
Expand Down Expand Up @@ -260,7 +274,7 @@ defmodule Interop.Client do
|> GRPC.Stub.send_request(req)
|> GRPC.Stub.recv()

{:ok, _} = Stream.take(res_enum, 1) |> Enum.to_list() |> List.first()
{:ok, _} = Enum.at(res_enum, 0)
stream = GRPC.Stub.cancel(stream)
{:error, %GRPC.RPCError{status: 1}} = GRPC.Stub.recv(stream)
end
Expand Down
2 changes: 2 additions & 0 deletions interop/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"grpc_prometheus": {:hex, :grpc_prometheus, "0.1.0", "a2f45ca83018c4ae59e4c293b7455634ac09e38c36cba7cc1fb8affdf462a6d5", [:mix], [{:grpc, ">= 0.0.0", [hex: :grpc, repo: "hexpm", optional: true]}, {:prometheus, "~> 4.0", [hex: :prometheus, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm", "8b9ab3098657e7daec0b3edc78e1d02418bc0871618d8ca89b51b74a8086bb71"},
"grpc_statsd": {:hex, :grpc_statsd, "0.1.0", "a95ae388188486043f92a3c5091c143f5a646d6af80c9da5ee616546c4d8f5ff", [:mix], [{:grpc, ">= 0.0.0", [hex: :grpc, repo: "hexpm", optional: true]}, {:statix, ">= 0.0.0", [hex: :statix, repo: "hexpm", optional: true]}], "hexpm", "de0c05db313c7b3ffeff345855d173fd82fec3de16591a126b673f7f698d9e74"},
"gun": {:hex, :grpc_gun, "2.0.1", "221b792df3a93e8fead96f697cbaf920120deacced85c6cd3329d2e67f0871f8", [:rebar3], [{:cowlib, "~> 2.11", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "795a65eb9d0ba16697e6b0e1886009ce024799e43bb42753f0c59b029f592831"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"mint": {:hex, :mint, "1.4.2", "50330223429a6e1260b2ca5415f69b0ab086141bc76dc2fbf34d7c389a6675b2", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "ce75a5bbcc59b4d7d8d70f8b2fc284b1751ffb35c7b6a6302b5192f8ab4ddd80"},
"prometheus": {:hex, :prometheus, "4.2.2", "a830e77b79dc6d28183f4db050a7cac926a6c58f1872f9ef94a35cd989aceef8", [:mix, :rebar3], [], "hexpm", "b479a33d4aa4ba7909186e29bb6c1240254e0047a8e2a9f88463f50c0089370e"},
"prometheus_ex": {:hex, :prometheus_ex, "3.0.5", "fa58cfd983487fc5ead331e9a3e0aa622c67232b3ec71710ced122c4c453a02f", [:mix], [{:prometheus, "~> 4.0", [hex: :prometheus, repo: "hexpm", optional: false]}], "hexpm", "9fd13404a48437e044b288b41f76e64acd9735fb8b0e3809f494811dfa66d0fb"},
"prometheus_httpd": {:hex, :prometheus_httpd, "2.1.11", "f616ed9b85b536b195d94104063025a91f904a4cfc20255363f49a197d96c896", [:rebar3], [{:accept, "~> 0.3", [hex: :accept, repo: "hexpm", optional: false]}, {:prometheus, "~> 4.2", [hex: :prometheus, repo: "hexpm", optional: false]}], "hexpm", "0bbe831452cfdf9588538eb2f570b26f30c348adae5e95a7d87f35a5910bcf92"},
Expand Down
77 changes: 38 additions & 39 deletions interop/script/run.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,49 @@ Logger.configure(level: level)

Logger.info("Rounds: #{rounds}; concurrency: #{concurrency}; port: #{port}")

alias GRPC.Client.Adapters.Gun
alias GRPC.Client.Adapters.Mint
alias Interop.Client

{:ok, _pid, port} = GRPC.Server.start_endpoint(Interop.Endpoint, port)

1..concurrency
|> Task.async_stream(fn _cli ->
ch = Client.connect("127.0.0.1", port, interceptors: [GRPCPrometheus.ClientInterceptor, GRPC.Client.Interceptors.Logger])

for _ <- 1..rounds do
Client.empty_unary!(ch)
Client.cacheable_unary!(ch)
Client.large_unary!(ch)
Client.large_unary2!(ch)
Client.client_compressed_unary!(ch)
Client.server_compressed_unary!(ch)
Client.client_streaming!(ch)
Client.client_compressed_streaming!(ch)
Client.server_streaming!(ch)
Client.server_compressed_streaming!(ch)
Client.ping_pong!(ch)
Client.empty_stream!(ch)
Client.custom_metadata!(ch)
Client.status_code_and_message!(ch)
Client.unimplemented_service!(ch)
Client.cancel_after_begin!(ch)
Client.cancel_after_first_response!(ch)
Client.timeout_on_sleeping_server!(ch)
defmodule InteropTestRunner do
def run(_cli, adapter, port, rounds) do
opts = [interceptors: [GRPCPrometheus.ClientInterceptor, GRPC.Client.Interceptors.Logger], adapter: adapter]
ch = Client.connect("127.0.0.1", port, opts)

for _ <- 1..rounds do
Client.empty_unary!(ch)
Client.cacheable_unary!(ch)
Client.large_unary!(ch)
Client.large_unary2!(ch)
Client.client_compressed_unary!(ch)
Client.server_compressed_unary!(ch)
Client.client_streaming!(ch)
Client.client_compressed_streaming!(ch)
Client.server_streaming!(ch)
Client.server_compressed_streaming!(ch)
Client.ping_pong!(ch)
Client.empty_stream!(ch)
Client.custom_metadata!(ch)
Client.status_code_and_message!(ch)
Client.unimplemented_service!(ch)
Client.cancel_after_begin!(ch)
Client.cancel_after_first_response!(ch)
Client.timeout_on_sleeping_server!(ch)
end
:ok
end
:ok
end, max_concurrency: concurrency, ordered: false, timeout: :infinity)
|> Enum.to_list()

# defmodule Helper do
# def flush() do
# receive do
# msg ->
# IO.inspect(msg)
# flush()
# after
# 0 -> :ok
# end
# end
# end
# Helper.flush()
end

for adapter <- [Gun, Mint] do
Logger.info("Starting run for adapter: #{adapter}")
args = [adapter, port, rounds]
stream_opts = [max_concurrency: concurrency, ordered: false, timeout: :infinity]
1..concurrency
|> Task.async_stream(InteropTestRunner, :run, args, stream_opts)
|> Enum.to_list()
end

Logger.info("Succeed!")
:ok = GRPC.Server.stop_endpoint(Interop.Endpoint)
26 changes: 26 additions & 0 deletions lib/grpc/client/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,30 @@ defmodule GRPC.Client.Adapter do
"""
@callback receive_data(stream :: Stream.t(), opts :: keyword()) ::
GRPC.Stub.receive_data_return() | {:error, any()}

@doc """
This callback is used to open a stream connection to the server.
Mostly used when the payload for this request is streamed.
To send data using the open stream request, you should use `send_data/3`
"""
@callback send_headers(stream :: Stream.t(), opts :: keyword()) :: Stream.t()

@doc """
This callback will be responsible to send data to the server on a stream
request is open using `send_headers/2`
Opts:
- :send_end_stream (optional) - ends the request stream
"""
@callback send_data(stream :: Stream.t(), message :: binary(), opts :: keyword()) :: Stream.t()

@doc """
Similarly to the option sent on `send_data/2` - :send_end_stream -
this callback will end request stream
"""
@callback end_stream(stream :: Stream.t()) :: Stream.t()

@doc """
Cancel a stream in a streaming client.
"""
@callback cancel(stream :: Stream.t()) :: :ok | {:error, any()}
end
11 changes: 10 additions & 1 deletion lib/grpc/client/adapters/gun.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ defmodule GRPC.Client.Adapters.Gun do
:gun.post(conn_pid, path, headers, data)
end

@impl true
def send_headers(
%{channel: %{adapter_payload: %{conn_pid: conn_pid}}, path: path} = stream,
opts
Expand All @@ -116,6 +117,7 @@ defmodule GRPC.Client.Adapters.Gun do
GRPC.Client.Stream.put_payload(stream, :stream_ref, stream_ref)
end

@impl true
def send_data(%{channel: channel, payload: %{stream_ref: stream_ref}} = stream, message, opts) do
conn_pid = channel.adapter_payload[:conn_pid]
fin = if opts[:send_end_stream], do: :fin, else: :nofin
Expand All @@ -124,13 +126,20 @@ defmodule GRPC.Client.Adapters.Gun do
stream
end

@impl true
def end_stream(%{channel: channel, payload: %{stream_ref: stream_ref}} = stream) do
conn_pid = channel.adapter_payload[:conn_pid]
:gun.data(conn_pid, stream_ref, :fin, "")
stream
end

def cancel(%{conn_pid: conn_pid}, %{stream_ref: stream_ref}) do
@impl true
def cancel(stream) do
%{
channel: %{adapter_payload: %{conn_pid: conn_pid}},
payload: %{stream_ref: stream_ref}
} = stream

:gun.cancel(conn_pid, stream_ref)
end

Expand Down
Loading

0 comments on commit f5f5fab

Please sign in to comment.