From 4d78d83a2934993c54ddfccf330ac86138b63801 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Fri, 17 Jun 2022 09:56:29 +0100 Subject: [PATCH] cohttp-eio: more flexible server API - Use `Eio.Buf_write` from Eio 0.4 for writing. - Expose client address. This is allows writing logging middleware. - Provide `Server.connection_handler`, to let people use their own accept loop if they prefer. - Remove switch argument from `Server.run`. Taking a switch makes it look like the function create resources that outlive it, but it doesn't, so there's no need for this. Also, change the return type to `'a` to show that the function never returns. - Add some test cases using the new `Eio_mock` library. - More precise type for `Server.run`. This allows `Server.run` to be used on platforms which don't provide the entire standard environment, but do provide the bits needed by `run`. --- CHANGES.md | 1 + cohttp-eio.opam | 3 +- cohttp-eio/examples/server1.ml | 5 +- cohttp-eio/src/body.ml | 28 ++++--- cohttp-eio/src/cohttp_eio.mli | 34 ++++---- cohttp-eio/src/server.ml | 76 +++++++++--------- cohttp-eio/src/writer.ml | 72 ----------------- cohttp-eio/tests/dune | 4 + cohttp-eio/tests/server.md | 109 ++++++++++++++++++++++++++ cohttp-eio/tests/test_chunk_server.ml | 7 +- cohttp-eio/tests/test_server.ml | 27 ++----- dune-project | 5 +- 12 files changed, 205 insertions(+), 166 deletions(-) delete mode 100644 cohttp-eio/src/writer.ml create mode 100644 cohttp-eio/tests/server.md diff --git a/CHANGES.md b/CHANGES.md index ddbd8c02c0..6827e5ae8c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,4 +1,5 @@ ## Unreleased +- cohttp-eio: use Eio.Buf_write and improve server API (talex5 #887) - cohttp-eio: update to Eio 0.3 (talex5 #886) - cohttp-eio: convert to Eio.Buf_read (talex5 #882) - cohttp lwt client: Connection cache and explicit pipelining (madroach #853) diff --git a/cohttp-eio.opam b/cohttp-eio.opam index 2a1b970fa8..fa6e487aa1 100644 --- a/cohttp-eio.opam +++ b/cohttp-eio.opam @@ -21,12 +21,13 @@ bug-reports: "https://github.com/mirage/ocaml-cohttp/issues" depends: [ "dune" {>= "2.9"} "base-domains" - "eio" {>= "0.3"} + "eio" {>= "0.4"} "eio_main" {with-test} "uri" {with-test} "cstruct" "bigstringaf" "fmt" + "mdx" {with-test} "eio_main" {with-test} "http" {= version} "odoc" {with-doc} diff --git a/cohttp-eio/examples/server1.ml b/cohttp-eio/examples/server1.ml index 10e781ec9c..306212c0b3 100644 --- a/cohttp-eio/examples/server1.ml +++ b/cohttp-eio/examples/server1.ml @@ -30,7 +30,7 @@ let text = open Cohttp_eio -let app (req, _reader) = +let app (req, _reader, _client_addr) = match Http.Request.resource req with | "/" -> Server.text_response text | "/html" -> Server.html_response text @@ -42,5 +42,4 @@ let () = [ ("-p", Arg.Set_int port, " Listening port number(8080 by default)") ] ignore "An HTTP/1.1 server"; - Eio_main.run @@ fun env -> - Eio.Switch.run @@ fun sw -> Server.run ~port:!port env sw app + Eio_main.run @@ fun env -> Server.run ~port:!port env app diff --git a/cohttp-eio/src/body.ml b/cohttp-eio/src/body.ml index 94a2d77bfc..ef2f571f69 100644 --- a/cohttp-eio/src/body.ml +++ b/cohttp-eio/src/body.ml @@ -1,7 +1,9 @@ +module Write = Eio.Buf_write + type t = | Fixed of string | Chunked of chunk_writer - | Custom of (Eio.Flow.sink -> unit) + | Custom of (Write.t -> unit) | Empty and chunk_writer = { @@ -228,10 +230,10 @@ let read_chunked reader headers f = let write_headers t headers = Http.Header.iter (fun k v -> - Writer.write_string t k; - Writer.write_string t ": "; - Writer.write_string t v; - Writer.write_string t "\r\n") + Write.string t k; + Write.string t ": "; + Write.string t v; + Write.string t "\r\n") headers (* https://datatracker.ietf.org/doc/html/rfc7230#section-4.1 *) @@ -242,21 +244,21 @@ let write_chunked t chunk_writer = let v = match value with None -> "" | Some v -> Printf.sprintf "=%s" v in - Writer.write_string t (Printf.sprintf ";%s%s" name v)) + Write.string t (Printf.sprintf ";%s%s" name v)) exts in let write_body = function | Chunk { size; data; extensions = exts } -> - Writer.write_string t (Printf.sprintf "%X" size); + Write.string t (Printf.sprintf "%X" size); write_extensions exts; - Writer.write_string t "\r\n"; - Writer.write_string t data; - Writer.write_string t "\r\n" + Write.string t "\r\n"; + Write.string t data; + Write.string t "\r\n" | Last_chunk exts -> - Writer.write_string t "0"; + Write.string t "0"; write_extensions exts; - Writer.write_string t "\r\n" + Write.string t "\r\n" in chunk_writer.body_writer write_body; chunk_writer.trailer_writer (write_headers t); - Writer.write_string t "\r\n" + Write.string t "\r\n" diff --git a/cohttp-eio/src/cohttp_eio.mli b/cohttp-eio/src/cohttp_eio.mli index 63fef2ad96..737d1b8068 100644 --- a/cohttp-eio/src/cohttp_eio.mli +++ b/cohttp-eio/src/cohttp_eio.mli @@ -2,7 +2,7 @@ module Body : sig type t = | Fixed of string | Chunked of chunk_writer - | Custom of (Eio.Flow.sink -> unit) + | Custom of (Eio.Buf_write.t -> unit) | Empty and chunk_writer = { @@ -28,25 +28,27 @@ end (** [Server] is a HTTP 1.1 server. *) module Server : sig - type middleware = handler -> handler - and handler = request -> response - and request = Http.Request.t * Eio.Buf_read.t - and response = Http.Response.t * Body.t + type request = Http.Request.t * Eio.Buf_read.t * Eio.Net.Sockaddr.stream + (** The request headers, a reader for the socket, and the address of the + client. To read the request body, use {!read_fixed} or {!read_chunked}. *) + + type response = Http.Response.t * Body.t + type handler = request -> response (** {1 Request} *) - val read_fixed : request -> string - (** [read_fixed (request,reader)] is bytes of length [n] if "Content-Length" - header is a valid integer value [n] in [request]. [reader] is updated to - reflect that [n] bytes was read. + val read_fixed : Http.Request.t -> Eio.Buf_read.t -> string + (** [read_fixed request body] reads a string of length [n] if "Content-Length" + header is a valid integer value [n] in [request]. @raise Invalid_argument if ["Content-Length"] header is missing or is an invalid value in [headers] OR if the request http method is not one of [POST], [PUT] or [PATCH]. *) - val read_chunked : request -> (Body.chunk -> unit) -> Http.Header.t - (** [read_chunked request chunk_handler] is [updated_headers] if + val read_chunked : + Http.Request.t -> Eio.Buf_read.t -> (Body.chunk -> unit) -> Http.Header.t + (** [read_chunked request body chunk_handler] is [updated_headers] if "Transfer-Encoding" header value is "chunked" in [headers] and all chunks in [reader] are read successfully. [updated_headers] is the updated headers as specified by the chunked encoding algorithm in @@ -81,10 +83,14 @@ module Server : sig ?socket_backlog:int -> ?domains:int -> port:int -> - Eio.Stdenv.t -> - Eio.Switch.t -> + < domain_mgr : Eio.Domain_manager.t ; net : Eio.Net.t ; .. > -> handler -> - unit + 'a + + val connection_handler : + handler -> #Eio.Net.stream_socket -> Eio.Net.Sockaddr.stream -> unit + (** [connection_handler request_handler] is a connection handler, suitable for + passing to {!Eio.Net.accept_fork}. *) (** {1 Basic Handlers} *) diff --git a/cohttp-eio/src/server.ml b/cohttp-eio/src/server.ml index f62691166e..adb971aacd 100644 --- a/cohttp-eio/src/server.ml +++ b/cohttp-eio/src/server.ml @@ -1,8 +1,9 @@ open Eio.Std +module Write = Eio.Buf_write type middleware = handler -> handler and handler = request -> response -and request = Http.Request.t * Eio.Buf_read.t +and request = Http.Request.t * Eio.Buf_read.t * Eio.Net.Sockaddr.stream and response = Http.Response.t * Body.t let domain_count = @@ -12,8 +13,8 @@ let domain_count = (* Request *) -let read_fixed ((request, reader) : request) = - match request.meth with +let read_fixed request reader = + match Http.Request.meth request with | `POST | `PUT | `PATCH -> Body.read_fixed reader request.headers | _ -> let err = @@ -23,8 +24,8 @@ let read_fixed ((request, reader) : request) = in raise @@ Invalid_argument err -let read_chunked : request -> (Body.chunk -> unit) -> Http.Header.t = - fun (request, reader) f -> Body.read_chunked reader request.headers f +let read_chunked request reader f = + Body.read_chunked reader (Http.Request.headers request) f (* Responses *) @@ -64,64 +65,61 @@ let internal_server_error_response = let bad_request_response = (Http.Response.make ~status:`Bad_request (), Body.Empty) -let write_response (writer : Writer.t) +let write_response (writer : Write.t) ((response, body) : Http.Response.t * Body.t) = let version = Http.Version.to_string response.version in let status = Http.Status.to_string response.status in - Writer.write_string writer version; - Writer.write_string writer " "; - Writer.write_string writer status; - Writer.write_string writer "\r\n"; + Write.string writer version; + Write.string writer " "; + Write.string writer status; + Write.string writer "\r\n"; Body.write_headers writer response.headers; - Writer.write_string writer "\r\n"; + Write.string writer "\r\n"; match body with - | Fixed s -> Writer.write_string writer s + | Fixed s -> Write.string writer s | Chunked chunk_writer -> Body.write_chunked writer chunk_writer - | Custom f -> - Writer.wakeup writer; - f (writer.sink :> Eio.Flow.sink) + | Custom f -> f writer | Empty -> () (* main *) -let rec handle_request reader writer flow handler = +let rec handle_request client_addr reader writer flow handler = match Reader.http_request reader with | request -> - let response, body = handler (request, reader) in + let response, body = handler (request, reader, client_addr) in write_response writer (response, body); - (* A custom response needs to write the main response before calling - the custom function for the body. Response.write wakes the writer for - us if that is the case. *) - if not (is_custom body) then Writer.wakeup writer; if Http.Request.is_keep_alive request then - handle_request reader writer flow handler + handle_request client_addr reader writer flow handler | (exception End_of_file) | (exception Eio.Net.Connection_reset _) -> () - | exception Failure _e -> + | exception (Failure _ as ex) -> write_response writer bad_request_response; - Writer.wakeup writer - | exception _ -> + raise ex + | exception ex -> write_response writer internal_server_error_response; - Writer.wakeup writer + raise ex + +let connection_handler (handler : handler) flow client_addr = + let reader = + Eio.Buf_read.of_flow ~initial_size:0x1000 ~max_size:max_int flow + in + Write.with_flow flow (fun writer -> + handle_request client_addr reader writer flow handler) let run_domain ssock handler = let on_error exn = Printf.fprintf stderr "Error handling connection: %s\n%!" (Printexc.to_string exn) in + let handler = connection_handler handler in Switch.run (fun sw -> - while true do - Eio.Net.accept_fork ~sw ssock ~on_error (fun flow _addr -> - Eio.Switch.run @@ fun sw -> - let reader = - Eio.Buf_read.of_flow ~initial_size:0x1000 ~max_size:max_int - (flow :> Eio.Flow.source) - in - let writer = Writer.create (flow :> Eio.Flow.sink) in - Eio.Fiber.fork ~sw (fun () -> Writer.run writer); - handle_request reader writer flow handler) - done) - -let run ?(socket_backlog = 128) ?(domains = domain_count) ~port env sw handler = + let rec loop () = + Eio.Net.accept_fork ~sw ssock ~on_error handler; + loop () + in + loop ()) + +let run ?(socket_backlog = 128) ?(domains = domain_count) ~port env handler = + Switch.run @@ fun sw -> let domain_mgr = Eio.Stdenv.domain_mgr env in let ssock = Eio.Net.listen (Eio.Stdenv.net env) ~sw ~reuse_addr:true ~reuse_port:true diff --git a/cohttp-eio/src/writer.ml b/cohttp-eio/src/writer.ml deleted file mode 100644 index fb8f0de3b1..0000000000 --- a/cohttp-eio/src/writer.ml +++ /dev/null @@ -1,72 +0,0 @@ -(*---------------------------------------------------------------------------- - Copyright (c) 2017 Inhabited Type LLC. - All rights reserved. - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions - are met: - 1. Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - 2. Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - 3. Neither the name of the author nor the names of his contributors - may be used to endorse or promote products derived from this software - without specific prior written permission. - THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS - OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR - ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS - OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, - STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN - ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - POSSIBILITY OF SUCH DAMAGE. - ----------------------------------------------------------------------------*) -module Optional_thunk : sig - type t - - val none : t - val some : (unit -> unit) -> t - val call_if_some : t -> unit -end = struct - type t = unit -> unit - - let none = Sys.opaque_identity (fun () -> ()) - - let some f = - if f == none then - failwith - "Optional_thunk: this function is not representable as a some value"; - f - - let call_if_some t = t () -end - -type t = { - sink : Eio.Flow.sink; - buf : Buffer.t; - mutable wakeup : Optional_thunk.t; -} - -let create sink = - let buf = Buffer.create 0x1000 in - { sink; buf; wakeup = Optional_thunk.none } - -let wakeup t = - let f = t.wakeup in - t.wakeup <- Optional_thunk.none; - Optional_thunk.call_if_some f - -let write_string t s = Buffer.add_string t.buf s - -let run t = - let rec loop () = - if Buffer.length t.buf > 0 then ( - Eio.Flow.copy_string (Buffer.contents t.buf) t.sink; - Buffer.clear t.buf; - loop ()) - else t.wakeup <- Optional_thunk.some loop - in - loop () diff --git a/cohttp-eio/tests/dune b/cohttp-eio/tests/dune index 075a372000..c7dfde49c0 100644 --- a/cohttp-eio/tests/dune +++ b/cohttp-eio/tests/dune @@ -13,6 +13,10 @@ (modules crlf) (libraries unix)) +(mdx + (package cohttp-eio) + (packages cohttp-eio)) + (env (_ (binaries diff --git a/cohttp-eio/tests/server.md b/cohttp-eio/tests/server.md new file mode 100644 index 0000000000..aa3ede80cd --- /dev/null +++ b/cohttp-eio/tests/server.md @@ -0,0 +1,109 @@ +## Setup + +```ocaml +# #require "eio.mock";; +# #require "cohttp-eio";; +``` + +```ocaml +open Eio.Std +open Cohttp_eio +``` + +A mock socket for testing: + +```ocaml +let socket = Eio_mock.Flow.make "socket" +``` + +## Example request handler + +```ocaml +let chunk data = Body.Chunk { size = String.length data; data; extensions = [] } +let end_chunks = Body.Last_chunk [] + +let stream_response () = + let headers = Http.Header.init () in + let headers = Http.Header.add_transfer_encoding headers Http.Transfer.Chunked in + let body_writer fn = fn (chunk "Hello"); Fiber.yield (); traceln "Resuming..."; fn (chunk "World"); fn end_chunks in + let trailer_writer _fn = () in + let body = Body.Chunked { body_writer; trailer_writer } in + Http.Response.make ~version:`HTTP_1_1 ~status:`OK ~headers (), body + +let app (req, _body, _client_addr) = + match Http.Request.resource req with + | "/" -> Server.text_response "root" + | "/stream" -> stream_response () + | _ -> Server.not_found_response + +let connection_handler = Server.connection_handler app +``` + +To test it, we run the connection handler with our mock socket: + +```ocaml +let run test_case = + Eio_mock.Backend.run @@ fun () -> + Fiber.both test_case + (fun () -> + connection_handler socket (`Unix "test-socket") + );; +``` + +## Tests + +Asking for the root: + +```ocaml +# run @@ fun () -> + Eio_mock.Flow.on_read socket [ + `Return "GET / HTTP/1.1\r\n\r\n"; + `Raise End_of_file; + ];; ++socket: read "GET / HTTP/1.1\r\n" ++ "\r\n" ++socket: wrote "HTTP/1.1 200 OK\r\n" ++ "content-length: 4\r\n" ++ "content-type: text/plain; charset=UTF-8\r\n" ++ "\r\n" ++ "root" +- : unit = () +``` + +A missing page: + +```ocaml +# run @@ fun () -> + Eio_mock.Flow.on_read socket [ + `Return "GET /missing HTTP/1.1\r\n\r\n"; + `Raise End_of_file; + ] ;; ++socket: read "GET /missing HTTP/1.1\r\n" ++ "\r\n" ++socket: wrote "HTTP/1.1 404 Not Found\r\n" ++ "\r\n" +- : unit = () +``` + +Streaming a response: + +```ocaml +# run @@ fun () -> + Eio_mock.Flow.on_read socket [ + `Return "GET /stream HTTP/1.1\r\n\r\n"; + `Raise End_of_file; + ];; ++socket: read "GET /stream HTTP/1.1\r\n" ++ "\r\n" ++socket: wrote "HTTP/1.1 200 OK\r\n" ++ "transfer-encoding: chunked\r\n" ++ "\r\n" ++ "5\r\n" ++ "Hello\r\n" ++Resuming... ++socket: wrote "5\r\n" ++ "World\r\n" ++ "0\r\n" ++ "\r\n" +- : unit = () +``` diff --git a/cohttp-eio/tests/test_chunk_server.ml b/cohttp-eio/tests/test_chunk_server.ml index b9edad9be4..72e9c8c7e1 100644 --- a/cohttp-eio/tests/test_chunk_server.ml +++ b/cohttp-eio/tests/test_chunk_server.ml @@ -5,11 +5,11 @@ let dump_chunk buf chunk = let s = Format.asprintf "\n%a" Body.pp_chunk chunk in Buffer.add_string buf s -let app (req, reader) = +let app (req, reader, _client_addr) = match Http.Request.resource req with | "/" -> ( let chunk_buf = Buffer.create 0 in - match Server.read_chunked (req, reader) (dump_chunk chunk_buf) with + match Server.read_chunked req reader (dump_chunk chunk_buf) with | headers -> let req = { req with headers } in Buffer.contents chunk_buf @@ -24,5 +24,4 @@ let () = [ ("-p", Arg.Set_int port, " Listening port number(8080 by default)") ] ignore "An HTTP/1.1 server"; - Eio_main.run @@ fun env -> - Eio.Switch.run @@ fun sw -> run ~port:!port env sw app + Eio_main.run @@ fun env -> run ~port:!port env app diff --git a/cohttp-eio/tests/test_server.ml b/cohttp-eio/tests/test_server.ml index 97aa33860b..1a35b19848 100644 --- a/cohttp-eio/tests/test_server.ml +++ b/cohttp-eio/tests/test_server.ml @@ -1,29 +1,18 @@ open Cohttp_eio -let read_body (req, reader) = - let body = Server.read_fixed (req, reader) in - let buf = Buffer.create 0 in - let fmt = Format.formatter_of_buffer buf in - Http.Request.pp fmt req; - Format.fprintf fmt "\n\n%s%!" body; - Server.text_response (Buffer.contents buf) +let read_body req reader = + let body = Server.read_fixed req reader in + Server.text_response @@ Fmt.str "%a\n\n%s" Http.Request.pp req body -let app (req, reader) = +let app (req, reader, _client_addr) = match Http.Request.resource req with - | "/get" -> - let buf = Buffer.create 0 in - let fmt = Format.formatter_of_buffer buf in - Http.Request.pp fmt req; - Format.fprintf fmt "%!"; - Server.text_response (Buffer.contents buf) + | "/get" -> Server.text_response (Fmt.to_to_string Http.Request.pp req) | "/get_error" -> ( try - let _ = Server.read_fixed (req, reader) in + let _ = Server.read_fixed req reader in assert false with Invalid_argument e -> Server.text_response e) - | "/post" -> read_body (req, reader) + | "/post" -> read_body req reader | _ -> Server.bad_request_response -let () = - Eio_main.run @@ fun env -> - Eio.Switch.run @@ fun sw -> Server.run ~port:8080 env sw app +let () = Eio_main.run @@ fun env -> Server.run ~port:8080 env app diff --git a/dune-project b/dune-project index 24cf8ed513..dff414491c 100644 --- a/dune-project +++ b/dune-project @@ -4,6 +4,8 @@ (license ISC) +(using mdx 0.1) + (cram enable) (maintainers "Anil Madhavapeddy ") @@ -337,12 +339,13 @@ should also be fine under Windows too. "A CoHTTP server and client implementation based on `eio` library. `cohttp-eio`features a multicore capable HTTP 1.1 server. The library promotes and is built with direct style of coding as opposed to a monadic.") (depends base-domains - (eio (>= 0.3)) + (eio (>= 0.4)) (eio_main :with-test) (uri :with-test) cstruct bigstringaf fmt + (mdx :with-test) (eio_main :with-test) (http (= :version))))