diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index f06544ea8ed0..eebe6ad2e794 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -40,6 +40,9 @@ case class ShuffleExchange(
     child: SparkPlan,
     @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
 
+  // NOTE: coordinator can be null after serialization/deserialization,
+  //       e.g. it can be null on the Executor side
+
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
 
@@ -47,7 +50,7 @@ case class ShuffleExchange(
     val extraInfo = coordinator match {
       case Some(exchangeCoordinator) =>
         s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
-      case None => ""
+      case _ => ""
     }
 
     val simpleNodeName = "Exchange"
@@ -70,7 +73,7 @@ case class ShuffleExchange(
     // the plan.
     coordinator match {
       case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
-      case None =>
+      case _ =>
     }
   }
 
@@ -117,7 +120,7 @@ case class ShuffleExchange(
         val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
         assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
         shuffleRDD
-      case None =>
+      case _ =>
        val shuffleDependency = prepareShuffleDependency()
        preparePostShuffleRDD(shuffleDependency)
      }
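
For context, the `case None` to `case _` changes are not cosmetic. Because `coordinator` is `@transient`, Java deserialization restores the field as `null` on the executor side (as the NOTE added above says), and `null` matches neither `Some(_)` nor `None`, so the old patterns would throw `scala.MatchError`. The wildcard also covers the `null` case. Below is a minimal, self-contained sketch of the failure mode; `NullOptionMatch` and `describe` are illustrative names, not part of this patch:

```scala
// Minimal sketch of why `case None` is unsafe when the scrutinee can be null.
// `NullOptionMatch` and `describe` are hypothetical names, not Spark code.
object NullOptionMatch {

  def describe(coordinator: Option[AnyRef]): String = coordinator match {
    case Some(c) => s"(coordinator id: ${System.identityHashCode(c)})"
    // With `case None => ""` here instead, passing null would throw
    // scala.MatchError, because null equals neither Some(_) nor None.
    case _ => ""
  }

  def main(args: Array[String]): Unit = {
    println(describe(Some(new Object))) // prints "(coordinator id: ...)"
    println(describe(None))             // prints "" via the wildcard
    println(describe(null))             // also "" -- no MatchError thrown
  }
}
```

An explicit null check such as `if (coordinator != null)` would also work, but the wildcard keeps each match expression intact with a one-token change.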