Skip to content

Commit

Permalink
Merge pull request #374 from beligante/improve-cowboy-handler-specs
Browse files Browse the repository at this point in the history
improve docs and specs for `Cowboy.Handler`
  • Loading branch information
sleipnir authored Jul 1, 2024
2 parents a778228 + c6cd103 commit fa640ab
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 16 deletions.
136 changes: 121 additions & 15 deletions lib/grpc/server/adapters/cowboy/handler.ex
Original file line number Diff line number Diff line change
@@ -1,22 +1,53 @@
defmodule GRPC.Server.Adapters.Cowboy.Handler do
@moduledoc false

# A cowboy handler accepting all requests and calls corresponding functions
# defined by users.
@moduledoc """
A cowboy handler accepting all requests and calls corresponding functions defined by users.
"""

alias GRPC.Transport.HTTP2
alias GRPC.RPCError
require Logger

@behaviour :cowboy_loop

@adapter GRPC.Server.Adapters.Cowboy
@default_trailers HTTP2.server_trailers()

@spec init(
map(),
state ::
{endpoint :: atom(), server :: {String.t(), module()}, route :: String.t(),
opts :: keyword()}
) :: {:cowboy_loop, map(), map()}
@type init_state :: {
endpoint :: atom(),
server :: {name :: String.t(), module()},
route :: String.t(),
opts :: keyword()
}

@type pending_reader :: {
cowboy_read_ref :: reference,
server_rpc_pid :: pid,
server_rpc_reader_reference :: reference
}
@type stream_state :: %{
pid: server_rpc_pid :: pid,
handling_timer: timeout_timer_ref :: reference,
pending_reader: nil | pending_reader
}
@type init_result ::
{:cowboy_loop, :cowboy_req.req(), stream_state} | {:ok, :cowboy_req.req(), init_state}

@type is_fin :: :fin | :nofin

@type stream_body_opts :: {:code, module()} | {:compress, boolean()}

@type headers :: %{binary() => binary()}

@doc """
This function is meant to be called whenever a new request arrives to an existing connection.
This handler works mainly with two linked processes.
One of them is the process started by cowboy which internally we'll refer to it as `stream_pid`,
this process is responsible to interface the interactions with the open socket.
The second process is the one we start in this function, we'll refer to it as `server_rpc_pid`,
which is the point where we call the functions implemented by users (aka the modules who use
the `GRPC.Server` macro)
"""
@spec init(:cowboy_req.req(), state :: init_state) :: init_result
def init(req, {endpoint, {_name, server}, route, opts} = state) do
http_method =
req
Expand All @@ -27,19 +58,21 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
with {:ok, sub_type, content_type} <- find_content_type_subtype(req),
{:ok, codec} <- find_codec(sub_type, content_type, server),
{:ok, compressor} <- find_compressor(req, server) do
stream_pid = self()

stream = %GRPC.Server.Stream{
server: server,
endpoint: endpoint,
adapter: @adapter,
payload: %{pid: self()},
payload: %{pid: stream_pid},
local: opts[:local],
codec: codec,
http_method: http_method,
compressor: compressor,
http_transcode: transcode?(req)
}

pid = spawn_link(__MODULE__, :call_rpc, [server, route, stream])
server_rpc_pid = spawn_link(__MODULE__, :call_rpc, [server, route, stream])
Process.flag(:trap_exit, true)

req = :cowboy_req.set_resp_headers(HTTP2.server_headers(stream), req)
Expand All @@ -55,7 +88,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
)
end

{:cowboy_loop, req, %{pid: pid, handling_timer: timer_ref, pending_reader: nil}}
{:cowboy_loop, req, %{pid: server_rpc_pid, handling_timer: timer_ref, pending_reader: nil}}
else
{:error, error} ->
Logger.error(fn -> inspect(error) end)
Expand Down Expand Up @@ -116,54 +149,126 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
end

# APIs begin
@doc """
Synchronously reads the whole body content of a given request.
Raise in case of a timeout.
"""
@spec read_full_body(stream_pid :: pid) :: binary()
def read_full_body(pid) do
sync_call(pid, :read_full_body)
end

@doc """
Synchronously reads a chunk of body content of a given request.
Raise in case of a timeout.
"""
@spec read_body(stream_pid :: pid) :: binary()
def read_body(pid) do
sync_call(pid, :read_body)
end

def stream_body(pid, data, opts, is_fin, http_transcode \\ false) do
send(pid, {:stream_body, data, opts, is_fin, http_transcode})
@doc """
Asynchronously send back to client a chunk of `data`, when `http_transcode?` is true, the
data is sent back as it's, with no transformation of protobuf binaries to http2 data frames.
"""
@spec stream_body(
stream_pid :: pid,
data :: iodata,
opts :: list(stream_body_opts),
is_fin,
http_transcode? :: boolean()
) :: :ok
def stream_body(pid, data, opts, is_fin, http_transcode? \\ false) do
send(pid, {:stream_body, data, opts, is_fin, http_transcode?})
:ok
end

@doc """
Asynchronously send back to the client the http status and the headers for a given request.
"""
@spec stream_reply(stream_pid :: pid, status :: non_neg_integer(), headers :: headers) :: :ok
def stream_reply(pid, status, headers) do
send(pid, {:stream_reply, status, headers})
:ok
end

@doc """
Asynchronously set the headers for a given request. This function does not send any
data back to the client. It simply appends the headers to be used in the response.
"""
@spec set_resp_headers(stream_pid :: pid, headers :: headers) :: :ok
def set_resp_headers(pid, headers) do
send(pid, {:set_resp_headers, headers})
:ok
end

@doc """
Asynchronously set the trailer headers for a given request. This function does not send any
data back to the client. It simply appends the trailer headers to be used in the response.
"""
@spec set_resp_trailers(stream_pid :: pid, trailers :: headers) :: :ok
def set_resp_trailers(pid, trailers) do
send(pid, {:set_resp_trailers, trailers})
:ok
end

@doc """
Asynchronously set the compressor algorithm to be used for compress the responses. This checks if
the `grpc-accept-encoding` header is present on the original request, otherwise no compression
is applied.
"""
@spec set_compressor(stream_pid :: pid, compressor :: module) :: :ok
def set_compressor(pid, compressor) do
send(pid, {:set_compressor, compressor})
:ok
end

@doc """
Asynchronously stream the given trailers of request back to client.
"""
@spec stream_trailers(stream_pid :: pid, trailers :: headers) :: :ok
def stream_trailers(pid, trailers) do
send(pid, {:stream_trailers, trailers})
:ok
end

@doc """
Return all request headers.
"""
@spec get_headers(stream_pid :: pid) :: :cowboy.http_headers()
def get_headers(pid) do
sync_call(pid, :get_headers)
end

@doc """
Return the peer IP address and port number
"""
@spec get_peer(stream_pid :: pid) :: {:inet.ip_address(), :inet.port_number()}
def get_peer(pid) do
sync_call(pid, :get_peer)
end

@doc """
Return the client TLS certificate. `:undefined` is returned if no certificate was specified
when establishing the connection.
"""
@spec get_cert(stream_pid :: pid) :: binary() | :undefined
def get_cert(pid) do
sync_call(pid, :get_cert)
end

@doc """
Return the query string for the request URI.
"""
@spec get_qs(stream_pid :: pid) :: binary()
def get_qs(pid) do
sync_call(pid, :get_qs)
end

@doc """
Return all bindings of a given request.
"""
@spec get_bindings(stream_pid :: pid) :: :cowboy_router.bindings()
def get_bindings(pid) do
sync_call(pid, :get_bindings)
end
Expand Down Expand Up @@ -204,6 +309,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do

def info({:request_body, ref, :nofin, body}, req, %{pending_reader: {ref, pid, reader_ref}} = s) do
send(pid, {reader_ref, {:more, body}})

{:ok, req, %{s | pending_reader: nil}}
end

Expand Down
2 changes: 1 addition & 1 deletion test/grpc/integration/stub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ defmodule GRPC.Integration.StubTest do
end

test "invalid channel function clause error" do
req = Helloworld.HelloRequest.new(name: "GRPC")
req = %Helloworld.HelloRequest{name: "GRPC"}

assert_raise FunctionClauseError, ~r/Helloworld.Greeter.Stub.say_hello/, fn ->
Helloworld.Greeter.Stub.say_hello(nil, req)
Expand Down

0 comments on commit fa640ab

Please sign in to comment.