-
Notifications
You must be signed in to change notification settings - Fork 0
/
future.c
162 lines (123 loc) · 3.93 KB
/
future.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#include "future.h"
#include "threadpool.h"
#include "err.h"
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
/* Error codes used by this module in addition to the pthread error numbers. */
enum error {
/* Allocation failure: returned by async() and map() when makeRunnable()
 * could not allocate, and passed to syserr() by fun_wrapper(). */
malloc_failed = -1
};
/* Pointer to a function taking and returning void *.
 * NOTE(review): not referenced in the visible part of this file. */
typedef void *(*function_t)(void *);
/*
 * Argument bundle that makeRunnable() stores in runnable_t.arg and that
 * fun_wrapper() unpacks. Allocated with malloc() in makeRunnable() and
 * released with free() in fun_wrapper().
 */
typedef struct input {
future_t* fut;    /* future that receives the callable's result */
callable_t call;  /* user callable to execute (copied by value) */
} input_t;
/*
 * Prepares a future for use.
 *
 * Initializes the future's mutex and condition variable with default
 * attributes (failures are reported through syserr()) and clears the result
 * and mapping fields. The mappedCall field is left untouched.
 *
 * future: object to initialize; must not be NULL.
 */
void initFuture(future_t* future) {
    int rc = pthread_mutex_init(&future->lock, NULL);
    if (rc != 0) {
        syserr(rc, "failed mutex init");
    }

    rc = pthread_cond_init(&future->await, NULL);
    if (rc != 0) {
        syserr(rc, "failed cond init");
    }

    /* No result yet and no continuation registered. */
    future->isMapped = false;
    future->mappedPool = NULL;
    future->mappedFut = NULL;
    future->solution = NULL;
}
/* Forward declaration: makeRunnable() installs fun_wrapper, defined below. */
void* fun_wrapper(void *arg, size_t argsz);

/*
 * Builds a runnable that executes `callable` through fun_wrapper() and
 * publishes its result in `future`.
 *
 * The callable and the future pointer are packed into a heap-allocated
 * input_t stored in the returned runnable's `arg`; fun_wrapper() frees it.
 *
 * callable: user function plus its argument; copied by value.
 * future:   future that will receive the result.
 *
 * Returns the runnable. On allocation failure `arg` and `function` are NULL,
 * so callers can detect the failure with `!run.arg`.
 */
runnable_t makeRunnable(callable_t callable, future_t* future) {
    runnable_t run;

    /* Give the failure path defined values: callers test run.arg. */
    run.function = NULL;
    run.arg = NULL;
    run.argsz = callable.argsz;

    input_t *in = malloc(sizeof *in);
    if (in == NULL)
        return run;

    in->fut = future;
    in->call = callable;

    run.arg = in;
    /* The cast bridges fun_wrapper's type to the runnable's function field. */
    run.function = (void *)fun_wrapper;
    return run;
}
/*
 * Runs the callable packed in `arg`, stores its result in the associated
 * future, and signals the future's condition variable.
 *
 * makeRunnable() installs this function as runnable_t.function.
 *
 * arg:   heap-allocated input_t created by makeRunnable(); freed here.
 * argsz: unused; the callable's own argsz travels inside the input_t.
 *
 * If map() registered a continuation on the future (isMapped), the
 * continuation is submitted to its pool with this result as its argument.
 *
 * Always returns NULL. Failures are reported through syserr().
 */
void* fun_wrapper(void *arg, size_t argsz) {
    (void)argsz;

    int err;
    input_t *in = arg;
    callable_t call = in->call;
    future_t *fut = in->fut;
    /* Everything needed has been copied out of the bundle. */
    free(in);

    /*
     * The callable writes its result size through the third argument, so it
     * must point at a real size_t. The computation runs before the lock is
     * taken so map() on this future is not blocked for its whole duration.
     * NOTE(review): the reported size is discarded; no future_t field used in
     * this file stores it.
     */
    size_t retsz = 0;
    void *result = call.function(call.arg, call.argsz, &retsz);

    if ((err = pthread_mutex_lock(&fut->lock)) != 0)
        syserr(err, "mutex lock in fun_wrapper failed");

    fut->solution = result;

    if (fut->isMapped) {
        /* Feed the result to the continuation, with the same argsz
         * convention map() uses when the result is already available. */
        fut->mappedCall.arg = result;
        fut->mappedCall.argsz = sizeof(result);
        runnable_t run = makeRunnable(fut->mappedCall, fut->mappedFut);
        if (!run.arg)
            syserr(malloc_failed, "malloc failed");
        if ((err = defer(fut->mappedPool, run)) != 0)
            syserr(err, "defer map to pool failed");
    }

    if ((err = pthread_cond_signal(&fut->await)) != 0)
        syserr(err, "cond signal failed");
    if ((err = pthread_mutex_unlock(&fut->lock)) != 0)
        syserr(err, "mutex unlock failed");
    return NULL;
}
/*
 * Schedules `callable` on `pool` and binds its result to `future`.
 *
 * pool:     thread pool that will run the task.
 * future:   caller-provided storage; initialized here via initFuture().
 * callable: function and argument to execute.
 *
 * Returns malloc_failed if the task wrapper could not be allocated,
 * otherwise the value returned by defer().
 */
int async(thread_pool_t *pool, future_t *future, callable_t callable) {
    initFuture(future);

    runnable_t run = makeRunnable(callable, future);
    if (!run.arg)
        return malloc_failed;

    int err = defer(pool, run);
    if (err != 0) {
        /* Assumes a failed defer() did not queue the task, so fun_wrapper()
         * will never run to release the wrapper; free it here. */
        free(run.arg);
    }
    return err;
}
/*
 * Blocks until the future holds a result and returns it.
 *
 * A non-NULL `solution` is the only completion indicator, so the wait loops
 * on that predicate; pthread_cond_wait() may wake spuriously. As a
 * consequence, a callable that returns NULL is indistinguishable from a
 * pending result and this function will not return for it.
 *
 * future: future previously passed to async() or map().
 *
 * Returns the pointer stored in the future. Locking and waiting failures are
 * reported through syserr().
 */
void *await(future_t *future) {
    int err;

    if ((err = pthread_mutex_lock(&future->lock)) != 0)
        syserr(err, "await lock failed");

    while (future->solution == NULL) {
        if ((err = pthread_cond_wait(&future->await, &future->lock)) != 0)
            syserr(err, "cond wait failed");
    }

    /* Read the result while still holding the lock. */
    void *result = future->solution;

    if ((err = pthread_mutex_unlock(&future->lock)) != 0)
        syserr(err, "await unlock failed");
    return result;
}
/*
 * Chains `function` onto the result of `from`, binding its result to `future`.
 *
 * If `from` already holds a result, the continuation is submitted to `pool`
 * immediately. Otherwise it is recorded on `from`, and fun_wrapper() submits
 * it once the result is stored.
 *
 * pool:     thread pool that will run the continuation.
 * future:   caller-provided storage; initialized here via initFuture().
 * from:     future whose result becomes the continuation's argument.
 * function: continuation; receives (result, argsz, out-size pointer).
 *
 * Returns 0 when the continuation was recorded for later, malloc_failed if
 * the task wrapper could not be allocated, otherwise the value returned by
 * defer(). Mutex failures are reported through syserr().
 */
int map(thread_pool_t *pool, future_t *future, future_t *from,
        void *(*function)(void *, size_t, size_t *)) {
    int err;
    int ret = 0;

    if ((err = pthread_mutex_lock(&from->lock)) != 0)
        syserr(err, "mutex lock in map failed");

    initFuture(future);

    /* Fill in every field so the deferred branch never stores indeterminate
     * values; fun_wrapper() overwrites arg and argsz in that branch. */
    callable_t call;
    call.function = function;
    call.arg = from->solution;
    call.argsz = sizeof(from->solution);

    if (from->solution) {
        /* Result already available: schedule the continuation right away. */
        runnable_t run = makeRunnable(call, future);
        if (!run.arg) {
            ret = malloc_failed;
        } else if ((ret = defer(pool, run)) != 0) {
            /* Assumes a failed defer() did not queue the task, so nothing
             * else will release the wrapper. */
            free(run.arg);
        }
    } else {
        /* Result pending: record the continuation on the source future. */
        from->isMapped = true;
        from->mappedFut = future;
        from->mappedPool = pool;
        from->mappedCall = call;
    }

    if ((err = pthread_mutex_unlock(&from->lock)) != 0)
        syserr(err, "mutex unlock failed");
    return ret;
}