Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix channel resource leak on close of underlying flow; add exceptions to channel #119

Closed
wants to merge 8 commits into from
15 changes: 14 additions & 1 deletion _oasis
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ Library channel
Findlibname: channel
Modules: Channel
BuildDepends: io-page,
mirage-profile,
mirage-types,
ipaddr,
cstruct,
lwt
lwt,
lwt.syntax

Library dhcpv4
CompiledObject: best
Expand Down Expand Up @@ -298,3 +300,14 @@ Library "tcpip-stack-socket"
lwt.unix,
ipaddr.unix,
io-page.unix

Executable test_channel
Build$: flag(tests)
Path: lib_test
MainIs: test_channel.ml
ByteOpt: -g
BuildDepends: oUnit, lwt, lwt.unix, io-page.unix, tcpip.channel, mirage-flow

Test test_channel
Run$: flag(tests)
Command: $test_channel
32 changes: 31 additions & 1 deletion _tags
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# OASIS_START
# DO NOT EDIT (digest: f9fff6055d6ed99bbe8e141b101ae9f5)
# DO NOT EDIT (digest: f07509fc184dadc7227e522e969c33b0)
# Ignore VCS directories, you can use the same kind of rule outside
# OASIS_START/STOP if you want to exclude directories that contains
# useless stuff for the build process
Expand Down Expand Up @@ -67,6 +67,8 @@ true: annot, bin_annot
<channel/*.ml{,i,y}>: pkg_io-page
<channel/*.ml{,i,y}>: pkg_ipaddr
<channel/*.ml{,i,y}>: pkg_lwt
<channel/*.ml{,i,y}>: pkg_lwt.syntax
<channel/*.ml{,i,y}>: pkg_mirage-profile
<channel/*.ml{,i,y}>: pkg_mirage-types
# Library dhcpv4
"dhcp/dhcpv4.cmxs": use_dhcpv4
Expand Down Expand Up @@ -119,6 +121,7 @@ true: annot, bin_annot
"unix/tcpv6-socket.cmxs": use_tcpv6-socket
# Library tcpip-stack-unix
"unix/tcpip-stack-unix.cmxs": use_tcpip-stack-unix
<unix/*.ml{,i,y}>: pkg_lwt.syntax
<unix/*.ml{,i,y}>: pkg_mirage-clock-unix
<unix/*.ml{,i,y}>: pkg_mirage-console.unix
<unix/*.ml{,i,y}>: pkg_mirage-net-unix
Expand Down Expand Up @@ -157,6 +160,33 @@ true: annot, bin_annot
<unix/*.ml{,i,y}>: use_udp
<unix/*.ml{,i,y}>: use_udpv4-socket
<unix/*.ml{,i,y}>: use_udpv6-socket
# Executable test_channel
"lib_test/test_channel.byte": oasis_executable_test_channel_byte
<lib_test/*.ml{,i,y}>: oasis_executable_test_channel_byte
"lib_test/test_channel.byte": pkg_cstruct
"lib_test/test_channel.byte": pkg_io-page
"lib_test/test_channel.byte": pkg_io-page.unix
"lib_test/test_channel.byte": pkg_ipaddr
"lib_test/test_channel.byte": pkg_lwt
"lib_test/test_channel.byte": pkg_lwt.syntax
"lib_test/test_channel.byte": pkg_lwt.unix
"lib_test/test_channel.byte": pkg_mirage-flow
"lib_test/test_channel.byte": pkg_mirage-profile
"lib_test/test_channel.byte": pkg_mirage-types
"lib_test/test_channel.byte": pkg_oUnit
"lib_test/test_channel.byte": use_channel
<lib_test/*.ml{,i,y}>: pkg_cstruct
<lib_test/*.ml{,i,y}>: pkg_io-page
<lib_test/*.ml{,i,y}>: pkg_io-page.unix
<lib_test/*.ml{,i,y}>: pkg_ipaddr
<lib_test/*.ml{,i,y}>: pkg_lwt
<lib_test/*.ml{,i,y}>: pkg_lwt.syntax
<lib_test/*.ml{,i,y}>: pkg_lwt.unix
<lib_test/*.ml{,i,y}>: pkg_mirage-flow
<lib_test/*.ml{,i,y}>: pkg_mirage-profile
<lib_test/*.ml{,i,y}>: pkg_mirage-types
<lib_test/*.ml{,i,y}>: pkg_oUnit
<lib_test/*.ml{,i,y}>: use_channel
# OASIS_STOP
true: annot, bin_annot, principal, strict_sequence, debug
<tcp/pcb.ml>: pkg_cstruct.syntax
Expand Down
91 changes: 56 additions & 35 deletions channel/channel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ module Make(Flow:V1_LWT.FLOW) = struct
type +'a io = 'a Lwt.t
type 'a io_stream = 'a Lwt_stream.t

exception End_of_file (* at least one user understands this exception *)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what this comment means -- do you mean one external user is already expecting this exception to be thrown?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, good catch! I think that might be Pervasives.End_of_file, so it still wouldn't be caught by defining another End_of_file here in Channel (exceptions are not like polymorphic variants -- if you define them twice in two different modules, they are two different exceptions for the purposes of catching them). I suspect that End_of_file leaked from the socket backend (still needs to be confirmed). This basically reinforces the "exceptions need to disappear from these interfaces" argument, but this fixup en route is just fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That code is doing two horrible things: using an exception to represent the (non-exceptional, should-be-handled) case of end of file, and then converting it to the magic value "" to mean the same thing.

The cohttp docs say:

It returns an empty string if EOF or some other error condition occurs on the input channel

Would be good to standardise on something a bit saner everywhere...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed ... I think keeping the same (crazy) semantics in that PR is useful though,

exception Write_error of string
exception Read_error of string

type t = {
flow: flow;
mutable ibuf: Cstruct.t option; (* Queue of incoming buf *)
Expand All @@ -35,81 +39,95 @@ module Make(Flow:V1_LWT.FLOW) = struct
abort_u: unit Lwt.u;
}

exception Closed

let create flow =
let ibuf = None in
let obufq = [] in
let obuf = None in
let opos = 0 in
let abort_t, abort_u = Lwt.task () in
let abort_t, abort_u = MProf.Trace.named_task "Channel.t.abort" in
{ ibuf; obuf; flow; obufq; opos; abort_t; abort_u }

let to_flow { flow; _ } = flow

let ibuf_refill t =
Flow.read t.flow >>= function
| `Ok buf ->
(* users of get_ibuf (and therefore ibuf_refill) expect the buffer
returned here to have length >0; if Flow.read ever gives us empty
buffers, this will be violated causing Channel users to see Cstruct
exceptions *)
t.ibuf <- Some buf;
return_unit
| `Error _ | `Eof ->
fail Closed
| `Error e ->
fail (Read_error (Flow.error_message e))
| `Eof ->
(* close the flow before throwing exception; otherwise it will never be
GC'd *)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like it might be another bug. Why doesn't the flow get GC'd eventually? Can we close forgotten flows automatically using Gc.finalise or something (as a last resort, logging a warning)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flow provided by tcp/flow.ml (and tcp/pcb.ml) doesn't do anything to clean itself up on EOF; rather it relies on the state machine logic (tcp/state.ml) to call close when logic dictates that the communications are over. I suspect this particular case is related to #107 (we don't close Established connections when we receive an RST).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see Flow currently has type t = Pcb.t. If we instead wrapped it as type t = Flow of Pcb.t then we could attach a finaliser to it. If that got called, it would mean that neither write nor close could ever be called again and we could close the flow automatically (with a warning), I think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit afraid that adding finalisers on every flow will put a lot of pressure on the GC.

Flow.close t.flow >>= fun () ->
fail End_of_file

let rec get_ibuf t =
match t.ibuf with
|None -> ibuf_refill t >>= fun () -> get_ibuf t
|Some buf when Cstruct.len buf = 0 -> ibuf_refill t >>= fun () -> get_ibuf t
|Some buf -> return buf
| None -> ibuf_refill t >>= fun () -> get_ibuf t
| Some buf when Cstruct.len buf = 0 -> ibuf_refill t >>= fun () -> get_ibuf t
| Some buf -> return buf

(* Read one character from the input channel *)
let read_char t =
get_ibuf t >>= fun buf ->
get_ibuf t (* the fact that we returned means we have at least 1 char *)
>>= fun buf ->
let c = Cstruct.get_char buf 0 in
t.ibuf <- Some (Cstruct.shift buf 1);
t.ibuf <- Some (Cstruct.shift buf 1); (* advance read buffer, possibly to
EOF *)
return c

(* Read up to len characters from the input channel
and at most a full view. If not specified, read all *)
let read_some ?len t =
(* get_ibuf potentially throws EOF-related exceptions *)
get_ibuf t >>= fun buf ->
let avail = Cstruct.len buf in
let len = match len with |Some len -> len |None -> avail in
if len < avail then begin
let hd,tl = Cstruct.split buf len in
t.ibuf <- Some tl;
t.ibuf <- Some tl; (* leave some in the buffer; next time, we won't do a
blocking read *)
return hd
end else begin
t.ibuf <- None;
return buf
end

(* Read up to len characters from the input channel as a
stream (and read all available if no length specified *)
(* Read up to len characters from the input channel as a
stream (and read all available if no length specified *)
let read_stream ?len t =
Lwt_stream.from (fun () ->
Lwt.catch
(fun () -> read_some ?len t >>= fun v -> return (Some v))
(function Closed -> return_none | e -> fail e)
(function End_of_file -> return_none | e -> fail e)
)

(* Read until a character is found *)
let read_until t ch =
get_ibuf t >>= fun buf ->
let len = Cstruct.len buf in
let rec scan off =
if off = len then None else begin
if Cstruct.get_char buf off = ch then
Some off else scan (off+1)
end
in
match scan 0 with
|None -> (* not found, return what we have until EOF *)
t.ibuf <- None;
return (false, buf)
|Some off -> (* found, so split the buffer *)
let hd = Cstruct.sub buf 0 off in
t.ibuf <- Some (Cstruct.shift buf (off+1));
return (true, hd)
Lwt.catch
(fun () -> get_ibuf t >>= fun buf ->
let len = Cstruct.len buf in
let rec scan off =
if off = len then None else begin
if Cstruct.get_char buf off = ch then
Some off else scan (off+1)
end
in
match scan 0 with
|None -> (* not found, return what we have until EOF *)
t.ibuf <- None; (* basically guaranteeing that next read is EOF *)
return (false, buf)
|Some off -> (* found, so split the buffer *)
let hd = Cstruct.sub buf 0 off in
t.ibuf <- Some (Cstruct.shift buf (off+1));
return (true, hd)
)
(function End_of_file -> return (false, Cstruct.create 0) | e -> fail e)

(* This reads a line of input, which is terminated either by a CRLF
sequence, or the end of the channel (which counts as a line).
Expand Down Expand Up @@ -195,12 +213,15 @@ module Make(Flow:V1_LWT.FLOW) = struct
queue_obuf t;
let l = List.rev t.obufq in
t.obufq <- [];
Flow.writev t.flow l
>>= fun _ -> return_unit
Flow.writev t.flow l >>= function
| `Ok () -> Lwt.return_unit
| `Error (e : Flow.error) -> fail (Write_error (Flow.error_message e))
| `Eof -> fail (End_of_file)

let close t =
flush t
>>= fun () ->
Flow.close t.flow
try_lwt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use

Lwt.finalize (fun () -> flush t) (fun () -> Flow.close t.flow)

to avoid adding the lwt.syntax dependency (since camlp4 is on its way out).

flush t
finally
Flow.close t.flow

end
9 changes: 8 additions & 1 deletion channel/channel.mli
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,11 @@
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

module Make(F:V1_LWT.FLOW) : V1_LWT.CHANNEL with type flow = F.flow
module Make(F:V1_LWT.FLOW) : sig
include V1_LWT.CHANNEL with type flow = F.flow

exception End_of_file
exception Read_error of string
exception Write_error of string

end
5 changes: 3 additions & 2 deletions lib/META
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# OASIS_START
# DO NOT EDIT (digest: 22882ae34792fd8f66fa101d4ff23bcd)
# DO NOT EDIT (digest: 21cb243d33c9355d3bd79282fe0e1009)
version = "2.3.0"
description = "Ethernet, TCP/IPv4 and DHCPv4 library"
requires = "io-page mirage-types ipaddr cstruct mirage-profile bytes"
Expand Down Expand Up @@ -250,7 +250,8 @@ package "dhcpv4" (
package "channel" (
version = "2.3.0"
description = "Ethernet, TCP/IPv4 and DHCPv4 library"
requires = "io-page mirage-types ipaddr cstruct lwt"
requires =
"io-page mirage-profile mirage-types ipaddr cstruct lwt lwt.syntax"
archive(byte) = "channel.cma"
archive(byte, plugin) = "channel.cma"
archive(native) = "channel.cmxa"
Expand Down
59 changes: 59 additions & 0 deletions lib_test/test_channel.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
open Lwt

(* this is a very small set of tests for the channel interface, intended to
ensure that EOF conditions on the underlying flow are handled properly *)
module Channel = Channel.Make(Fflow)

let cmp a b =
match (String.compare a b) with | 0 -> true | _ -> false

let printer = function
| `Success -> "success"
| `Failure s -> s

let test_read_char_eof () =
let f = Fflow.make () in
let c = Channel.create f in
let try_char_read () =
Channel.read_char c >>= fun ch ->
OUnit.assert_failure (Printf.sprintf "character %c was returned from
Channel.read_char on an empty flow" ch)
in
Lwt.try_bind
(try_char_read)
(fun () -> Lwt.return (`Failure "no exception" )) (* "success" case (no exceptions) *)
(function
| Channel.End_of_file -> Lwt.return (`Success)
| e -> Lwt.return (`Failure (Printf.sprintf "wrong exception: %s"
(Printexc.to_string e)))
)

let test_read_until_eof () =
let check a b = OUnit.assert_equal ~printer:(fun a -> a) ~cmp a
(Cstruct.to_string b) in
let input = Fflow.input_string "I am the very model of a modern major general"
in
let f = Fflow.make ~input () in
let c = Channel.create f in
Channel.read_until c 'v' >>= function
| true, buf ->
check "I am the " buf;
Channel.read_until c '\xff' >>= fun (found, buf) ->
OUnit.assert_equal ~msg:"claimed we found a char that couldn't have been
there in read_until" false found;
check "ery model of a modern major general" buf;
Channel.read_until c '\n' >>= fun (found, buf) ->
OUnit.assert_equal ~msg:"claimed we found a char after EOF in read_until"
false found;
OUnit.assert_equal ~printer:string_of_int 0 (Cstruct.len buf);
Lwt.return_unit
| false, _ ->
OUnit.assert_failure "thought we couldn't find a 'v' in input test"

let _ =
Lwt_main.run (
test_read_char_eof () >>= fun res ->
OUnit.assert_equal ~printer `Success res;
test_read_until_eof () >>= fun () ->
Lwt.return_unit
)
14 changes: 11 additions & 3 deletions myocamlbuild.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(* OASIS_START *)
(* DO NOT EDIT (digest: f9dd58c01c63e9d9806a7de24a74ac2c) *)
(* DO NOT EDIT (digest: c7dcd8bad5ef5429c8bf256f1919a8a1) *)
module OASISGettext = struct
(* # 22 "src/oasis/OASISGettext.ml" *)

Expand Down Expand Up @@ -647,12 +647,20 @@ let package_default =
[
(OASISExpr.EBool true,
S [A "-ccopt"; A "-O2"; A "-ccopt"; A "${XEN_CFLAGS}"])
])
]);
(["oasis_executable_test_channel_byte"; "ocaml"; "link"; "byte"],
[(OASISExpr.EBool true, S [A "-g"])]);
(["oasis_executable_test_channel_byte"; "ocaml"; "ocamldep"; "byte"
],
[(OASISExpr.EBool true, S [A "-g"])]);
(["oasis_executable_test_channel_byte"; "ocaml"; "compile"; "byte"],
[(OASISExpr.EBool true, S [A "-g"])])
];
includes =
[
("unix", ["channel"; "lib"; "tcp"]);
("tcp", ["lib"]);
("lib_test", ["channel"]);
("lib", ["dhcp"; "tcp"]);
("dhcp", ["lib"])
]
Expand All @@ -663,6 +671,6 @@ let conf = {MyOCamlbuildFindlib.no_automatic_syntax = false}

let dispatch_default = MyOCamlbuildBase.dispatch_default conf package_default;;

# 667 "myocamlbuild.ml"
# 675 "myocamlbuild.ml"
(* OASIS_STOP *)
Ocamlbuild_plugin.dispatch dispatch_default;;
2 changes: 2 additions & 0 deletions opam
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ depends: [
"mirage-net-unix" {>= "1.1.0"}
"ipaddr" {>= "2.2.0"}
"mirage-profile"
"mirage-flow" {test}
"ounit2" {test}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be ounit

]
depopts: [
"mirage-xen"
Expand Down
Loading