-
Notifications
You must be signed in to change notification settings - Fork 2
/
parallel-futures.lisp
85 lines (76 loc) · 2.95 KB
/
parallel-futures.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
;;; Package definition.  The exported API consists of the default
;;; work-queue variable *CONTEXT*, the WITH-CONTEXT macro, the FUTURE
;;; structure type with its predicate P, and the constructor MAKE.
(defpackage "PARALLEL-FUTURE"
  (:use "COMMON-LISP" "SB-EXT")
  (:export "*CONTEXT*"
           "WITH-CONTEXT"
           "FUTURE"
           "P"
           "MAKE"))
;;; Parallel futures: hooking up futures with the work queue
;;;
;;; A parallel future is a future whose execution has three phases:
;;; - a list designator of setup functions
;;; - a vector of subtasks to execute in parallel
;;; - a list designator of cleanup functions
;;;
;;; When a parallel future is ready for execution, a task that
;;; executes the setup functions and pushes the subtasks to
;;; the local stack is created. That task is enqueued or pushed
;;; to the local stack.
;;; Once the setup functions have been executed, the subtasks are
;;; pushed as a bulk-task.
;;; Once the bulk-task is completed, the cleanup functions are executed,
;;; and the future is marked as done.
;;; Everything below is defined in the PARALLEL-FUTURE package.
(in-package "PARALLEL-FUTURE")
;;; Default work queue.  FUTURE-PUSH-SELF falls back to this queue when
;;; WORK-QUEUE:CURRENT-QUEUE returns NIL, and WITH-CONTEXT rebinds it
;;; for the extent of its body.
;;; NOTE(review): the argument 2 is presumably the number of worker
;;; threads -- confirm against WORK-QUEUE:MAKE.
(defvar *context* (work-queue:make 2))
(defmacro with-context ((count) &body body)
  "Evaluate BODY with *CONTEXT* bound to a fresh work queue.

The queue is created with (WORK-QUEUE:MAKE COUNT).  When control leaves
BODY, normally or through a non-local exit, the queue is handed to
WORK-QUEUE:STOP, provided its creation completed.  The values of BODY
are returned."
  (let ((queue (gensym "QUEUE")))
    `(let ((,queue nil))
       (unwind-protect
            ;; The queue is stored in the lexical variable before it is
            ;; bound to *CONTEXT*, so the cleanup form can always find it.
            (let ((*context* (setf ,queue (work-queue:make ,count))))
              ,@body)
         (unless (null ,queue)
           (work-queue:stop ,queue))))))
(defstruct (future (:include future:future))
  "A parallel future: a FUTURE:FUTURE extended with a SETUP phase."
  ;; List designator of setup functions: NIL, a list of function
  ;; designators, or a single symbol/function.  FUTURE-PUSH-SELF reads
  ;; this slot, resets it to NIL, and later calls each function with
  ;; the future as its only argument.
  (setup nil :type (or function symbol list)))
(declaim (inline p))
(defun p (x)
  "Return true if X is a parallel FUTURE structure, false otherwise."
  (typep x 'future))
(defun map-list-designator (functions argument)
  "Call every function designated by FUNCTIONS with ARGUMENT.

FUNCTIONS is a list designator: NIL (nothing is called), a list of
function designators (called in order), or a single symbol or function.
Any other object signals a TYPE-ERROR.  Always returns NIL."
  (flet ((invoke (designator)
           (funcall designator argument)))
    (etypecase functions
      ;; NIL must be tested first: it is also a SYMBOL and a LIST.
      (null)
      (list (mapc #'invoke functions))
      ((or symbol function) (invoke functions))))
  nil)
(defun future-push-self (future)
  "Schedule the execution of the parallel future FUTURE.

A task closure is handed to WORK-QUEUE:PUSH-SELF, targeting the queue
returned by WORK-QUEUE:CURRENT-QUEUE or, when that is NIL, *CONTEXT*.
When the task runs it:
  1. calls the setup functions with FUTURE;
  2. if FUTURE still has waiting subtasks, pushes FUTURE itself via
     WORK-QUEUE:PUSH-SELF;
  3. otherwise, if no subtask remains, calls the cleanup functions with
     FUTURE and clears the cleanup slot.
Any other combination of counters signals an ERROR that reports both
counter values."
  (declare (type future future))
  (let ((setup (future-setup future)))
    ;; The setup slot is cleared before the task is queued; the task
    ;; closure keeps the only reference to the setup functions.
    (setf (future-setup future) nil)
    (work-queue:push-self
     (lambda ()
       (map-list-designator setup future)
       (cond ((plusp (future-waiting future))
              (work-queue:push-self future))
             ((zerop (future-remaining future))
              ;; Nothing to run in parallel: go straight to cleanup.
              (map-list-designator (future-cleanup future) future)
              (setf (future-cleanup future) nil))
             (t
              ;; Nothing is waiting, yet some subtasks are not accounted
              ;; for.  Report the counters instead of an opaque message.
              (error "Parallel future in inconsistent state after setup: ~
                      waiting = ~S, remaining = ~S."
                     (future-waiting future)
                     (future-remaining future)))))
     (or (work-queue:current-queue)
         *context*))))
(defun make (dependencies setup subtasks cleanup &optional constructor &rest arguments)
  "Create a parallel future and register its dependencies.

DEPENDENCIES and SUBTASKS must be simple vectors.  SETUP and CLEANUP
are list designators of functions; FUTURE:MARK-DONE is always appended
to the cleanup functions.  Both the waiting and remaining counters are
initialised to the number of subtasks.  CONSTRUCTOR, when non-NIL, is
called instead of MAKE-FUTURE, and ARGUMENTS are appended to the
keyword arguments passed to it.  Returns whatever
FUTURE:MARK-DEPENDENCIES returns for the new future."
  (declare (type simple-vector dependencies subtasks))
  (let* ((subtask-count (length subtasks))
         ;; Normalise CLEANUP to a list, then add the final step.
         (cleanup-chain (append (if (listp cleanup)
                                    cleanup
                                    (list cleanup))
                                (list #'future:mark-done)))
         (maker (or constructor #'make-future))
         (new-future (apply maker
                            :function #'future-push-self
                            :dependencies dependencies
                            :setup setup
                            :subtasks subtasks
                            :waiting subtask-count
                            :remaining subtask-count
                            :cleanup cleanup-chain
                            arguments)))
    (future:mark-dependencies new-future)))