diff --git a/containers-thread.opam b/containers-thread.opam deleted file mode 100644 index fb21ef4ea..000000000 --- a/containers-thread.opam +++ /dev/null @@ -1,27 +0,0 @@ -opam-version: "2.0" -version: "3.12" -author: "Simon Cruanes" -maintainer: "simon.cruanes.2007@m4x.org" -license: "BSD-2-Clause" -synopsis: "An extension of containers for threading. DEPRECATED: use moonpool, domainslib, etc." -build: [ - ["dune" "build" "-p" name "-j" jobs] - ["dune" "build" "@doc" "-p" name ] {with-doc} - ["dune" "runtest" "-p" name "-j" jobs] {with-test} -] -depends: [ - "ocaml" { >= "4.08.0" } - "dune" { >= "2.0" } - "base-threads" - "dune-configurator" - "containers" { = version } - "iter" { with-test } - "qcheck-core" {>= "0.18" & with-test} - "uutf" { with-test } - "odoc" { with-doc } -] -tags: [ "containers" "thread" "semaphore" "blocking queue" ] -homepage: "https://github.com/c-cube/ocaml-containers/" -doc: "https://c-cube.github.io/ocaml-containers" -dev-repo: "git+https://github.com/c-cube/ocaml-containers.git" -bug-reports: "https://github.com/c-cube/ocaml-containers/issues/" diff --git a/src/threads/CCBlockingQueue.ml b/src/threads/CCBlockingQueue.ml deleted file mode 100644 index 79f1b43f4..000000000 --- a/src/threads/CCBlockingQueue.ml +++ /dev/null @@ -1,154 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Blocking Queue} *) - -type 'a t = { - q: 'a Queue.t; - lock: Mutex.t; - cond: Condition.t; - capacity: int; - mutable size: int; -} - -let create n = - if n < 1 then invalid_arg "BloquingQueue.create"; - let q = - { - q = Queue.create (); - lock = Mutex.create (); - cond = Condition.create (); - capacity = n; - size = 0; - } - in - q - -let incr_size_ q = - assert (q.size < q.capacity); - q.size <- q.size + 1 - -let decr_size_ q = - assert (q.size > 0); - q.size <- q.size - 1 - -let finally_ f x ~h = - try - let res = f x in - ignore (h ()); - res - with e -> - ignore (h ()); - raise e - -let with_lock_ q f = - Mutex.lock q.lock; - finally_ f () ~h:(fun () -> Mutex.unlock q.lock) - -let push q x = - with_lock_ q (fun () -> - while q.size = q.capacity do - Condition.wait q.cond q.lock - done; - assert (q.size < q.capacity); - Queue.push x q.q; - (* if there are blocked receivers, awake one of them *) - incr_size_ q; - Condition.broadcast q.cond) - -let take q = - with_lock_ q (fun () -> - while q.size = 0 do - Condition.wait q.cond q.lock - done; - let x = Queue.take q.q in - (* if there are blocked senders, awake one of them *) - decr_size_ q; - Condition.broadcast q.cond; - x) - -let push_list q l = - (* push elements until it's not possible. - Assumes the lock is acquired. *) - let rec push_ q l = - match l with - | [] -> l - | _ :: _ when q.size = q.capacity -> l (* no room remaining *) - | x :: tl -> - Queue.push x q.q; - incr_size_ q; - push_ q tl - in - (* push chunks of [l] in [q] until [l] is empty *) - let rec aux q l = - match l with - | [] -> () - | _ :: _ -> - let l = - with_lock_ q (fun () -> - while q.size = q.capacity do - Condition.wait q.cond q.lock - done; - let l = push_ q l in - Condition.broadcast q.cond; - l) - in - aux q l - in - aux q l - -let take_list q n = - (* take at most [n] elements of [q] and prepend them to [acc] *) - let rec pop_ acc q n = - if n = 0 || Queue.is_empty q.q then - acc, n - else ( - (* take next element *) - let x = Queue.take q.q in - decr_size_ q; - pop_ (x :: acc) q (n - 1) - ) - in - (* call [pop_] until [n] elements have been gathered *) - let rec aux acc q n = - if n = 0 then - List.rev acc - else ( - let acc, n = - with_lock_ q (fun () -> - while q.size = 0 do - Condition.wait q.cond q.lock - done; - let acc, n = pop_ acc q n in - Condition.broadcast q.cond; - acc, n) - in - aux acc q n - ) - in - aux [] q n - -let try_take q = - with_lock_ q (fun () -> - if q.size = 0 then - None - else ( - decr_size_ q; - Some (Queue.take q.q) - )) - -let try_push q x = - with_lock_ q (fun () -> - if q.size = q.capacity then - false - else ( - incr_size_ q; - Queue.push x q.q; - Condition.signal q.cond; - true - )) - -let peek q = - with_lock_ q (fun () -> try Some (Queue.peek q.q) with Queue.Empty -> None) - -let size q = with_lock_ q (fun () -> q.size) -let capacity q = q.capacity diff --git a/src/threads/CCBlockingQueue.mli b/src/threads/CCBlockingQueue.mli deleted file mode 100644 index a043e7631..000000000 --- a/src/threads/CCBlockingQueue.mli +++ /dev/null @@ -1,51 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Blocking Queue} - - This queue has a limited size. Pushing a value on the queue when it - is full will block. - - @since 0.16 *) - -[@@@deprecated -"use moonpool or domainslib or saturn, libraries designed for multicore"] - -type 'a t -(** Safe-thread queue for values of type ['a] *) - -val create : int -> 'a t -(** Create a new queue of size [n]. Using [n=max_int] amounts to using - an infinite queue (2^61 items is a lot to fit in memory); using [n=1] - amounts to using a box with 0 or 1 elements inside. - @raise Invalid_argument if [n < 1]. *) - -val push : 'a t -> 'a -> unit -(** [push q x] pushes [x] into [q], blocking if the queue is full. *) - -val take : 'a t -> 'a -(** Take the first element, blocking if needed. *) - -val push_list : 'a t -> 'a list -> unit -(** Push items of the list, one by one. *) - -val take_list : 'a t -> int -> 'a list -(** [take_list n q] takes [n] elements out of [q]. *) - -val try_take : 'a t -> 'a option -(** Take the first element if the queue is not empty, return [None] - otherwise. *) - -val try_push : 'a t -> 'a -> bool -(** [try_push q x] pushes [x] into [q] if [q] is not full, in which - case it returns [true]. - If it fails because [q] is full, it returns [false]. *) - -val peek : 'a t -> 'a option -(** [peek q] returns [Some x] if [x] is the first element of [q], - otherwise it returns [None]. *) - -val size : _ t -> int -(** Number of elements currently in the queue. *) - -val capacity : _ t -> int -(** Number of values the queue can hold. *) diff --git a/src/threads/CCLock.ml b/src/threads/CCLock.ml deleted file mode 100644 index 3e3a71f0f..000000000 --- a/src/threads/CCLock.ml +++ /dev/null @@ -1,113 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Utils around Mutex} *) - -type 'a t = { mutex: Mutex.t; mutable content: 'a } -type 'a lock = 'a t - -let create content = { mutex = Mutex.create (); content } - -let with_lock l f = - Mutex.lock l.mutex; - try - let x = f l.content in - Mutex.unlock l.mutex; - x - with e -> - Mutex.unlock l.mutex; - raise e - -let try_with_lock l f = - if Mutex.try_lock l.mutex then ( - try - let x = f l.content in - Mutex.unlock l.mutex; - Some x - with e -> - Mutex.unlock l.mutex; - raise e - ) else - None - -module LockRef = struct - type 'a t = 'a lock - - let get t = t.content - let set t x = t.content <- x - let update t f = t.content <- f t.content -end - -let with_lock_as_ref l ~f = - Mutex.lock l.mutex; - try - let x = f l in - Mutex.unlock l.mutex; - x - with e -> - Mutex.unlock l.mutex; - raise e - -let mutex l = l.mutex -let update l f = with_lock l (fun x -> l.content <- f x) - -let update_map l f = - with_lock l (fun x -> - let x', y = f x in - l.content <- x'; - y) - -let get l = - Mutex.lock l.mutex; - let x = l.content in - Mutex.unlock l.mutex; - x - -let set l x = - Mutex.lock l.mutex; - l.content <- x; - Mutex.unlock l.mutex - -let incr l = update l Stdlib.succ -let decr l = update l Stdlib.pred - -let incr_then_get l = - Mutex.lock l.mutex; - l.content <- l.content + 1; - let x = l.content in - Mutex.unlock l.mutex; - x - -let get_then_incr l = - Mutex.lock l.mutex; - let x = l.content in - l.content <- l.content + 1; - Mutex.unlock l.mutex; - x - -let decr_then_get l = - Mutex.lock l.mutex; - l.content <- l.content - 1; - let x = l.content in - Mutex.unlock l.mutex; - x - -let get_then_decr l = - Mutex.lock l.mutex; - let x = l.content in - l.content <- l.content - 1; - Mutex.unlock l.mutex; - x - -let get_then_set l = - Mutex.lock l.mutex; - let x = l.content in - l.content <- true; - Mutex.unlock l.mutex; - x - -let get_then_clear l = - Mutex.lock l.mutex; - let x = l.content in - l.content <- false; - Mutex.unlock l.mutex; - x diff --git a/src/threads/CCLock.mli b/src/threads/CCLock.mli deleted file mode 100644 index 9fcc5c6b8..000000000 --- a/src/threads/CCLock.mli +++ /dev/null @@ -1,94 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Utils around Mutex} - - A value wrapped into a Mutex, for more safety. - - @since 0.8 *) - -[@@@deprecated -"use moonpool or domainslib or saturn, libraries designed for multicore"] - -type 'a t -(** A value surrounded with a lock *) - -val create : 'a -> 'a t -(** Create a new protected value. *) - -val with_lock : 'a t -> ('a -> 'b) -> 'b -(** [with_lock l f] runs [f x] where [x] is the value protected with - the lock [l], in a critical section. If [f x] fails, [with_lock l f] - fails too but the lock is released. *) - -val try_with_lock : 'a t -> ('a -> 'b) -> 'b option -(** [try_with_lock l f] runs [f x] in a critical section if [l] is not - locked. [x] is the value protected by the lock [l]. If [f x] - fails, [try_with_lock l f] fails too but the lock is released. - @since 0.22 *) - -(** Type allowing to manipulate the lock as a reference. - @since 0.13 *) -module LockRef : sig - type 'a t - - val get : 'a t -> 'a - val set : 'a t -> 'a -> unit - val update : 'a t -> ('a -> 'a) -> unit -end - -val with_lock_as_ref : 'a t -> f:('a LockRef.t -> 'b) -> 'b -(** [with_lock_as_ref l f] calls [f] with a reference-like object - that allows to manipulate the value of [l] safely. - The object passed to [f] must not escape the function call. - @since 0.13 *) - -val update : 'a t -> ('a -> 'a) -> unit -(** [update l f] replaces the content [x] of [l] with [f x], atomically. *) - -val update_map : 'a t -> ('a -> 'a * 'b) -> 'b -(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] - and returns [y]. - @since 0.16 *) - -val mutex : _ t -> Mutex.t -(** Underlying mutex. *) - -val get : 'a t -> 'a -(** Atomically get the value in the lock. The value that is returned - isn't protected! *) - -val set : 'a t -> 'a -> unit -(** Atomically set the value. - @since 0.13 *) - -val incr : int t -> unit -(** Atomically increment the value. - @since 0.13 *) - -val decr : int t -> unit -(** Atomically decrement the value. - @since 0.13 *) - -val incr_then_get : int t -> int -(** [incr_then_get x] increments [x], and returns its new value. - @since 0.16 *) - -val get_then_incr : int t -> int -(** [get_then_incr x] increments [x], and returns its previous value. - @since 0.16 *) - -val decr_then_get : int t -> int -(** [decr_then_get x] decrements [x], and returns its new value. - @since 0.16 *) - -val get_then_decr : int t -> int -(** [get_then_decr x] decrements [x], and returns its previous value. - @since 0.16 *) - -val get_then_set : bool t -> bool -(** [get_then_set b] sets [b] to [true], and returns the old value. - @since 0.16 *) - -val get_then_clear : bool t -> bool -(** [get_then_clear b] sets [b] to [false], and returns the old value. - @since 0.16 *) diff --git a/src/threads/CCPool.ml b/src/threads/CCPool.ml deleted file mode 100644 index 03678c1b7..000000000 --- a/src/threads/CCPool.ml +++ /dev/null @@ -1,529 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Thread Pool, and Futures} *) - -type +'a state = Done of 'a | Waiting | Failed of exn - -module type PARAM = sig - val max_size : int - (** Maximum number of threads in the pool *) -end - -exception Stopped - -(** {2 Thread pool} *) -module Make (P : PARAM) = struct - type job = - | Job1 : ('a -> _) * 'a -> job - | Job2 : ('a -> 'b -> _) * 'a * 'b -> job - | Job3 : ('a -> 'b -> 'c -> _) * 'a * 'b * 'c -> job - | Job4 : ('a -> 'b -> 'c -> 'd -> _) * 'a * 'b * 'c * 'd -> job - - type t = { - mutable stop: bool; (* indicate that threads should stop *) - mutable exn_handler: exn -> unit; - mutex: Mutex.t; - cond: Condition.t; - jobs: job Queue.t; (* waiting jobs *) - mutable cur_size: int; (* total number of threads *) - mutable cur_idle: int; (* number of idle threads *) - } - (** Dynamic, growable thread pool *) - - let nop_ _ = () - - (* singleton pool *) - let pool = - { - stop = false; - exn_handler = nop_; - cond = Condition.create (); - cur_size = 0; - cur_idle = 0; - (* invariant: cur_idle <= cur_size *) - jobs = Queue.create (); - mutex = Mutex.create (); - } - - let set_exn_handler f = pool.exn_handler <- f - - let[@inline] with_lock_ t f = - Mutex.lock t.mutex; - try - let x = f t in - Mutex.unlock t.mutex; - x - with e -> - Mutex.unlock t.mutex; - raise e - - let incr_size_ p = p.cur_size <- p.cur_size + 1 - let decr_size_ p = p.cur_size <- p.cur_size - 1 - let incr_idle_ p = p.cur_idle <- p.cur_idle + 1 - let decr_idle_ p = p.cur_idle <- p.cur_idle - 1 - - (* next thing a thread should do *) - type command = - | Process of job - | Wait - (* wait on condition *) - | Die - (* thread has no work to do *) - - (* thread: seek what to do next (including dying). - Assumes the pool is locked. *) - let get_next_ pool = - (*Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;*) - if pool.stop then - Die - else ( - match Queue.take pool.jobs with - | exception Queue.Empty -> - if pool.cur_idle > 0 then - (* die: there's already at least one idle thread *) - (*Printf.printf "DIE (idle>0)\n%!";*) - Die - else - (*Printf.printf "WAIT\n%!";*) - Wait - | job -> Process job - ) - - (* heuristic criterion for starting a new thread. *) - let[@inline] can_start_thread_ p = p.cur_size < P.max_size - - (* Thread: entry point. They seek jobs in the queue *) - let rec serve pool = - assert (pool.cur_size <= P.max_size); - assert (pool.cur_size > 0); - Mutex.lock pool.mutex; - let cmd = get_next_ pool in - maybe_start_runner_ pool; - run_cmd pool cmd - - (* run a command *) - and run_cmd pool = function - | Die -> - decr_size_ pool; - Mutex.unlock pool.mutex; - () - | Wait -> - (*Printf.printf "WAIT\n%!";*) - incr_idle_ pool; - Condition.wait pool.cond pool.mutex; - decr_idle_ pool; - Mutex.unlock pool.mutex; - serve pool - | Process job -> - Mutex.unlock pool.mutex; - run_job pool job - - (* execute the job *) - and run_job pool job = - match job with - | Job1 (f, x) -> - (try ignore (f x) with e -> pool.exn_handler e); - serve pool - | Job2 (f, x, y) -> - (try ignore (f x y) with e -> pool.exn_handler e); - serve pool - | Job3 (f, x, y, z) -> - (try ignore (f x y z) with e -> pool.exn_handler e); - serve pool - | Job4 (f, x, y, z, w) -> - (try ignore (f x y z w) with e -> pool.exn_handler e); - serve pool - - and maybe_start_runner_ pool : unit = - if (not (Queue.is_empty pool.jobs)) && can_start_thread_ pool then ( - (* there's room for another thread to start processing jobs, - starting with [Queue.pop pool.jobs] *) - let job' = Queue.pop pool.jobs in - launch_worker_on_ pool job' - ) - - and[@inline] launch_worker_on_ pool job = - incr_size_ pool; - ignore (Thread.create (run_job pool) job) - - let run_job job = - (* acquire lock and push job in queue, or start thread directly - if the queue is empty *) - with_lock_ pool (fun pool -> - if pool.stop then raise Stopped; - if - Queue.is_empty pool.jobs && can_start_thread_ pool - && pool.cur_idle = 0 - then - (* create the thread now, on [job], since no other job in - the queue takes precedence. - We do not want to wait for the busy threads to do our task - if we are allowed to spawn a new thread, and no thread is - just idle waiting for new jobs. *) - launch_worker_on_ pool job - else if pool.cur_idle > 0 then ( - (* at least one idle thread, wake it up *) - Queue.push job pool.jobs; - Condition.broadcast pool.cond (* wake up some worker *) - ) else ( - Queue.push job pool.jobs; - - (* we might still be able to start another thread to help the - active ones (none is idle). This thread is not necessarily - going to process [job], but rather the first job in the queue *) - if can_start_thread_ pool then ( - let job' = Queue.pop pool.jobs in - launch_worker_on_ pool job' - ) - )) - - (* run the function on the argument in the given pool *) - let run1 f x = run_job (Job1 (f, x)) - let run f = run1 f () - let run2 f x y = run_job (Job2 (f, x, y)) - let run3 f x y z = run_job (Job3 (f, x, y, z)) - let run4 f x y z w = run_job (Job4 (f, x, y, z, w)) - let active () = not pool.stop - - (* kill threads in the pool *) - let stop () = - with_lock_ pool (fun p -> - p.stop <- true; - Queue.clear p.jobs; - Condition.broadcast p.cond (* wait up idlers *)) - - (* stop threads if pool is GC'd *) - let () = Gc.finalise (fun _ -> stop ()) pool - - (** {6 Futures} *) - module Fut = struct - type 'a handler = 'a state -> unit - - type 'a cell = { - mutable state: 'a state; - mutable handlers: 'a handler list; (* handlers *) - f_mutex: Mutex.t; - condition: Condition.t; - } - (** A proper future, with a delayed computation *) - - (** A future value of type 'a *) - type 'a t = Return of 'a | FailNow of exn | Run of 'a cell - - type 'a future = 'a t - - (** {2 Basic Future functions} *) - - let return x = Return x - let fail e = FailNow e - - let create_cell () = - { - state = Waiting; - handlers = []; - f_mutex = Mutex.create (); - condition = Condition.create (); - } - - let with_lock_ cell f = - Mutex.lock cell.f_mutex; - try - let x = f cell in - Mutex.unlock cell.f_mutex; - x - with e -> - Mutex.unlock cell.f_mutex; - raise e - - (* TODO: exception handler for handler errors *) - - let set_done_ cell x = - with_lock_ cell (fun cell -> - match cell.state with - | Waiting -> - (* set state and signal *) - cell.state <- Done x; - Condition.broadcast cell.condition; - List.iter - (fun f -> try f cell.state with e -> pool.exn_handler e) - cell.handlers - | _ -> assert false) - - let set_fail_ cell e = - with_lock_ cell (fun cell -> - match cell.state with - | Waiting -> - cell.state <- Failed e; - Condition.broadcast cell.condition; - List.iter - (fun f -> try f cell.state with e -> pool.exn_handler e) - cell.handlers - | _ -> assert false) - - (* calls [f x], and put result or exception in [cell] *) - let run_and_set1 cell f x = - try - let y = f x in - set_done_ cell y - with e -> set_fail_ cell e - - let run_and_set2 cell f x y = - try - let z = f x y in - set_done_ cell z - with e -> set_fail_ cell e - - let make1 f x = - let cell = create_cell () in - run3 run_and_set1 cell f x; - Run cell - - let make f = make1 f () - - let make2 f x y = - let cell = create_cell () in - run4 run_and_set2 cell f x y; - Run cell - - let get = function - | Return x -> x - | FailNow e -> raise e - | Run cell -> - let rec get_ cell = - match cell.state with - | Waiting -> - Condition.wait cell.condition cell.f_mutex; - (* wait *) - get_ cell - | Done x -> x - | Failed e -> raise e - in - with_lock_ cell get_ - - (* access the result without locking *) - let get_nolock_ = function - | Return x | Run { state = Done x; _ } -> x - | FailNow _ | Run { state = Failed _ | Waiting; _ } -> assert false - - let state = function - | Return x -> Done x - | FailNow e -> Failed e - | Run cell -> with_lock_ cell (fun cell -> cell.state) - - let is_not_waiting = function - | Waiting -> false - | Failed _ | Done _ -> true - - let is_done = function - | Return _ | FailNow _ -> true - | Run cell -> with_lock_ cell (fun c -> is_not_waiting c.state) - - (** {2 Combinators *) - - let add_handler_ cell f = - with_lock_ cell (fun cell -> - match cell.state with - | Waiting -> cell.handlers <- f :: cell.handlers - | Done _ | Failed _ -> f cell.state) - - let on_finish fut k = - match fut with - | Return x -> k (Done x) - | FailNow e -> k (Failed e) - | Run cell -> add_handler_ cell k - - let on_success fut k = - on_finish fut (function - | Done x -> k x - | _ -> ()) - - let on_failure fut k = - on_finish fut (function - | Failed e -> k e - | _ -> ()) - - let map_cell_ ~async f cell ~into:cell' = - add_handler_ cell (function - | Done x -> - if async then - run3 run_and_set1 cell' f x - else - run_and_set1 cell' f x - | Failed e -> set_fail_ cell' e - | Waiting -> assert false); - Run cell' - - let map_ ~async f fut = - match fut with - | Return x -> - if async then - make1 f x - else - Return (f x) - | FailNow e -> FailNow e - | Run cell -> map_cell_ ~async f cell ~into:(create_cell ()) - - let map f fut = map_ ~async:false f fut - let map_async f fut = map_ ~async:true f fut - - let app_ ~async f x = - match f, x with - | Return f, Return x -> - if async then - make1 f x - else - Return (f x) - | FailNow e, _ | _, FailNow e -> FailNow e - | Return f, Run x -> - map_cell_ ~async (fun x -> f x) x ~into:(create_cell ()) - | Run f, Return x -> - map_cell_ ~async (fun f -> f x) f ~into:(create_cell ()) - | Run f, Run x -> - let cell' = create_cell () in - add_handler_ f (function - | Done f -> ignore (map_cell_ ~async f x ~into:cell') - | Failed e -> set_fail_ cell' e - | Waiting -> assert false); - Run cell' - - let app f x = app_ ~async:false f x - let app_async f x = app_ ~async:true f x - - let monoid_product f x y = - match x, y with - | Return x, Return y -> Return (f x y) - | FailNow e, _ | _, FailNow e -> FailNow e - | Return x, Run y -> - map_cell_ ~async:false (fun y -> f x y) y ~into:(create_cell ()) - | Run x, Return y -> - map_cell_ ~async:false (fun x -> f x y) x ~into:(create_cell ()) - | Run x, Run y -> - let cell' = create_cell () in - add_handler_ x (function - | Done x -> - ignore (map_cell_ ~async:false (fun y -> f x y) y ~into:cell') - | Failed e -> set_fail_ cell' e - | Waiting -> assert false); - Run cell' - - let flat_map f fut = - match fut with - | Return x -> f x - | FailNow e -> FailNow e - | Run cell -> - let cell' = create_cell () in - add_handler_ cell (function - | Done x -> - let fut' = f x in - on_finish fut' (function - | Done y -> set_done_ cell' y - | Failed e -> set_fail_ cell' e - | Waiting -> assert false) - | Failed e -> set_fail_ cell' e - | Waiting -> assert false); - Run cell' - - let and_then fut f = flat_map (fun _ -> f ()) fut - - type _ array_or_list = - | A_ : 'a array -> 'a array_or_list - | L_ : 'a list -> 'a array_or_list - - let iter_aol : type a. a array_or_list -> (a -> unit) -> unit = - fun aol f -> - match aol with - | A_ a -> Array.iter f a - | L_ l -> List.iter f l - - (* [sequence_ l f] returns a future that waits for every element of [l] - to return of fail, and call [f ()] to obtain the result (as a closure) - in case every element succeeded (otherwise a failure is - returned automatically) *) - let sequence_ : type a res. a t array_or_list -> (unit -> res) -> res t = - fun aol f -> - let n = - match aol with - | A_ a -> Array.length a - | L_ l -> List.length l - in - assert (n > 0); - let cell = create_cell () in - let n_err = CCLock.create 0 in - (* number of failed threads *) - let n_ok = CCLock.create 0 in - (* number of succeeding threads *) - iter_aol aol (fun fut -> - on_finish fut (function - | Failed e -> - let x = CCLock.incr_then_get n_err in - (* if first failure, then seal [cell]'s fate now *) - if x = 1 then set_fail_ cell e - | Done _ -> - let x = CCLock.incr_then_get n_ok in - (* if [n] successes, then [cell] succeeds. Otherwise, some - job has not finished or some job has failed. *) - if x = n then ( - let res = f () in - set_done_ cell res - ) - | Waiting -> assert false)); - Run cell - - (* map an array of futures to a future array *) - let sequence_a a = - match a with - | [||] -> return [||] - | [| x |] -> map (fun x -> [| x |]) x - | _ -> sequence_ (A_ a) (fun () -> Array.map get_nolock_ a) - - let map_a f a = sequence_a (Array.map f a) - - let sequence_l l = - match l with - | [] -> return [] - | _ :: _ -> - let l = List.rev l in - sequence_ (L_ l) (fun () -> List.rev_map get_nolock_ l) - - (* reverse twice *) - let map_l f l = - match l with - | [] -> return [] - | _ -> - let l = List.rev_map f l in - sequence_ (L_ l) (fun () -> List.rev_map get_nolock_ l) - - let choose_ : type a. a t array_or_list -> a t = - fun aol -> - let cell = create_cell () in - let is_done = CCLock.create false in - iter_aol aol (fun fut -> - on_finish fut (fun res -> - match res with - | Waiting -> assert false - | Done x -> - let was_done = CCLock.get_then_clear is_done in - if not was_done then set_done_ cell x - | Failed e -> - let was_done = CCLock.get_then_clear is_done in - if not was_done then set_fail_ cell e)); - Run cell - - let choose_a a = choose_ (A_ a) - let choose_l l = choose_ (L_ l) - let sleep time = make1 Thread.delay time - - module Infix = struct - let ( >>= ) x f = flat_map f x - let ( >> ) a f = and_then a f - let ( >|= ) a f = map f a - let ( <*> ) = app - let ( let+ ) = ( >|= ) - let ( let* ) = ( >>= ) - let[@inline] ( and+ ) a1 a2 = monoid_product (fun x y -> x, y) a1 a2 - let ( and* ) = ( and+ ) - end - - include Infix - end -end diff --git a/src/threads/CCPool.mli b/src/threads/CCPool.mli deleted file mode 100644 index ad159480e..000000000 --- a/src/threads/CCPool.mli +++ /dev/null @@ -1,162 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Thread Pool, and Futures} - - Renamed and heavily updated from [CCFuture]. - @since 0.16 *) - -[@@@deprecated -"use moonpool or domainslib or saturn, libraries designed for multicore"] - -type +'a state = Done of 'a | Waiting | Failed of exn - -module type PARAM = sig - val max_size : int - (** Maximum number of threads in the pool. *) -end - -exception Stopped - -[@@@ocaml.warning "-67"] - -(** {2 Create a new Pool} *) -module Make (P : PARAM) : sig - val run : (unit -> _) -> unit - (** [run f] schedules [f] for being executed in the thread pool. *) - - val run1 : ('a -> _) -> 'a -> unit - (** [run1 f x] is similar to [run (fun () -> f x)]. *) - - val run2 : ('a -> 'b -> _) -> 'a -> 'b -> unit - val run3 : ('a -> 'b -> 'c -> _) -> 'a -> 'b -> 'c -> unit - val set_exn_handler : (exn -> unit) -> unit - - val active : unit -> bool - (** [active ()] is true as long as [stop()] has not been called yet. *) - - val stop : unit -> unit - (** After calling [stop ()], most functions will raise Stopped. - This has the effect of preventing new tasks from being executed. *) - - (** {4 Futures} - - The futures are registration points for callbacks, storing a {!state}, - that are executed in the pool using {!run}. *) - module Fut : sig - type 'a t - (** A future value of type ['a] *) - - type 'a future = 'a t - - (** {2 Constructors} *) - - val return : 'a -> 'a t - (** Future that is already computed. *) - - val fail : exn -> 'a t - (** Future that fails immediately. *) - - val make : (unit -> 'a) -> 'a t - (** Create a future, representing a value that will be computed by - the function. If the function raises, the future will fail. *) - - val make1 : ('a -> 'b) -> 'a -> 'b t - val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t - - (** {2 Basics} *) - - val get : 'a t -> 'a - (** Blocking get: wait for the future to be evaluated, and get the value, - or the exception that failed the future is returned. - Raise e if the future failed with e. *) - - val state : 'a t -> 'a state - (** State of the future. *) - - val is_done : 'a t -> bool - (** Is the future evaluated (success/failure)? *) - - (** {2 Combinators} *) - - val on_success : 'a t -> ('a -> unit) -> unit - (** Attach a handler to be called upon success. - The handler should not call functions on the future. - Might be evaluated now if the future is already done. *) - - val on_failure : _ t -> (exn -> unit) -> unit - (** Attach a handler to be called upon failure. - The handler should not call any function on the future. - Might be evaluated now if the future is already done. *) - - val on_finish : 'a t -> ('a state -> unit) -> unit - (** Attach a handler to be called when the future is evaluated. - The handler should not call functions on the future. - Might be evaluated now if the future is already done. *) - - val flat_map : ('a -> 'b t) -> 'a t -> 'b t - (** Monadic combination of futures. *) - - val and_then : 'a t -> (unit -> 'b t) -> 'b t - (** Wait for the first future to succeed, then launch the second. *) - - val sequence_a : 'a t array -> 'a array t - (** Future that waits for all previous futures to terminate. If any future - in the array fails, [sequence_a l] fails too. *) - - val map_a : ('a -> 'b t) -> 'a array -> 'b array t - (** [map_a f a] maps [f] on every element of [a], and will return - the array of every result if all calls succeed, or an error otherwise. *) - - val sequence_l : 'a t list -> 'a list t - (** Future that waits for all previous futures to terminate. If any future - in the list fails, [sequence_l l] fails too. *) - - val map_l : ('a -> 'b t) -> 'a list -> 'b list t - (** [map_l f l] maps [f] on every element of [l], and will return - the list of every result if all calls succeed, or an error otherwise. *) - - val choose_a : 'a t array -> 'a t - (** Choose among those futures (the first to terminate). Behaves like - the first future that terminates, by failing if the future fails. *) - - val choose_l : 'a t list -> 'a t - (** Choose among those futures (the first to terminate). Behaves like - the first future that terminates, by failing if the future fails. *) - - val map : ('a -> 'b) -> 'a t -> 'b t - (** Map the value inside the future. The function doesn't run in its - own task; if it can take time, use {!flat_map} or {!map_async}. *) - - val map_async : ('a -> 'b) -> 'a t -> 'b t - (** Map the value inside the future, to be computed in a separated job. *) - - val monoid_product : ('a -> 'b -> 'c) -> 'a t -> 'b t -> 'c t - (** Cartesian product of the content of these futures. - @since 2.8 *) - - val app : ('a -> 'b) t -> 'a t -> 'b t - (** [app f x] applies the result of [f] to the result of [x]. *) - - val app_async : ('a -> 'b) t -> 'a t -> 'b t - (** [app_async f x] applies the result of [f] to the result of [x], in - a separated job scheduled in the pool. *) - - val sleep : float -> unit t - (** Future that returns with success in the given amount of seconds. Blocks - the thread! If you need to wait on many events, consider - using {!CCTimer}. *) - - module Infix : sig - val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t - val ( >> ) : 'a t -> (unit -> 'b t) -> 'b t - val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t - val ( <*> ) : ('a -> 'b) t -> 'a t -> 'b t - val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t - val ( and+ ) : 'a t -> 'b t -> ('a * 'b) t - val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t - val ( and* ) : 'a t -> 'b t -> ('a * 'b) t - end - - include module type of Infix - end -end diff --git a/src/threads/CCSemaphore.ml b/src/threads/CCSemaphore.ml deleted file mode 100644 index b5d76c088..000000000 --- a/src/threads/CCSemaphore.ml +++ /dev/null @@ -1,53 +0,0 @@ -(** {1 Semaphores} *) - -type t = { mutable n: int; mutex: Mutex.t; cond: Condition.t } - -let create n = - if n <= 0 then invalid_arg "Semaphore.create"; - { n; mutex = Mutex.create (); cond = Condition.create () } - -let get t = t.n - -(* assume [t.mutex] locked, try to acquire [t] *) -let acquire_once_locked_ m t = - while t.n < m do - Condition.wait t.cond t.mutex - done; - assert (t.n >= m); - t.n <- t.n - m; - Condition.broadcast t.cond; - Mutex.unlock t.mutex - -let acquire m t = - Mutex.lock t.mutex; - acquire_once_locked_ m t - -(* assume [t.mutex] locked, try to release [t] *) -let release_once_locked_ m t = - t.n <- t.n + m; - Condition.broadcast t.cond; - Mutex.unlock t.mutex - -let release m t = - Mutex.lock t.mutex; - release_once_locked_ m t; - () - -let with_acquire ~n t ~f = - acquire n t; - try - let x = f () in - release n t; - x - with e -> - release n t; - raise e - -let wait_until_at_least ~n t ~f = - Mutex.lock t.mutex; - while t.n < n do - Condition.wait t.cond t.mutex - done; - assert (t.n >= n); - Mutex.unlock t.mutex; - f () diff --git a/src/threads/CCSemaphore.mli b/src/threads/CCSemaphore.mli deleted file mode 100644 index 27e6abb4e..000000000 --- a/src/threads/CCSemaphore.mli +++ /dev/null @@ -1,33 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Semaphores} - - @since 0.13 *) - -[@@@deprecated "use the stdlib's Semaphore module"] - -type t -(** A semaphore *) - -val create : int -> t -(** [create n] creates a semaphore with initial value [n]. - @raise Invalid_argument if [n <= 0]. *) - -val get : t -> int -(** Current value. *) - -val acquire : int -> t -> unit -(** [acquire n s] blocks until [get s >= n], then atomically - sets [s := !s - n]. *) - -val release : int -> t -> unit -(** [release n s] atomically sets [s := !s + n]. *) - -val with_acquire : n:int -> t -> f:(unit -> 'a) -> 'a -(** [with_acquire ~n s ~f] first acquires [s] with [n] units, - calls [f ()], and then releases [s] with [n] units. - Safely release the semaphore even if [f ()] fails. *) - -val wait_until_at_least : n:int -> t -> f:(unit -> 'a) -> 'a -(** [wait_until_at_least ~n s ~f] waits until [get s >= n], then calls [f ()] - and returns its result. Doesn't modify the semaphore. *) diff --git a/src/threads/CCThread.ml b/src/threads/CCThread.ml deleted file mode 100644 index 40566d38f..000000000 --- a/src/threads/CCThread.ml +++ /dev/null @@ -1,52 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Threads} *) - -type t = Thread.t - -let spawn f = Thread.create f () -let spawn1 f x = Thread.create f x -let spawn2 f x y = Thread.create (fun () -> f x y) () -let detach f = ignore (Thread.create f ()) - -let finally_ f x ~h = - try - let res = f x in - ignore (h ()); - res - with e -> - ignore (h ()); - raise e - -module Arr = struct - let spawn n f = Array.init n (fun i -> Thread.create f i) - let join a = Array.iter Thread.join a -end - -module Barrier = struct - type t = { lock: Mutex.t; cond: Condition.t; mutable activated: bool } - - let create () = - { lock = Mutex.create (); cond = Condition.create (); activated = false } - - let with_lock_ b f = - Mutex.lock b.lock; - finally_ f () ~h:(fun () -> Mutex.unlock b.lock) - - let reset b = with_lock_ b (fun () -> b.activated <- false) - - let wait b = - with_lock_ b (fun () -> - while not b.activated do - Condition.wait b.cond b.lock - done) - - let activate b = - with_lock_ b (fun () -> - if not b.activated then ( - b.activated <- true; - Condition.broadcast b.cond - )) - - let activated b = with_lock_ b (fun () -> b.activated) -end diff --git a/src/threads/CCThread.mli b/src/threads/CCThread.mli deleted file mode 100644 index 08965ce8c..000000000 --- a/src/threads/CCThread.mli +++ /dev/null @@ -1,61 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Threads} - - {b status: unstable} - @since 0.13 *) - -[@@@deprecated -"use moonpool or domainslib or saturn, libraries designed for multicore"] - -type t = Thread.t - -val spawn : (unit -> _) -> t -(** [spawn f] creates a new thread that runs [f ()]. *) - -val spawn1 : ('a -> _) -> 'a -> t -(** [spawn1 f x] is like [spawn (fun () -> f x)]. - @since 0.16 *) - -val spawn2 : ('a -> 'b -> _) -> 'a -> 'b -> t -(** [spawn2 f x y] is like [spawn (fun () -> f x y)]. - @since 0.16 *) - -val detach : (unit -> 'a) -> unit -(** [detach f] is the same as [ignore (spawn f)]. *) - -(** {2 Array of threads} *) -module Arr : sig - val spawn : int -> (int -> 'a) -> t array - (** [Arr.spawn n f] creates an array [res] of length [n], such that - [res.(i) = spawn (fun () -> f i)]. *) - - val join : t array -> unit - (** [Arr.join a] joins every thread in [a]. *) -end - -(** {2 Single-Use Barrier} *) - -module Barrier : sig - type t - (** Barrier, used to synchronize threads *) - - val create : unit -> t - (** Create a barrier. *) - - val reset : t -> unit - (** Reset to initial (non-triggered) state. *) - - val wait : t -> unit - (** [wait b] waits for barrier [b] to be activated by [activate b]. - All threads calling this wait until [activate b] is called. - If [b] is already activated, [wait b] does nothing. *) - - val activate : t -> unit - (** [activate b] unblocks all threads that were waiting on [b]. *) - - val activated : t -> bool - (** [activated b] returns [true] iff [activate b] was called, and [reset b] - was not called since. In other words, [activated b = true] means - [wait b] will not block. *) -end diff --git a/src/threads/CCTimer.ml b/src/threads/CCTimer.ml deleted file mode 100644 index 3f9b59872..000000000 --- a/src/threads/CCTimer.ml +++ /dev/null @@ -1,162 +0,0 @@ -type job = Job : float * (unit -> 'a) -> job - -let ( <= ) (a : float) b = Stdlib.( <= ) a b -let ( >= ) (a : float) b = Stdlib.( >= ) a b -let ( < ) (a : float) b = Stdlib.( < ) a b -let ( > ) (a : float) b = Stdlib.( > ) a b - -module TaskHeap = CCHeap.Make (struct - type t = job - - let leq (Job (f1, _)) (Job (f2, _)) = f1 <= f2 -end) - -exception Stopped - -type t = { - mutable stop: bool; - mutable tasks: TaskHeap.t; - mutable exn_handler: exn -> unit; - t_mutex: Mutex.t; - fifo_in: Unix.file_descr; - fifo_out: Unix.file_descr; -} - -let set_exn_handler timer f = timer.exn_handler <- f -let standby_wait = 10. -(* when no task is scheduled, this is the amount of time that is waited - in a row for something to happen. This is also the maximal delay - between the call to {!stop} and the actual termination of the - thread. *) - -let epsilon = 0.0001 -(* accepted time diff for actions. *) - -let with_lock_ t f = - Mutex.lock t.t_mutex; - try - let x = f t in - Mutex.unlock t.t_mutex; - x - with e -> - Mutex.unlock t.t_mutex; - raise e - -type command = Quit | Run : (unit -> _) -> command | Wait of float - -let pop_task_ t = - let tasks, _ = TaskHeap.take_exn t.tasks in - t.tasks <- tasks - -let call_ timer f = try ignore (f ()) with e -> timer.exn_handler e - -(* check next task *) -let next_task_ timer = - match TaskHeap.find_min timer.tasks with - | _ when timer.stop -> Quit - | None -> Wait standby_wait - | Some (Job (time, f)) -> - let now = Unix.gettimeofday () in - if now +. epsilon > time then ( - (* now! *) - pop_task_ timer; - Run f - ) else - Wait (time -. now) - -(* The main thread function: wait for next event, run it, and loop *) -let serve timer = - let buf = Bytes.make 1 '_' in - (* acquire lock, call [process_task] and do as it commands *) - let rec next () = - match with_lock_ timer next_task_ with - | Quit -> () - | Run f -> - call_ timer f; - (* call outside of any lock *) - next () - | Wait delay -> wait delay - (* wait for [delay] seconds, or until something happens on [fifo_in] *) - and wait delay = - ignore (Unix.select [ timer.fifo_in ] [] [] delay : _ * _ * _); - (* remove char from fifo, so that next write can happen *) - (try ignore (Unix.read timer.fifo_in buf 0 1 : int) with _ -> ()); - next () - in - next () - -let nop_handler_ _ = () - -let create () = - let fifo_in, fifo_out = Unix.pipe () in - Unix.set_nonblock fifo_in; - let timer = - { - stop = false; - tasks = TaskHeap.empty; - exn_handler = nop_handler_; - t_mutex = Mutex.create (); - fifo_in; - fifo_out; - } - in - (* start a thread to process tasks *) - let _t = Thread.create serve timer in - timer - -let underscore_ = Bytes.make 1 '_' - -(* awake the thread *) -let awaken_ timer = ignore (Unix.single_write timer.fifo_out underscore_ 0 1) - -(** [at s t ~f] will run [f ()] at the Unix echo [t] *) -let at timer time ~f = - if timer.stop then raise Stopped; - let now = Unix.gettimeofday () in - if now >= time then - call_ timer f - else - with_lock_ timer (fun timer -> - if timer.stop then raise Stopped; - (* time of the next scheduled event *) - let next_time = - match TaskHeap.find_min timer.tasks with - | None -> max_float - | Some (Job (d, _)) -> d - in - (* insert task *) - timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks; - (* see if the timer thread needs to be awaken earlier *) - if time < next_time then awaken_ timer) - -let after timer delay ~f = - assert (delay >= 0.); - let now = Unix.gettimeofday () in - at timer (now +. delay) ~f - -exception ExitEvery - -let every ?delay timer d ~f = - let rec run () = - try - ignore (f ()); - schedule () - with ExitEvery -> () - (* stop *) - and schedule () = after timer d ~f:run in - match delay with - | None -> run () - | Some d -> after timer d ~f:run - -let active timer = not timer.stop - -(** Stop the given timer, cancelling pending tasks *) -let stop timer = - with_lock_ timer (fun timer -> - if not timer.stop then ( - timer.stop <- true; - (* empty heap of tasks *) - timer.tasks <- TaskHeap.empty; - (* tell the thread to stop *) - awaken_ timer - )) diff --git a/src/threads/CCTimer.mli b/src/threads/CCTimer.mli deleted file mode 100644 index 0ef4d1312..000000000 --- a/src/threads/CCTimer.mli +++ /dev/null @@ -1,43 +0,0 @@ -(** Event timer - - Used to be part of [CCFuture]. - @since 0.16 *) - -[@@@deprecated -"use moonpool or domainslib or saturn, libraries designed for multicore"] - -type t -(** A scheduler for events. It runs in its own thread. *) - -val create : unit -> t -(** A new timer. *) - -val set_exn_handler : t -> (exn -> unit) -> unit -(** [set_exn_handler timer f] registers [f] so that any exception - raised by a task scheduled in [timer] is given to [f]. *) - -exception Stopped - -val after : t -> float -> f:(unit -> _) -> unit -(** Call the callback [f] after the given number of seconds. - @raise Stopped if the timer was stopped. *) - -val at : t -> float -> f:(unit -> _) -> unit -(** Create a future that evaluates to [()] at the given Unix timestamp. - @raise Stopped if the timer was stopped. *) - -exception ExitEvery - -val every : ?delay:float -> t -> float -> f:(unit -> _) -> unit -(** [every timer n ~f] calls [f ()] every [n] seconds. - [f()] can raise ExitEvery to stop the cycle. - @param delay if provided, the first call to [f ()] is delayed by - that many seconds. - @raise Stopped if the timer was stopped. *) - -val stop : t -> unit -(** Stop the given timer, cancelling pending tasks. Idempotent. - From now on, calling most other operations on the timer will raise Stopped. *) - -val active : t -> bool -(** Return [true] until [stop t] has been called. *) diff --git a/src/threads/dune b/src/threads/dune deleted file mode 100644 index 0aafb8edb..000000000 --- a/src/threads/dune +++ /dev/null @@ -1,12 +0,0 @@ -(library - (name containers_thread) - (public_name containers-thread) - (synopsis "DEPRECATED library for threading") - (wrapped false) - (optional) - (flags :standard -warn-error -a+8 -w -32 -safe-string) - (preprocess - (action - (run %{project_root}/src/core/cpp/cpp.exe %{input-file}))) - (libraries containers threads)) -