Skip to content

Commit

Permalink
Not null support for ReadTable key boundary (ydb-platform#4368)
Browse files Browse the repository at this point in the history
Changelog category

Bugfix
  • Loading branch information
dcherednik authored May 10, 2024
1 parent c0f4520 commit c30f11a
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 29 deletions.
69 changes: 51 additions & 18 deletions ydb/core/engine/mkql_proto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
namespace NKikimr::NMiniKQL {

// NOTE: TCell's can reference memomry from tupleValue
// TODO: Place notNull flag in to the NScheme::TTypeInfo?
bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
const NKikimrMiniKQL::TValue& tupleValue,
const TConstArrayRef<NScheme::TTypeInfo>& types,
TVector<bool> notNullTypes,
bool allowCastFromString,
TVector<TCell>& key,
TString& errStr,
Expand All @@ -28,6 +30,21 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
return false; \
}

CHECK_OR_RETURN_ERROR(types.size() >= tupleValue.TupleSize(),
"The size fo type array less then value tuple size");

// Please note we modify notNullTypes during tuplyType verification to allow cast nullable to non nullable value
if (notNullTypes) {
CHECK_OR_RETURN_ERROR(notNullTypes.size() == types.size(),
"The size of type array and given not null markers must be equial");
if (tupleType) {
CHECK_OR_RETURN_ERROR(notNullTypes.size() >= tupleType->GetTuple().ElementSize(),
"The size of tuple type and given not null markers must be equal");
}
} else {
notNullTypes.resize(types.size());
}

if (tupleType) {
CHECK_OR_RETURN_ERROR(tupleType->GetKind() == NKikimrMiniKQL::Tuple ||
(tupleType->GetKind() == NKikimrMiniKQL::Unknown && tupleType->GetTuple().ElementSize() == 0), "Must be a tuple");
Expand All @@ -36,8 +53,14 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,

for (size_t i = 0; i < tupleType->GetTuple().ElementSize(); ++i) {
const auto& ti = tupleType->GetTuple().GetElement(i);
CHECK_OR_RETURN_ERROR(ti.GetKind() == NKikimrMiniKQL::Optional, "Element at index " + ToString(i) + " in not an Optional");
const auto& item = ti.GetOptional().GetItem();
if (notNullTypes[i]) {
// For not null column type we allow to build cell from nullable mkql type for compatibility reason.
notNullTypes[i] = ti.GetKind() != NKikimrMiniKQL::Optional;
} else {
// But we do not allow to build cell for nullable column from not nullable type
CHECK_OR_RETURN_ERROR(ti.GetKind() == NKikimrMiniKQL::Optional, "Element at index " + ToString(i) + " in not an Optional");
}
const auto& item = notNullTypes[i] ? ti : ti.GetOptional().GetItem();
CHECK_OR_RETURN_ERROR(item.GetKind() == NKikimrMiniKQL::Data, "Element at index " + ToString(i) + " Item kind is not Data");
const auto& typeId = item.GetData().GetScheme();
CHECK_OR_RETURN_ERROR(typeId == types[i].GetTypeId() ||
Expand All @@ -53,26 +76,36 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
}

for (ui32 i = 0; i < tupleValue.TupleSize(); ++i) {
auto& o = tupleValue.GetTuple(i);

auto element_case = o.value_value_case();

CHECK_OR_RETURN_ERROR(element_case == NKikimrMiniKQL::TValue::kOptional ||
element_case == NKikimrMiniKQL::TValue::VALUE_VALUE_NOT_SET,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));

CHECK_OR_RETURN_ERROR(o.ListSize() == 0 &&
o.StructSize() == 0 &&
o.TupleSize() == 0 &&
o.DictSize() == 0,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));
auto& o = tupleValue.GetTuple(i);

if (!o.HasOptional()) {
key.push_back(TCell());
continue;
if (notNullTypes[i]) {
CHECK_OR_RETURN_ERROR(o.ListSize() == 0 &&
o.StructSize() == 0 &&
o.TupleSize() == 0 &&
o.DictSize() == 0 &&
!o.HasOptional(),
Sprintf("Primitive type is expected in tuple at position %" PRIu32, i));
} else {
auto element_case = o.value_value_case();

CHECK_OR_RETURN_ERROR(element_case == NKikimrMiniKQL::TValue::kOptional ||
element_case == NKikimrMiniKQL::TValue::VALUE_VALUE_NOT_SET,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));

CHECK_OR_RETURN_ERROR(o.ListSize() == 0 &&
o.StructSize() == 0 &&
o.TupleSize() == 0 &&
o.DictSize() == 0,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));

if (!o.HasOptional()) {
key.push_back(TCell());
continue;
}
}

auto& v = o.GetOptional();
auto& v = notNullTypes[i] ? o : o.GetOptional();

auto value_case = v.value_value_case();

Expand Down
1 change: 1 addition & 0 deletions ydb/core/engine/mkql_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class THolderFactory;
bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
const NKikimrMiniKQL::TValue& tupleValue,
const TConstArrayRef<NScheme::TTypeInfo>& expectedTypes,
TVector<bool> notNullTypes,
bool allowCastFromString,
TVector<TCell>& key,
TString& errStr,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/engine/mkql_proto_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ Y_UNIT_TEST(TestExportVariantStructTypeYdb) {
TVector<TCell> cells;
TVector<TString> memoryOwner;
TString errStr;
bool res = CellsFromTuple(&params.GetType(), params.GetValue(), types, true, cells, errStr, memoryOwner);
bool res = CellsFromTuple(&params.GetType(), params.GetValue(), types, {}, true, cells, errStr, memoryOwner);
UNIT_ASSERT_VALUES_EQUAL_C(res, errStr.empty(), paramsProto);

return errStr;
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/grpc_services/rpc_read_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,11 @@ class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> {
return ReplyFinishStream(Ydb::StatusIds::UNAUTHORIZED, issueMessage, ctx);
}
case TEvTxUserProxy::TResultStatus::ResolveError: {
const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Got ResolveError response from TxProxy");
NYql::TIssue issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Got ResolveError response from TxProxy");
auto tmp = issueMessage.Add();
for (const auto& unresolved : msg->Record.GetUnresolvedKeys()) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(unresolved));
}
NYql::IssueToMessage(issue, tmp);
return ReplyFinishStream(Ydb::StatusIds::SCHEME_ERROR, issueMessage, ctx);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
TVector<TCell> cells;
TString error;
TVector<TString> memoryOwner;
if (!NMiniKQL::CellsFromTuple(nullptr, op.GetPartitionBoundaries(i), pqGroupInfo->KeySchema, false,
cells, error, memoryOwner)) {
if (!NMiniKQL::CellsFromTuple(nullptr, op.GetPartitionBoundaries(i), pqGroupInfo->KeySchema, {},
false, cells, error, memoryOwner)) {
status = NKikimrScheme::StatusSchemeError;
errStr = Sprintf("Invalid partition boundary at position: %u, error: %s", i, error.data());
return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6475,7 +6475,7 @@ bool TSchemeShard::FillSplitPartitioning(TVector<TString>& rangeEnds, const TCon
if (boundary.HasSerializedKeyPrefix()) {
prefix.Parse(boundary.GetSerializedKeyPrefix());
rangeEnd = TVector<TCell>(prefix.GetCells().begin(), prefix.GetCells().end());
} else if (!NMiniKQL::CellsFromTuple(nullptr, boundary.GetKeyPrefix(), keyColTypes, false, rangeEnd, errStr, memoryOwner)) {
} else if (!NMiniKQL::CellsFromTuple(nullptr, boundary.GetKeyPrefix(), keyColTypes, {}, false, rangeEnd, errStr, memoryOwner)) {
errStr = Sprintf("Error at split boundary %d: %s", i, errStr.data());
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ void CheckBoundaries(const NKikimrScheme::TEvDescribeSchemeResult &record) {
const auto& b = descr.GetTable().GetSplitBoundary(i);
TVector<TCell> cells;
TVector<TString> memoryOwner;
NMiniKQL::CellsFromTuple(nullptr, b.GetKeyPrefix(), keyColTypes, false, cells, errStr, memoryOwner);
NMiniKQL::CellsFromTuple(nullptr, b.GetKeyPrefix(), keyColTypes, {}, false, cells, errStr, memoryOwner);
UNIT_ASSERT_VALUES_EQUAL(errStr, "");

TString serialized = TSerializedCellVec::Serialize(cells);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/tx_proxy/datareq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,7 @@ void TDataReq::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev,
toExpand = toInclusive ? EParseRangeKeyExp::NONE : EParseRangeKeyExp::TO_NULL;
}
}

if (!ParseRangeKey(ReadTableRequest->Range.GetFrom(), keyTypes,
ReadTableRequest->FromValues, fromExpand)
|| !ParseRangeKey(ReadTableRequest->Range.GetTo(), keyTypes,
Expand Down Expand Up @@ -3026,7 +3027,7 @@ bool TDataReq::ParseRangeKey(const NKikimrMiniKQL::TParams &proto,
auto& value = proto.GetValue();
auto& type = proto.GetType();
TString errStr;
bool res = NMiniKQL::CellsFromTuple(&type, value, keyType, true, key, errStr, memoryOwner);
bool res = NMiniKQL::CellsFromTuple(&type, value, keyType, {}, true, key, errStr, memoryOwner);
if (!res) {
UnresolvedKeys.push_back("Failed to parse range key tuple: " + errStr);
return false;
Expand Down
12 changes: 9 additions & 3 deletions ydb/core/tx/tx_proxy/read_table_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ namespace {
bool ParseRangeKey(
const NKikimrMiniKQL::TParams& proto,
TConstArrayRef<NScheme::TTypeInfo> keyTypes,
const TVector<bool>& notNullTypes,
TSerializedCellVec& buf,
EParseRangeKeyExp exp,
TVector<TString>& unresolvedKeys)
Expand All @@ -108,7 +109,7 @@ namespace {
auto& value = proto.GetValue();
auto& type = proto.GetType();
TString errStr;
bool res = NMiniKQL::CellsFromTuple(&type, value, keyTypes, true, key, errStr, memoryOwner);
bool res = NMiniKQL::CellsFromTuple(&type, value, keyTypes, notNullTypes, true, key, errStr, memoryOwner);
if (!res) {
unresolvedKeys.push_back("Failed to parse range key tuple: " + errStr);
return false;
Expand Down Expand Up @@ -534,16 +535,20 @@ class TReadTableWorker : public TActorBootstrapped<TReadTableWorker> {
}

TVector<NScheme::TTypeInfo> keyTypes(res.Columns.size());
TVector<bool> notNullKeys(res.Columns.size());
TVector<TKeyDesc::TColumnOp> columns(res.Columns.size());
{
size_t no = 0;
size_t keys = 0;

const auto& notNullColumns = res.NotNullColumns;

for (auto &entry : res.Columns) {
auto& col = entry.second;

if (col.KeyOrder != -1) {
keyTypes[col.KeyOrder] = col.PType;
notNullKeys[col.KeyOrder] = notNullColumns.contains(col.Name);
++keys;
}

Expand All @@ -564,6 +569,7 @@ class TReadTableWorker : public TActorBootstrapped<TReadTableWorker> {
}

keyTypes.resize(keys);
notNullKeys.resize(keys);
}

if (!colNameToPos.empty()) {
Expand Down Expand Up @@ -595,9 +601,9 @@ class TReadTableWorker : public TActorBootstrapped<TReadTableWorker> {
? (toInclusive ? EParseRangeKeyExp::NONE : EParseRangeKeyExp::TO_NULL)
: EParseRangeKeyExp::NONE);

if (!ParseRangeKey(Settings.KeyRange.GetFrom(), keyTypes,
if (!ParseRangeKey(Settings.KeyRange.GetFrom(), keyTypes, notNullKeys,
KeyFromValues, fromExpand, UnresolvedKeys) ||
!ParseRangeKey(Settings.KeyRange.GetTo(), keyTypes,
!ParseRangeKey(Settings.KeyRange.GetTo(), keyTypes, notNullKeys,
KeyToValues, toExpand, UnresolvedKeys))
{
TxProxyMon->ResolveKeySetWrongRequest->Inc();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/tx_proxy/resolvereq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace {
auto& value = proto.GetValue();
auto& type = proto.GetType();
TString errStr;
bool res = NMiniKQL::CellsFromTuple(&type, value, keyTypes, true, key, errStr, memoryOwner);
bool res = NMiniKQL::CellsFromTuple(&type, value, keyTypes, {}, true, key, errStr, memoryOwner);
if (!res) {
unresolvedKeys.push_back("Failed to parse range key tuple: " + errStr);
return false;
Expand Down
Loading

0 comments on commit c30f11a

Please sign in to comment.