Skip to content

Commit

Permalink
Move WebSocket adapter to package dream-httpaf
Browse files Browse the repository at this point in the history
It is needed by both the client and the server.
  • Loading branch information
aantron committed Jan 19, 2022
1 parent a8d6e5c commit acc2efc
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 228 deletions.
3 changes: 1 addition & 2 deletions src/http/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
(public_name dream.http)
(name dream__http)
(libraries
bigarray-compat
bigstringaf
digestif
dream.certificate
dream.cipher
dream-pure
dream.server
dream-httpaf
dream-httpaf.gluten
dream-httpaf.gluten-lwt-unix
dream-httpaf.h2
Expand Down
228 changes: 2 additions & 226 deletions src/http/http.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,231 +38,6 @@ let sha1 s =
let websocket_log =
Log.sub_log "dream.websocket"

let websocket_handler stream socket =
(* Queue of received frames. There doesn't appear to be a nice way to achieve
backpressure with the current API of websocket/af, so that will have to be
added later. The user-facing API of Dream does support backpressure. *)
let frames, push_frame = Lwt_stream.create () in
let message_is_binary = ref `Binary in

(* Frame reader called by websocket/af on each frame received. There is no
good way to truly throttle this, hence this frame reader pushes frame
objects into the above frame queue for the reader to take from later. See
https://github.com/anmonteiro/websocketaf/issues/34. *)
let frame ~opcode ~is_fin ~len:_ payload =
match opcode with
| `Connection_close ->
push_frame (Some (`Close, payload))
| `Ping ->
push_frame (Some (`Ping, payload))
| `Pong ->
push_frame (Some (`Pong, payload))
| `Other _ ->
push_frame (Some (`Other, payload))
| `Text ->
message_is_binary := `Text;
push_frame (Some (`Data (`Text, is_fin), payload))
| `Binary ->
message_is_binary := `Binary;
push_frame (Some (`Data (`Binary, is_fin), payload))
| `Continuation ->
push_frame (Some (`Data (!message_is_binary, is_fin), payload))
in

let eof () =
push_frame None in

(* The reader retrieves the next frame. If it is a data frame, it keeps a
reference to the payload across multiple reader calls, until the payload is
exhausted. *)
let closed = ref false in
let close_code = ref 1005 in
let current_payload = ref None in

(* Used to convert the separate on_eof payload reading callback into a FIN bit
on the last chunk read. See
https://github.com/anmonteiro/websocketaf/issues/35. *)
let last_chunk = ref None in
(* TODO Review per-chunk allocations, including current_payload contents. *)

(* For control frames, the payload can be at most 125 bytes long. We assume
that the first chunk will contain the whole payload, and discard any other
chunks that may be reported by websocket/af. *)
let first_chunk_received = ref false in
let first_chunk = ref Bigstringaf.empty in
let first_chunk_offset = ref 0 in
let first_chunk_length = ref 0 in
let rec drain_payload payload continuation =
Websocketaf.Payload.schedule_read
payload
~on_read:(fun buffer ~off ~len ->
if not !first_chunk_received then begin
first_chunk := buffer;
first_chunk_offset := off;
first_chunk_length := len;
first_chunk_received := true
end
else
websocket_log.warning (fun log ->
log "Received fragmented control frame");
drain_payload payload continuation)
~on_eof:(fun () ->
let payload = !first_chunk in
let offset = !first_chunk_offset in
let length = !first_chunk_length in
first_chunk_received := false;
first_chunk := Bigstringaf.empty;
first_chunk_offset := 0;
first_chunk_length := 0;
continuation payload offset length)
in

(* TODO Can this be canceled by a user's close? i.e. will that eventually
cause a call to eof above? *)
let rec read ~data ~flush ~ping ~pong ~close ~exn =
if !closed then
close !close_code
else
match !current_payload with
| None ->
Lwt.on_success (Lwt_stream.get frames) begin function
| None ->
if not !closed then begin
closed := true;
close_code := 1005
end;
Websocketaf.Wsd.close socket;
close !close_code
| Some (`Close, payload) ->
drain_payload payload @@ fun buffer offset length ->
let code =
if length < 2 then
1005
else
let high_byte = Char.code buffer.{offset}
and low_byte = Char.code buffer.{offset + 1} in
high_byte lsl 8 lor low_byte
in
if not !closed then
close_code := code;
close !close_code
| Some (`Ping, payload) ->
drain_payload payload @@
ping
| Some (`Pong, payload) ->
drain_payload payload @@
pong
| Some (`Other, payload) ->
drain_payload payload @@ fun _buffer _offset length ->
websocket_log.warning (fun log ->
log "Unknown frame type with length %i" length);
read ~data ~flush ~ping ~pong ~close ~exn
| Some (`Data properties, payload) ->
current_payload := Some (properties, payload);
read ~data ~flush ~ping ~pong ~close ~exn
end
| Some ((binary, fin), payload) ->
Websocketaf.Payload.schedule_read
payload
~on_read:(fun buffer ~off ~len ->
match !last_chunk with
| None ->
last_chunk := Some (buffer, off, len);
read ~data ~flush ~ping ~pong ~close ~exn
| Some (last_buffer, last_offset, last_length) ->
last_chunk := Some (buffer, off, len);
let binary = binary = `Binary in
data last_buffer last_offset last_length binary false)
~on_eof:(fun () ->
current_payload := None;
match !last_chunk with
| None ->
read ~data ~flush ~ping ~pong ~close ~exn
| Some (last_buffer, last_offset, last_length) ->
last_chunk := None;
let binary = binary = `Binary in
data last_buffer last_offset last_length binary fin)
in

let bytes_since_flush = ref 0 in

let flush ~close ok =
bytes_since_flush := 0;
if !closed then
close !close_code
else
Websocketaf.Wsd.flushed socket ok
in

let close code =
if not !closed then begin
(* TODO Really need to work out the "close handshake" and how it is
exposed in the Stream API. *)
(* closed := true; *)
Websocketaf.Wsd.close ~code:(`Other code) socket
end
in

let abort _exn = close 1005 in

let reader = Stream.reader ~read ~close ~abort in
Stream.forward reader stream;

let rec outgoing_loop () =
Stream.read
stream
~data:(fun buffer offset length binary fin ->
(* Until https://github.com/anmonteiro/websocketaf/issues/33. *)
if not fin then
websocket_log.error (fun log ->
log "Non-FIN frames not yet supported");
let kind = if binary then `Binary else `Text in
if !closed then
close !close_code
else begin
Websocketaf.Wsd.schedule socket ~kind buffer ~off:offset ~len:length;
bytes_since_flush := !bytes_since_flush + length;
if !bytes_since_flush >= 4096 then
flush ~close outgoing_loop
else
outgoing_loop ()
end)
~flush:(fun () -> flush ~close outgoing_loop)
~ping:(fun _buffer _offset length ->
if length > 125 then
raise (Failure "Ping payload cannot exceed 125 bytes");
(* See https://github.com/anmonteiro/websocketaf/issues/36. *)
if length > 0 then
websocket_log.warning (fun log ->
log "Ping with non-empty payload not yet supported");
if !closed then
close !close_code
else begin
Websocketaf.Wsd.send_ping socket;
outgoing_loop ()
end)
~pong:(fun _buffer _offset length ->
(* TODO Is there any way for the peer to send a ping payload with more
than 125 bytes, forcing a too-large pong and an exception? *)
if length > 125 then
raise (Failure "Pong payload cannot exceed 125 bytes");
(* See https://github.com/anmonteiro/websocketaf/issues/36. *)
if length > 0 then
websocket_log.warning (fun log ->
log "Pong with non-empty payload not yet supported");
if !closed then
close !close_code
else begin
Websocketaf.Wsd.send_pong socket;
outgoing_loop ()
end)
~close
~exn:abort
in
outgoing_loop ();

Websocketaf.Server_connection.{frame; eof}



(* Wraps the user's Dream handler in the kind of handler expected by http/af.
Expand Down Expand Up @@ -380,7 +155,8 @@ let wrap_handler
let proceed () =
Websocketaf.Server_connection.create_websocket
~error_handler
(websocket_handler (Message.client_stream response))
(Dream_httpaf.Websocket.websocket_handler
(Message.client_stream response))
|> Gluten.make (module Websocketaf.Server_connection)
|> upgrade
in
Expand Down
10 changes: 10 additions & 0 deletions src/http/shared/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
(library
(public_name dream-httpaf)
(name dream_httpaf)
(libraries
bigstringaf
dream-pure
dream-httpaf.websocketaf
)
(preprocess (pps lwt_ppx))
(instrumentation (backend bisect_ppx)))
Loading

0 comments on commit acc2efc

Please sign in to comment.