diff --git a/src/sample.cpp b/src/sample.cpp index b95521b0..b38f8034 100644 --- a/src/sample.cpp +++ b/src/sample.cpp @@ -6,6 +6,7 @@ #include "portable_archive/portable_oarchive.hpp" #include "util/cast.hpp" #include <boost/endian/conversion.hpp> +#include <thread> using namespace lsl; using lslboost::endian::endian_reverse_inplace; @@ -466,11 +467,15 @@ sample *factory::pop_freelist() { } factory::~factory() { - for (sample *cur = tail_, *next = cur->next_;; cur = next, next = next->next_) { - if (cur != sentinel()) delete cur; - if (!next) break; - } - delete[] storage_; + sample* cur = tail_.load(); + while (cur) { + sample* next = cur->next_.load(); + if (cur != sentinel()) { + delete cur; + } + cur = next; + } + delete[] storage_; } void factory::reclaim_sample(sample *s) { diff --git a/testing/ext/bench_pushpull.cpp b/testing/ext/bench_pushpull.cpp index a5b7e020..03a397b8 100644 --- a/testing/ext/bench_pushpull.cpp +++ b/testing/ext/bench_pushpull.cpp @@ -23,32 +23,82 @@ TEMPLATE_TEST_CASE("pushpull", "[basic][throughput]", char, double, std::string) const TestType data[max_nchan * chunk_size] = {sample_value<TestType>::val}; - const char *name = SampleType<TestType>::fmt_string(); + const char *base_name = SampleType<TestType>::fmt_string(); lsl::channel_format_t cf = (lsl::channel_format_t)SampleType<TestType>::chan_fmt; for (auto nchan : param_nchan) { - lsl::stream_outlet out( - lsl::stream_info(name, "PushPull", (int)nchan, chunk_size, cf, "streamid")); - auto found_stream_info(lsl::resolve_stream("name", name, 1, 2.0)); - REQUIRE(!found_stream_info.empty()); - - std::list<lsl::stream_inlet> inlet_list; for (auto n_inlets : param_inlets) { - while (inlet_list.size() < n_inlets) { - inlet_list.emplace_front(found_stream_info[0], 300, false); - inlet_list.front().open_stream(.5); - } - std::string suffix(std::to_string(nchan) + "_inlets_" + std::to_string(n_inlets)); + // Create a unique inlet name for each test combination + auto now = std::chrono::high_resolution_clock::now(); + auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count(); + std::string unique_name = std::string(base_name) + "_" + + std::to_string(nchan) + "_" + + std::to_string(n_inlets) + "_" + + std::to_string(timestamp); + + // Create outlet in its own scope + std::unique_ptr<lsl::stream_outlet> out; + std::vector<std::unique_ptr<lsl::stream_inlet>> inlets; + + try { + out = std::make_unique<lsl::stream_outlet>( + lsl::stream_info(unique_name, "PushPull", (int)nchan, chunk_size, cf, unique_name + "_src")); + + // Wait for outlet + std::this_thread::sleep_for(std::chrono::milliseconds(200)); - BENCHMARK("push_sample_nchan_" + suffix) { - for (size_t s = 0; s < chunk_size; s++) out.push_sample(data); - for (auto &inlet : inlet_list) inlet.flush(); - }; + // Resolve the stream + auto found_stream_info = lsl::resolve_stream("name", unique_name, 1, 3.0); + if (found_stream_info.empty()) { + WARN("Could not resolve stream " << unique_name); + continue; + } - BENCHMARK("push_chunk_nchan_" + suffix) { - out.push_chunk_multiplexed(data, chunk_size); - for (auto &inlet : inlet_list) inlet.flush(); - }; + // Create inlets + for (std::size_t i = 0; i < n_inlets; ++i) { + inlets.emplace_back(std::make_unique<lsl::stream_inlet>(found_stream_info[0], 300, 0, false)); + inlets.back()->open_stream(1.0); + } + + // Wait for all consumers to connect + if (n_inlets > 0) { + auto start_wait = std::chrono::steady_clock::now(); + while (!out->have_consumers() && + std::chrono::steady_clock::now() - start_wait < std::chrono::seconds(2)) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + std::string suffix(std::to_string(nchan) + "_inlets_" + std::to_string(n_inlets)); + + BENCHMARK("push_sample_nchan_" + suffix) { + for (size_t s = 0; s < chunk_size; s++) { + out->push_sample(data); + } + for (auto &inlet : inlets) { + inlet->flush(); + } + }; + BENCHMARK("push_chunk_nchan_" + suffix) { + out->push_chunk_multiplexed(data, chunk_size); + for (auto &inlet : inlets) { + inlet->flush(); + } + }; + } catch (const std::exception& e) { + WARN("Exception in benchmark: " << e.what()); + } + // Cleanup + for (auto& inlet : inlets) { + try { + inlet->close_stream(); + } catch (...) { + // Ignore cleanup errors + } + } + inlets.clear(); + out.reset(); + // Give extra time for cleanup + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } }