Skip to content

Commit

Permalink
Merge 740bb6f into ea2ef69
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik authored Jun 5, 2024
2 parents ea2ef69 + 740bb6f commit 5401d84
Show file tree
Hide file tree
Showing 21 changed files with 363 additions and 42 deletions.
66 changes: 48 additions & 18 deletions ydb/core/engine/mkql_proto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
namespace NKikimr::NMiniKQL {

// NOTE: TCell's can reference memomry from tupleValue
// TODO: Place notNull flag in to the NScheme::TTypeInfo?
bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
const NKikimrMiniKQL::TValue& tupleValue,
const TConstArrayRef<NScheme::TTypeInfo>& types,
TVector<bool> notNullTypes,
bool allowCastFromString,
TVector<TCell>& key,
TString& errStr,
Expand All @@ -28,6 +30,18 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
return false; \
}

// Please note we modify notNullTypes during tuplyType verification to allow cast nullable to non nullable value
if (notNullTypes) {
CHECK_OR_RETURN_ERROR(notNullTypes.size() == types.size(),
"The size of type array and given not null markers must be equial");
if (tupleType) {
CHECK_OR_RETURN_ERROR(notNullTypes.size() >= tupleType->GetTuple().ElementSize(),
"The size of tuple type and given not null markers must be equal");
}
} else {
notNullTypes.resize(types.size());
}

if (tupleType) {
CHECK_OR_RETURN_ERROR(tupleType->GetKind() == NKikimrMiniKQL::Tuple ||
(tupleType->GetKind() == NKikimrMiniKQL::Unknown && tupleType->GetTuple().ElementSize() == 0), "Must be a tuple");
Expand All @@ -36,8 +50,14 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,

for (size_t i = 0; i < tupleType->GetTuple().ElementSize(); ++i) {
const auto& ti = tupleType->GetTuple().GetElement(i);
CHECK_OR_RETURN_ERROR(ti.GetKind() == NKikimrMiniKQL::Optional, "Element at index " + ToString(i) + " in not an Optional");
const auto& item = ti.GetOptional().GetItem();
if (notNullTypes[i]) {
// For not null column type we allow to build cell from nullable mkql type for compatibility reason.
notNullTypes[i] = ti.GetKind() != NKikimrMiniKQL::Optional;
} else {
// But we do not allow to build cell for nullable column from not nullable type
CHECK_OR_RETURN_ERROR(ti.GetKind() == NKikimrMiniKQL::Optional, "Element at index " + ToString(i) + " in not an Optional");
}
const auto& item = notNullTypes[i] ? ti : ti.GetOptional().GetItem();
CHECK_OR_RETURN_ERROR(item.GetKind() == NKikimrMiniKQL::Data, "Element at index " + ToString(i) + " Item kind is not Data");
const auto& typeId = item.GetData().GetScheme();
CHECK_OR_RETURN_ERROR(typeId == types[i].GetTypeId() ||
Expand All @@ -53,26 +73,36 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
}

for (ui32 i = 0; i < tupleValue.TupleSize(); ++i) {
auto& o = tupleValue.GetTuple(i);

auto element_case = o.value_value_case();

CHECK_OR_RETURN_ERROR(element_case == NKikimrMiniKQL::TValue::kOptional ||
element_case == NKikimrMiniKQL::TValue::VALUE_VALUE_NOT_SET,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));

CHECK_OR_RETURN_ERROR(o.ListSize() == 0 &&
o.StructSize() == 0 &&
o.TupleSize() == 0 &&
o.DictSize() == 0,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));
auto& o = tupleValue.GetTuple(i);

if (!o.HasOptional()) {
key.push_back(TCell());
continue;
if (notNullTypes[i]) {
CHECK_OR_RETURN_ERROR(o.ListSize() == 0 &&
o.StructSize() == 0 &&
o.TupleSize() == 0 &&
o.DictSize() == 0 &&
!o.HasOptional(),
Sprintf("Primitive type is expected in tuple at position %" PRIu32, i));
} else {
auto element_case = o.value_value_case();

CHECK_OR_RETURN_ERROR(element_case == NKikimrMiniKQL::TValue::kOptional ||
element_case == NKikimrMiniKQL::TValue::VALUE_VALUE_NOT_SET,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));

CHECK_OR_RETURN_ERROR(o.ListSize() == 0 &&
o.StructSize() == 0 &&
o.TupleSize() == 0 &&
o.DictSize() == 0,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));

if (!o.HasOptional()) {
key.push_back(TCell());
continue;
}
}

auto& v = o.GetOptional();
auto& v = notNullTypes[i] ? o : o.GetOptional();

auto value_case = v.value_value_case();

Expand Down
1 change: 1 addition & 0 deletions ydb/core/engine/mkql_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class THolderFactory;
bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
const NKikimrMiniKQL::TValue& tupleValue,
const TConstArrayRef<NScheme::TTypeInfo>& expectedTypes,
TVector<bool> notNullTypes,
bool allowCastFromString,
TVector<TCell>& key,
TString& errStr,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/engine/mkql_proto_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ Y_UNIT_TEST(TestExportVariantStructTypeYdb) {
TVector<TCell> cells;
TVector<TString> memoryOwner;
TString errStr;
bool res = CellsFromTuple(&params.GetType(), params.GetValue(), types, true, cells, errStr, memoryOwner);
bool res = CellsFromTuple(&params.GetType(), params.GetValue(), types, {}, true, cells, errStr, memoryOwner);
UNIT_ASSERT_VALUES_EQUAL_C(res, errStr.empty(), paramsProto);

return errStr;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/grpc_services/resolve_local_db_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ namespace NGRpcService {
const NTable::TScheme::TTableInfo* tableInfo = scheme.Tables.FindPtr(*ti);

for (const auto& col : tableInfo->Columns) {
entry.Columns[col.first] = TSysTables::TTableColumnInfo(col.second.Name, col.first, col.second.PType, col.second.PTypeMod, col.second.KeyOrder);
entry.Columns[col.first] = TSysTables::TTableColumnInfo(
col.second.Name, col.first, col.second.PType, col.second.PTypeMod, col.second.KeyOrder,
{}, TSysTables::TTableColumnInfo::EDefaultKind::DEFAULT_UNDEFINED, {}, false, col.second.NotNull);
}
}

Expand Down
22 changes: 21 additions & 1 deletion ydb/core/grpc_services/rpc_read_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> {
} else {
TStringStream str;
str << "Response version missmatched";
if (msg->Record.HasReadTableResponseVersion()) {
str << " , got: " << msg->Record.GetReadTableResponseVersion();
}
LOG_ERROR(ctx, NKikimrServices::READ_TABLE_API,
"%s", str.Str().data());
const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, str.Str());
Expand Down Expand Up @@ -243,8 +246,11 @@ class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> {
return ReplyFinishStream(Ydb::StatusIds::UNAUTHORIZED, issueMessage, ctx);
}
case TEvTxUserProxy::TResultStatus::ResolveError: {
const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Got ResolveError response from TxProxy");
NYql::TIssue issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Got ResolveError response from TxProxy");
auto tmp = issueMessage.Add();
for (const auto& unresolved : msg->Record.GetUnresolvedKeys()) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(unresolved));
}
NYql::IssueToMessage(issue, tmp);
return ReplyFinishStream(Ydb::StatusIds::SCHEME_ERROR, issueMessage, ctx);
}
Expand Down Expand Up @@ -525,6 +531,20 @@ class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> {
settings.TablePath = req->path();
settings.Ordered = req->ordered();
settings.RequireResultSet = true;

// Right now assume return_not_null_data_as_optional is true by default
// Sometimes we well change this default
switch (req->return_not_null_data_as_optional()) {
case Ydb::FeatureFlag::DISABLED:
settings.DataFormat = NTxProxy::EReadTableFormat::YdbResultSetWithNotNullSupport;
break;
case Ydb::FeatureFlag::STATUS_UNSPECIFIED:
case Ydb::FeatureFlag::ENABLED:
default:
settings.DataFormat = NTxProxy::EReadTableFormat::YdbResultSet;
break;
}

if (req->row_limit()) {
settings.MaxRows = req->row_limit();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ message TReadTableTransaction {
optional string Name = 2;
optional uint32 TypeId = 3;
optional NKikimrProto.TTypeInfo TypeInfo = 4;
optional bool NotNull = 5 [default = false]; //If not set datashard will treat not null type as nullable (for compatibility)
}

optional NKikimrDataEvents.TTableId TableId = 1;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/tx_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ message TReadTableTransaction {
enum EVersion {
UNSPECIFIED = 0;
YDB_V1 = 1;
YDB_V2 = 2; // Like V1 but allows NotNull types in result set
}
optional string Path = 1;
optional bool Ordered = 2 [default = false];
Expand Down
22 changes: 15 additions & 7 deletions ydb/core/tx/datashard/read_table_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,9 @@ class TRowsToOldResult : public TRowsToResult {

class TRowsToYdbResult : public TRowsToResult {
public:
TRowsToYdbResult(const NKikimrTxDataShard::TReadTableTransaction& request)
TRowsToYdbResult(const NKikimrTxDataShard::TReadTableTransaction& request, bool allowNotNull)
: TRowsToResult(request)
, AllowNotNull(allowNotNull)
{
BuildResultCommonPart(request);
StartNewMessage();
Expand Down Expand Up @@ -345,16 +346,16 @@ class TRowsToYdbResult : public TRowsToResult {
pg->set_typlen(0);
pg->set_typmod(0);
} else {
bool notNullResp = AllowNotNull && col.GetNotNull();
auto id = static_cast<NYql::NProto::TypeIds>(col.GetTypeId());
auto xType = notNullResp ? meta->mutable_type() : meta->mutable_type()->mutable_optional_type()->mutable_item();
if (id == NYql::NProto::Decimal) {
auto decimalType = meta->mutable_type()->mutable_optional_type()->mutable_item()
->mutable_decimal_type();
auto decimalType = xType->mutable_decimal_type();
//TODO: Pass decimal params here
decimalType->set_precision(22);
decimalType->set_scale(9);
} else {
meta->mutable_type()->mutable_optional_type()->mutable_item()
->set_type_id(static_cast<Ydb::Type::PrimitiveTypeId>(id));
xType->set_type_id(static_cast<Ydb::Type::PrimitiveTypeId>(id));
}
}
}
Expand All @@ -363,6 +364,7 @@ class TRowsToYdbResult : public TRowsToResult {
}

Ydb::ResultSet YdbResultSet;
const bool AllowNotNull;
};

class TReadTableScan : public TActor<TReadTableScan>, public NTable::IScan {
Expand Down Expand Up @@ -390,8 +392,14 @@ class TReadTableScan : public TActor<TReadTableScan>, public NTable::IScan {
, PendingAcks(0)
, Finished(false)
{
if (tx.HasApiVersion() && tx.GetApiVersion() == NKikimrTxUserProxy::TReadTableTransaction::YDB_V1) {
Writer = MakeHolder<TRowsToYdbResult>(tx);
if (tx.HasApiVersion()) {
if (tx.GetApiVersion() == NKikimrTxUserProxy::TReadTableTransaction::YDB_V1) {
Writer = MakeHolder<TRowsToYdbResult>(tx, false);
} else if (tx.GetApiVersion() == NKikimrTxUserProxy::TReadTableTransaction::YDB_V2) {
Writer = MakeHolder<TRowsToYdbResult>(tx, true);
} else {
Writer = MakeHolder<TRowsToOldResult>(tx);
}
} else {
Writer = MakeHolder<TRowsToOldResult>(tx);
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/sys_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct TSysTables {
EDefaultKind DefaultKind;
Ydb::TypedValue DefaultFromLiteral;
bool IsBuildInProgress = false;
bool IsNotNullColumn = false; //maybe move into TTypeInfo?

TTableColumnInfo() = default;

Expand All @@ -54,7 +55,7 @@ struct TSysTables {
const TString& typeMod = {}, i32 keyOrder = -1,
const TString& defaultFromSequence = {},
EDefaultKind defaultKind = EDefaultKind::DEFAULT_UNDEFINED,
const Ydb::TypedValue& defaultFromLiteral = {}, bool isBuildInProgress = false)
const Ydb::TypedValue& defaultFromLiteral = {}, bool isBuildInProgress = false, bool isNotNullColumn = false)
: Name(name)
, Id(colId)
, PType(type)
Expand All @@ -64,6 +65,7 @@ struct TSysTables {
, DefaultKind(defaultKind)
, DefaultFromLiteral(defaultFromLiteral)
, IsBuildInProgress(isBuildInProgress)
, IsNotNullColumn(isNotNullColumn)
{}
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/scheme_board/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
}

if (columnDesc.GetNotNull()) {
column.IsNotNullColumn = true;
NotNullColumns.insert(columnDesc.GetName());
}
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
TVector<TCell> cells;
TString error;
TVector<TString> memoryOwner;
if (!NMiniKQL::CellsFromTuple(nullptr, op.GetPartitionBoundaries(i), pqGroupInfo->KeySchema, false,
cells, error, memoryOwner)) {
if (!NMiniKQL::CellsFromTuple(nullptr, op.GetPartitionBoundaries(i), pqGroupInfo->KeySchema, {},
false, cells, error, memoryOwner)) {
status = NKikimrScheme::StatusSchemeError;
errStr = Sprintf("Invalid partition boundary at position: %u, error: %s", i, error.data());
return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6430,7 +6430,7 @@ bool TSchemeShard::FillSplitPartitioning(TVector<TString>& rangeEnds, const TCon
if (boundary.HasSerializedKeyPrefix()) {
prefix.Parse(boundary.GetSerializedKeyPrefix());
rangeEnd = TVector<TCell>(prefix.GetCells().begin(), prefix.GetCells().end());
} else if (!NMiniKQL::CellsFromTuple(nullptr, boundary.GetKeyPrefix(), keyColTypes, false, rangeEnd, errStr, memoryOwner)) {
} else if (!NMiniKQL::CellsFromTuple(nullptr, boundary.GetKeyPrefix(), keyColTypes, {}, false, rangeEnd, errStr, memoryOwner)) {
errStr = Sprintf("Error at split boundary %d: %s", i, errStr.data());
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ void CheckBoundaries(const NKikimrScheme::TEvDescribeSchemeResult &record) {
const auto& b = descr.GetTable().GetSplitBoundary(i);
TVector<TCell> cells;
TVector<TString> memoryOwner;
NMiniKQL::CellsFromTuple(nullptr, b.GetKeyPrefix(), keyColTypes, false, cells, errStr, memoryOwner);
NMiniKQL::CellsFromTuple(nullptr, b.GetKeyPrefix(), keyColTypes, {}, false, cells, errStr, memoryOwner);
UNIT_ASSERT_VALUES_EQUAL(errStr, "");

TString serialized = TSerializedCellVec::Serialize(cells);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/tx_proxy/datareq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,7 @@ void TDataReq::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev,
toExpand = toInclusive ? EParseRangeKeyExp::NONE : EParseRangeKeyExp::TO_NULL;
}
}

if (!ParseRangeKey(ReadTableRequest->Range.GetFrom(), keyTypes,
ReadTableRequest->FromValues, fromExpand)
|| !ParseRangeKey(ReadTableRequest->Range.GetTo(), keyTypes,
Expand Down Expand Up @@ -3033,7 +3034,7 @@ bool TDataReq::ParseRangeKey(const NKikimrMiniKQL::TParams &proto,
auto& value = proto.GetValue();
auto& type = proto.GetType();
TString errStr;
bool res = NMiniKQL::CellsFromTuple(&type, value, keyType, true, key, errStr, memoryOwner);
bool res = NMiniKQL::CellsFromTuple(&type, value, keyType, {}, true, key, errStr, memoryOwner);
if (!res) {
UnresolvedKeys.push_back("Failed to parse range key tuple: " + errStr);
return false;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/tx_proxy/read_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace NTxProxy {
enum class EReadTableFormat {
OldResultSet,
YdbResultSet,
YdbResultSetWithNotNullSupport
};

struct TReadTableSettings {
Expand Down
Loading

0 comments on commit 5401d84

Please sign in to comment.