diff --git a/lib/grpc/server/adapters/cowboy/handler.ex b/lib/grpc/server/adapters/cowboy/handler.ex index c4189dc4..836e9f5f 100644 --- a/lib/grpc/server/adapters/cowboy/handler.ex +++ b/lib/grpc/server/adapters/cowboy/handler.ex @@ -1,22 +1,53 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do - @moduledoc false - - # A cowboy handler accepting all requests and calls corresponding functions - # defined by users. + @moduledoc """ + A cowboy handler accepting all requests and calls corresponding functions defined by users. + """ alias GRPC.Transport.HTTP2 alias GRPC.RPCError require Logger + @behaviour :cowboy_loop + @adapter GRPC.Server.Adapters.Cowboy @default_trailers HTTP2.server_trailers() - @spec init( - map(), - state :: - {endpoint :: atom(), server :: {String.t(), module()}, route :: String.t(), - opts :: keyword()} - ) :: {:cowboy_loop, map(), map()} + @type init_state :: { + endpoint :: atom(), + server :: {name :: String.t(), module()}, + route :: String.t(), + opts :: keyword() + } + + @type pending_reader :: { + cowboy_read_ref :: reference, + server_rpc_pid :: pid, + server_rpc_reader_reference :: reference + } + @type stream_state :: %{ + pid: server_rpc_pid :: pid, + handling_timer: timeout_timer_ref :: reference, + pending_reader: nil | pending_reader + } + @type init_result :: + {:cowboy_loop, :cowboy_req.req(), stream_state} | {:ok, :cowboy_req.req(), init_state} + + @type is_fin :: :fin | :nofin + + @type stream_body_opts :: {:code, module()} | {:compress, boolean()} + + @type headers :: %{binary() => binary()} + + @doc """ + This function is meant to be called whenever a new request arrives to an existing connection. + This handler works mainly with two linked processes. + One of them is the process started by cowboy which internally we'll refer to it as `stream_pid`, + this process is responsible to interface the interactions with the open socket. + The second process is the one we start in this function, we'll refer to it as `server_rpc_pid`, + which is the point where we call the functions implemented by users (aka the modules who use + the `GRPC.Server` macro) + """ + @spec init(:cowboy_req.req(), state :: init_state) :: init_result def init(req, {endpoint, {_name, server}, route, opts} = state) do http_method = req @@ -27,11 +58,13 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do with {:ok, sub_type, content_type} <- find_content_type_subtype(req), {:ok, codec} <- find_codec(sub_type, content_type, server), {:ok, compressor} <- find_compressor(req, server) do + stream_pid = self() + stream = %GRPC.Server.Stream{ server: server, endpoint: endpoint, adapter: @adapter, - payload: %{pid: self()}, + payload: %{pid: stream_pid}, local: opts[:local], codec: codec, http_method: http_method, @@ -39,7 +72,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do http_transcode: transcode?(req) } - pid = spawn_link(__MODULE__, :call_rpc, [server, route, stream]) + server_rpc_pid = spawn_link(__MODULE__, :call_rpc, [server, route, stream]) Process.flag(:trap_exit, true) req = :cowboy_req.set_resp_headers(HTTP2.server_headers(stream), req) @@ -55,7 +88,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do ) end - {:cowboy_loop, req, %{pid: pid, handling_timer: timer_ref, pending_reader: nil}} + {:cowboy_loop, req, %{pid: server_rpc_pid, handling_timer: timer_ref, pending_reader: nil}} else {:error, error} -> Logger.error(fn -> inspect(error) end) @@ -116,54 +149,126 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do end # APIs begin + @doc """ + Synchronously reads the whole body content of a given request. + Raise in case of a timeout. + """ + @spec read_full_body(stream_pid :: pid) :: binary() def read_full_body(pid) do sync_call(pid, :read_full_body) end + @doc """ + Synchronously reads a chunk of body content of a given request. + Raise in case of a timeout. + """ + @spec read_body(stream_pid :: pid) :: binary() def read_body(pid) do sync_call(pid, :read_body) end - def stream_body(pid, data, opts, is_fin, http_transcode \\ false) do - send(pid, {:stream_body, data, opts, is_fin, http_transcode}) + @doc """ + Asynchronously send back to client a chunk of `data`, when `http_transcode?` is true, the + data is sent back as it's, with no transformation of protobuf binaries to http2 data frames. + """ + @spec stream_body( + stream_pid :: pid, + data :: iodata, + opts :: list(stream_body_opts), + is_fin, + http_transcode? :: boolean() + ) :: :ok + def stream_body(pid, data, opts, is_fin, http_transcode? \\ false) do + send(pid, {:stream_body, data, opts, is_fin, http_transcode?}) + :ok end + @doc """ + Asynchronously send back to the client the http status and the headers for a given request. + """ + @spec stream_reply(stream_pid :: pid, status :: non_neg_integer(), headers :: headers) :: :ok def stream_reply(pid, status, headers) do send(pid, {:stream_reply, status, headers}) + :ok end + @doc """ + Asynchronously set the headers for a given request. This function does not send any + data back to the client. It simply appends the headers to be used in the response. + """ + @spec set_resp_headers(stream_pid :: pid, headers :: headers) :: :ok def set_resp_headers(pid, headers) do send(pid, {:set_resp_headers, headers}) + :ok end + @doc """ + Asynchronously set the trailer headers for a given request. This function does not send any + data back to the client. It simply appends the trailer headers to be used in the response. + """ + @spec set_resp_trailers(stream_pid :: pid, trailers :: headers) :: :ok def set_resp_trailers(pid, trailers) do send(pid, {:set_resp_trailers, trailers}) + :ok end + @doc """ + Asynchronously set the compressor algorithm to be used for compress the responses. This checks if + the `grpc-accept-encoding` header is present on the original request, otherwise no compression + is applied. + """ + @spec set_compressor(stream_pid :: pid, compressor :: module) :: :ok def set_compressor(pid, compressor) do send(pid, {:set_compressor, compressor}) + :ok end + @doc """ + Asynchronously stream the given trailers of request back to client. + """ + @spec stream_trailers(stream_pid :: pid, trailers :: headers) :: :ok def stream_trailers(pid, trailers) do send(pid, {:stream_trailers, trailers}) + :ok end + @doc """ + Return all request headers. + """ + @spec get_headers(stream_pid :: pid) :: :cowboy.http_headers() def get_headers(pid) do sync_call(pid, :get_headers) end + @doc """ + Return the peer IP address and port number + """ + @spec get_peer(stream_pid :: pid) :: {:inet.ip_address(), :inet.port_number()} def get_peer(pid) do sync_call(pid, :get_peer) end + @doc """ + Return the client TLS certificate. `:undefined` is returned if no certificate was specified + when establishing the connection. + """ + @spec get_cert(stream_pid :: pid) :: binary() | :undefined def get_cert(pid) do sync_call(pid, :get_cert) end + @doc """ + Return the query string for the request URI. + """ + @spec get_qs(stream_pid :: pid) :: binary() def get_qs(pid) do sync_call(pid, :get_qs) end + @doc """ + Return all bindings of a given request. + """ + @spec get_bindings(stream_pid :: pid) :: :cowboy_router.bindings() def get_bindings(pid) do sync_call(pid, :get_bindings) end @@ -204,6 +309,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do def info({:request_body, ref, :nofin, body}, req, %{pending_reader: {ref, pid, reader_ref}} = s) do send(pid, {reader_ref, {:more, body}}) + {:ok, req, %{s | pending_reader: nil}} end diff --git a/test/grpc/integration/stub_test.exs b/test/grpc/integration/stub_test.exs index d24cdc68..8489a621 100644 --- a/test/grpc/integration/stub_test.exs +++ b/test/grpc/integration/stub_test.exs @@ -69,7 +69,7 @@ defmodule GRPC.Integration.StubTest do end test "invalid channel function clause error" do - req = Helloworld.HelloRequest.new(name: "GRPC") + req = %Helloworld.HelloRequest{name: "GRPC"} assert_raise FunctionClauseError, ~r/Helloworld.Greeter.Stub.say_hello/, fn -> Helloworld.Greeter.Stub.say_hello(nil, req)