From 8731588d28f302e51095f7ed1a4331edc5233958 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 15 Oct 2018 10:44:39 -0700
Subject: [PATCH 1/2] fix

---
 .../sql/execution/datasources/FileScanRDD.scala    | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index dd3c154259c73..72fc2b3e4461c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -85,7 +85,7 @@ class FileScanRDD(
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
-      private def updateBytesRead(): Unit = {
+      private def incTaskInputMetricsBytesRead(): Unit = {
         inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
       }
 
@@ -107,14 +107,15 @@ class FileScanRDD(
         val preNumRecordsRead = inputMetrics.recordsRead
         if (nextElement.isInstanceOf[ColumnarBatch]) {
           inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
+          incTaskInputMetricsBytesRead()
         } else {
+          // too costly to update every record
+          if (inputMetrics.recordsRead %
+              SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+            incTaskInputMetricsBytesRead()
+          }
           inputMetrics.incRecordsRead(1)
         }
-        // The records may be incremented by more than 1 at a time.
-        if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
-            inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
-          updateBytesRead()
-        }
         nextElement
       }
 
@@ -201,7 +202,7 @@ class FileScanRDD(
       }
 
       override def close(): Unit = {
-        updateBytesRead()
+        incTaskInputMetricsBytesRead()
         InputFileBlockHolder.unset()
       }
     }

From 7c3fd54feabd69320579626819e8083e025e6429 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 15 Oct 2018 11:01:27 -0700
Subject: [PATCH 2/2] reorder it

---
 .../apache/spark/sql/execution/datasources/FileScanRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 72fc2b3e4461c..ffea33c08ef94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -106,8 +106,8 @@ class FileScanRDD(
         // don't need to run this `if` for every record.
         val preNumRecordsRead = inputMetrics.recordsRead
         if (nextElement.isInstanceOf[ColumnarBatch]) {
-          inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
           incTaskInputMetricsBytesRead()
+          inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
         } else {
           // too costly to update every record
           if (inputMetrics.recordsRead %
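
Note on the pattern the two commits implement: refreshing the task's bytes-read counter goes through a Hadoop FileSystem statistics callback (getBytesReadCallback), which is too expensive to invoke per record, so the row-based path refreshes it only once every SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS records, plus once more in close() so the last partial interval is still counted. Below is a minimal, self-contained Scala sketch of that throttling scheme; InputMetrics, UpdateInterval (1000 is assumed for the interval), underlyingBytes, and bytesReadCallback are illustrative stand-ins for Spark's internals, not Spark API.

// Sketch: throttle an expensive byte-count refresh to once every
// UpdateInterval records, plus a final refresh when the iterator closes.
object ThrottledMetricsSketch {
  // Stand-in for SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS.
  val UpdateInterval = 1000

  // Stand-in for Spark's task InputMetrics.
  final class InputMetrics {
    var recordsRead: Long = 0L
    var bytesRead: Long = 0L
    def incRecordsRead(n: Long): Unit = recordsRead += n
    def setBytesRead(n: Long): Unit = bytesRead = n
  }

  def main(args: Array[String]): Unit = {
    val metrics = new InputMetrics

    // Stand-in for the Hadoop FileSystem statistics callback: reading it is
    // the comparatively expensive operation being throttled.
    var underlyingBytes = 0L
    def bytesReadCallback(): Long = underlyingBytes

    for (_ <- 1 to 5000) {
      underlyingBytes += 64 // pretend each record consumed 64 bytes
      // Check the counter *before* incrementing, as the patch does, so the
      // refresh fires on the 1st record and then every UpdateInterval records.
      if (metrics.recordsRead % UpdateInterval == 0) {
        metrics.setBytesRead(bytesReadCallback())
      }
      metrics.incRecordsRead(1)
    }

    // Mirror of close(): one last refresh so bytes read since the most recent
    // interval boundary are not dropped.
    metrics.setBytesRead(bytesReadCallback())
    println(s"records=${metrics.recordsRead} bytes=${metrics.bytesRead}")
  }
}

The columnar branch skips the modulo check entirely, presumably because a single ColumnarBatch can carry thousands of rows, making one refresh per batch already cheap relative to the rows it accounts for.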