Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve docs and specs for Cowboy.Handler #374

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 121 additions & 15 deletions lib/grpc/server/adapters/cowboy/handler.ex
Original file line number Diff line number Diff line change
@@ -1,22 +1,53 @@
defmodule GRPC.Server.Adapters.Cowboy.Handler do
@moduledoc false

# A cowboy handler accepting all requests and calls corresponding functions
# defined by users.
@moduledoc """
A cowboy handler accepting all requests and calls corresponding functions defined by users.
"""

alias GRPC.Transport.HTTP2
alias GRPC.RPCError
require Logger

@behaviour :cowboy_loop

@adapter GRPC.Server.Adapters.Cowboy
@default_trailers HTTP2.server_trailers()

@spec init(
map(),
state ::
{endpoint :: atom(), server :: {String.t(), module()}, route :: String.t(),
opts :: keyword()}
) :: {:cowboy_loop, map(), map()}
@type init_state :: {
endpoint :: atom(),
server :: {name :: String.t(), module()},
route :: String.t(),
opts :: keyword()
}

@type pending_reader :: {
cowboy_read_ref :: reference,
server_rpc_pid :: pid,
server_rpc_reader_reference :: reference
}
@type stream_state :: %{
pid: server_rpc_pid :: pid,
handling_timer: timeout_timer_ref :: reference,
pending_reader: nil | pending_reader
}
@type init_result ::
{:cowboy_loop, :cowboy_req.req(), stream_state} | {:ok, :cowboy_req.req(), init_state}

@type is_fin :: :fin | :nofin

@type stream_body_opts :: {:code, module()} | {:compress, boolean()}

@type headers :: %{binary() => binary()}

@doc """
This function is meant to be called whenever a new request arrives to an existing connection.
This handler works mainly with two linked processes.
One of them is the process started by cowboy which internally we'll refer to it as `stream_pid`,
this process is responsible to interface the interactions with the open socket.
The second process is the one we start in this function, we'll refer to it as `server_rpc_pid`,
which is the point where we call the functions implemented by users (aka the modules who use
the `GRPC.Server` macro)
"""
@spec init(:cowboy_req.req(), state :: init_state) :: init_result
def init(req, {endpoint, {_name, server}, route, opts} = state) do
http_method =
req
Expand All @@ -27,19 +58,21 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
with {:ok, sub_type, content_type} <- find_content_type_subtype(req),
{:ok, codec} <- find_codec(sub_type, content_type, server),
{:ok, compressor} <- find_compressor(req, server) do
stream_pid = self()

stream = %GRPC.Server.Stream{
server: server,
endpoint: endpoint,
adapter: @adapter,
payload: %{pid: self()},
payload: %{pid: stream_pid},
local: opts[:local],
codec: codec,
http_method: http_method,
compressor: compressor,
http_transcode: transcode?(req)
}

pid = spawn_link(__MODULE__, :call_rpc, [server, route, stream])
server_rpc_pid = spawn_link(__MODULE__, :call_rpc, [server, route, stream])
Process.flag(:trap_exit, true)

req = :cowboy_req.set_resp_headers(HTTP2.server_headers(stream), req)
Expand All @@ -55,7 +88,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
)
end

{:cowboy_loop, req, %{pid: pid, handling_timer: timer_ref, pending_reader: nil}}
{:cowboy_loop, req, %{pid: server_rpc_pid, handling_timer: timer_ref, pending_reader: nil}}
else
{:error, error} ->
Logger.error(fn -> inspect(error) end)
Expand Down Expand Up @@ -116,54 +149,126 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
end

# APIs begin
@doc """
Synchronously reads the whole body content of a given request.
Raise in case of a timeout.
"""
@spec read_full_body(stream_pid :: pid) :: binary()
def read_full_body(pid) do
sync_call(pid, :read_full_body)
end

@doc """
Synchronously reads a chunk of body content of a given request.
Raise in case of a timeout.
"""
@spec read_body(stream_pid :: pid) :: binary()
def read_body(pid) do
sync_call(pid, :read_body)
end

def stream_body(pid, data, opts, is_fin, http_transcode \\ false) do
send(pid, {:stream_body, data, opts, is_fin, http_transcode})
@doc """
Asynchronously send back to client a chunk of `data`, when `http_transcode?` is true, the
data is sent back as it's, with no transformation of protobuf binaries to http2 data frames.
"""
@spec stream_body(
stream_pid :: pid,
data :: iodata,
opts :: list(stream_body_opts),
is_fin,
http_transcode? :: boolean()
) :: :ok
def stream_body(pid, data, opts, is_fin, http_transcode? \\ false) do
send(pid, {:stream_body, data, opts, is_fin, http_transcode?})
:ok
sleipnir marked this conversation as resolved.
Show resolved Hide resolved
end

@doc """
Asynchronously send back to the client the http status and the headers for a given request.
"""
@spec stream_reply(stream_pid :: pid, status :: non_neg_integer(), headers :: headers) :: :ok
def stream_reply(pid, status, headers) do
send(pid, {:stream_reply, status, headers})
:ok
end

@doc """
Asynchronously set the headers for a given request. This function does not send any
data back to the client. It simply appends the headers to be used in the response.
"""
@spec set_resp_headers(stream_pid :: pid, headers :: headers) :: :ok
def set_resp_headers(pid, headers) do
send(pid, {:set_resp_headers, headers})
:ok
end

@doc """
Asynchronously set the trailer headers for a given request. This function does not send any
data back to the client. It simply appends the trailer headers to be used in the response.
"""
@spec set_resp_trailers(stream_pid :: pid, trailers :: headers) :: :ok
def set_resp_trailers(pid, trailers) do
send(pid, {:set_resp_trailers, trailers})
:ok
end

@doc """
Asynchronously set the compressor algorithm to be used for compress the responses. This checks if
the `grpc-accept-encoding` header is present on the original request, otherwise no compression
is applied.
"""
@spec set_compressor(stream_pid :: pid, compressor :: module) :: :ok
def set_compressor(pid, compressor) do
send(pid, {:set_compressor, compressor})
:ok
end

@doc """
Asynchronously stream the given trailers of request back to client.
"""
@spec stream_trailers(stream_pid :: pid, trailers :: headers) :: :ok
def stream_trailers(pid, trailers) do
send(pid, {:stream_trailers, trailers})
:ok
end

@doc """
Return all request headers.
"""
@spec get_headers(stream_pid :: pid) :: :cowboy.http_headers()
def get_headers(pid) do
sync_call(pid, :get_headers)
end

@doc """
Return the peer IP address and port number
"""
@spec get_peer(stream_pid :: pid) :: {:inet.ip_address(), :inet.port_number()}
def get_peer(pid) do
sync_call(pid, :get_peer)
end

@doc """
Return the client TLS certificate. `:undefined` is returned if no certificate was specified
when establishing the connection.
"""
@spec get_cert(stream_pid :: pid) :: binary() | :undefined
def get_cert(pid) do
sync_call(pid, :get_cert)
end

@doc """
Return the query string for the request URI.
"""
@spec get_qs(stream_pid :: pid) :: binary()
def get_qs(pid) do
sync_call(pid, :get_qs)
end

@doc """
Return all bindings of a given request.
"""
@spec get_bindings(stream_pid :: pid) :: :cowboy_router.bindings()
def get_bindings(pid) do
sync_call(pid, :get_bindings)
end
Expand Down Expand Up @@ -204,6 +309,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do

def info({:request_body, ref, :nofin, body}, req, %{pending_reader: {ref, pid, reader_ref}} = s) do
send(pid, {reader_ref, {:more, body}})

{:ok, req, %{s | pending_reader: nil}}
end

Expand Down
2 changes: 1 addition & 1 deletion test/grpc/integration/stub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ defmodule GRPC.Integration.StubTest do
end

test "invalid channel function clause error" do
req = Helloworld.HelloRequest.new(name: "GRPC")
req = %Helloworld.HelloRequest{name: "GRPC"}
sleipnir marked this conversation as resolved.
Show resolved Hide resolved

assert_raise FunctionClauseError, ~r/Helloworld.Greeter.Stub.say_hello/, fn ->
Helloworld.Greeter.Stub.say_hello(nil, req)
Expand Down
Loading