
Commit 2edf17e

onursatici authored and gatorsmile committed
[SPARK-24850][SQL] fix str representation of CachedRDDBuilder
## What changes were proposed in this pull request?

As of #21018, InMemoryRelation includes its cacheBuilder when logging query plans. This PR changes the string representation of the CachedRDDBuilder to not include the cached Spark plan.

## How was this patch tested?

spark-shell, query:
```
var df_cached = spark.read.format("csv").option("header", "true").load("test.csv").cache()
0 to 1 foreach { _ =>
  df_cached = df_cached.join(spark.read.format("csv").option("header", "true").load("test.csv"), "A").cache()
}
df_cached.explain
```
as of master results in:
```
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
+- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
```
with this patch results in:
```
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
+- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
+- *(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
```

Author: Onur Satici <osatici@palantir.com>

Closes #21805 from onursatici/os/inmemoryrelation-str.
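To see why the old representation balloons like this, here is a minimal, Spark-free sketch of the growth pattern. It mimics the pre-patch behavior where the builder's string repeated the full child plan that already appears as the relation's child in the tree; the object and method names are illustrative, not Spark's, and the doubling is an approximation of the effect, not a measurement of Spark itself.

```scala
// Each cache level embeds the full child plan string twice: once as the
// relation's child in the tree, and once inside the builder's own string.
object NestedCacheStringGrowth {
  def nested(depth: Int): String =
    if (depth == 0) "*(1) FileScan csv [A,B]"
    else {
      val child = nested(depth - 1)
      // old behavior: the builder's string repeats the child plan verbatim
      s"InMemoryRelation, CachedRDDBuilder(true,10000,$child,None)\n+- $child"
    }

  def main(args: Array[String]): Unit =
    (0 to 4).foreach { d =>
      println(s"depth $d: ${nested(d).length} chars") // roughly doubles per level
    }
}
```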
1 parent 08e315f commit 2edf17e

File tree

2 files changed: +15 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 4 additions & 1 deletion
```
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}


 /**
@@ -207,4 +207,7 @@ case class InMemoryRelation(
   }

   override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)
+
+  override def simpleString: String =
+    s"InMemoryRelation [${Utils.truncatedString(output, ", ")}], ${cacheBuilder.storageLevel}"
 }
```
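The diff relies on a general Scala behavior worth spelling out: a case class's default `toString` renders every constructor argument, so a field holding an entire plan tree drags that tree into any log line that prints the value, and overriding the string representation keeps only the scalar settings. The following self-contained sketch illustrates the before/after pattern; `PlanLike`, `NoisyBuilder`, and `QuietBuilder` are illustrative names, not Spark's.

```scala
object BuilderToStringDemo {
  // Stands in for a SparkPlan: its toString is a multi-line tree.
  final case class PlanLike(render: String) {
    override def toString: String = render
  }

  // Old behavior: the default case-class toString includes cachedPlan verbatim.
  final case class NoisyBuilder(useCompression: Boolean, batchSize: Int, cachedPlan: PlanLike)

  // Fixed behavior: report only the scalar settings.
  final case class QuietBuilder(useCompression: Boolean, batchSize: Int, cachedPlan: PlanLike) {
    override def toString: String = s"QuietBuilder($useCompression, $batchSize)"
  }

  def main(args: Array[String]): Unit = {
    val plan = PlanLike("*(1) FileScan csv [A#10,B#11]\n+- SomeChild")
    println(NoisyBuilder(true, 10000, plan)) // the multi-line plan leaks into the output
    println(QuietBuilder(true, 10000, plan)) // QuietBuilder(true, 10000)
  }
}
```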

sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

Lines changed: 11 additions & 0 deletions
```
@@ -206,4 +206,15 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-24850 InMemoryRelation string representation does not include cached plan") {
+    val df = Seq(1).toDF("a").cache()
+    val outputStream = new java.io.ByteArrayOutputStream()
+    Console.withOut(outputStream) {
+      df.explain(false)
+    }
+    assert(outputStream.toString.replaceAll("#\\d+", "#x").contains(
+      "InMemoryRelation [a#x], StorageLevel(disk, memory, deserialized, 1 replicas)"
+    ))
+  }
 }
```
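A note on the capture technique the new test uses: `Dataset.explain` prints to stdout rather than returning a `String`, so the test redirects `Console.out` into a buffer and asserts on the captured text, normalizing run-specific expression IDs such as `a#10` with `replaceAll("#\\d+", "#x")`. The sketch below shows the same pattern in plain Scala with no Spark dependency; the printed line is a stand-in for real explain output.

```scala
object CaptureStdoutDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new java.io.ByteArrayOutputStream()
    // Console.withOut redirects println/Console.out for the duration of the block.
    Console.withOut(buffer) {
      println("InMemoryRelation [a#42], StorageLevel(disk, memory, deserialized, 1 replicas)")
    }
    // Normalize run-specific expression IDs before asserting.
    val normalized = buffer.toString.replaceAll("#\\d+", "#x")
    assert(normalized.contains("InMemoryRelation [a#x]"))
  }
}
```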
