diff --git a/ydb/core/client/client_ut.cpp b/ydb/core/client/client_ut.cpp index efbccf9f1343..aeea507657cf 100644 --- a/ydb/core/client/client_ut.cpp +++ b/ydb/core/client/client_ut.cpp @@ -2139,7 +2139,8 @@ Y_UNIT_TEST_SUITE(TClientTest) { TPortManager tp; ui16 port = tp.GetPort(2134); - const auto settings = TServerSettings(port); + const auto settings = TServerSettings(port) + .SetUseRealThreads(false); TServer server(settings); TClient client(settings); SetupLogging(server); @@ -2153,32 +2154,129 @@ Y_UNIT_TEST_SUITE(TClientTest) { const TActorId edge = runtime.AllocateEdgeActor(); { + ui64 confirmationsCount = 0; + auto observeConfirmations = [&](TAutoPtr& ev) { + switch (ev->GetTypeRewrite()) { + case TEvBlobStorage::TEvPut::EventType: { + const auto* msg = ev->Get(); + // step 1 is snapshot + // step 2 is schema alter + // step 3 is expected write below + if (msg->Id.TabletID() == tabletId && + msg->Id.Channel() == 0 && + msg->Id.Cookie() == 1 && + msg->Id.Step() > 2) + { + ++confirmationsCount; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + runtime.SetObserverFunc(observeConfirmations); + const TActorId leaderTablet = runtime.Register(CreateTablet(edge, tabletInfo.Get(), setupInfo.Get(), 0, nullptr, nullptr)); const TActorId leaderId = runtime.GrabEdgeEvent(edge)->Get()->UserTabletActor; - Y_UNUSED(leaderId); + + // we use it to kill leader only when it has sent the write to the follower and it is confirmed + const TActorId followerTablet = runtime.Register(CreateTabletFollower(edge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr)); + + auto doLeaderWrite = [&](ui64 key, ui64 value) { + const char *writeQuery = R"__(( + (let row_ '('('key (Uint64 '%lu)))) + (let update_ '('('v_ui64 (Uint64 '%lu)))) + (let result_ (UpdateRow 't_by_ui64 row_ update_)) + (return (AsList result_)) + ))__"; + + THolder reqWrite = MakeHolder(); + reqWrite->Record.MutableProgram()->MutableProgram()->SetText(Sprintf(writeQuery, key, value)); + runtime.Send(new IEventHandle(leaderId, edge, reqWrite.Release())); + + auto reply = runtime.GrabEdgeEvent(edge); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetStatus(), 0); + }; + + doLeaderWrite(42, 51); + + auto waitFor = [&](const auto& condition, const TString& description) { + if (!condition()) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + UNIT_ASSERT_C(condition(), "... failed to wait for " << description); + } + }; + + waitFor([&](){ return confirmationsCount > 0; }, "Write confirmed"); runtime.Send(new IEventHandle(leaderTablet, edge, new TEvents::TEvPoisonPill())); auto reply = runtime.GrabEdgeEvent(edge); UNIT_ASSERT_VALUES_EQUAL(reply->Get()->TabletID, tabletId); + + runtime.Send(new IEventHandle(followerTablet, edge, new TEvents::TEvPoisonPill())); + reply = runtime.GrabEdgeEvent(edge); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->TabletID, tabletId); } - const TActorId followerTablet = runtime.Register(CreateTabletFollower(edge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr)); - Y_UNUSED(followerTablet); + // now we start follower without its leader - const TActorId followerId = runtime.GrabEdgeEvent(edge)->Get()->UserTabletActor; - Y_UNUSED(followerId); + const TActorId followerEdge = runtime.AllocateEdgeActor(); + const TActorId followerTablet = runtime.Register(CreateTabletFollower(followerEdge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr)); + Y_UNUSED(followerTablet); + const TActorId followerId = runtime.GrabEdgeEvent(followerEdge)->Get()->UserTabletActor; { NTabletPipe::TClientConfig pipeClientConfig; pipeClientConfig.AllowFollower = true; pipeClientConfig.ForceFollower = true; pipeClientConfig.RetryPolicy = {.RetryLimitCount = 2}; - runtime.Register(NTabletPipe::CreateClient(edge, tabletId, pipeClientConfig)); + runtime.Register(NTabletPipe::CreateClient(followerEdge, tabletId, pipeClientConfig)); - auto reply = runtime.GrabEdgeEvent(edge); + auto reply = runtime.GrabEdgeEvent(followerEdge); UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Status, NKikimrProto::OK); } + + auto doFollowerRead = [&](ui64 key) -> TMaybe { + const char *readQuery = R"__(( + (let row_ '('('key (Uint64 '%lu)))) + (let select_ '('v_ui64)) + (let pgmReturn (AsList + (SetResult 'res (SelectRow 't_by_ui64 row_ select_)) + )) + (return pgmReturn) + ))__"; + + THolder reqRead = MakeHolder(); + reqRead->Record.MutableProgram()->MutableProgram()->SetText(Sprintf(readQuery, key)); + runtime.Send(new IEventHandle(followerId, followerEdge, reqRead.Release())); + + auto reply = runtime.GrabEdgeEvent(followerEdge); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetStatus(), 0); + const auto res = reply->Get()->Record + .GetExecutionEngineEvaluatedResponse() + .GetValue() + .GetStruct(0) + .GetOptional(); + if (!res.HasOptional()) { + return Nothing(); + } + + return res + .GetOptional() + .GetStruct(0) + .GetOptional() + .GetUint64(); + }; + + // Perform basic sanity checks + UNIT_ASSERT_VALUES_EQUAL(doFollowerRead(41), Nothing()); + UNIT_ASSERT_VALUES_EQUAL(doFollowerRead(42), 51u); } Y_UNIT_TEST(FollowerOfflineBoot) {