Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,7 @@ message TEvUploadRowsRequest {
optional bool WriteToTableShadow = 5;
optional uint64 OverloadSubscribe = 7;
optional bool UpsertIfExists = 8 [ default = false ];
optional uint64 SchemaVersion = 9;
}

message TEvUploadRowsResponse {
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/datashard/datashard_common_upload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
return true;
}

if (record.GetSchemaVersion() && tableInfo.GetTableSchemaVersion() &&
record.GetSchemaVersion() != tableInfo.GetTableSchemaVersion())
{
SetError(NKikimrTxDataShard::TError::SCHEME_ERROR, TStringBuilder()
<< "Schema version mismatch"
<< ": requested " << record.GetSchemaVersion()
<< ", expected " << tableInfo.GetTableSchemaVersion()
<< ". Retry request with an updated schema.");
return true;
}

for (size_t i = 0; i < tableInfo.KeyColumnIds.size(); ++i) {
if (record.GetRowScheme().GetKeyColumnIds(i) != tableInfo.KeyColumnIds[i]) {
SetError(NKikimrTxDataShard::TError::SCHEME_ERROR, Sprintf("Key column schema at position %" PRISZT, i));
Expand Down
88 changes: 88 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,94 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflowAndRetry) {
DoShouldRejectOnChangeQueueOverflow(true);
}

Y_UNIT_TEST(BulkUpsertDuringAddIndexRaceCorruption) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(1000);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);

InitRoot(server, sender);

auto opts = TShardedTableOptions()
.Shards(1)
.Columns({
{"key", "Uint32", true, false},
{"value", "Uint32", false, false}});
CreateShardedTable(server, sender, "/Root", "table-1", opts);

ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 2), (3, 4);");

std::vector<std::unique_ptr<IEventHandle>> bulkUpserts;
auto captureBulkUpserts = runtime.AddObserver<TEvDataShard::TEvUploadRowsRequest>(
[&](TEvDataShard::TEvUploadRowsRequest::TPtr& ev) {
bulkUpserts.emplace_back(ev.Release());
});

// Start writing to key 5 using bulk upsert
NThreading::TFuture<Ydb::Table::BulkUpsertResponse> bulkUpsertFuture;
{
Ydb::Table::BulkUpsertRequest request;
request.set_table("/Root/table-1");
auto* r = request.mutable_rows();

auto* reqRowType = r->mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type();
auto* reqKeyType = reqRowType->add_members();
reqKeyType->set_name("key");
reqKeyType->mutable_type()->set_type_id(Ydb::Type::UINT32);
auto* reqValueType = reqRowType->add_members();
reqValueType->set_name("value");
reqValueType->mutable_type()->set_type_id(Ydb::Type::UINT32);

auto* reqRows = r->mutable_value();
auto* row1 = reqRows->add_items();
row1->add_items()->set_uint32_value(5);
row1->add_items()->set_uint32_value(6);

using TEvBulkUpsertRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<
Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>;
bulkUpsertFuture = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(
std::move(request), "/Root", "", runtime.GetActorSystem(0));
}

WaitFor(runtime, [&]{ return bulkUpserts.size() > 0; }, "captured bulk upsert");
UNIT_ASSERT_VALUES_EQUAL(bulkUpserts.size(), 1u);
captureBulkUpserts.Remove();

Cerr << "... creating a by_value index" << Endl;
WaitTxNotification(server, sender,
AsyncAlterAddIndex(server, "/Root", "/Root/table-1",
TShardedTableOptions::TIndex{"by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}));
runtime.SimulateSleep(TDuration::Seconds(1));

// Unblock the captured bulk upsert
for (auto& ev : bulkUpserts) {
runtime.Send(ev.release(), 0, true);
}
bulkUpserts.clear();

// Wait for the bulk upsert to finish
Cerr << "... waiting for bulk upsert to finish" << Endl;
auto response = AwaitResponse(runtime, std::move(bulkUpsertFuture));
Cerr << "... bulk upsert finished with status " << response.operation().status() << Endl;

// Whether bulk upsert succeeds or not we shouldn't get a corrupted index (bug KIKIMR-20765)
auto data1 = KqpSimpleExec(runtime, Q_(R"(
SELECT key, value FROM `/Root/table-1` ORDER BY key
)"));
auto data2 = KqpSimpleExec(runtime, Q_(R"(
SELECT key, value FROM `/Root/table-1` VIEW by_value ORDER BY key
)"));
UNIT_ASSERT_VALUES_EQUAL(data1, data2);
}
}

} // namespace NKikimr
3 changes: 3 additions & 0 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,9 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
ev->Record.SetCancelDeadlineMs(Deadline().MilliSeconds());

ev->Record.SetTableId(keyRange->TableId.PathId.LocalPathId);
if (keyRange->TableId.SchemaVersion) {
ev->Record.SetSchemaVersion(keyRange->TableId.SchemaVersion);
}
for (const auto& fd : KeyColumnPositions) {
ev->Record.MutableRowScheme()->AddKeyColumnIds(fd.ColId);
}
Expand Down