Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dq] Validate self join with map strategy #4613

Merged
merged 2 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/common/yql_dq_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct TDqSettings {
static constexpr ui32 CostBasedOptimizationLevel = 0;
static constexpr ui32 MaxDPccpDPTableSize = 16400U;
static constexpr ui64 MaxAttachmentsSize = 2_GB;
static constexpr bool SplitStageOnDqReplicate = true;
};

using TPtr = std::shared_ptr<TDqSettings>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ class TLocalExecutor: public TCounters
: State->RandomProvider;

TScopedAlloc alloc(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
State->FunctionRegistry->SupportsSizedAllocators(),
false);
NDq::TDqTaskRunnerContext executionContext;
Expand Down Expand Up @@ -287,7 +287,7 @@ struct TDqsPipelineConfigurator : public IPipelineConfigurator {
}
NDq::EChannelMode mode = GetConfiguredChannelMode(State_, typesCtx);
pipeline->Add(
NDq::CreateDqBuildPhyStagesTransformer(!State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(true), typesCtx, mode),
NDq::CreateDqBuildPhyStagesTransformer(!State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(TDqSettings::TDefault::SplitStageOnDqReplicate), typesCtx, mode),
"BuildPhy");
pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables");
}
Expand Down
91 changes: 59 additions & 32 deletions ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ class TDqExecutionValidator {
ctx.AddError(YqlIssue(ctx.GetPosition(where.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, err));
}

bool ValidateDqStage(const TExprNode& node) {
bool ValidateDqStage(const TExprNode& node, TNodeSet* visitedStages) {
if (visitedStages) {
visitedStages->insert(&node);
}
if (!Visited_.insert(&node).second) {
return true;
}
Expand All @@ -36,11 +39,16 @@ class TDqExecutionValidator {
ReportError(Ctx_, *bad, TStringBuilder() << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage");
}


bool hasMapJoin = false;
VisitExpr(TDqStageBase(&node).Program().Body().Ptr(),
[](const TExprNode::TPtr& n) {
return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get());
},
[&readPerProvider_ = ReadsPerProvider_, &hasErrors, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) {
[&readPerProvider_ = ReadsPerProvider_, &hasErrors, &hasMapJoin, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) {
if (TDqPhyMapJoin::Match(n.Get())) {
hasMapJoin = true;
}
if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) {
ReportError(ctx, *n, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ");
hasErrors = true;
Expand All @@ -60,28 +68,48 @@ class TDqExecutionValidator {
}
);

for (auto n: TDqStageBase(&node).Inputs()) {
hasErrors |= !ValidateDqNode(n.Ref());
HasMapJoin_ |= hasMapJoin;
if (hasMapJoin && CheckSelfMapJoin_) {
TNodeSet unitedVisitedStages;
bool nonUniqStages = false;
for (auto n: TDqStageBase(&node).Inputs()) {
TNodeSet inputVisitedStages;
hasErrors |= !ValidateDqNode(n.Ref(), &inputVisitedStages);
const size_t expectedSize = unitedVisitedStages.size() + inputVisitedStages.size();
unitedVisitedStages.insert(inputVisitedStages.begin(), inputVisitedStages.end());
nonUniqStages |= (expectedSize != unitedVisitedStages.size()); // Found duplicates - some stage was visited twice from different inputs
}
if (nonUniqStages) {
ReportError(Ctx_, node, TStringBuilder() << "Cannot execute self join using mapjoin strategy in DQ");
hasErrors = true;
}
if (visitedStages) {
visitedStages->insert(unitedVisitedStages.begin(), unitedVisitedStages.end());
}
} else {
for (auto n: TDqStageBase(&node).Inputs()) {
hasErrors |= !ValidateDqNode(n.Ref(), visitedStages);
}
}

if (auto outs = TDqStageBase(&node).Outputs()) {
for (auto n: outs.Cast()) {
hasErrors |= !ValidateDqNode(n.Ref());
hasErrors |= !ValidateDqNode(n.Ref(), nullptr);
}
}

return !hasErrors;

}

bool ValidateDqNode(const TExprNode& node) {
bool ValidateDqNode(const TExprNode& node, TNodeSet* visitedStages) {
if (node.GetState() == TExprNode::EState::ExecutionComplete) {
return true;
}

if (TDqStageBase::Match(&node)) {
// visited will be updated inside ValidateDqStage
return ValidateDqStage(node);
return ValidateDqStage(node, visitedStages);
}

if (!Visited_.insert(&node).second) {
Expand All @@ -94,10 +122,10 @@ class TDqExecutionValidator {
}

if (TDqConnection::Match(&node)) {
return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref());
return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref(), TDqCnValue::Match(&node) ? nullptr : visitedStages);
}
if (TDqPhyPrecompute::Match(&node)) {
return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref());
return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref(), nullptr);
}

if (TDqSource::Match(&node) || TDqTransform::Match(&node) || TDqSink::Match(&node)) {
Expand All @@ -113,14 +141,16 @@ class TDqExecutionValidator {
: TypeCtx_(typeCtx)
, Ctx_(ctx)
, State_(state)
, CheckSelfMapJoin_(!TypeCtx_.ForceDq
&& !State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(TDqSettings::TDefault::SplitStageOnDqReplicate)
&& !State_->Settings->IsSpillingEnabled())
{}

bool ValidateDqExecution(const TExprNode& node) {
YQL_LOG_CTX_SCOPE(__FUNCTION__);

TNodeSet dqNodes;

bool hasJoin = false;
if (TDqCnResult::Match(&node)) {
dqNodes.insert(TDqCnResult(&node).Output().Stage().Raw());
} else if (TDqQuery::Match(&node)) {
Expand All @@ -142,47 +172,44 @@ class TDqExecutionValidator {
});
}

VisitExpr(node, [&hasJoin](const TExprNode& n) {
if (TMaybeNode<TDqPhyMapJoin>(&n)) {
hasJoin = true;
}
return true;
});

bool hasError = false;

for (const auto n: dqNodes) {
hasError |= !ValidateDqNode(*n);
hasError |= !ValidateDqNode(*n, nullptr);
if (hasError) {
break;
}
}

for (auto& [integration, nodes]: ReadsPerProvider_) {
TMaybe<ui64> size;
hasError |= !(size = integration->EstimateReadSize(State_->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob),
State_->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), nodes, Ctx_));
if (hasError) {
break;
if (!hasError && HasMapJoin_ && !TypeCtx_.ForceDq) {
size_t dataSize = 0;
for (auto& [integration, nodes]: ReadsPerProvider_) {
TMaybe<ui64> size;
hasError |= !(size = integration->EstimateReadSize(State_->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob),
State_->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), nodes, Ctx_));
if (hasError) {
break;
}
dataSize += *size;
}
DataSize_ += *size;
}

if (!hasError && hasJoin && DataSize_ > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) {
ReportError(Ctx_, node, TStringBuilder() << "too big join input: " << DataSize_);
return false;
if (dataSize > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) {
ReportError(Ctx_, node, TStringBuilder() << "too big join input: " << dataSize);
return false;
}
}
return !hasError;
}
private:

const TTypeAnnotationContext& TypeCtx_;
TExprContext& Ctx_;
TNodeSet Visited_;
THashMap<IDqIntegration*, TVector<const TExprNode*>> ReadsPerProvider_;
size_t DataSize_ = 0;
const TDqState::TPtr State_;
const bool CheckSelfMapJoin_;

TNodeSet Visited_;
THashMap<IDqIntegration*, TVector<const TExprNode*>> ReadsPerProvider_;
bool HasMapJoin_ = false;
};
}

Expand Down
31 changes: 31 additions & 0 deletions ydb/library/yql/tests/sql/dq_file/part3/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,37 @@
"uri": "https://{canondata_backend}/1871102/8fb53a3a81ad5d5949727846153c9f6f58a0845e/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-off-Results_/results.txt"
}
],
"test.test[join-selfjoin_on_sorted_with_filter-replicate-Analyze]": [
{
"checksum": "b59f1264995b5b377a58a392fc1d87c8",
"size": 3938,
"uri": "https://{canondata_backend}/1917492/064a3289ad6eaf99ba9f2a34e99fb15ca8194278/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Analyze_/plan.txt"
},
{
"uri": "file://test.test_join-selfjoin_on_sorted_with_filter-replicate-Analyze_/extracted"
}
],
"test.test[join-selfjoin_on_sorted_with_filter-replicate-Debug]": [
{
"checksum": "5d13bd670d234e8cc6261784c84e9012",
"size": 2195,
"uri": "https://{canondata_backend}/1784826/3baf99fc0c22227fef7f1b91df73370c2e22f014/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Debug_/opt.yql_patched"
}
],
"test.test[join-selfjoin_on_sorted_with_filter-replicate-Plan]": [
{
"checksum": "db2d64e1503f3bfa45bc79d2d1655935",
"size": 5087,
"uri": "https://{canondata_backend}/1917492/064a3289ad6eaf99ba9f2a34e99fb15ca8194278/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Plan_/plan.txt"
}
],
"test.test[join-selfjoin_on_sorted_with_filter-replicate-Results]": [
{
"checksum": "568f3e7e0db9008acecc09f8942dd3c2",
"size": 3076,
"uri": "https://{canondata_backend}/1917492/064a3289ad6eaf99ba9f2a34e99fb15ca8194278/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Results_/results.txt"
}
],
"test.test[join-three_equalities_paren--Analyze]": [
{
"checksum": "da428dcd6823eacaf44ce47e2c9951b9",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<tmp_path>/program.sql:<main>: Info: DQ cannot execute the query

<tmp_path>/program.sql:<main>: Info: Optimization

<tmp_path>/program.sql:<main>:7:22: Info: Cannot execute self join using mapjoin strategy in DQ
select * from $in as a inner join $in as b on a.key = b.key;
^
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
in Input sorted_uniq.txt

providers dq
pragma dq.HashJoinMode="off";
pragma dq.SplitStageOnDqReplicate="false";
pragma dq.SpillingEngine="disable";
Loading