Reject bulk upserts (TEvUploadRowsRequest) built against a stale table schema.

Summary of the change, as visible in the hunks below:
  * tx_datashard.proto: TEvUploadRowsRequest gains `optional uint64 SchemaVersion = 9`.
  * upload_rows_common_impl.h: the upload-rows proxy copies
    keyRange->TableId.SchemaVersion into the request when it is non-zero.
  * datashard_common_upload.cpp: TCommonUploadOps::Execute answers SCHEME_ERROR
    ("Schema version mismatch ... Retry request with an updated schema.") when both
    the request and the table carry a non-zero schema version and the two differ.
    A zero (unset) version on either side skips the check.
  * datashard_ut_upload_rows.cpp: new test BulkUpsertDuringAddIndexRaceCorruption.
    It captures the TEvUploadRowsRequest of an in-flight bulk upsert, adds a global
    index `by_value`, re-sends the captured request, and then asserts that the main
    table and the index view return identical rows (bug KIKIMR-20765).

NOTE(review): this copy of the patch was recovered from a whitespace-collapsed
rendering. Line breaks were re-derived from the hunk line counts; indentation, the
template argument lists in angle brackets, and the blank context line in the last
hunk are reconstructed -- confirm against the upstream commit before applying.

diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 599ffff4482c..6a1313776694 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -830,6 +830,7 @@ message TEvUploadRowsRequest {
     optional bool WriteToTableShadow = 5;
     optional uint64 OverloadSubscribe = 7;
     optional bool UpsertIfExists = 8 [ default = false ];
+    optional uint64 SchemaVersion = 9;
 }
 
 message TEvUploadRowsResponse {
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index 5f1161d04f40..46c8977a6335 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -47,6 +47,17 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
         return true;
     }
 
+    if (record.GetSchemaVersion() && tableInfo.GetTableSchemaVersion() &&
+        record.GetSchemaVersion() != tableInfo.GetTableSchemaVersion())
+    {
+        SetError(NKikimrTxDataShard::TError::SCHEME_ERROR, TStringBuilder()
+            << "Schema version mismatch"
+            << ": requested " << record.GetSchemaVersion()
+            << ", expected " << tableInfo.GetTableSchemaVersion()
+            << ". Retry request with an updated schema.");
+        return true;
+    }
+
     for (size_t i = 0; i < tableInfo.KeyColumnIds.size(); ++i) {
         if (record.GetRowScheme().GetKeyColumnIds(i) != tableInfo.KeyColumnIds[i]) {
             SetError(NKikimrTxDataShard::TError::SCHEME_ERROR, Sprintf("Key column schema at position %" PRISZT, i));
diff --git a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
index 9b71c4c781a7..0c82d8648601 100644
--- a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
@@ -849,6 +849,94 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
     Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflowAndRetry) {
         DoShouldRejectOnChangeQueueOverflow(true);
     }
+
+    Y_UNIT_TEST(BulkUpsertDuringAddIndexRaceCorruption) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetDomainPlanResolution(1000);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        auto opts = TShardedTableOptions()
+            .Shards(1)
+            .Columns({
+                {"key", "Uint32", true, false},
+                {"value", "Uint32", false, false}});
+        CreateShardedTable(server, sender, "/Root", "table-1", opts);
+
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 2), (3, 4);");
+
+        std::vector<std::unique_ptr<IEventHandle>> bulkUpserts;
+        auto captureBulkUpserts = runtime.AddObserver<TEvDataShard::TEvUploadRowsRequest>(
+            [&](TEvDataShard::TEvUploadRowsRequest::TPtr& ev) {
+                bulkUpserts.emplace_back(ev.Release());
+            });
+
+        // Start writing to key 5 using bulk upsert
+        NThreading::TFuture<Ydb::Table::BulkUpsertResponse> bulkUpsertFuture;
+        {
+            Ydb::Table::BulkUpsertRequest request;
+            request.set_table("/Root/table-1");
+            auto* r = request.mutable_rows();
+
+            auto* reqRowType = r->mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type();
+            auto* reqKeyType = reqRowType->add_members();
+            reqKeyType->set_name("key");
+            reqKeyType->mutable_type()->set_type_id(Ydb::Type::UINT32);
+            auto* reqValueType = reqRowType->add_members();
+            reqValueType->set_name("value");
+            reqValueType->mutable_type()->set_type_id(Ydb::Type::UINT32);
+
+            auto* reqRows = r->mutable_value();
+            auto* row1 = reqRows->add_items();
+            row1->add_items()->set_uint32_value(5);
+            row1->add_items()->set_uint32_value(6);
+
+            using TEvBulkUpsertRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<
+                Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>;
+            bulkUpsertFuture = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(
+                std::move(request), "/Root", "", runtime.GetActorSystem(0));
+        }
+
+        WaitFor(runtime, [&]{ return bulkUpserts.size() > 0; }, "captured bulk upsert");
+        UNIT_ASSERT_VALUES_EQUAL(bulkUpserts.size(), 1u);
+        captureBulkUpserts.Remove();
+
+        Cerr << "... creating a by_value index" << Endl;
+        WaitTxNotification(server, sender,
+            AsyncAlterAddIndex(server, "/Root", "/Root/table-1",
+                TShardedTableOptions::TIndex{"by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}));
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
+        // Unblock the captured bulk upsert
+        for (auto& ev : bulkUpserts) {
+            runtime.Send(ev.release(), 0, true);
+        }
+        bulkUpserts.clear();
+
+        // Wait for the bulk upsert to finish
+        Cerr << "... waiting for bulk upsert to finish" << Endl;
+        auto response = AwaitResponse(runtime, std::move(bulkUpsertFuture));
+        Cerr << "... bulk upsert finished with status " << response.operation().status() << Endl;
+
+        // Whether bulk upsert succeeds or not we shouldn't get a corrupted index (bug KIKIMR-20765)
+        auto data1 = KqpSimpleExec(runtime, Q_(R"(
+            SELECT key, value FROM `/Root/table-1` ORDER BY key
+        )"));
+        auto data2 = KqpSimpleExec(runtime, Q_(R"(
+            SELECT key, value FROM `/Root/table-1` VIEW by_value ORDER BY key
+        )"));
+        UNIT_ASSERT_VALUES_EQUAL(data1, data2);
+    }
 }
 
 } // namespace NKikimr
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 907578f0346d..0765fe1a29fc 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -1039,6 +1039,9 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
             ev->Record.SetCancelDeadlineMs(Deadline().MilliSeconds());
 
             ev->Record.SetTableId(keyRange->TableId.PathId.LocalPathId);
+            if (keyRange->TableId.SchemaVersion) {
+                ev->Record.SetSchemaVersion(keyRange->TableId.SchemaVersion);
+            }
             for (const auto& fd : KeyColumnPositions) {
                 ev->Record.MutableRowScheme()->AddKeyColumnIds(fd.ColId);
             }