From 6140ef799e6d2c1dce5fefc6aa84dd3fb5d78a94 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Fri, 10 May 2024 12:13:18 +0200 Subject: [PATCH 1/4] Not null support for ReadTable key boundary (#4368) Changelog category Bugfix --- ydb/core/engine/mkql_proto.cpp | 69 +++++-- ydb/core/engine/mkql_proto.h | 1 + ydb/core/engine/mkql_proto_ut.cpp | 2 +- ydb/core/grpc_services/rpc_read_table.cpp | 5 +- .../schemeshard__operation_create_pq.cpp | 4 +- ydb/core/tx/schemeshard/schemeshard_impl.cpp | 2 +- .../tx/schemeshard/ut_helpers/ls_checks.cpp | 2 +- ydb/core/tx/tx_proxy/datareq.cpp | 3 +- ydb/core/tx/tx_proxy/read_table_impl.cpp | 12 +- ydb/core/tx/tx_proxy/resolvereq.cpp | 2 +- ydb/services/ydb/ydb_table_ut.cpp | 189 ++++++++++++++++++ 11 files changed, 262 insertions(+), 29 deletions(-) diff --git a/ydb/core/engine/mkql_proto.cpp b/ydb/core/engine/mkql_proto.cpp index 7ed500cc1a73..779b74f85563 100644 --- a/ydb/core/engine/mkql_proto.cpp +++ b/ydb/core/engine/mkql_proto.cpp @@ -13,9 +13,11 @@ namespace NKikimr::NMiniKQL { // NOTE: TCell's can reference memomry from tupleValue +// TODO: Place notNull flag in to the NScheme::TTypeInfo? 
bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType, const NKikimrMiniKQL::TValue& tupleValue, const TConstArrayRef& types, + TVector notNullTypes, bool allowCastFromString, TVector& key, TString& errStr, @@ -28,6 +30,21 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType, return false; \ } + CHECK_OR_RETURN_ERROR(types.size() >= tupleValue.TupleSize(), + "The size fo type array less then value tuple size"); + + // Please note we modify notNullTypes during tuplyType verification to allow cast nullable to non nullable value + if (notNullTypes) { + CHECK_OR_RETURN_ERROR(notNullTypes.size() == types.size(), + "The size of type array and given not null markers must be equial"); + if (tupleType) { + CHECK_OR_RETURN_ERROR(notNullTypes.size() >= tupleType->GetTuple().ElementSize(), + "The size of tuple type and given not null markers must be equal"); + } + } else { + notNullTypes.resize(types.size()); + } + if (tupleType) { CHECK_OR_RETURN_ERROR(tupleType->GetKind() == NKikimrMiniKQL::Tuple || (tupleType->GetKind() == NKikimrMiniKQL::Unknown && tupleType->GetTuple().ElementSize() == 0), "Must be a tuple"); @@ -36,8 +53,14 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType, for (size_t i = 0; i < tupleType->GetTuple().ElementSize(); ++i) { const auto& ti = tupleType->GetTuple().GetElement(i); - CHECK_OR_RETURN_ERROR(ti.GetKind() == NKikimrMiniKQL::Optional, "Element at index " + ToString(i) + " in not an Optional"); - const auto& item = ti.GetOptional().GetItem(); + if (notNullTypes[i]) { + // For not null column type we allow to build cell from nullable mkql type for compatibility reason. + notNullTypes[i] = ti.GetKind() != NKikimrMiniKQL::Optional; + } else { + // But we do not allow to build cell for nullable column from not nullable type + CHECK_OR_RETURN_ERROR(ti.GetKind() == NKikimrMiniKQL::Optional, "Element at index " + ToString(i) + " in not an Optional"); + } + const auto& item = notNullTypes[i] ? 
ti : ti.GetOptional().GetItem(); CHECK_OR_RETURN_ERROR(item.GetKind() == NKikimrMiniKQL::Data, "Element at index " + ToString(i) + " Item kind is not Data"); const auto& typeId = item.GetData().GetScheme(); CHECK_OR_RETURN_ERROR(typeId == types[i].GetTypeId() || @@ -53,26 +76,36 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType, } for (ui32 i = 0; i < tupleValue.TupleSize(); ++i) { - auto& o = tupleValue.GetTuple(i); - - auto element_case = o.value_value_case(); - CHECK_OR_RETURN_ERROR(element_case == NKikimrMiniKQL::TValue::kOptional || - element_case == NKikimrMiniKQL::TValue::VALUE_VALUE_NOT_SET, - Sprintf("Optional type is expected in tuple at position %" PRIu32, i)); - - CHECK_OR_RETURN_ERROR(o.ListSize() == 0 && - o.StructSize() == 0 && - o.TupleSize() == 0 && - o.DictSize() == 0, - Sprintf("Optional type is expected in tuple at position %" PRIu32, i)); + auto& o = tupleValue.GetTuple(i); - if (!o.HasOptional()) { - key.push_back(TCell()); - continue; + if (notNullTypes[i]) { + CHECK_OR_RETURN_ERROR(o.ListSize() == 0 && + o.StructSize() == 0 && + o.TupleSize() == 0 && + o.DictSize() == 0 && + !o.HasOptional(), + Sprintf("Primitive type is expected in tuple at position %" PRIu32, i)); + } else { + auto element_case = o.value_value_case(); + + CHECK_OR_RETURN_ERROR(element_case == NKikimrMiniKQL::TValue::kOptional || + element_case == NKikimrMiniKQL::TValue::VALUE_VALUE_NOT_SET, + Sprintf("Optional type is expected in tuple at position %" PRIu32, i)); + + CHECK_OR_RETURN_ERROR(o.ListSize() == 0 && + o.StructSize() == 0 && + o.TupleSize() == 0 && + o.DictSize() == 0, + Sprintf("Optional type is expected in tuple at position %" PRIu32, i)); + + if (!o.HasOptional()) { + key.push_back(TCell()); + continue; + } } - auto& v = o.GetOptional(); + auto& v = notNullTypes[i] ? 
o : o.GetOptional(); auto value_case = v.value_value_case(); diff --git a/ydb/core/engine/mkql_proto.h b/ydb/core/engine/mkql_proto.h index 6e7880622785..ce9da1e72125 100644 --- a/ydb/core/engine/mkql_proto.h +++ b/ydb/core/engine/mkql_proto.h @@ -16,6 +16,7 @@ class THolderFactory; bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType, const NKikimrMiniKQL::TValue& tupleValue, const TConstArrayRef& expectedTypes, + TVector notNullTypes, bool allowCastFromString, TVector& key, TString& errStr, diff --git a/ydb/core/engine/mkql_proto_ut.cpp b/ydb/core/engine/mkql_proto_ut.cpp index eda9583be7cf..39e24b6968e8 100644 --- a/ydb/core/engine/mkql_proto_ut.cpp +++ b/ydb/core/engine/mkql_proto_ut.cpp @@ -511,7 +511,7 @@ Y_UNIT_TEST(TestExportVariantStructTypeYdb) { TVector cells; TVector memoryOwner; TString errStr; - bool res = CellsFromTuple(&params.GetType(), params.GetValue(), types, true, cells, errStr, memoryOwner); + bool res = CellsFromTuple(&params.GetType(), params.GetValue(), types, {}, true, cells, errStr, memoryOwner); UNIT_ASSERT_VALUES_EQUAL_C(res, errStr.empty(), paramsProto); return errStr; diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp index 908ce028e4df..4a50b8918e9b 100644 --- a/ydb/core/grpc_services/rpc_read_table.cpp +++ b/ydb/core/grpc_services/rpc_read_table.cpp @@ -243,8 +243,11 @@ class TReadTableRPC : public TActorBootstrapped { return ReplyFinishStream(Ydb::StatusIds::UNAUTHORIZED, issueMessage, ctx); } case TEvTxUserProxy::TResultStatus::ResolveError: { - const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Got ResolveError response from TxProxy"); + NYql::TIssue issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Got ResolveError response from TxProxy"); auto tmp = issueMessage.Add(); + for (const auto& unresolved : msg->Record.GetUnresolvedKeys()) { + issue.AddSubIssue(MakeIntrusive(unresolved)); + } NYql::IssueToMessage(issue, tmp); return 
ReplyFinishStream(Ydb::StatusIds::SCHEME_ERROR, issueMessage, ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp index f22f6e8dbc0d..c0ba68e372c3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp @@ -122,8 +122,8 @@ TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context, TVector cells; TString error; TVector memoryOwner; - if (!NMiniKQL::CellsFromTuple(nullptr, op.GetPartitionBoundaries(i), pqGroupInfo->KeySchema, false, - cells, error, memoryOwner)) { + if (!NMiniKQL::CellsFromTuple(nullptr, op.GetPartitionBoundaries(i), pqGroupInfo->KeySchema, {}, + false, cells, error, memoryOwner)) { status = NKikimrScheme::StatusSchemeError; errStr = Sprintf("Invalid partition boundary at position: %u, error: %s", i, error.data()); return nullptr; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index fad1fc55ae99..69738de8e420 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6430,7 +6430,7 @@ bool TSchemeShard::FillSplitPartitioning(TVector& rangeEnds, const TCon if (boundary.HasSerializedKeyPrefix()) { prefix.Parse(boundary.GetSerializedKeyPrefix()); rangeEnd = TVector(prefix.GetCells().begin(), prefix.GetCells().end()); - } else if (!NMiniKQL::CellsFromTuple(nullptr, boundary.GetKeyPrefix(), keyColTypes, false, rangeEnd, errStr, memoryOwner)) { + } else if (!NMiniKQL::CellsFromTuple(nullptr, boundary.GetKeyPrefix(), keyColTypes, {}, false, rangeEnd, errStr, memoryOwner)) { errStr = Sprintf("Error at split boundary %d: %s", i, errStr.data()); return false; } diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index 90cc00f16cd8..bd7706cf0056 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ 
b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -508,7 +508,7 @@ void CheckBoundaries(const NKikimrScheme::TEvDescribeSchemeResult &record) { const auto& b = descr.GetTable().GetSplitBoundary(i); TVector cells; TVector memoryOwner; - NMiniKQL::CellsFromTuple(nullptr, b.GetKeyPrefix(), keyColTypes, false, cells, errStr, memoryOwner); + NMiniKQL::CellsFromTuple(nullptr, b.GetKeyPrefix(), keyColTypes, {}, false, cells, errStr, memoryOwner); UNIT_ASSERT_VALUES_EQUAL(errStr, ""); TString serialized = TSerializedCellVec::Serialize(cells); diff --git a/ydb/core/tx/tx_proxy/datareq.cpp b/ydb/core/tx/tx_proxy/datareq.cpp index d5c66d3f6ab4..17d975648976 100644 --- a/ydb/core/tx/tx_proxy/datareq.cpp +++ b/ydb/core/tx/tx_proxy/datareq.cpp @@ -1564,6 +1564,7 @@ void TDataReq::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev, toExpand = toInclusive ? EParseRangeKeyExp::NONE : EParseRangeKeyExp::TO_NULL; } } + if (!ParseRangeKey(ReadTableRequest->Range.GetFrom(), keyTypes, ReadTableRequest->FromValues, fromExpand) || !ParseRangeKey(ReadTableRequest->Range.GetTo(), keyTypes, @@ -3033,7 +3034,7 @@ bool TDataReq::ParseRangeKey(const NKikimrMiniKQL::TParams &proto, auto& value = proto.GetValue(); auto& type = proto.GetType(); TString errStr; - bool res = NMiniKQL::CellsFromTuple(&type, value, keyType, true, key, errStr, memoryOwner); + bool res = NMiniKQL::CellsFromTuple(&type, value, keyType, {}, true, key, errStr, memoryOwner); if (!res) { UnresolvedKeys.push_back("Failed to parse range key tuple: " + errStr); return false; diff --git a/ydb/core/tx/tx_proxy/read_table_impl.cpp b/ydb/core/tx/tx_proxy/read_table_impl.cpp index f8d9dcb56038..5f887a89ab57 100644 --- a/ydb/core/tx/tx_proxy/read_table_impl.cpp +++ b/ydb/core/tx/tx_proxy/read_table_impl.cpp @@ -93,6 +93,7 @@ namespace { bool ParseRangeKey( const NKikimrMiniKQL::TParams& proto, TConstArrayRef keyTypes, + const TVector& notNullTypes, TSerializedCellVec& buf, EParseRangeKeyExp exp, TVector& 
unresolvedKeys) @@ -108,7 +109,7 @@ namespace { auto& value = proto.GetValue(); auto& type = proto.GetType(); TString errStr; - bool res = NMiniKQL::CellsFromTuple(&type, value, keyTypes, true, key, errStr, memoryOwner); + bool res = NMiniKQL::CellsFromTuple(&type, value, keyTypes, notNullTypes, true, key, errStr, memoryOwner); if (!res) { unresolvedKeys.push_back("Failed to parse range key tuple: " + errStr); return false; @@ -534,16 +535,20 @@ class TReadTableWorker : public TActorBootstrapped { } TVector keyTypes(res.Columns.size()); + TVector notNullKeys(res.Columns.size()); TVector columns(res.Columns.size()); { size_t no = 0; size_t keys = 0; + const auto& notNullColumns = res.NotNullColumns; + for (auto &entry : res.Columns) { auto& col = entry.second; if (col.KeyOrder != -1) { keyTypes[col.KeyOrder] = col.PType; + notNullKeys[col.KeyOrder] = notNullColumns.contains(col.Name); ++keys; } @@ -564,6 +569,7 @@ class TReadTableWorker : public TActorBootstrapped { } keyTypes.resize(keys); + notNullKeys.resize(keys); } if (!colNameToPos.empty()) { @@ -595,9 +601,9 @@ class TReadTableWorker : public TActorBootstrapped { ? (toInclusive ? 
EParseRangeKeyExp::NONE : EParseRangeKeyExp::TO_NULL) : EParseRangeKeyExp::NONE); - if (!ParseRangeKey(Settings.KeyRange.GetFrom(), keyTypes, + if (!ParseRangeKey(Settings.KeyRange.GetFrom(), keyTypes, notNullKeys, KeyFromValues, fromExpand, UnresolvedKeys) || - !ParseRangeKey(Settings.KeyRange.GetTo(), keyTypes, + !ParseRangeKey(Settings.KeyRange.GetTo(), keyTypes, notNullKeys, KeyToValues, toExpand, UnresolvedKeys)) { TxProxyMon->ResolveKeySetWrongRequest->Inc(); diff --git a/ydb/core/tx/tx_proxy/resolvereq.cpp b/ydb/core/tx/tx_proxy/resolvereq.cpp index b1e3dbcb0f3c..399466137d16 100644 --- a/ydb/core/tx/tx_proxy/resolvereq.cpp +++ b/ydb/core/tx/tx_proxy/resolvereq.cpp @@ -35,7 +35,7 @@ namespace { auto& value = proto.GetValue(); auto& type = proto.GetType(); TString errStr; - bool res = NMiniKQL::CellsFromTuple(&type, value, keyTypes, true, key, errStr, memoryOwner); + bool res = NMiniKQL::CellsFromTuple(&type, value, keyTypes, {}, true, key, errStr, memoryOwner); if (!res) { unresolvedKeys.push_back("Failed to parse range key tuple: " + errStr); return false; diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp index 2316c15a99f3..43274e7d2c6a 100644 --- a/ydb/services/ydb/ydb_table_ut.cpp +++ b/ydb/services/ydb/ydb_table_ut.cpp @@ -1521,6 +1521,195 @@ R"___(
: Error: Transaction not found: , code: 2015 } } + Y_UNIT_TEST(TestReadTableNotNullBorder) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + TString location = TStringBuilder() << "localhost:" << grpc; + + auto connection = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + NYdb::NTable::TTableClient client(connection); + auto session = client.CreateSession().ExtractValueSync().GetSession(); + + auto result = session.ExecuteSchemeQuery(R"___( + CREATE TABLE `Root/Test` ( + Key Uint64 NOT NULL, + Value String, + PRIMARY KEY (Key) + ); + )___").ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + result = session.ExecuteDataQuery(R"___( + UPSERT INTO `Root/Test` (Key, Value) VALUES (0u, "Zero"); + UPSERT INTO `Root/Test` (Key, Value) VALUES (1u, "One"); + )___", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + { + TValueBuilder valueFrom; + valueFrom.BeginTuple() + .AddElement() + .Uint64(1) + .EndTuple(); + + auto settings = TReadTableSettings() + .Ordered() + .From(TKeyBound::Inclusive(valueFrom.Build())); + + auto it = session.ReadTable("Root/Test", settings).ExtractValueSync(); + + TReadTableResultPart streamPart = it.ReadNext().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(streamPart.GetStatus(), EStatus::SUCCESS); + + auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); + UNIT_ASSERT_VALUES_EQUAL(str, "[[[1u];[\"One\"]]]"); + } + + { + // Allow to use Optional values for NOT NULL columns + TValueBuilder valueFrom; + valueFrom.BeginTuple() + .AddElement() + .OptionalUint64(1) + .EndTuple(); + + auto settings = TReadTableSettings() + .Ordered() + .From(TKeyBound::Inclusive(valueFrom.Build())); + + auto it = session.ReadTable("Root/Test", settings).ExtractValueSync(); + + TReadTableResultPart streamPart = it.ReadNext().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), 
EStatus::SUCCESS, streamPart.GetIssues().ToString()); + + auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); + UNIT_ASSERT_VALUES_EQUAL(str, "[[[1u];[\"One\"]]]"); + } + + { + // Allow to use Optional values for NOT NULL columns + TValueBuilder valueFrom; + valueFrom.BeginTuple() + .AddElement() + .OptionalUint64(1) + .EndTuple(); + + auto settings = TReadTableSettings() + .Ordered() + .From(TKeyBound::Inclusive(valueFrom.Build())); + + auto it = session.ReadTable("Root/Test", settings).ExtractValueSync(); + + TReadTableResultPart streamPart = it.ReadNext().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), EStatus::SUCCESS, streamPart.GetIssues().ToString()); + + auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); + UNIT_ASSERT_VALUES_EQUAL(str, "[[[1u];[\"One\"]]]"); + } + + { + // Allow to use Optional values for NOT NULL columns + TValueBuilder valueFrom; + valueFrom.BeginTuple() + .AddElement() + .OptionalUint64(Nothing()) + .EndTuple(); + + auto settings = TReadTableSettings() + .Ordered() + .From(TKeyBound::Inclusive(valueFrom.Build())); + + auto it = session.ReadTable("Root/Test", settings).ExtractValueSync(); + + TReadTableResultPart streamPart = it.ReadNext().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), EStatus::SUCCESS, streamPart.GetIssues().ToString()); + + auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); + UNIT_ASSERT_VALUES_EQUAL(str, "[[[0u];[\"Zero\"]];[[1u];[\"One\"]]]"); + } + + { + TValueBuilder valueFrom; + valueFrom.BeginTuple() + .AddElement() + .Uint64(1) + .EndTuple(); + + auto settings = TReadTableSettings() + .Ordered() + .From(TKeyBound::Exclusive(valueFrom.Build())); + + auto it = session.ReadTable("Root/Test", settings).ExtractValueSync(); + + TReadTableResultPart streamPart = it.ReadNext().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(streamPart.GetStatus(), EStatus::SUCCESS); + + auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); + 
UNIT_ASSERT_VALUES_EQUAL(str, "[]"); + } + } + + // Same but use reverce order of column in tuple + Y_UNIT_TEST(TestReadTableNotNullBorder2) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + TString location = TStringBuilder() << "localhost:" << grpc; + + auto connection = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + NYdb::NTable::TTableClient client(connection); + auto session = client.CreateSession().ExtractValueSync().GetSession(); + + auto result = session.ExecuteSchemeQuery(R"___( + CREATE TABLE `Root/Test` ( + aaa String, + zzz Uint64 NOT NULL, + PRIMARY KEY (zzz) + ); + )___").ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + result = session.ExecuteDataQuery(R"___( + UPSERT INTO `Root/Test` (zzz, aaa) VALUES (1u, "One"); + )___", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + { + auto selectResult = session.ExecuteDataQuery(R"( + SELECT zzz, aaa FROM `Root/Test`; + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_EQUAL(selectResult.GetStatus(), EStatus::SUCCESS); + auto text = FormatResultSetYson(selectResult.GetResultSet(0)); + UNIT_ASSERT_VALUES_EQUAL("[[1u;[\"One\"]]]", text); + } + + { + TValueBuilder valueFrom; + valueFrom.BeginTuple() + .AddElement() + .Uint64(1) + .EndTuple(); + + auto settings = TReadTableSettings() + .Ordered() + .From(TKeyBound::Inclusive(valueFrom.Build())); + + auto it = session.ReadTable("Root/Test", settings).ExtractValueSync(); + + TReadTableResultPart streamPart = it.ReadNext().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(streamPart.GetStatus(), EStatus::SUCCESS); + + auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); + UNIT_ASSERT_VALUES_EQUAL(str, "[[[\"One\"];[1u]]]"); + } + } + enum class EReadTableMultiShardMode { Normal, UseSnapshot, From 33d7886872142a89dabe36c60404b1623ff1e6a2 Mon 
Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Mon, 13 May 2024 11:49:50 +0200 Subject: [PATCH 2/4] Remove duplicate check added by commit c30f11a748 (#4442) We already have checks for value tuple element count and do not want to change error messages in case of violation. Changelog category Bugfix --- ydb/core/engine/mkql_proto.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/ydb/core/engine/mkql_proto.cpp b/ydb/core/engine/mkql_proto.cpp index 779b74f85563..4725924e7c84 100644 --- a/ydb/core/engine/mkql_proto.cpp +++ b/ydb/core/engine/mkql_proto.cpp @@ -30,9 +30,6 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType, return false; \ } - CHECK_OR_RETURN_ERROR(types.size() >= tupleValue.TupleSize(), - "The size fo type array less then value tuple size"); - // Please note we modify notNullTypes during tuplyType verification to allow cast nullable to non nullable value if (notNullTypes) { CHECK_OR_RETURN_ERROR(notNullTypes.size() == types.size(), From 037797dfc30672fb59f2277c5732b4084f4f4dd7 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Fri, 31 May 2024 11:37:56 +0200 Subject: [PATCH 3/4] Allow Read table call returns not null type (#4779) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Support for not null response for ReadTable [1/2 commit] - We need to support compatibility with old datashards. In this commit, if the client asks for it, we send ReadTableTransaction and request the V2 response format. An old datashard doesn't know about this format and uses the legacy response instead. We treat that response as an error.

 Support for not null response for ReadTable [2/2 commit] - Datashard support for ReadTable transaction with not null support. Now datashard can handle new flag and returns proper ResultSet --- ydb/core/grpc_services/rpc_read_table.cpp | 17 ++++++ ydb/core/protos/tx_datashard.proto | 1 + ydb/core/protos/tx_proxy.proto | 1 + ydb/core/tx/datashard/read_table_scan.cpp | 22 +++++--- ydb/core/tx/tx_proxy/read_table.h | 1 + ydb/core/tx/tx_proxy/read_table_impl.cpp | 15 +++-- ydb/public/api/protos/ydb_table.proto | 1 + .../client/ydb_table/impl/table_client.cpp | 7 +++ ydb/public/sdk/cpp/client/ydb_table/table.h | 2 + ydb/services/ydb/ydb_table_ut.cpp | 55 +++++++++++++++++-- 10 files changed, 105 insertions(+), 17 deletions(-) diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp index 4a50b8918e9b..d8b4ae724fcb 100644 --- a/ydb/core/grpc_services/rpc_read_table.cpp +++ b/ydb/core/grpc_services/rpc_read_table.cpp @@ -214,6 +214,9 @@ class TReadTableRPC : public TActorBootstrapped { } else { TStringStream str; str << "Response version missmatched"; + if (msg->Record.HasReadTableResponseVersion()) { + str << " , got: " << msg->Record.GetReadTableResponseVersion(); + } LOG_ERROR(ctx, NKikimrServices::READ_TABLE_API, "%s", str.Str().data()); const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, str.Str()); @@ -528,6 +531,20 @@ class TReadTableRPC : public TActorBootstrapped { settings.TablePath = req->path(); settings.Ordered = req->ordered(); settings.RequireResultSet = true; + + // Right now assume return_not_null_data_as_optional is true by default + // Sometimes we well change this default + switch (req->return_not_null_data_as_optional()) { + case Ydb::FeatureFlag::DISABLED: + settings.DataFormat = NTxProxy::EReadTableFormat::YdbResultSetWithNotNullSupport; + break; + case Ydb::FeatureFlag::STATUS_UNSPECIFIED: + case Ydb::FeatureFlag::ENABLED: + default: + settings.DataFormat = 
NTxProxy::EReadTableFormat::YdbResultSet; + break; + } + if (req->row_limit()) { settings.MaxRows = req->row_limit(); } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index dca9720c06d6..cb2e5fe3b9eb 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -101,6 +101,7 @@ message TReadTableTransaction { optional string Name = 2; optional uint32 TypeId = 3; optional NKikimrProto.TTypeInfo TypeInfo = 4; + optional bool NotNull = 5 [default = false]; //If not set datashard will treat not null type as nullable (for compatibility) } optional NKikimrDataEvents.TTableId TableId = 1; diff --git a/ydb/core/protos/tx_proxy.proto b/ydb/core/protos/tx_proxy.proto index 5eac2fede50e..d983efcf7a1a 100644 --- a/ydb/core/protos/tx_proxy.proto +++ b/ydb/core/protos/tx_proxy.proto @@ -60,6 +60,7 @@ message TReadTableTransaction { enum EVersion { UNSPECIFIED = 0; YDB_V1 = 1; + YDB_V2 = 2; // Like V1 but allows NotNull types in result set } optional string Path = 1; optional bool Ordered = 2 [default = false]; diff --git a/ydb/core/tx/datashard/read_table_scan.cpp b/ydb/core/tx/datashard/read_table_scan.cpp index c622d23bcec4..bc0be6f68684 100644 --- a/ydb/core/tx/datashard/read_table_scan.cpp +++ b/ydb/core/tx/datashard/read_table_scan.cpp @@ -301,8 +301,9 @@ class TRowsToOldResult : public TRowsToResult { class TRowsToYdbResult : public TRowsToResult { public: - TRowsToYdbResult(const NKikimrTxDataShard::TReadTableTransaction& request) + TRowsToYdbResult(const NKikimrTxDataShard::TReadTableTransaction& request, bool allowNotNull) : TRowsToResult(request) + , AllowNotNull(allowNotNull) { BuildResultCommonPart(request); StartNewMessage(); @@ -345,16 +346,16 @@ class TRowsToYdbResult : public TRowsToResult { pg->set_typlen(0); pg->set_typmod(0); } else { + bool notNullResp = AllowNotNull && col.GetNotNull(); auto id = static_cast(col.GetTypeId()); + auto xType = notNullResp ? 
meta->mutable_type() : meta->mutable_type()->mutable_optional_type()->mutable_item(); if (id == NYql::NProto::Decimal) { - auto decimalType = meta->mutable_type()->mutable_optional_type()->mutable_item() - ->mutable_decimal_type(); + auto decimalType = xType->mutable_decimal_type(); //TODO: Pass decimal params here decimalType->set_precision(22); decimalType->set_scale(9); } else { - meta->mutable_type()->mutable_optional_type()->mutable_item() - ->set_type_id(static_cast(id)); + xType->set_type_id(static_cast(id)); } } } @@ -363,6 +364,7 @@ class TRowsToYdbResult : public TRowsToResult { } Ydb::ResultSet YdbResultSet; + const bool AllowNotNull; }; class TReadTableScan : public TActor, public NTable::IScan { @@ -390,8 +392,14 @@ class TReadTableScan : public TActor, public NTable::IScan { , PendingAcks(0) , Finished(false) { - if (tx.HasApiVersion() && tx.GetApiVersion() == NKikimrTxUserProxy::TReadTableTransaction::YDB_V1) { - Writer = MakeHolder(tx); + if (tx.HasApiVersion()) { + if (tx.GetApiVersion() == NKikimrTxUserProxy::TReadTableTransaction::YDB_V1) { + Writer = MakeHolder(tx, false); + } else if (tx.GetApiVersion() == NKikimrTxUserProxy::TReadTableTransaction::YDB_V2) { + Writer = MakeHolder(tx, true); + } else { + Writer = MakeHolder(tx); + } } else { Writer = MakeHolder(tx); } diff --git a/ydb/core/tx/tx_proxy/read_table.h b/ydb/core/tx/tx_proxy/read_table.h index 8b5af2316989..66d9f433b889 100644 --- a/ydb/core/tx/tx_proxy/read_table.h +++ b/ydb/core/tx/tx_proxy/read_table.h @@ -11,6 +11,7 @@ namespace NTxProxy { enum class EReadTableFormat { OldResultSet, YdbResultSet, + YdbResultSetWithNotNullSupport }; struct TReadTableSettings { diff --git a/ydb/core/tx/tx_proxy/read_table_impl.cpp b/ydb/core/tx/tx_proxy/read_table_impl.cpp index 5f887a89ab57..71c98ad1810a 100644 --- a/ydb/core/tx/tx_proxy/read_table_impl.cpp +++ b/ydb/core/tx/tx_proxy/read_table_impl.cpp @@ -1449,6 +1449,9 @@ class TReadTableWorker : public TActorBootstrapped { case 
EReadTableFormat::YdbResultSet: tx.SetApiVersion(NKikimrTxUserProxy::TReadTableTransaction::YDB_V1); break; + case EReadTableFormat::YdbResultSetWithNotNullSupport: + tx.SetApiVersion(NKikimrTxUserProxy::TReadTableTransaction::YDB_V2); + break; } for (auto &col : Columns) { @@ -1460,6 +1463,7 @@ class TReadTableWorker : public TActorBootstrapped { if (columnType.TypeInfo) { *c.MutableTypeInfo() = *columnType.TypeInfo; } + c.SetNotNull(col.IsNotNullColumn); } auto& txRange = *tx.MutableRange(); @@ -1741,15 +1745,19 @@ class TReadTableWorker : public TActorBootstrapped { void SendEmptyResponseData(const TActorContext& ctx) { TString data; ui32 apiVersion = 0; + bool allowNotNull = false; switch (Settings.DataFormat) { case EReadTableFormat::OldResultSet: // we don't support empty result sets return; + case EReadTableFormat::YdbResultSetWithNotNullSupport: + allowNotNull = true; case EReadTableFormat::YdbResultSet: { Ydb::ResultSet res; for (auto& col : Columns) { + bool notNullResp = allowNotNull && col.IsNotNullColumn; auto* meta = res.add_columns(); meta->set_name(col.Name); @@ -1759,16 +1767,15 @@ class TReadTableWorker : public TActorBootstrapped { pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc)); pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc)); } else { + auto xType = notNullResp ? 
meta->mutable_type() : meta->mutable_type()->mutable_optional_type()->mutable_item(); auto id = static_cast(col.PType.GetTypeId()); if (id == NYql::NProto::Decimal) { - auto decimalType = meta->mutable_type()->mutable_optional_type()->mutable_item() - ->mutable_decimal_type(); + auto decimalType = xType->mutable_decimal_type(); //TODO: Pass decimal params here decimalType->set_precision(22); decimalType->set_scale(9); } else { - meta->mutable_type()->mutable_optional_type()->mutable_item() - ->set_type_id(static_cast(id)); + xType->set_type_id(static_cast(id)); } } } diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index e878cb5bdf17..7a2d12fb8777 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -1074,6 +1074,7 @@ message ReadTableRequest { // greater than any of the limits uint64 batch_limit_bytes = 8; uint64 batch_limit_rows = 9; + Ydb.FeatureFlag.Status return_not_null_data_as_optional = 10; } // ReadTable doesn't use Operation, returns result directly diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp index 596e38a85846..5b0d6a6c8034 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp @@ -786,6 +786,13 @@ NThreading::TFuture>(); Connections_->StartReadStream( diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index ee1a176ee708..8355cfa13f6c 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -1577,6 +1577,8 @@ struct TReadTableSettings : public TRequestSettings { FLUENT_SETTING_OPTIONAL(ui64, BatchLimitBytes); FLUENT_SETTING_OPTIONAL(ui64, BatchLimitRows); + + FLUENT_SETTING_OPTIONAL(bool, ReturnNotNullAsOptional); }; //! 
Represents all session operations diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp index 43274e7d2c6a..17a4b17f9b2e 100644 --- a/ydb/services/ydb/ydb_table_ut.cpp +++ b/ydb/services/ydb/ydb_table_ut.cpp @@ -1537,14 +1537,15 @@ R"___(
: Error: Transaction not found: , code: 2015 CREATE TABLE `Root/Test` ( Key Uint64 NOT NULL, Value String, + Amount Decimal(22,9) NOT NULL, PRIMARY KEY (Key) ); )___").ExtractValueSync(); UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); result = session.ExecuteDataQuery(R"___( - UPSERT INTO `Root/Test` (Key, Value) VALUES (0u, "Zero"); - UPSERT INTO `Root/Test` (Key, Value) VALUES (1u, "One"); + UPSERT INTO `Root/Test` (Key, Value, Amount) VALUES (0u, "Zero", UNWRAP(CAST("0.11" AS Decimal(22, 9)))); + UPSERT INTO `Root/Test` (Key, Value, Amount) VALUES (1u, "One", UNWRAP(CAST("1.11" AS Decimal(22, 9)))); )___", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); @@ -1565,7 +1566,28 @@ R"___(
: Error: Transaction not found: , code: 2015 UNIT_ASSERT_VALUES_EQUAL(streamPart.GetStatus(), EStatus::SUCCESS); auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); - UNIT_ASSERT_VALUES_EQUAL(str, "[[[1u];[\"One\"]]]"); + UNIT_ASSERT_VALUES_EQUAL(str, "[[[1u];[\"One\"];[\"1.11\"]]]"); + } + + { + TValueBuilder valueFrom; + valueFrom.BeginTuple() + .AddElement() + .Uint64(1) + .EndTuple(); + + auto settings = TReadTableSettings() + .Ordered() + .ReturnNotNullAsOptional(false) + .From(TKeyBound::Inclusive(valueFrom.Build())); + + auto it = session.ReadTable("Root/Test", settings).ExtractValueSync(); + + TReadTableResultPart streamPart = it.ReadNext().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(streamPart.GetStatus(), EStatus::SUCCESS); + + auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); + UNIT_ASSERT_VALUES_EQUAL(str, "[[1u;[\"One\"];\"1.11\"]]"); } { @@ -1586,7 +1608,7 @@ R"___(
: Error: Transaction not found: , code: 2015 UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), EStatus::SUCCESS, streamPart.GetIssues().ToString()); auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); - UNIT_ASSERT_VALUES_EQUAL(str, "[[[1u];[\"One\"]]]"); + UNIT_ASSERT_VALUES_EQUAL(str, "[[[1u];[\"One\"];[\"1.11\"]]]"); } { @@ -1607,7 +1629,7 @@ R"___(
: Error: Transaction not found: , code: 2015 UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), EStatus::SUCCESS, streamPart.GetIssues().ToString()); auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); - UNIT_ASSERT_VALUES_EQUAL(str, "[[[1u];[\"One\"]]]"); + UNIT_ASSERT_VALUES_EQUAL(str, "[[[1u];[\"One\"];[\"1.11\"]]]"); } { @@ -1628,7 +1650,27 @@ R"___(
: Error: Transaction not found: , code: 2015 UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), EStatus::SUCCESS, streamPart.GetIssues().ToString()); auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); - UNIT_ASSERT_VALUES_EQUAL(str, "[[[0u];[\"Zero\"]];[[1u];[\"One\"]]]"); + UNIT_ASSERT_VALUES_EQUAL(str, "[[[0u];[\"Zero\"];[\"0.11\"]];[[1u];[\"One\"];[\"1.11\"]]]"); + } + + { + TValueBuilder valueFrom; + valueFrom.BeginTuple() + .AddElement() + .Uint64(1) + .EndTuple(); + + auto settings = TReadTableSettings() + .Ordered() + .From(TKeyBound::Exclusive(valueFrom.Build())); + + auto it = session.ReadTable("Root/Test", settings).ExtractValueSync(); + + TReadTableResultPart streamPart = it.ReadNext().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(streamPart.GetStatus(), EStatus::SUCCESS); + + auto str = NYdb::FormatResultSetYson(streamPart.ExtractPart()); + UNIT_ASSERT_VALUES_EQUAL(str, "[]"); } { @@ -1640,6 +1682,7 @@ R"___(
: Error: Transaction not found: , code: 2015 auto settings = TReadTableSettings() .Ordered() + .ReturnNotNullAsOptional(false) .From(TKeyBound::Exclusive(valueFrom.Build())); auto it = session.ReadTable("Root/Test", settings).ExtractValueSync(); From 740bb6fbf359c5bde92f5f55c5284b0bb7b58d6c Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Wed, 22 May 2024 13:25:11 +0200 Subject: [PATCH 4/4] pass not null flag as TTableColumnInfo field (#4731) --- ydb/core/grpc_services/resolve_local_db_table.cpp | 4 +++- ydb/core/tx/datashard/sys_tables.h | 4 +++- ydb/core/tx/scheme_board/cache.cpp | 1 + ydb/core/tx/tx_proxy/read_table_impl.cpp | 4 +--- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/ydb/core/grpc_services/resolve_local_db_table.cpp b/ydb/core/grpc_services/resolve_local_db_table.cpp index 81ae3d12a0cd..daa0ebc8919c 100644 --- a/ydb/core/grpc_services/resolve_local_db_table.cpp +++ b/ydb/core/grpc_services/resolve_local_db_table.cpp @@ -45,7 +45,9 @@ namespace NGRpcService { const NTable::TScheme::TTableInfo* tableInfo = scheme.Tables.FindPtr(*ti); for (const auto& col : tableInfo->Columns) { - entry.Columns[col.first] = TSysTables::TTableColumnInfo(col.second.Name, col.first, col.second.PType, col.second.PTypeMod, col.second.KeyOrder); + entry.Columns[col.first] = TSysTables::TTableColumnInfo( + col.second.Name, col.first, col.second.PType, col.second.PTypeMod, col.second.KeyOrder, + {}, TSysTables::TTableColumnInfo::EDefaultKind::DEFAULT_UNDEFINED, {}, false, col.second.NotNull); } } diff --git a/ydb/core/tx/datashard/sys_tables.h b/ydb/core/tx/datashard/sys_tables.h index a1ac25c0a841..7378daf1e26c 100644 --- a/ydb/core/tx/datashard/sys_tables.h +++ b/ydb/core/tx/datashard/sys_tables.h @@ -31,6 +31,7 @@ struct TSysTables { EDefaultKind DefaultKind; Ydb::TypedValue DefaultFromLiteral; bool IsBuildInProgress = false; + bool IsNotNullColumn = false; //maybe move into TTypeInfo? 
TTableColumnInfo() = default; @@ -54,7 +55,7 @@ struct TSysTables { const TString& typeMod = {}, i32 keyOrder = -1, const TString& defaultFromSequence = {}, EDefaultKind defaultKind = EDefaultKind::DEFAULT_UNDEFINED, - const Ydb::TypedValue& defaultFromLiteral = {}, bool isBuildInProgress = false) + const Ydb::TypedValue& defaultFromLiteral = {}, bool isBuildInProgress = false, bool isNotNullColumn = false) : Name(name) , Id(colId) , PType(type) @@ -64,6 +65,7 @@ struct TSysTables { , DefaultKind(defaultKind) , DefaultFromLiteral(defaultFromLiteral) , IsBuildInProgress(isBuildInProgress) + , IsNotNullColumn(isNotNullColumn) {} }; diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index fc5197482469..89f0796b905a 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -785,6 +785,7 @@ class TSchemeCache: public TMonitorableActor { } if (columnDesc.GetNotNull()) { + column.IsNotNullColumn = true; NotNullColumns.insert(columnDesc.GetName()); } } diff --git a/ydb/core/tx/tx_proxy/read_table_impl.cpp b/ydb/core/tx/tx_proxy/read_table_impl.cpp index 71c98ad1810a..ba8f20f990c4 100644 --- a/ydb/core/tx/tx_proxy/read_table_impl.cpp +++ b/ydb/core/tx/tx_proxy/read_table_impl.cpp @@ -541,14 +541,12 @@ class TReadTableWorker : public TActorBootstrapped { size_t no = 0; size_t keys = 0; - const auto& notNullColumns = res.NotNullColumns; - for (auto &entry : res.Columns) { auto& col = entry.second; if (col.KeyOrder != -1) { keyTypes[col.KeyOrder] = col.PType; - notNullKeys[col.KeyOrder] = notNullColumns.contains(col.Name); + notNullKeys[col.KeyOrder] = col.IsNotNullColumn; ++keys; }