Skip to content

Commit

Permalink
Use I/O mapping to produce tight loop
Browse files Browse the repository at this point in the history
  • Loading branch information
igormunkin committed Sep 12, 2024
1 parent 36ab859 commit 0cb5832
Showing 1 changed file with 40 additions and 33 deletions.
73 changes: 40 additions & 33 deletions ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ template <bool RightRequired>
class TBlockJoinState : public TBlockState {
public:
TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
const TVector<TType*>& inputItems, const THashSet<ui32>& keyDrops,
const TVector<TType*>& inputItems,
const TVector<ui32>& leftIOMap,
const TVector<TType*> outputItems,
NUdf::TUnboxedValue**const fields)
: TBlockState(memInfo, outputItems.size())
, InputWidth_(inputItems.size() - 1)
, OutputWidth_(outputItems.size() - 1)
, Inputs_(inputItems.size())
, KeyDrops_(keyDrops)
, LeftIOMap_(leftIOMap)
, InputsDescr_(ToValueDescr(inputItems))
{
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
Expand All @@ -55,28 +56,25 @@ class TBlockJoinState : public TBlockState {

void CopyRow() {
// Copy items from the "left" flow.
size_t builderIndex = 0;
for (size_t i = 0; i < InputWidth_; i++) {
if (KeyDrops_.contains(i)) {
continue;
}
AddItem(GetItem(i), builderIndex++);
// Use the mapping from input fields to output ones to
// produce a tight loop to copy row items.
for (size_t i = 0; i < LeftIOMap_.size(); i++) {
AddItem(GetItem(LeftIOMap_[i]), i);
}
OutputRows_++;
}

void MakeRow(const NUdf::TUnboxedValuePod& value) {
// Copy items from the "left" flow.
size_t builderIndex = 0;
for (size_t i = 0; i < InputWidth_; i++) {
if (KeyDrops_.contains(i)) {
continue;
}
AddItem(GetItem(i), builderIndex++);
// Copy items from the "left" flow.
// Use the mapping from input fields to output ones to
// produce a tight loop to copy row items.
for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
AddItem(GetItem(LeftIOMap_[i]), i);
}
// Convert and append items from the "right" dict.
// XXX: Since the keys are copied to the output only from
// the "left" flow, process all values unconditionally.
// Since the keys are copied to the output only from the
// "left" flow, process all values unconditionally.
if constexpr (RightRequired) {
for (size_t i = 0; builderIndex < OutputWidth_; i++) {
AddValue(value.GetElement(i), builderIndex++);
Expand Down Expand Up @@ -175,7 +173,7 @@ class TBlockJoinState : public TBlockState {
size_t InputWidth_;
size_t OutputWidth_;
TUnboxedValueVector Inputs_;
const THashSet<ui32> KeyDrops_;
const TVector<ui32> LeftIOMap_;
const std::vector<arrow::ValueDescr> InputsDescr_;
TVector<std::unique_ptr<IBlockReader>> Readers_;
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
Expand All @@ -190,13 +188,13 @@ using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftKeyDrops,
const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
IComputationWideFlowNode* flow, IComputationNode* dict)
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
, ResultJoinItems_(std::move(resultJoinItems))
, LeftFlowItems_(std::move(leftFlowItems))
, LeftKeyColumns_(std::move(leftKeyColumns))
, LeftKeyDrops_(leftKeyDrops.cbegin(), leftKeyDrops.cend())
, LeftIOMap_(leftIOMap)
, Flow_(flow)
, Dict_(dict)
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
Expand Down Expand Up @@ -261,7 +259,7 @@ using TState = TBlockJoinState<RightRequired>;
}

void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, LeftKeyDrops_,
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, LeftIOMap_,
ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
}

Expand All @@ -281,7 +279,7 @@ using TState = TBlockJoinState<RightRequired>;
const TVector<TType*> ResultJoinItems_;
const TVector<TType*> LeftFlowItems_;
const TVector<ui32> LeftKeyColumns_;
const THashSet<ui32> LeftKeyDrops_;
const TVector<ui32> LeftIOMap_;
IComputationWideFlowNode* const Flow_;
IComputationNode* const Dict_;
ui32 WideFieldsIndex_;
Expand All @@ -295,13 +293,13 @@ using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables,
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftKeyDrops,
const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
IComputationWideFlowNode* flow, IComputationNode* dict)
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Boxed)
, ResultJoinItems_(std::move(resultJoinItems))
, LeftFlowItems_(std::move(leftFlowItems))
, LeftKeyColumns_(std::move(leftKeyColumns))
, LeftKeyDrops_(leftKeyDrops.cbegin(), leftKeyDrops.cend())
, LeftIOMap_(leftIOMap)
, Flow_(flow)
, Dict_(dict)
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
Expand Down Expand Up @@ -373,7 +371,7 @@ using TState = TBlockJoinState<RightRequired>;
}

void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, LeftKeyDrops_,
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, LeftIOMap_,
ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
}

Expand Down Expand Up @@ -430,7 +428,7 @@ using TState = TBlockJoinState<RightRequired>;
const TVector<TType*> ResultJoinItems_;
const TVector<TType*> LeftFlowItems_;
const TVector<ui32> LeftKeyColumns_;
const THashSet<ui32> LeftKeyDrops_;
const TVector<ui32> LeftIOMap_;
IComputationWideFlowNode* const Flow_;
IComputationNode* const Dict_;
ui32 WideFieldsIndex_;
Expand Down Expand Up @@ -490,11 +488,11 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo

const auto keyDropsLiteral = callable.GetInput(4);
const auto keyDropsTuple = AS_VALUE(TTupleLiteral, keyDropsLiteral);
TVector<ui32> leftKeyDrops;
THashSet<ui32> leftKeyDrops;
leftKeyDrops.reserve(keyDropsTuple->GetValuesCount());
for (ui32 i = 0; i < keyDropsTuple->GetValuesCount(); i++) {
const auto item = AS_VALUE(TDataLiteral, keyDropsTuple->GetValue(i));
leftKeyDrops.emplace_back(item->AsValue().Get<ui32>());
leftKeyDrops.emplace(item->AsValue().Get<ui32>());
}

const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
Expand All @@ -504,6 +502,15 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo

}

TVector<ui32> leftIOMap;
// XXX: Mind the last wide item, containing block length.
for (size_t i = 0; i < leftFlowItems.size() - 1; i++) {
if (leftKeyDrops.contains(i)) {
continue;
}
leftIOMap.push_back(i);
}

const auto flow = LocateNode(ctx.NodeLocator, callable, 0);
const auto dict = LocateNode(ctx.NodeLocator, callable, 1);

Expand All @@ -513,33 +520,33 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
if (isMulti) {
return new TBlockWideMultiMapJoinWrapper<true>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftKeyDrops),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
}
return new TBlockWideMapJoinWrapper<false, true>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftKeyDrops),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
case EJoinKind::Left:
if (isMulti) {
return new TBlockWideMultiMapJoinWrapper<false>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftKeyDrops),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
}
return new TBlockWideMapJoinWrapper<false, false>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftKeyDrops),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
case EJoinKind::LeftSemi:
return new TBlockWideMapJoinWrapper<true, true>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftKeyDrops),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
case EJoinKind::LeftOnly:
return new TBlockWideMapJoinWrapper<true, false>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftKeyDrops),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
default:
MKQL_ENSURE(false, "BlockMapJoinCore doesn't support %s join type"
Expand Down

0 comments on commit 0cb5832

Please sign in to comment.