@@ -111,9 +111,9 @@ Stage-0
Stage-1
Map 1 llap
File Output Operator [FS_10]
Select Operator [SEL_9] (rows=30 width=520)
Select Operator [SEL_9] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
Map Join Operator [MAPJOIN_45] (rows=30 width=336)
Map Join Operator [MAPJOIN_45] (rows=3 width=336)
BucketMapJoin:true,Conds:SEL_2._col0, _col1=RS_7._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Map 2 [CUSTOM_EDGE] llap
MULTICAST [RS_7]
@@ -175,19 +175,19 @@ Stage-0
Stage-1
Map 1 llap
File Output Operator [FS_14]
Select Operator [SEL_13] (rows=10 width=520)
Select Operator [SEL_13] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
Map Join Operator [MAPJOIN_49] (rows=10 width=336)
Map Join Operator [MAPJOIN_49] (rows=3 width=336)
BucketMapJoin:true,Conds:SEL_2._col0, _col1=RS_11._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Reducer 3 [CUSTOM_EDGE] llap
MULTICAST [RS_11]
PartitionCols:_col1
Group By Operator [GBY_8] (rows=1 width=168)
Group By Operator [GBY_8] (rows=3 width=168)
Output:["_col0","_col1"],keys:KEY._col0, KEY._col1
<-Map 2 [SIMPLE_EDGE] llap
SHUFFLE [RS_7]
PartitionCols:_col0, _col1
Group By Operator [GBY_6] (rows=1 width=168)
Group By Operator [GBY_6] (rows=3 width=168)
Output:["_col0","_col1"],keys:date_col, decimal_col
Select Operator [SEL_5] (rows=3 width=168)
Output:["date_col","decimal_col"]
@@ -245,9 +245,9 @@ Stage-0
Stage-1
Map 1 llap
File Output Operator [FS_10]
Select Operator [SEL_9] (rows=30 width=520)
Select Operator [SEL_9] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
Map Join Operator [MAPJOIN_45] (rows=30 width=336)
Map Join Operator [MAPJOIN_45] (rows=3 width=336)
Conds:SEL_2._col0, _col1=RS_7._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Map 2 [BROADCAST_EDGE] llap
BROADCAST [RS_7]
@@ -309,19 +309,19 @@ Stage-0
Stage-1
Map 1 llap
File Output Operator [FS_14]
Select Operator [SEL_13] (rows=10 width=520)
Select Operator [SEL_13] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
Map Join Operator [MAPJOIN_49] (rows=10 width=336)
Map Join Operator [MAPJOIN_49] (rows=3 width=336)
Conds:SEL_2._col0, _col1=RS_11._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Reducer 3 [BROADCAST_EDGE] llap
BROADCAST [RS_11]
PartitionCols:_col0, _col1
Group By Operator [GBY_8] (rows=1 width=168)
Group By Operator [GBY_8] (rows=3 width=168)
Output:["_col0","_col1"],keys:KEY._col0, KEY._col1
<-Map 2 [SIMPLE_EDGE] llap
SHUFFLE [RS_7]
PartitionCols:_col0, _col1
Group By Operator [GBY_6] (rows=1 width=168)
Group By Operator [GBY_6] (rows=3 width=168)
Output:["_col0","_col1"],keys:date_col, decimal_col
Select Operator [SEL_5] (rows=3 width=168)
Output:["date_col","decimal_col"]
@@ -379,9 +379,9 @@ Stage-0
Stage-1
Map 1 vectorized, llap
File Output Operator [FS_54]
Select Operator [SEL_53] (rows=30 width=520)
Select Operator [SEL_53] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
Map Join Operator [MAPJOIN_52] (rows=30 width=336)
Map Join Operator [MAPJOIN_52] (rows=3 width=336)
BucketMapJoin:true,Conds:SEL_51._col0, _col1=RS_49._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Map 2 [CUSTOM_EDGE] vectorized, llap
MULTICAST [RS_49]
@@ -443,19 +443,19 @@ Stage-0
Stage-1
Map 1 vectorized, llap
File Output Operator [FS_61]
Select Operator [SEL_60] (rows=10 width=520)
Select Operator [SEL_60] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
Map Join Operator [MAPJOIN_59] (rows=10 width=336)
Map Join Operator [MAPJOIN_59] (rows=3 width=336)
BucketMapJoin:true,Conds:SEL_58._col0, _col1=RS_56._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Reducer 3 [CUSTOM_EDGE] vectorized, llap
MULTICAST [RS_56]
PartitionCols:_col1
Group By Operator [GBY_55] (rows=1 width=168)
Group By Operator [GBY_55] (rows=3 width=168)
Output:["_col0","_col1"],keys:KEY._col0, KEY._col1
<-Map 2 [SIMPLE_EDGE] vectorized, llap
SHUFFLE [RS_54]
PartitionCols:_col0, _col1
Group By Operator [GBY_53] (rows=1 width=168)
Group By Operator [GBY_53] (rows=3 width=168)
Output:["_col0","_col1"],keys:date_col, decimal_col
Select Operator [SEL_52] (rows=3 width=168)
Output:["date_col","decimal_col"]
@@ -513,9 +513,9 @@ Stage-0
Stage-1
Map 1 vectorized, llap
File Output Operator [FS_54]
Select Operator [SEL_53] (rows=30 width=520)
Select Operator [SEL_53] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
Map Join Operator [MAPJOIN_52] (rows=30 width=336)
Map Join Operator [MAPJOIN_52] (rows=3 width=336)
Conds:SEL_51._col0, _col1=RS_49._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Map 2 [BROADCAST_EDGE] vectorized, llap
BROADCAST [RS_49]
@@ -577,19 +577,19 @@ Stage-0
Stage-1
Map 1 vectorized, llap
File Output Operator [FS_61]
Select Operator [SEL_60] (rows=10 width=520)
Select Operator [SEL_60] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
Map Join Operator [MAPJOIN_59] (rows=10 width=336)
Map Join Operator [MAPJOIN_59] (rows=3 width=336)
Conds:SEL_58._col0, _col1=RS_56._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Reducer 3 [BROADCAST_EDGE] vectorized, llap
BROADCAST [RS_56]
PartitionCols:_col0, _col1
Group By Operator [GBY_55] (rows=1 width=168)
Group By Operator [GBY_55] (rows=3 width=168)
Output:["_col0","_col1"],keys:KEY._col0, KEY._col1
<-Map 2 [SIMPLE_EDGE] vectorized, llap
SHUFFLE [RS_54]
PartitionCols:_col0, _col1
Group By Operator [GBY_53] (rows=1 width=168)
Group By Operator [GBY_53] (rows=3 width=168)
Output:["_col0","_col1"],keys:date_col, decimal_col
Select Operator [SEL_52] (rows=3 width=168)
Output:["date_col","decimal_col"]
24 changes: 19 additions & 5 deletions ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -832,6 +832,7 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String col
cs.setNumNulls(csd.getBinaryStats().getNumNulls());
} else if (colTypeLowerCase.equals(serdeConstants.TIMESTAMP_TYPE_NAME)) {
cs.setAvgColLen(JavaDataModel.get().lengthOfTimestamp());
cs.setCountDistint(csd.getTimestampStats().getNumDVs());
Contributor Author:

I am unsure whether this was deliberately not added or an unintended omission. It does seem to improve the stats calculations of multiple .q test files, especially after the more conservative NDV handling by PessimisticStatCombiner.

cs.setNumNulls(csd.getTimestampStats().getNumNulls());
Long lowVal = (csd.getTimestampStats().getLowValue() != null) ? csd.getTimestampStats().getLowValue()
.getSecondsSinceEpoch() : null;
@@ -862,6 +863,7 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String col
cs.setHistogram(csd.getDecimalStats().getHistogram());
} else if (colTypeLowerCase.equals(serdeConstants.DATE_TYPE_NAME)) {
cs.setAvgColLen(JavaDataModel.get().lengthOfDate());
cs.setCountDistint(csd.getDateStats().getNumDVs());
Contributor Author:

I am unsure whether this was deliberately not added or an unintended omission. It does seem to improve the stats calculations of multiple .q test files, especially after the more conservative NDV handling by PessimisticStatCombiner.

cs.setNumNulls(csd.getDateStats().getNumNulls());
Long lowVal = (csd.getDateStats().getLowValue() != null) ? csd.getDateStats().getLowValue()
.getDaysSinceEpoch() : null;
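The note above about improved .q stats ties back to the Group By estimates in the plan diffs at the top of this page (rows=1 becoming rows=3 once the DATE and TIMESTAMP NDVs are available). The sketch below is a rough, hedged reading of that effect, not the planner's exact estimator; the per-column NDV values of 3 are assumptions about the 3-row test table.

```java
// Illustration only: how a populated NDV can feed a GROUP BY estimate.
// Assumes the planner bounds the estimate by the product of grouping-key NDVs,
// capped at the parent row count, instead of collapsing to 1 row for "unknown" NDV.
final class GroupByEstimateSketch {
  public static void main(String[] args) {
    long parentRows = 3L;      // SEL_5 above reports rows=3 flowing into GBY_6
    long ndvDateCol = 3L;      // assumed: three distinct date_col values in the test data
    long ndvDecimalCol = 3L;   // assumed: three distinct decimal_col values in the test data

    long gbyEstimate = Math.min(parentRows, ndvDateCol * ndvDecimalCol);
    System.out.println("estimated GROUP BY rows = " + gbyEstimate);  // 3, matching the new plans
  }
}
```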
@@ -2086,11 +2088,7 @@ private static List<Long> extractNDVGroupingColumns(List<ColStatistics> colStats
// compute product of distinct values of grouping columns
for (ColStatistics cs : colStats) {
if (cs != null) {
long ndv = cs.getCountDistint();
if (cs.getNumNulls() > 0) {
ndv = StatsUtils.safeAdd(ndv, 1);
}
ndvValues.add(ndv);
ndvValues.add(getGroupingColumnNdv(cs, parentStats));
} else {
if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) {
// the column must be an aggregate column inserted by GBY. We
@@ -2109,4 +2107,20 @@ private static List<Long> extractNDVGroupingColumns(List<ColStatistics> colStats

return ndvValues;
}

private static long getGroupingColumnNdv(ColStatistics cs, Statistics parentStats) {
long ndv = cs.getCountDistint();

if (ndv == 0L) {
// Typically, ndv == 0 means "NDV unknown", and no safe GROUPBY adjustments are possible
// However, there is a special exception for "constant NULL" columns. They are intentionally generated
// with NDV values of 0 and numNulls == numRows, while their actual NDV is 1
if (cs.getNumNulls() >= parentStats.getNumRows()) {
ndv = 1L;
}
} else if (cs.getNumNulls() > 0L) {
ndv = StatsUtils.safeAdd(ndv, 1L);
}
return ndv;
}
}
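For quick reference, the sketch below mirrors the three cases the new helper distinguishes. It is a hypothetical standalone method written for this review (plain addition stands in for StatsUtils.safeAdd), not code from the patch.

```java
// Hypothetical mirror of getGroupingColumnNdv, for illustration only.
final class GroupingNdvSketch {
  static long ndvForGrouping(long countDistinct, long numNulls, long parentNumRows) {
    if (countDistinct == 0L) {
      // NDV unknown: only a "constant NULL" column (every row NULL) is safe to treat as NDV 1.
      return (numNulls >= parentNumRows) ? 1L : 0L;
    }
    // Known NDV: NULL adds one extra distinct grouping key when nulls are present
    // (the patch uses StatsUtils.safeAdd here to guard against overflow).
    return (numNulls > 0L) ? countDistinct + 1L : countDistinct;
  }
  // ndvForGrouping(0, 100, 1000)  -> 0   partial stats: stay "unknown", do not inflate to 1
  // ndvForGrouping(0, 1000, 1000) -> 1   constant-NULL column
  // ndvForGrouping(5, 3, 1000)    -> 6   NULLs contribute one extra grouping key
}
```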
@@ -41,9 +41,15 @@ public void add(ColStatistics stat) {
if (stat.getAvgColLen() > result.getAvgColLen()) {
result.setAvgColLen(stat.getAvgColLen());
}
if (stat.getCountDistint() > result.getCountDistint()) {
result.setCountDistint(stat.getCountDistint());
}

// NDVs can only be accurately combined if full information about the columns, query branches and
// their relationships is available. Without that information, the only truly conservative
// NDV value is 0, which means the NDV is unknown. It forces the optimizer to make the most
// conservative decisions possible, which is exactly the goal of PessimisticStatCombiner.
// It does inflate statistics in multiple cases, but at the same time it also ensures that
// query execution does not "blow up" due to overly optimistic stats estimates.
result.setCountDistint(0L);
Contributor Author:

This could appear counter-intuitive at first. However, when combining statistics of different logical branches of the same column with no reliable information about their interdependencies (i.e. in a "truly pessimistic" scenario), every other option appears to introduce undesired under-estimations, which often lead to catastrophic query failures.

For example, a simple column generated by a CASE..WHEN clause with three constants gets an NDV of 1 from the original code, while, in most cases, the "true" NDV is 3. If such a column later participates in a GROUP BY condition, its estimated number of records naturally becomes 1. Even this seemingly small under-estimation could lead to a bad decision about whether to convert to a map join, especially over large data sets.

Alternatively, trying to "total up" the NDV values of the same column could cause over-estimation of its true NDV, which, in its turn, could lead to a severe under-estimation of the records matching an "IN" filter, ultimately producing results just as bad as in the previous case.
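A minimal sketch of the CASE..WHEN scenario described above, using hypothetical branch-level NDVs rather than the combiner's real API:

```java
// Illustration only: why max-combining branch NDVs under-estimates a CASE..WHEN column.
final class CaseWhenNdvExample {
  public static void main(String[] args) {
    // Each branch of CASE WHEN ... THEN 'a' WHEN ... THEN 'b' ELSE 'c' END is a single
    // constant, so each branch-local column statistic carries NDV = 1.
    long[] branchNdvs = {1L, 1L, 1L};

    // Old behavior: keep the maximum NDV across branches -> 1, although the combined
    // column can really take 3 distinct values. A later GROUP BY on such a column is
    // then estimated at 1 row, which can wrongly tip the planner into a map join.
    long maxCombined = 0L;
    for (long ndv : branchNdvs) {
      maxCombined = Math.max(maxCombined, ndv);
    }

    // New behavior: report 0 ("NDV unknown") so downstream estimation stays conservative
    // instead of building on a confidently wrong value.
    long pessimisticCombined = 0L;

    System.out.println("max-combined NDV = " + maxCombined + ", pessimistic NDV = " + pessimisticCombined);
  }
}
```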


if (stat.getNumNulls() > result.getNumNulls()) {
result.setNumNulls(stat.getNumNulls());
}
89 changes: 89 additions & 0 deletions ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUtils.java
@@ -25,17 +25,24 @@

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Date;
import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Timestamp;
import org.apache.hadoop.hive.metastore.api.TimestampColumnStatsData;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
import org.apache.hadoop.hive.ql.plan.ColStatistics.Range;
import org.apache.hadoop.hive.ql.plan.Statistics;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -244,4 +251,86 @@ static Stream<Arguments> floatingPointStatisticsTestData() {
);
}

@Test
void testGetColStatisticsTimestampType() {
ColumnStatisticsObj cso = new ColumnStatisticsObj();
cso.setColName("ts_col");
cso.setColType(serdeConstants.TIMESTAMP_TYPE_NAME);

TimestampColumnStatsData tsStats = new TimestampColumnStatsData();
tsStats.setNumDVs(35);
tsStats.setNumNulls(5);
tsStats.setLowValue(new Timestamp(1000));
tsStats.setHighValue(new Timestamp(2000));

ColumnStatisticsData data = new ColumnStatisticsData();
data.setTimestampStats(tsStats);
cso.setStatsData(data);

ColStatistics cs = StatsUtils.getColStatistics(cso, "ts_col");

assertNotNull(cs, "ColStatistics should not be null");
assertEquals(35, cs.getCountDistint(), "TIMESTAMP NumDVs should be extracted from metastore stats");
assertEquals(5, cs.getNumNulls(), "NumNulls mismatch");
}

@Test
void testGetColStatisticsDateType() {
ColumnStatisticsObj cso = new ColumnStatisticsObj();
cso.setColName("date_col");
cso.setColType(serdeConstants.DATE_TYPE_NAME);

DateColumnStatsData dateStats = new DateColumnStatsData();
dateStats.setNumDVs(42);
dateStats.setNumNulls(3);
dateStats.setLowValue(new Date(18000));
dateStats.setHighValue(new Date(19000));

ColumnStatisticsData data = new ColumnStatisticsData();
data.setDateStats(dateStats);
cso.setStatsData(data);

ColStatistics cs = StatsUtils.getColStatistics(cso, "date_col");

assertNotNull(cs, "ColStatistics should not be null");
assertEquals(42, cs.getCountDistint(), "DATE NumDVs should be extracted from metastore stats");
assertEquals(3, cs.getNumNulls(), "NumNulls mismatch");
}

private ColStatistics createColStats(String name, long ndv, long numNulls) {
ColStatistics cs = new ColStatistics(name, "string");
cs.setCountDistint(ndv);
cs.setNumNulls(numNulls);
return cs;
}

private Statistics createParentStats(long numRows) {
Statistics stats = new Statistics(numRows, 0, 0, 0);
stats.setColumnStatsState(Statistics.State.COMPLETE);
return stats;
}

@Test
void testComputeNDVGroupingColumnsPartialStats() {
ColStatistics cs = createColStats("partial_stats_col", 0, 100);
Statistics parentStats = createParentStats(1000);
List<ColStatistics> colStats = Collections.singletonList(cs);

long ndv = StatsUtils.computeNDVGroupingColumns(colStats, parentStats, false);

assertEquals(0, ndv, "Partial stats (ndv=0, numNulls<numRows) should return 0, not inflate to 1");
}

@Test
void testComputeNDVGroupingColumnsAllNulls() {
// When ndv=0 and numNulls >= numRows, it's a "constant NULL" column, so NDV should be 1
ColStatistics cs = createColStats("all_nulls_col", 0, 1000);
Statistics parentStats = createParentStats(1000);
List<ColStatistics> colStats = Collections.singletonList(cs);

long ndv = StatsUtils.computeNDVGroupingColumns(colStats, parentStats, false);

assertEquals(1, ndv, "All-null column (ndv=0, numNulls==numRows) should have NDV inflated to 1");
}

}