Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eigen rapid #1

Open
wants to merge 4 commits into
base: eigen-mailbox
Choose a base branch
from
Open

Eigen rapid #1

wants to merge 4 commits into from

Conversation

EgorkaZ
Copy link
Owner

@EgorkaZ EgorkaZ commented Apr 12, 2024

No description provided.

}

std::unique_ptr<Task> TryPushTask(std::unique_ptr<Task> task, int threadId) {
threadId = std::max(threadId, 0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

для начала думаю можно считать что у нас тот rapid-start который себя уже хорошо показал, поэтому предлагаю считать что схема такая:

  1. мастер-поток (threadId == 0) единственный кто имеет право запускать рэпид-старт
  2. рэпид-старт рассылает таску на N потоков размера 1/N от исходного рейнджа каждому, мастер поток ждет завершения всех потоков
  3. никакой вложенности!!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а то окажется что все модификации которые придумываются работают плохо (будет очень грустно), а та версия гарантированно работает норм

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вложенно rapid start не происходит, но чтобы не нарушать интерфейс, всё держится в одном parallel_for, который вложенным быть может

Модификации я выключаю-включаю. Пока получается шустрее с похожим на асинхронный вариант

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

асинхронный вариант ускоряет фазу распределения только, он всё также плодит таски как и обычный ParallelFor, только вот таски он изначально раздаёт потокам быстрее (а не как в классическом stealing все потоки starving в самом начале). в асинхронном варианте так мы решили действовать:

  1. поделить исходный рейндж поровну между всеми подписчиками RapidStart
  2. отправить на исполнение в RapidStart работу вида "заспавни таску размера своего рейндж" (типа "распределительная таска")
  3. мейн поток ждет пока остальные потоки закончат исходную таску, то есть не просто ее заспавнят, а именно что исполнят

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"заспавни таску размера своего рейндж" -- принципиальный момент, нужно именно что её положить (не обязательно в очередь, можно куда-то на стек), и сказать рэпиду что исполнил, а потом уже самому её начать исполнять (в этом и состоит вся асинхронность)

Comment on lines +165 to +174
auto left_mask = mask;
left_mask |= left_mask << 1;
left_mask |= left_mask << 2;
left_mask |= left_mask << 4;
left_mask |= left_mask << 8;
left_mask |= left_mask << 16;
left_mask |= left_mask << 32;

auto cleaned_mask = CaughtMask_ & ~left_mask;
int part = __builtin_popcountl(cleaned_mask);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

точно не помню что делает такая битовая магия но надеюсь что это эквивалентно сорс-коду референс rapid-start))

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В маске один бит. Заполняем все биты слева от него (включительно). Инвертируем - получаем все биты справа. Оставляем в CaughtMask_ биты строго правее нашего, считаем -> получаем номер нашего куска из всех

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ок!

}

auto runMask = Owner_->RunMask_.load(std::memory_order_acquire);
if (runMask & mask) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

перед этой строчкой нужно (обязательно) добавить while (run_mask.load(acq) == DISTRIBUTING) { machine_pause(); } (по-аккуратнее с DISTRIBUTING == 0, а то это же значение run_mask по умолчанию, так можно и зависнуть навсегда)


threadData.ResetIdle();
thread_data.ResetIdle();
bool processed_anything = false;
bool all_empty = false;
while (!cancelled_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

перед началом цикла нужно подписаться на рэпидгруппу (если это не вложенный вызов worker-loop [а такое вообще бывает в эйгене?])

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вложенный вызов worker loop'ов бывает. Отслеживаю через IsRunning_ в тред-локальном Subscriber

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

нужно тогда перед while проверку "если я не вложенный то подпишись на RapidStart и проверь есть ли там работа" -- чтобы как раз таки то ради чего придумывался RapidStart (быстрое распределение работы) быстро работало

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а, ну раз тут асинхронный рэпид то можно без проверок таких

if (!pt.rapid_subscriber.IsSubscribed() || pt.rapid_subscriber.Unsubscribe()) {
return task;
}
thread_data_[pt.thread_id].PushTask(task, true);
Copy link
Collaborator

@blonded04 blonded04 Apr 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тут нужна тоньше настройка:

  • если во время unsubscribe было замечено (уже после того, как мы убрали битик нашего потока из group_mask), что в run_mask стоит битик нашего потока, то нужно:
    1. заспавнить task на локальную очередь
    2. исполнить работу от rapidgroup (вот прямо тут прямо сейчас)

Copy link
Owner Author

@EgorkaZ EgorkaZ Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А что делаем с unsubscribe-subscribe в RunIfAvailable? Начал отписываться перед проверкой - стало лучше. Подписываться ли потом обратно?

Copy link
Collaborator

@blonded04 blonded04 Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

в асинхронном варианте нужно отписываться после исполнения распределительной таски, в синхронном не нужно

// std::cout << "~RapidStartTask(this=" << (const void*)this << ")" << std::endl;
// }

void operator()(int part, int totalParts) override {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

вроде как всё круто: эта таска не должна аллоцировать child тасок (вроде бы так и есть)

Copy link
Collaborator

@blonded04 blonded04 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

всё что я написал если что применимо к асинхронному рэпид-старту, давай тогда на нем сфокусируемся

GroupMask_.fetch_and(~mask, std::memory_order_release);
}

bool IsSubscribed(uint64_t mask) const noexcept {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

лучше тредлокальную переменную в каждого подписчика завести "подписан ли я сейчас?", чтоб меньше contention был


bool TryPushTask(Task* task, uint64_t mask) {
bool locked = false;
if (!Locked_.compare_exchange_strong(locked, true, std::memory_order_acq_rel)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

если не удалось, то можно не возвращать false наверх, а исполнить эту таску как обычный параллел фор (без ускоренного распределения)

return true;
}

void WaitAndClean() noexcept {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

мейн поток мне кажется всё равно должен весь-весь рейндж исполненный ждать (не именно что всё заспавнится, а именно что всё заспавнится И закончит исполнение), иначе это получился не ParallelFor, а просто быстрый fork-барьер

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

быстрый fork-барьер

что круто всё равно)

}

bool SubscribeTo(RapidGroup& group) noexcept {
if (!IsRunning_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

у нас в случае асинхронного RapidStart вложенности рэпид-таски в рэпид-таску не может быть by design, тут для уменьшения contention (общение с глобальной маской всегда) лучше проверку thread_local флажка if (worker_state == UNSUBSCRIBED) (общение с глобальной маской почти всегда)

// returns true if there is work subscriber must do after unsubscribing
bool Unsubscribe(uint64_t mask) noexcept {
assert(mask == Mask_);
if (IsRunning_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тут аналогично как и чуть повыше

auto success = runnext.compare_exchange_strong(
p, nullptr, std::memory_order_acquire);
p, nullptr, std::memory_order_seq_cst);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

acq_rel думаю достаточно


threadData.ResetIdle();
thread_data.ResetIdle();
bool processed_anything = false;
bool all_empty = false;
while (!cancelled_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а, ну раз тут асинхронный рэпид то можно без проверок таких

if (t) {
Tracing::TaskStolen();
if (is_subscribed && pt->rapid_subscriber.RunIfAvailable(mask)) {
is_subscribed = false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

вот эту переменную бы в subscriber утащить и геттер ей выделить и будет думаю чудесно с перформансом

@@ -483,11 +730,25 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
if (once) {
break;
}
if (all_empty && (++current_stale == StaleLimit)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

так лучше: if (all_empty && !is_subscribed && (++current_stale == StaleLimit)) {

// Func_(i);
// }

IntoTask<Initial::FALSE>(from, to)();
Copy link
Collaborator

@blonded04 blonded04 Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. исполнять IntoTask нужно только после того, как поставили рэпидстарту свой битик в финишмасочке: Eigen rapid #1 (comment) , до этого её нужно куда-то сложить (в идеале на стек)
  2. принципиально, чтобы поток, который работу запланировал в этой RapidStart эпохе -- ждал, пока не просто распределение завершится, но исполнение завершится (чтобы не было всяких сегфолтов и out of bounds и uninitialized memory access)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants