-
Notifications
You must be signed in to change notification settings - Fork 70
/
stream_inlet_impl.h
345 lines (320 loc) · 15.6 KB
/
stream_inlet_impl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
#ifndef STREAM_INLET_IMPL_H
#define STREAM_INLET_IMPL_H
#include "common.h"
#include "data_receiver.h"
#include "info_receiver.h"
#include "inlet_connection.h"
#include "time_postprocessor.h"
#include "time_receiver.h"
#include <loguru.hpp>
namespace lsl {
/**
* A stream inlet.
*
* Inlets are used to receive streaming data (and meta-data) from the lab network.
* This class is just a thin wrapper (or facade) around its four components to which it adds no
* extra functionality.
*/
class stream_inlet_impl {
public:
/**
* Construct a new stream inlet from a resolved stream info.
*
* @param info A resolved stream info object (as coming from one of the resolver functions).
* @param max_buflen Optionally the maximum amount of data to buffer in samples (per-channel).
* Recording applications want to use a fairly large buffer size here, while real-time
* applications want to only buffer as much as they need to perform their next calculation.
* @param max_chunklen Optionally the maximum size, in samples, at which chunks are transmitted
* (the default corresponds to the chunk sizes used by the sender).
* Recording applications can use a generous size here (leaving it to the network how to pack
* things), while real-time applications may want a finer (perhaps 1-sample) granularity.
* @param recover Try to silently recover lost streams that are recoverable (= those that
* have a source_id set).
* In all other cases (recover is false or the stream is not recoverable) a lsl::lost_error
* is thrown where indicated if the stream's source is lost (e.g. due to an app or computer
* crash).
*/
stream_inlet_impl(const stream_info_impl &info, int32_t max_buflen = 360,
int32_t max_chunklen = 0, bool recover = true)
// Members are initialised in their declaration order (conn_, info_receiver_, time_receiver_,
// data_receiver_, postprocessor_), so conn_ already exists when it is handed to the receivers.
: conn_(info, recover), info_receiver_(conn_), time_receiver_(conn_),
data_receiver_(conn_, max_buflen, max_chunklen),
// The post-processor is handed three callbacks that capture `this`: one querying the time
// correction (called with the argument 5 -- presumably a timeout, confirm against
// time_receiver), one querying the connection's current sampling rate, and one querying the
// time receiver's reset flag.
// NOTE(review): because the callbacks hold `this`, a copied or moved inlet would leave them
// pointing at the old object -- confirm this class is never copied or moved.
postprocessor_([this]() { return time_receiver_.time_correction(5); },
[this]() { return conn_.current_srate(); },
[this]() { return time_receiver_.was_reset(); }) {
ensure_lsl_initialized();
// Engage the connection only after every component has been constructed.
conn_.engage();
}
/// Destructor. The stream will stop reading from the source if destroyed.
~stream_inlet_impl() {
	// A destructor must not let an exception escape, so any failure while disengaging the
	// connection is logged and swallowed.
	try {
		conn_.disengage();
	} catch (const std::exception &e) {
		// Caught by const reference: the exception object is only inspected, never modified.
		LOG_F(WARNING, "Unexpected error during inlet shutdown: %s", e.what());
	} catch (...) {
		// Anything not derived from std::exception carries no message to report.
		LOG_F(ERROR, "Severe error during stream inlet shutdown.");
	}
}
/**
* Pull a sample from the inlet and read it into a vector of values.
*
* Handles type checking & conversion, allocates the memory in the vector if necessary.
* @param data An STL vector to hold the resulting values; it is resized to the stream's channel
* count.
* @param timeout Timeout for this operation (default: FOREVER, i.e. no timeout). If no sample
* becomes available in time, 0.0 is returned and no new data.
* @return The capture time of the sample on the remote machine, or 0.0 if no new sample was
* available. To remap this time stamp to the local clock, add the value returned by
* .time_correction() to it. This is only necessary if the clocks of the source and destination
* machine are not synchronized to high enough precision.
*/
double pull_sample(std::vector<float> &data, double timeout = FOREVER) {
data.resize(conn_.type_info().channel_count());
return pull_sample(data.data(), (int32_t)data.size(), timeout);
}
double pull_sample(std::vector<double> &data, double timeout = FOREVER) {
data.resize(conn_.type_info().channel_count());
return pull_sample(data.data(), (int32_t)data.size(), timeout);
}
double pull_sample(std::vector<int64_t> &data, double timeout = FOREVER) {
data.resize(conn_.type_info().channel_count());
return pull_sample(data.data(), (int32_t)data.size(), timeout);
}
double pull_sample(std::vector<int32_t> &data, double timeout = FOREVER) {
data.resize(conn_.type_info().channel_count());
return pull_sample(data.data(), (int32_t)data.size(), timeout);
}
double pull_sample(std::vector<int16_t> &data, double timeout = FOREVER) {
data.resize(conn_.type_info().channel_count());
return pull_sample(data.data(), (int32_t)data.size(), timeout);
}
double pull_sample(std::vector<char> &data, double timeout = FOREVER) {
data.resize(conn_.type_info().channel_count());
return pull_sample(data.data(), (int32_t)data.size(), timeout);
}
double pull_sample(std::vector<std::string> &data, double timeout = FOREVER) {
data.resize(conn_.type_info().channel_count());
return pull_sample(data.data(), (int32_t)data.size(), timeout);
}
/**
* Pull a sample from the inlet and read it into a pointer to values.
*
* Handles type checking & conversion.
* @param buffer A pointer to hold the resulting values.
* @param buffer_elements The number of values (elements of the buffer's type) allocated in the
* buffer.
* @note It is the responsibility of the user to allocate enough memory.
* @param timeout Timeout for this operation (default: FOREVER, i.e. no timeout). If no sample
* becomes available in time, 0.0 is returned and no new data.
* @return The capture time of the sample on the remote machine, or 0.0 if no new sample was
* available. To remap this time stamp to the local clock, add the value returned by
* .time_correction() to it. This is only necessary if the clocks of the source and destination
* machine are not synchronized to high enough precision.
*/
double pull_sample(float *buffer, int32_t buffer_elements, double timeout = FOREVER) {
	// Fetch the sample first, then run its time stamp through the post-processor.
	const double raw_stamp = data_receiver_.pull_sample_typed(buffer, buffer_elements, timeout);
	return postprocess(raw_stamp);
}
double pull_sample(double *buffer, int32_t buffer_elements, double timeout = FOREVER) {
	// Fetch the sample first, then run its time stamp through the post-processor.
	const double raw_stamp = data_receiver_.pull_sample_typed(buffer, buffer_elements, timeout);
	return postprocess(raw_stamp);
}
double pull_sample(int64_t *buffer, int32_t buffer_elements, double timeout = FOREVER) {
	// Fetch the sample first, then run its time stamp through the post-processor.
	const double raw_stamp = data_receiver_.pull_sample_typed(buffer, buffer_elements, timeout);
	return postprocess(raw_stamp);
}
double pull_sample(int32_t *buffer, int32_t buffer_elements, double timeout = FOREVER) {
	// Fetch the sample first, then run its time stamp through the post-processor.
	const double raw_stamp = data_receiver_.pull_sample_typed(buffer, buffer_elements, timeout);
	return postprocess(raw_stamp);
}
double pull_sample(int16_t *buffer, int32_t buffer_elements, double timeout = FOREVER) {
	// Fetch the sample first, then run its time stamp through the post-processor.
	const double raw_stamp = data_receiver_.pull_sample_typed(buffer, buffer_elements, timeout);
	return postprocess(raw_stamp);
}
double pull_sample(char *buffer, int32_t buffer_elements, double timeout = FOREVER) {
	// Fetch the sample first, then run its time stamp through the post-processor.
	const double raw_stamp = data_receiver_.pull_sample_typed(buffer, buffer_elements, timeout);
	return postprocess(raw_stamp);
}
double pull_sample(std::string *buffer, int32_t buffer_elements, double timeout = FOREVER) {
	// Fetch the sample first, then run its time stamp through the post-processor.
	const double raw_stamp = data_receiver_.pull_sample_typed(buffer, buffer_elements, timeout);
	return postprocess(raw_stamp);
}
template <typename T>
double pull_sample_noexcept(T *buffer, int32_t buffer_elements, double timeout = FOREVER,
lsl_error_code_t *ec = nullptr) noexcept {
lsl_error_code_t dummy;
if (!ec) ec = &dummy;
*ec = lsl_no_error;
try {
return postprocess(data_receiver_.pull_sample_typed(buffer, buffer_elements, timeout));
} catch (timeout_error &) { *ec = lsl_timeout_error; } catch (lost_error &) {
*ec = lsl_lost_error;
} catch (std::invalid_argument &) { *ec = lsl_argument_error; } catch (std::range_error &) {
*ec = lsl_argument_error;
} catch (std::exception &e) {
LOG_F(ERROR, "Unexpected error in %s: %s", __func__, e.what());
*ec = lsl_internal_error;
}
return 0.0;
}
/**
* Pull a sample from the inlet and read it into a pointer to raw data.
*
* No type checking or conversions are done (not recommended!). Do not use for
* variable-size/string-formatted streams.
* @param sample A pointer to hold the resulting raw sample data.
* @param buffer_bytes The number of bytes allocated in the buffer. Note: it is the
* responsibility of the user to allocate enough memory.
* @param timeout Timeout for this operation (default: FOREVER, i.e. no timeout). If no sample
* becomes available in time, 0.0 is returned and no data.
* @return The capture time of the sample on the remote machine, or 0.0 if no new sample was
* available. To remap this time stamp to the local clock, add the value returned by
* .time_correction() to it. This is only necessary if the clocks of the source and destination
* machine are not synchronized to high enough precision.
*/
double pull_numeric_raw(void *sample, int32_t buffer_bytes, double timeout = FOREVER) {
	// Fetch the untyped sample first, then run its time stamp through the post-processor.
	const double raw_stamp = data_receiver_.pull_sample_untyped(sample, buffer_bytes, timeout);
	return postprocess(raw_stamp);
}
/**
* Pull a chunk of data from the inlet.
*
* @warning The provided buffer size is measured in channel values (e.g., floats), not samples.
* @param data_buffer A pointer to a buffer of data values where the results shall be stored.
* @param timestamp_buffer A pointer to a buffer of timestamp values where time stamps shall be
* stored. If this is NULL, no time stamps will be returned.
* @param data_buffer_elements The size of the data buffer, in channel data elements (of type
* T). Must be a multiple of the stream's channel count.
* @param timestamp_buffer_elements The size of the timestamp buffer. If a timestamp buffer is
* provided then this must correspond to the same number of samples as data_buffer_elements.
* @param timeout The timeout for this operation, if any. When the timeout expires, the function
* may return before the entire buffer is filled. The default value of 0.0 will retrieve only
* data available for immediate pickup.
* @return data_elements_written Number of channel data elements written to the data buffer.
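* @throws lost_error (if the stream source has been lost).
* @throws std::runtime_error (if data_buffer_elements is not a multiple of the channel count, or
* if a timestamp buffer is given whose size does not match the number of samples).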
*/
template <class T>
uint32_t pull_chunk_multiplexed(T *data_buffer, double *timestamp_buffer,
	std::size_t data_buffer_elements, std::size_t timestamp_buffer_elements,
	double timeout = 0.0) {
	const std::size_t num_chans = info().channel_count();
	// The channel count is used as a divisor below, so reject zero before touching it;
	// an integer division or modulo by zero is undefined behaviour.
	if (num_chans == 0)
		throw std::runtime_error("The stream has no channels, so no chunk data can be pulled.");
	if (data_buffer_elements % num_chans != 0)
		throw std::runtime_error(
			"The number of buffer elements must be a multiple of the stream's channel count.");
	const std::size_t max_samples = data_buffer_elements / num_chans;
	if (timestamp_buffer && max_samples != timestamp_buffer_elements)
		throw std::runtime_error(
			"The timestamp buffer must hold the same number of samples as the data buffer.");
	// With a non-zero timeout all samples share one deadline; a zero timeout is passed on as
	// zero to every pull_sample() call.
	const double end_time = timeout ? lsl_clock() + timeout : 0.0;
	std::size_t samples_written = 0;
	for (; samples_written < max_samples; samples_written++) {
		// NOTE(review): once the deadline has passed, the remaining time handed to
		// pull_sample() is <= 0 -- presumably treated as "do not wait"; confirm.
		const double remaining = timeout ? end_time - lsl_clock() : 0.0;
		const double ts = pull_sample(&data_buffer[samples_written * num_chans],
			static_cast<int32_t>(num_chans), remaining);
		// A time stamp of 0.0 signals that no further sample was available.
		if (!ts) break;
		if (timestamp_buffer) timestamp_buffer[samples_written] = ts;
	}
	// The result is counted in channel values, not in samples.
	return static_cast<uint32_t>(samples_written * num_chans);
}
template <class T>
uint32_t pull_chunk_multiplexed_noexcept(T *data_buffer, double *timestamp_buffer,
std::size_t data_buffer_elements, std::size_t timestamp_buffer_elements,
double timeout = 0.0, lsl_error_code_t *ec = nullptr) noexcept {
lsl_error_code_t dummy;
if (!ec) ec = &dummy;
*ec = lsl_no_error;
try {
return pull_chunk_multiplexed(data_buffer, timestamp_buffer, data_buffer_elements,
timestamp_buffer_elements, timeout);
} catch (timeout_error &) { *ec = lsl_timeout_error; } catch (lost_error &) {
*ec = lsl_lost_error;
} catch (std::invalid_argument &) { *ec = lsl_argument_error; } catch (std::range_error &) {
*ec = lsl_argument_error;
} catch (std::exception &e) {
LOG_F(ERROR, "Unexpected error in %s: %s", __func__, e.what());
*ec = lsl_internal_error;
}
return 0;
}
/**
* Retrieve the complete information of the given stream, including the extended description.
*
* Can be invoked at any time of the stream's lifetime.
* @param timeout Timeout of the operation (default: no timeout).
* @throws timeout_error (if the timeout expires), or lost_error (if the stream source has been
* lost).
*/
const stream_info_impl &info(double timeout = FOREVER) {
	// Forwarded to the info receiver, which hands back the stream info by reference.
	return info_receiver_.info(timeout);
}
/**
* Retrieve an estimated time correction offset for the given stream.
*
* The first call to this function takes several msec for an initial estimate, subsequent calls
* are instantaneous. The correction offset is periodically re-estimated in the background (once
* every few sec.).
* @param remote_time The current time of the remote computer that was used to generate this
* time_correction.
* @param uncertainty The maximum uncertainty of the given time correction.
* @param timeout Timeout for the first time-correction estimate.
* @return The time correction estimate.
* @throws timeout_error If the initial estimate times out.
*/
double time_correction(double timeout = 2) {
	// Forwarded unchanged to the time receiver.
	const double offset = time_receiver_.time_correction(timeout);
	return offset;
}
double time_correction(double *remote_time, double *uncertainty, double timeout = 2) {
	// Both pointers and the timeout are passed through to the time receiver.
	const double offset = time_receiver_.time_correction(remote_time, uncertainty, timeout);
	return offset;
}
/**
* Set post-processing flags to use.
*
* By default, the inlet performs NO post-processing and returns the ground-truth time stamps,
* which can then be manually synchronized using time_correction(), and then smoothed/dejittered
* if desired. This function allows automating these two and possibly more operations.
*
* @warning when you enable this, you will no longer receive or be able to recover the original
* time stamps.
* @param flags An integer that is the result of bitwise OR'ing one or more options from
* processing_options_t together (e.g., proc_clocksync|proc_dejitter); the default is to enable
* all options.
*/
void set_postprocessing(uint32_t flags = proc_ALL) {
	// The flags are handed to the time-stamp post-processor as its new options.
	postprocessor_.set_options(flags);
}
/**
* Open a new data stream.
*
* All samples pushed in at the other end from this moment onwards will be queued and
* eventually be delivered in response to pull_sample() or pull_chunk() calls.
* A pull call without a preceding open_stream() serves as an implicit open_stream().
* @param timeout Optional timeout of the operation (default: no timeout).
* @throws timeout_error (if the timeout expires), or lost_error (if the stream source has been
* lost).
*/
void open_stream(double timeout = FOREVER) {
	// Forwarded to the data receiver.
	data_receiver_.open_stream(timeout);
}
/**
* Close the current data stream.
*
* All samples still buffered or in flight will be dropped and the source will halt its
* buffering of data for this inlet. If an application stops being interested in data from a
* source (temporarily or not), it should call close_stream() to not pressure the source outlet
* to buffer unnecessarily large amounts of data (perhaps even running out of memory).
*/
void close_stream() {
	// Forwarded to the data receiver.
	data_receiver_.close_stream();
}
/**
* Query the current size of the buffer, i.e. the number of samples that are buffered.
* Note that this value may be inaccurate and should not be relied on for program logic.
*/
std::size_t samples_available() {
	// The count is whatever the data receiver currently reports.
	return data_receiver_.samples_available();
}
/// Flush the queue, return the number of dropped samples
uint32_t flush() {
	// Empty the data receiver's queue and remember how many samples were discarded.
	int dropped = data_receiver_.flush();
	// The same count is passed on to the time-stamp post-processor.
	postprocessor_.skip_samples(dropped);
	return static_cast<uint32_t>(dropped);
}
/** Query whether the clock was potentially reset since the last call to was_clock_reset().
*
* This is only interesting for applications that combine multiple time_correction values to
* estimate clock drift and which should tolerate (rare) cases where the source machine was
* hot-swapped or restarted.
*/
bool was_clock_reset() {
	// Reports the time receiver's reset flag.
	return time_receiver_.was_reset();
}
/// Override the half-time (forget factor) of the time-stamp smoothing.
void smoothing_halftime(float value) {
	// Forwarded to the time-stamp post-processor.
	postprocessor_.smoothing_halftime(value);
}
private:
/// post-process a time stamp
double postprocess(double stamp) {
	// A zero stamp (the pull functions' "no sample" value) is passed through untouched.
	if (!stamp) return stamp;
	return postprocessor_.process_timestamp(stamp);
}
// NOTE: the declaration order below is also the construction order. The constructor passes
// conn_ to each receiver and gives postprocessor_ callbacks that use time_receiver_ and conn_,
// so conn_ must stay first and postprocessor_ last.
/// the inlet connection, shared by all receivers below
inlet_connection conn_;
// the content receiver classes
/// receiver behind info()
info_receiver info_receiver_;
/// receiver behind time_correction() and was_clock_reset()
time_receiver time_receiver_;
/// receiver behind the pull_*, open_stream(), close_stream(), samples_available() and flush() calls
data_receiver data_receiver_;
/// class for post-processing time stamps
time_postprocessor postprocessor_;
};
} // namespace lsl
#endif