Skip to content

Commit

Permalink
Merge 0cb5832 into 3a1878b
Browse files Browse the repository at this point in the history
  • Loading branch information
igormunkin authored Sep 12, 2024
2 parents 3a1878b + 0cb5832 commit eef889f
Show file tree
Hide file tree
Showing 4 changed files with 399 additions and 36 deletions.
98 changes: 73 additions & 25 deletions ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ class TBlockJoinState : public TBlockState {
public:
TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
const TVector<TType*>& inputItems,
const TVector<ui32>& leftIOMap,
const TVector<TType*> outputItems,
NUdf::TUnboxedValue**const fields)
: TBlockState(memInfo, outputItems.size())
, InputWidth_(inputItems.size() - 1)
, OutputWidth_(outputItems.size() - 1)
, Inputs_(inputItems.size())
, LeftIOMap_(leftIOMap)
, InputsDescr_(ToValueDescr(inputItems))
{
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
Expand All @@ -54,30 +56,37 @@ class TBlockJoinState : public TBlockState {

// Appends one output row built only from items of the "left" flow.
//
// LeftIOMap_[outIdx] is the index of the input column whose current item is
// written to output column outIdx; input columns that are not listed in the
// map are not copied. Increments OutputRows_ once per call.
void CopyRow() {
    // Copy items from the "left" flow.
    // Use the mapping from input fields to output ones to
    // produce a tight loop to copy row items.
    for (size_t outIdx = 0; outIdx < LeftIOMap_.size(); outIdx++) {
        AddItem(GetItem(LeftIOMap_[outIdx]), outIdx);
    }
    OutputRows_++;
}

void MakeRow(const NUdf::TUnboxedValuePod& value) {
size_t builderIndex = 0;
// Copy items from the "left" flow.
for (size_t i = 0; i < InputWidth_; i++) {
AddItem(GetItem(i), i);
// Use the mapping from input fields to output ones to
// produce a tight loop to copy row items.
for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
AddItem(GetItem(LeftIOMap_[i]), i);
}
// Convert and append items from the "right" dict.
// Since the keys are copied to the output only from the
// "left" flow, process all values unconditionally.
if constexpr (RightRequired) {
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
AddValue(value.GetElement(j), i);
for (size_t i = 0; builderIndex < OutputWidth_; i++) {
AddValue(value.GetElement(i), builderIndex++);
}
} else {
if (value) {
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
AddValue(value.GetElement(j), i);
for (size_t i = 0; builderIndex < OutputWidth_; i++) {
AddValue(value.GetElement(i), builderIndex++);
}
} else {
for (size_t i = InputWidth_; i < OutputWidth_; i++) {
AddValue(value, i);
while (builderIndex < OutputWidth_) {
AddValue(value, builderIndex++);
}
}
}
Expand Down Expand Up @@ -164,6 +173,7 @@ class TBlockJoinState : public TBlockState {
// Number of input items minus one (set from inputItems.size() - 1).
// NOTE(review): presumably the excluded trailing item is the block length — confirm.
size_t InputWidth_;
// Number of output items minus one (set from outputItems.size() - 1);
// used as the upper bound for the output builder index when a row is made.
size_t OutputWidth_;
// Storage for the input values; sized to inputItems.size() on construction.
TUnboxedValueVector Inputs_;
// Input-to-output column map for the "left" flow: LeftIOMap_[i] is the index
// of the input column that is copied to output column i.
const TVector<ui32> LeftIOMap_;
// Arrow value descriptors derived from the input item types via ToValueDescr().
const std::vector<arrow::ValueDescr> InputsDescr_;
// Block readers and item converters.
// NOTE(review): they are filled outside of the visible part of the class —
// presumably one per input/output column; confirm against the constructor body.
TVector<std::unique_ptr<IBlockReader>> Readers_;
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
Expand All @@ -178,12 +188,13 @@ using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
TVector<ui32>&& leftKeyColumns,
const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
IComputationWideFlowNode* flow, IComputationNode* dict)
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
, ResultJoinItems_(std::move(resultJoinItems))
, LeftFlowItems_(std::move(leftFlowItems))
, LeftKeyColumns_(std::move(leftKeyColumns))
, LeftIOMap_(leftIOMap)
, Flow_(flow)
, Dict_(dict)
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
Expand Down Expand Up @@ -248,7 +259,8 @@ using TState = TBlockJoinState<RightRequired>;
}

// Creates the join state for the given computation context and stores it
// into `state`.
//
// The state is constructed from the left flow item types, the left
// input-to-output column map, the result item types and a pointer to this
// node's slice of ctx.WideFields (it starts at WideFieldsIndex_).
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
    state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, LeftIOMap_,
        ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
}

TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
Expand All @@ -267,6 +279,7 @@ using TState = TBlockJoinState<RightRequired>;
// Item types of the resulting join flow; passed to the state as output items.
const TVector<TType*> ResultJoinItems_;
// Item types of the "left" flow; passed to the state as input items. Its size
// also defines how many wide fields are reserved for this node.
const TVector<TType*> LeftFlowItems_;
// Indices of the key columns in the "left" flow.
const TVector<ui32> LeftKeyColumns_;
// Indices of the "left" flow columns that are copied to the output, in output
// order (the factory builds it from the columns that are not dropped).
const TVector<ui32> LeftIOMap_;
// The "left" wide flow node.
IComputationWideFlowNode* const Flow_;
// The "right" dict node.
IComputationNode* const Dict_;
// Offset of this node's fields in ctx.WideFields, as returned by
// IncrementWideFieldsIndex() in the constructor.
ui32 WideFieldsIndex_;
Expand All @@ -280,12 +293,13 @@ using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables,
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
TVector<ui32>&& leftKeyColumns,
const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
IComputationWideFlowNode* flow, IComputationNode* dict)
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Boxed)
, ResultJoinItems_(std::move(resultJoinItems))
, LeftFlowItems_(std::move(leftFlowItems))
, LeftKeyColumns_(std::move(leftKeyColumns))
, LeftIOMap_(leftIOMap)
, Flow_(flow)
, Dict_(dict)
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
Expand Down Expand Up @@ -357,7 +371,8 @@ using TState = TBlockJoinState<RightRequired>;
}

// Creates the join state for the given computation context and stores it
// into `state`.
//
// The state is constructed from the left flow item types, the left
// input-to-output column map, the result item types and a pointer to this
// node's slice of ctx.WideFields (it starts at WideFieldsIndex_).
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
    state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, LeftIOMap_,
        ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
}

TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
Expand Down Expand Up @@ -413,6 +428,7 @@ using TState = TBlockJoinState<RightRequired>;
// Item types of the resulting join flow; passed to the state as output items.
const TVector<TType*> ResultJoinItems_;
// Item types of the "left" flow; passed to the state as input items. Its size
// also defines how many wide fields are reserved for this node.
const TVector<TType*> LeftFlowItems_;
// Indices of the key columns in the "left" flow.
const TVector<ui32> LeftKeyColumns_;
// Indices of the "left" flow columns that are copied to the output, in output
// order (the factory builds it from the columns that are not dropped).
const TVector<ui32> LeftIOMap_;
// The "left" wide flow node.
IComputationWideFlowNode* const Flow_;
// The "right" dict node.
IComputationNode* const Dict_;
// Offset of this node's fields in ctx.WideFields, as returned by
// IncrementWideFieldsIndex() in the constructor.
ui32 WideFieldsIndex_;
Expand All @@ -421,7 +437,7 @@ using TState = TBlockJoinState<RightRequired>;
} // namespace

IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");

const auto joinType = callable.GetType()->GetReturnType();
MKQL_ENSURE(joinType->IsFlow(), "Expected WideFlow as a resulting stream");
Expand Down Expand Up @@ -459,16 +475,42 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly);

const auto tupleLiteral = AS_VALUE(TTupleLiteral, callable.GetInput(3));
const auto keyColumnsLiteral = callable.GetInput(3);
const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral);
TVector<ui32> leftKeyColumns;
leftKeyColumns.reserve(tupleLiteral->GetValuesCount());
for (ui32 i = 0; i < tupleLiteral->GetValuesCount(); i++) {
const auto item = AS_VALUE(TDataLiteral, tupleLiteral->GetValue(i));
leftKeyColumns.reserve(keyColumnsTuple->GetValuesCount());
for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) {
const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i));
leftKeyColumns.emplace_back(item->AsValue().Get<ui32>());
}
// TODO: Handle multi keys.
Y_ENSURE(leftKeyColumns.size() == 1);

const auto keyDropsLiteral = callable.GetInput(4);
const auto keyDropsTuple = AS_VALUE(TTupleLiteral, keyDropsLiteral);
THashSet<ui32> leftKeyDrops;
leftKeyDrops.reserve(keyDropsTuple->GetValuesCount());
for (ui32 i = 0; i < keyDropsTuple->GetValuesCount(); i++) {
const auto item = AS_VALUE(TDataLiteral, keyDropsTuple->GetValue(i));
leftKeyDrops.emplace(item->AsValue().Get<ui32>());
}

const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
for (const auto& drop : leftKeyDrops) {
MKQL_ENSURE(leftKeySet.contains(drop),
"Only key columns has to be specified in drop column set");

}

TVector<ui32> leftIOMap;
// XXX: Mind the last wide item, containing block length.
for (size_t i = 0; i < leftFlowItems.size() - 1; i++) {
if (leftKeyDrops.contains(i)) {
continue;
}
leftIOMap.push_back(i);
}

const auto flow = LocateNode(ctx.NodeLocator, callable, 0);
const auto dict = LocateNode(ctx.NodeLocator, callable, 1);

Expand All @@ -477,28 +519,34 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
case EJoinKind::Inner:
if (isMulti) {
return new TBlockWideMultiMapJoinWrapper<true>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
}
return new TBlockWideMapJoinWrapper<false, true>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
case EJoinKind::Left:
if (isMulti) {
return new TBlockWideMultiMapJoinWrapper<false>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
}
return new TBlockWideMapJoinWrapper<false, false>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
case EJoinKind::LeftSemi:
return new TBlockWideMapJoinWrapper<true, true>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
case EJoinKind::LeftOnly:
return new TBlockWideMapJoinWrapper<true, false>(ctx.Mutables,
std::move(joinItems), std::move(leftFlowItems), std::move(leftKeyColumns),
std::move(joinItems), std::move(leftFlowItems),
std::move(leftKeyColumns), std::move(leftIOMap),
static_cast<IComputationWideFlowNode*>(flow), dict);
default:
MKQL_ENSURE(false, "BlockMapJoinCore doesn't support %s join type"
Expand Down
Loading

0 comments on commit eef889f

Please sign in to comment.