Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use behavior stacking for handler implementation #146

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 118 additions & 108 deletions lib/thousand_island/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -306,31 +306,31 @@ defmodule ThousandIsland.Handler do

use GenServer, restart: :temporary

@impl ThousandIsland.Handler
def handle_connection(_socket, state), do: {:continue, state}

@impl ThousandIsland.Handler
def handle_data(_data, _socket, state), do: {:continue, state}

@impl ThousandIsland.Handler
def handle_close(_socket, _state), do: :ok

@impl ThousandIsland.Handler
def handle_error(_error, _socket, _state), do: :ok

@impl ThousandIsland.Handler
def handle_shutdown(_socket, _state), do: :ok

@impl ThousandIsland.Handler
def handle_timeout(_socket, _state), do: :ok

defoverridable ThousandIsland.Handler

@spec start_link({handler_options :: term(), GenServer.options()}) :: GenServer.on_start()
def start_link({handler_options, genserver_options}) do
GenServer.start_link(__MODULE__, handler_options, genserver_options)
end

unquote(genserver_impl())
unquote(handler_impl())
end
end

@doc false
defmacro add_handle_info_fallback(_module) do
quote do
def handle_info({msg, _raw_socket, _data}, _state) when msg in [:tcp, :ssl] do
raise """
The callback's `state` doesn't match the expected `{socket, state}` form.
Please ensure that you are returning a `{socket, state}` tuple from any
`GenServer.handle_*` callbacks you have implemented
"""
end
end
end

def genserver_impl do
quote do
@impl GenServer
def init(handler_options) do
Process.flag(:trap_exit, true)
Expand Down Expand Up @@ -383,7 +383,7 @@ defmodule ThousandIsland.Handler do
ThousandIsland.Telemetry.untimed_span_event(socket.span, :async_recv, %{data: data})

__MODULE__.handle_data(data, socket, state)
|> handle_continuation(socket)
|> ThousandIsland.Handler.handle_continuation(socket)
end

def handle_info(
Expand Down Expand Up @@ -415,7 +415,7 @@ defmodule ThousandIsland.Handler do
@impl GenServer
def handle_continue(:handle_connection, {%ThousandIsland.Socket{} = socket, state}) do
__MODULE__.handle_connection(socket, state)
|> handle_continuation(socket)
|> ThousandIsland.Handler.handle_continuation(socket)
end

# Called if the remote end closed the connection before we could initialize it
Expand All @@ -427,19 +427,19 @@ defmodule ThousandIsland.Handler do
# Called by GenServer if we hit our read_timeout. Socket is still open
def terminate({:shutdown, :timeout}, {%ThousandIsland.Socket{} = socket, state}) do
__MODULE__.handle_timeout(socket, state)
do_socket_close(socket, :timeout)
ThousandIsland.Handler.do_socket_close(socket, :timeout)
end

# Called if we're being shutdown in an orderly manner. Socket is still open
def terminate(:shutdown, {%ThousandIsland.Socket{} = socket, state}) do
__MODULE__.handle_shutdown(socket, state)
do_socket_close(socket, :shutdown)
ThousandIsland.Handler.do_socket_close(socket, :shutdown)
end

# Called if the socket encountered an error during handshaking
def terminate({:shutdown, {:handshake, reason}}, {%ThousandIsland.Socket{} = socket, state}) do
__MODULE__.handle_error(reason, socket, state)
do_socket_close(socket, reason)
ThousandIsland.Handler.do_socket_close(socket, reason)
end

# Called if the socket encountered an error and we are configured to shutdown silently.
Expand All @@ -449,123 +449,133 @@ defmodule ThousandIsland.Handler do
{%ThousandIsland.Socket{} = socket, state}
) do
__MODULE__.handle_error(reason, socket, state)
do_socket_close(socket, reason)
ThousandIsland.Handler.do_socket_close(socket, reason)
end

# Called if the socket encountered an error during upgrading
def terminate({:shutdown, {:upgrade, reason}}, {socket, state}) do
__MODULE__.handle_error(reason, socket, state)
do_socket_close(socket, reason)
ThousandIsland.Handler.do_socket_close(socket, reason)
end

# Called if the remote end shut down the connection, or if the local end closed the
# connection by returning a `{:close,...}` tuple (in which case the socket will be open)
def terminate({:shutdown, reason}, {%ThousandIsland.Socket{} = socket, state}) do
__MODULE__.handle_close(socket, state)
do_socket_close(socket, reason)
ThousandIsland.Handler.do_socket_close(socket, reason)
end

# Called if the socket encountered an error. Socket is closed
def terminate(reason, {%ThousandIsland.Socket{} = socket, state}) do
__MODULE__.handle_error(reason, socket, state)
do_socket_close(socket, reason)
ThousandIsland.Handler.do_socket_close(socket, reason)
end

# This clause could happen if we do not have a socket defined in state (either because the
# process crashed before setting it up, or because the user sent an invalid state)
def terminate(_reason, _state) do
:ok
end
end
end

@spec do_socket_close(
ThousandIsland.Socket.t(),
reason :: :shutdown | :local_closed | term()
) :: :ok
defp do_socket_close(socket, reason) do
measurements =
case ThousandIsland.Socket.getstat(socket) do
{:ok, stats} ->
stats
|> Keyword.take([:send_oct, :send_cnt, :recv_oct, :recv_cnt])
|> Enum.into(%{})

_ ->
%{}
end
def handler_impl do
quote do
@impl ThousandIsland.Handler
def handle_connection(_socket, state), do: {:continue, state}

metadata =
if reason in [:shutdown, :local_closed, :peer_closed], do: %{}, else: %{error: reason}
@impl ThousandIsland.Handler
def handle_data(_data, _socket, state), do: {:continue, state}

_ = ThousandIsland.Socket.close(socket)
ThousandIsland.Telemetry.stop_span(socket.span, measurements, metadata)
end
@impl ThousandIsland.Handler
def handle_close(_socket, _state), do: :ok

# Dialyzer gets confused by handle_continuation being a defp and not a def
@dialyzer {:no_match, handle_continuation: 2}
defp handle_continuation(continuation, socket) do
case continuation do
{:continue, state} ->
_ = ThousandIsland.Socket.setopts(socket, active: :once)
{:noreply, {socket, state}, socket.read_timeout}

{:continue, state, {:persistent, timeout}} ->
socket = %{socket | read_timeout: timeout}
_ = ThousandIsland.Socket.setopts(socket, active: :once)
{:noreply, {socket, state}, timeout}

{:continue, state, timeout} ->
_ = ThousandIsland.Socket.setopts(socket, active: :once)
{:noreply, {socket, state}, timeout}

{:switch_transport, {module, upgrade_opts}, state} ->
handle_switch_continuation(socket, module, upgrade_opts, state, socket.read_timeout)

{:switch_transport, {module, upgrade_opts}, state, {:persistent, timeout}} ->
socket = %{socket | read_timeout: timeout}
handle_switch_continuation(socket, module, upgrade_opts, state, timeout)

{:switch_transport, {module, upgrade_opts}, state, timeout} ->
handle_switch_continuation(socket, module, upgrade_opts, state, timeout)

{:close, state} ->
{:stop, {:shutdown, :local_closed}, {socket, state}}

{:error, :timeout, state} ->
{:stop, {:shutdown, :timeout}, {socket, state}}

{:error, reason, state} ->
if socket.silent_terminate_on_error do
{:stop, {:shutdown, {:silent_termination, reason}}, {socket, state}}
else
{:stop, reason, {socket, state}}
end
end
end
@impl ThousandIsland.Handler
def handle_error(_error, _socket, _state), do: :ok

@dialyzer {:nowarn_function, handle_switch_continuation: 5}
defp handle_switch_continuation(socket, module, upgrade_opts, state, timeout) do
case ThousandIsland.Socket.upgrade(socket, module, upgrade_opts) do
{:ok, socket} ->
_ = ThousandIsland.Socket.setopts(socket, active: :once)
{:noreply, {socket, state}, timeout}
@impl ThousandIsland.Handler
def handle_shutdown(_socket, _state), do: :ok

{:error, reason} ->
{:stop, {:shutdown, {:upgrade, reason}}, {socket, state}}
end
end
@impl ThousandIsland.Handler
def handle_timeout(_socket, _state), do: :ok

defoverridable ThousandIsland.Handler
end
end

@spec do_socket_close(
ThousandIsland.Socket.t(),
reason :: :shutdown | :local_closed | term()
) :: :ok
@doc false
defmacro add_handle_info_fallback(_module) do
quote do
def handle_info({msg, _raw_socket, _data}, _state) when msg in [:tcp, :ssl] do
raise """
The callback's `state` doesn't match the expected `{socket, state}` form.
Please ensure that you are returning a `{socket, state}` tuple from any
`GenServer.handle_*` callbacks you have implemented
"""
def do_socket_close(socket, reason) do
measurements =
case ThousandIsland.Socket.getstat(socket) do
{:ok, stats} ->
stats
|> Keyword.take([:send_oct, :send_cnt, :recv_oct, :recv_cnt])
|> Enum.into(%{})

_ ->
%{}
end

metadata =
if reason in [:shutdown, :local_closed, :peer_closed], do: %{}, else: %{error: reason}

_ = ThousandIsland.Socket.close(socket)
ThousandIsland.Telemetry.stop_span(socket.span, measurements, metadata)
end

@doc false
def handle_continuation(continuation, socket) do
case continuation do
{:continue, state} ->
_ = ThousandIsland.Socket.setopts(socket, active: :once)
{:noreply, {socket, state}, socket.read_timeout}

{:continue, state, {:persistent, timeout}} ->
socket = %{socket | read_timeout: timeout}
_ = ThousandIsland.Socket.setopts(socket, active: :once)
{:noreply, {socket, state}, timeout}

{:continue, state, timeout} ->
_ = ThousandIsland.Socket.setopts(socket, active: :once)
{:noreply, {socket, state}, timeout}

{:switch_transport, {module, upgrade_opts}, state} ->
handle_switch_continuation(socket, module, upgrade_opts, state, socket.read_timeout)

{:switch_transport, {module, upgrade_opts}, state, {:persistent, timeout}} ->
socket = %{socket | read_timeout: timeout}
handle_switch_continuation(socket, module, upgrade_opts, state, timeout)

{:switch_transport, {module, upgrade_opts}, state, timeout} ->
handle_switch_continuation(socket, module, upgrade_opts, state, timeout)

{:close, state} ->
{:stop, {:shutdown, :local_closed}, {socket, state}}

{:error, :timeout, state} ->
{:stop, {:shutdown, :timeout}, {socket, state}}

{:error, reason, state} ->
if socket.silent_terminate_on_error do
{:stop, {:shutdown, {:silent_termination, reason}}, {socket, state}}
else
{:stop, reason, {socket, state}}
end
end
end

defp handle_switch_continuation(socket, module, upgrade_opts, state, timeout) do
case ThousandIsland.Socket.upgrade(socket, module, upgrade_opts) do
{:ok, socket} ->
_ = ThousandIsland.Socket.setopts(socket, active: :once)
{:noreply, {socket, state}, timeout}

{:error, reason} ->
{:stop, {:shutdown, {:upgrade, reason}}, {socket, state}}
end
end
end