diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index e4c3a232f4b692..2af4de40e75b9a 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -97,7 +97,6 @@ public final class SparkDpp implements java.io.Serializable {
     private SparkSession spark = null;
     private EtlJobConfig etlJobConfig = null;
     private LongAccumulator abnormalRowAcc = null;
-    private LongAccumulator unselectedRowAcc = null;
     private LongAccumulator scannedRowsAcc = null;
     private LongAccumulator fileNumberAcc = null;
     private LongAccumulator fileSizeAcc = null;
@@ -120,7 +119,6 @@ public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
     public void init() {
         abnormalRowAcc = spark.sparkContext().longAccumulator("abnormalRowAcc");
-        unselectedRowAcc = spark.sparkContext().longAccumulator("unselectedRowAcc");
         scannedRowsAcc = spark.sparkContext().longAccumulator("scannedRowsAcc");
         fileNumberAcc = spark.sparkContext().longAccumulator("fileNumberAcc");
         fileSizeAcc = spark.sparkContext().longAccumulator("fileSizeAcc");
@@ -854,6 +852,10 @@ private Dataset<Row> loadDataFromHiveTable(SparkSession spark,
             sql.append(column.columnName).append(",");
         });
         sql.deleteCharAt(sql.length() - 1).append(" from ").append(hiveDbTableName);
+        if (!Strings.isNullOrEmpty(fileGroup.where)) {
+            sql.append(" where ").append(fileGroup.where);
+        }
+
         Dataset<Row> dataframe = spark.sql(sql.toString());
         dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, fileGroup.columnMappings.keySet(),
                 etlJobConfig.properties.strictMode, dstTableSchema);
@@ -988,12 +990,6 @@ private void process() throws Exception {
                     LOG.info("no data for file file group:" + fileGroup);
                     continue;
                 }
-                if (!Strings.isNullOrEmpty(fileGroup.where)) {
-                    long originalSize = fileGroupDataframe.count();
-                    fileGroupDataframe = fileGroupDataframe.filter(fileGroup.where);
-                    long currentSize = fileGroupDataframe.count();
-                    unselectedRowAcc.add(currentSize - originalSize);
-                }
                 JavaPairRDD<List<Object>, Object[]> ret = fillTupleWithPartitionColumn(
                         fileGroupDataframe,
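
For context, here is a minimal standalone sketch of the before/after behavior this patch changes. Everything in it is illustrative: `db.src_table`, the column list, and the predicate string are hypothetical stand-ins, not taken from the patch. The old path read the full table and then ran two `count()` actions around a `Dataset.filter` just to compute an unselected-row delta; the new path appends the `where` clause to the Hive query itself, so filtering happens at scan time and no extra counting jobs run.

```java
// Sketch only, assuming a SparkSession with Hive support and a
// hypothetical table db.src_table; names are illustrative.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class WherePushdownSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("where-pushdown-sketch")
                .enableHiveSupport()
                .getOrCreate();

        String hiveDbTableName = "db.src_table";  // hypothetical table
        String where = "dt = '2023-01-01'";       // hypothetical fileGroup.where

        // Before the patch: read everything, then filter the DataFrame and
        // run two count() actions to track unselected rows. Each count() is
        // a separate Spark job over the data.
        Dataset<Row> df = spark.sql("select c1, c2 from " + hiveDbTableName);
        long originalSize = df.count();
        Dataset<Row> filtered = df.filter(where);
        long currentSize = filtered.count();
        long unselected = originalSize - currentSize;

        // After the patch: push the predicate into the query string, so Hive
        // partition pruning / predicate pushdown can skip data at the source
        // and no counting jobs are needed.
        StringBuilder sql = new StringBuilder("select c1, c2 from " + hiveDbTableName);
        if (where != null && !where.isEmpty()) {
            sql.append(" where ").append(where);
        }
        Dataset<Row> pushed = spark.sql(sql.toString());

        spark.stop();
    }
}
```

Incidentally, the removed block accumulated `currentSize - originalSize`, a non-positive value, so `unselectedRowAcc` could only have gone negative or stayed at zero; removing the accumulator along with the filter is consistent with that.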