94 changes: 66 additions & 28 deletions ydb/core/kqp/opt/kqp_statistics_transformer.cpp
@@ -22,40 +22,53 @@ void InferStatisticsForReadTable(const TExprNode::TPtr& input, TTypeAnnotationCo
const TKqpOptimizeContext& kqpCtx) {

auto inputNode = TExprBase(input);
double nRows = 0;
int nAttrs = 0;
std::shared_ptr<TOptimizerStatistics> inputStats;

const TExprNode* path;
int nAttrs = 0;
bool readRange = false;

if (auto readTable = inputNode.Maybe<TKqlReadTableBase>()) {
path = readTable.Cast().Table().Path().Raw();
inputStats = typeCtx->GetStats(readTable.Cast().Table().Raw());
nAttrs = readTable.Cast().Columns().Size();

auto range = readTable.Cast().Range();
auto rangeFrom = range.From().Maybe<TKqlKeyTuple>();
auto rangeTo = range.To().Maybe<TKqlKeyTuple>();
if (rangeFrom && rangeTo) {
readRange = true;
}
} else if (auto readRanges = inputNode.Maybe<TKqlReadTableRangesBase>()) {
path = readRanges.Cast().Table().Path().Raw();
inputStats = typeCtx->GetStats(readRanges.Cast().Table().Raw());
nAttrs = readRanges.Cast().Columns().Size();
} else {
Y_ENSURE(false, "Invalid node type for InferStatisticsForReadTable");
}

const auto& tableData = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, path->Content());
int totalAttrs = tableData.Metadata->Columns.size();
nRows = tableData.Metadata->RecordsCount;

double byteSize = tableData.Metadata->DataSize * (nAttrs / (double)totalAttrs);

auto keyColumns = TIntrusivePtr<TOptimizerStatistics::TKeyColumns>(new TOptimizerStatistics::TKeyColumns(tableData.Metadata->KeyColumnNames));
auto stats = std::make_shared<TOptimizerStatistics>(EStatisticsType::BaseTable, nRows, nAttrs, byteSize, 0.0, keyColumns);
if (kqpCtx.Config->OverrideStatistics.Get()) {
stats = OverrideStatistics(*stats, path->Content(), *kqpCtx.Config->OverrideStatistics.Get());
/**
* We will need index statistics to compute this accurately in the future.
* For now we use very small estimates to make sure the CBO picks lookup joins:
* in an OLTP scenario there can be a chain of lookup joins, and we want to make
* sure the cardinality doesn't blow up and lookup joins keep being picked.
*/
double inputRows = inputStats->Nrows;
double nRows = inputRows;
if (readRange) {
nRows = 1;
}

if (stats->ColumnStatistics) {
for (const auto& [columnName, metaData]: tableData.Metadata->Columns) {
stats->ColumnStatistics->Data[columnName].Type = metaData.Type;
}
}
double sizePerRow = inputStats->ByteSize / (inputRows==0?1:inputRows);
double byteSize = nRows * sizePerRow * (nAttrs / (double)inputStats->Ncols);

YQL_CLOG(TRACE, CoreDq) << "Infer statistics for read table, nrows: " << stats->Nrows << ", nattrs: " << stats->Ncols;
auto stats = std::make_shared<TOptimizerStatistics>(
EStatisticsType::BaseTable,
nRows,
nAttrs,
byteSize,
0.0,
inputStats->KeyColumns,
inputStats->ColumnStatistics);

YQL_CLOG(TRACE, CoreDq) << "Infer statistics for read table, nrows: " << stats->Nrows << ", nattrs: " << stats->Ncols << ", byteSize: " << stats->ByteSize;

typeCtx->SetStats(input.Get(), stats);
}
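
The estimate introduced above derives the read's cardinality and byte size from the table-level statistics: a read over a fully bounded key range is assumed to return a single row, and the byte size is the table's per-row size scaled by the fraction of columns actually read. A minimal standalone sketch of that formula, with illustrative names (TBaseTableStatsSketch, EstimateRows, EstimateByteSize) rather than the actual YDB types:

#include <iostream>

// Illustrative stand-in for the statistics fields used above; not the real
// TOptimizerStatistics type.
struct TBaseTableStatsSketch {
    double Nrows;    // rows in the base table
    double ByteSize; // total data size of the base table, in bytes
    int Ncols;       // total number of columns in the table
};

// A read over a fully bounded key range is assumed to return a single row,
// so a chain of lookup joins keeps a small cardinality.
double EstimateRows(const TBaseTableStatsSketch& table, bool readRange) {
    return readRange ? 1.0 : table.Nrows;
}

// Per-row size of the base table (guarding against a zero row count),
// scaled by the fraction of columns the read projects.
double EstimateByteSize(const TBaseTableStatsSketch& table, double nRows, int nAttrs) {
    double sizePerRow = table.ByteSize / (table.Nrows == 0 ? 1 : table.Nrows);
    return nRows * sizePerRow * (nAttrs / (double)table.Ncols);
}

int main() {
    TBaseTableStatsSketch stock{1000000.0, 128000000.0, 16}; // hypothetical table
    double nRows = EstimateRows(stock, /*readRange=*/true);
    double byteSize = EstimateByteSize(stock, nRows, /*nAttrs=*/4);
    std::cout << "nRows=" << nRows << " byteSize=" << byteSize << std::endl; // nRows=1 byteSize=32
    return 0;
}

With this hypothetical 1M-row, ~128 MB, 16-column table and a 4-column range read, the sketch yields nRows = 1 and byteSize = 32 bytes.
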
@@ -81,7 +94,7 @@ void InferStatisticsForKqpTable(const TExprNode::TPtr& input, TTypeAnnotationCon
stats = OverrideStatistics(*stats, path.Value(), *kqpCtx.Config->OverrideStatistics.Get());
}

YQL_CLOG(TRACE, CoreDq) << "Infer statistics for table: " << path.Value() << ", nrows: " << stats->Nrows << ", nattrs: " << stats->Ncols << ", nKeyColumns: " << stats->KeyColumns->Data.size();
YQL_CLOG(TRACE, CoreDq) << "Infer statistics for table: " << path.Value() << ", nrows: " << stats->Nrows << ", nattrs: " << stats->Ncols << ", byteSize: " << stats->ByteSize << ", nKeyColumns: " << stats->KeyColumns->Data.size();

typeCtx->SetStats(input.Get(), stats);
}
@@ -103,7 +116,14 @@ void InferStatisticsForSteamLookup(const TExprNode::TPtr& input, TTypeAnnotation
auto inputStats = typeCtx->GetStats(streamLookup.Table().Raw());
auto byteSize = inputStats->ByteSize * (nAttrs / (double) inputStats->Ncols);

typeCtx->SetStats(input.Get(), std::make_shared<TOptimizerStatistics>(EStatisticsType::BaseTable, inputStats->Nrows, nAttrs, byteSize, 0, inputStats->KeyColumns));
typeCtx->SetStats(input.Get(), std::make_shared<TOptimizerStatistics>(
EStatisticsType::BaseTable,
inputStats->Nrows,
nAttrs,
byteSize,
0,
inputStats->KeyColumns,
inputStats->ColumnStatistics));
}

/**
@@ -134,7 +154,14 @@ void InferStatisticsForLookupTable(const TExprNode::TPtr& input, TTypeAnnotation
byteSize = 10;
}

typeCtx->SetStats(input.Get(), std::make_shared<TOptimizerStatistics>(EStatisticsType::BaseTable, nRows, nAttrs, byteSize, 0, inputStats->KeyColumns));
typeCtx->SetStats(input.Get(), std::make_shared<TOptimizerStatistics>(
EStatisticsType::BaseTable,
nRows,
nAttrs,
byteSize,
0,
inputStats->KeyColumns,
inputStats->ColumnStatistics));
}

/**
@@ -151,7 +178,8 @@ void InferStatisticsForRowsSourceSettings(const TExprNode::TPtr& input, TTypeAnn
return;
}

double nRows = inputStats->Nrows;
double inputRows = inputStats->Nrows;
double nRows = inputRows;

// Check if we have a range expression, in that case just assign a single row to this read
// We don't currently check the size of an index lookup
@@ -165,10 +193,19 @@ void InferStatisticsForRowsSourceSettings(const TExprNode::TPtr& input, TTypeAnn
}

int nAttrs = sourceSettings.Columns().Size();

double sizePerRow = inputStats->ByteSize / (inputRows==0?1:inputRows);
double byteSize = nRows * sizePerRow * (nAttrs / (double)inputStats->Ncols);
double cost = inputStats->Cost;
double byteSize = inputStats->ByteSize * (nAttrs / (double)inputStats->Ncols);

typeCtx->SetStats(input.Get(), std::make_shared<TOptimizerStatistics>(EStatisticsType::BaseTable, nRows, nAttrs, byteSize, cost, inputStats->KeyColumns));
typeCtx->SetStats(input.Get(), std::make_shared<TOptimizerStatistics>(
EStatisticsType::BaseTable,
nRows,
nAttrs,
byteSize,
cost,
inputStats->KeyColumns,
inputStats->ColumnStatistics));
}

/**
@@ -199,7 +236,8 @@ void InferStatisticsForReadTableIndexRanges(const TExprNode::TPtr& input, TTypeA
inputStats->Ncols,
inputStats->ByteSize,
inputStats->Cost,
indexColumnsPtr);
indexColumnsPtr,
inputStats->ColumnStatistics);

typeCtx->SetStats(input.Get(), stats);

13 changes: 13 additions & 0 deletions ydb/core/kqp/ut/join/data/join_order/tpcc.json
@@ -0,0 +1,13 @@
{
"op_name": "InnerJoin (Map)",
"args": [
{
"op_name": "TableLookup",
"table": "stock"
},
{
"op_name": "TableRangeScan",
"table": "order_line"
}
]
}
82 changes: 41 additions & 41 deletions ydb/core/kqp/ut/join/data/join_order/tpch9_1000s.json
@@ -1,49 +1,49 @@
{
"op_name": "InnerJoin (Grace)",
"op_name": "InnerJoin (MapJoin)",
"args": [
{
"op_name": "InnerJoin (Grace)",
"args": [
{
"op_name": "TableFullScan",
"table": "orders"
},
{
{
"op_name": "InnerJoin (Grace)",
"args": [
{
"op_name": "TableFullScan",
"table": "lineitem"
},
{
"op_name": "InnerJoin (Grace)",
"args": [
{
{
"op_name": "TableFullScan",
"table": "partsupp"
},
{
"table": "orders"
},
{
"op_name": "InnerJoin (Grace)",
"args": [
{
"op_name": "TableFullScan",
"table": "lineitem"
},
{
"op_name": "InnerJoin (MapJoin)",
"args": [
{
"op_name": "TableFullScan",
"table": "partsupp"
},
{
"op_name": "TableFullScan",
"table": "part"
}
]
}
]
}
]
},
{
"op_name": "InnerJoin (MapJoin)",
"args": [
{
"op_name": "TableFullScan",
"table": "supplier"
},
{
"op_name": "TableFullScan",
"table": "part"
}
]
}
"table": "nation"
}
]
}
]
},
{
"op_name": "InnerJoin (MapJoin)",
"args": [
{
"op_name": "TableFullScan",
"table": "supplier"
},
{
"op_name": "TableFullScan",
"table": "nation"
}
]
}
}
]
}
}
8 changes: 8 additions & 0 deletions ydb/core/kqp/ut/join/data/queries/tpcc.sql
@@ -0,0 +1,8 @@
SELECT COUNT(DISTINCT (s.S_I_ID)) AS STOCK_COUNT
FROM `/Root/test/tpcc/order_line` as ol INNER JOIN `/Root/test/tpcc/stock` as s ON s.S_I_ID = ol.OL_I_ID
WHERE ol.OL_W_ID = 1
AND ol.OL_D_ID = 10
AND ol.OL_O_ID < 3000
AND ol.OL_O_ID >= 2900
AND s.S_W_ID = 1
AND s.S_QUANTITY < 15