Allow merge operations to yield to a concurrent close
craigfe committed Jun 25, 2020
1 parent c40598a commit c1da2e6
Showing 4 changed files with 135 additions and 58 deletions.
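
The shape of the change, before the diff: `close` now sets a `pending_cancel` flag on the instance before blocking on `merge_lock`, and the merge loop polls that flag between entries through a `yield` callback, returning `Aborted instead of `Completed when it fires. Below is a minimal, self-contained OCaml sketch of that handshake (it assumes the threads library for `Mutex`); only the names mirror the patch, while the record and the loop are simplified stand-ins rather than the real implementation.

type instance = {
  mutable pending_cancel : bool;  (* set by [close], polled by the merge loop *)
  merge_lock : Mutex.t;           (* held for the whole of a merge *)
}

let check_pending_cancel t =
  match t.pending_cancel with true -> `Abort | false -> `Continue

(* The merge loop consults [yield] once per entry and stops early on [`Abort]. *)
let rec merge_loop ~yield step = function
  | [] -> `Completed
  | entry :: rest -> (
      match yield () with
      | `Abort -> `Aborted
      | `Continue ->
          step entry;
          merge_loop ~yield step rest)

let merge t step entries =
  Mutex.lock t.merge_lock;
  let result =
    merge_loop ~yield:(fun () -> check_pending_cancel t) step entries
  in
  Mutex.unlock t.merge_lock;
  result

(* [close] raises the flag first, then waits on [merge_lock]; the merge thread
   releases the lock promptly because it aborts at the next entry boundary. *)
let close t =
  t.pending_cancel <- true;
  Mutex.lock t.merge_lock;
  (* ... release file handles, locks, caches ... *)
  Mutex.unlock t.merge_lock
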
138 changes: 83 additions & 55 deletions src/index.ml
@@ -171,8 +171,13 @@ struct
writer_lock : IO.lock option;
mutable merge_lock : Mutex.t;
mutable rename_lock : Mutex.t;
mutable pending_cancel : bool;
(** Signal the merge thread to terminate prematurely *)
}

let check_pending_cancel instance =
match instance.pending_cancel with true -> `Abort | false -> `Continue

type t = instance option ref

let check_open t =
@@ -402,6 +407,7 @@ struct
merge_lock = Mutex.create ();
rename_lock = Mutex.create ();
writer_lock;
pending_cancel = false;
}

let (`Staged v) = with_cache ~v:v_no_cache ~clear
@@ -551,7 +557,7 @@ struct

(* Merge [log] with [index] into [dst_io], ignoring bindings that do not
satisfy [filter (k, v)]. [log] must be sorted by key hashes. *)
let merge_with ~filter log (index : index) dst_io =
let merge_with ~yield ~filter log (index : index) dst_io =
let entries = 10_000 in
let len = entries * entry_size in
let buf = Bytes.create len in
@@ -560,34 +566,40 @@ struct
let fan_out = index.fan_out in
refill 0L;
let rec go index_offset buf_offset log_i =
if index_offset >= index_end then
append_remaining_log fan_out log log_i dst_io
if index_offset >= index_end then (
append_remaining_log fan_out log log_i dst_io;
`Completed )
else
let buf_str = Bytes.sub buf buf_offset entry_size in
let index_offset = Int64.add index_offset entry_sizeL in
let e = Entry.decode buf_str 0 in
let log_i = merge_from_log fan_out log log_i e.key_hash dst_io in
Thread.yield ();
if
( log_i >= Array.length log
||
let key = log.(log_i).key in
not (K.equal key e.key) )
&& filter (e.key, e.value)
then
append_buf_fanout fan_out e.key_hash (Bytes.to_string buf_str) dst_io;
let buf_offset =
let n = buf_offset + entry_size in
if n >= Bytes.length buf then (
refill index_offset;
0 )
else n
in
(go [@tailcall]) index_offset buf_offset log_i
match yield () with
| `Abort -> `Aborted
| `Continue ->
Thread.yield ();
if
( log_i >= Array.length log
||
let key = log.(log_i).key in
not K.(equal key e.key) )
&& filter (e.key, e.value)
then
append_buf_fanout fan_out e.key_hash (Bytes.to_string buf_str)
dst_io;
let buf_offset =
let n = buf_offset + entry_size in
if n >= Bytes.length buf then (
refill index_offset;
0 )
else n
in
(go [@tailcall]) index_offset buf_offset log_i
in
(go [@tailcall]) 0L 0 0

let merge ?(blocking = false) ?(filter = fun _ -> true) ?hook ~witness t =
let yield () = check_pending_cancel t in
Mutex.lock t.merge_lock;
Log.info (fun l -> l "[%s] merge" (Filename.basename t.root));
Stats.incr_nb_merge ();
@@ -636,46 +648,50 @@ struct
~fan_size:(Int64.of_int (Fan.exported_size fan_out))
merge_path
in
let index =
let merge_result : [ `Index of index | `Aborted ] =
match t.index with
| None ->
let io =
IO.v ~fresh:true ~readonly:false ~generation ~fan_size:0L
(index_path t.root)
in
append_remaining_log fan_out log_array 0 merge;
{ io; fan_out }
| Some index ->
`Index { io; fan_out }
| Some index -> (
let index = { index with fan_out } in
merge_with ~filter log_array index merge;
index
match merge_with ~yield ~filter log_array index merge with
| `Completed -> `Index index
| `Aborted -> `Aborted )
in
Fan.finalize index.fan_out;
IO.set_fanout merge (Fan.export index.fan_out);
Mutex.with_lock t.rename_lock (fun () ->
IO.rename ~src:merge ~dst:index.io;
t.index <- Some index;
IO.clear ~keep_generation:true log.io;
Tbl.clear log.mem;
IO.set_generation log.io generation;
t.generation <- generation;
let log_async = assert_and_get t.log_async in
Tbl.iter
(fun key value ->
Tbl.replace log.mem key value;
append_key_value log.io key value)
log_async.mem;
IO.sync log.io;
t.log_async <- None);
may (fun f -> f `After) hook;
IO.clear log_async.io;
IO.close log_async.io;
Mutex.unlock t.merge_lock
match merge_result with
| `Index index ->
Fan.finalize index.fan_out;
IO.set_fanout merge (Fan.export index.fan_out);
Mutex.with_lock t.rename_lock (fun () ->
IO.rename ~src:merge ~dst:index.io;
t.index <- Some index;
IO.clear ~keep_generation:true log.io;
Tbl.clear log.mem;
IO.set_generation log.io generation;
t.generation <- generation;
let log_async = assert_and_get t.log_async in
Tbl.iter
(fun key value ->
Tbl.replace log.mem key value;
append_key_value log.io key value)
log_async.mem;
IO.sync log.io;
t.log_async <- None);
may (fun f -> f `After) hook;
IO.clear log_async.io;
IO.close log_async.io;
Mutex.unlock t.merge_lock;
`Completed
| `Aborted ->
Mutex.unlock t.merge_lock;
`Aborted
in
if blocking then (
go ();
Thread.return () )
else Thread.async go
if blocking then go () |> Thread.return else Thread.async go
let get_witness t =
match t.log with
@@ -705,7 +721,7 @@ struct
match witness with
| None ->
Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root));
Thread.return ()
Thread.return `Completed
| Some witness -> merge ?hook ~witness t
let replace t key value =
@@ -743,7 +759,12 @@ struct
match witness with
| None ->
Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root))
| Some witness -> Thread.await (merge ~blocking:true ~filter:f ~witness t)
| Some witness -> (
match Thread.await (merge ~blocking:true ~filter:f ~witness t) with
| Ok (`Aborted | `Completed) -> ()
| Error (`Async_exn exn) ->
Fmt.failwith "filter: asynchronous exception during merge (%s)"
(Printexc.to_string exn) )
let iter f t =
let t = check_open t in
@@ -762,11 +783,13 @@ struct
(fun (i : index) -> iter_io (fun e -> f e.key e.value) i.io)
t.index)
let close it =
let close' ~hook it =
match !it with
| None -> Log.info (fun l -> l "close: instance already closed")
| Some t ->
Log.info (fun l -> l "[%s] close" (Filename.basename t.root));
t.pending_cancel <- true;
hook `Abort_signalled;
Mutex.with_lock t.merge_lock (fun () ->
it := None;
t.open_instances <- t.open_instances - 1;
@@ -782,6 +805,8 @@ struct
t.log;
may (fun (i : index) -> IO.close i.io) t.index;
may (fun lock -> IO.unlock lock) t.writer_lock ))
let close = close' ~hook:(fun _ -> ())
end
module Make = Make_private
@@ -800,9 +825,12 @@ module Private = struct
module type S = sig
include S
type async
type 'a async
val close' : hook:[ `Abort_signalled ] Hook.t -> t -> unit
val force_merge : ?hook:[ `After | `Before ] Hook.t -> t -> async
val force_merge :
?hook:[ `After | `Before ] Hook.t -> t -> [ `Completed | `Aborted ] async
val await : 'a async -> ('a, [ `Async_exn of exn ]) result
10 changes: 8 additions & 2 deletions src/index.mli
@@ -211,10 +211,16 @@ module Private : sig
module type S = sig
include S

type async
val close' : hook:[ `Abort_signalled ] Hook.t -> t -> unit
(** [`Abort_signalled]: after the cancellation signal has been sent to any
concurrent merge operations, but {i before} blocking on those
cancellations having completed. *)

type 'a async
(** The type of asynchronous computation. *)

val force_merge : ?hook:[ `After | `Before ] Hook.t -> t -> async
val force_merge :
?hook:[ `After | `Before ] Hook.t -> t -> [ `Completed | `Aborted ] async
(** [force_merge t] forces a merge for [t]. Optionally, a hook can be passed
that will be called twice:
3 changes: 2 additions & 1 deletion test/unix/dune
@@ -1,4 +1,5 @@
(tests
(names main force_merge io_array)
(package index)
(libraries index index.unix alcotest fmt logs logs.fmt re stdlib-shims))
(libraries index index.unix alcotest fmt logs logs.fmt re stdlib-shims
threads.posix))
42 changes: 42 additions & 0 deletions test/unix/main.ml
@@ -364,6 +364,47 @@ module Close = struct
Not_found (fun () -> ignore_value (Index.find rw k1));
Index.close rw

(** [close] terminates an ongoing merge operation *)
let aborted_merge () =
let Context.{ rw; _ } = Context.full_index ~size:100 () in
let close_request, abort_signalled =
(* Both locks are initially held.
- [close_request] is released by the merge thread in the [`Before] hook
as a signal to the parent thread to issue the [close] operation.
- [abort_signalled] is released by the parent thread to signal to the
child to continue the merge operation (which must then abort prematurely).
*)
let m1, m2 = (Mutex.create (), Mutex.create ()) in
Mutex.lock m1;
Mutex.lock m2;
(m1, m2)
in
let hook = function
| `Before ->
Fmt.pr "Child: issuing request to close the index\n%!";
Mutex.unlock close_request
| `After -> Alcotest.fail "Merge completed despite concurrent close"
in
let merge_promise : _ Index.async =
Index.force_merge ~hook:(I.Private.Hook.v hook) rw
in
Fmt.pr "Parent: waiting for request to close the index\n%!";
Mutex.lock close_request;
Fmt.pr "Parent: closing the index\n%!";
Index.close'
~hook:
(I.Private.Hook.v (fun `Abort_signalled -> Mutex.unlock abort_signalled))
rw;
Fmt.pr "Parent: awaiting merge result\n%!";
Index.await merge_promise |> function
| Ok `Completed ->
Alcotest.fail "Force_merge returned `Completed despite concurrent close"
| Error (`Async_exn exn) ->
Alcotest.failf "Asynchronous exception occurred during force_merge: %s"
(Printexc.to_string exn)
| Ok `Aborted -> ()

let tests =
[
("close and reopen", `Quick, close_reopen_rw);
@@ -374,6 +415,7 @@ module Close = struct
("non-close operations fail after close", `Quick, fail_api_after_close);
("double close", `Quick, check_double_close);
("double restart", `Quick, restart_twice);
("aborted merge", `Quick, aborted_merge);
]
end

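Seen from the caller's side of the private interface declared in src/index.mli above, the new result type means a merge that races with a close now resolves instead of blocking until completion. A hedged sketch, assuming an already-open read-write instance [t] (the surrounding setup is not part of this commit):

(* Illustrative only: without the test hooks either outcome is possible,
   depending on whether the merge finishes before the close is observed. *)
let close_during_merge t =
  let merge_promise = Index.force_merge t in
  Index.close t;
  match Index.await merge_promise with
  | Ok `Aborted -> ()      (* the merge yielded to the concurrent close *)
  | Ok `Completed -> ()    (* the merge finished before the signal arrived *)
  | Error (`Async_exn exn) -> raise exn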
