Skip to content

Commit

Permalink
feat: move to bytestrings
Browse files Browse the repository at this point in the history
  • Loading branch information
leostera committed Jan 11, 2024
1 parent 5e954c6 commit 040ae22
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 177 deletions.
107 changes: 58 additions & 49 deletions nomad/adapter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ open Logger

let ( let* ) = Result.bind

let rec split ?(left = {%b||}) str =
match%b str with
| {| "\r\n"::bytes, rest::bytes |} -> [ left; rest ]
| {| c::utf8, rest::bytes |} -> split ~left:(Bytestring.join c left) rest

let deflate_string str =
let i = De.bigstring_create De.io_buffer_size in
let o = De.bigstring_create De.io_buffer_size in
Expand Down Expand Up @@ -51,11 +56,11 @@ let gzip_string str =
Gz.Higher.compress ~w ~q ~level:9 ~refill ~flush () cfg i o;
Buffer.contents r

let gzip buf = gzip_string (IO.Bytes.to_string buf) |> IO.Bytes.of_string
let gzip buf = gzip_string (Bytestring.to_string buf) |> Bytestring.of_string

let deflate buf =
let str = deflate_string (IO.Bytes.to_string buf) in
str |> IO.Bytes.of_string
let str = deflate_string (Bytestring.to_string buf) in
str |> Bytestring.of_string

let has_custom_content_encoding (res : Response.t) =
Http.Header.get res.headers "content-encoding" |> Option.is_some
Expand All @@ -76,9 +81,9 @@ let has_no_transform (res : Response.t) =
| _ -> false

let maybe_compress (req : Request.t) buf =
if IO.Bytes.length buf = 0 then (None, None)
if Bytestring.length buf = 0 then (None, None)
else (
debug (fun f -> f "body: %s" (IO.Bytes.to_string buf));
debug (fun f -> f "body: %s" (Bytestring.to_string buf));
let accepted_encodings =
Http.Header.get req.headers "accept-encoding"
|> Option.map (fun enc -> String.split_on_char ',' enc)
Expand Down Expand Up @@ -113,7 +118,7 @@ let send conn (req : Request.t) (res : Response.t) =
in

let body_len =
Option.map IO.Bytes.length body
Option.map Bytestring.length body
|> Option.value ~default:0 |> Int.to_string
in
let headers =
Expand Down Expand Up @@ -183,31 +188,31 @@ let send conn (req : Request.t) (res : Response.t) =
Httpaf.Httpaf_private.Serialize.write_response buf res;

(match body with
| Some body -> Faraday.write_string buf (IO.Bytes.to_string body)
| Some body -> Faraday.write_string buf (Bytestring.to_string body)
| _ -> ());

let s = Faraday.serialize_to_string buf in
IO.Bytes.of_string s
Bytestring.of_string s
in

debug (fun f -> f "res: %S" (IO.Bytes.to_string buf));
debug (fun f -> f "res: %S" (Bytestring.to_string buf));
let _ = Atacama.Connection.send conn buf in
()

let send_chunk conn (req : Request.t) buf =
if req.meth = `HEAD then ()
else
let chunk =
Format.sprintf "%x\r\n%s\r\n" (IO.Bytes.length buf)
(IO.Bytes.to_string buf)
Format.sprintf "%x\r\n%s\r\n" (Bytestring.length buf)
(Bytestring.to_string buf)
in
debug (fun f -> f "sending chunk: %S" chunk);
let chunk = IO.Bytes.of_string chunk in
let chunk = Bytestring.of_string chunk in
let _ = Atacama.Connection.send conn chunk in
()

let close_chunk conn =
let chunk = IO.Bytes.of_string "0\r\n\r\n" in
let chunk = Bytestring.of_string "0\r\n\r\n" in
let _ = Atacama.Connection.send conn chunk in
()

Expand All @@ -222,7 +227,7 @@ let send_file conn (req : Request.t) (res : Response.t) ?off ?len ~path () =
let headers =
Http.Header.replace res.headers "content-length" (Int.to_string len)
in
let res = { res with headers; body = IO.Bytes.with_capacity 0 } in
let res = { res with headers; body = Bytestring.empty } in
let _ = send conn req res in
if
req.meth != `HEAD && res.status != `No_content
Expand All @@ -235,7 +240,7 @@ let close conn (req : Request.t) (res : Response.t) =
if req.meth = `HEAD then ()
else if res.status = `No_content then ()
else
let _ = Atacama.Connection.send conn (IO.Bytes.of_string "0\r\n\r\n") in
let _ = Atacama.Connection.send conn (Bytestring.of_string "0\r\n\r\n") in
()

open Trail
Expand All @@ -245,12 +250,12 @@ let rec read_body ?limit ?(read_size = 1_024 * 1_024) conn (req : Request.t) =
| Http.Transfer.Chunked -> (
debug (fun f -> f "reading chunked body");
match
read_chunked_body ~read_size ~buffer:req.buffer
~body:(IO.Bytes.with_capacity 0) conn req
read_chunked_body ~read_size ~buffer:req.buffer ~body:Bytestring.empty
conn req
with
| Ok (body, buffer) ->
debug (fun f ->
f "read chunked_body: buffer=%d" (IO.Bytes.length buffer));
f "read chunked_body: buffer=%d" (Bytestring.length buffer));
Adapter.Ok ({ req with buffer }, body)
| Error reason -> Adapter.Error (req, reason))
| _ -> (
Expand All @@ -259,9 +264,9 @@ let rec read_body ?limit ?(read_size = 1_024 * 1_024) conn (req : Request.t) =
| Ok (body, buffer, body_remaining) ->
debug (fun f ->
f "read chunked_body: body_remaning=%d buffer=%d" body_remaining
(IO.Bytes.length buffer));
(Bytestring.length buffer));
let req = { req with buffer; body_remaining } in
if body_remaining = 0 && IO.Bytes.length buffer = 0 then (
if body_remaining = 0 && Bytestring.length buffer = 0 then (
debug (fun f -> f "read chunked_body: ok");
let req = { req with buffer; body_remaining = -1 } in
Adapter.Ok (req, body))
Expand All @@ -271,26 +276,26 @@ let rec read_body ?limit ?(read_size = 1_024 * 1_024) conn (req : Request.t) =
| Error reason -> Adapter.Error (req, reason))

and read_chunked_body ~read_size ~buffer ~body conn req =
let parts = IO.Bytes.split ~max:1 buffer ~on:"\r\n" in
debug (fun f -> f "body_size: %d" (IO.Bytes.length body));
debug (fun f -> f "buffer: %d" (IO.Bytes.length buffer));
let parts = split buffer in
debug (fun f -> f "body_size: %d" (Bytestring.length body));
debug (fun f -> f "buffer: %d" (Bytestring.length buffer));
debug (fun f ->
f "total_read: %d" (IO.Bytes.length buffer + IO.Bytes.length body));
f "total_read: %d" (Bytestring.length buffer + Bytestring.length body));
debug (fun f ->
match parts with
| size :: _ -> f "chunk_size: 0x%s" (IO.Bytes.to_string size)
| size :: _ -> f "chunk_size: 0x%s" (Bytestring.to_string size)
| _ -> ());

match parts with
| [ zero; _ ] when String.equal (IO.Bytes.to_string zero) "0" ->
| [ zero; _ ] when String.equal (Bytestring.to_string zero) "0" ->
debug (fun f -> f "read_chunked_body: last chunk!");
Ok (body, buffer)
| [ chunk_size; chunk_data ] -> (
let chunk_size =
Int64.(of_string ("0x" ^ IO.Bytes.to_string chunk_size) |> to_int)
Int64.(of_string ("0x" ^ Bytestring.to_string chunk_size) |> to_int)
in
debug (fun f -> f "read_chunked_body: chunk_size=%d" chunk_size);
let binstr_data = IO.Bytes.to_string chunk_data in
let binstr_data = Bytestring.to_string chunk_data in
debug (fun f ->
f "read_chunked_body: (%d bytes)" (String.length binstr_data));
let binstr_data = binstr_data |> Bitstring.bitstring_of_string in
Expand All @@ -302,59 +307,63 @@ and read_chunked_body ~read_size ~buffer ~body conn req =
debug (fun f -> f "read_chunked_body: read full chunk");
debug (fun f ->
f "read_chunked_body: rest=%d" (Bitstring.bitstring_length rest));
let rest = IO.Bytes.of_string (Bitstring.string_of_bitstring rest) in
let next_chunk = IO.Bytes.of_string next_chunk in
let body = IO.Bytes.join body next_chunk in
let rest =
Bytestring.of_string (Bitstring.string_of_bitstring rest)
in
let next_chunk = Bytestring.of_string next_chunk in
let body = Bytestring.join body next_chunk in
read_chunked_body ~read_size ~buffer:rest ~body conn req
| {| _ |} ->
let left_to_read = chunk_size - IO.Bytes.length chunk_data in
let left_to_read = chunk_size - Bytestring.length chunk_data in
debug (fun f ->
f "read_chunked_body: reading more data left_to_read=%d"
left_to_read);
let* chunk =
if left_to_read > 0 then read ~to_read:left_to_read ~read_size conn
else Atacama.Connection.receive conn
in
let buffer = IO.Bytes.join buffer chunk in
let buffer = Bytestring.join buffer chunk in
read_chunked_body ~read_size ~buffer ~body conn req)
| _ ->
debug (fun f -> f "read_chunked_body: need more data");
let* chunk = Atacama.Connection.receive conn in
let buffer = IO.Bytes.join buffer chunk in
let buffer = Bytestring.join buffer chunk in
read_chunked_body ~read_size ~buffer ~body conn req

and read_content_length_body ?limit ~read_size conn req =
let buffer = req.buffer in
let limit = Option.value ~default:req.body_remaining limit in
let to_read = limit - IO.Bytes.length buffer in
let to_read = limit - Bytestring.length buffer in
debug (fun f ->
f "read_content_length_body: up to limit=%d with preread_buffer=%d" limit
(IO.Bytes.length buffer));
(Bytestring.length buffer));
match req.body_remaining with
| n when n < 0 || to_read < 0 ->
debug (fun f -> f "read_content_length_body: excess body");
Error `Excess_body_read
| 0 when IO.Bytes.length buffer >= limit ->
| 0 when Bytestring.length buffer >= limit ->
debug (fun f -> f "read_content_length_body: can answer with buffer");
let len = Int.min limit (IO.Bytes.length buffer) in
let body = IO.Bytes.sub ~pos:0 ~len buffer in
Ok (body, IO.Bytes.empty, 0)
let len = Int.min limit (Bytestring.length buffer) in
let body = Bytestring.sub ~off:0 ~len buffer in
Ok (body, Bytestring.empty, 0)
| remaining_bytes ->
let to_read = Int.min (limit - IO.Bytes.length buffer) remaining_bytes in
let to_read =
Int.min (limit - Bytestring.length buffer) remaining_bytes
in
debug (fun f -> f "read_content_length_body: need to read %d" to_read);
let* chunk = read ~to_read ~read_size conn in
let body = IO.Bytes.join buffer chunk in
let body_remaining = remaining_bytes - IO.Bytes.length body in
Ok (body, IO.Bytes.empty, body_remaining)
let body = Bytestring.join buffer chunk in
let body_remaining = remaining_bytes - Bytestring.length body in
Ok (body, Bytestring.empty, body_remaining)

and read ~read_size ~to_read ?(buffer = IO.Bytes.empty) conn =
if to_read = 0 then Ok IO.Bytes.empty
and read ~read_size ~to_read ?(buffer = Bytestring.empty) conn =
if to_read = 0 then Ok Bytestring.empty
else
let* chunk = Atacama.Connection.receive ~limit:to_read ~read_size conn in
let remaining_bytes = to_read - IO.Bytes.length chunk in
let buffer = IO.Bytes.join buffer chunk in
let remaining_bytes = to_read - Bytestring.length chunk in
let buffer = Bytestring.join buffer chunk in
debug (fun f -> f "read: remaining_bytes %d" remaining_bytes);
debug (fun f -> f "read: buffer=%d" (IO.Bytes.length buffer));
debug (fun f -> f "read: buffer=%d" (Bytestring.length buffer));
if remaining_bytes > 0 then
read ~read_size ~to_read:remaining_bytes ~buffer conn
else Ok buffer
2 changes: 1 addition & 1 deletion nomad/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(public_name nomad)
(name nomad)
(preprocess
(pps ppx_bitstring))
(pps ppx_bitstring bytestring.ppx))
(libraries
trail
atacama
Expand Down
Loading

0 comments on commit 040ae22

Please sign in to comment.