From b68efd35ccfa3b259cd4af3e952d62dae5158b4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Fri, 14 Apr 2023 11:42:24 +0200 Subject: [PATCH 1/4] checkpoint --- include/seastar/core/execution_stage.hh | 6 +- include/seastar/core/task.hh | 67 +++++++++++++++ src/core/io_queue.cc | 6 ++ src/core/reactor.cc | 8 +- src/util/backtrace.cc | 106 ++++++++++++++++++++++++ 5 files changed, 191 insertions(+), 2 deletions(-) diff --git a/include/seastar/core/execution_stage.hh b/include/seastar/core/execution_stage.hh index 227afe6dedb..1eb9f487aa5 100644 --- a/include/seastar/core/execution_stage.hh +++ b/include/seastar/core/execution_stage.hh @@ -220,6 +220,7 @@ class concrete_execution_stage final : public execution_stage { struct work_item { input_type _in; + task_id _task_id; promise_type _ready; work_item(typename internal::wrap_for_es::type... args) : _in(std::move(args)...) { } @@ -244,7 +245,10 @@ private: auto wi_in = std::move(wi._in); auto wi_ready = std::move(wi._ready); _queue.pop_front(); - futurize::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready)); + { + auto st = switch_task(11, wi._task_id); + futurize::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready)); + } _stats.function_calls_executed++; if (internal::scheduler_need_preempt()) { diff --git a/include/seastar/core/task.hh b/include/seastar/core/task.hh index 8dbfb789434..5c2c27b7982 100644 --- a/include/seastar/core/task.hh +++ b/include/seastar/core/task.hh @@ -31,7 +31,74 @@ namespace seastar { SEASTAR_MODULE_EXPORT + +inline int64_t rdtsc() { + uint64_t rax, rdx; + asm volatile ( "rdtsc" : "=a" (rax), "=d" (rdx) ); + return (int64_t)(( rdx << 32 ) + rax); +} + +struct tracer { + struct entry { + uint64_t event; + uint64_t id; + uint64_t arg; + int64_t ts; + }; + + tracer(); + static constexpr size_t buffer_size = (32 * 1024); + std::vector _buf; + size_t _head = 0; + size_t _tail = buffer_size - 1; + void add(uint64_t event, uint64_t id, uint64_t arg) { + if (_head == _tail) { + return; + } + _buf[_head++] = entry{.event = event, .id = id, .arg = arg, .ts = rdtsc()}; + if (_head % (buffer_size / 2) == 0) { + commit(); + if (_head == buffer_size) { + _head = 0; + } + } + } + + struct impl; + std::unique_ptr _impl; + void commit(); + void start(); + future<> stop(); +}; +extern thread_local tracer g_tracer; + +extern thread_local uint64_t fresh_task_id; +extern thread_local uint64_t current_task_id; + +struct task_id { + uint64_t _value; + task_id(uint64_t id = current_task_id) : _value(id) {} + operator uint64_t() { return _value; }; +}; + +struct [[nodiscard]] switch_task { + task_id _prev; + switch_task(uint64_t event, uint64_t id) { + current_task_id = id; + g_tracer.add(event, _prev, current_task_id); + } + ~switch_task() { + current_task_id = _prev; + } +}; + +inline void task_event(uint64_t event, uint64_t arg, uint64_t id = current_task_id) { + g_tracer.add(event, id, arg); +} + class task { +public: + task_id _id; protected: scheduling_group _sg; private: diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc index 52d38aadd3d..c72bc300d22 100644 --- a/src/core/io_queue.cc +++ b/src/core/io_queue.cc @@ -219,6 +219,8 @@ class io_desc_read_write final : public io_completion { promise _pr; iovec_keeper _iovs; uint64_t _dispatched_polls; + task_id _task_id; + uint64_t _io_id; public: io_desc_read_write(io_queue& ioq, io_queue::priority_class_data& pc, stream_id stream, io_direction_and_length dnl, fair_queue_entry::capacity_t cap, iovec_keeper iovs) @@ -243,6 +245,7 @@ class io_desc_read_write final : public io_completion { virtual void complete(size_t res) noexcept override { io_log.trace("dev {} : req {} complete", _ioq.dev_id(), fmt::ptr(this)); + task_event(5, _io_id, _task_id); auto now = io_queue::clock_type::now(); auto delay = std::chrono::duration_cast>(now - _ts); _pclass.on_complete(delay); @@ -266,6 +269,9 @@ class io_desc_read_write final : public io_completion { } future get_future() { + static thread_local uint64_t io_id = 0; + _io_id = io_id++; + task_event(4, _io_id); return _pr.get_future(); } diff --git a/src/core/reactor.cc b/src/core/reactor.cc index a8b98e3e230..85ffcd01204 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -185,6 +185,9 @@ module seastar; namespace seastar { +thread_local uint64_t fresh_task_id = 1; +thread_local uint64_t current_task_id = 0; + static_assert(posix::shutdown_mask(SHUT_RD) == posix::rcv_shutdown); static_assert(posix::shutdown_mask(SHUT_WR) == posix::snd_shutdown); static_assert(posix::shutdown_mask(SHUT_RDWR) == (posix::snd_shutdown | posix::rcv_shutdown)); @@ -2589,7 +2592,10 @@ void reactor::run_tasks(task_queue& tq) { STAP_PROBE(seastar, reactor_run_tasks_single_start); internal::task_histogram_add_task(*tsk); _current_task = tsk; - tsk->run_and_dispose(); + { + auto st = switch_task(0, tsk->_id); + tsk->run_and_dispose(); + } _current_task = nullptr; STAP_PROBE(seastar, reactor_run_tasks_single_end); ++tq._tasks_processed; diff --git a/src/util/backtrace.cc b/src/util/backtrace.cc index cadf841e983..66f6b16bda1 100644 --- a/src/util/backtrace.cc +++ b/src/util/backtrace.cc @@ -194,4 +194,110 @@ bool tasktrace::operator==(const tasktrace& o) const noexcept { tasktrace::~tasktrace() {} +thread_local tracer g_tracer; + +struct tracer::impl { + std::optional> _loop; + server_socket _ss; + std::optional _conn; + tracer* _parent; + condition_variable _cv; + bool _stopping = false; + + size_t queued() { + auto t = _parent->_tail; + auto h = _parent->_head; + if (h > t) { + return h - t; + } else { + return _parent->_buf.size() - (t - h); + } + } + + impl(tracer* p) : _parent(p) {} + + future<> handle() { + auto out = _conn->output(_parent->_buf.size() * sizeof(_parent->_buf[0])); + while (true) { + co_await _cv.when([&] {return queued() >= 1024 || _stopping;}); + if (_stopping) { + break; + } + auto t = _parent->_tail; + auto h = _parent->_head; + if (h > t) { + auto base = t; + auto size = h - t; + co_await out.write((char*)(&_parent->_buf[base]), sizeof(_parent->_buf[0]) * size); + } else { + auto base = t; + auto size = _parent->_buf.size() - t; + co_await out.write((char*)(&_parent->_buf[base]), sizeof(_parent->_buf[0]) * size); + base = 0; + size = h; + co_await out.write((char*)(&_parent->_buf[base]), sizeof(_parent->_buf[0]) * size); + } + if (h == t) { + std::cerr << "OVERFULL\n"; + } + _parent->_tail = (h > 0) ? (h - 1) : (_parent->_buf.size() - 1); + } + } + + void commit() { + _cv.signal(); + } + + future<> run_loop() { + listen_options lo; + lo.reuse_address = true; + auto addr = socket_address(uint16_t(12345)); + _ss = seastar::listen(addr, lo); + while (true) { + accept_result ar = co_await _ss.accept(); + _conn = std::move(ar.connection); + try { + co_await handle(); + } catch(...) { + } + _conn->shutdown_input(); + _conn->shutdown_output(); + _conn = {}; + } + } + + void start() { + _loop = run_loop(); + } + + future<> stop() { + _stopping = true; + try { + _ss.abort_accept(); + if (_conn) { + _conn->shutdown_input(); + _conn->shutdown_output(); + } + _cv.signal(); + } catch (...) { + } + return std::move(*std::exchange(_loop, std::nullopt)); + } +}; + +tracer::tracer() : _buf(buffer_size), _impl(new impl(this)) { +} + +void tracer::start() { + _impl->start(); +} + +future<> tracer::stop() { + return _impl->stop(); +} + +void tracer::commit() { + _impl->commit(); +} + } // namespace seastar From e2f29cbe20bb7053126ab9f61321f9734eb12d83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Tue, 17 Dec 2024 22:47:38 +0100 Subject: [PATCH 2/4] checkpoint --- include/seastar/core/byteorder.hh | 3 +- include/seastar/core/execution_stage.hh | 3 +- include/seastar/core/fair_queue.hh | 1 + include/seastar/core/task.hh | 48 +----- include/seastar/util/shared_token_bucket.hh | 3 +- include/seastar/util/tracer.hh | 177 ++++++++++++++++++++ src/core/fair_queue.cc | 20 ++- src/core/io_queue.cc | 10 +- src/core/reactor.cc | 8 +- src/util/backtrace.cc | 106 ------------ src/util/tracer.cc | 7 + 11 files changed, 222 insertions(+), 164 deletions(-) create mode 100644 include/seastar/util/tracer.hh create mode 100644 src/util/tracer.cc diff --git a/include/seastar/core/byteorder.hh b/include/seastar/core/byteorder.hh index 3291d9066b7..8b9f4752584 100644 --- a/include/seastar/core/byteorder.hh +++ b/include/seastar/core/byteorder.hh @@ -71,10 +71,11 @@ read_le(const char* p) noexcept { template inline -void +char* write_le(char* p, T datum) noexcept { datum = cpu_to_le(datum); std::copy_n(reinterpret_cast(&datum), sizeof(T), p); + return p + sizeof(T); } template diff --git a/include/seastar/core/execution_stage.hh b/include/seastar/core/execution_stage.hh index 1eb9f487aa5..278bcced194 100644 --- a/include/seastar/core/execution_stage.hh +++ b/include/seastar/core/execution_stage.hh @@ -246,7 +246,8 @@ private: auto wi_ready = std::move(wi._ready); _queue.pop_front(); { - auto st = switch_task(11, wi._task_id); + tracepoint_run_execution_stage_task(wi._task_id); + auto st = switch_task(wi._task_id); futurize::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready)); } _stats.function_calls_executed++; diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh index 546cb00b601..8e10ea37184 100644 --- a/include/seastar/core/fair_queue.hh +++ b/include/seastar/core/fair_queue.hh @@ -258,6 +258,7 @@ public: void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept; capacity_t capacity_deficiency(capacity_t from) const noexcept; + capacity_t head() const noexcept; std::chrono::duration rate_limit_duration() const noexcept { std::chrono::duration dur((double)_token_bucket.limit() / _token_bucket.rate()); diff --git a/include/seastar/core/task.hh b/include/seastar/core/task.hh index 5c2c27b7982..0b2b18437f0 100644 --- a/include/seastar/core/task.hh +++ b/include/seastar/core/task.hh @@ -23,6 +23,7 @@ #include #include +#include #ifndef SEASTAR_MODULE #include @@ -32,46 +33,6 @@ namespace seastar { SEASTAR_MODULE_EXPORT -inline int64_t rdtsc() { - uint64_t rax, rdx; - asm volatile ( "rdtsc" : "=a" (rax), "=d" (rdx) ); - return (int64_t)(( rdx << 32 ) + rax); -} - -struct tracer { - struct entry { - uint64_t event; - uint64_t id; - uint64_t arg; - int64_t ts; - }; - - tracer(); - static constexpr size_t buffer_size = (32 * 1024); - std::vector _buf; - size_t _head = 0; - size_t _tail = buffer_size - 1; - void add(uint64_t event, uint64_t id, uint64_t arg) { - if (_head == _tail) { - return; - } - _buf[_head++] = entry{.event = event, .id = id, .arg = arg, .ts = rdtsc()}; - if (_head % (buffer_size / 2) == 0) { - commit(); - if (_head == buffer_size) { - _head = 0; - } - } - } - - struct impl; - std::unique_ptr _impl; - void commit(); - void start(); - future<> stop(); -}; -extern thread_local tracer g_tracer; - extern thread_local uint64_t fresh_task_id; extern thread_local uint64_t current_task_id; @@ -83,19 +44,14 @@ struct task_id { struct [[nodiscard]] switch_task { task_id _prev; - switch_task(uint64_t event, uint64_t id) { + switch_task(uint64_t id) { current_task_id = id; - g_tracer.add(event, _prev, current_task_id); } ~switch_task() { current_task_id = _prev; } }; -inline void task_event(uint64_t event, uint64_t arg, uint64_t id = current_task_id) { - g_tracer.add(event, id, arg); -} - class task { public: task_id _id; diff --git a/include/seastar/util/shared_token_bucket.hh b/include/seastar/util/shared_token_bucket.hh index 91c718763a7..ee0d6513e8e 100644 --- a/include/seastar/util/shared_token_bucket.hh +++ b/include/seastar/util/shared_token_bucket.hh @@ -116,8 +116,9 @@ class shared_token_bucket { rovers_t _rovers; T tail() const noexcept { return _rovers.tail.load(std::memory_order_relaxed); } +public: T head() const noexcept { return _rovers.head.load(std::memory_order_relaxed); } - +private: /* * Need to make sure that the multiplication in accumulated_in() doesn't * overflow. Not to introduce an extra branch there, define that the diff --git a/include/seastar/util/tracer.hh b/include/seastar/util/tracer.hh new file mode 100644 index 00000000000..3fba591ff08 --- /dev/null +++ b/include/seastar/util/tracer.hh @@ -0,0 +1,177 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace seastar { + +struct tracer { + static constexpr size_t buffer_size = (128 * 1024); + std::deque> _old; + std::vector _current; + size_t _cur_pos = 0; + + tracer() { + for (int i = 0; i < 80; ++i) { + _old.push_back(std::vector()); + } + _current.resize(buffer_size); + } + + void rotate() { + _current.resize(_cur_pos); + _old.push_back(std::move(_current)); + _current = std::move(_old.front()); + _old.pop_front(); + _current.resize(buffer_size); + _cur_pos = 0; + } + + std::byte* write(size_t n) { + if (_current.size() - _cur_pos < n) [[unlikely]] { + rotate(); + } + auto result = &_current[_cur_pos]; + _cur_pos += n; + return result; + } + + std::deque> snapshot() { + auto result = _old; + auto cur = _current; + cur.resize(_cur_pos); + result.push_back(cur); + return result; + } + + uint64_t rdtsc() { + uint64_t rax, rdx; + asm volatile ( "rdtsc" : "=a" (rax), "=d" (rdx) ); + return (uint64_t)(( rdx << 32 ) + rax); + } +}; +extern thread_local tracer g_tracer; + + +enum class trace_events { + POLL, + SLEEP, + WAKEUP, + RUN_TASK_QUEUE, + RUN_TASK_QUEUE_END, + RUN_TASK, + RUN_EXECUTION_STAGE_TASK, + GRAB_CAPACITY, + GRAB_CAPACITY_PENDING, + DISPATCH_REQUESTS, + DISPATCH_QUEUE, + REPLENISH, + IO_QUEUE, + IO_DISPATCH, + IO_COMPLETE, + COUNT, +}; + +[[gnu::always_inline]] +inline void tracepoint_nullary(trace_events header) { + auto p = reinterpret_cast(g_tracer.write(12)); + p = seastar::write_le(p, static_cast(header)); + p = seastar::write_le(p, g_tracer.rdtsc()); +} + +template +[[gnu::always_inline]] +inline void tracepoint_unary(uint32_t header, T arg) { + auto p = reinterpret_cast(g_tracer.write(12 + sizeof(T))); + p = seastar::write_le(p, static_cast(header)); + p = seastar::write_le(p, g_tracer.rdtsc()); + p = seastar::write_le(p, arg); +} + +template +[[gnu::always_inline]] +inline void tracepoint_unary(trace_events header, T arg) { + tracepoint_unary(static_cast(header), arg); +} + +inline void tracepoint_poll() { + tracepoint_nullary(trace_events::POLL); +} + +inline void tracepoint_sleep() { + tracepoint_nullary(trace_events::SLEEP); +} + +inline void tracepoint_wakeup() { + tracepoint_nullary(trace_events::WAKEUP); +} + +inline void tracepoint_run_task_queue(uint8_t sg) { + tracepoint_unary(trace_events::RUN_TASK_QUEUE, sg); +} + +inline void tracepoint_run_task_queue_end() { + tracepoint_nullary(trace_events::RUN_TASK_QUEUE_END); +} + +inline void tracepoint_run_task(int64_t task_id) { + tracepoint_unary(trace_events::RUN_TASK, task_id); +} + +inline void tracepoint_run_execution_stage_task(int64_t task_id) { + tracepoint_unary(trace_events::RUN_EXECUTION_STAGE_TASK, task_id); +} + +inline void tracepoint_io_queue(uint8_t direction, uint64_t tokens, uint64_t io_id) { + auto p = reinterpret_cast(g_tracer.write(12 + 17)); + p = seastar::write_le(p, static_cast(trace_events::IO_QUEUE)); + p = seastar::write_le(p, g_tracer.rdtsc()); + p = seastar::write_le(p, direction); + p = seastar::write_le(p, tokens); + p = seastar::write_le(p, io_id); +} + +inline void tracepoint_io_dispatch(uint64_t io_id) { + tracepoint_unary(trace_events::IO_DISPATCH, io_id); +} + +inline void tracepoint_io_complete(uint64_t io_id) { + tracepoint_unary(trace_events::IO_COMPLETE, io_id); +} + +inline void tracepoint_grab_capacity(uint64_t cap, uint64_t want_head, uint64_t head) { + auto p = reinterpret_cast(g_tracer.write(12 + 24)); + p = seastar::write_le(p, static_cast(trace_events::GRAB_CAPACITY)); + p = seastar::write_le(p, g_tracer.rdtsc()); + p = seastar::write_le(p, cap); + p = seastar::write_le(p, want_head); + p = seastar::write_le(p, head); +} + +inline void tracepoint_grab_capacity_pending(uint64_t cap, uint64_t head) { + auto p = reinterpret_cast(g_tracer.write(12 + 16)); + p = seastar::write_le(p, static_cast(trace_events::GRAB_CAPACITY_PENDING)); + p = seastar::write_le(p, g_tracer.rdtsc()); + p = seastar::write_le(p, cap); + p = seastar::write_le(p, head); +} + +inline void tracepoint_replenish(uint64_t new_head) { + tracepoint_unary(trace_events::REPLENISH, new_head); +} + +inline void tracepoint_dispatch_queue(uint8_t id) { + auto p = reinterpret_cast(g_tracer.write(5)); + p = seastar::write_le(p, static_cast(trace_events::DISPATCH_QUEUE)); + p = seastar::write_le(p, id); +} + +inline void tracepoint_dispatch_requests() { + tracepoint_nullary(trace_events::DISPATCH_REQUESTS); +} + + +} // namespace seastar diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc index f23edf3dedc..309b722e900 100644 --- a/src/core/fair_queue.cc +++ b/src/core/fair_queue.cc @@ -115,6 +115,7 @@ auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t { void fair_group::replenish_capacity(clock_type::time_point now) noexcept { _token_bucket.replenish(now); + tracepoint_replenish(_token_bucket.head()); } void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept { @@ -131,6 +132,10 @@ auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity return _token_bucket.deficiency(from); } +auto fair_group::head() const noexcept -> capacity_t { + return _token_bucket.head(); +} + // Priority class, to be used with a given fair_queue class fair_queue::priority_class_data { friend class fair_queue; @@ -141,9 +146,10 @@ class fair_queue::priority_class_data { bool _queued = false; bool _plugged = true; uint32_t _activations = 0; + fair_queue::class_id _id; public: - explicit priority_class_data(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {} + explicit priority_class_data(uint32_t shares, fair_queue::class_id id) noexcept : _shares(std::max(shares, 1u)), _id(id) {} priority_class_data(const priority_class_data&) = delete; priority_class_data(priority_class_data&&) = delete; @@ -226,7 +232,9 @@ void fair_queue::unplug_class(class_id cid) noexcept { auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result { _group.maybe_replenish_capacity(_group_replenish); - if (_group.capacity_deficiency(_pending->head)) { + capacity_t head = _group.head(); + tracepoint_grab_capacity(ent._capacity, _pending->head, head); + if (internal::wrapping_difference(_pending->head, head)) { return grab_result::pending; } @@ -246,7 +254,9 @@ auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_res capacity_t cap = ent._capacity; capacity_t want_head = _group.grab_capacity(cap); - if (_group.capacity_deficiency(want_head)) { + capacity_t head = _group.head(); + tracepoint_grab_capacity(ent._capacity, want_head, head); + if (internal::wrapping_difference(want_head, head)) { _pending.emplace(want_head, cap); return grab_result::pending; } @@ -262,7 +272,7 @@ void fair_queue::register_priority_class(class_id id, uint32_t shares) { } _handles.reserve(_nr_classes + 1); - _priority_classes[id] = std::make_unique(shares); + _priority_classes[id] = std::make_unique(shares, id); _nr_classes++; } @@ -330,8 +340,10 @@ void fair_queue::dispatch_requests(std::function cb) { capacity_t dispatched = 0; boost::container::small_vector preempt; + tracepoint_dispatch_requests(); while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) { priority_class_data& h = *_handles.top(); + tracepoint_dispatch_queue(h._id); if (h._queue.empty() || !h._plugged) { pop_priority_class(h); continue; diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc index c72bc300d22..7bdb44b2bb4 100644 --- a/src/core/io_queue.cc +++ b/src/core/io_queue.cc @@ -48,6 +48,7 @@ module seastar; #include #include #include +#include #endif namespace seastar { @@ -233,6 +234,9 @@ class io_desc_read_write final : public io_completion { , _iovs(std::move(iovs)) { io_log.trace("dev {} : req {} queue len {} capacity {}", _ioq.dev_id(), fmt::ptr(this), _dnl.length(), _fq_capacity); + static thread_local uint64_t io_id = 0; + _io_id = io_id++; + tracepoint_io_queue(_fq_capacity, dnl.rw_idx(), _io_id); } virtual void set_exception(std::exception_ptr eptr) noexcept override { @@ -245,7 +249,7 @@ class io_desc_read_write final : public io_completion { virtual void complete(size_t res) noexcept override { io_log.trace("dev {} : req {} complete", _ioq.dev_id(), fmt::ptr(this)); - task_event(5, _io_id, _task_id); + tracepoint_io_complete(_io_id); auto now = io_queue::clock_type::now(); auto delay = std::chrono::duration_cast>(now - _ts); _pclass.on_complete(delay); @@ -262,6 +266,7 @@ class io_desc_read_write final : public io_completion { void dispatch() noexcept { io_log.trace("dev {} : req {} submit", _ioq.dev_id(), fmt::ptr(this)); + tracepoint_io_dispatch(_io_id); auto now = io_queue::clock_type::now(); _pclass.on_dispatch(_dnl, std::chrono::duration_cast>(now - _ts)); _ts = now; @@ -269,9 +274,6 @@ class io_desc_read_write final : public io_completion { } future get_future() { - static thread_local uint64_t io_id = 0; - _io_id = io_id++; - task_event(4, _io_id); return _pr.get_future(); } diff --git a/src/core/reactor.cc b/src/core/reactor.cc index 85ffcd01204..8f8063c2211 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -173,6 +173,7 @@ module seastar; #include #include #include +#include #include "core/reactor_backend.hh" #include "core/syscall_result.hh" #include "core/thread_pool.hh" @@ -185,6 +186,8 @@ module seastar; namespace seastar { +thread_local tracer g_tracer; + thread_local uint64_t fresh_task_id = 1; thread_local uint64_t current_task_id = 0; @@ -2593,7 +2596,8 @@ void reactor::run_tasks(task_queue& tq) { internal::task_histogram_add_task(*tsk); _current_task = tsk; { - auto st = switch_task(0, tsk->_id); + tracepoint_run_task(tsk->_id); + auto st = switch_task(tsk->_id); tsk->run_and_dispose(); } _current_task = nullptr; @@ -3061,6 +3065,7 @@ reactor::run_some_tasks() { auto t_run_started = t_run_completed; insert_activating_task_queues(); task_queue* tq = pop_active_task_queue(t_run_started); + tracepoint_run_task_queue(tq->_id); sched_print("running tq {} {}", (void*)tq, tq->_name); _last_vruntime = std::max(tq->_vruntime, _last_vruntime); run_tasks(*tq); @@ -3083,6 +3088,7 @@ reactor::run_some_tasks() { // Settle on a regular need_preempt(), which will return true in // debug mode. } while (have_more_tasks() && !need_preempt()); + tracepoint_run_task_queue_end(); _cpu_stall_detector->end_task_run(t_run_completed); STAP_PROBE(seastar, reactor_run_tasks_end); *internal::current_scheduling_group_ptr() = default_scheduling_group(); // Prevent inheritance from last group run diff --git a/src/util/backtrace.cc b/src/util/backtrace.cc index 66f6b16bda1..cadf841e983 100644 --- a/src/util/backtrace.cc +++ b/src/util/backtrace.cc @@ -194,110 +194,4 @@ bool tasktrace::operator==(const tasktrace& o) const noexcept { tasktrace::~tasktrace() {} -thread_local tracer g_tracer; - -struct tracer::impl { - std::optional> _loop; - server_socket _ss; - std::optional _conn; - tracer* _parent; - condition_variable _cv; - bool _stopping = false; - - size_t queued() { - auto t = _parent->_tail; - auto h = _parent->_head; - if (h > t) { - return h - t; - } else { - return _parent->_buf.size() - (t - h); - } - } - - impl(tracer* p) : _parent(p) {} - - future<> handle() { - auto out = _conn->output(_parent->_buf.size() * sizeof(_parent->_buf[0])); - while (true) { - co_await _cv.when([&] {return queued() >= 1024 || _stopping;}); - if (_stopping) { - break; - } - auto t = _parent->_tail; - auto h = _parent->_head; - if (h > t) { - auto base = t; - auto size = h - t; - co_await out.write((char*)(&_parent->_buf[base]), sizeof(_parent->_buf[0]) * size); - } else { - auto base = t; - auto size = _parent->_buf.size() - t; - co_await out.write((char*)(&_parent->_buf[base]), sizeof(_parent->_buf[0]) * size); - base = 0; - size = h; - co_await out.write((char*)(&_parent->_buf[base]), sizeof(_parent->_buf[0]) * size); - } - if (h == t) { - std::cerr << "OVERFULL\n"; - } - _parent->_tail = (h > 0) ? (h - 1) : (_parent->_buf.size() - 1); - } - } - - void commit() { - _cv.signal(); - } - - future<> run_loop() { - listen_options lo; - lo.reuse_address = true; - auto addr = socket_address(uint16_t(12345)); - _ss = seastar::listen(addr, lo); - while (true) { - accept_result ar = co_await _ss.accept(); - _conn = std::move(ar.connection); - try { - co_await handle(); - } catch(...) { - } - _conn->shutdown_input(); - _conn->shutdown_output(); - _conn = {}; - } - } - - void start() { - _loop = run_loop(); - } - - future<> stop() { - _stopping = true; - try { - _ss.abort_accept(); - if (_conn) { - _conn->shutdown_input(); - _conn->shutdown_output(); - } - _cv.signal(); - } catch (...) { - } - return std::move(*std::exchange(_loop, std::nullopt)); - } -}; - -tracer::tracer() : _buf(buffer_size), _impl(new impl(this)) { -} - -void tracer::start() { - _impl->start(); -} - -future<> tracer::stop() { - return _impl->stop(); -} - -void tracer::commit() { - _impl->commit(); -} - } // namespace seastar diff --git a/src/util/tracer.cc b/src/util/tracer.cc new file mode 100644 index 00000000000..42c4992b932 --- /dev/null +++ b/src/util/tracer.cc @@ -0,0 +1,7 @@ +#include "seastar/util/tracer.hh" + +namespace seastar { + +extern thread_local tracer g_tracer; + +} // namespace seastar \ No newline at end of file From bd5aa7223d7dc21ca4f5a219bd5ba50aa8b0ac8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Wed, 18 Dec 2024 14:23:37 +0100 Subject: [PATCH 3/4] checkpoint --- include/seastar/core/reactor.hh | 3 +++ include/seastar/util/tracer.hh | 11 +++++++---- src/core/fair_queue.cc | 2 +- src/core/io_queue.cc | 2 +- src/core/metrics.cc | 1 + src/core/reactor.cc | 10 ++++++++++ 6 files changed, 23 insertions(+), 6 deletions(-) diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 9296ced12b2..a223e5080f7 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -68,6 +68,7 @@ #include #include #include +#include #include #include #include @@ -281,6 +282,8 @@ private: }; boost::container::static_vector, max_scheduling_groups()> _task_queues; +public: + std::vector, float>> list_groups(); internal::scheduling_group_specific_thread_local_data _scheduling_group_specific_data; int64_t _last_vruntime = 0; task_queue_list _active_task_queues; diff --git a/include/seastar/util/tracer.hh b/include/seastar/util/tracer.hh index 3fba591ff08..c46b410f2ee 100644 --- a/include/seastar/util/tracer.hh +++ b/include/seastar/util/tracer.hh @@ -15,7 +15,7 @@ struct tracer { size_t _cur_pos = 0; tracer() { - for (int i = 0; i < 80; ++i) { + for (int i = 0; i < 480; ++i) { _old.push_back(std::vector()); } _current.resize(buffer_size); @@ -72,6 +72,7 @@ enum class trace_events { IO_QUEUE, IO_DISPATCH, IO_COMPLETE, + MONITORING_SCRAPE, COUNT, }; @@ -164,14 +165,16 @@ inline void tracepoint_replenish(uint64_t new_head) { } inline void tracepoint_dispatch_queue(uint8_t id) { - auto p = reinterpret_cast(g_tracer.write(5)); - p = seastar::write_le(p, static_cast(trace_events::DISPATCH_QUEUE)); - p = seastar::write_le(p, id); + tracepoint_unary(trace_events::DISPATCH_QUEUE, id); } inline void tracepoint_dispatch_requests() { tracepoint_nullary(trace_events::DISPATCH_REQUESTS); } +inline void tracepoint_monitoring_scrape() { + tracepoint_nullary(trace_events::MONITORING_SCRAPE); +} + } // namespace seastar diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc index 309b722e900..2be38e6c32a 100644 --- a/src/core/fair_queue.cc +++ b/src/core/fair_queue.cc @@ -340,7 +340,7 @@ void fair_queue::dispatch_requests(std::function cb) { capacity_t dispatched = 0; boost::container::small_vector preempt; - tracepoint_dispatch_requests(); + // tracepoint_dispatch_requests(); while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) { priority_class_data& h = *_handles.top(); tracepoint_dispatch_queue(h._id); diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc index 7bdb44b2bb4..dad0cfd02d2 100644 --- a/src/core/io_queue.cc +++ b/src/core/io_queue.cc @@ -236,7 +236,7 @@ class io_desc_read_write final : public io_completion { io_log.trace("dev {} : req {} queue len {} capacity {}", _ioq.dev_id(), fmt::ptr(this), _dnl.length(), _fq_capacity); static thread_local uint64_t io_id = 0; _io_id = io_id++; - tracepoint_io_queue(_fq_capacity, dnl.rw_idx(), _io_id); + tracepoint_io_queue(dnl.rw_idx(), _fq_capacity, _io_id); } virtual void set_exception(std::exception_ptr eptr) noexcept override { diff --git a/src/core/metrics.cc b/src/core/metrics.cc index 14f1d5a53eb..5e68eb0aeb9 100644 --- a/src/core/metrics.cc +++ b/src/core/metrics.cc @@ -375,6 +375,7 @@ const value_map& get_value_map() { } foreign_ptr get_values() { + tracepoint_monitoring_scrape(); shared_ptr res_ref = ::seastar::make_shared(); auto& res = *(res_ref.get()); auto& mv = res.values; diff --git a/src/core/reactor.cc b/src/core/reactor.cc index 8f8063c2211..cd10eae7292 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -246,6 +246,16 @@ shard_id reactor::cpu_id() const { return _id; } +std::vector, float>> reactor::list_groups() { + std::vector, float>> result; + for (int i = 0; i < _task_queues.size(); ++i) { + if (_task_queues[i]) { + result.emplace_back(i, _task_queues[i]->_name, _task_queues[i]->_shares); + } + } + return result; +} + void reactor::update_shares_for_queues(internal::priority_class pc, uint32_t shares) { for (auto&& q : _io_queues) { q.second->update_shares_for_class(pc, shares); From b0ec97d4117481101ec464f5d8e18a2bb7124827 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Sun, 22 Dec 2024 01:52:25 +0100 Subject: [PATCH 4/4] fix? --- include/seastar/core/fair_queue.hh | 12 +-- include/seastar/util/shared_token_bucket.hh | 4 + include/seastar/util/tracer.hh | 9 +- src/core/fair_queue.cc | 96 ++++++++++++--------- src/core/io_queue.cc | 1 + 5 files changed, 75 insertions(+), 47 deletions(-) diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh index 8e10ea37184..04d4d30a6bb 100644 --- a/include/seastar/core/fair_queue.hh +++ b/include/seastar/core/fair_queue.hh @@ -254,6 +254,7 @@ public: capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; } capacity_t grab_capacity(capacity_t cap) noexcept; clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); } + void refund_tokens(capacity_t) noexcept; void replenish_capacity(clock_type::time_point now) noexcept; void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept; @@ -344,13 +345,12 @@ private: * in the middle of the waiting */ struct pending { - capacity_t head; - capacity_t cap; - - pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {} + capacity_t head = 0; + capacity_t cap = 0; }; - std::optional _pending; + pending _pending; + capacity_t _queued_cap = 0; void push_priority_class(priority_class_data& pc) noexcept; void push_priority_class_from_idle(priority_class_data& pc) noexcept; @@ -360,7 +360,7 @@ private: enum class grab_result { grabbed, cant_preempt, pending }; grab_result grab_capacity(const fair_queue_entry& ent) noexcept; - grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept; + capacity_t reap_pending_capacity(bool& contact) noexcept; public: /// Constructs a fair queue with configuration parameters \c cfg. /// diff --git a/include/seastar/util/shared_token_bucket.hh b/include/seastar/util/shared_token_bucket.hh index ee0d6513e8e..9a2f87fd353 100644 --- a/include/seastar/util/shared_token_bucket.hh +++ b/include/seastar/util/shared_token_bucket.hh @@ -160,6 +160,10 @@ public: _rovers.release(tokens); } + void refund(T tokens) noexcept { + fetch_add(_rovers.head, tokens); + } + void replenish(typename Clock::time_point now) noexcept { auto ts = _replenished.load(std::memory_order_relaxed); diff --git a/include/seastar/util/tracer.hh b/include/seastar/util/tracer.hh index c46b410f2ee..0a6127793e6 100644 --- a/include/seastar/util/tracer.hh +++ b/include/seastar/util/tracer.hh @@ -72,6 +72,7 @@ enum class trace_events { IO_QUEUE, IO_DISPATCH, IO_COMPLETE, + IO_CANCEL, MONITORING_SCRAPE, COUNT, }; @@ -143,6 +144,10 @@ inline void tracepoint_io_complete(uint64_t io_id) { tracepoint_unary(trace_events::IO_COMPLETE, io_id); } +inline void tracepoint_io_cancel(uint64_t io_id) { + tracepoint_unary(trace_events::IO_CANCEL, io_id); +} + inline void tracepoint_grab_capacity(uint64_t cap, uint64_t want_head, uint64_t head) { auto p = reinterpret_cast(g_tracer.write(12 + 24)); p = seastar::write_le(p, static_cast(trace_events::GRAB_CAPACITY)); @@ -168,8 +173,8 @@ inline void tracepoint_dispatch_queue(uint8_t id) { tracepoint_unary(trace_events::DISPATCH_QUEUE, id); } -inline void tracepoint_dispatch_requests() { - tracepoint_nullary(trace_events::DISPATCH_REQUESTS); +inline void tracepoint_dispatch_requests(uint64_t queued) { + tracepoint_unary(trace_events::DISPATCH_REQUESTS, queued); } inline void tracepoint_monitoring_scrape() { diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc index 2be38e6c32a..dc2d7f175e0 100644 --- a/src/core/fair_queue.cc +++ b/src/core/fair_queue.cc @@ -118,6 +118,11 @@ void fair_group::replenish_capacity(clock_type::time_point now) noexcept { tracepoint_replenish(_token_bucket.head()); } +void fair_group::refund_tokens(capacity_t cap) noexcept { + _token_bucket.refund(cap); + tracepoint_replenish(_token_bucket.head()); +} + void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept { auto now = clock_type::now(); auto extra = _token_bucket.accumulated_in(now - local_ts); @@ -229,38 +234,29 @@ void fair_queue::unplug_class(class_id cid) noexcept { unplug_priority_class(*_priority_classes[cid]); } -auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result { - _group.maybe_replenish_capacity(_group_replenish); - - capacity_t head = _group.head(); - tracepoint_grab_capacity(ent._capacity, _pending->head, head); - if (internal::wrapping_difference(_pending->head, head)) { - return grab_result::pending; - } - - capacity_t cap = ent._capacity; - if (cap > _pending->cap) { - return grab_result::cant_preempt; +auto fair_queue::reap_pending_capacity(bool& contact) noexcept -> capacity_t { + capacity_t result = 0; + contact = true; + if (_pending.cap) { + _group.maybe_replenish_capacity(_group_replenish); + capacity_t head = _group.head(); + // tracepoint_grab_capacity(ent._capacity, _pending->head, head); + auto diff = internal::wrapping_difference(_pending.head, head); + contact = diff <= _pending.cap; + if (diff < _pending.cap) { + result = _pending.cap - diff; + _pending.cap = diff; + } } - - _pending.reset(); - return grab_result::grabbed; + return result; } auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result { - if (_pending) { - return grab_pending_capacity(ent); - } - capacity_t cap = ent._capacity; capacity_t want_head = _group.grab_capacity(cap); capacity_t head = _group.head(); tracepoint_grab_capacity(ent._capacity, want_head, head); - if (internal::wrapping_difference(want_head, head)) { - _pending.emplace(want_head, cap); - return grab_result::pending; - } - + _pending = pending{want_head, cap}; return grab_result::grabbed; } @@ -307,17 +303,19 @@ void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept { push_priority_class_from_idle(pc); } pc._queue.push_back(ent); + _queued_cap += ent.capacity(); } void fair_queue::notify_request_finished(fair_queue_entry::capacity_t cap) noexcept { } void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept { + _queued_cap -= ent._capacity; ent._capacity = 0; } fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept { - if (_pending) { + if (_pending.cap) { /* * We expect the disk to release the ticket within some time, * but it's ... OK if it doesn't -- the pending wait still @@ -328,7 +326,7 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept * which's sub-optimal. The expectation is that we think disk * works faster, than it really does. */ - auto over = _group.capacity_deficiency(_pending->head); + auto over = _group.capacity_deficiency(_pending.head); auto ticks = _group.capacity_duration(over); return std::chrono::steady_clock::now() + std::chrono::duration_cast(ticks); } @@ -337,11 +335,15 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept } void fair_queue::dispatch_requests(std::function cb) { - capacity_t dispatched = 0; - boost::container::small_vector preempt; + tracepoint_dispatch_requests(_queued_cap); + _pending.cap = std::min(_pending.cap, _queued_cap); + bool contact = false; + capacity_t available = reap_pending_capacity(contact); + capacity_t recycled = 0; + uint64_t can_grab_this_tick = _group.per_tick_grab_threshold(); // tracepoint_dispatch_requests(); - while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) { + while (!_handles.empty()) { priority_class_data& h = *_handles.top(); tracepoint_dispatch_queue(h._id); if (h._queue.empty() || !h._plugged) { @@ -350,16 +352,29 @@ void fair_queue::dispatch_requests(std::function cb) { } auto& req = h._queue.front(); - auto gr = grab_capacity(req); - if (gr == grab_result::pending) { + if (req._capacity <= available) { + // pass + } else if (req._capacity <= available + _pending.cap) { + _pending.cap += available; + available = 0; + break; + } else if (contact) { + can_grab_this_tick += available + _pending.cap; + recycled = available + _pending.cap; + _pending.cap = 0; + available = 0; + capacity_t grab_amount = std::min(can_grab_this_tick, _queued_cap); + grab_capacity(grab_amount); + can_grab_this_tick -= grab_amount; + available += reap_pending_capacity(contact); + if (req._capacity > available) { + break; + } + } else { break; } - if (gr == grab_result::cant_preempt) { - pop_priority_class(h); - preempt.emplace_back(&h); - continue; - } + available -= req._capacity; _last_accumulated = std::max(h._accumulated, _last_accumulated); pop_priority_class(h); @@ -386,7 +401,7 @@ void fair_queue::dispatch_requests(std::function cb) { } h._accumulated += req_cost; h._pure_accumulated += req_cap; - dispatched += req_cap; + _queued_cap -= req_cap; cb(req); @@ -395,8 +410,11 @@ void fair_queue::dispatch_requests(std::function cb) { } } - for (auto&& h : preempt) { - push_priority_class(*h); + if (_pending.cap == 0 && available) { + grab_capacity(available); + } + if (available + recycled) { + _group.refund_tokens(available + recycled); } } diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc index dad0cfd02d2..f79236695bb 100644 --- a/src/core/io_queue.cc +++ b/src/core/io_queue.cc @@ -259,6 +259,7 @@ class io_desc_read_write final : public io_completion { } void cancel() noexcept { + tracepoint_io_cancel(_io_id); _pclass.on_cancel(); _pr.set_exception(std::make_exception_ptr(default_io_exception_factory::cancelled())); delete this;