Skip to content

Commit

Permalink
Refactor SteamTask to be a GenServer
Browse files Browse the repository at this point in the history
  • Loading branch information
mtrudel committed Jan 8, 2024
1 parent 29b47d2 commit 9fa5d65
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 44 deletions.
16 changes: 8 additions & 8 deletions lib/bandit/http2/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# HTTP/2 Handler

Included in this folder is a complete `ThousandIsland.Handler` based implementation of HTTP/2 as
defined in [RFC 9113](https://datatracker.ietf.org/doc/rfc9113).
defined in [RFC 9113](https://datatracker.ietf.org/doc/rfc9113).

## Process model

Expand All @@ -10,11 +10,11 @@ Within a Bandit server, an HTTP/2 connection is modeled as a set of processes:
* 1 process per connection, a `Bandit.HTTP2.Handler` module implementing the
`ThousandIsland.Handler` behaviour, and;
* 1 process per stream (i.e.: per HTTP request) within the connection, implemented as
a `Bandit.HTTP2.StreamTask` Task
a `Bandit.HTTP2.StreamProcess` process

The lifetimes of these processes correspond to their role; a connection process lives for as long
as a client is connected, and a stream process lives only as long as is required to process
a single stream request within a connection.
a single stream request within a connection.

Connection processes are the 'root' of each connection's process group, and are supervised by
Thousand Island in the same manner that `ThousandIsland.Handler` processes are usually supervised
Expand All @@ -40,12 +40,12 @@ looks like the following:
2. Frames are parsed from these bytes by calling the `Bandit.HTTP2.Frame.deserialize/2`
function. If successful, the parsed frame(s) are returned. We retain any unparsed bytes in
a buffer in order to attempt parsing them upon receipt of subsequent data from the client
3. Parsed frames are passed into the `Bandit.HTTP2.Connection` module along with a struct of
same module. Frames are applied against this struct in a vaguely FSM-like manner, using pattern
3. Parsed frames are passed into the `Bandit.HTTP2.Connection` module along with a struct of
same module. Frames are applied against this struct in a vaguely FSM-like manner, using pattern
matching within the `Bandit.HTTP2.Connection.handle_frame/3` function. Any side-effects of
received frames are applied in these functions, and an updated connection struct is returned to
represent the updated connection state. These side-effects can take the form of starting stream
tasks, conveying data to running stream tasks, responding to the client with various frames, or
processes, conveying data to running stream processes, responding to the client with various frames, or
any number of other actions
4. This process is repeated every time we receive data from the client until the
`Bandit.HTTP2.Connection` module indicates that the connection should be closed, either
Expand All @@ -59,11 +59,11 @@ looks like the following:
## Processing requests

The details of a particular stream are contained within a `Bandit.HTTP2.Stream` struct
(as well as a `Bandit.HTTP2.StreamTask` process in the case of active streams). The
(as well as a `Bandit.HTTP2.StreamProcess` process in the case of active streams). The
`Bandit.HTTP2.StreamCollection` module manages a collection of streams, allowing for the memory
efficient management of complete & yet unborn streams alongside active ones.

Once a complete header block has been read, a `Bandit.HTTP2.StreamTask` is started to manage the
Once a complete header block has been read, a `Bandit.HTTP2.StreamProcess` is started to manage the
actual calling of the configured `Plug` module for this server, using the `Bandit.HTTP2.Adapter`
module as the implementation of the `Plug.Conn.Adapter` behaviour. This adapter uses a simple
`receive` pattern to listen for messages sent to it from the connection process, a pattern chosen
Expand Down
10 changes: 7 additions & 3 deletions lib/bandit/http2/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ defmodule Bandit.HTTP2.Adapter do
}
end

# As described in the header documentation for the `Bandit.HTTP2.StreamTask` module, we
# purposefully use raw `receive` message patterns here in order to facilitate an imperatively
# structured blocking interface. Comments inline.
# We purposefully use raw `receive` message patterns here in order to facilitate an imperatively
# structured blocking interface as required by `Plug.Conn.Adapter`. This is very unconventional
# but also safe, so long as the receive patterns expressed below are extremely tight.
#
# The events which 'unblock' these conditions come from within the Connection, and are pushed
# down to streams via calls on `Bandit.HTTP2.StreamProcess` as a fundamental design decision
# (rather than having stream processes query the connection directly).
@impl Plug.Conn.Adapter
def read_req_body(%__MODULE__{end_stream: true}, _opts), do: raise(Bandit.BodyAlreadyReadError)

Expand Down
2 changes: 1 addition & 1 deletion lib/bandit/http2/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ defmodule Bandit.HTTP2.Connection do
#
# Sending logic
#
# All callers of functions below will be from stream tasks, looked up via pid
# All callers of functions below will be from stream processes, looked up via pid
#

#
Expand Down
17 changes: 10 additions & 7 deletions lib/bandit/http2/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Bandit.HTTP2.Stream do
require Integer
require Logger

alias Bandit.HTTP2.{Connection, Errors, FlowControl, StreamTask}
alias Bandit.HTTP2.{Connection, Errors, FlowControl, StreamProcess}

defstruct stream_id: nil,
state: nil,
Expand Down Expand Up @@ -91,9 +91,12 @@ defmodule Bandit.HTTP2.Stream do
content_encoding,
opts
),
{:ok, pid} <- StreamTask.start_link(req, transport_info, headers, plug, span) do
{:ok, pid} <- StreamProcess.start_link(req, transport_info, headers, plug, span) do
{:ok,
%{stream | state: :open, pid: pid, pending_content_length: content_length, span: span}}
else
:ignore -> {:error, "Unable to start stream process"}
other -> other
end
end

Expand Down Expand Up @@ -152,7 +155,7 @@ defmodule Bandit.HTTP2.Stream do

@spec recv_data(t(), binary()) :: {:ok, t(), non_neg_integer()} | {:error, Connection.error()}
def recv_data(%__MODULE__{state: state} = stream, data) when state in [:open, :local_closed] do
StreamTask.recv_data(stream.pid, data)
StreamProcess.recv_data(stream.pid, data)

{new_window, increment} =
FlowControl.compute_recv_window(stream.recv_window_size, byte_size(data))
Expand Down Expand Up @@ -195,22 +198,22 @@ defmodule Bandit.HTTP2.Stream do
end

def recv_rst_stream(%__MODULE__{} = stream, error_code) do
if is_pid(stream.pid), do: StreamTask.recv_rst_stream(stream.pid, error_code)
if is_pid(stream.pid), do: StreamProcess.recv_rst_stream(stream.pid, error_code)
{:ok, %{stream | state: :closed, pid: nil}}
end

@spec recv_end_of_stream(t(), boolean()) ::
{:ok, t()} | {:error, Connection.error()}
def recv_end_of_stream(%__MODULE__{state: :open} = stream, true) do
with :ok <- verify_content_length(stream) do
StreamTask.recv_end_of_stream(stream.pid)
StreamProcess.recv_end_of_stream(stream.pid)
{:ok, %{stream | state: :remote_closed}}
end
end

def recv_end_of_stream(%__MODULE__{state: :local_closed} = stream, true) do
with :ok <- verify_content_length(stream) do
StreamTask.recv_end_of_stream(stream.pid)
StreamProcess.recv_end_of_stream(stream.pid)
{:ok, %{stream | state: :closed, pid: nil}}
end
end
Expand Down Expand Up @@ -335,7 +338,7 @@ defmodule Bandit.HTTP2.Stream do
:ok
end

Logger.error("Task for stream #{stream.stream_id} crashed with #{inspect(reason)}")
Logger.error("Process for stream #{stream.stream_id} crashed with #{inspect(reason)}")

{:ok, %{stream | state: :closed, pid: nil}, Errors.internal_error()}
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
defmodule Bandit.HTTP2.StreamTask do
defmodule Bandit.HTTP2.StreamProcess do
@moduledoc false
# This Task is where an actual Plug is executed, within the context of an HTTP/2 stream. There
# This process is where an actual Plug is executed, within the context of an HTTP/2 stream. There
# is a bit of split responsibility between this module and the `Bandit.HTTP2.Adapter` module
# which merits explanation:
#
# Broadly, this module is responsible for the execution of a Plug and does so within a Task
# process. Task is used in preference to GenServer here because of the shape of the
# `Plug.Conn.Adapter` API (implemented by the `Bandit.HTTP2.Adapter` module). Specifically, that
# API requires blocking semantics for the `Plug.Conn.Adapter.read_req_body/2`) call and expects
# it to block until some underlying condition has been met (the body has been read, a timeout
# has occurred, etc). The events which 'unblock' these conditions typically come from within the
# Connection, and are pushed down to streams as a fundamental design decision (rather than
# having stream processes query the connection directly). As such, it is much simpler for Task
# processes to wait in an imperative fashion using `receive` calls directly.
# Broadly, this module is responsible for the execution of a Plug and does so within a GenServer
# handle_continue call. The entirety of a Plug lifecycle takes place in this single call.
#
# To contain these design decisions, the 'connection-facing' API for sending data to a stream
# process is expressed on this module (via the `recv_*` functions) even though the 'other half'
# of those calls exists in the `Bandit.HTTP2.Adapter` module. As a result, this module and the
# Handler module are fairly tightly coupled, but together they express clear APIs towards both
# Plug applications and the rest of Bandit.
# The 'connection-facing' API for sending data to a stream process is expressed on this module
# (via the `recv_*` functions) even though the 'other half' of those calls exists in the
# `Bandit.HTTP2.Adapter` module. As a result, this module and the Handler module are fairly
# tightly coupled, but together they express clear APIs towards both Plug applications and the
# rest of Bandit.

use Task
use GenServer, restart: :temporary

# A stream process can be created only once we have an adapter & set of headers. Pass them in
# at creation time to ensure this invariant
Expand All @@ -30,27 +23,31 @@ defmodule Bandit.HTTP2.StreamTask do
Plug.Conn.headers(),
Bandit.Pipeline.plug_def(),
Bandit.Telemetry.t()
) :: {:ok, pid()}
) :: GenServer.on_start()
def start_link(req, transport_info, headers, plug, span) do
Task.start_link(__MODULE__, :run, [req, transport_info, headers, plug, span])
GenServer.start_link(__MODULE__, {req, transport_info, headers, plug, span})
end

# Let the stream task know that body data has arrived from the client. The other half of this
# Let the stream process know that body data has arrived from the client. The other half of this
# flow can be found in `Bandit.HTTP2.Adapter.read_req_body/2`
@spec recv_data(pid(), iodata()) :: :ok | :noconnect | :nosuspend
def recv_data(pid, data), do: send(pid, {:data, data})

# Let the stream task know that the client has set the end of stream flag. The other half of
# Let the stream process know that the client has set the end of stream flag. The other half of
# this flow can be found in `Bandit.HTTP2.Adapter.read_req_body/2`
@spec recv_end_of_stream(pid()) :: :ok | :noconnect | :nosuspend
def recv_end_of_stream(pid), do: send(pid, :end_stream)

# Let the stream task know that the client has reset the stream. This will terminate the
# Let the stream process know that the client has reset the stream. This will terminate the
# stream's handling process
@spec recv_rst_stream(pid(), Bandit.HTTP2.Errors.error_code()) :: true
def recv_rst_stream(pid, error_code), do: Process.exit(pid, {:recv_rst_stream, error_code})

def run(req, transport_info, all_headers, plug, span) do
def init(state) do
{:ok, state, {:continue, :run}}
end

def handle_continue(:run, {req, transport_info, all_headers, plug, span}) do
with {:ok, request_target} <- build_request_target(all_headers),
method <- Bandit.Headers.get_header(all_headers, ":method"),
req <- %{req | method: method} do
Expand All @@ -74,7 +71,7 @@ defmodule Bandit.HTTP2.StreamTask do
status: conn.status
})

:ok
{:stop, :normal, {req, transport_info, all_headers, plug, span}}
else
{:error, reason} ->
raise Bandit.HTTP2.Stream.StreamError,
Expand Down
2 changes: 1 addition & 1 deletion test/bandit/http2/plug_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ defmodule HTTP2PlugTest do
# The return value here isn't relevant, since the HTTP call is done within
# a single Task call & may complete before the spawned process exits. Look
# at the logged errors instead
assert errors =~ ~r[\[error\] Task for stream .* crashed with :abnormal]
assert errors =~ ~r[\[error\] Process for stream .* crashed with :abnormal]
end

def spawn_abnormal_child(conn) do
Expand Down

0 comments on commit 9fa5d65

Please sign in to comment.