Skip to content

Commit b81ad6b

Browse files
author
Marcelo Vanzin
committed
Revert "[SPARK-17549][SQL] Revert Only collect table size stat in driver for cached relation."
This reverts commit 9ac68db. Turns out the original fix was correct.
1 parent 2f84a68 commit b81ad6b

File tree

2 files changed

+20
-18
lines changed

2 files changed

+20
-18
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql.execution.columnar
1919

20-
import scala.collection.JavaConverters._
21-
2220
import org.apache.commons.lang3.StringUtils
2321

2422
import org.apache.spark.network.util.JavaUtils
@@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
3129
import org.apache.spark.sql.catalyst.plans.logical.Statistics
3230
import org.apache.spark.sql.execution.SparkPlan
3331
import org.apache.spark.storage.StorageLevel
34-
import org.apache.spark.util.CollectionAccumulator
32+
import org.apache.spark.util.LongAccumulator
3533

3634

3735
object InMemoryRelation {
@@ -63,8 +61,7 @@ case class InMemoryRelation(
6361
@transient child: SparkPlan,
6462
tableName: Option[String])(
6563
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
66-
val batchStats: CollectionAccumulator[InternalRow] =
67-
child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
64+
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
6865
extends logical.LeafNode with MultiInstanceRelation {
6966

7067
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -74,21 +71,12 @@ case class InMemoryRelation(
7471
@transient val partitionStatistics = new PartitionStatistics(output)
7572

7673
override lazy val statistics: Statistics = {
77-
if (batchStats.value.isEmpty) {
74+
if (batchStats.value == 0L) {
7875
// Underlying columnar RDD hasn't been materialized, no useful statistics information
7976
// available, return the default statistics.
8077
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
8178
} else {
82-
// Underlying columnar RDD has been materialized, required information has also been
83-
// collected via the `batchStats` accumulator.
84-
val sizeOfRow: Expression =
85-
BindReferences.bindReference(
86-
output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
87-
partitionStatistics.schema)
88-
89-
val sizeInBytes =
90-
batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
91-
Statistics(sizeInBytes = sizeInBytes)
79+
Statistics(sizeInBytes = batchStats.value.longValue)
9280
}
9381
}
9482

@@ -139,10 +127,10 @@ case class InMemoryRelation(
139127
rowCount += 1
140128
}
141129

130+
batchStats.add(totalSize)
131+
142132
val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
143133
.flatMap(_.values))
144-
145-
batchStats.add(stats)
146134
CachedBatch(rowCount, columnBuilders.map { builder =>
147135
JavaUtils.bufferToArray(builder.build())
148136
}, stats)

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,4 +232,18 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
232232
val columnTypes2 = List.fill(length2)(IntegerType)
233233
val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
234234
}
235+
236+
test("SPARK-17549: cached table size should be correctly calculated") {
237+
val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
238+
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
239+
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
240+
241+
// Materialize the data.
242+
val expectedAnswer = data.collect()
243+
checkAnswer(cached, expectedAnswer)
244+
245+
// Check that the right size was calculated.
246+
assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
247+
}
248+
235249
}

0 commit comments

Comments
 (0)