diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index b1ff5e4da588..adef56df9c9a 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -6,6 +6,8 @@ #include <absl/strings/match.h> +#include <random> + #include "base/gtest.h" #include "base/logging.h" #include "facade/facade_test.h" @@ -16,6 +18,7 @@ #include "server/string_family.h" #include "server/test_utils.h" #include "server/transaction.h" +#include "util/fibers/fibers.h" using namespace testing; using namespace std; @@ -1323,7 +1326,7 @@ TEST_F(ListFamilyTest, LMPopWrongType) { EXPECT_THAT(resp, RespArray(ElementsAre("l1", RespArray(ElementsAre("e1"))))); } -// Reproduce a flow that trigerred a wrong DCHECK in the transaction flow. +// Blocking command wakeup is complicated by running multi transaction at the same time TEST_F(ListFamilyTest, AwakeMulti) { auto f1 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&] { for (unsigned i = 0; i < 100; ++i) { @@ -1352,6 +1355,33 @@ TEST_F(ListFamilyTest, AwakeMulti) { f3.Join(); } +TEST_F(ListFamilyTest, PressureBLMove) { +#ifndef NDEBUG + GTEST_SKIP() << "Requires release build to reproduce"; +#endif + + auto consumer = [this](string_view id, string_view src, string_view dest) { + for (unsigned i = 0; i < 1000; ++i) { + Run(id, {"blmove", src, dest, "LEFT", "LEFT", "0"}); + }; + }; + auto producer = [this](string_view id, size_t delay, string_view src) { + for (unsigned i = 0; i < 1000; ++i) { + Run(id, {"lpush", src, "a"}); + ThisFiber::SleepFor(1us * delay); + } + }; + + for (size_t delay : {1, 2, 5}) { + LOG(INFO) << "Running with delay: " << delay; + auto f1 = pp_->at(1)->LaunchFiber([=] { consumer("c1", "src", "dest"); }); + auto f2 = pp_->at(1)->LaunchFiber([=] { producer("p1", delay, "src"); }); + + f1.Join(); + f2.Join(); + } +} + TEST_F(ListFamilyTest, AwakeDb1) { const char* kDbId = "1"; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 168a22c30266..93825560164e 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -766,11 +766,7 @@ void Transaction::ScheduleInternal() { ScheduleContext schedule_ctx{this, optimistic_exec}; - // TODO: this optimization is disabled due to a issue #4648 revealing this code can - // lead to transaction not being scheduled. - // To reproduce the bug remove the false in the condition and run - // ./list_family_test --gtest_filter=*AwakeMulti on alpine machine - if (false && unique_shard_cnt_ == 1) { + if (unique_shard_cnt_ == 1) { // Single shard optimization. Note: we could apply the same optimization // to multi-shard transactions as well by creating a vector of ScheduleContext. schedule_queues[unique_shard_id_].queue.Push(&schedule_ctx); @@ -1221,7 +1217,7 @@ void Transaction::ScheduleBatchInShard() { // We do this to avoid the situation where we have a data race, where // a transaction is added to the queue, we've checked that sq.armed is true and skipped // adding the callback that fetches the transaction. - sq.armed.store(false, memory_order_release); + sq.armed.exchange(false, memory_order_acq_rel); } }