Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement excess key columns drop for BlockMapJoinCore computation node #9036

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 73 additions & 25 deletions ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ class TBlockJoinState : public TBlockState {
public:
TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
const TVector<TType*>& inputItems,
const TVector<ui32>& leftIOMap,
const TVector<TType*> outputItems,
NUdf::TUnboxedValue**const fields)
: TBlockState(memInfo, outputItems.size())
, InputWidth_(inputItems.size() - 1)
, OutputWidth_(outputItems.size() - 1)
, Inputs_(inputItems.size())
, LeftIOMap_(leftIOMap)
, InputsDescr_(ToValueDescr(inputItems))
{
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
Expand All @@ -54,30 +56,37 @@ class TBlockJoinState : public TBlockState {

void CopyRow() {
// Copy items from the "left" flow.
for (size_t i = 0; i < InputWidth_; i++) {
AddItem(GetItem(i), i);
// Use the mapping from input fields to output ones to
// produce a tight loop to copy row items.
for (size_t i = 0; i < LeftIOMap_.size(); i++) {
AddItem(GetItem(LeftIOMap_[i]), i);
}
OutputRows_++;
}

void MakeRow(const NUdf::TUnboxedValuePod& value) {
size_t builderIndex = 0;
// Copy items from the "left" flow.
for (size_t i = 0; i < InputWidth_; i++) {
AddItem(GetItem(i), i);
// Use the mapping from input fields to output ones to
// produce a tight loop to copy row items.
for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
AddItem(GetItem(LeftIOMap_[i]), i);
}
// Convert and append items from the "right" dict.
// Since the keys are copied to the output only from the
// "left" flow, process all values unconditionally.
if constexpr (RightRequired) {
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
AddValue(value.GetElement(j), i);
for (size_t i = 0; builderIndex < OutputWidth_; i++) {
AddValue(value.GetElement(i), builderIndex++);
}
} else {
if (value) {
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
AddValue(value.GetElement(j), i);
for (size_t i = 0; builderIndex < OutputWidth_; i++) {
AddValue(value.GetElement(i), builderIndex++);
}
} else {
for (size_t i = InputWidth_; i < OutputWidth_; i++) {
AddValue(value, i);
while (builderIndex < OutputWidth_) {
AddValue(value, builderIndex++);
}
}
}
Expand Down Expand Up @@ -164,6 +173,7 @@ class TBlockJoinState : public TBlockState {
size_t InputWidth_;
size_t OutputWidth_;
TUnboxedValueVector Inputs_;
const TVector<ui32> LeftIOMap_;
const std::vector<arrow::ValueDescr> InputsDescr_;
TVector<std::unique_ptr<IBlockReader>> Readers_;
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
Expand All @@ -178,12 +188,13 @@ using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
TVector<ui32>&& leftKeyColumns,
const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
IComputationWideFlowNode* flow, IComputationNode* dict)
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
, ResultJoinItems_(std::move(resultJoinItems))
, LeftFlowItems_(std::move(leftFlowItems))
, LeftKeyColumns_(std::move(leftKeyColumns))
, LeftIOMap_(leftIOMap)
, Flow_(flow)
, Dict_(dict)
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
Expand Down Expand Up @@ -248,7 +259,8 @@ using TState = TBlockJoinState<RightRequired>;
}

void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, LeftIOMap_,
ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
}

TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
Expand All @@ -267,6 +279,7 @@ using TState = TBlockJoinState<RightRequired>;
const TVector<TType*> ResultJoinItems_;
const TVector<TType*> LeftFlowItems_;
const TVector<ui32> LeftKeyColumns_;
const TVector<ui32> LeftIOMap_;
IComputationWideFlowNode* const Flow_;
IComputationNode* const Dict_;
ui32 WideFieldsIndex_;
Expand All @@ -280,12 +293,13 @@ using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables,
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
TVector<ui32>&& leftKeyColumns,
const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
IComputationWideFlowNode* flow, IComputationNode* dict)
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Boxed)
, ResultJoinItems_(std::move(resultJoinItems))
, LeftFlowItems_(std::move(leftFlowItems))
, LeftKeyColumns_(std::move(leftKeyColumns))
, LeftIOMap_(leftIOMap)
, Flow_(flow)
, Dict_(dict)
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
Expand Down Expand Up @@ -357,7 +371,8 @@ using TState = TBlockJoinState<RightRequired>;
}

void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, LeftIOMap_,
ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
}

TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
Expand Down Expand Up @@ -413,6 +428,7 @@ using TState = TBlockJoinState<RightRequired>;
const TVector<TType*> ResultJoinItems_;
const TVector<TType*> LeftFlowItems_;
const TVector<ui32> LeftKeyColumns_;
const TVector<ui32> LeftIOMap_;
IComputationWideFlowNode* const Flow_;
IComputationNode* const Dict_;
ui32 WideFieldsIndex_;
Expand All @@ -421,7 +437,7 @@ using TState = TBlockJoinState<RightRequired>;
} // namespace

IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");

const auto joinType = callable.GetType()->GetReturnType();
MKQL_ENSURE(joinType->IsFlow(), "Expected WideFlow as a resulting stream");
Expand Down Expand Up @@ -459,16 +475,42 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly);

const auto tupleLiteral = AS_VALUE(TTupleLiteral, callable.GetInput(3));
const auto keyColumnsLiteral = callable.GetInput(3);
const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral);
TVector<ui32> leftKeyColumns;
leftKeyColumns.reserve(tupleLiteral->GetValuesCount());
for (ui32 i = 0; i < tupleLiteral->GetValuesCount(); i++) {
const auto item = AS_VALUE(TDataLiteral, tupleLiteral->GetValue(i));
leftKeyColumns.reserve(keyColumnsTuple->GetValuesCount());
for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) {
const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i));
leftKeyColumns.emplace_back(item->AsValue().Get<ui32>());
}
// TODO: Handle multi keys.
Y_ENSURE(leftKeyColumns.size() == 1);

const auto keyDropsLiteral = callable.GetInput(4);
const auto keyDropsTuple = AS_VALUE(TTupleLiteral, keyDropsLiteral);
THashSet<ui32> leftKeyDrops;
leftKeyDrops.reserve(keyDropsTuple->GetValuesCount());
for (ui32 i = 0; i < keyDropsTuple->GetValuesCount(); i++) {
const auto item = AS_VALUE(TDataLiteral, keyDropsTuple->GetValue(i));
leftKeyDrops.emplace(item->AsValue().Get<ui32>());
}

const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
for (const auto& drop : leftKeyDrops) {
MKQL_ENSURE(leftKeySet.contains(drop),
"Only key columns has to be specified in drop column set");

}

TVector<ui32> leftIOMap;
// XXX: Mind the last wide item, containing block length.
for (size_t i = 0; i < leftFlowItems.size() - 1; i++) {
if (leftKeyDrops.contains(i)) {
continue;
}
leftIOMap.push_back(i);
}

const auto flow = LocateNode(ctx.NodeLocator, callable, 0);
const auto dict = LocateNode(ctx.NodeLocator, callable, 1);

Expand All @@ -477,28 +519,34 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
case EJoinKind::Inner:
if (isMulti) {
return new TBlockWideMultiMapJoinWrapper<true>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
}
return new TBlockWideMapJoinWrapper<false, true>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
case EJoinKind::Left:
if (isMulti) {
return new TBlockWideMultiMapJoinWrapper<false>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
}
return new TBlockWideMapJoinWrapper<false, false>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
case EJoinKind::LeftSemi:
return new TBlockWideMapJoinWrapper<true, true>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
case EJoinKind::LeftOnly:
return new TBlockWideMapJoinWrapper<true, false>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
default:
MKQL_ENSURE(false, "BlockMapJoinCore doesn't support %s join type"
Expand Down
Loading
Loading