From b34276baac81a6b3e22853b642de5e67b2729bb7 Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Wed, 18 Dec 2024 08:17:43 +0100 Subject: [PATCH 1/3] use behavior stacking for handler implementation --- lib/thousand_island/handler.ex | 494 +++++++++++++++----------- test/thousand_island/handler_test.exs | 35 +- 2 files changed, 287 insertions(+), 242 deletions(-) diff --git a/lib/thousand_island/handler.ex b/lib/thousand_island/handler.ex index a697b06..fb5aefb 100644 --- a/lib/thousand_island/handler.ex +++ b/lib/thousand_island/handler.ex @@ -249,6 +249,30 @@ defmodule ThousandIsland.Handler do @callback handle_data(data :: binary(), socket :: ThousandIsland.Socket.t(), state :: term()) :: handler_result() + @doc """ + Handles messages sent to the process. + + The value returned by this callback causes Thousand Island to proceed in one of several ways: + + * Returning `{:close, state}` will cause Thousand Island to close the socket & call the `c:handle_close/2` callback to + allow final cleanup to be done. + * Returning `{:continue, state}` will cause Thousand Island to switch the socket to an asynchronous mode. When the + client subsequently sends data (or if there is already unread data waiting from the client), Thousand Island will call + `c:handle_data/3` to allow this data to be processed. + * Returning `{:continue, state, timeout}` is identical to the previous case with the + addition of a timeout. If `timeout` milliseconds passes with no data being received or messages + being sent to the process, the socket will be closed and `c:handle_timeout/2` will be called. + Note that this timeout is not persistent; it applies only to the interval until the next message + is received. In order to set a persistent timeout for all future messages (essentially + overwriting the value of `read_timeout` that was set at server startup), a value of + `{:persistent, timeout}` may be returned. + * Returning `{:error, reason, state}` will cause Thousand Island to close the socket & call the `c:handle_error/3` callback to + allow final cleanup to be done. + + """ + @callback handle_info(data :: binary(), socket :: ThousandIsland.Socket.t(), state :: term()) :: + handler_result() + @doc """ This callback is called when the underlying socket is closed by the remote end; it should perform any cleanup required as it is the last callback called before the process backing this connection is terminated. The underlying socket @@ -299,19 +323,22 @@ defmodule ThousandIsland.Handler do handle_shutdown: 2, handle_timeout: 2 + use GenServer, restart: :temporary + @spec __using__(any) :: Macro.t() - defmacro __using__(_opts) do + defmacro __using__(opts) do quote location: :keep do @behaviour ThousandIsland.Handler - use GenServer, restart: :temporary - @impl ThousandIsland.Handler def handle_connection(_socket, state), do: {:continue, state} @impl ThousandIsland.Handler def handle_data(_data, _socket, state), do: {:continue, state} + @impl ThousandIsland.Handler + def handle_info(_data, _socket, state), do: {:continue, state} + @impl ThousandIsland.Handler def handle_close(_socket, _state), do: :ok @@ -328,244 +355,293 @@ defmodule ThousandIsland.Handler do @spec start_link({handler_options :: term(), GenServer.options()}) :: GenServer.on_start() def start_link({handler_options, genserver_options}) do - GenServer.start_link(__MODULE__, handler_options, genserver_options) + ThousandIsland.Handler.start_link({__MODULE__, handler_options, genserver_options}) end - @impl GenServer - def init(handler_options) do - Process.flag(:trap_exit, true) - {:ok, {nil, handler_options}} + @doc false + def child_spec(args) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [args]}, + restart: :temporary + } + |> Supervisor.child_spec(unquote(Macro.escape(opts))) end + end + end - @impl GenServer - def handle_info( - {:thousand_island_ready, raw_socket, server_config, acceptor_span, start_time}, - {nil, state} - ) do - {ip, port} = - case server_config.transport_module.peername(raw_socket) do - {:ok, remote_info} -> - remote_info - - {:error, reason} -> - # the socket has been prematurely closed by the client, we can't do anything with it - # so we just close the socket, stop the GenServer with the error reason and move on. - _ = server_config.transport_module.close(raw_socket) - throw({:stop, {:shutdown, {:premature_conn_closing, reason}}, {raw_socket, state}}) - end - - span_meta = %{remote_address: ip, remote_port: port} - - connection_span = - ThousandIsland.Telemetry.start_child_span( - acceptor_span, - :connection, - %{monotonic_time: start_time}, - span_meta - ) + @spec start_link({handler_module :: module(), handler_options :: term(), GenServer.options()}) :: + GenServer.on_start() + def start_link({handler_module, handler_options, genserver_options}) do + GenServer.start_link(__MODULE__, {handler_module, handler_options}, genserver_options) + end - socket = ThousandIsland.Socket.new(raw_socket, server_config, connection_span) - ThousandIsland.Telemetry.span_event(connection_span, :ready) + @impl GenServer + def init({handler_module, handler_options}) do + Process.flag(:trap_exit, true) + {:ok, {handler_module, nil, handler_options}} + end - case ThousandIsland.Socket.handshake(socket) do - {:ok, socket} -> {:noreply, {socket, state}, {:continue, :handle_connection}} - {:error, reason} -> {:stop, {:shutdown, {:handshake, reason}}, {socket, state}} - end - catch - {:stop, _, _} = stop -> stop + @impl GenServer + def handle_info( + {:thousand_island_ready, raw_socket, server_config, acceptor_span, start_time}, + {handler_module, nil, state} + ) do + {ip, port} = + case server_config.transport_module.peername(raw_socket) do + {:ok, remote_info} -> + remote_info + + {:error, reason} -> + # the socket has been prematurely closed by the client, we can't do anything with it + # so we just close the socket, stop the GenServer with the error reason and move on. + _ = server_config.transport_module.close(raw_socket) + + throw( + {:stop, {:shutdown, {:premature_conn_closing, reason}}, + {handler_module, raw_socket, state}} + ) end - def handle_info( - {msg, raw_socket, data}, - {%ThousandIsland.Socket{socket: raw_socket} = socket, state} - ) - when msg in [:tcp, :ssl] do - ThousandIsland.Telemetry.untimed_span_event(socket.span, :async_recv, %{data: data}) + span_meta = %{remote_address: ip, remote_port: port} - __MODULE__.handle_data(data, socket, state) - |> handle_continuation(socket) - end + connection_span = + ThousandIsland.Telemetry.start_child_span( + acceptor_span, + :connection, + %{monotonic_time: start_time}, + span_meta + ) - def handle_info( - {msg, raw_socket}, - {%ThousandIsland.Socket{socket: raw_socket} = socket, state} - ) - when msg in [:tcp_closed, :ssl_closed] do - {:stop, {:shutdown, :peer_closed}, {socket, state}} - end + socket = ThousandIsland.Socket.new(raw_socket, server_config, connection_span) + ThousandIsland.Telemetry.span_event(connection_span, :ready) - def handle_info( - {msg, raw_socket, reason}, - {%ThousandIsland.Socket{socket: raw_socket} = socket, state} - ) - when msg in [:tcp_error, :ssl_error] do - {:stop, reason, {socket, state}} - end + case ThousandIsland.Socket.handshake(socket) do + {:ok, socket} -> + {:noreply, {handler_module, socket, state}, {:continue, :handle_connection}} - def handle_info(:timeout, {%ThousandIsland.Socket{} = socket, state}) do - {:stop, {:shutdown, :timeout}, {socket, state}} - end + {:error, reason} -> + {:stop, {:shutdown, {:handshake, reason}}, {handler_module, socket, state}} + end + catch + {:stop, _, _} = stop -> stop + end - @before_compile {ThousandIsland.Handler, :add_handle_info_fallback} + def handle_info( + {msg, raw_socket, data}, + {handler_module, %ThousandIsland.Socket{socket: raw_socket} = socket, state} + ) + when msg in [:tcp, :ssl] do + ThousandIsland.Telemetry.untimed_span_event(socket.span, :async_recv, %{data: data}) - # Use a continue pattern here so that we have committed the socket - # to state in case the `c:handle_connection/2` callback raises an error. - # This ensures that the `c:terminate/2` calls below are able to properly - # close down the process - @impl GenServer - def handle_continue(:handle_connection, {%ThousandIsland.Socket{} = socket, state}) do - __MODULE__.handle_connection(socket, state) - |> handle_continuation(socket) - end + handler_module.handle_data(data, socket, state) + |> handle_continuation(handler_module, socket) + end - # Called if the remote end closed the connection before we could initialize it - @impl GenServer - def terminate({:shutdown, {:premature_conn_closing, _reason}}, {_raw_socket, _state}) do - :ok - end + def handle_info( + {msg, raw_socket}, + {handler_module, %ThousandIsland.Socket{socket: raw_socket} = socket, state} + ) + when msg in [:tcp_closed, :ssl_closed] do + {:stop, {:shutdown, :peer_closed}, {handler_module, socket, state}} + end - # Called by GenServer if we hit our read_timeout. Socket is still open - def terminate({:shutdown, :timeout}, {%ThousandIsland.Socket{} = socket, state}) do - __MODULE__.handle_timeout(socket, state) - do_socket_close(socket, :timeout) - end + def handle_info( + {msg, raw_socket, reason}, + {handler_module, %ThousandIsland.Socket{socket: raw_socket} = socket, state} + ) + when msg in [:tcp_error, :ssl_error] do + {:stop, reason, {handler_module, socket, state}} + end - # Called if we're being shutdown in an orderly manner. Socket is still open - def terminate(:shutdown, {%ThousandIsland.Socket{} = socket, state}) do - __MODULE__.handle_shutdown(socket, state) - do_socket_close(socket, :shutdown) - end + def handle_info(:timeout, {handler_module, %ThousandIsland.Socket{} = socket, state}) do + {:stop, {:shutdown, :timeout}, {handler_module, socket, state}} + end - # Called if the socket encountered an error during handshaking - def terminate({:shutdown, {:handshake, reason}}, {%ThousandIsland.Socket{} = socket, state}) do - __MODULE__.handle_error(reason, socket, state) - do_socket_close(socket, reason) - end + def handle_info(msg, {handler_module, %ThousandIsland.Socket{} = socket, state}) do + handler_module.handle_info(msg, socket, state) + |> handle_continuation(handler_module, socket) + end - # Called if the socket encountered an error and we are configured to shutdown silently. - # Socket is closed - def terminate( - {:shutdown, {:silent_termination, reason}}, - {%ThousandIsland.Socket{} = socket, state} - ) do - __MODULE__.handle_error(reason, socket, state) - do_socket_close(socket, reason) - end + # Use a continue pattern here so that we have committed the socket + # to state in case the `c:handle_connection/2` callback raises an error. + # This ensures that the `c:terminate/2` calls below are able to properly + # close down the process + @impl GenServer + def handle_continue( + :handle_connection, + {handler_module, %ThousandIsland.Socket{} = socket, state} + ) do + handler_module.handle_connection(socket, state) + |> handle_continuation(handler_module, socket) + end - # Called if the socket encountered an error during upgrading - def terminate({:shutdown, {:upgrade, reason}}, {socket, state}) do - __MODULE__.handle_error(reason, socket, state) - do_socket_close(socket, reason) - end + # Called if the remote end closed the connection before we could initialize it + @impl GenServer + def terminate( + {:shutdown, {:premature_conn_closing, _reason}}, + {_handler_module, _raw_socket, _state} + ) do + :ok + end - # Called if the remote end shut down the connection, or if the local end closed the - # connection by returning a `{:close,...}` tuple (in which case the socket will be open) - def terminate({:shutdown, reason}, {%ThousandIsland.Socket{} = socket, state}) do - __MODULE__.handle_close(socket, state) - do_socket_close(socket, reason) - end + # Called by GenServer if we hit our read_timeout. Socket is still open + def terminate({:shutdown, :timeout}, {handler_module, %ThousandIsland.Socket{} = socket, state}) do + handler_module.handle_timeout(socket, state) + do_socket_close(socket, :timeout) + end - # Called if the socket encountered an error. Socket is closed - def terminate(reason, {%ThousandIsland.Socket{} = socket, state}) do - __MODULE__.handle_error(reason, socket, state) - do_socket_close(socket, reason) - end + # Called if we're being shutdown in an orderly manner. Socket is still open + def terminate(:shutdown, {handler_module, %ThousandIsland.Socket{} = socket, state}) do + handler_module.handle_shutdown(socket, state) + do_socket_close(socket, :shutdown) + end - # This clause could happen if we do not have a socket defined in state (either because the - # process crashed before setting it up, or because the user sent an invalid state) - def terminate(_reason, _state) do - :ok - end + # Called if the socket encountered an error during handshaking + def terminate( + {:shutdown, {:handshake, reason}}, + {handler_module, %ThousandIsland.Socket{} = socket, state} + ) do + handler_module.handle_error(reason, socket, state) + do_socket_close(socket, reason) + end - @spec do_socket_close( - ThousandIsland.Socket.t(), - reason :: :shutdown | :local_closed | term() - ) :: :ok - defp do_socket_close(socket, reason) do - measurements = - case ThousandIsland.Socket.getstat(socket) do - {:ok, stats} -> - stats - |> Keyword.take([:send_oct, :send_cnt, :recv_oct, :recv_cnt]) - |> Enum.into(%{}) - - _ -> - %{} - end - - metadata = - if reason in [:shutdown, :local_closed, :peer_closed], do: %{}, else: %{error: reason} - - _ = ThousandIsland.Socket.close(socket) - ThousandIsland.Telemetry.stop_span(socket.span, measurements, metadata) - end + # Called if the socket encountered an error and we are configured to shutdown silently. + # Socket is closed + def terminate( + {:shutdown, {:silent_termination, reason}}, + {handler_module, %ThousandIsland.Socket{} = socket, state} + ) do + handler_module.handle_error(reason, socket, state) + do_socket_close(socket, reason) + end - # Dialyzer gets confused by handle_continuation being a defp and not a def - @dialyzer {:no_match, handle_continuation: 2} - defp handle_continuation(continuation, socket) do - case continuation do - {:continue, state} -> - _ = ThousandIsland.Socket.setopts(socket, active: :once) - {:noreply, {socket, state}, socket.read_timeout} - - {:continue, state, {:persistent, timeout}} -> - socket = %{socket | read_timeout: timeout} - _ = ThousandIsland.Socket.setopts(socket, active: :once) - {:noreply, {socket, state}, timeout} - - {:continue, state, timeout} -> - _ = ThousandIsland.Socket.setopts(socket, active: :once) - {:noreply, {socket, state}, timeout} - - {:switch_transport, {module, upgrade_opts}, state} -> - handle_switch_continuation(socket, module, upgrade_opts, state, socket.read_timeout) - - {:switch_transport, {module, upgrade_opts}, state, {:persistent, timeout}} -> - socket = %{socket | read_timeout: timeout} - handle_switch_continuation(socket, module, upgrade_opts, state, timeout) - - {:switch_transport, {module, upgrade_opts}, state, timeout} -> - handle_switch_continuation(socket, module, upgrade_opts, state, timeout) - - {:close, state} -> - {:stop, {:shutdown, :local_closed}, {socket, state}} - - {:error, :timeout, state} -> - {:stop, {:shutdown, :timeout}, {socket, state}} - - {:error, reason, state} -> - if socket.silent_terminate_on_error do - {:stop, {:shutdown, {:silent_termination, reason}}, {socket, state}} - else - {:stop, reason, {socket, state}} - end - end + # Called if the socket encountered an error during upgrading + def terminate({:shutdown, {:upgrade, reason}}, {handler_module, socket, state}) do + handler_module.handle_error(reason, socket, state) + do_socket_close(socket, reason) + end + + # Called if the remote end shut down the connection, or if the local end closed the + # connection by returning a `{:close,...}` tuple (in which case the socket will be open) + def terminate({:shutdown, reason}, {handler_module, %ThousandIsland.Socket{} = socket, state}) do + handler_module.handle_close(socket, state) + do_socket_close(socket, reason) + end + + # Called if the socket encountered an error. Socket is closed + def terminate(reason, {handler_module, %ThousandIsland.Socket{} = socket, state}) do + handler_module.handle_error(reason, socket, state) + do_socket_close(socket, reason) + end + + # This clause could happen if we do not have a socket defined in state (either because the + # process crashed before setting it up, or because the user sent an invalid state) + def terminate(_reason, _state) do + :ok + end + + @spec do_socket_close( + ThousandIsland.Socket.t(), + reason :: :shutdown | :local_closed | term() + ) :: :ok + defp do_socket_close(socket, reason) do + measurements = + case ThousandIsland.Socket.getstat(socket) do + {:ok, stats} -> + stats + |> Keyword.take([:send_oct, :send_cnt, :recv_oct, :recv_cnt]) + |> Enum.into(%{}) + + _ -> + %{} end - @dialyzer {:nowarn_function, handle_switch_continuation: 5} - defp handle_switch_continuation(socket, module, upgrade_opts, state, timeout) do - case ThousandIsland.Socket.upgrade(socket, module, upgrade_opts) do - {:ok, socket} -> - _ = ThousandIsland.Socket.setopts(socket, active: :once) - {:noreply, {socket, state}, timeout} + metadata = + if reason in [:shutdown, :local_closed, :peer_closed], do: %{}, else: %{error: reason} - {:error, reason} -> - {:stop, {:shutdown, {:upgrade, reason}}, {socket, state}} + _ = ThousandIsland.Socket.close(socket) + ThousandIsland.Telemetry.stop_span(socket.span, measurements, metadata) + end + + # Dialyzer gets confused by handle_continuation being a defp and not a def + defp handle_continuation(continuation, handler_module, socket) do + case continuation do + {:continue, state} -> + _ = ThousandIsland.Socket.setopts(socket, active: :once) + {:noreply, {handler_module, socket, state}, socket.read_timeout} + + {:continue, state, {:persistent, timeout}} -> + socket = %{socket | read_timeout: timeout} + _ = ThousandIsland.Socket.setopts(socket, active: :once) + {:noreply, {handler_module, socket, state}, timeout} + + {:continue, state, timeout} -> + _ = ThousandIsland.Socket.setopts(socket, active: :once) + {:noreply, {handler_module, socket, state}, timeout} + + {:switch_transport, {transport_module, upgrade_opts}, state} -> + handle_switch_continuation( + handler_module, + socket, + transport_module, + upgrade_opts, + state, + socket.read_timeout + ) + + {:switch_transport, {transport_module, upgrade_opts}, state, {:persistent, timeout}} -> + socket = %{socket | read_timeout: timeout} + + handle_switch_continuation( + handler_module, + socket, + transport_module, + upgrade_opts, + state, + timeout + ) + + {:switch_transport, {transport_module, upgrade_opts}, state, timeout} -> + handle_switch_continuation( + handler_module, + socket, + transport_module, + upgrade_opts, + state, + timeout + ) + + {:close, state} -> + {:stop, {:shutdown, :local_closed}, {handler_module, socket, state}} + + {:error, :timeout, state} -> + {:stop, {:shutdown, :timeout}, {handler_module, socket, state}} + + {:error, reason, state} -> + if socket.silent_terminate_on_error do + {:stop, {:shutdown, {:silent_termination, reason}}, {handler_module, socket, state}} + else + {:stop, reason, {handler_module, socket, state}} end - end end end - @doc false - defmacro add_handle_info_fallback(_module) do - quote do - def handle_info({msg, _raw_socket, _data}, _state) when msg in [:tcp, :ssl] do - raise """ - The callback's `state` doesn't match the expected `{socket, state}` form. - Please ensure that you are returning a `{socket, state}` tuple from any - `GenServer.handle_*` callbacks you have implemented - """ - end + defp handle_switch_continuation( + handler_module, + socket, + transport_module, + upgrade_opts, + state, + timeout + ) do + case ThousandIsland.Socket.upgrade(socket, transport_module, upgrade_opts) do + {:ok, socket} -> + _ = ThousandIsland.Socket.setopts(socket, active: :once) + {:noreply, {handler_module, socket, state}, timeout} + + {:error, reason} -> + {:stop, {:shutdown, {:upgrade, reason}}, {handler_module, socket, state}} end end end diff --git a/test/thousand_island/handler_test.exs b/test/thousand_island/handler_test.exs index c533c8c..952f7d5 100644 --- a/test/thousand_island/handler_test.exs +++ b/test/thousand_island/handler_test.exs @@ -43,37 +43,6 @@ defmodule ThousandIsland.HandlerTest do assert messages =~ "Closing with hello" end - defmodule BogusState do - use ThousandIsland.Handler - - @impl ThousandIsland.Handler - def handle_connection(_socket, state) do - send(self(), "bogus") - {:continue, state} - end - - def handle_info("bogus", {_socket, state}) do - # Intentionally dropping socket here - {:noreply, state} - end - end - - test "it should complain loudly if a handle_info callback returns the wrong shaped state" do - {:ok, port} = start_handler(BogusState) - {:ok, client} = :gen_tcp.connect(:localhost, port, active: false) - - messages = - capture_log(fn -> - :gen_tcp.send(client, "ping") - Process.sleep(100) - end) - - # Ensure that we saw the message displayed when we tried to handle_data - # after getting a bogus state back - assert messages =~ - "The callback's `state` doesn't match the expected `{socket, state}` form" - end - defmodule FakeProxy do use ThousandIsland.Handler @@ -83,9 +52,9 @@ defmodule ThousandIsland.HandlerTest do {:continue, state} end - def handle_info({:tcp, _othersocket, _otherdata}, {socket, state}) do + def handle_info({:tcp, _othersocket, _otherdata}, socket, state) do ThousandIsland.Socket.send(socket, "Got other data") - {:noreply, {socket, state}} + {:continue, state} end end From e7e2a4796be75cc06334ff3fcf70c67552ccb842 Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Sat, 21 Dec 2024 13:28:18 +0100 Subject: [PATCH 2/3] refactor to use modular macros approach --- lib/thousand_island/handler.ex | 438 +++++++++++--------------- test/thousand_island/handler_test.exs | 35 +- 2 files changed, 219 insertions(+), 254 deletions(-) diff --git a/lib/thousand_island/handler.ex b/lib/thousand_island/handler.ex index fb5aefb..d391075 100644 --- a/lib/thousand_island/handler.ex +++ b/lib/thousand_island/handler.ex @@ -249,30 +249,6 @@ defmodule ThousandIsland.Handler do @callback handle_data(data :: binary(), socket :: ThousandIsland.Socket.t(), state :: term()) :: handler_result() - @doc """ - Handles messages sent to the process. - - The value returned by this callback causes Thousand Island to proceed in one of several ways: - - * Returning `{:close, state}` will cause Thousand Island to close the socket & call the `c:handle_close/2` callback to - allow final cleanup to be done. - * Returning `{:continue, state}` will cause Thousand Island to switch the socket to an asynchronous mode. When the - client subsequently sends data (or if there is already unread data waiting from the client), Thousand Island will call - `c:handle_data/3` to allow this data to be processed. - * Returning `{:continue, state, timeout}` is identical to the previous case with the - addition of a timeout. If `timeout` milliseconds passes with no data being received or messages - being sent to the process, the socket will be closed and `c:handle_timeout/2` will be called. - Note that this timeout is not persistent; it applies only to the interval until the next message - is received. In order to set a persistent timeout for all future messages (essentially - overwriting the value of `read_timeout` that was set at server startup), a value of - `{:persistent, timeout}` may be returned. - * Returning `{:error, reason, state}` will cause Thousand Island to close the socket & call the `c:handle_error/3` callback to - allow final cleanup to be done. - - """ - @callback handle_info(data :: binary(), socket :: ThousandIsland.Socket.t(), state :: term()) :: - handler_result() - @doc """ This callback is called when the underlying socket is closed by the remote end; it should perform any cleanup required as it is the last callback called before the process backing this connection is terminated. The underlying socket @@ -323,229 +299,216 @@ defmodule ThousandIsland.Handler do handle_shutdown: 2, handle_timeout: 2 - use GenServer, restart: :temporary - @spec __using__(any) :: Macro.t() - defmacro __using__(opts) do + defmacro __using__(_opts) do quote location: :keep do @behaviour ThousandIsland.Handler - @impl ThousandIsland.Handler - def handle_connection(_socket, state), do: {:continue, state} - - @impl ThousandIsland.Handler - def handle_data(_data, _socket, state), do: {:continue, state} - - @impl ThousandIsland.Handler - def handle_info(_data, _socket, state), do: {:continue, state} - - @impl ThousandIsland.Handler - def handle_close(_socket, _state), do: :ok - - @impl ThousandIsland.Handler - def handle_error(_error, _socket, _state), do: :ok - - @impl ThousandIsland.Handler - def handle_shutdown(_socket, _state), do: :ok - - @impl ThousandIsland.Handler - def handle_timeout(_socket, _state), do: :ok - - defoverridable ThousandIsland.Handler + use GenServer, restart: :temporary @spec start_link({handler_options :: term(), GenServer.options()}) :: GenServer.on_start() def start_link({handler_options, genserver_options}) do - ThousandIsland.Handler.start_link({__MODULE__, handler_options, genserver_options}) + GenServer.start_link(__MODULE__, handler_options, genserver_options) end - @doc false - def child_spec(args) do - %{ - id: __MODULE__, - start: {__MODULE__, :start_link, [args]}, - restart: :temporary - } - |> Supervisor.child_spec(unquote(Macro.escape(opts))) - end + unquote(genserver_impl()) + unquote(handler_impl()) end end - @spec start_link({handler_module :: module(), handler_options :: term(), GenServer.options()}) :: - GenServer.on_start() - def start_link({handler_module, handler_options, genserver_options}) do - GenServer.start_link(__MODULE__, {handler_module, handler_options}, genserver_options) + @doc false + defmacro add_handle_info_fallback(_module) do + quote do + def handle_info({msg, _raw_socket, _data}, _state) when msg in [:tcp, :ssl] do + raise """ + The callback's `state` doesn't match the expected `{socket, state}` form. + Please ensure that you are returning a `{socket, state}` tuple from any + `GenServer.handle_*` callbacks you have implemented + """ + end + end end - @impl GenServer - def init({handler_module, handler_options}) do - Process.flag(:trap_exit, true) - {:ok, {handler_module, nil, handler_options}} - end + def genserver_impl do + quote do + @impl true + def init(handler_options) do + Process.flag(:trap_exit, true) + {:ok, {nil, handler_options}} + end + + @impl true + def handle_info( + {:thousand_island_ready, raw_socket, server_config, acceptor_span, start_time}, + {nil, state} + ) do + {ip, port} = + case server_config.transport_module.peername(raw_socket) do + {:ok, remote_info} -> + remote_info + + {:error, reason} -> + # the socket has been prematurely closed by the client, we can't do anything with it + # so we just close the socket, stop the GenServer with the error reason and move on. + _ = server_config.transport_module.close(raw_socket) + throw({:stop, {:shutdown, {:premature_conn_closing, reason}}, {raw_socket, state}}) + end + + span_meta = %{remote_address: ip, remote_port: port} + + connection_span = + ThousandIsland.Telemetry.start_child_span( + acceptor_span, + :connection, + %{monotonic_time: start_time}, + span_meta + ) + + socket = ThousandIsland.Socket.new(raw_socket, server_config, connection_span) + ThousandIsland.Telemetry.span_event(connection_span, :ready) + + case ThousandIsland.Socket.handshake(socket) do + {:ok, socket} -> {:noreply, {socket, state}, {:continue, :handle_connection}} + {:error, reason} -> {:stop, {:shutdown, {:handshake, reason}}, {socket, state}} + end + catch + {:stop, _, _} = stop -> stop + end - @impl GenServer - def handle_info( - {:thousand_island_ready, raw_socket, server_config, acceptor_span, start_time}, - {handler_module, nil, state} - ) do - {ip, port} = - case server_config.transport_module.peername(raw_socket) do - {:ok, remote_info} -> - remote_info - - {:error, reason} -> - # the socket has been prematurely closed by the client, we can't do anything with it - # so we just close the socket, stop the GenServer with the error reason and move on. - _ = server_config.transport_module.close(raw_socket) - - throw( - {:stop, {:shutdown, {:premature_conn_closing, reason}}, - {handler_module, raw_socket, state}} + def handle_info( + {msg, raw_socket, data}, + {%ThousandIsland.Socket{socket: raw_socket} = socket, state} ) + when msg in [:tcp, :ssl] do + ThousandIsland.Telemetry.untimed_span_event(socket.span, :async_recv, %{data: data}) + + __MODULE__.handle_data(data, socket, state) + |> ThousandIsland.Handler.handle_continuation(socket) end - span_meta = %{remote_address: ip, remote_port: port} + def handle_info( + {msg, raw_socket}, + {%ThousandIsland.Socket{socket: raw_socket} = socket, state} + ) + when msg in [:tcp_closed, :ssl_closed] do + {:stop, {:shutdown, :peer_closed}, {socket, state}} + end - connection_span = - ThousandIsland.Telemetry.start_child_span( - acceptor_span, - :connection, - %{monotonic_time: start_time}, - span_meta - ) + def handle_info( + {msg, raw_socket, reason}, + {%ThousandIsland.Socket{socket: raw_socket} = socket, state} + ) + when msg in [:tcp_error, :ssl_error] do + {:stop, reason, {socket, state}} + end - socket = ThousandIsland.Socket.new(raw_socket, server_config, connection_span) - ThousandIsland.Telemetry.span_event(connection_span, :ready) + def handle_info(:timeout, {%ThousandIsland.Socket{} = socket, state}) do + {:stop, {:shutdown, :timeout}, {socket, state}} + end - case ThousandIsland.Socket.handshake(socket) do - {:ok, socket} -> - {:noreply, {handler_module, socket, state}, {:continue, :handle_connection}} + @before_compile {ThousandIsland.Handler, :add_handle_info_fallback} - {:error, reason} -> - {:stop, {:shutdown, {:handshake, reason}}, {handler_module, socket, state}} - end - catch - {:stop, _, _} = stop -> stop - end + # Use a continue pattern here so that we have committed the socket + # to state in case the `c:handle_connection/2` callback raises an error. + # This ensures that the `c:terminate/2` calls below are able to properly + # close down the process + @impl true + def handle_continue(:handle_connection, {%ThousandIsland.Socket{} = socket, state}) do + __MODULE__.handle_connection(socket, state) + |> ThousandIsland.Handler.handle_continuation(socket) + end - def handle_info( - {msg, raw_socket, data}, - {handler_module, %ThousandIsland.Socket{socket: raw_socket} = socket, state} - ) - when msg in [:tcp, :ssl] do - ThousandIsland.Telemetry.untimed_span_event(socket.span, :async_recv, %{data: data}) + # Called if the remote end closed the connection before we could initialize it + @impl true + def terminate({:shutdown, {:premature_conn_closing, _reason}}, {_raw_socket, _state}) do + :ok + end - handler_module.handle_data(data, socket, state) - |> handle_continuation(handler_module, socket) - end + # Called by GenServer if we hit our read_timeout. Socket is still open + def terminate({:shutdown, :timeout}, {%ThousandIsland.Socket{} = socket, state}) do + __MODULE__.handle_timeout(socket, state) + ThousandIsland.Handler.do_socket_close(socket, :timeout) + end - def handle_info( - {msg, raw_socket}, - {handler_module, %ThousandIsland.Socket{socket: raw_socket} = socket, state} - ) - when msg in [:tcp_closed, :ssl_closed] do - {:stop, {:shutdown, :peer_closed}, {handler_module, socket, state}} - end + # Called if we're being shutdown in an orderly manner. Socket is still open + def terminate(:shutdown, {%ThousandIsland.Socket{} = socket, state}) do + __MODULE__.handle_shutdown(socket, state) + ThousandIsland.Handler.do_socket_close(socket, :shutdown) + end - def handle_info( - {msg, raw_socket, reason}, - {handler_module, %ThousandIsland.Socket{socket: raw_socket} = socket, state} - ) - when msg in [:tcp_error, :ssl_error] do - {:stop, reason, {handler_module, socket, state}} - end + # Called if the socket encountered an error during handshaking + def terminate({:shutdown, {:handshake, reason}}, {%ThousandIsland.Socket{} = socket, state}) do + __MODULE__.handle_error(reason, socket, state) + ThousandIsland.Handler.do_socket_close(socket, reason) + end - def handle_info(:timeout, {handler_module, %ThousandIsland.Socket{} = socket, state}) do - {:stop, {:shutdown, :timeout}, {handler_module, socket, state}} - end + # Called if the socket encountered an error and we are configured to shutdown silently. + # Socket is closed + def terminate( + {:shutdown, {:silent_termination, reason}}, + {%ThousandIsland.Socket{} = socket, state} + ) do + __MODULE__.handle_error(reason, socket, state) + ThousandIsland.Handler.do_socket_close(socket, reason) + end - def handle_info(msg, {handler_module, %ThousandIsland.Socket{} = socket, state}) do - handler_module.handle_info(msg, socket, state) - |> handle_continuation(handler_module, socket) - end + # Called if the socket encountered an error during upgrading + def terminate({:shutdown, {:upgrade, reason}}, {socket, state}) do + __MODULE__.handle_error(reason, socket, state) + ThousandIsland.Handler.do_socket_close(socket, reason) + end - # Use a continue pattern here so that we have committed the socket - # to state in case the `c:handle_connection/2` callback raises an error. - # This ensures that the `c:terminate/2` calls below are able to properly - # close down the process - @impl GenServer - def handle_continue( - :handle_connection, - {handler_module, %ThousandIsland.Socket{} = socket, state} - ) do - handler_module.handle_connection(socket, state) - |> handle_continuation(handler_module, socket) - end + # Called if the remote end shut down the connection, or if the local end closed the + # connection by returning a `{:close,...}` tuple (in which case the socket will be open) + def terminate({:shutdown, reason}, {%ThousandIsland.Socket{} = socket, state}) do + __MODULE__.handle_close(socket, state) + ThousandIsland.Handler.do_socket_close(socket, reason) + end - # Called if the remote end closed the connection before we could initialize it - @impl GenServer - def terminate( - {:shutdown, {:premature_conn_closing, _reason}}, - {_handler_module, _raw_socket, _state} - ) do - :ok - end + # Called if the socket encountered an error. Socket is closed + def terminate(reason, {%ThousandIsland.Socket{} = socket, state}) do + __MODULE__.handle_error(reason, socket, state) + ThousandIsland.Handler.do_socket_close(socket, reason) + end - # Called by GenServer if we hit our read_timeout. Socket is still open - def terminate({:shutdown, :timeout}, {handler_module, %ThousandIsland.Socket{} = socket, state}) do - handler_module.handle_timeout(socket, state) - do_socket_close(socket, :timeout) + # This clause could happen if we do not have a socket defined in state (either because the + # process crashed before setting it up, or because the user sent an invalid state) + def terminate(_reason, _state) do + :ok + end + end end - # Called if we're being shutdown in an orderly manner. Socket is still open - def terminate(:shutdown, {handler_module, %ThousandIsland.Socket{} = socket, state}) do - handler_module.handle_shutdown(socket, state) - do_socket_close(socket, :shutdown) - end + def handler_impl do + quote do + @impl true + def handle_connection(_socket, state), do: {:continue, state} - # Called if the socket encountered an error during handshaking - def terminate( - {:shutdown, {:handshake, reason}}, - {handler_module, %ThousandIsland.Socket{} = socket, state} - ) do - handler_module.handle_error(reason, socket, state) - do_socket_close(socket, reason) - end + @impl true + def handle_data(_data, _socket, state), do: {:continue, state} - # Called if the socket encountered an error and we are configured to shutdown silently. - # Socket is closed - def terminate( - {:shutdown, {:silent_termination, reason}}, - {handler_module, %ThousandIsland.Socket{} = socket, state} - ) do - handler_module.handle_error(reason, socket, state) - do_socket_close(socket, reason) - end + @impl true + def handle_close(_socket, _state), do: :ok - # Called if the socket encountered an error during upgrading - def terminate({:shutdown, {:upgrade, reason}}, {handler_module, socket, state}) do - handler_module.handle_error(reason, socket, state) - do_socket_close(socket, reason) - end + @impl true + def handle_error(_error, _socket, _state), do: :ok - # Called if the remote end shut down the connection, or if the local end closed the - # connection by returning a `{:close,...}` tuple (in which case the socket will be open) - def terminate({:shutdown, reason}, {handler_module, %ThousandIsland.Socket{} = socket, state}) do - handler_module.handle_close(socket, state) - do_socket_close(socket, reason) - end + @impl true + def handle_shutdown(_socket, _state), do: :ok - # Called if the socket encountered an error. Socket is closed - def terminate(reason, {handler_module, %ThousandIsland.Socket{} = socket, state}) do - handler_module.handle_error(reason, socket, state) - do_socket_close(socket, reason) - end + @impl true + def handle_timeout(_socket, _state), do: :ok - # This clause could happen if we do not have a socket defined in state (either because the - # process crashed before setting it up, or because the user sent an invalid state) - def terminate(_reason, _state) do - :ok + defoverridable ThousandIsland.Handler + end end @spec do_socket_close( ThousandIsland.Socket.t(), reason :: :shutdown | :local_closed | term() ) :: :ok - defp do_socket_close(socket, reason) do + @doc false + def do_socket_close(socket, reason) do measurements = case ThousandIsland.Socket.getstat(socket) do {:ok, stats} -> @@ -564,84 +527,55 @@ defmodule ThousandIsland.Handler do ThousandIsland.Telemetry.stop_span(socket.span, measurements, metadata) end - # Dialyzer gets confused by handle_continuation being a defp and not a def - defp handle_continuation(continuation, handler_module, socket) do + @doc false + def handle_continuation(continuation, socket) do case continuation do {:continue, state} -> _ = ThousandIsland.Socket.setopts(socket, active: :once) - {:noreply, {handler_module, socket, state}, socket.read_timeout} + {:noreply, {socket, state}, socket.read_timeout} {:continue, state, {:persistent, timeout}} -> socket = %{socket | read_timeout: timeout} _ = ThousandIsland.Socket.setopts(socket, active: :once) - {:noreply, {handler_module, socket, state}, timeout} + {:noreply, {socket, state}, timeout} {:continue, state, timeout} -> _ = ThousandIsland.Socket.setopts(socket, active: :once) - {:noreply, {handler_module, socket, state}, timeout} - - {:switch_transport, {transport_module, upgrade_opts}, state} -> - handle_switch_continuation( - handler_module, - socket, - transport_module, - upgrade_opts, - state, - socket.read_timeout - ) - - {:switch_transport, {transport_module, upgrade_opts}, state, {:persistent, timeout}} -> + {:noreply, {socket, state}, timeout} + + {:switch_transport, {module, upgrade_opts}, state} -> + handle_switch_continuation(socket, module, upgrade_opts, state, socket.read_timeout) + + {:switch_transport, {module, upgrade_opts}, state, {:persistent, timeout}} -> socket = %{socket | read_timeout: timeout} + handle_switch_continuation(socket, module, upgrade_opts, state, timeout) - handle_switch_continuation( - handler_module, - socket, - transport_module, - upgrade_opts, - state, - timeout - ) - - {:switch_transport, {transport_module, upgrade_opts}, state, timeout} -> - handle_switch_continuation( - handler_module, - socket, - transport_module, - upgrade_opts, - state, - timeout - ) + {:switch_transport, {module, upgrade_opts}, state, timeout} -> + handle_switch_continuation(socket, module, upgrade_opts, state, timeout) {:close, state} -> - {:stop, {:shutdown, :local_closed}, {handler_module, socket, state}} + {:stop, {:shutdown, :local_closed}, {socket, state}} {:error, :timeout, state} -> - {:stop, {:shutdown, :timeout}, {handler_module, socket, state}} + {:stop, {:shutdown, :timeout}, {socket, state}} {:error, reason, state} -> if socket.silent_terminate_on_error do - {:stop, {:shutdown, {:silent_termination, reason}}, {handler_module, socket, state}} + {:stop, {:shutdown, {:silent_termination, reason}}, {socket, state}} else - {:stop, reason, {handler_module, socket, state}} + {:stop, reason, {socket, state}} end end end - defp handle_switch_continuation( - handler_module, - socket, - transport_module, - upgrade_opts, - state, - timeout - ) do - case ThousandIsland.Socket.upgrade(socket, transport_module, upgrade_opts) do + defp handle_switch_continuation(socket, module, upgrade_opts, state, timeout) do + case ThousandIsland.Socket.upgrade(socket, module, upgrade_opts) do {:ok, socket} -> _ = ThousandIsland.Socket.setopts(socket, active: :once) - {:noreply, {handler_module, socket, state}, timeout} + {:noreply, {socket, state}, timeout} {:error, reason} -> - {:stop, {:shutdown, {:upgrade, reason}}, {handler_module, socket, state}} + {:stop, {:shutdown, {:upgrade, reason}}, {socket, state}} end end end diff --git a/test/thousand_island/handler_test.exs b/test/thousand_island/handler_test.exs index 952f7d5..c533c8c 100644 --- a/test/thousand_island/handler_test.exs +++ b/test/thousand_island/handler_test.exs @@ -43,6 +43,37 @@ defmodule ThousandIsland.HandlerTest do assert messages =~ "Closing with hello" end + defmodule BogusState do + use ThousandIsland.Handler + + @impl ThousandIsland.Handler + def handle_connection(_socket, state) do + send(self(), "bogus") + {:continue, state} + end + + def handle_info("bogus", {_socket, state}) do + # Intentionally dropping socket here + {:noreply, state} + end + end + + test "it should complain loudly if a handle_info callback returns the wrong shaped state" do + {:ok, port} = start_handler(BogusState) + {:ok, client} = :gen_tcp.connect(:localhost, port, active: false) + + messages = + capture_log(fn -> + :gen_tcp.send(client, "ping") + Process.sleep(100) + end) + + # Ensure that we saw the message displayed when we tried to handle_data + # after getting a bogus state back + assert messages =~ + "The callback's `state` doesn't match the expected `{socket, state}` form" + end + defmodule FakeProxy do use ThousandIsland.Handler @@ -52,9 +83,9 @@ defmodule ThousandIsland.HandlerTest do {:continue, state} end - def handle_info({:tcp, _othersocket, _otherdata}, socket, state) do + def handle_info({:tcp, _othersocket, _otherdata}, {socket, state}) do ThousandIsland.Socket.send(socket, "Got other data") - {:continue, state} + {:noreply, {socket, state}} end end From 07079db3ecd32e35b47aca1d9bc596639c8c4ff6 Mon Sep 17 00:00:00 2001 From: Mat Trudel Date: Sun, 5 Jan 2025 13:04:36 -0500 Subject: [PATCH 3/3] Add credo filter --- lib/thousand_island/handler.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/thousand_island/handler.ex b/lib/thousand_island/handler.ex index d391075..9b76b9d 100644 --- a/lib/thousand_island/handler.ex +++ b/lib/thousand_island/handler.ex @@ -329,6 +329,7 @@ defmodule ThousandIsland.Handler do end end + # credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity def genserver_impl do quote do @impl true