diff --git a/include/seastar/core/byteorder.hh b/include/seastar/core/byteorder.hh index 3291d9066b7..8b9f4752584 100644 --- a/include/seastar/core/byteorder.hh +++ b/include/seastar/core/byteorder.hh @@ -71,10 +71,11 @@ read_le(const char* p) noexcept { template inline -void +char* write_le(char* p, T datum) noexcept { datum = cpu_to_le(datum); std::copy_n(reinterpret_cast(&datum), sizeof(T), p); + return p + sizeof(T); } template diff --git a/include/seastar/core/execution_stage.hh b/include/seastar/core/execution_stage.hh index 227afe6dedb..278bcced194 100644 --- a/include/seastar/core/execution_stage.hh +++ b/include/seastar/core/execution_stage.hh @@ -220,6 +220,7 @@ class concrete_execution_stage final : public execution_stage { struct work_item { input_type _in; + task_id _task_id; promise_type _ready; work_item(typename internal::wrap_for_es::type... args) : _in(std::move(args)...) { } @@ -244,7 +245,11 @@ private: auto wi_in = std::move(wi._in); auto wi_ready = std::move(wi._ready); _queue.pop_front(); - futurize::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready)); + { + tracepoint_run_execution_stage_task(wi._task_id); + auto st = switch_task(wi._task_id); + futurize::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready)); + } _stats.function_calls_executed++; if (internal::scheduler_need_preempt()) { diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh index 546cb00b601..04d4d30a6bb 100644 --- a/include/seastar/core/fair_queue.hh +++ b/include/seastar/core/fair_queue.hh @@ -254,10 +254,12 @@ public: capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; } capacity_t grab_capacity(capacity_t cap) noexcept; clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); } + void refund_tokens(capacity_t) noexcept; void replenish_capacity(clock_type::time_point now) noexcept; void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept; capacity_t capacity_deficiency(capacity_t from) const noexcept; + capacity_t head() const noexcept; std::chrono::duration rate_limit_duration() const noexcept { std::chrono::duration dur((double)_token_bucket.limit() / _token_bucket.rate()); @@ -343,13 +345,12 @@ private: * in the middle of the waiting */ struct pending { - capacity_t head; - capacity_t cap; - - pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {} + capacity_t head = 0; + capacity_t cap = 0; }; - std::optional _pending; + pending _pending; + capacity_t _queued_cap = 0; void push_priority_class(priority_class_data& pc) noexcept; void push_priority_class_from_idle(priority_class_data& pc) noexcept; @@ -359,7 +360,7 @@ private: enum class grab_result { grabbed, cant_preempt, pending }; grab_result grab_capacity(const fair_queue_entry& ent) noexcept; - grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept; + capacity_t reap_pending_capacity(bool& contact) noexcept; public: /// Constructs a fair queue with configuration parameters \c cfg. /// diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 9296ced12b2..a223e5080f7 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -68,6 +68,7 @@ #include #include #include +#include #include #include #include @@ -281,6 +282,8 @@ private: }; boost::container::static_vector, max_scheduling_groups()> _task_queues; +public: + std::vector, float>> list_groups(); internal::scheduling_group_specific_thread_local_data _scheduling_group_specific_data; int64_t _last_vruntime = 0; task_queue_list _active_task_queues; diff --git a/include/seastar/core/task.hh b/include/seastar/core/task.hh index 8dbfb789434..0b2b18437f0 100644 --- a/include/seastar/core/task.hh +++ b/include/seastar/core/task.hh @@ -23,6 +23,7 @@ #include #include +#include #ifndef SEASTAR_MODULE #include @@ -31,7 +32,29 @@ namespace seastar { SEASTAR_MODULE_EXPORT + +extern thread_local uint64_t fresh_task_id; +extern thread_local uint64_t current_task_id; + +struct task_id { + uint64_t _value; + task_id(uint64_t id = current_task_id) : _value(id) {} + operator uint64_t() { return _value; }; +}; + +struct [[nodiscard]] switch_task { + task_id _prev; + switch_task(uint64_t id) { + current_task_id = id; + } + ~switch_task() { + current_task_id = _prev; + } +}; + class task { +public: + task_id _id; protected: scheduling_group _sg; private: diff --git a/include/seastar/util/shared_token_bucket.hh b/include/seastar/util/shared_token_bucket.hh index 91c718763a7..9a2f87fd353 100644 --- a/include/seastar/util/shared_token_bucket.hh +++ b/include/seastar/util/shared_token_bucket.hh @@ -116,8 +116,9 @@ class shared_token_bucket { rovers_t _rovers; T tail() const noexcept { return _rovers.tail.load(std::memory_order_relaxed); } +public: T head() const noexcept { return _rovers.head.load(std::memory_order_relaxed); } - +private: /* * Need to make sure that the multiplication in accumulated_in() doesn't * overflow. Not to introduce an extra branch there, define that the @@ -159,6 +160,10 @@ public: _rovers.release(tokens); } + void refund(T tokens) noexcept { + fetch_add(_rovers.head, tokens); + } + void replenish(typename Clock::time_point now) noexcept { auto ts = _replenished.load(std::memory_order_relaxed); diff --git a/include/seastar/util/tracer.hh b/include/seastar/util/tracer.hh new file mode 100644 index 00000000000..0a6127793e6 --- /dev/null +++ b/include/seastar/util/tracer.hh @@ -0,0 +1,185 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace seastar { + +struct tracer { + static constexpr size_t buffer_size = (128 * 1024); + std::deque> _old; + std::vector _current; + size_t _cur_pos = 0; + + tracer() { + for (int i = 0; i < 480; ++i) { + _old.push_back(std::vector()); + } + _current.resize(buffer_size); + } + + void rotate() { + _current.resize(_cur_pos); + _old.push_back(std::move(_current)); + _current = std::move(_old.front()); + _old.pop_front(); + _current.resize(buffer_size); + _cur_pos = 0; + } + + std::byte* write(size_t n) { + if (_current.size() - _cur_pos < n) [[unlikely]] { + rotate(); + } + auto result = &_current[_cur_pos]; + _cur_pos += n; + return result; + } + + std::deque> snapshot() { + auto result = _old; + auto cur = _current; + cur.resize(_cur_pos); + result.push_back(cur); + return result; + } + + uint64_t rdtsc() { + uint64_t rax, rdx; + asm volatile ( "rdtsc" : "=a" (rax), "=d" (rdx) ); + return (uint64_t)(( rdx << 32 ) + rax); + } +}; +extern thread_local tracer g_tracer; + + +enum class trace_events { + POLL, + SLEEP, + WAKEUP, + RUN_TASK_QUEUE, + RUN_TASK_QUEUE_END, + RUN_TASK, + RUN_EXECUTION_STAGE_TASK, + GRAB_CAPACITY, + GRAB_CAPACITY_PENDING, + DISPATCH_REQUESTS, + DISPATCH_QUEUE, + REPLENISH, + IO_QUEUE, + IO_DISPATCH, + IO_COMPLETE, + IO_CANCEL, + MONITORING_SCRAPE, + COUNT, +}; + +[[gnu::always_inline]] +inline void tracepoint_nullary(trace_events header) { + auto p = reinterpret_cast(g_tracer.write(12)); + p = seastar::write_le(p, static_cast(header)); + p = seastar::write_le(p, g_tracer.rdtsc()); +} + +template +[[gnu::always_inline]] +inline void tracepoint_unary(uint32_t header, T arg) { + auto p = reinterpret_cast(g_tracer.write(12 + sizeof(T))); + p = seastar::write_le(p, static_cast(header)); + p = seastar::write_le(p, g_tracer.rdtsc()); + p = seastar::write_le(p, arg); +} + +template +[[gnu::always_inline]] +inline void tracepoint_unary(trace_events header, T arg) { + tracepoint_unary(static_cast(header), arg); +} + +inline void tracepoint_poll() { + tracepoint_nullary(trace_events::POLL); +} + +inline void tracepoint_sleep() { + tracepoint_nullary(trace_events::SLEEP); +} + +inline void tracepoint_wakeup() { + tracepoint_nullary(trace_events::WAKEUP); +} + +inline void tracepoint_run_task_queue(uint8_t sg) { + tracepoint_unary(trace_events::RUN_TASK_QUEUE, sg); +} + +inline void tracepoint_run_task_queue_end() { + tracepoint_nullary(trace_events::RUN_TASK_QUEUE_END); +} + +inline void tracepoint_run_task(int64_t task_id) { + tracepoint_unary(trace_events::RUN_TASK, task_id); +} + +inline void tracepoint_run_execution_stage_task(int64_t task_id) { + tracepoint_unary(trace_events::RUN_EXECUTION_STAGE_TASK, task_id); +} + +inline void tracepoint_io_queue(uint8_t direction, uint64_t tokens, uint64_t io_id) { + auto p = reinterpret_cast(g_tracer.write(12 + 17)); + p = seastar::write_le(p, static_cast(trace_events::IO_QUEUE)); + p = seastar::write_le(p, g_tracer.rdtsc()); + p = seastar::write_le(p, direction); + p = seastar::write_le(p, tokens); + p = seastar::write_le(p, io_id); +} + +inline void tracepoint_io_dispatch(uint64_t io_id) { + tracepoint_unary(trace_events::IO_DISPATCH, io_id); +} + +inline void tracepoint_io_complete(uint64_t io_id) { + tracepoint_unary(trace_events::IO_COMPLETE, io_id); +} + +inline void tracepoint_io_cancel(uint64_t io_id) { + tracepoint_unary(trace_events::IO_CANCEL, io_id); +} + +inline void tracepoint_grab_capacity(uint64_t cap, uint64_t want_head, uint64_t head) { + auto p = reinterpret_cast(g_tracer.write(12 + 24)); + p = seastar::write_le(p, static_cast(trace_events::GRAB_CAPACITY)); + p = seastar::write_le(p, g_tracer.rdtsc()); + p = seastar::write_le(p, cap); + p = seastar::write_le(p, want_head); + p = seastar::write_le(p, head); +} + +inline void tracepoint_grab_capacity_pending(uint64_t cap, uint64_t head) { + auto p = reinterpret_cast(g_tracer.write(12 + 16)); + p = seastar::write_le(p, static_cast(trace_events::GRAB_CAPACITY_PENDING)); + p = seastar::write_le(p, g_tracer.rdtsc()); + p = seastar::write_le(p, cap); + p = seastar::write_le(p, head); +} + +inline void tracepoint_replenish(uint64_t new_head) { + tracepoint_unary(trace_events::REPLENISH, new_head); +} + +inline void tracepoint_dispatch_queue(uint8_t id) { + tracepoint_unary(trace_events::DISPATCH_QUEUE, id); +} + +inline void tracepoint_dispatch_requests(uint64_t queued) { + tracepoint_unary(trace_events::DISPATCH_REQUESTS, queued); +} + +inline void tracepoint_monitoring_scrape() { + tracepoint_nullary(trace_events::MONITORING_SCRAPE); +} + + +} // namespace seastar diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc index f23edf3dedc..dc2d7f175e0 100644 --- a/src/core/fair_queue.cc +++ b/src/core/fair_queue.cc @@ -115,6 +115,12 @@ auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t { void fair_group::replenish_capacity(clock_type::time_point now) noexcept { _token_bucket.replenish(now); + tracepoint_replenish(_token_bucket.head()); +} + +void fair_group::refund_tokens(capacity_t cap) noexcept { + _token_bucket.refund(cap); + tracepoint_replenish(_token_bucket.head()); } void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept { @@ -131,6 +137,10 @@ auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity return _token_bucket.deficiency(from); } +auto fair_group::head() const noexcept -> capacity_t { + return _token_bucket.head(); +} + // Priority class, to be used with a given fair_queue class fair_queue::priority_class_data { friend class fair_queue; @@ -141,9 +151,10 @@ class fair_queue::priority_class_data { bool _queued = false; bool _plugged = true; uint32_t _activations = 0; + fair_queue::class_id _id; public: - explicit priority_class_data(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {} + explicit priority_class_data(uint32_t shares, fair_queue::class_id id) noexcept : _shares(std::max(shares, 1u)), _id(id) {} priority_class_data(const priority_class_data&) = delete; priority_class_data(priority_class_data&&) = delete; @@ -223,34 +234,29 @@ void fair_queue::unplug_class(class_id cid) noexcept { unplug_priority_class(*_priority_classes[cid]); } -auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result { - _group.maybe_replenish_capacity(_group_replenish); - - if (_group.capacity_deficiency(_pending->head)) { - return grab_result::pending; - } - - capacity_t cap = ent._capacity; - if (cap > _pending->cap) { - return grab_result::cant_preempt; +auto fair_queue::reap_pending_capacity(bool& contact) noexcept -> capacity_t { + capacity_t result = 0; + contact = true; + if (_pending.cap) { + _group.maybe_replenish_capacity(_group_replenish); + capacity_t head = _group.head(); + // tracepoint_grab_capacity(ent._capacity, _pending->head, head); + auto diff = internal::wrapping_difference(_pending.head, head); + contact = diff <= _pending.cap; + if (diff < _pending.cap) { + result = _pending.cap - diff; + _pending.cap = diff; + } } - - _pending.reset(); - return grab_result::grabbed; + return result; } auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result { - if (_pending) { - return grab_pending_capacity(ent); - } - capacity_t cap = ent._capacity; capacity_t want_head = _group.grab_capacity(cap); - if (_group.capacity_deficiency(want_head)) { - _pending.emplace(want_head, cap); - return grab_result::pending; - } - + capacity_t head = _group.head(); + tracepoint_grab_capacity(ent._capacity, want_head, head); + _pending = pending{want_head, cap}; return grab_result::grabbed; } @@ -262,7 +268,7 @@ void fair_queue::register_priority_class(class_id id, uint32_t shares) { } _handles.reserve(_nr_classes + 1); - _priority_classes[id] = std::make_unique(shares); + _priority_classes[id] = std::make_unique(shares, id); _nr_classes++; } @@ -297,17 +303,19 @@ void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept { push_priority_class_from_idle(pc); } pc._queue.push_back(ent); + _queued_cap += ent.capacity(); } void fair_queue::notify_request_finished(fair_queue_entry::capacity_t cap) noexcept { } void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept { + _queued_cap -= ent._capacity; ent._capacity = 0; } fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept { - if (_pending) { + if (_pending.cap) { /* * We expect the disk to release the ticket within some time, * but it's ... OK if it doesn't -- the pending wait still @@ -318,7 +326,7 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept * which's sub-optimal. The expectation is that we think disk * works faster, than it really does. */ - auto over = _group.capacity_deficiency(_pending->head); + auto over = _group.capacity_deficiency(_pending.head); auto ticks = _group.capacity_duration(over); return std::chrono::steady_clock::now() + std::chrono::duration_cast(ticks); } @@ -327,27 +335,46 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept } void fair_queue::dispatch_requests(std::function cb) { - capacity_t dispatched = 0; - boost::container::small_vector preempt; - - while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) { + tracepoint_dispatch_requests(_queued_cap); + _pending.cap = std::min(_pending.cap, _queued_cap); + bool contact = false; + capacity_t available = reap_pending_capacity(contact); + capacity_t recycled = 0; + uint64_t can_grab_this_tick = _group.per_tick_grab_threshold(); + + // tracepoint_dispatch_requests(); + while (!_handles.empty()) { priority_class_data& h = *_handles.top(); + tracepoint_dispatch_queue(h._id); if (h._queue.empty() || !h._plugged) { pop_priority_class(h); continue; } auto& req = h._queue.front(); - auto gr = grab_capacity(req); - if (gr == grab_result::pending) { + if (req._capacity <= available) { + // pass + } else if (req._capacity <= available + _pending.cap) { + _pending.cap += available; + available = 0; + break; + } else if (contact) { + can_grab_this_tick += available + _pending.cap; + recycled = available + _pending.cap; + _pending.cap = 0; + available = 0; + capacity_t grab_amount = std::min(can_grab_this_tick, _queued_cap); + grab_capacity(grab_amount); + can_grab_this_tick -= grab_amount; + available += reap_pending_capacity(contact); + if (req._capacity > available) { + break; + } + } else { break; } - if (gr == grab_result::cant_preempt) { - pop_priority_class(h); - preempt.emplace_back(&h); - continue; - } + available -= req._capacity; _last_accumulated = std::max(h._accumulated, _last_accumulated); pop_priority_class(h); @@ -374,7 +401,7 @@ void fair_queue::dispatch_requests(std::function cb) { } h._accumulated += req_cost; h._pure_accumulated += req_cap; - dispatched += req_cap; + _queued_cap -= req_cap; cb(req); @@ -383,8 +410,11 @@ void fair_queue::dispatch_requests(std::function cb) { } } - for (auto&& h : preempt) { - push_priority_class(*h); + if (_pending.cap == 0 && available) { + grab_capacity(available); + } + if (available + recycled) { + _group.refund_tokens(available + recycled); } } diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc index 52d38aadd3d..f79236695bb 100644 --- a/src/core/io_queue.cc +++ b/src/core/io_queue.cc @@ -48,6 +48,7 @@ module seastar; #include #include #include +#include #endif namespace seastar { @@ -219,6 +220,8 @@ class io_desc_read_write final : public io_completion { promise _pr; iovec_keeper _iovs; uint64_t _dispatched_polls; + task_id _task_id; + uint64_t _io_id; public: io_desc_read_write(io_queue& ioq, io_queue::priority_class_data& pc, stream_id stream, io_direction_and_length dnl, fair_queue_entry::capacity_t cap, iovec_keeper iovs) @@ -231,6 +234,9 @@ class io_desc_read_write final : public io_completion { , _iovs(std::move(iovs)) { io_log.trace("dev {} : req {} queue len {} capacity {}", _ioq.dev_id(), fmt::ptr(this), _dnl.length(), _fq_capacity); + static thread_local uint64_t io_id = 0; + _io_id = io_id++; + tracepoint_io_queue(dnl.rw_idx(), _fq_capacity, _io_id); } virtual void set_exception(std::exception_ptr eptr) noexcept override { @@ -243,6 +249,7 @@ class io_desc_read_write final : public io_completion { virtual void complete(size_t res) noexcept override { io_log.trace("dev {} : req {} complete", _ioq.dev_id(), fmt::ptr(this)); + tracepoint_io_complete(_io_id); auto now = io_queue::clock_type::now(); auto delay = std::chrono::duration_cast>(now - _ts); _pclass.on_complete(delay); @@ -252,6 +259,7 @@ class io_desc_read_write final : public io_completion { } void cancel() noexcept { + tracepoint_io_cancel(_io_id); _pclass.on_cancel(); _pr.set_exception(std::make_exception_ptr(default_io_exception_factory::cancelled())); delete this; @@ -259,6 +267,7 @@ class io_desc_read_write final : public io_completion { void dispatch() noexcept { io_log.trace("dev {} : req {} submit", _ioq.dev_id(), fmt::ptr(this)); + tracepoint_io_dispatch(_io_id); auto now = io_queue::clock_type::now(); _pclass.on_dispatch(_dnl, std::chrono::duration_cast>(now - _ts)); _ts = now; diff --git a/src/core/metrics.cc b/src/core/metrics.cc index 14f1d5a53eb..5e68eb0aeb9 100644 --- a/src/core/metrics.cc +++ b/src/core/metrics.cc @@ -375,6 +375,7 @@ const value_map& get_value_map() { } foreign_ptr get_values() { + tracepoint_monitoring_scrape(); shared_ptr res_ref = ::seastar::make_shared(); auto& res = *(res_ref.get()); auto& mv = res.values; diff --git a/src/core/reactor.cc b/src/core/reactor.cc index a8b98e3e230..cd10eae7292 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -173,6 +173,7 @@ module seastar; #include #include #include +#include #include "core/reactor_backend.hh" #include "core/syscall_result.hh" #include "core/thread_pool.hh" @@ -185,6 +186,11 @@ module seastar; namespace seastar { +thread_local tracer g_tracer; + +thread_local uint64_t fresh_task_id = 1; +thread_local uint64_t current_task_id = 0; + static_assert(posix::shutdown_mask(SHUT_RD) == posix::rcv_shutdown); static_assert(posix::shutdown_mask(SHUT_WR) == posix::snd_shutdown); static_assert(posix::shutdown_mask(SHUT_RDWR) == (posix::snd_shutdown | posix::rcv_shutdown)); @@ -240,6 +246,16 @@ shard_id reactor::cpu_id() const { return _id; } +std::vector, float>> reactor::list_groups() { + std::vector, float>> result; + for (int i = 0; i < _task_queues.size(); ++i) { + if (_task_queues[i]) { + result.emplace_back(i, _task_queues[i]->_name, _task_queues[i]->_shares); + } + } + return result; +} + void reactor::update_shares_for_queues(internal::priority_class pc, uint32_t shares) { for (auto&& q : _io_queues) { q.second->update_shares_for_class(pc, shares); @@ -2589,7 +2605,11 @@ void reactor::run_tasks(task_queue& tq) { STAP_PROBE(seastar, reactor_run_tasks_single_start); internal::task_histogram_add_task(*tsk); _current_task = tsk; - tsk->run_and_dispose(); + { + tracepoint_run_task(tsk->_id); + auto st = switch_task(tsk->_id); + tsk->run_and_dispose(); + } _current_task = nullptr; STAP_PROBE(seastar, reactor_run_tasks_single_end); ++tq._tasks_processed; @@ -3055,6 +3075,7 @@ reactor::run_some_tasks() { auto t_run_started = t_run_completed; insert_activating_task_queues(); task_queue* tq = pop_active_task_queue(t_run_started); + tracepoint_run_task_queue(tq->_id); sched_print("running tq {} {}", (void*)tq, tq->_name); _last_vruntime = std::max(tq->_vruntime, _last_vruntime); run_tasks(*tq); @@ -3077,6 +3098,7 @@ reactor::run_some_tasks() { // Settle on a regular need_preempt(), which will return true in // debug mode. } while (have_more_tasks() && !need_preempt()); + tracepoint_run_task_queue_end(); _cpu_stall_detector->end_task_run(t_run_completed); STAP_PROBE(seastar, reactor_run_tasks_end); *internal::current_scheduling_group_ptr() = default_scheduling_group(); // Prevent inheritance from last group run diff --git a/src/util/tracer.cc b/src/util/tracer.cc new file mode 100644 index 00000000000..42c4992b932 --- /dev/null +++ b/src/util/tracer.cc @@ -0,0 +1,7 @@ +#include "seastar/util/tracer.hh" + +namespace seastar { + +extern thread_local tracer g_tracer; + +} // namespace seastar \ No newline at end of file