forked from longst/LWMessageQueue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLWMessageQueue.h
308 lines (251 loc) · 12.4 KB
/
LWMessageQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
/*
The MIT License (MIT)
Copyright (c) 2015 Marcus Spangenberg
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#include <assert.h>
#include <stdint.h>

#include <atomic>
#include <cstring>
#include <type_traits>
namespace LWMessageQueue {
/**
@brief
A static size message queue used to send messages from many input threads to a single output thread. Input
and output operations are thread safe and wait-free as long as there is exactly one thread consuming messages.
Messages are user defined POD type structs.
@details
Messages are passed through ThreadChannels. Each thread producing messages (input thread) gets one
ThreadChannelInput instance each, and the single consumer (output) thread gets one ThreadChannelOutput instance
per input thread. A good design pattern could be to let the consumer thread own the LWMessageQueue instance,
and only pass ThreadChannelInput instances to the producer threads. If you need more consumers, simply create
an LWMessageQueue instance per consumer thread.
It is up to the user to make sure not to push messages to a full channel. The channel (SIZE) must be
dimensioned so that it never overflows. In debug builds, an assert will be hit if the channel is full when
pushing new messages.
It is also up to the user to never pop messages from an empty channel. This should be done by first
getting the number of pending messages from the output channel and then popping exactly that many messages.
This also makes sure the output thread will finish popping messages. In debug builds, an assert will be hit
if the channel is empty when popping a message.
Messages are defined as POD type structs, and a union of those structs is passed as a template parameter
(MESSAGE) to LWMessageQueue. Push and pop operations copy the message data, so structs should be small enough
that copying remains a cheap operation.
Messages of any type (from the MESSAGE union) can be pushed to an input channel. When popping messages you get
an LWMessageQueue<>::MessageContainer instance. To get the actual message from the container, first call
messageContainer.getType() to determine the type, comparing it with the supplied TYPES enum values, and then
call messageContainer.getMessage<TYPE>() to get the message data cast to the correct POD type struct.
See the Example/Message.h and Example/example.cpp for more details on how to use LWMessageQueue and how to
define messages.
Template parameters:
SIZE is the number of allowed pending messages in one channel.
CHANNELS is the number of channels, i.e. the number of input/producer threads.
MESSAGE should be a union of all available Message types. Message types are POD type structs with message
specific data fields. See example message definitions and usage in Example/Message.h and Example/example.cpp.
TYPES should be an enum class with one entry per message type. See Example/Message.h and Example/example.cpp for
the type definitions.
*/
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
class LWMessageQueue {
private:
    class ThreadChannel;

public:
    /** Used for message storage in the queue. When you pop a message from an output channel, you get instances
        of this type. It pairs a TYPES tag with the raw MESSAGE union storage.
    */
    class MessageContainer {
    public:
        /** Get a reference to the message data, as the correct message type.
            The caller chooses T; use getType() first to find out which type was stored.
        */
        template<typename T>
        inline const T& getMessage() const noexcept;

        /** Get the type tag (a TYPES enum value) of the contained message. */
        inline TYPES getType() const noexcept;

    private:
        TYPES type;
        MESSAGE message;

        // The queue fills in `type` and `message` directly when a message is pushed.
        friend class LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>;
    };

    /** Each input thread has its own ThreadChannelInput instance. Use it to push messages to the queue.
        The handle only refers to a channel owned by the LWMessageQueue; it is copy-constructible but,
        because it holds a reference, it cannot be re-seated by assignment.
    */
    class ThreadChannelInput {
    public:
        explicit ThreadChannelInput(ThreadChannel& inThreadChannel) noexcept;
        ThreadChannelInput(const ThreadChannelInput& other) = default;

        // A defaulted copy assignment would be implicitly deleted because of the reference member,
        // so say so explicitly instead of advertising an operation that does not exist.
        ThreadChannelInput& operator=(const ThreadChannelInput& other) = delete;

        /** Check if the channel is full. No more messages can be pushed. Should only be used for debugging purposes.
            The MessageQueue should always be dimensioned so that this does not happen.
        */
        inline bool isFull() const noexcept;

        /** Push a message to the channel. The user must make sure the channel is not full before calling. Only
            one thread may push messages to a single channel.
            @param inMessage Message data from the MESSAGE union.
            @param inType Message type from the TYPES enum.
        */
        template<typename T>
        void pushMessage(const T& inMessage, const TYPES inType) noexcept;

    private:
        ThreadChannel& threadChannel;
    };

    /** The single output thread has one ThreadChannelOutput instance per input thread. Use them to pop messages
        from the queue. Like ThreadChannelInput, the handle is copy-constructible but not assignable.
    */
    class ThreadChannelOutput {
    public:
        explicit ThreadChannelOutput(ThreadChannel& inThreadChannel) noexcept;
        ThreadChannelOutput(const ThreadChannelOutput& other) = default;

        // Implicitly deleted anyway because of the reference member; made explicit.
        ThreadChannelOutput& operator=(const ThreadChannelOutput& other) = delete;

        /** Get number of pending messages in the channel. The output thread should get this number and then pop
            exactly that many messages from the channel. This way it is guaranteed that an empty channel is not
            popped and that the output thread receive loop finishes.
        */
        inline uint32_t getNumMessages() const noexcept;

        /** Pop the next message from the channel. The user must make sure that the channel is not empty before
            calling. Only one thread may pop messages from all channels.
            @return A MessageContainer. Check type by calling messageContainer.getType();
            Then get the message data with correct type by calling messageContainer.getMessage<MESSAGE_TYPE>();
        */
        inline MessageContainer popMessage() noexcept;

    private:
        ThreadChannel& threadChannel;
    };

    LWMessageQueue() = default;
    ~LWMessageQueue() = default;

    // The queue owns the channel storage that the input/output handles refer to,
    // so it is neither copyable nor movable.
    LWMessageQueue(const LWMessageQueue&) = delete;
    LWMessageQueue& operator=(const LWMessageQueue&) = delete;
    LWMessageQueue(LWMessageQueue&&) = delete;
    LWMessageQueue& operator=(LWMessageQueue&&) = delete;

    /** Get a thread channel input for an input thread.
        @param inChannel Channel index; must be less than CHANNELS (asserted in debug builds).
    */
    ThreadChannelInput getThreadChannelInput(const uint32_t inChannel) noexcept;

    /** Get a thread channel output for the output thread.
        @param inChannel Channel index; must be less than CHANNELS (asserted in debug builds).
    */
    ThreadChannelOutput getThreadChannelOutput(const uint32_t inChannel) noexcept;

private:
    /** Fixed-capacity ring buffer of SIZE MessageContainer slots backing one channel. */
    class ThreadChannel {
    public:
        ThreadChannel() noexcept;
        ~ThreadChannel() = default;

        ThreadChannel(const ThreadChannel&) = delete;
        ThreadChannel& operator=(const ThreadChannel&) = delete;
        ThreadChannel(ThreadChannel&&) = delete;
        ThreadChannel& operator=(ThreadChannel&&) = delete;

        /** Number of elements currently stored. */
        inline uint32_t size() const noexcept;

        /** Append a copy of inElement. The channel must not be full. */
        void pushBack(const MessageContainer& inElement) noexcept;

        /** Remove and return the oldest element. The channel must not be empty. */
        MessageContainer popFront() noexcept;

    private:
        MessageContainer elements[SIZE];
        uint32_t readPoint;                 // Index of the next slot to pop; advanced by popFront().
        uint32_t writePoint;                // Index of the next slot to fill; advanced by pushBack().
        std::atomic<uint32_t> numElements;  // Pending element count; updated by both pushBack() and popFront().
    };

    ThreadChannel threadChannels[CHANNELS];

    // Bit mask used to wrap ring buffer indices; relies on SIZE being a power of two.
    static constexpr uint32_t sizeMinusOne = SIZE - 1;
};
namespace Internal {

/** Compile-time check that exactly one bit of value is set. Zero is not considered a power of two. */
constexpr bool isPowerOfTwo(const uint32_t value) {
    // Subtracting one flips the lowest set bit and everything below it, so the
    // AND is zero only when there was a single set bit to begin with.
    return value == 0 ? false : ((value - 1) & value) == 0;
}

} // namespace Internal
/**
    View the stored message payload as message type T.
    The container does not verify that T matches the stored type tag; callers are expected to
    check getType() first.
    @return A const reference to the payload storage, reinterpreted as T.
*/
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
template<typename T>
inline const T& LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::MessageContainer::getMessage() const noexcept {
    // Same compile-time guards as pushMessage(): a T that is larger or more strictly aligned
    // than the MESSAGE union cannot be one of its members and would read outside the storage.
    static_assert(sizeof(T) <= sizeof(MESSAGE), "Type T might not be part of union MESSAGE. Size mismatch.");
    static_assert(alignof(MESSAGE) % alignof(T) == 0, "Type T might not be part of union MESSAGE. Alignment mismatch.");
    return *(reinterpret_cast<const T*>(&message));
}
/** Return the TYPES tag stored alongside the message payload. */
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
inline TYPES LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::MessageContainer::getType() const noexcept {
    return this->type;
}
/** Bind this input handle to the channel that pushed messages will be written to. */
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::ThreadChannelInput::ThreadChannelInput(ThreadChannel& inThreadChannel) noexcept
    : threadChannel(inThreadChannel) {}
/** Report whether the channel currently holds SIZE pending messages. */
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
inline bool LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::ThreadChannelInput::isFull() const noexcept {
    const uint32_t pendingMessages = threadChannel.size();
    return pendingMessages == SIZE;
}
/**
    Wrap a message in a MessageContainer and append it to this input's channel.
    The message bytes are copied into the container's MESSAGE storage, tagged with inType, and the
    container is then handed to the channel's pushBack().
    @param inMessage Message data; T is expected to be one of the member types of the MESSAGE union.
    @param inType Message type from the TYPES enum, stored alongside the data.
*/
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
template<typename T>
void LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::ThreadChannelInput::pushMessage(
    const T& inMessage,
    const TYPES inType) noexcept
{
    static_assert(sizeof(T) <= sizeof(MESSAGE), "Type T might not be part of union MESSAGE. Size mismatch.");
    static_assert(alignof(MESSAGE) % alignof(T) == 0, "Type T might not be part of union MESSAGE. Alignment mismatch.");
    static_assert(std::is_trivially_copyable<T>::value, "Message type T must be trivially copyable (POD).");

    MessageContainer messageContainer;
    messageContainer.type = inType;

    // Copy the bytes rather than assigning through a reinterpret_cast'ed pointer: memcpy is the
    // well-defined way to place a trivially copyable T into the union's storage.
    std::memcpy(&messageContainer.message, &inMessage, sizeof(T));

    threadChannel.pushBack(messageContainer);
}
/** Bind this output handle to the channel that messages will be popped from. */
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::ThreadChannelOutput::ThreadChannelOutput(ThreadChannel& inThreadChannel) noexcept
    : threadChannel(inThreadChannel) {}
/** Return the channel's current count of pending messages. */
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
inline uint32_t LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::ThreadChannelOutput::getNumMessages() const noexcept {
    const uint32_t pendingMessages = threadChannel.size();
    return pendingMessages;
}
/** Remove the oldest message from the channel and return its container by value. */
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
inline auto LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::ThreadChannelOutput::popMessage() noexcept
    -> MessageContainer
{
    return threadChannel.popFront();
}
/**
    Create an input handle for the channel at index inChannel.
    The index is checked against CHANNELS with an assert only.
*/
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
auto LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::getThreadChannelInput(const uint32_t inChannel) noexcept
    -> ThreadChannelInput
{
    assert(inChannel < CHANNELS);
    ThreadChannel& channel = threadChannels[inChannel];
    return ThreadChannelInput(channel);
}
/**
    Create an output handle for the channel at index inChannel.
    The index is checked against CHANNELS with an assert only.
*/
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
auto LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::getThreadChannelOutput(const uint32_t inChannel) noexcept
    -> ThreadChannelOutput
{
    assert(inChannel < CHANNELS);
    ThreadChannel& channel = threadChannels[inChannel];
    return ThreadChannelOutput(channel);
}
/**
    Start with an empty ring buffer: both indices at slot 0 and no pending elements.
    SIZE is validated at compile time; lock-freedom of the element counter is asserted at run time.
*/
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::ThreadChannel::ThreadChannel() noexcept
    : readPoint(0), writePoint(0), numElements(0)
{
    // Index wrap-around is done with a bit mask (SIZE - 1), which only works for powers of two.
    static_assert(Internal::isPowerOfTwo(SIZE), "Template parameter SIZE must be a power of two.");

    assert(numElements.is_lock_free());
}
/** Atomically read the number of elements currently stored in the channel. */
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
inline uint32_t LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::ThreadChannel::size() const noexcept {
    return numElements.load();
}
/**
    Copy inElement into the slot at the write index, advance the write index (wrapping via the
    SIZE - 1 mask) and then increment the element counter.
    Overflow is only guarded by asserts; the caller must not push to a full channel.
*/
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
void LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::ThreadChannel::pushBack(const MessageContainer& inElement) noexcept {
    assert(numElements.load() < SIZE);
    assert(writePoint < SIZE);

    const uint32_t slot = writePoint;
    elements[slot] = inElement;
    writePoint = (slot + 1) & sizeMinusOne;

    // Increment the counter last, after the slot has been written.
    ++numElements;
}
/**
    Copy out the element at the read index, advance the read index (wrapping via the SIZE - 1 mask)
    and then decrement the element counter.
    Underflow is only guarded by asserts; the caller must not pop from an empty channel.
    @return A copy of the oldest stored element.
*/
template<uint32_t SIZE, uint32_t CHANNELS, typename MESSAGE, typename TYPES>
auto LWMessageQueue<SIZE, CHANNELS, MESSAGE, TYPES>::ThreadChannel::popFront() noexcept
    -> MessageContainer
{
    assert(numElements.load() > 0);
    assert(readPoint < SIZE);

    const uint32_t slot = readPoint;
    MessageContainer front = elements[slot];
    readPoint = (slot + 1) & sizeMinusOne;

    // Decrement the counter last, after the slot has been copied out.
    --numElements;
    return front;
}
} // namespace LWMessageQueue