Skip to content

Commit

Permalink
First pass at StreamTransport (very much a WiP)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtrudel committed Jan 19, 2024
1 parent 988a389 commit 0611381
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 237 deletions.
200 changes: 43 additions & 157 deletions lib/bandit/http2/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,154 +6,88 @@ defmodule Bandit.HTTP2.Adapter do

@behaviour Plug.Conn.Adapter

defstruct connection: nil,
defstruct stream_transport: nil,
owner_pid: nil,
transport_info: nil,
stream_id: nil,
end_stream: false,
recv_window_size: 65_535,
send_window_size: nil,
method: nil,
content_encoding: nil,
pending_content_length: nil,
metrics: %{},
opts: []

@typedoc "A struct for backing a Plug.Conn.Adapter"
@type t :: %__MODULE__{
connection: pid(),
stream_transport: Bandit.HTTP2.StreamTransport.t(),
owner_pid: pid() | nil,
transport_info: Bandit.TransportInfo.t(),
stream_id: Bandit.HTTP2.Stream.stream_id(),
end_stream: boolean(),
recv_window_size: non_neg_integer(),
send_window_size: non_neg_integer(),
method: Plug.Conn.method() | nil,
content_encoding: String.t() | nil,
pending_content_length: non_neg_integer() | nil,
metrics: map(),
opts: keyword()
}

def init(connection, owner, transport_info, stream_id, send_window_size, opts) do
def init(stream_transport, owner, opts) do
%__MODULE__{
connection: connection,
stream_transport: stream_transport,
owner_pid: owner,
transport_info: transport_info,
stream_id: stream_id,
send_window_size: send_window_size,
opts: opts
}
end

def add_end_header_metric(adapter) do
def recv_request_line(adapter, method, _request_target) do
%{adapter | method: method}
end

def recv_headers(adapter, headers) do
content_encoding =
Bandit.Compression.negotiate_content_encoding(
Bandit.Headers.get_header(headers, "accept-encoding"),
Keyword.get(adapter.opts, :compress, true)
)

stream_transport =
Bandit.HTTP2.StreamTransport.recv_headers(adapter.stream_transport, headers)

%{
adapter
| metrics: Map.put(adapter.metrics, :req_header_end_time, Bandit.Telemetry.monotonic_time())
| stream_transport: stream_transport,
content_encoding: content_encoding,
metrics: Map.put(adapter.metrics, :req_header_end_time, Bandit.Telemetry.monotonic_time())
}
end

# We purposefully use raw `receive` message patterns here in order to facilitate an imperatively
# structured blocking interface as required by `Plug.Conn.Adapter`. This is very unconventional
# but also safe, so long as the receive patterns expressed below are extremely tight.
#
# The events which 'unblock' these conditions come from within the Connection, and are pushed
# down to streams via calls on `Bandit.HTTP2.StreamProcess` as a fundamental design decision
# (rather than having stream processes query the connection directly).
@impl Plug.Conn.Adapter
def read_req_body(%__MODULE__{end_stream: true}, _opts), do: raise(Bandit.BodyAlreadyReadError)

def read_req_body(%__MODULE__{} = adapter, opts) do
validate_calling_process!(adapter)
timeout = Keyword.get(opts, :read_timeout, 15_000)
length = Keyword.get(opts, :length, 8_000_000)
do_read_req_body(adapter, timeout, length, [])
end
timeout = Keyword.get(opts, :read_timeout, 15_000)

defp do_read_req_body(adapter, timeout, remaining_length, acc) do
metrics =
adapter.metrics
|> Map.put_new_lazy(:req_body_start_time, &Bandit.Telemetry.monotonic_time/0)

adapter = %{adapter | metrics: metrics}

receive do
{:data, data} ->
{new_window, increment} =
Bandit.HTTP2.FlowControl.compute_recv_window(adapter.recv_window_size, byte_size(data))

if increment > 0 do
GenServer.call(
adapter.connection,
{:send_recv_window_update, adapter.stream_id, increment}
)
end

adapter = %{adapter | recv_window_size: new_window}

acc = [data | acc]
remaining_length = remaining_length - byte_size(data)

if remaining_length >= 0 do
do_read_req_body(adapter, timeout, remaining_length, acc)
else
return_more(acc, adapter)
end
case Bandit.HTTP2.StreamTransport.read_body(adapter.stream_transport, length, timeout) do
{:ok, body, stream_transport} ->
metrics =
metrics
|> Map.update(:req_body_bytes, byte_size(body), &(&1 + byte_size(body)))
|> Map.put(:req_body_end_time, Bandit.Telemetry.monotonic_time())

:end_stream ->
bytes_read = IO.iodata_length(acc)
{:ok, body,
%{adapter | stream_transport: stream_transport, metrics: metrics, end_stream: true}}

pending_content_length =
case adapter.pending_content_length do
nil -> nil
pending_content_length -> pending_content_length - bytes_read
end
{:more, body, stream_transport} ->
metrics =
metrics
|> Map.update(:req_body_bytes, byte_size(body), &(&1 + byte_size(body)))

if pending_content_length in [nil, 0] do
metrics =
adapter.metrics
|> Map.update(:req_body_bytes, bytes_read, &(&1 + bytes_read))
|> Map.put(:req_body_end_time, Bandit.Telemetry.monotonic_time())

{:ok, wrap_req_body(acc),
%{
adapter
| end_stream: true,
pending_content_length: pending_content_length,
metrics: metrics
}}
else
raise Bandit.HTTP2.Stream.StreamError,
message: "Received end of stream with #{pending_content_length} byte(s) pending",
method: adapter.method,
error_code: Bandit.HTTP2.Errors.protocol_error()
end
after
timeout -> return_more(acc, adapter)
{:more, body, %{adapter | stream_transport: stream_transport, metrics: metrics}}
end
end

defp return_more(data, adapter) do
bytes_read = IO.iodata_length(data)

pending_content_length =
case adapter.pending_content_length do
nil -> nil
pending_content_length -> pending_content_length - bytes_read
end

metrics =
adapter.metrics
|> Map.update(:req_body_bytes, bytes_read, &(&1 + bytes_read))

{:more, wrap_req_body(data),
%{adapter | metrics: metrics, pending_content_length: pending_content_length}}
end

defp wrap_req_body(data) do
data |> Enum.reverse() |> IO.iodata_to_binary()
end

@impl Plug.Conn.Adapter
def send_resp(%__MODULE__{} = adapter, status, headers, body) do
validate_calling_process!(adapter)
Expand Down Expand Up @@ -303,7 +237,7 @@ defmodule Bandit.HTTP2.Adapter do
headers = split_cookies(headers)
headers = [{":status", to_string(status)} | headers]

GenServer.call(adapter.connection, {:send_headers, adapter.stream_id, headers, false})
Bandit.HTTP2.StreamTransport.send_headers(adapter.stream_transport, headers, false)
end

@impl Plug.Conn.Adapter
Expand All @@ -313,8 +247,7 @@ defmodule Bandit.HTTP2.Adapter do
def push(_adapter, _path, _headers), do: {:error, :not_supported}

@impl Plug.Conn.Adapter
def get_peer_data(%__MODULE__{transport_info: transport_info}),
do: Bandit.TransportInfo.peer_data(transport_info)
def get_peer_data(req), do: Bandit.TransportInfo.peer_data(req.stream_transport.transport_info)

@impl Plug.Conn.Adapter
def get_http_protocol(%__MODULE__{}), do: :"HTTP/2"
Expand All @@ -341,66 +274,19 @@ defmodule Bandit.HTTP2.Adapter do

headers = [{":status", to_string(status)} | headers]

GenServer.call(adapter.connection, {:send_headers, adapter.stream_id, headers, end_stream})
Bandit.HTTP2.StreamTransport.send_headers(adapter.stream_transport, headers, end_stream)

%{adapter | metrics: metrics}
end

defp send_data(adapter, data, end_stream) do
adapter = wait_for_send_window(adapter, 0)
max_bytes_to_send = max(adapter.send_window_size, 0)
{data_to_send, bytes_to_send, rest} = split_data(data, max_bytes_to_send)

adapter =
if end_stream || bytes_to_send > 0 do
GenServer.call(
adapter.connection,
{:send_data, adapter.stream_id, data_to_send, end_stream && byte_size(rest) == 0},
:infinity
)

metrics =
adapter.metrics |> Map.update(:resp_body_bytes, bytes_to_send, &(&1 + bytes_to_send))

%{adapter | metrics: metrics, send_window_size: adapter.send_window_size - bytes_to_send}
else
adapter
end
{stream_transport, bytes_sent} =
Bandit.HTTP2.StreamTransport.send_data(adapter.stream_transport, data, end_stream)

if byte_size(rest) == 0 do
adapter
else
adapter = wait_for_send_window(adapter, :infinity)
send_data(adapter, rest, end_stream)
end
end

defp wait_for_send_window(adapter, timeout) do
receive do
{:send_window_update, increment} ->
case Bandit.HTTP2.FlowControl.update_send_window(adapter.send_window_size, increment) do
{:ok, new_window} ->
%{adapter | send_window_size: new_window}

{:error, reason} ->
raise Bandit.HTTP2.Stream.StreamError,
message: reason,
error_code: Bandit.HTTP2.Errors.flow_control_error()
end
after
timeout -> adapter
end
end

defp split_data(data, desired_length) do
data_length = IO.iodata_length(data)
metrics =
adapter.metrics |> Map.update(:resp_body_bytes, bytes_sent, &(&1 + bytes_sent))

if data_length <= desired_length do
{data, data_length, <<>>}
else
<<to_send::binary-size(desired_length), rest::binary>> = IO.iodata_to_binary(data)
{to_send, desired_length, rest}
end
%{adapter | stream_transport: stream_transport, metrics: metrics}
end

defp split_cookies(headers) do
Expand Down
13 changes: 5 additions & 8 deletions lib/bandit/http2/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Bandit.HTTP2.Stream do

require Logger

alias Bandit.HTTP2.{Connection, Errors, StreamProcess}
alias Bandit.HTTP2.{Connection, Errors, StreamProcess, StreamTransport}

defstruct stream_id: nil,
state: nil,
Expand Down Expand Up @@ -74,13 +74,10 @@ defmodule Bandit.HTTP2.Stream do
plug,
opts
) do
case StreamProcess.start_link(
self(),
stream.stream_id,
initial_send_window_size,
transport_info,
connection_span
) do
stream_process =
StreamTransport.new(self(), stream.stream_id, initial_send_window_size, transport_info)

case StreamProcess.start_link(stream_process, connection_span) do
{:ok, pid} ->
StreamProcess.recv_headers(pid, headers, plug, opts)
{:ok, %{stream | state: :open, pid: pid}}
Expand Down
Loading

0 comments on commit 0611381

Please sign in to comment.