-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathchannel.h
99 lines (82 loc) · 1.7 KB
/
channel.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#pragma once
#include <cassert>
#include <condition_variable>
#include <cstdlib>
#include <mutex>
#include <type_traits>
#include <utility>
#include <optional>
template <typename T>
class channel {
public:
void put(T&&);
std::optional<T> get();
inline void close() noexcept;
private:
enum state_t : uint8_t {
Empty,
Filled,
Closed,
};
std::mutex mw_, mh_;
std::condition_variable wait_, hand_;
state_t state_{Empty};
std::optional<T> store_;
};
template<typename T>
void channel<T>::close() noexcept {
{
std::lock_guard<std::mutex> lk(mw_);
assert(state_ != Closed);
state_ = Closed;
}
wait_.notify_all();
hand_.notify_all();
}
template<typename T>
void channel<T>::put(T && v) {
std::unique_lock<std::mutex> lw(mw_);
wait_.wait(lw, [this]() { return state_ != Filled; });
switch (state_) {
case Closed:
assert(false);
std::terminate();
case Empty:
store_ = std::move(v);
state_ = Filled;
lw.unlock();
wait_.notify_one();
{
// wait for reader
std::unique_lock<std::mutex> lh(mh_);
hand_.wait(lh, [this]() { return !store_; });
lh.unlock();
}
break;
case Filled:
assert(false);
}
}
template<typename T>
std::optional<T> channel<T>::get() {
std::optional<T> v;
std::unique_lock<std::mutex> lw(mw_);
wait_.wait(lw, [this]() { return state_ != Empty; });
switch (state_) {
case Closed:
break;
case Filled:
{
std::lock_guard<std::mutex> lh(mh_);
v.swap(store_);
state_ = Empty;
// notify writer
hand_.notify_one();
lw.unlock();
break;
}
case Empty:
assert(false);
}
return v;
}