diff --git a/CHANGES.md b/CHANGES.md index 20fb2562..1f475c63 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,12 +3,18 @@ ## Added - Added `Index_unix.Syscalls`, a module exposing various Unix bindings for - interacting with file-systems. + interacting with file-systems. (#176) + +## Changed + +- `Index.close` will now abort an ongoing asynchronous merge operation, rather + than waiting for it to finish. (#185) ## Fixed -- Fail when `Index_unix.IO` file version number is not as expected. -- Fixed creation of an index when an empty `data` file exists. +- Fail when `Index_unix.IO` file version number is not as expected. (#178) + +- Fixed creation of an index when an empty `data` file exists. (#173) # 1.2.0 (2020-02-25) @@ -18,9 +24,8 @@ ## Changed -- Parameterise `Index.Make` over arbitrary mutex and thread implementations - (and remove the obligation for `IO` to provide this functionality). (#160, - #161) +- Parameterise `Index.Make` over arbitrary mutex and thread implementations (and + remove the obligation for `IO` to provide this functionality). (#160, #161) # 1.1.0 (2019-12-21) diff --git a/src/index.ml b/src/index.ml index d12d5187..35338bcd 100644 --- a/src/index.ml +++ b/src/index.ml @@ -15,94 +15,9 @@ furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. *) +include Index_intf module Stats = Stats -module type Key = sig - type t - - val equal : t -> t -> bool - - val hash : t -> int - - val hash_size : int - - val encode : t -> string - - val encoded_size : int - - val decode : string -> int -> t - - val pp : t Fmt.t -end - -module type Value = sig - type t - - val encode : t -> string - - val encoded_size : int - - val decode : string -> int -> t - - val pp : t Fmt.t -end - -module type IO = Io.S - -module type MUTEX = sig - type t - - val create : unit -> t - - val lock : t -> unit - - val unlock : t -> unit - - val with_lock : t -> (unit -> 'a) -> 'a -end - -module type THREAD = sig - type 'a t - - val async : (unit -> 'a) -> 'a t - - val await : 'a t -> ('a, [ `Async_exn of exn ]) result - - val return : 'a -> 'a t - - val yield : unit -> unit -end - -module type S = sig - type t - - type key - - type value - - val v : ?fresh:bool -> ?readonly:bool -> log_size:int -> string -> t - - val clear : t -> unit - - val find : t -> key -> value - - val mem : t -> key -> bool - - exception Invalid_key_size of key - - exception Invalid_value_size of value - - val replace : t -> key -> value -> unit - - val filter : t -> (key * value -> bool) -> unit - - val iter : (key -> value -> unit) -> t -> unit - - val flush : ?with_fsync:bool -> t -> unit - - val close : t -> unit -end - let may f = function None -> () | Some bf -> f bf let assert_and_get = function None -> assert false | Some e -> e @@ -114,7 +29,7 @@ exception Closed module Make_private (K : Key) (V : Value) - (IO : IO) + (IO : Io.S) (Mutex : MUTEX) (Thread : THREAD) = struct diff --git a/src/index.mli b/src/index.mli index c2ffc5d8..efb64fb1 100644 --- a/src/index.mli +++ b/src/index.mli @@ -31,211 +31,5 @@ all copies or substantial portions of the Software. *) - A `lock` IO to ensure safe concurrent access. *) -(** The input of [Make] for keys. *) -module type Key = sig - type t - (** The type for keys. *) - - val equal : t -> t -> bool - (** The equality function for keys. *) - - val hash : t -> int - (** Note: Unevenly distributed hash functions may result in performance drops. *) - - val hash_size : int - (** The number of bits necessary to encode the maximum output value of - {!hash}. `Hashtbl.hash` uses 30 bits. - - Overestimating the [hash_size] will result in performance drops; - underestimation will result in undefined behavior. *) - - val encode : t -> string - (** [encode] is an encoding function. The resultant encoded values must have - size {!encoded_size}. *) - - val encoded_size : int - (** [encoded_size] is the size of the result of {!encode}, expressed in number - of bytes. *) - - val decode : string -> int -> t - (** [decode s off] is the decoded form of the encoded value at the offset - [off] of string [s]. Must satisfy [decode (encode t) 0 = t]. *) - - val pp : t Fmt.t - (** Formatter for keys *) -end - -module Stats = Stats - -(** The input of [Make] for values. The same requirements as for [Key] apply. *) -module type Value = sig - type t - - val encode : t -> string - - val encoded_size : int - - val decode : string -> int -> t - - val pp : t Fmt.t -end - -module type IO = Io.S - -module type MUTEX = sig - (** Locks for mutual exclusion *) - - type t - (** The type of mutual-exclusion locks. *) - - val create : unit -> t - (** Return a fresh mutex. *) - - val lock : t -> unit - (** Lock the given mutex. Locks are not assumed to be re-entrant. *) - - val unlock : t -> unit - (** Unlock the mutex. If any threads are attempting to lock the mutex, exactly - one of them will gain access to the lock. *) - - val with_lock : t -> (unit -> 'a) -> 'a - (** [with_lock t f] first obtains [t], then computes [f ()], and finally - unlocks [t]. *) -end - -module type THREAD = sig - (** Cooperative threads. *) - - type 'a t - (** The type of thread handles. *) - - val async : (unit -> 'a) -> 'a t - (** [async f] creates a new thread of control which executes [f ()] and - returns the corresponding thread handle. The thread terminates whenever - [f ()] returns a value or raises an exception. *) - - val await : 'a t -> ('a, [ `Async_exn of exn ]) result - (** [await t] blocks on the termination of [t]. *) - - val return : 'a -> 'a t - (** [return ()] is a pre-terminated thread handle. *) - - val yield : unit -> unit - (** Re-schedule the calling thread without suspending it. *) -end - -exception RO_not_allowed -(** The exception raised when a write operation is attempted on a read_only - index. *) - -exception Closed -(** The exception raised when any operation is attempted on a closed index, - except for [close], which is idempotent. *) - -(** Index module signature. *) -module type S = sig - type t - (** The type for indexes. *) - - type key - (** The type for keys. *) - - type value - (** The type for values. *) - - val v : ?fresh:bool -> ?readonly:bool -> log_size:int -> string -> t - (** The constructor for indexes. - - @param fresh whether an existing index should be overwritten. - @param read_only whether read-only mode is enabled for this index. - @param log_size the maximum number of bindings in the `log` IO. *) - - val clear : t -> unit - (** [clear t] clears [t] so that there are no more bindings in it. *) - - val find : t -> key -> value - (** [find t k] is the binding of [k] in [t]. *) - - val mem : t -> key -> bool - (** [mem t k] is [true] iff [k] is bound in [t]. *) - - exception Invalid_key_size of key - - exception Invalid_value_size of value - (** The exceptions raised when trying to add a key or a value of different - size than encoded_size *) - - val replace : t -> key -> value -> unit - (** [replace t k v] binds [k] to [v] in [t], replacing any existing binding of - [k]. *) - - val filter : t -> (key * value -> bool) -> unit - (** [filter t p] removes all the bindings (k, v) that do not satisfy [p]. This - operation is costly and blocking. *) - - val iter : (key -> value -> unit) -> t -> unit - (** Iterates over the index bindings. Limitations: - - - Order is not specified. - - In case of recent replacements of existing values (since the last - merge), this will hit both the new and old bindings. - - May not observe recent concurrent updates to the index by other - processes. *) - - val flush : ?with_fsync:bool -> t -> unit - (** Flushes all internal buffers of the [IO] instances. If [with_fsync] is - [true], this also flushes the OS caches for each [IO] instance. *) - - val close : t -> unit - (** Closes all resources used by [t]. *) -end - -module Make (K : Key) (V : Value) (IO : IO) (M : MUTEX) (T : THREAD) : - S with type key = K.t and type value = V.t - -(** These modules should not be used. They are exposed purely for testing - purposes. *) -module Private : sig - module Hook : sig - type 'a t - - val v : ('a -> unit) -> 'a t - end - - module Search : module type of Search - - module Io_array : module type of Io_array - - module Fan : module type of Fan - - module type S = sig - include S - - val close' : hook:[ `Abort_signalled ] Hook.t -> t -> unit - (** [`Abort_signalled]: after the cancellation signal has been sent to any - concurrent merge operations, but {i before} blocking on those - cancellations having completed. *) - - type 'a async - (** The type of asynchronous computation. *) - - val force_merge : - ?hook:[ `After | `Before ] Hook.t -> t -> [ `Completed | `Aborted ] async - (** [force_merge t] forces a merge for [t]. Optionally, a hook can be passed - that will be called twice: - - - [`Before]: immediately before merging (while holding the merge lock); - - [`After]: immediately after merging (while holding the merge lock). *) - - val await : 'a async -> ('a, [ `Async_exn of exn ]) result - (** Wait for an asynchronous computation to finish. *) - - val replace_with_timer : ?sampling_interval:int -> t -> key -> value -> unit - (** Time replace operations. The reported time is an average on an number of - consecutive operations, which can be specified by [sampling_interval]. - If [sampling_interval] is not set, no operation is timed. *) - end - - module Make (K : Key) (V : Value) (IO : IO) (M : MUTEX) (T : THREAD) : - S with type key = K.t and type value = V.t -end +include Index_intf.Index +(** @inline *) diff --git a/src/index_intf.ml b/src/index_intf.ml new file mode 100644 index 00000000..c842c4b6 --- /dev/null +++ b/src/index_intf.ml @@ -0,0 +1,269 @@ +(* The MIT License + +Copyright (c) 2019 Craig Ferguson + Thomas Gazagnaire + Ioana Cristescu + Clément Pascutto + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. *) + +module type Key = sig + type t + (** The type for keys. *) + + val equal : t -> t -> bool + (** The equality function for keys. *) + + val hash : t -> int + (** Note: Unevenly distributed hash functions may result in performance drops. *) + + val hash_size : int + (** The number of bits necessary to encode the maximum output value of + {!hash}. `Hashtbl.hash` uses 30 bits. + + Overestimating the [hash_size] will result in performance drops; + underestimation will result in undefined behavior. *) + + val encode : t -> string + (** [encode] is an encoding function. The resultant encoded values must have + size {!encoded_size}. *) + + val encoded_size : int + (** [encoded_size] is the size of the result of {!encode}, expressed in number + of bytes. *) + + val decode : string -> int -> t + (** [decode s off] is the decoded form of the encoded value at the offset + [off] of string [s]. Must satisfy [decode (encode t) 0 = t]. *) + + val pp : t Fmt.t + (** Formatter for keys *) +end + +module type Value = sig + type t + + val encode : t -> string + + val encoded_size : int + + val decode : string -> int -> t + + val pp : t Fmt.t +end + +module type IO = Io.S + +module type MUTEX = sig + (** Locks for mutual exclusion *) + + type t + (** The type of mutual-exclusion locks. *) + + val create : unit -> t + (** Return a fresh mutex. *) + + val lock : t -> unit + (** Lock the given mutex. Locks are not assumed to be re-entrant. *) + + val unlock : t -> unit + (** Unlock the mutex. If any threads are attempting to lock the mutex, exactly + one of them will gain access to the lock. *) + + val with_lock : t -> (unit -> 'a) -> 'a + (** [with_lock t f] first obtains [t], then computes [f ()], and finally + unlocks [t]. *) +end + +module type THREAD = sig + (** Cooperative threads. *) + + type 'a t + (** The type of thread handles. *) + + val async : (unit -> 'a) -> 'a t + (** [async f] creates a new thread of control which executes [f ()] and + returns the corresponding thread handle. The thread terminates whenever + [f ()] returns a value or raises an exception. *) + + val await : 'a t -> ('a, [ `Async_exn of exn ]) result + (** [await t] blocks on the termination of [t]. *) + + val return : 'a -> 'a t + (** [return ()] is a pre-terminated thread handle. *) + + val yield : unit -> unit + (** Re-schedule the calling thread without suspending it. *) +end + +module type S = sig + type t + (** The type for indexes. *) + + type key + (** The type for keys. *) + + type value + (** The type for values. *) + + val v : ?fresh:bool -> ?readonly:bool -> log_size:int -> string -> t + (** The constructor for indexes. + + @param fresh whether an existing index should be overwritten. + @param read_only whether read-only mode is enabled for this index. + @param log_size the maximum number of bindings in the `log` IO. *) + + val clear : t -> unit + (** [clear t] clears [t] so that there are no more bindings in it. *) + + val find : t -> key -> value + (** [find t k] is the binding of [k] in [t]. *) + + val mem : t -> key -> bool + (** [mem t k] is [true] iff [k] is bound in [t]. *) + + exception Invalid_key_size of key + + exception Invalid_value_size of value + (** The exceptions raised when trying to add a key or a value of different + size than encoded_size *) + + val replace : t -> key -> value -> unit + (** [replace t k v] binds [k] to [v] in [t], replacing any existing binding of + [k]. *) + + val filter : t -> (key * value -> bool) -> unit + (** [filter t p] removes all the bindings (k, v) that do not satisfy [p]. This + operation is costly and blocking. *) + + val iter : (key -> value -> unit) -> t -> unit + (** Iterates over the index bindings. Limitations: + + - Order is not specified. + - In case of recent replacements of existing values (since the last + merge), this will hit both the new and old bindings. + - May not observe recent concurrent updates to the index by other + processes. *) + + val flush : ?with_fsync:bool -> t -> unit + (** Flushes all internal buffers of the [IO] instances. If [with_fsync] is + [true], this also flushes the OS caches for each [IO] instance. *) + + val close : t -> unit + (** Closes all resources used by [t]. *) +end + +module type Index = sig + (** The input of [Make] for keys. *) + module type Key = sig + (* N.B. We use [sig ... end] redirections to avoid linking to the [_intf] + file in the generated docs. Once Odoc 2 is released, this can be + removed. *) + + include Key + (** @inline *) + end + + module Stats : sig + include module type of Stats + (** @inline *) + end + + (** The input of [Make] for values. The same requirements as for [Key] apply. *) + module type Value = sig + include Value + (** @inline *) + end + + module type IO = sig + include Io.S + (** @inline *) + end + + module type MUTEX = sig + include MUTEX + (** @inline *) + end + + module type THREAD = sig + include THREAD + (** @inline *) + end + + (** Index module signature. *) + module type S = sig + include S + (** @inline *) + end + + exception RO_not_allowed + (** The exception raised when a write operation is attempted on a read_only + index. *) + + exception Closed + (** The exception raised when any operation is attempted on a closed index, + except for [close], which is idempotent. *) + + module Make (K : Key) (V : Value) (IO : IO) (M : MUTEX) (T : THREAD) : + S with type key = K.t and type value = V.t + + (** These modules should not be used. They are exposed purely for testing + purposes. *) + module Private : sig + module Hook : sig + type 'a t + + val v : ('a -> unit) -> 'a t + end + + module Search : module type of Search + + module Io_array : module type of Io_array + + module Fan : module type of Fan + + module type S = sig + include S + + val close' : hook:[ `Abort_signalled ] Hook.t -> t -> unit + (** [`Abort_signalled]: after the cancellation signal has been sent to any + concurrent merge operations, but {i before} blocking on those + cancellations having completed. *) + + type 'a async + (** The type of asynchronous computation. *) + + val force_merge : + ?hook:[ `After | `Before ] Hook.t -> + t -> + [ `Completed | `Aborted ] async + (** [force_merge t] forces a merge for [t]. Optionally, a hook can be + passed that will be called twice: + + - [`Before]: immediately before merging (while holding the merge + lock); + - [`After]: immediately after merging (while holding the merge lock). *) + + val await : 'a async -> ('a, [ `Async_exn of exn ]) result + (** Wait for an asynchronous computation to finish. *) + + val replace_with_timer : + ?sampling_interval:int -> t -> key -> value -> unit + (** Time replace operations. The reported time is an average on an number + of consecutive operations, which can be specified by + [sampling_interval]. If [sampling_interval] is not set, no operation + is timed. *) + end + + module Make (K : Key) (V : Value) (IO : IO) (M : MUTEX) (T : THREAD) : + S with type key = K.t and type value = V.t + end +end diff --git a/src/search.ml b/src/search.ml index bc172673..971c3948 100644 --- a/src/search.ml +++ b/src/search.ml @@ -15,70 +15,18 @@ furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. *) -module type ARRAY = sig - type t - - type elt - - val get : t -> int64 -> elt - - val length : t -> int64 - - val pre_fetch : t -> low:int64 -> high:int64 -> unit -end - -module type ENTRY = sig - type t - - module Key : sig - type t - - val equal : t -> t -> bool - end - - module Value : sig - type t - end - - val to_key : t -> Key.t - - val to_value : t -> Value.t -end - (* Metrics must be - totally ordered - computable from entries and (potentially redundantly) from keys - linearly interpolate-able on the int64 type *) -module type METRIC = sig - type t - - module Entry : ENTRY - - val compare : t -> t -> int - - val of_entry : Entry.t -> t - val of_key : Entry.Key.t -> t - - val linear_interpolate : low:int64 * t -> high:int64 * t -> t -> int64 -end - -module type S = sig - module Entry : ENTRY - - module Array : ARRAY with type elt = Entry.t - - val interpolation_search : - Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t -end +include Search_intf module Make (Entry : ENTRY) (Array : ARRAY with type elt = Entry.t) (Metric : METRIC with module Entry := Entry) : S with module Entry := Entry and module Array := Array = struct - module Entry = Entry - module Array = Array module Value = Entry.Value module Key = struct diff --git a/src/search.mli b/src/search.mli index 682b1caf..be9f650a 100644 --- a/src/search.mli +++ b/src/search.mli @@ -15,61 +15,5 @@ furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. *) -module type ARRAY = sig - type t - - type elt - - val get : t -> int64 -> elt - - val length : t -> int64 - - val pre_fetch : t -> low:int64 -> high:int64 -> unit -end - -module type ENTRY = sig - type t - - module Key : sig - type t - - val equal : t -> t -> bool - end - - module Value : sig - type t - end - - val to_key : t -> Key.t - - val to_value : t -> Value.t -end - -module type METRIC = sig - type t - - module Entry : ENTRY - - val compare : t -> t -> int - - val of_entry : Entry.t -> t - - val of_key : Entry.Key.t -> t - - val linear_interpolate : low:int64 * t -> high:int64 * t -> t -> int64 -end - -module type S = sig - module Entry : ENTRY - - module Array : ARRAY with type elt = Entry.t - - val interpolation_search : - Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t -end - -module Make - (Entry : ENTRY) - (Array : ARRAY with type elt = Entry.t) - (Metric : METRIC with module Entry := Entry) : - S with module Entry := Entry and module Array := Array +include Search_intf.Search +(** @inline *) diff --git a/src/search_intf.ml b/src/search_intf.ml new file mode 100644 index 00000000..d7b4571a --- /dev/null +++ b/src/search_intf.ml @@ -0,0 +1,85 @@ +(* The MIT License + +Copyright (c) 2019 Craig Ferguson + Thomas Gazagnaire + Ioana Cristescu + Clément Pascutto + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. *) + +module type ARRAY = sig + type t + + type elt + + val get : t -> int64 -> elt + + val length : t -> int64 + + val pre_fetch : t -> low:int64 -> high:int64 -> unit +end + +module type ENTRY = sig + type t + + module Key : sig + type t + + val equal : t -> t -> bool + end + + module Value : sig + type t + end + + val to_key : t -> Key.t + + val to_value : t -> Value.t +end + +module type METRIC = sig + type t + + module Entry : ENTRY + + val compare : t -> t -> int + + val of_entry : Entry.t -> t + + val of_key : Entry.Key.t -> t + + val linear_interpolate : low:int64 * t -> high:int64 * t -> t -> int64 +end + +module type S = sig + module Entry : ENTRY + + module Array : ARRAY with type elt = Entry.t + + val interpolation_search : + Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t +end + +module type Search = sig + module type ARRAY = ARRAY + + module type ENTRY = ENTRY + + module type METRIC = METRIC + + module type S = S + + module Make + (Entry : ENTRY) + (Array : ARRAY with type elt = Entry.t) + (Metric : METRIC with module Entry := Entry) : + S with module Entry := Entry and module Array := Array +end