Skip to content

Commit

Permalink
Merge pull request #302 from bikallem/with_connect
Browse files Browse the repository at this point in the history
net: add Net.with_tcp_connect
  • Loading branch information
talex5 authored Sep 27, 2022
2 parents 3af12b9 + bd160c9 commit f1e18c3
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 5 deletions.
22 changes: 22 additions & 0 deletions lib_eio/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ exception Connection_reset of exn
(** This is a wrapper for EPIPE, ECONNRESET and similar errors.
It indicates that the flow has failed, and data may have been lost. *)

exception Connection_failure of exn

module Ipaddr = struct
type 'a t = string (* = [Unix.inet_addr], but avoid a Unix dependency here *)
Expand Down Expand Up @@ -208,3 +209,24 @@ let getaddrinfo_datagram ?service t hostname =
let getnameinfo (t:#t) sockaddr = t#getnameinfo sockaddr

let close = Flow.close

let with_tcp_connect ?(timeout=Time.Timeout.none) ~host ~service t f =
Switch.run @@ fun sw ->
let rec aux = function
| [] -> raise (Connection_failure (Failure (Fmt.str "No TCP addresses for %S" host)))
| addr :: addrs ->
match Time.Timeout.run_exn timeout (fun () -> connect ~sw t addr) with
| conn -> f conn
| exception (Time.Timeout | Connection_failure _) when addrs <> [] ->
aux addrs
| exception (Connection_failure _ as ex) ->
raise ex
| exception (Time.Timeout as ex) ->
raise (Connection_failure ex)
in
getaddrinfo_stream ~service t host
|> List.filter_map (function
| `Tcp _ as x -> Some x
| `Unix _ -> None
)
|> aux
29 changes: 28 additions & 1 deletion lib_eio/net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*)

exception Connection_reset of exn
exception Connection_failure of exn

(** IP addresses. *)
module Ipaddr : sig
Expand Down Expand Up @@ -113,7 +114,33 @@ end
val connect : sw:Switch.t -> #t -> Sockaddr.stream -> <stream_socket; Flow.close>
(** [connect ~sw t addr] is a new socket connected to remote address [addr].
The new socket will be closed when [sw] finishes, unless closed manually first. *)
The new socket will be closed when [sw] finishes, unless closed manually first.
@raise Connection_failure if connection couldn't be established. *)

val with_tcp_connect :
?timeout:Time.Timeout.t ->
host:string ->
service:string ->
#t ->
(<stream_socket; Flow.close> -> 'b) ->
'b
(** [with_tcp_connect ~host ~service t f] creates a tcp connection [conn] to [host] and [service] and executes
[f conn].
[conn] is closed after [f] returns (if it isn't already closed by then).
[host] is either an IP address or a domain name, eg. "www.example.org", "www.ocaml.org" or "127.0.0.1".
[service] is an IANA recognized service name or port number, eg. "http", "ftp", "8080" etc.
See https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml.
Addresses are tried in the order they are returned by {!getaddrinfo}, until one succeeds.
@param timeout Limits how long to wait for each connection attempt before moving on to the next.
By default there is no timeout (beyond what the underlying network does).
@raise Connection_failure A connection couldn't be established for any of the addresses defined for [host]. *)

(** {2 Incoming Connections} *)

Expand Down
3 changes: 2 additions & 1 deletion lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,8 @@ module Low_level = struct
let res = enter (enqueue_connect fd addr) in
Log.debug (fun l -> l "connect returned");
if res < 0 then (
raise (Unix.Unix_error (Uring.error_of_errno res, "connect", ""))
let ex = Unix.Unix_error (Uring.error_of_errno res, "connect", "") in
raise (Eio.Net.Connection_failure ex)
)

let send_msg fd ?(fds=[]) ?dst buf =
Expand Down
7 changes: 4 additions & 3 deletions lib_eio_luv/eio_luv.ml
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,9 @@ module Low_level = struct

let connect_pipe ~sw path =
let sock = Luv.Pipe.init ~loop:(get_loop ()) () |> or_raise |> Handle.of_luv ~sw in
await_exn (fun _loop _fiber -> Luv.Pipe.connect (Handle.get "connect" sock) path);
sock
match await (fun _loop _fiber -> Luv.Pipe.connect (Handle.get "connect" sock) path) with
| Ok () -> sock
| Error e -> raise (Eio.Net.Connection_failure (Luv_error e))

let connect_tcp ~sw addr =
let sock = Luv.TCP.init ~loop:(get_loop ()) () |> or_raise in
Expand All @@ -503,7 +504,7 @@ module Low_level = struct
Luv.Handle.close sock ignore;
match Fiber_context.get_error k.fiber with
| Some ex -> enqueue_failed_thread st k ex
| None -> enqueue_failed_thread st k (Luv_error e)
| None -> enqueue_failed_thread st k (Eio.Net.Connection_failure (Luv_error e))
);
Fiber_context.set_cancel_fn k.fiber (fun _ex ->
match Luv.Handle.fileno sock with
Expand Down
149 changes: 149 additions & 0 deletions tests/network.md
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,20 @@ EPIPE:
- : unit = ()
```

Connection refused:

```ocaml
# Eio_main.run @@ fun env ->
Switch.run @@ fun sw ->
try
ignore (Eio.Net.connect ~sw env#net (`Unix "idontexist.sock"));
assert false
with Eio.Net.Connection_failure _ ->
traceln "Connection failure";;
+Connection failure
- : unit = ()
```

## Shutdown

```ocaml
Expand Down Expand Up @@ -546,3 +560,138 @@ EPIPE:
Eio.Net.getnameinfo env#net sockaddr;;
- : string * string = ("localhost", "http")
```

## with_tcp_connet

```ocaml
let net = Eio_mock.Net.make "mock-net"
let addr1 = `Tcp (Eio.Net.Ipaddr.V4.loopback, 80)
let addr2 = `Tcp (Eio.Net.Ipaddr.of_raw "\001\002\003\004", 8080)
let connection_failure = Eio.Net.Connection_failure (Failure "Simulated connection failure")
```

No usable addresses:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Eio_mock.Net.on_getaddrinfo net [`Return [`Unix "/foo"]];
Eio.Net.with_tcp_connect ~host:"www.example.com" ~service:"http" net (fun _ -> assert false);;
+mock-net: getaddrinfo ~service:http www.example.com
Exception:
Eio__Net.Connection_failure
(Failure "No TCP addresses for \"www.example.com\"").
```

First address works:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Eio_mock.Net.on_getaddrinfo net [`Return [addr1; addr2]];
let mock_flow = Eio_mock.Flow.make "flow" in
Eio_mock.Net.on_connect net [`Return mock_flow];
Eio.Net.with_tcp_connect ~host:"www.example.com" ~service:"http" net (fun conn ->
let req = "GET / HTTP/1.1\r\nHost:www.example.com:80\r\n\r\n" in
Eio.Flow.copy_string req conn
);;
+mock-net: getaddrinfo ~service:http www.example.com
+mock-net: connect to tcp:127.0.0.1:80
+flow: wrote "GET / HTTP/1.1\r\n"
+ "Host:www.example.com:80\r\n"
+ "\r\n"
+flow: closed
- : unit = ()
```

Second address works:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Eio_mock.Net.on_getaddrinfo net [`Return [addr1; addr2]];
let mock_flow = Eio_mock.Flow.make "flow" in
Eio_mock.Net.on_connect net [`Raise connection_failure;
`Return mock_flow];
Eio.Net.with_tcp_connect ~host:"www.example.com" ~service:"http" net (fun conn ->
let req = "GET / HTTP/1.1\r\nHost:www.example.com:80\r\n\r\n" in
Eio.Flow.copy_string req conn
);;
+mock-net: getaddrinfo ~service:http www.example.com
+mock-net: connect to tcp:127.0.0.1:80
+mock-net: connect to tcp:1.2.3.4:8080
+flow: wrote "GET / HTTP/1.1\r\n"
+ "Host:www.example.com:80\r\n"
+ "\r\n"
+flow: closed
- : unit = ()
```

Both addresses fail:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Eio_mock.Net.on_getaddrinfo net [`Return [addr1; addr2]];
Eio_mock.Net.on_connect net [`Raise connection_failure; `Raise connection_failure];
Eio.Net.with_tcp_connect ~host:"www.example.com" ~service:"http" net (fun _ -> assert false);;
+mock-net: getaddrinfo ~service:http www.example.com
+mock-net: connect to tcp:127.0.0.1:80
+mock-net: connect to tcp:1.2.3.4:8080
Exception:
Eio__Net.Connection_failure (Failure "Simulated connection failure").
```

First attempt times out:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
let clock = Eio_mock.Clock.make () in
let timeout = Eio.Time.Timeout.of_s clock 10. in
Eio_mock.Net.on_getaddrinfo net [`Return [addr1; addr2]];
let mock_flow = Eio_mock.Flow.make "flow" in
Eio_mock.Net.on_connect net [`Run Fiber.await_cancel; `Return mock_flow];
Fiber.both
(fun () ->
Eio.Net.with_tcp_connect ~timeout ~host:"www.example.com" ~service:"http" net (fun conn ->
let req = "GET / HTTP/1.1\r\nHost:www.example.com:80\r\n\r\n" in
Eio.Flow.copy_string req conn
)
)
(fun () ->
Eio_mock.Clock.advance clock
);;
+mock-net: getaddrinfo ~service:http www.example.com
+mock-net: connect to tcp:127.0.0.1:80
+mock time is now 10
+mock-net: connect to tcp:1.2.3.4:8080
+flow: wrote "GET / HTTP/1.1\r\n"
+ "Host:www.example.com:80\r\n"
+ "\r\n"
+flow: closed
- : unit = ()
```

Both attempts time out:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
let clock = Eio_mock.Clock.make () in
let timeout = Eio.Time.Timeout.of_s clock 10. in
Eio_mock.Net.on_getaddrinfo net [`Return [addr1; addr2]];
Eio_mock.Net.on_connect net [`Run Fiber.await_cancel; `Run Fiber.await_cancel];
Fiber.both
(fun () ->
Eio.Net.with_tcp_connect ~timeout ~host:"www.example.com" ~service:"http" net (fun _ ->
assert false
)
)
(fun () ->
Eio_mock.Clock.advance clock;
Fiber.yield ();
Fiber.yield ();
Eio_mock.Clock.advance clock
);;
+mock-net: getaddrinfo ~service:http www.example.com
+mock-net: connect to tcp:127.0.0.1:80
+mock time is now 10
+mock-net: connect to tcp:1.2.3.4:8080
+mock time is now 20
Exception: Eio__Net.Connection_failure Eio__Time.Timeout.
```

0 comments on commit f1e18c3

Please sign in to comment.