From ecdd0f28f14dc56fca7b44af168012b5b4c66097 Mon Sep 17 00:00:00 2001 From: Igor Munkin Date: Wed, 4 Sep 2024 14:34:42 +0000 Subject: [PATCH] Implement BlockMapJoinCore with key column drops --- .../comp_nodes/mkql_block_map_join.cpp | 75 ++++++++++++++----- 1 file changed, 56 insertions(+), 19 deletions(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp index 84c50b3c405c..63ac603640d6 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp @@ -15,6 +15,18 @@ namespace NMiniKQL { namespace { +template +const TVector TupleToVector(const TRuntimeNode tupleNode) { + const auto tupleLiteral = AS_VALUE(TTupleLiteral, tupleNode); + TVector vector; + vector.reserve(tupleLiteral->GetValuesCount()); + for (ui32 i = 0; i < tupleLiteral->GetValuesCount(); i++) { + const auto item = AS_VALUE(TDataLiteral, tupleLiteral->GetValue(i)); + vector.emplace_back(item->AsValue().Get()); + } + return vector; +} + size_t CalcMaxBlockLength(const TVector& items) { return CalcBlockLen(std::accumulate(items.cbegin(), items.cend(), 0ULL, [](size_t max, const TType* type) { @@ -27,13 +39,14 @@ template class TBlockJoinState : public TBlockState { public: TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, - const TVector& inputItems, + const TVector& inputItems, const TSet& keyDrops, const TVector outputItems, NUdf::TUnboxedValue**const fields) : TBlockState(memInfo, outputItems.size()) , InputWidth_(inputItems.size() - 1) , OutputWidth_(outputItems.size() - 1) , Inputs_(inputItems.size()) + , KeyDrops_(keyDrops) , InputsDescr_(ToValueDescr(inputItems)) { const auto& pgBuilder = ctx.Builder->GetPgBuilder(); @@ -46,6 +59,9 @@ class TBlockJoinState : public TBlockState { } // The last output column (i.e. block length) doesn't require a block builder. for (size_t i = 0; i < OutputWidth_; i++) { + if (KeyDrops_.contains(i)) { + continue; + } const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType(); Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_)); } @@ -145,10 +161,16 @@ class TBlockJoinState : public TBlockState { private: void AddItem(const TBlockItem& item, size_t idx) { + if (KeyDrops_.contains(idx)) { + return; + } Builders_[idx]->Add(item); } void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) { + if (KeyDrops_.contains(idx)) { + return; + } Builders_[idx]->Add(value); } @@ -164,6 +186,7 @@ class TBlockJoinState : public TBlockState { size_t InputWidth_; size_t OutputWidth_; TUnboxedValueVector Inputs_; + const TSet KeyDrops_; const std::vector InputsDescr_; TVector> Readers_; TVector> Converters_; @@ -178,12 +201,13 @@ using TState = TBlockJoinState; public: TBlockWideMapJoinWrapper(TComputationMutables& mutables, const TVector&& resultJoinItems, const TVector&& leftFlowItems, - TVector&& leftKeyColumns, + const TVector&& leftKeyColumns, const TVector&& leftKeyDrops, IComputationWideFlowNode* flow, IComputationNode* dict) : TBaseComputation(mutables, flow, EValueRepresentation::Boxed) , ResultJoinItems_(std::move(resultJoinItems)) , LeftFlowItems_(std::move(leftFlowItems)) , LeftKeyColumns_(std::move(leftKeyColumns)) + , LeftKeyDrops_(leftKeyDrops.cbegin(), leftKeyDrops.cend()) , Flow_(flow) , Dict_(dict) , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size())) @@ -248,7 +272,8 @@ using TState = TBlockJoinState; } void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { - state = ctx.HolderFactory.Create(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_); + state = ctx.HolderFactory.Create(ctx, LeftFlowItems_, LeftKeyDrops_, + ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_); } TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { @@ -267,6 +292,7 @@ using TState = TBlockJoinState; const TVector ResultJoinItems_; const TVector LeftFlowItems_; const TVector LeftKeyColumns_; + const TSet LeftKeyDrops_; IComputationWideFlowNode* const Flow_; IComputationNode* const Dict_; ui32 WideFieldsIndex_; @@ -280,12 +306,13 @@ using TState = TBlockJoinState; public: TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables, const TVector&& resultJoinItems, const TVector&& leftFlowItems, - TVector&& leftKeyColumns, + const TVector&& leftKeyColumns, const TVector&& leftKeyDrops, IComputationWideFlowNode* flow, IComputationNode* dict) : TBaseComputation(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Boxed) , ResultJoinItems_(std::move(resultJoinItems)) , LeftFlowItems_(std::move(leftFlowItems)) , LeftKeyColumns_(std::move(leftKeyColumns)) + , LeftKeyDrops_(leftKeyDrops.cbegin(), leftKeyDrops.cend()) , Flow_(flow) , Dict_(dict) , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size())) @@ -357,7 +384,8 @@ using TState = TBlockJoinState; } void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { - state = ctx.HolderFactory.Create(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_); + state = ctx.HolderFactory.Create(ctx, LeftFlowItems_, LeftKeyDrops_, + ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_); } TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { @@ -413,6 +441,7 @@ using TState = TBlockJoinState; const TVector ResultJoinItems_; const TVector LeftFlowItems_; const TVector LeftKeyColumns_; + const TSet LeftKeyDrops_; IComputationWideFlowNode* const Flow_; IComputationNode* const Dict_; ui32 WideFieldsIndex_; @@ -421,7 +450,7 @@ using TState = TBlockJoinState; } // namespace IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); + MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args"); const auto joinType = callable.GetType()->GetReturnType(); MKQL_ENSURE(joinType->IsFlow(), "Expected WideFlow as a resulting stream"); @@ -459,16 +488,18 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly); - const auto tupleLiteral = AS_VALUE(TTupleLiteral, callable.GetInput(3)); - TVector leftKeyColumns; - leftKeyColumns.reserve(tupleLiteral->GetValuesCount()); - for (ui32 i = 0; i < tupleLiteral->GetValuesCount(); i++) { - const auto item = AS_VALUE(TDataLiteral, tupleLiteral->GetValue(i)); - leftKeyColumns.emplace_back(item->AsValue().Get()); - } + const auto leftKeyColumns = TupleToVector(callable.GetInput(3)); // TODO: Handle multi keys. Y_ENSURE(leftKeyColumns.size() == 1); + const auto leftKeyDrops = TupleToVector(callable.GetInput(4)); + const TSet leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend()); + for (const auto& drop : leftKeyDrops) { + MKQL_ENSURE(leftKeySet.contains(drop), + "Only key columns has to be specified in drop column set"); + + } + const auto flow = LocateNode(ctx.NodeLocator, callable, 0); const auto dict = LocateNode(ctx.NodeLocator, callable, 1); @@ -477,28 +508,34 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo case EJoinKind::Inner: if (isMulti) { return new TBlockWideMultiMapJoinWrapper(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns), + std::move(joinItems), std::move(leftFlowItems), + std::move(leftKeyColumns), std::move(leftKeyDrops), static_cast(flow), dict); } return new TBlockWideMapJoinWrapper(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns), + std::move(joinItems), std::move(leftFlowItems), + std::move(leftKeyColumns), std::move(leftKeyDrops), static_cast(flow), dict); case EJoinKind::Left: if (isMulti) { return new TBlockWideMultiMapJoinWrapper(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns), + std::move(joinItems), std::move(leftFlowItems), + std::move(leftKeyColumns), std::move(leftKeyDrops), static_cast(flow), dict); } return new TBlockWideMapJoinWrapper(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns), + std::move(joinItems), std::move(leftFlowItems), + std::move(leftKeyColumns), std::move(leftKeyDrops), static_cast(flow), dict); case EJoinKind::LeftSemi: return new TBlockWideMapJoinWrapper(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns), + std::move(joinItems), std::move(leftFlowItems), + std::move(leftKeyColumns), std::move(leftKeyDrops), static_cast(flow), dict); case EJoinKind::LeftOnly: return new TBlockWideMapJoinWrapper(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns), + std::move(joinItems), std::move(leftFlowItems), + std::move(leftKeyColumns), std::move(leftKeyDrops), static_cast(flow), dict); default: MKQL_ENSURE(false, "BlockMapJoinCore doesn't support %s join type"