From d7c005305a472e9c81c26064e8adbaefe4a05dd2 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Sat, 31 May 2025 13:05:09 +0300 Subject: [PATCH] fix row size estimation in stream lookup join (#19094) --- .../kqp/runtime/kqp_stream_lookup_worker.cpp | 11 ++-------- ydb/core/kqp/ut/join/kqp_join_ut.cpp | 22 +++++++++++++++++++ 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 182162ebd5d2..de993a21dacd 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -9,6 +9,7 @@ #include #include +#include namespace NKikimr { namespace NKqp { @@ -57,11 +58,6 @@ std::vector> GetRangePartitioning(const TKqpSt return rangePartition; } -NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType* type) { - YQL_ENSURE(type); - return NScheme::TypeInfoFromMiniKQLType(type); -} - } // !namespace @@ -901,10 +897,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { i64 storageReadBytes = 0; - for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) { - auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i)); - leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes; - } + leftRowSize = NYql::NDq::TDqDataSerializer::EstimateSize(leftRowInfo.Row, leftRowType); if (!rightRow.empty()) { leftRowInfo.RightRowExist = true; diff --git a/ydb/core/kqp/ut/join/kqp_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_ut.cpp index 3f9117e5641e..21cc10749154 100644 --- a/ydb/core/kqp/ut/join/kqp_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_join_ut.cpp @@ -275,6 +275,28 @@ Y_UNIT_TEST_SUITE(KqpJoin) { } } + Y_UNIT_TEST_TWIN(IndexLoookupJoinStructJoin, StreamLookupJoin) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin); + auto settings = TKikimrSettings().SetAppConfig(appConfig); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateSampleTables(session); + + auto result = session.ExecuteDataQuery(Q_(R"( + $a = AsList(AsStruct(AsStruct("Key" as Key) as join_info), AsStruct(AsStruct("Name1" as Key) as join_info)); + SELECT a.join_info.Key as Key, b.Value as Value from AS_TABLE($a) as a + LEFT JOIN `/Root/Join1_3` as b + ON a.join_info.Key = b.Key + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); + + CompareYson(R"([["Key";#];["Name1";[1001]]])", + FormatResultSetYson(result.GetResultSet(0))); + } + Y_UNIT_TEST(IdxLookupPartialLeftPredicate) { TKikimrSettings settings; TKikimrRunner kikimr(settings);