forked from sccn/liblsl
-
Notifications
You must be signed in to change notification settings - Fork 1
/
sample.cpp
498 lines (445 loc) · 17 KB
/
sample.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
#define BOOST_MATH_DISABLE_STD_FPCLASSIFY
#include "sample.h"
#include "portable_archive/portable_iarchive.hpp"
#include "portable_archive/portable_oarchive.hpp"
#include "util/cast.hpp"
#include <boost/endian/conversion.hpp>
using namespace lsl;
using lslboost::endian::endian_reverse_inplace;
#ifdef _MSC_VER
#pragma warning(suppress : 4291)
#endif
/// Lightweight [begin, end) view over `n` contiguous values of type `T`, so that raw storage
/// can be walked with a range-based for loop (used by `samplevals()`).
template <typename T> class dataiter {
    T *first_;
    T *last_;

public:
    /// View `n` elements starting at the mutable address `begin`.
    dataiter(void *begin, std::size_t n) : first_(reinterpret_cast<T *>(begin)), last_(first_ + n) {}
    /// View `n` elements starting at the read-only address `begin`; only compiles when
    /// instantiated for a const-qualified `T`.
    dataiter(const void *begin, std::size_t n)
        : first_(reinterpret_cast<const T *>(begin)), last_(first_ + n) {}

    // begin()/end() are what `for (auto &val : dataiter)` looks up
    T *begin() const noexcept { return first_; }
    T *end() const noexcept { return last_; }
};
/// Mutable range over the channel values of `s`, viewed as `T`.
/// Sample usage: `for (auto &val : samplevals<int32_t>(s)) val = 2;`
template <typename T> inline dataiter<T> samplevals(sample &s) noexcept {
    auto *first = iterhelper(s);
    return dataiter<T>(first, s.num_channels());
}
/// Read-only range over the channel values of `s`, viewed as `const T`.
template <typename T> inline dataiter<const T> samplevals(const sample &s) noexcept {
    auto *first = iterhelper(s);
    return dataiter<const T>(first, s.num_channels());
}
/// Element-wise copy of `n` values from `src` to `dst`, converting each value from `T` to `U`
/// with a `static_cast`.
template <typename T, typename U>
inline void copyconvert_array(const T *src, U *dst, std::size_t n) noexcept {
    for (std::size_t i = 0; i < n; ++i)
        dst[i] = static_cast<U>(src[i]); // NOLINT(bugprone-signed-char-misuse)
}
/// Copy `n` values when source and destination share the same type: no conversion is needed,
/// so the bytes are copied in one `memcpy`.
template <typename T> inline void copyconvert_array(const T *src, T *dst, std::size_t n) noexcept {
    const std::size_t num_bytes = n * sizeof(T);
    memcpy(dst, src, num_bytes);
}
/// Copy `n` values into a string array, formatting each source value with `lsl::to_string`.
template <typename T> inline void copyconvert_array(const T *src, std::string *dst, std::size_t n) {
    for (std::size_t i = 0; i < n; ++i) dst[i] = lsl::to_string(src[i]);
}
/// Copy `n` values out of a string array, parsing each string with `lsl::from_string<U>`.
template <typename U> inline void copyconvert_array(const std::string *src, U *dst, std::size_t n) {
    for (std::size_t i = 0; i < n; ++i) dst[i] = lsl::from_string<U>(src[i]);
}
/// Copy `n` strings from `src` to `dst` by plain element-wise assignment.
inline void copyconvert_array(const std::string *src, std::string *dst, std::size_t n) {
    for (std::size_t i = 0; i < n; ++i) dst[i] = src[i];
}
/// Fill this sample's data section, viewed as an array of `T`, from `num_channels_` values of
/// type `U` at `src`.
template <typename T, typename U> void lsl::sample::conv_from(const U *src) {
    T *channel_values = reinterpret_cast<T *>(&data_);
    copyconvert_array(src, channel_values, num_channels_);
}
/// Write this sample's data section, viewed as an array of `T`, into `num_channels_` values of
/// type `U` at `dst`.
template <typename T, typename U> void lsl::sample::conv_into(U *dst) {
    const T *channel_values = reinterpret_cast<const T *>(&data_);
    copyconvert_array(channel_values, dst, num_channels_);
}
void sample::operator delete(void *x) noexcept {
if(x == nullptr) return;
lsl::factory *factory = reinterpret_cast<sample *>(x)->factory_;
// delete the underlying memory only if it wasn't allocated in the factory's storage area
if (x < factory->storage_ || x >= factory->storage_ + factory->storage_size_)
delete[](char *) x;
}
/// Round `v` up to the nearest multiple of `base`; values that already are a multiple are
/// returned unchanged. `base` must be non-zero.
constexpr uint32_t ensure_multiple(uint32_t v, unsigned base) {
    return (v % base == 0) ? v : v + (base - v % base);
}
// Sample functions
/// Destroy the sample. For string-formatted samples the strings living in the data section
/// are destroyed explicitly, one by one; other formats need no cleanup.
lsl::sample::~sample() noexcept {
    if (format_ != cft_string) return;
    using string_t = std::string;
    for (auto &str : samplevals<std::string>(*this)) str.~string_t();
}
/// Two samples are equal when time stamp, channel format and channel count match and their
/// channel data is identical (byte-wise for numeric formats, string-wise for string samples).
bool sample::operator==(const sample &rhs) const noexcept {
    const bool same_header = (timestamp_ == rhs.timestamp_) && (format_ == rhs.format_) &&
                             (num_channels_ == rhs.num_channels_);
    if (!same_header) return false;

    if (format_ == cft_string) {
        // strings cannot be compared as raw bytes, so compare value by value
        const auto mine = samplevals<const std::string>(*this);
        const auto theirs = samplevals<std::string>(rhs);
        return std::equal(mine.begin(), mine.end(), theirs.begin());
    }
    return memcmp(&(rhs.data_), &data_, datasize()) == 0;
}
/// Overwrite this sample's channel values with the values at `src`, converting each from `T`
/// to the storage type selected by `format_`.
/// @throws std::invalid_argument if `format_` is not one of the handled channel formats.
template <class T> void lsl::sample::assign_typed(const T *src) {
    switch (format_) {
    case cft_int8: conv_from<int8_t>(src); break;
    case cft_int16: conv_from<int16_t>(src); break;
    case cft_int32: conv_from<int32_t>(src); break;
#ifndef BOOST_NO_INT64_T
    case cft_int64: conv_from<int64_t>(src); break;
#endif
    case cft_float32: conv_from<float>(src); break;
    case cft_double64: conv_from<double>(src); break;
    case cft_string: conv_from<std::string>(src); break;
    default: throw std::invalid_argument("Unsupported channel format.");
    }
}
/// Copy this sample's channel values into `dst`, converting each from the storage type
/// selected by `format_` to `T`.
/// @throws std::invalid_argument if `format_` is not one of the handled channel formats.
template <class T> void lsl::sample::retrieve_typed(T *dst) {
    switch (format_) {
    case cft_int8: conv_into<int8_t>(dst); break;
    case cft_int16: conv_into<int16_t>(dst); break;
    case cft_int32: conv_into<int32_t>(dst); break;
#ifndef BOOST_NO_INT64_T
    case cft_int64: conv_into<int64_t>(dst); break;
#endif
    case cft_float32: conv_into<float>(dst); break;
    case cft_double64: conv_into<double>(dst); break;
    case cft_string: conv_into<std::string>(dst); break;
    default: throw std::invalid_argument("Unsupported channel format.");
    }
}
/// Overwrite the data section with `datasize()` raw bytes read from `newdata`.
/// @throws std::invalid_argument for string-formatted samples, which cannot be copied bytewise.
void lsl::sample::assign_untyped(const void *newdata) {
    if (format_ == cft_string)
        throw std::invalid_argument("Cannot assign untyped data to a string-formatted sample.");
    memcpy(&data_, newdata, datasize());
}
/// Copy the `datasize()` raw bytes of the data section into `newdata`.
/// @throws std::invalid_argument for string-formatted samples, which cannot be copied bytewise.
void lsl::sample::retrieve_untyped(void *newdata) {
    if (format_ == cft_string)
        throw std::invalid_argument("Cannot retrieve untyped data from a string-formatted sample.");
    memcpy(newdata, &data_, datasize());
}
/// Write `count` raw bytes starting at `address` to the stream buffer.
/// @throws std::runtime_error if the buffer accepted fewer than `count` bytes.
void save_raw(std::streambuf &sb, const void *address, std::size_t count) {
    const char *bytes = static_cast<const char *>(address);
    const std::streamsize written = sb.sputn(bytes, static_cast<std::streamsize>(count));
    if (static_cast<std::size_t>(written) != count)
        throw std::runtime_error("Output stream error.");
}
/// Write the single byte `v` to the stream buffer.
/// @throws std::runtime_error if the buffer reports end-of-file on the write.
void save_byte(std::streambuf &sb, uint8_t v) {
    const char *as_char = reinterpret_cast<const char *>(&v);
    const auto result = sb.sputc(*as_char);
    if (result == std::streambuf::traits_type::eof())
        throw std::runtime_error("Output stream error.");
}
/// Read exactly `count` raw bytes from the stream buffer into the memory at `address`.
/// @throws std::runtime_error if fewer than `count` bytes could be read.
void load_raw(std::streambuf &sb, void *address, std::size_t count) {
    char *bytes = static_cast<char *>(address);
    const std::streamsize received = sb.sgetn(bytes, static_cast<std::streamsize>(count));
    if (static_cast<std::size_t>(received) != count)
        throw std::runtime_error("Input stream error.");
}
/// Read and consume a single byte from the stream buffer.
/// @throws std::runtime_error if the buffer is at end-of-file.
uint8_t load_byte(std::streambuf &sb) {
    const auto next = sb.sbumpc();
    if (next != std::streambuf::traits_type::eof()) return static_cast<uint8_t>(next);
    throw std::runtime_error("Input stream error.");
}
/// Read one value of type `T` from the stream buffer, byte-swapping it when
/// `reverse_byte_order` is set and `T` is wider than one byte.
template <typename T> T load_value(std::streambuf &sb, bool reverse_byte_order) {
    T value;
    load_raw(sb, &value, sizeof(T));
    const bool needs_swap = reverse_byte_order && sizeof(T) > 1;
    if (needs_swap) endian_reverse_inplace(value);
    return value;
}
/// Write one value of type `T` to the stream buffer, byte-swapping the local copy first when
/// `reverse_byte_order` is set and `T` is wider than one byte.
template <typename T> void save_value(std::streambuf &sb, T v, bool reverse_byte_order) {
    const bool needs_swap = reverse_byte_order && sizeof(T) > 1;
    if (needs_swap) endian_reverse_inplace(v);
    save_raw(sb, &v, sizeof(T));
}
/// Serialize this sample into the stream buffer `sb`.
///
/// Layout: a one-byte time stamp tag (followed by the time stamp unless it is deduced), then
/// the channel data. String channels are written as a one-byte width of the length field, the
/// length itself, and the string bytes; numeric channels are written as one binary block.
///
/// @param sb destination stream buffer
/// @param reverse_byte_order if true, multi-byte values are byte-swapped before writing
/// @param scratchpad work area that must hold at least `datasize()` bytes; only used when
///        numeric data wider than one byte has to be byte-swapped
/// @throws std::runtime_error (from the save helpers) if the stream buffer rejects data
void sample::save_streambuf(
    std::streambuf &sb, int /*protocol_version*/, bool reverse_byte_order, void *scratchpad) const {
    // write sample header
    if (timestamp_ == DEDUCED_TIMESTAMP) {
        save_byte(sb, TAG_DEDUCED_TIMESTAMP);
    } else {
        save_byte(sb, TAG_TRANSMITTED_TIMESTAMP);
        save_value(sb, timestamp_, reverse_byte_order);
    }

    // write channel data
    if (format_ == cft_string) {
        for (const auto &str : samplevals<std::string>(*this)) {
            // write string length as variable-length integer
            if (str.size() <= 0xFF) {
                save_byte(sb, static_cast<uint8_t>(sizeof(uint8_t)));
                save_byte(sb, static_cast<uint8_t>(str.size()));
            } else if (str.size() <= 0xFFFFFFFF) {
                save_byte(sb, static_cast<uint8_t>(sizeof(uint32_t)));
                save_value(sb, static_cast<uint32_t>(str.size()), reverse_byte_order);
            } else {
                save_byte(sb, static_cast<uint8_t>(sizeof(uint64_t)));
                // the payload must be exactly as wide as the width byte just written, so
                // serialize a uint64_t rather than a platform-dependent std::size_t
                save_value(sb, static_cast<uint64_t>(str.size()), reverse_byte_order);
            }
            // write string contents
            if (!str.empty()) save_raw(sb, str.data(), str.size());
        }
    } else if (!reverse_byte_order || format_sizes[format_] == 1) {
        // numeric data can be written straight from the sample
        save_raw(sb, &data_, datasize());
    } else {
        // byte-swap a copy so the sample itself stays untouched
        memcpy(scratchpad, &data_, datasize());
        convert_endian(scratchpad, num_channels_, format_sizes[format_]);
        save_raw(sb, scratchpad, datasize());
    }
}
/// Deserialize this sample from the stream buffer `sb`, using the channel format and channel
/// count the sample already has.
///
/// @param sb source stream buffer
/// @param reverse_byte_order if true, multi-byte values are byte-swapped after reading
/// @param suppress_subnormals if true, floating-point values whose exponent bits are all zero
///        are replaced by a zero of the same sign
/// @throws std::runtime_error on read failure, on an invalid string length prefix, or when a
///         64-bit string length is received on a platform whose `std::size_t` is narrower
void sample::load_streambuf(
    std::streambuf &sb, int /*unused*/, bool reverse_byte_order, bool suppress_subnormals) {
    // header: a tag byte; any tag other than the "deduced" one is followed by a time stamp
    const auto tag = load_byte(sb);
    if (tag == TAG_DEDUCED_TIMESTAMP)
        timestamp_ = DEDUCED_TIMESTAMP;
    else
        timestamp_ = load_value<double>(sb, reverse_byte_order);

    if (format_ == cft_string) {
        for (auto &str : samplevals<std::string>(*this)) {
            // the first byte states how many bytes encode the string length
            const auto lenbytes = load_byte(sb);
            if (sizeof(std::size_t) < 8 && lenbytes > sizeof(std::size_t))
                throw std::runtime_error(
                    "This platform does not support strings of 64-bit length.");
            std::size_t len = 0;
            switch (lenbytes) {
            case sizeof(uint8_t): len = load_byte(sb); break;
            case sizeof(uint16_t): len = load_value<uint16_t>(sb, reverse_byte_order); break;
            case sizeof(uint32_t): len = load_value<uint32_t>(sb, reverse_byte_order); break;
#ifndef BOOST_NO_INT64_T
            case sizeof(uint64_t): len = load_value<uint64_t>(sb, reverse_byte_order); break;
#endif
            default: throw std::runtime_error("Stream contents corrupted (invalid varlen int).");
            }
            // then the string bytes themselves
            str.resize(len);
            if (len > 0) load_raw(sb, &(str[0]), len);
        }
        return;
    }

    // numeric formats: one binary block, fixed up in place afterwards
    load_raw(sb, &data_, datasize());
    if (reverse_byte_order && format_sizes[format_] > 1)
        convert_endian(&data_, num_channels(), format_sizes[format_]);
    if (!suppress_subnormals || !format_float[format_]) return;

    // work on the bit patterns: a non-zero value with all-zero exponent bits keeps only its sign
    if (format_ == cft_float32) {
        for (auto &bits : samplevals<uint32_t>(*this)) {
            const bool exponent_is_zero = (bits & UINT32_C(0x7fffffff)) <= UINT32_C(0x007fffff);
            if (bits && exponent_is_zero) bits &= UINT32_C(0x80000000);
        }
    } else {
#ifndef BOOST_NO_INT64_T
        for (auto &bits : samplevals<uint64_t>(*this)) {
            const bool exponent_is_zero =
                (bits & UINT64_C(0x7fffffffffffffff)) <= UINT64_C(0x000fffffffffffff);
            if (bits && exponent_is_zero) bits &= UINT64_C(0x8000000000000000);
        }
#endif
    }
}
void lsl::sample::convert_endian(void *data, uint32_t n, uint32_t width) {
void *dataptr = reinterpret_cast<void *>(data);
switch (width) {
case 1: break;
case sizeof(int16_t):
for (auto &val : dataiter<int16_t>(dataptr, n)) endian_reverse_inplace(val);
break;
case sizeof(int32_t):
for (auto &val : dataiter<int32_t>(dataptr, n)) endian_reverse_inplace(val);
break;
case sizeof(int64_t):
for (auto &val : dataiter<int64_t>(dataptr, n)) endian_reverse_inplace(val);
break;
default: throw std::runtime_error("Unsupported channel format for endian conversion.");
}
}
/// Pass every channel value of `s`, viewed as type `T`, through the archive `ar` in order.
template <typename T, class Archive> inline void archive_channel_values(Archive &ar, sample &s) {
    for (auto &value : samplevals<T>(s)) ar & value;
}

/// Run all channel values through the archive `ar`, using the value type selected by
/// `format_`. Called by both `serialize()` overloads, i.e. with output and input archives.
/// @throws std::runtime_error if `format_` is not one of the handled channel formats.
template <class Archive> void sample::serialize_channels(Archive &ar, const uint32_t /*unused*/) {
    switch (format_) {
    case cft_int8: archive_channel_values<int8_t>(ar, *this); break;
    case cft_int16: archive_channel_values<int16_t>(ar, *this); break;
    case cft_int32: archive_channel_values<int32_t>(ar, *this); break;
#ifndef BOOST_NO_INT64_T
    case cft_int64: archive_channel_values<int64_t>(ar, *this); break;
#endif
    case cft_float32: archive_channel_values<float>(ar, *this); break;
    case cft_double64: archive_channel_values<double>(ar, *this); break;
    case cft_string: archive_channel_values<std::string>(ar, *this); break;
    default: throw std::runtime_error("Unsupported channel format.");
    }
}
/// Write this sample to a portable output archive: first the time stamp tag (followed by the
/// time stamp itself unless it is deduced), then all channel values.
void lsl::sample::serialize(eos::portable_oarchive &ar, const uint32_t archive_version) const {
    // write sample header
    if (timestamp_ == DEDUCED_TIMESTAMP) {
        ar &TAG_DEDUCED_TIMESTAMP;
    } else {
        ar &TAG_TRANSMITTED_TIMESTAMP &timestamp_;
    }
    // write channel data; serialize_channels is shared with the input path and therefore
    // non-const, hence the const_cast
    const_cast<sample *>(this)->serialize_channels(ar, archive_version);
}
/// Read this sample from a portable input archive: first the time stamp tag (and the time
/// stamp itself unless the tag marks it as deduced), then all channel values.
void lsl::sample::serialize(eos::portable_iarchive &ar, const uint32_t archive_version) {
    // read sample header
    char tag;
    ar &tag;
    if (tag == TAG_DEDUCED_TIMESTAMP) {
        // deduce the timestamp
        timestamp_ = DEDUCED_TIMESTAMP;
    } else {
        // read the time stamp
        ar &timestamp_;
    }
    // read channel data
    serialize_channels(ar, archive_version);
}
/// Fill `data[0..num_channels)` with a deterministic pattern: element k has magnitude
/// `k + offset` (for integral `T` reduced modulo the largest value of `T`), positive at even
/// and negative at odd positions.
template <typename T> void test_pattern(T *data, uint32_t num_channels, int offset) {
    const std::size_t base = static_cast<std::size_t>(offset);
    for (std::size_t idx = 0; idx < num_channels; ++idx) {
        std::size_t magnitude = idx + base;
        if (std::is_integral<T>::value)
            magnitude %= static_cast<std::size_t>(std::numeric_limits<T>::max());
        const T value = static_cast<T>(magnitude);
        if (idx % 2 == 0)
            data[idx] = value;
        else
            data[idx] = -value;
    }
}
/// Fill this sample with a deterministic test pattern: `pushthrough` is set, the time stamp
/// becomes 123456.789 and the channel values follow a format-specific alternating-sign
/// sequence shifted by `offset` (the string pattern does not use `offset`).
/// @return reference to this sample
/// @throws std::invalid_argument if `format_` is not one of the handled channel formats.
sample &sample::assign_test_pattern(int offset) {
    pushthrough = true;
    timestamp_ = 123456.789;

    switch (format_) {
    case cft_int8:
        test_pattern(samplevals<int8_t>(*this).begin(), num_channels_, offset + 1);
        break;
    case cft_int16:
        test_pattern(samplevals<int16_t>(*this).begin(), num_channels_, offset + 257);
        break;
    case cft_int32:
        test_pattern(samplevals<int32_t>(*this).begin(), num_channels_, offset + 65537);
        break;
#ifndef BOOST_NO_INT64_T
    case cft_int64: {
        int64_t *values = samplevals<int64_t>(*this).begin();
        const int64_t offset64 = 2147483649ll + offset;
        for (uint32_t k = 0; k < num_channels_; k++) {
            const int64_t magnitude = k + offset64;
            values[k] = (k % 2 == 1) ? -magnitude : magnitude;
        }
        break;
    }
#endif
    case cft_float32:
        test_pattern(samplevals<float>(*this).begin(), num_channels_, offset + 0);
        break;
    case cft_double64:
        test_pattern(samplevals<double>(*this).begin(), num_channels_, offset + 16777217);
        break;
    case cft_string: {
        std::string *strings = samplevals<std::string>(*this).begin();
        const int count = static_cast<int>(num_channels_);
        for (int32_t k = 0; k < count; k++) {
            const int32_t sign = (k % 2 == 0) ? 1 : -1;
            strings[k] = to_string((k + 10) * sign);
        }
        break;
    }
    default: throw std::invalid_argument("Unsupported channel format used to construct a sample.");
    }
    return *this;
}
/// Construct a sample with the given channel format and channel count, owned by factory
/// `fact`. For string-formatted samples, empty strings are constructed in the data section.
lsl::sample::sample(lsl_channel_format_t fmt, uint32_t num_channels, factory *fact)
    : format_(fmt), num_channels_(num_channels), refcount_(0), next_(nullptr), factory_(fact) {
    if (format_ != cft_string) return;
    // the data section is raw memory, so the strings are created via placement new
    for (auto &slot : samplevals<std::string>(*this)) new (&slot) std::string();
}
/// Construct a factory for samples with `num_chans` channels of format `fmt`. One contiguous
/// storage area with room for max(2, num_reserve + 1) samples is allocated and filled with
/// pre-constructed samples that are chained together through their `next_` pointers.
factory::factory(lsl_channel_format_t fmt, uint32_t num_chans, uint32_t num_reserve)
: fmt_(fmt), num_chans_(num_chans),
// per-sample footprint: the sample object minus its data_ member, plus the bytes needed for
// num_chans values of this format, rounded up to a multiple of 16
sample_size_(ensure_multiple(
sizeof(sample) - sizeof(sample::data_) + format_sizes[fmt] * num_chans, 16)),
storage_size_(sample_size_ * std::max(2u, num_reserve + 1)),
// NOTE(review): these initializers read members initialized above (and sentinel() presumably
// reads storage_), so they rely on the member declaration order in the class — confirm
storage_(new char[storage_size_]), head_(sentinel()), tail_(sentinel()) {
// pre-construct an array of samples in the storage area and chain into a freelist
// this is functionally identical to calling `reclaim_sample()` for each sample, but alters
// the head_/tail_ positions only once
sample *s = sample_by_index(0);
// walk the storage in sample_size_ strides; each new sample points at the next slot's address
for (char *p = storage_, *e = p + storage_size_; p < e;) {
s = new (reinterpret_cast<sample *>(p)) sample(fmt, num_chans, this);
s->next_ = reinterpret_cast<sample *>(p += sample_size_);
}
// the last sample's next_ pointed one past the storage area: terminate the chain there and
// make that sample the head of the list
s->next_ = nullptr;
head_.store(s);
}
/// Hand out a sample with the given time stamp and pushthrough flag. A sample is taken from
/// the freelist; while the freelist yields none, a freshly heap-allocated sample is added to
/// it and the attempt is repeated.
sample_p factory::new_sample(double timestamp, bool pushthrough) {
    sample *fresh = pop_freelist();
    while (fresh == nullptr) {
        // grow the pool by one sample living in its own char array, then try again
        char *raw = new char[sample_size_];
        reclaim_sample(new (raw) sample(fmt_, num_chans_, this));
        fresh = pop_freelist();
    }
    fresh->timestamp_ = timestamp;
    fresh->pushthrough = pushthrough;
    return {fresh};
}
/// Try to take one sample off the tail end of the freelist.
/// @return the removed sample, or nullptr if no sample could be taken right now.
/// NOTE(review): tail_ is updated with relaxed stores only, which suggests a single consumer
/// calling this function — confirm against the callers.
sample *factory::pop_freelist() {
sample *tail = tail_, *next = tail->next_.load(std::memory_order_acquire);
// the sentinel itself is never handed out: step over it if it sits at the tail
if (tail == sentinel()) {
// no samples available
if (!next) return nullptr;
tail_.store(next, std::memory_order_relaxed);
tail = next;
next = next->next_.load(std::memory_order_acquire);
}
// common case: the tail has a successor, so advance tail_ and return the old tail
if (next) {
tail_.store(next, std::memory_order_relaxed);
return tail;
}
sample *head = head_.load(std::memory_order_acquire);
// tail has no successor; if it is not the head either, give up for now
// (presumably a concurrent reclaim_sample() has swapped head_ but not linked next_ yet)
if (tail != head) return nullptr;
// tail is the only element left: append the sentinel behind it so that tail gets a
// successor and can be removed
reclaim_sample(sentinel());
next = tail->next_.load(std::memory_order_acquire);
if (next) {
tail_ = next;
return tail;
}
return nullptr;
}
/// Destroy the factory: every sample still chained from `tail_` is deleted (the sentinel is
/// skipped), then the storage area is released. `delete` on a sample goes through
/// `sample::operator delete`, which frees only memory outside the storage area.
factory::~factory() {
    sample *cur = tail_;
    sample *next = cur->next_;
    while (true) {
        if (cur != sentinel()) delete cur;
        if (next == nullptr) break;
        // `next` was read before `cur` was deleted, so following it is safe
        cur = next;
        next = next->next_;
    }
    delete[] storage_;
}
/// Append the sample `s` at the head end of the freelist.
void factory::reclaim_sample(sample *s) {
// s becomes the new last element, so it must not have a successor
s->next_.store(nullptr, std::memory_order_release); // TODO: might be _relaxed?
// atomically make s the head and obtain the previous head ...
sample *prev = head_.exchange(s, std::memory_order_acq_rel);
// ... then link the previous head to s; between these two steps s is the head but is not
// yet reachable through the next_ chain
prev->next_.store(s, std::memory_order_release);
}
// template instantiations
template void lsl::sample::assign_typed(float const *);
template void lsl::sample::assign_typed(double const *);
template void lsl::sample::assign_typed(char const *);
template void lsl::sample::assign_typed(int16_t const *);
template void lsl::sample::assign_typed(int32_t const *);
template void lsl::sample::assign_typed(int64_t const *);
template void lsl::sample::assign_typed(std::string const *);
template void lsl::sample::retrieve_typed(float *);
template void lsl::sample::retrieve_typed(double *);
template void lsl::sample::retrieve_typed(char *);
template void lsl::sample::retrieve_typed(int16_t *);
template void lsl::sample::retrieve_typed(int32_t *);
template void lsl::sample::retrieve_typed(int64_t *);
template void lsl::sample::retrieve_typed(std::string *);