diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 1d498ed9afd1..54f55fed62a4 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -537,6 +537,14 @@ class TAsyncIndexChangeSenderMain return; } + if (entry.Self && entry.Self->Info.GetPathState() == NKikimrSchemeOp::EPathStateDrop) { + LOG_D("Index is planned to drop, waiting for the EvRemoveSender command"); + + RemoveRecords(); + KillSenders(); + return Become(&TThis::StatePendingRemove); + } + Y_ABORT_UNLESS(entry.ListNodeEntry->Children.size() == 1); const auto& indexTable = entry.ListNodeEntry->Children.at(0); @@ -559,7 +567,7 @@ class TAsyncIndexChangeSenderMain STATEFN(StateResolveIndexTable) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleIndexTable); - sFunc(TEvents::TEvWakeup, ResolveIndexTable); + sFunc(TEvents::TEvWakeup, ResolveIndex); default: return StateBase(ev); } @@ -638,7 +646,7 @@ class TAsyncIndexChangeSenderMain STATEFN(StateResolveKeys) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleKeys); - sFunc(TEvents::TEvWakeup, ResolveIndexTable); + sFunc(TEvents::TEvWakeup, ResolveIndex); default: return StateBase(ev); } @@ -690,7 +698,7 @@ class TAsyncIndexChangeSenderMain } void Resolve() override { - ResolveIndexTable(); + ResolveIndex(); } bool IsResolved() const override { @@ -758,6 +766,11 @@ class TAsyncIndexChangeSenderMain PassAway(); } + void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + RemoveRecords(std::move(ev->Get()->Records)); + } + void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { RenderHtmlPage(ESenderType::AsyncIndex, ev, ctx); } @@ -797,6 +810,15 @@ class TAsyncIndexChangeSenderMain } } + STFUNC(StatePendingRemove) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove); + hFunc(TEvChangeExchange::TEvRemoveSender, Handle); + HFunc(NMon::TEvRemoteHttpInfo, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + private: const TTableId UserTableId; mutable TMaybe LogPrefix; diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index d01d1dfe8242..9c5ba160f230 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -334,6 +334,8 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase { Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); } } + + Self->CheckStateChange(ctx); } private: diff --git a/ydb/core/tx/datashard/drop_table_unit.cpp b/ydb/core/tx/datashard/drop_table_unit.cpp index c925c1bdff27..5e0fda61dc27 100644 --- a/ydb/core/tx/datashard/drop_table_unit.cpp +++ b/ydb/core/tx/datashard/drop_table_unit.cpp @@ -19,6 +19,7 @@ class TDropTableUnit : public TExecutionUnit { const TActorContext &ctx) override; private: + TVector> RemoveSenders; }; TDropTableUnit::TDropTableUnit(TDataShard &dataShard, @@ -75,6 +76,20 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op, Y_ABORT_UNLESS(DataShard.GetPathOwnerId() == schemeTx.GetDropTable().GetPathId().GetOwnerId()); tableId = schemeTx.GetDropTable().GetPathId().GetLocalId(); } + + auto it = DataShard.GetUserTables().find(tableId); + Y_ABORT_UNLESS(it != DataShard.GetUserTables().end()); + { + for (const auto& [indexPathId, indexInfo] : it->second->Indexes) { + if (indexInfo.Type == TUserTable::TTableIndex::EIndexType::EIndexTypeGlobalAsync) { + RemoveSenders.emplace_back(new TEvChangeExchange::TEvRemoveSender(indexPathId)); + } + } + for (const auto& [streamPathId, _] : it->second->CdcStreams) { + RemoveSenders.emplace_back(new TEvChangeExchange::TEvRemoveSender(streamPathId)); + } + } + DataShard.DropUserTable(txc, tableId); // FIXME: transactions need to specify ownerId @@ -96,12 +111,15 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op, BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); - return EExecutionStatus::ExecutedNoMoreRestarts; + return EExecutionStatus::DelayCompleteNoMoreRestarts; } void TDropTableUnit::Complete(TOperation::TPtr, - const TActorContext &) + const TActorContext &ctx) { + for (auto& ev : RemoveSenders) { + ctx.Send(DataShard.GetChangeSender(), ev.Release()); + } } THolder CreateDropTableUnit(TDataShard &dataShard, diff --git a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp index c9db4bf06f38..0d05b12d68ce 100644 --- a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp +++ b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -360,4 +361,55 @@ Y_UNIT_TEST_SUITE(TAsyncIndexTests) { t.TestEnv->TestWaitNotification(runtime, t.TxId); }); } + + Y_UNIT_TEST_WITH_REBOOTS(DropTableWithInflightChanges) { + T t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + auto origObserver = runtime.SetObserverFunc([&](TAutoPtr& ev) { + return TTestActorRuntime::DefaultObserverFunc(ev); + }); + + TVector> enqueued; + runtime.SetObserverFunc([&](TAutoPtr& ev) { + if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) { + enqueued.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + return origObserver(ev); + }); + + { + TInactiveZone inactive(activeZone); + TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "indexed" Type: "Uint32" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "UserDefinedIndex" + KeyColumnNames: ["indexed"] + Type: EIndexTypeGlobalAsync + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + Prepare(runtime, "/MyRoot/Table", {1, 10, 100}, true); + } + + TestDropTable(runtime, ++t.TxId, "/MyRoot", "Table"); + + runtime.SetObserverFunc(origObserver); + for (auto& ev : std::exchange(enqueued, {})) { + runtime.Send(ev.Release(), 0, true); + } + + t.TestEnv->TestWaitNotification(runtime, t.TxId); + t.TestEnv->TestWaitTabletDeletion(runtime, { + TTestTxConfig::FakeHiveTablets, + TTestTxConfig::FakeHiveTablets + 1, + }); + }); + } }