Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow read table returns and use not null types via public API #5219

Merged
merged 4 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 48 additions & 18 deletions ydb/core/engine/mkql_proto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
namespace NKikimr::NMiniKQL {

// NOTE: TCell's can reference memomry from tupleValue
// TODO: Place notNull flag in to the NScheme::TTypeInfo?
bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
const NKikimrMiniKQL::TValue& tupleValue,
const TConstArrayRef<NScheme::TTypeInfo>& types,
TVector<bool> notNullTypes,
bool allowCastFromString,
TVector<TCell>& key,
TString& errStr,
Expand All @@ -28,6 +30,18 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
return false; \
}

// Please note we modify notNullTypes during tuplyType verification to allow cast nullable to non nullable value
if (notNullTypes) {
CHECK_OR_RETURN_ERROR(notNullTypes.size() == types.size(),
"The size of type array and given not null markers must be equial");
if (tupleType) {
CHECK_OR_RETURN_ERROR(notNullTypes.size() >= tupleType->GetTuple().ElementSize(),
"The size of tuple type and given not null markers must be equal");
}
} else {
notNullTypes.resize(types.size());
}

if (tupleType) {
CHECK_OR_RETURN_ERROR(tupleType->GetKind() == NKikimrMiniKQL::Tuple ||
(tupleType->GetKind() == NKikimrMiniKQL::Unknown && tupleType->GetTuple().ElementSize() == 0), "Must be a tuple");
Expand All @@ -36,8 +50,14 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,

for (size_t i = 0; i < tupleType->GetTuple().ElementSize(); ++i) {
const auto& ti = tupleType->GetTuple().GetElement(i);
CHECK_OR_RETURN_ERROR(ti.GetKind() == NKikimrMiniKQL::Optional, "Element at index " + ToString(i) + " in not an Optional");
const auto& item = ti.GetOptional().GetItem();
if (notNullTypes[i]) {
// For not null column type we allow to build cell from nullable mkql type for compatibility reason.
notNullTypes[i] = ti.GetKind() != NKikimrMiniKQL::Optional;
} else {
// But we do not allow to build cell for nullable column from not nullable type
CHECK_OR_RETURN_ERROR(ti.GetKind() == NKikimrMiniKQL::Optional, "Element at index " + ToString(i) + " in not an Optional");
}
const auto& item = notNullTypes[i] ? ti : ti.GetOptional().GetItem();
CHECK_OR_RETURN_ERROR(item.GetKind() == NKikimrMiniKQL::Data, "Element at index " + ToString(i) + " Item kind is not Data");
const auto& typeId = item.GetData().GetScheme();
CHECK_OR_RETURN_ERROR(typeId == types[i].GetTypeId() ||
Expand All @@ -53,26 +73,36 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
}

for (ui32 i = 0; i < tupleValue.TupleSize(); ++i) {
auto& o = tupleValue.GetTuple(i);

auto element_case = o.value_value_case();

CHECK_OR_RETURN_ERROR(element_case == NKikimrMiniKQL::TValue::kOptional ||
element_case == NKikimrMiniKQL::TValue::VALUE_VALUE_NOT_SET,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));

CHECK_OR_RETURN_ERROR(o.ListSize() == 0 &&
o.StructSize() == 0 &&
o.TupleSize() == 0 &&
o.DictSize() == 0,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));
auto& o = tupleValue.GetTuple(i);

if (!o.HasOptional()) {
key.push_back(TCell());
continue;
if (notNullTypes[i]) {
CHECK_OR_RETURN_ERROR(o.ListSize() == 0 &&
o.StructSize() == 0 &&
o.TupleSize() == 0 &&
o.DictSize() == 0 &&
!o.HasOptional(),
Sprintf("Primitive type is expected in tuple at position %" PRIu32, i));
} else {
auto element_case = o.value_value_case();

CHECK_OR_RETURN_ERROR(element_case == NKikimrMiniKQL::TValue::kOptional ||
element_case == NKikimrMiniKQL::TValue::VALUE_VALUE_NOT_SET,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));

CHECK_OR_RETURN_ERROR(o.ListSize() == 0 &&
o.StructSize() == 0 &&
o.TupleSize() == 0 &&
o.DictSize() == 0,
Sprintf("Optional type is expected in tuple at position %" PRIu32, i));

if (!o.HasOptional()) {
key.push_back(TCell());
continue;
}
}

auto& v = o.GetOptional();
auto& v = notNullTypes[i] ? o : o.GetOptional();

auto value_case = v.value_value_case();

Expand Down
1 change: 1 addition & 0 deletions ydb/core/engine/mkql_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class THolderFactory;
bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
const NKikimrMiniKQL::TValue& tupleValue,
const TConstArrayRef<NScheme::TTypeInfo>& expectedTypes,
TVector<bool> notNullTypes,
bool allowCastFromString,
TVector<TCell>& key,
TString& errStr,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/engine/mkql_proto_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ Y_UNIT_TEST(TestExportVariantStructTypeYdb) {
TVector<TCell> cells;
TVector<TString> memoryOwner;
TString errStr;
bool res = CellsFromTuple(&params.GetType(), params.GetValue(), types, true, cells, errStr, memoryOwner);
bool res = CellsFromTuple(&params.GetType(), params.GetValue(), types, {}, true, cells, errStr, memoryOwner);
UNIT_ASSERT_VALUES_EQUAL_C(res, errStr.empty(), paramsProto);

return errStr;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/grpc_services/resolve_local_db_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ namespace NGRpcService {
const NTable::TScheme::TTableInfo* tableInfo = scheme.Tables.FindPtr(*ti);

for (const auto& col : tableInfo->Columns) {
entry.Columns[col.first] = TSysTables::TTableColumnInfo(col.second.Name, col.first, col.second.PType, col.second.PTypeMod, col.second.KeyOrder);
entry.Columns[col.first] = TSysTables::TTableColumnInfo(
col.second.Name, col.first, col.second.PType, col.second.PTypeMod, col.second.KeyOrder,
{}, TSysTables::TTableColumnInfo::EDefaultKind::DEFAULT_UNDEFINED, {}, false, col.second.NotNull);
}
}

Expand Down
22 changes: 21 additions & 1 deletion ydb/core/grpc_services/rpc_read_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> {
} else {
TStringStream str;
str << "Response version missmatched";
if (msg->Record.HasReadTableResponseVersion()) {
str << " , got: " << msg->Record.GetReadTableResponseVersion();
}
LOG_ERROR(ctx, NKikimrServices::READ_TABLE_API,
"%s", str.Str().data());
const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, str.Str());
Expand Down Expand Up @@ -243,8 +246,11 @@ class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> {
return ReplyFinishStream(Ydb::StatusIds::UNAUTHORIZED, issueMessage, ctx);
}
case TEvTxUserProxy::TResultStatus::ResolveError: {
const NYql::TIssue& issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Got ResolveError response from TxProxy");
NYql::TIssue issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Got ResolveError response from TxProxy");
auto tmp = issueMessage.Add();
for (const auto& unresolved : msg->Record.GetUnresolvedKeys()) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(unresolved));
}
NYql::IssueToMessage(issue, tmp);
return ReplyFinishStream(Ydb::StatusIds::SCHEME_ERROR, issueMessage, ctx);
}
Expand Down Expand Up @@ -525,6 +531,20 @@ class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> {
settings.TablePath = req->path();
settings.Ordered = req->ordered();
settings.RequireResultSet = true;

// Right now assume return_not_null_data_as_optional is true by default
// Sometimes we well change this default
switch (req->return_not_null_data_as_optional()) {
case Ydb::FeatureFlag::DISABLED:
settings.DataFormat = NTxProxy::EReadTableFormat::YdbResultSetWithNotNullSupport;
break;
case Ydb::FeatureFlag::STATUS_UNSPECIFIED:
case Ydb::FeatureFlag::ENABLED:
default:
settings.DataFormat = NTxProxy::EReadTableFormat::YdbResultSet;
break;
}

if (req->row_limit()) {
settings.MaxRows = req->row_limit();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ message TReadTableTransaction {
optional string Name = 2;
optional uint32 TypeId = 3;
optional NKikimrProto.TTypeInfo TypeInfo = 4;
optional bool NotNull = 5 [default = false]; //If not set datashard will treat not null type as nullable (for compatibility)
}

optional NKikimrDataEvents.TTableId TableId = 1;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/tx_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ message TReadTableTransaction {
enum EVersion {
UNSPECIFIED = 0;
YDB_V1 = 1;
YDB_V2 = 2; // Like V1 but allows NotNull types in result set
}
optional string Path = 1;
optional bool Ordered = 2 [default = false];
Expand Down
22 changes: 15 additions & 7 deletions ydb/core/tx/datashard/read_table_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,9 @@ class TRowsToOldResult : public TRowsToResult {

class TRowsToYdbResult : public TRowsToResult {
public:
TRowsToYdbResult(const NKikimrTxDataShard::TReadTableTransaction& request)
TRowsToYdbResult(const NKikimrTxDataShard::TReadTableTransaction& request, bool allowNotNull)
: TRowsToResult(request)
, AllowNotNull(allowNotNull)
{
BuildResultCommonPart(request);
StartNewMessage();
Expand Down Expand Up @@ -345,16 +346,16 @@ class TRowsToYdbResult : public TRowsToResult {
pg->set_typlen(0);
pg->set_typmod(0);
} else {
bool notNullResp = AllowNotNull && col.GetNotNull();
auto id = static_cast<NYql::NProto::TypeIds>(col.GetTypeId());
auto xType = notNullResp ? meta->mutable_type() : meta->mutable_type()->mutable_optional_type()->mutable_item();
if (id == NYql::NProto::Decimal) {
auto decimalType = meta->mutable_type()->mutable_optional_type()->mutable_item()
->mutable_decimal_type();
auto decimalType = xType->mutable_decimal_type();
//TODO: Pass decimal params here
decimalType->set_precision(22);
decimalType->set_scale(9);
} else {
meta->mutable_type()->mutable_optional_type()->mutable_item()
->set_type_id(static_cast<Ydb::Type::PrimitiveTypeId>(id));
xType->set_type_id(static_cast<Ydb::Type::PrimitiveTypeId>(id));
}
}
}
Expand All @@ -363,6 +364,7 @@ class TRowsToYdbResult : public TRowsToResult {
}

Ydb::ResultSet YdbResultSet;
const bool AllowNotNull;
};

class TReadTableScan : public TActor<TReadTableScan>, public NTable::IScan {
Expand Down Expand Up @@ -390,8 +392,14 @@ class TReadTableScan : public TActor<TReadTableScan>, public NTable::IScan {
, PendingAcks(0)
, Finished(false)
{
if (tx.HasApiVersion() && tx.GetApiVersion() == NKikimrTxUserProxy::TReadTableTransaction::YDB_V1) {
Writer = MakeHolder<TRowsToYdbResult>(tx);
if (tx.HasApiVersion()) {
if (tx.GetApiVersion() == NKikimrTxUserProxy::TReadTableTransaction::YDB_V1) {
Writer = MakeHolder<TRowsToYdbResult>(tx, false);
} else if (tx.GetApiVersion() == NKikimrTxUserProxy::TReadTableTransaction::YDB_V2) {
Writer = MakeHolder<TRowsToYdbResult>(tx, true);
} else {
Writer = MakeHolder<TRowsToOldResult>(tx);
}
} else {
Writer = MakeHolder<TRowsToOldResult>(tx);
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/sys_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct TSysTables {
EDefaultKind DefaultKind;
Ydb::TypedValue DefaultFromLiteral;
bool IsBuildInProgress = false;
bool IsNotNullColumn = false; //maybe move into TTypeInfo?

TTableColumnInfo() = default;

Expand All @@ -54,7 +55,7 @@ struct TSysTables {
const TString& typeMod = {}, i32 keyOrder = -1,
const TString& defaultFromSequence = {},
EDefaultKind defaultKind = EDefaultKind::DEFAULT_UNDEFINED,
const Ydb::TypedValue& defaultFromLiteral = {}, bool isBuildInProgress = false)
const Ydb::TypedValue& defaultFromLiteral = {}, bool isBuildInProgress = false, bool isNotNullColumn = false)
: Name(name)
, Id(colId)
, PType(type)
Expand All @@ -64,6 +65,7 @@ struct TSysTables {
, DefaultKind(defaultKind)
, DefaultFromLiteral(defaultFromLiteral)
, IsBuildInProgress(isBuildInProgress)
, IsNotNullColumn(isNotNullColumn)
{}
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/scheme_board/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
}

if (columnDesc.GetNotNull()) {
column.IsNotNullColumn = true;
NotNullColumns.insert(columnDesc.GetName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
TVector<TCell> cells;
TString error;
TVector<TString> memoryOwner;
if (!NMiniKQL::CellsFromTuple(nullptr, op.GetPartitionBoundaries(i), pqGroupInfo->KeySchema, false,
cells, error, memoryOwner)) {
if (!NMiniKQL::CellsFromTuple(nullptr, op.GetPartitionBoundaries(i), pqGroupInfo->KeySchema, {},
false, cells, error, memoryOwner)) {
status = NKikimrScheme::StatusSchemeError;
errStr = Sprintf("Invalid partition boundary at position: %u, error: %s", i, error.data());
return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6430,7 +6430,7 @@ bool TSchemeShard::FillSplitPartitioning(TVector<TString>& rangeEnds, const TCon
if (boundary.HasSerializedKeyPrefix()) {
prefix.Parse(boundary.GetSerializedKeyPrefix());
rangeEnd = TVector<TCell>(prefix.GetCells().begin(), prefix.GetCells().end());
} else if (!NMiniKQL::CellsFromTuple(nullptr, boundary.GetKeyPrefix(), keyColTypes, false, rangeEnd, errStr, memoryOwner)) {
} else if (!NMiniKQL::CellsFromTuple(nullptr, boundary.GetKeyPrefix(), keyColTypes, {}, false, rangeEnd, errStr, memoryOwner)) {
errStr = Sprintf("Error at split boundary %d: %s", i, errStr.data());
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ void CheckBoundaries(const NKikimrScheme::TEvDescribeSchemeResult &record) {
const auto& b = descr.GetTable().GetSplitBoundary(i);
TVector<TCell> cells;
TVector<TString> memoryOwner;
NMiniKQL::CellsFromTuple(nullptr, b.GetKeyPrefix(), keyColTypes, false, cells, errStr, memoryOwner);
NMiniKQL::CellsFromTuple(nullptr, b.GetKeyPrefix(), keyColTypes, {}, false, cells, errStr, memoryOwner);
UNIT_ASSERT_VALUES_EQUAL(errStr, "");

TString serialized = TSerializedCellVec::Serialize(cells);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/tx_proxy/datareq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,7 @@ void TDataReq::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev,
toExpand = toInclusive ? EParseRangeKeyExp::NONE : EParseRangeKeyExp::TO_NULL;
}
}

if (!ParseRangeKey(ReadTableRequest->Range.GetFrom(), keyTypes,
ReadTableRequest->FromValues, fromExpand)
|| !ParseRangeKey(ReadTableRequest->Range.GetTo(), keyTypes,
Expand Down Expand Up @@ -3033,7 +3034,7 @@ bool TDataReq::ParseRangeKey(const NKikimrMiniKQL::TParams &proto,
auto& value = proto.GetValue();
auto& type = proto.GetType();
TString errStr;
bool res = NMiniKQL::CellsFromTuple(&type, value, keyType, true, key, errStr, memoryOwner);
bool res = NMiniKQL::CellsFromTuple(&type, value, keyType, {}, true, key, errStr, memoryOwner);
if (!res) {
UnresolvedKeys.push_back("Failed to parse range key tuple: " + errStr);
return false;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/tx_proxy/read_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace NTxProxy {
enum class EReadTableFormat {
OldResultSet,
YdbResultSet,
YdbResultSetWithNotNullSupport
};

struct TReadTableSettings {
Expand Down
Loading
Loading