From bf6b6ade86f2be826b8620a85abbab2c01c62b79 Mon Sep 17 00:00:00 2001 From: jiake Date: Mon, 29 Jul 2019 11:44:05 +0800 Subject: [PATCH 01/11] change the shuffle read to local shuffle read when the sort merge join conveted to broadcast join in runtime --- .../org/apache/spark/MapOutputTracker.scala | 140 +++++++++++++++++- .../shuffle/BlockStoreShuffleReader.scala | 21 ++- .../apache/spark/shuffle/ShuffleManager.scala | 14 ++ .../shuffle/sort/SortShuffleManager.scala | 23 +++ .../apache/spark/sql/internal/SQLConf.scala | 10 ++ .../adaptive/AdaptiveSparkPlanExec.scala | 1 + .../adaptive/LocalShuffledRowRDD.scala | 118 +++++++++++++++ .../OptimizedLocalShuffleReader.scala | 44 ++++++ .../adaptive/ReduceNumShufflePartitions.scala | 27 ++-- .../exchange/ShuffleExchangeExec.scala | 5 + 10 files changed, 390 insertions(+), 13 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 3b0062efeff01..ce6ec21ad5187 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.JavaConverters._ -import scala.collection.mutable.{HashMap, ListBuffer, Map} +import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map} import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.reflect.ClassTag @@ -33,7 +33,7 @@ import org.apache.spark.broadcast.{Broadcast, BroadcastManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} -import org.apache.spark.scheduler.MapStatus +import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, MapStatus} import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId} import org.apache.spark.util._ @@ -337,6 +337,22 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging endPartition: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] + /** + * Called from executors to get the server URIs and output sizes for each shuffle block that + * needs to be read from a given range of map output partitions (startPartition is included but + * endPartition is excluded from the range) and a given start map Id and end map Id. + * + * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, + * and the second item is a sequence of (shuffle block id, shuffle block size, map index) + * tuples describing the shuffle blocks that are stored at that block manager. + */ + def getMapSizesByExecutorId( + shuffleId: Int, + startPartition: Int, + endPartition: Int, + startMapId: Int, + endMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] + /** * Deletes map output status information for the specified shuffle stage. */ @@ -668,6 +684,35 @@ private[spark] class MapOutputTrackerMaster( None } + /** + * Return the locations where the Mapper(s) ran. The locations each includes both a host and an + * executor id on that host. 
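+   * This is used by `LocalShuffledRowRDD.getPreferredLocations` in this patch, so that the
+   * local shuffle reader can schedule each partition on the executor that produced its map
+   * output and avoid fetching blocks over the network.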
+ * + * @param dep shuffle dependency object + * @param startMapId the start map id + * @param endMapId the end map id + * @return a sequence of locations that each includes both a host and an executor id on that + * host. + */ + def getMapLocation(dep: ShuffleDependency[_, _, _], startMapId: Int, endMapId: Int): Seq[String] = + { + val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull + if (shuffleStatus != null) { + shuffleStatus.withMapStatuses { statuses => + if (startMapId >= 0 && endMapId <= statuses.length) { + val statusesPicked = statuses.slice(startMapId, endMapId).filter(_ != null) + statusesPicked.map { status => + ExecutorCacheTaskLocation(status.location.host, status.location.executorId).toString + } + } else { + Nil + } + } + } else { + Nil + } + } + def incrementEpoch(): Unit = { epochLock.synchronized { epoch += 1 @@ -701,6 +746,31 @@ private[spark] class MapOutputTrackerMaster( } } + override def getMapSizesByExecutorId( + shuffleId: Int, + startPartition: Int, + endPartition: Int, + startMapId: Int, + endMapId: Int) + : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + logDebug(s"Fetching outputs for shuffle $shuffleId, startMapId $startMapId endMapId $endMapId" + + s"partitions $startPartition-$endPartition") + shuffleStatuses.get(shuffleId) match { + case Some (shuffleStatus) => + shuffleStatus.withMapStatuses { statuses => + MapOutputTracker.convertMapStatuses( + shuffleId, + startPartition, + endPartition, + statuses, + startMapId, + endMapId) + } + case None => + Iterator.empty + } + } + override def stop(): Unit = { mapOutputRequests.offer(PoisonPill) threadpool.shutdown() @@ -746,6 +816,26 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } } + override def getMapSizesByExecutorId( + shuffleId: Int, + startPartition: Int, + endPartition: Int, + startMapId: Int, + endMapId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + logDebug(s"Fetching outputs for shuffle $shuffleId, startMapId $startMapId endMapId $endMapId" + + s"partitions $startPartition-$endPartition") + val statuses = getStatuses(shuffleId) + try { + MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses, + startMapId, endMapId) + } catch { + case e: MetadataFetchFailedException => + // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: + mapStatuses.clear() + throw e + } + } + /** * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize * on this array when reading it, because on the driver, we may be changing it in place. @@ -908,4 +998,50 @@ private[spark] object MapOutputTracker extends Logging { } splitsByAddress.iterator } + + /** + * Given an array of map statuses, the start map Id, end map Id and a range of map output + * partitions, returns a sequence that, lists the shuffle block IDs and corresponding shuffle + * block sizes stored at that block manager. + * + * If the status of the map is null (indicating a missing location due to a failed mapper), + * throws a FetchFailedException. + * + * @param shuffleId Identifier for the shuffle + * @param startPartition Start of map output partition ID range (included in range) + * @param endPartition End of map output partition ID range (excluded from range) + * @param statuses List of map statuses, indexed by map ID. 
+ * @param startMapId Start of map Id range (included in range) + * @param endMapId End of map Id (excluded from range) + * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, + * and the second item is a sequence of (shuffle block ID, shuffle block size, map index) + * tuples describing the shuffle blocks that are stored at that block manager. + */ + def convertMapStatuses( + shuffleId: Int, + startPartition: Int, + endPartition: Int, + statuses: Array[MapStatus], + startMapId: Int, + endMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + assert (statuses != null && statuses.length >= endMapId && startMapId >= 0) + val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long, Int)]] + for (mapIndex <- startMapId until endMapId) { + val status = statuses(mapIndex) + if (status == null) { + val errorMessage = s"Missing an output location for shuffle $shuffleId" + logError(errorMessage) + throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) + } else { + for (part <- startPartition until endPartition) { + val size = status.getSizeForBlock(part) + if (size != 0) { + splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += + ((ShuffleBlockId(shuffleId, status.mapId, part), size, mapIndex)) + } + } + } + } + splitsByAddress.toIterator + } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 4329824b1b627..ccf911f6ec1a9 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -36,18 +36,35 @@ private[spark] class BlockStoreShuffleReader[K, C]( readMetrics: ShuffleReadMetricsReporter, serializerManager: SerializerManager = SparkEnv.get.serializerManager, blockManager: BlockManager = SparkEnv.get.blockManager, - mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker) + mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker, + startMapId: Option[Int] = None, + endMapId: Option[Int] = None) extends ShuffleReader[K, C] with Logging { private val dep = handle.dependency /** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { + val blocksByAddress = (startMapId, endMapId) match { + case (Some(startId), Some(endId)) => mapOutputTracker.getMapSizesByExecutorId( + handle.shuffleId, + startPartition, + endPartition, + startId, + endId) + case (None, None) => mapOutputTracker.getMapSizesByExecutorId( + handle.shuffleId, + startPartition, + endPartition) + case (_, _) => throw new IllegalArgumentException( + "startMapId and endMapId should be both set or unset") + } + val wrappedStreams = new ShuffleBlockFetcherIterator( context, blockManager.blockStoreClient, blockManager, - mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), + blocksByAddress, serializerManager.wrapStream, // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024, diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index a717ef242ea7c..2f460971ea924 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ 
-54,6 +54,20 @@ private[spark] trait ShuffleManager { context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to + * read from map output (startMapId to endMapId - 1, inclusive). + * Called on executors by reduce tasks. + */ + def getReader[K, C]( + handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext, + metrics: ShuffleReadMetricsReporter, + startMapId: Int, + endMapId: Int): ShuffleReader[K, C] + /** * Remove a shuffle's metadata from the ShuffleManager. * @return true if the metadata removed successfully, otherwise false. diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index d96bcb3d073df..dd3be5873867e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -127,6 +127,29 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager startPartition, endPartition, context, metrics) } + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to + * read from map output (startMapId to endMapId - 1, inclusive). + * Called on executors by reduce tasks. + */ + override def getReader[K, C]( + handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext, + metrics: ShuffleReadMetricsReporter, + startMapId: Int, + endMapId: Int): ShuffleReader[K, C] = { + new BlockStoreShuffleReader( + handle.asInstanceOf[BaseShuffleHandle[K, _, C]], + startPartition, + endPartition, + context, + metrics, + startMapId = Some(startMapId), + endMapId = Some(endMapId)) + } + /** Get a writer for a given partition. Called on executors by map tasks. 
*/ override def getWriter[K, V]( handle: ShuffleHandle, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index bcb3153a3ca48..4da7003d77b1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -394,6 +394,14 @@ object SQLConf { "must be a positive integer.") .createOptional + val OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED = + buildConf("spark.sql.adaptive.optimizedLocalShuffleReader.enabled") + .doc("When true and adaptive execution is enabled, this enables the optimization of" + + " converting the shuffle reader to local shuffle reader for the shuffle exchange" + + " of the broadcast hash join in probe side.") + .booleanConf + .createWithDefault(true) + val SUBEXPRESSION_ELIMINATION_ENABLED = buildConf("spark.sql.subexpressionElimination.enabled") .internal() @@ -2146,6 +2154,8 @@ class SQLConf extends Serializable with Logging { def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions) + def optimizedLocalShuffleReaderEnabled: Boolean = getConf(OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED) + def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN) def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 524cacc11484c..5bebde9a35009 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -92,6 +92,7 @@ case class AdaptiveSparkPlanExec( @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, subqueryCache), ReduceNumShufflePartitions(conf), + OptimizedLocalShuffleReader(conf), ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf, session.sessionState.columnarRules), CollapseCodegenStages(conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala new file mode 100644 index 0000000000000..d2f629c48e340 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark._ +import org.apache.spark.rdd.{RDD, ShuffledRDDPartition} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} + +/** + * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used + * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime + * because the map output of one input table is small enough for broadcast. This RDD represents the + * data of another input table of the join that reads from shuffle. Each partition of the RDD reads + * the whole data from just one mapper output locally. So actually there is no data transferred + * from the network. + + * This RDD takes a [[ShuffleDependency]] (`dependency`). + * + * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle + * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs. + * Partition ids should be in the range [0, numPartitions - 1]. + * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number + * of partitions of the map output). The post-shuffle partition number is the same to the parent + * RDD's partition number. + */ +class LocalShuffledRowRDD( + var dependency: ShuffleDependency[Int, InternalRow, InternalRow], + metrics: Map[String, SQLMetric], + specifiedPartitionStartIndices: Option[Array[Int]] = None, + specifiedPartitionEndIndices: Option[Array[Int]] = None) + extends RDD[InternalRow](dependency.rdd.context, Nil) { + + private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions + private[this] val numPostShufflePartitions = dependency.rdd.partitions.length + + private[this] val partitionStartIndices: Array[Int] = specifiedPartitionStartIndices match { + case Some(indices) => indices + case None => Array(0) + } + + private[this] val partitionEndIndices: Array[Int] = specifiedPartitionEndIndices match { + case Some(indices) => indices + case None if specifiedPartitionStartIndices.isEmpty => Array(numPreShufflePartitions) + case _ => specifiedPartitionStartIndices.get.drop(1) :+ numPreShufflePartitions + } + + override def getDependencies: Seq[Dependency[_]] = List(dependency) + + override def getPartitions: Array[Partition] = { + assert(partitionStartIndices.length == partitionEndIndices.length) + Array.tabulate[Partition](numPostShufflePartitions) { i => + new ShuffledRDDPartition(i) + } + } + + override def getPreferredLocations(partition: Partition): Seq[String] = { + val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]] + tracker.getMapLocation(dep, partition.index, partition.index + 1) + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val shuffledRowPartition = split.asInstanceOf[ShuffledRDDPartition] + val mapId = shuffledRowPartition.index + val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() + // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator, + // as well as the `tempMetrics` for basic shuffle metrics. 
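+    // Every reader created below is restricted to this partition's single mapper
+    // (mapId to mapId + 1), one reader per (start, end) reduce-partition range, so the
+    // shuffle blocks are read only from that mapper's output.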
+ val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics) + // Connect the the InternalRows read by each ShuffleReader + new Iterator[InternalRow] { + val readers = partitionStartIndices.zip(partitionEndIndices).map { case (start, end) => + SparkEnv.get.shuffleManager.getReader( + dependency.shuffleHandle, + start, + end, + context, + sqlMetricsReporter, + mapId, + mapId + 1) + } + + var i = 0 + var iter = readers(i).read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2) + + override def hasNext = { + while (iter.hasNext == false && i + 1 <= readers.length - 1) { + i += 1 + iter = readers(i).read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2) + } + iter.hasNext + } + + override def next() = iter.next() + } + } + + override def clearDependencies() { + super.clearDependencies() + dependency = null + } +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala new file mode 100644 index 0000000000000..9101f3071282f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.adaptive.rule.CoalescedShuffleReaderExec +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = { + if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan + } + + plan.transformUp { + case bhj: BroadcastHashJoinExec => + bhj.buildSide match { + case BuildLeft if (bhj.right.isInstanceOf[CoalescedShuffleReaderExec]) => + bhj.right.asInstanceOf[CoalescedShuffleReaderExec].isLocal = true + case BuildRight if (bhj.left.isInstanceOf[CoalescedShuffleReaderExec]) => + bhj.left.asInstanceOf[CoalescedShuffleReaderExec].isLocal = true + case _ => None + } + bhj + } + } +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala index 1a85d5c02075b..bc927d8625901 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.adaptive.rule import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration.Duration - import org.apache.spark.MapOutputStatistics import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -27,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec} +import org.apache.spark.sql.execution.adaptive.{LocalShuffledRowRDD, QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.ThreadUtils @@ -180,7 +179,8 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { case class CoalescedShuffleReaderExec( child: QueryStageExec, - partitionStartIndices: Array[Int]) extends UnaryExecNode { + partitionStartIndices: Array[Int], + var isLocal: Boolean = false) extends UnaryExecNode { override def output: Seq[Attribute] = child.output @@ -190,15 +190,24 @@ case class CoalescedShuffleReaderExec( UnknownPartitioning(partitionStartIndices.length) } - private var cachedShuffleRDD: ShuffledRowRDD = null + private var cachedShuffleRDD: RDD[InternalRow] = null override protected def doExecute(): RDD[InternalRow] = { if (cachedShuffleRDD == null) { - cachedShuffleRDD = child match { - case stage: ShuffleQueryStageExec => - stage.plan.createShuffledRDD(Some(partitionStartIndices)) - case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => - stage.plan.createShuffledRDD(Some(partitionStartIndices)) + cachedShuffleRDD = if (isLocal) { + child match { + case stage: ShuffleQueryStageExec => + stage.plan.createLocalShuffleRDD(Some(partitionStartIndices)) + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + 
stage.plan.createLocalShuffleRDD(Some(partitionStartIndices)) + } + } else { + child match { + case stage: ShuffleQueryStageExec => + stage.plan.createShuffledRDD(Some(partitionStartIndices)) + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + stage.plan.createShuffledRDD(Some(partitionStartIndices)) + } } } cachedShuffleRDD diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 2f4c5734469f8..d54d04ecd6989 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.adaptive.LocalShuffledRowRDD import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -82,6 +83,10 @@ case class ShuffleExchangeExec( new ShuffledRowRDD(shuffleDependency, readMetrics, partitionStartIndices) } + def createLocalShuffleRDD(partitionStartIndices: Option[Array[Int]]): LocalShuffledRowRDD = { + new LocalShuffledRowRDD(shuffleDependency, readMetrics, partitionStartIndices) + } + /** * Caches the created ShuffleRowRDD so we can reuse that. */ From 3b3d6ce79346de7263292101fdccd6a4ffd11553 Mon Sep 17 00:00:00 2001 From: jiake Date: Fri, 2 Aug 2019 14:24:10 +0800 Subject: [PATCH 02/11] fix the can't zip rdd with unequal numbers of partitions and resolve the comments --- .../OptimizedLocalShuffleReader.scala | 29 ++++++++++++++++++- .../adaptive/ReduceNumShufflePartitions.scala | 17 +++++++++-- .../adaptive/AdaptiveQueryExecSuite.scala | 16 ++++++++++ 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala index 9101f3071282f..86375b0ddd02b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.rule.CoalescedShuffleReaderExec +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} import org.apache.spark.sql.internal.SQLConf @@ -40,5 +41,31 @@ case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { } bhj } + + val afterEnsureRequirements = EnsureRequirements(conf).apply(plan) + val numExchanges = afterEnsureRequirements.collect { + case e: ShuffleExchangeExec => e + }.length + if (numExchanges > 0) { + logWarning("Local shuffle reader optimization is not applied due" + + " to additional shuffles will be introduced.") + 
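+      // Clear the `isLocal` flag on the coalesced readers so the regular shuffle reader is
+      // used instead and no extra shuffle exchange is introduced.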
revertLocalShuffleReader(plan) + } else { + plan + } + } + private def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = { + plan.transformUp { + case bhj: BroadcastHashJoinExec => + bhj.buildSide match { + case BuildLeft if (bhj.right.isInstanceOf[CoalescedShuffleReaderExec]) => + bhj.right.asInstanceOf[CoalescedShuffleReaderExec].isLocal = false + case BuildRight if (bhj.left.isInstanceOf[CoalescedShuffleReaderExec]) => + bhj.left.asInstanceOf[CoalescedShuffleReaderExec].isLocal = false + case _ => None + } + bhj + } + plan } -} \ No newline at end of file +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala index bc927d8625901..c3899d3985bb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala @@ -19,14 +19,15 @@ package org.apache.spark.sql.execution.adaptive.rule import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration.Duration + import org.apache.spark.MapOutputStatistics import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.execution.adaptive.{LocalShuffledRowRDD, QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec} +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.ThreadUtils @@ -187,7 +188,17 @@ case class CoalescedShuffleReaderExec( override def doCanonicalize(): SparkPlan = child.canonicalized override def outputPartitioning: Partitioning = { - UnknownPartitioning(partitionStartIndices.length) + if (isLocal) { + val numPartitions = child match { + case stage: ShuffleQueryStageExec => + stage.plan.shuffleDependency.rdd.partitions.length + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + stage.plan.shuffleDependency.rdd.partitions.length + } + UnknownPartitioning(numPartitions) + } else { + UnknownPartitioning(partitionStartIndices.length) + } } private var cachedShuffleRDD: RDD[InternalRow] = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 37b106c3ea530..5a86915222ab4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -355,6 +355,22 @@ class AdaptiveQueryExecSuite } } + test("Change merge join to broadcast join and optimize the shuffle" + + " reader to local shuffle reader") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30") { + runAdaptiveAndVerifyResult( + """ + |SELECT * FROM testData t1 join testData2 t2 + |ON t1.key = t2.a join testData3 t3 on 
t2.a = t3.a + |where t1.value = 1 + """.stripMargin + ) + } + } + test("Avoid changing merge join to broadcast join if too many empty partitions on build plan") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", From bdce1c3c69aaf553438bdd2288ffdcf85499f971 Mon Sep 17 00:00:00 2001 From: jiake Date: Wed, 18 Sep 2019 15:03:18 +0800 Subject: [PATCH 03/11] move the rule of converting the shuffle reader to local shuffle reader before ReduceNumShufflePartitions --- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../adaptive/AdaptiveSparkPlanExec.scala | 2 +- .../OptimizedLocalShuffleReader.scala | 107 +++++++++++++----- .../execution/adaptive/QueryStageExec.scala | 3 +- .../adaptive/ReduceNumShufflePartitions.scala | 38 ++----- .../exchange/ShuffleExchangeExec.scala | 4 +- .../adaptive/AdaptiveQueryExecSuite.scala | 1 + 7 files changed, 97 insertions(+), 60 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4da7003d77b1f..9db16f75e8fe0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -400,7 +400,7 @@ object SQLConf { " converting the shuffle reader to local shuffle reader for the shuffle exchange" + " of the broadcast hash join in probe side.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val SUBEXPRESSION_ELIMINATION_ENABLED = buildConf("spark.sql.subexpressionElimination.enabled") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 5bebde9a35009..2dce6e96ded5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -91,8 +91,8 @@ case class AdaptiveSparkPlanExec( // optimizations should be stage-independent. 
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, subqueryCache), - ReduceNumShufflePartitions(conf), OptimizedLocalShuffleReader(conf), + ReduceNumShufflePartitions(conf), ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf, session.sessionState.columnarRules), CollapseCodegenStages(conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala index 86375b0ddd02b..ed8a44362e17b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala @@ -17,55 +17,108 @@ package org.apache.spark.sql.execution.adaptive +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.adaptive.rule.CoalescedShuffleReaderExec +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec} import org.apache.spark.sql.internal.SQLConf case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = { + shuffleStage match { + case stage: ShuffleQueryStageExec => + stage.isLocalShuffle = false + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + stage.isLocalShuffle = false + } + shuffleStage + } + + private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = { + val revertPlan = newPlan.transformUp { + case localReader: LocalShuffleReaderExec + if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) => + setIsLocalToFalse(localReader.child) + } + revertPlan + } + override def apply(plan: SparkPlan): SparkPlan = { - if (!conf.optimizedLocalShuffleReaderEnabled) { + // Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return. 
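+    // Overall flow: mark the shuffle query stages feeding a broadcast hash join as local,
+    // wrap the marked stages in `LocalShuffleReaderExec`, and revert the change if
+    // `EnsureRequirements` would introduce additional shuffle exchanges.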
+ val bhjs = plan.collect { + case bhj: BroadcastHashJoinExec => bhj + } + + if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) { return plan } - plan.transformUp { + // If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true + bhjs.map { case bhj: BroadcastHashJoinExec => - bhj.buildSide match { - case BuildLeft if (bhj.right.isInstanceOf[CoalescedShuffleReaderExec]) => - bhj.right.asInstanceOf[CoalescedShuffleReaderExec].isLocal = true - case BuildRight if (bhj.left.isInstanceOf[CoalescedShuffleReaderExec]) => - bhj.left.asInstanceOf[CoalescedShuffleReaderExec].isLocal = true - case _ => None + bhj.children map { + case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + stage.isLocalShuffle = true + case plan: SparkPlan => plan } - bhj } - val afterEnsureRequirements = EnsureRequirements(conf).apply(plan) + // Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true + val newPlan = plan.transformUp { + case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) => + LocalShuffleReaderExec(stage) + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) => + LocalShuffleReaderExec(stage) + } + + val afterEnsureRequirements = EnsureRequirements(conf).apply(newPlan) val numExchanges = afterEnsureRequirements.collect { case e: ShuffleExchangeExec => e }.length if (numExchanges > 0) { logWarning("Local shuffle reader optimization is not applied due" + " to additional shuffles will be introduced.") - revertLocalShuffleReader(plan) + revertLocalShuffleReader(newPlan) } else { - plan + newPlan } } - private def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = { - plan.transformUp { - case bhj: BroadcastHashJoinExec => - bhj.buildSide match { - case BuildLeft if (bhj.right.isInstanceOf[CoalescedShuffleReaderExec]) => - bhj.right.asInstanceOf[CoalescedShuffleReaderExec].isLocal = false - case BuildRight if (bhj.left.isInstanceOf[CoalescedShuffleReaderExec]) => - bhj.left.asInstanceOf[CoalescedShuffleReaderExec].isLocal = false - case _ => None - } - bhj +} + +case class LocalShuffleReaderExec( + child: QueryStageExec) extends UnaryExecNode { + + override def output: Seq[Attribute] = child.output + + override def doCanonicalize(): SparkPlan = child.canonicalized + + override def outputPartitioning: Partitioning = { + val numPartitions = child match { + case stage: ShuffleQueryStageExec => + stage.plan.shuffleDependency.rdd.partitions.length + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + stage.plan.shuffleDependency.rdd.partitions.length + } + UnknownPartitioning(numPartitions) + } + + private var cachedShuffleRDD: RDD[InternalRow] = null + + override protected def doExecute(): RDD[InternalRow] = { + if (cachedShuffleRDD == null) { + cachedShuffleRDD = child match { + case stage: ShuffleQueryStageExec => + stage.plan.createLocalShuffleRDD() + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + stage.plan.createLocalShuffleRDD() + } } - plan + cachedShuffleRDD } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index 231fffce3360b..2e295bd6401b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -129,7 
+129,8 @@ abstract class QueryStageExec extends LeafExecNode { */ case class ShuffleQueryStageExec( override val id: Int, - override val plan: ShuffleExchangeExec) extends QueryStageExec { + override val plan: ShuffleExchangeExec, + var isLocalShuffle: Boolean = false) extends QueryStageExec { @transient lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = { if (plan.inputRDD.getNumPartitions == 0) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala index c3899d3985bb7..f0e9128df8ae6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala @@ -70,7 +70,9 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { } // ShuffleExchanges introduced by repartition do not support changing the number of partitions. // We change the number of partitions in the stage only if all the ShuffleExchanges support it. - if (!shuffleStages.forall(_.plan.canChangeNumPartitions)) { + if (!shuffleStages.forall(_.plan.canChangeNumPartitions) + || !shuffleStages.forall(_.isLocalShuffle == false)) { + // If the `shuffleStages` contains the local shuffle reader, we do not reduce the partitions. plan } else { val shuffleMetrics = shuffleStages.map { stage => @@ -180,45 +182,25 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { case class CoalescedShuffleReaderExec( child: QueryStageExec, - partitionStartIndices: Array[Int], - var isLocal: Boolean = false) extends UnaryExecNode { + partitionStartIndices: Array[Int]) extends UnaryExecNode { override def output: Seq[Attribute] = child.output override def doCanonicalize(): SparkPlan = child.canonicalized override def outputPartitioning: Partitioning = { - if (isLocal) { - val numPartitions = child match { - case stage: ShuffleQueryStageExec => - stage.plan.shuffleDependency.rdd.partitions.length - case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => - stage.plan.shuffleDependency.rdd.partitions.length - } - UnknownPartitioning(numPartitions) - } else { - UnknownPartitioning(partitionStartIndices.length) - } + UnknownPartitioning(partitionStartIndices.length) } private var cachedShuffleRDD: RDD[InternalRow] = null override protected def doExecute(): RDD[InternalRow] = { if (cachedShuffleRDD == null) { - cachedShuffleRDD = if (isLocal) { - child match { - case stage: ShuffleQueryStageExec => - stage.plan.createLocalShuffleRDD(Some(partitionStartIndices)) - case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => - stage.plan.createLocalShuffleRDD(Some(partitionStartIndices)) - } - } else { - child match { - case stage: ShuffleQueryStageExec => - stage.plan.createShuffledRDD(Some(partitionStartIndices)) - case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => - stage.plan.createShuffledRDD(Some(partitionStartIndices)) - } + cachedShuffleRDD = child match { + case stage: ShuffleQueryStageExec => + stage.plan.createShuffledRDD(Some(partitionStartIndices)) + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + stage.plan.createShuffledRDD(Some(partitionStartIndices)) } } cachedShuffleRDD diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index d54d04ecd6989..2f94c522712b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -83,8 +83,8 @@ case class ShuffleExchangeExec( new ShuffledRowRDD(shuffleDependency, readMetrics, partitionStartIndices) } - def createLocalShuffleRDD(partitionStartIndices: Option[Array[Int]]): LocalShuffledRowRDD = { - new LocalShuffledRowRDD(shuffleDependency, readMetrics, partitionStartIndices) + def createLocalShuffleRDD(): LocalShuffledRowRDD = { + new LocalShuffledRowRDD(shuffleDependency, readMetrics) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 5a86915222ab4..8c64706ee9628 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -80,6 +80,7 @@ class AdaptiveQueryExecSuite test("Change merge join to broadcast join") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM testData join testData2 ON key = a where value = '1'") From e5f3c9a6e6681f9151cce0d5c326562eaaaee18a Mon Sep 17 00:00:00 2001 From: jiake Date: Tue, 20 Sep 2005 22:11:07 +0800 Subject: [PATCH 04/11] resolve comments --- .../org/apache/spark/MapOutputTracker.scala | 63 ++++----- .../shuffle/BlockStoreShuffleReader.scala | 16 +-- .../apache/spark/shuffle/ShuffleManager.scala | 7 +- .../shuffle/sort/SortShuffleManager.scala | 10 +- .../spark/sql/execution/SparkPlanInfo.scala | 3 +- .../adaptive/AdaptiveSparkPlanExec.scala | 4 +- .../adaptive/LocalShuffledRowRDD.scala | 71 ++++------ .../adaptive/OptimizeLocalShuffleReader.scala | 106 +++++++++++++++ .../OptimizedLocalShuffleReader.scala | 124 ------------------ .../execution/adaptive/QueryStageExec.scala | 3 +- .../adaptive/ReduceNumShufflePartitions.scala | 4 +- .../adaptive/AdaptiveQueryExecSuite.scala | 1 - 12 files changed, 180 insertions(+), 232 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ce6ec21ad5187..6e2ca7a3376f6 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -340,7 +340,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging /** * Called from executors to get the server URIs and output sizes for each shuffle block that * needs to be read from a given range of map output partitions (startPartition is included but - * endPartition is excluded from the range) and a given start map Id and end map Id. + * endPartition is excluded from the range) and a given mapId. 
* * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size, map index) @@ -350,8 +350,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging shuffleId: Int, startPartition: Int, endPartition: Int, - startMapId: Int, - endMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] + mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] /** * Deletes map output status information for the specified shuffle stage. @@ -691,8 +690,7 @@ private[spark] class MapOutputTrackerMaster( * @param dep shuffle dependency object * @param startMapId the start map id * @param endMapId the end map id - * @return a sequence of locations that each includes both a host and an executor id on that - * host. + * @return a sequence of locations where task runs. */ def getMapLocation(dep: ShuffleDependency[_, _, _], startMapId: Int, endMapId: Int): Seq[String] = { @@ -750,10 +748,10 @@ private[spark] class MapOutputTrackerMaster( shuffleId: Int, startPartition: Int, endPartition: Int, - startMapId: Int, - endMapId: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, startMapId $startMapId endMapId $endMapId" + + mapId: Int) + : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { + logDebug(s"Fetching outputs for shuffle $shuffleId, mapId $mapId" + + s"partitions $startPartition-$endPartition") shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => @@ -763,8 +761,7 @@ private[spark] class MapOutputTrackerMaster( startPartition, endPartition, statuses, - startMapId, - endMapId) + mapId) } case None => Iterator.empty @@ -820,14 +817,12 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr shuffleId: Int, startPartition: Int, endPartition: Int, - startMapId: Int, - endMapId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, startMapId $startMapId endMapId $endMapId" + + mapId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { + logDebug(s"Fetching outputs for shuffle $shuffleId, mapId $mapId" + s"partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId) try { - MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses, - startMapId, endMapId) + MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses, mapId) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: @@ -1000,7 +995,7 @@ private[spark] object MapOutputTracker extends Logging { } /** - * Given an array of map statuses, the start map Id, end map Id and a range of map output + * Given an array of map statuses, the mapId and a range of map output * partitions, returns a sequence that, lists the shuffle block IDs and corresponding shuffle * block sizes stored at that block manager. * @@ -1011,8 +1006,7 @@ private[spark] object MapOutputTracker extends Logging { * @param startPartition Start of map output partition ID range (included in range) * @param endPartition End of map output partition ID range (excluded from range) * @param statuses List of map statuses, indexed by map ID. 
- * @param startMapId Start of map Id range (included in range) - * @param endMapId End of map Id (excluded from range) + * @param mapId Identifier for the map * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block ID, shuffle block size, map index) * tuples describing the shuffle blocks that are stored at that block manager. @@ -1022,23 +1016,20 @@ private[spark] object MapOutputTracker extends Logging { startPartition: Int, endPartition: Int, statuses: Array[MapStatus], - startMapId: Int, - endMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - assert (statuses != null && statuses.length >= endMapId && startMapId >= 0) - val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long, Int)]] - for (mapIndex <- startMapId until endMapId) { - val status = statuses(mapIndex) - if (status == null) { - val errorMessage = s"Missing an output location for shuffle $shuffleId" - logError(errorMessage) - throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) - } else { - for (part <- startPartition until endPartition) { - val size = status.getSizeForBlock(part) - if (size != 0) { - splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += - ((ShuffleBlockId(shuffleId, status.mapId, part), size, mapIndex)) - } + mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { + assert (statuses != null && statuses.length >= mapId) + val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]] + val status = statuses(mapId) + if (status == null) { + val errorMessage = s"Missing an output location for shuffle $shuffleId" + logError(errorMessage) + throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) + } else { + for (part <- startPartition until endPartition) { + val size = status.getSizeForBlock(part) + if (size != 0) { + splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += + ((ShuffleBlockId(shuffleId, mapId, part), size)) } } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index ccf911f6ec1a9..ae757db2babd6 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -37,27 +37,25 @@ private[spark] class BlockStoreShuffleReader[K, C]( serializerManager: SerializerManager = SparkEnv.get.serializerManager, blockManager: BlockManager = SparkEnv.get.blockManager, mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker, - startMapId: Option[Int] = None, - endMapId: Option[Int] = None) + mapId: Option[Int] = None) extends ShuffleReader[K, C] with Logging { private val dep = handle.dependency /** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { - val blocksByAddress = (startMapId, endMapId) match { - case (Some(startId), Some(endId)) => mapOutputTracker.getMapSizesByExecutorId( + val blocksByAddress = (mapId) match { + case (Some(mapId)) => mapOutputTracker.getMapSizesByExecutorId( handle.shuffleId, startPartition, endPartition, - startId, - endId) - case (None, None) => mapOutputTracker.getMapSizesByExecutorId( + mapId) + case (None) => mapOutputTracker.getMapSizesByExecutorId( handle.shuffleId, startPartition, endPartition) - case (_, _) => throw new IllegalArgumentException( - "startMapId and 
endMapId should be both set or unset") + case (_) => throw new IllegalArgumentException( + "mapId should be both set or unset") } val wrappedStreams = new ShuffleBlockFetcherIterator( diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 2f460971ea924..0041dca507c0f 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -56,17 +56,16 @@ private[spark] trait ShuffleManager { /** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to - * read from map output (startMapId to endMapId - 1, inclusive). + * read from mapId. * Called on executors by reduce tasks. */ - def getReader[K, C]( + def getMapReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter, - startMapId: Int, - endMapId: Int): ShuffleReader[K, C] + mapId: Int): ShuffleReader[K, C] /** * Remove a shuffle's metadata from the ShuffleManager. diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index dd3be5873867e..b21ce9ce0fc71 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -129,25 +129,23 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager /** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to - * read from map output (startMapId to endMapId - 1, inclusive). + * read from mapId. * Called on executors by reduce tasks. */ - override def getReader[K, C]( + override def getMapReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter, - startMapId: Int, - endMapId: Int): ShuffleReader[K, C] = { + mapId: Int): ShuffleReader[K, C] = { new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context, metrics, - startMapId = Some(startMapId), - endMapId = Some(endMapId)) + mapId = Some(mapId)) } /** Get a writer for a given partition. Called on executors by map tasks. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 8c7752c4bb742..459311df22d23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, LocalShuffleReaderExec, QueryStageExec} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo import org.apache.spark.sql.internal.SQLConf @@ -56,6 +56,7 @@ private[execution] object SparkPlanInfo { case ReusedSubqueryExec(child) => child :: Nil case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil case stage: QueryStageExec => stage.plan :: Nil + case localReader: LocalShuffleReaderExec => localReader.child :: Nil case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 2dce6e96ded5d..1059895527eab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -74,7 +74,7 @@ case class AdaptiveSparkPlanExec( @transient private val optimizer = new RuleExecutor[LogicalPlan] { // TODO add more optimization rules override protected def batches: Seq[Batch] = Seq( - Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)) + // Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)) ) } @@ -84,6 +84,7 @@ case class AdaptiveSparkPlanExec( // plan should reach a final status of query stages (i.e., no more addition or removal of // Exchange nodes) after running these rules. private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq( + OptimizeLocalShuffleReader(conf), ensureRequirements ) @@ -91,7 +92,6 @@ case class AdaptiveSparkPlanExec( // optimizations should be stage-independent. @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, subqueryCache), - OptimizedLocalShuffleReader(conf), ReduceNumShufflePartitions(conf), ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf, session.sessionState.columnarRules), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala index d2f629c48e340..f6605550fadba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala @@ -18,10 +18,23 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark._ -import org.apache.spark.rdd.{RDD, ShuffledRDDPartition} +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} + +/** + * The [[Partition]] used by [[LocalShuffledRowRDD]]. 
A pre-shuffle partition + * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle partitions + * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, inclusive). + */ +private final class LocalShuffleRowRDDPartition( + val preShufflePartitionIndex: Int, + val startPostShufflePartitionIndex: Int, + val endPostShufflePartitionIndex: Int) extends Partition { + override val index: Int = preShufflePartitionIndex +} + /** * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime @@ -41,31 +54,18 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsRe */ class LocalShuffledRowRDD( var dependency: ShuffleDependency[Int, InternalRow, InternalRow], - metrics: Map[String, SQLMetric], - specifiedPartitionStartIndices: Option[Array[Int]] = None, - specifiedPartitionEndIndices: Option[Array[Int]] = None) + metrics: Map[String, SQLMetric]) extends RDD[InternalRow](dependency.rdd.context, Nil) { private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions private[this] val numPostShufflePartitions = dependency.rdd.partitions.length - private[this] val partitionStartIndices: Array[Int] = specifiedPartitionStartIndices match { - case Some(indices) => indices - case None => Array(0) - } - - private[this] val partitionEndIndices: Array[Int] = specifiedPartitionEndIndices match { - case Some(indices) => indices - case None if specifiedPartitionStartIndices.isEmpty => Array(numPreShufflePartitions) - case _ => specifiedPartitionStartIndices.get.drop(1) :+ numPreShufflePartitions - } - override def getDependencies: Seq[Dependency[_]] = List(dependency) override def getPartitions: Array[Partition] = { - assert(partitionStartIndices.length == partitionEndIndices.length) + Array.tabulate[Partition](numPostShufflePartitions) { i => - new ShuffledRDDPartition(i) + new LocalShuffleRowRDDPartition(i, 0, numPreShufflePartitions) } } @@ -76,38 +76,21 @@ class LocalShuffledRowRDD( } override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { - val shuffledRowPartition = split.asInstanceOf[ShuffledRDDPartition] - val mapId = shuffledRowPartition.index + val localRowPartition = split.asInstanceOf[LocalShuffleRowRDDPartition] + val mapId = localRowPartition.index val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator, // as well as the `tempMetrics` for basic shuffle metrics. 
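
To make the read pattern concrete: a regular post-shuffle reader for reduce partition r fetches block (m, r) from every map output m, while the local reader built here reads every reduce partition of a single map output, so the fetch can stay on the executor that produced it. The following is a minimal, self-contained Scala sketch of that difference; the Block case class and the object name are illustrative stand-ins, not Spark types.

    object LocalShuffleBlockLayout {
      // Stand-in for ShuffleBlockId(shuffleId, mapId, reduceId), ignoring the shuffle id.
      final case class Block(mapId: Int, reduceId: Int)

      // Blocks a normal reducer for `reduceId` fetches: one block per map output,
      // typically remote fetches from many executors.
      def regularRead(numMappers: Int, reduceId: Int): Seq[Block] =
        (0 until numMappers).map(m => Block(m, reduceId))

      // Blocks a local shuffle reader for map output `mapId` fetches: all reduce
      // partitions of that one map output, which live in a single map output file.
      def localRead(numReducers: Int, mapId: Int): Seq[Block] =
        (0 until numReducers).map(r => Block(mapId, r))

      def main(args: Array[String]): Unit = {
        // With 2 mappers and 5 reducers, reducer 3 touches 2 map outputs, while the
        // local reader for mapper 0 touches 5 blocks of one map output.
        println(regularRead(numMappers = 2, reduceId = 3))
        println(localRead(numReducers = 5, mapId = 0))
      }
    }
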
val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics) - // Connect the the InternalRows read by each ShuffleReader - new Iterator[InternalRow] { - val readers = partitionStartIndices.zip(partitionEndIndices).map { case (start, end) => - SparkEnv.get.shuffleManager.getReader( - dependency.shuffleHandle, - start, - end, - context, - sqlMetricsReporter, - mapId, - mapId + 1) - } - var i = 0 - var iter = readers(i).read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2) - - override def hasNext = { - while (iter.hasNext == false && i + 1 <= readers.length - 1) { - i += 1 - iter = readers(i).read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2) - } - iter.hasNext - } - - override def next() = iter.next() - } + val reader = SparkEnv.get.shuffleManager.getMapReader( + dependency.shuffleHandle, + 0, + numPreShufflePartitions, + context, + sqlMetricsReporter, + mapId) + reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2) } override def clearDependencies() { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala new file mode 100644 index 0000000000000..5884b5a0361c5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { + join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right) + } + + def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { + join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left) + } + + override def apply(plan: SparkPlan): SparkPlan = { + if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan + } + + plan.transformDown{ + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) => + val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) + join.copy(right = localReader) + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) => + val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec]) + join.copy(left = localReader) + } + } +} + +case class LocalShuffleReaderExec( + child: QueryStageExec) extends LeafExecNode { + + override def output: Seq[Attribute] = child.output + + override def doCanonicalize(): SparkPlan = child.canonicalized + + override def outputPartitioning: Partitioning = { + val numPartitions = child match { + case stage: ShuffleQueryStageExec => + stage.plan.shuffleDependency.rdd.partitions.length + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + stage.plan.shuffleDependency.rdd.partitions.length + } + UnknownPartitioning(numPartitions) + } + + private var cachedShuffleRDD: RDD[InternalRow] = null + + override protected def doExecute(): RDD[InternalRow] = { + if (cachedShuffleRDD == null) { + cachedShuffleRDD = child match { + case stage: ShuffleQueryStageExec => + stage.plan.createLocalShuffleRDD() + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + stage.plan.createLocalShuffleRDD() + } + } + cachedShuffleRDD + } + + override def generateTreeString( + depth: Int, + lastChildren: Seq[Boolean], + append: String => Unit, + verbose: Boolean, + prefix: String = "", + addSuffix: Boolean = false, + maxFields: Int, + printNodeId: Boolean): Unit = { + super.generateTreeString(depth, + lastChildren, + append, + verbose, + prefix, + addSuffix, + maxFields, + printNodeId) + child.generateTreeString( + depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields, printNodeId) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala deleted file mode 100644 index ed8a44362e17b..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.adaptive - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec} -import org.apache.spark.sql.internal.SQLConf - -case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { - - private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = { - shuffleStage match { - case stage: ShuffleQueryStageExec => - stage.isLocalShuffle = false - case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => - stage.isLocalShuffle = false - } - shuffleStage - } - - private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = { - val revertPlan = newPlan.transformUp { - case localReader: LocalShuffleReaderExec - if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) => - setIsLocalToFalse(localReader.child) - } - revertPlan - } - - override def apply(plan: SparkPlan): SparkPlan = { - // Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return. 
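
Both the new OptimizeLocalShuffleReader rule above and the file being deleted here target only the probe (streamed) side of a BroadcastHashJoinExec: the build side is broadcast, so only the other child still reads shuffle output and can benefit from a local reader. A tiny self-contained sketch of that side selection follows; the BuildSide objects mirror Spark's build-side markers, everything else is made up for illustration.

    object ProbeSideSelection {
      // Stand-ins for Spark's join build-side markers.
      sealed trait BuildSide
      case object BuildLeft extends BuildSide
      case object BuildRight extends BuildSide

      // The build side of a broadcast hash join is broadcast, so only the other
      // (probe/streamed) side still reads shuffle output.
      def probeSide(buildSide: BuildSide): String = buildSide match {
        case BuildLeft  => "right" // left child is broadcast, right child is probed
        case BuildRight => "left"  // right child is broadcast, left child is probed
      }

      def main(args: Array[String]): Unit = {
        assert(probeSide(BuildLeft) == "right" && probeSide(BuildRight) == "left")
      }
    }
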
- val bhjs = plan.collect { - case bhj: BroadcastHashJoinExec => bhj - } - - if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) { - return plan - } - - // If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true - bhjs.map { - case bhj: BroadcastHashJoinExec => - bhj.children map { - case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true - case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => - stage.isLocalShuffle = true - case plan: SparkPlan => plan - } - } - - // Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true - val newPlan = plan.transformUp { - case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) => - LocalShuffleReaderExec(stage) - case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) => - LocalShuffleReaderExec(stage) - } - - val afterEnsureRequirements = EnsureRequirements(conf).apply(newPlan) - val numExchanges = afterEnsureRequirements.collect { - case e: ShuffleExchangeExec => e - }.length - if (numExchanges > 0) { - logWarning("Local shuffle reader optimization is not applied due" + - " to additional shuffles will be introduced.") - revertLocalShuffleReader(newPlan) - } else { - newPlan - } - } -} - -case class LocalShuffleReaderExec( - child: QueryStageExec) extends UnaryExecNode { - - override def output: Seq[Attribute] = child.output - - override def doCanonicalize(): SparkPlan = child.canonicalized - - override def outputPartitioning: Partitioning = { - val numPartitions = child match { - case stage: ShuffleQueryStageExec => - stage.plan.shuffleDependency.rdd.partitions.length - case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => - stage.plan.shuffleDependency.rdd.partitions.length - } - UnknownPartitioning(numPartitions) - } - - private var cachedShuffleRDD: RDD[InternalRow] = null - - override protected def doExecute(): RDD[InternalRow] = { - if (cachedShuffleRDD == null) { - cachedShuffleRDD = child match { - case stage: ShuffleQueryStageExec => - stage.plan.createLocalShuffleRDD() - case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => - stage.plan.createLocalShuffleRDD() - } - } - cachedShuffleRDD - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index 2e295bd6401b7..231fffce3360b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -129,8 +129,7 @@ abstract class QueryStageExec extends LeafExecNode { */ case class ShuffleQueryStageExec( override val id: Int, - override val plan: ShuffleExchangeExec, - var isLocalShuffle: Boolean = false) extends QueryStageExec { + override val plan: ShuffleExchangeExec) extends QueryStageExec { @transient lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = { if (plan.inputRDD.getNumPartitions == 0) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala index f0e9128df8ae6..3f2b4f0dc39c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala @@ -70,9 +70,7 @@ case class 
ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { } // ShuffleExchanges introduced by repartition do not support changing the number of partitions. // We change the number of partitions in the stage only if all the ShuffleExchanges support it. - if (!shuffleStages.forall(_.plan.canChangeNumPartitions) - || !shuffleStages.forall(_.isLocalShuffle == false)) { - // If the `shuffleStages` contains the local shuffle reader, we do not reduce the partitions. + if (!shuffleStages.forall(_.plan.canChangeNumPartitions)) { plan } else { val shuffleMetrics = shuffleStages.map { stage => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 8c64706ee9628..5a86915222ab4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -80,7 +80,6 @@ class AdaptiveQueryExecSuite test("Change merge join to broadcast join") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM testData join testData2 ON key = a where value = '1'") From d21c99fa6f7fd09dc52da8ad9b7c30c3276efcb4 Mon Sep 17 00:00:00 2001 From: jiake Date: Tue, 20 Sep 2005 23:25:23 +0800 Subject: [PATCH 05/11] small rebase fix --- .../org/apache/spark/MapOutputTracker.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 6e2ca7a3376f6..5f5ae7eb25d35 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -344,13 +344,13 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging * * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size, map index) - * tuples describing the shuffle blocks that are stored at that block manager. + * tuples describing the shuffle blocks that are stored at that block manager. */ def getMapSizesByExecutorId( shuffleId: Int, startPartition: Int, endPartition: Int, - mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] + mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] /** * Deletes map output status information for the specified shuffle stage. 
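
As a rough, self-contained model of what the single-map variant of getMapSizesByExecutorId yields (the case classes below are simplified stand-ins for BlockManagerId, MapStatus and ShuffleBlockId, not the real Spark types): only the status of the requested map output is consulted, its non-empty blocks in the requested reduce range are listed, and they are grouped under the block manager that wrote them.

    object SingleMapOutputLookup {
      final case class BlockManagerId(host: String, executorId: String)
      final case class MapStatus(location: BlockManagerId, sizes: Array[Long])
      final case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)

      // Roughly mirrors convertMapStatuses restricted to a single map id: keep the
      // non-empty blocks of that map output for [startPartition, endPartition) and
      // group them by the block manager holding them, tagging each with the map index.
      def blocksForMap(
          shuffleId: Int,
          startPartition: Int,
          endPartition: Int,
          statuses: Array[MapStatus],
          mapId: Int): Map[BlockManagerId, Seq[(ShuffleBlockId, Long, Int)]] = {
        val status = statuses(mapId)
        val blocks = for {
          part <- startPartition until endPartition
          size = status.sizes(part)
          if size != 0
        } yield (ShuffleBlockId(shuffleId, mapId, part), size, mapId)
        Map(status.location -> blocks)
      }

      def main(args: Array[String]): Unit = {
        val statuses = Array(
          MapStatus(BlockManagerId("host1", "1"), Array(10L, 0L, 7L)),
          MapStatus(BlockManagerId("host2", "2"), Array(3L, 4L, 5L)))
        // Only map output 1 is consulted; its three non-empty blocks come back
        // grouped under host2's block manager.
        println(blocksForMap(shuffleId = 0, startPartition = 0, endPartition = 3,
          statuses = statuses, mapId = 1))
      }
    }
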
@@ -749,9 +749,8 @@ private[spark] class MapOutputTrackerMaster( startPartition: Int, endPartition: Int, mapId: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { + : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, mapId $mapId" + - s"partitions $startPartition-$endPartition") shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => @@ -817,7 +816,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr shuffleId: Int, startPartition: Int, endPartition: Int, - mapId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { + mapId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, mapId $mapId" + s"partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId) @@ -1016,9 +1015,9 @@ private[spark] object MapOutputTracker extends Logging { startPartition: Int, endPartition: Int, statuses: Array[MapStatus], - mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { + mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { assert (statuses != null && statuses.length >= mapId) - val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]] + val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]] val status = statuses(mapId) if (status == null) { val errorMessage = s"Missing an output location for shuffle $shuffleId" @@ -1028,11 +1027,11 @@ private[spark] object MapOutputTracker extends Logging { for (part <- startPartition until endPartition) { val size = status.getSizeForBlock(part) if (size != 0) { - splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += - ((ShuffleBlockId(shuffleId, mapId, part), size)) + splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += + ((ShuffleBlockId(shuffleId, mapId, part), size, mapId)) } } - } + } splitsByAddress.toIterator } } From bb10b18a031002778a9e9d2dd8bf12b83a3f318f Mon Sep 17 00:00:00 2001 From: jiake Date: Wed, 21 Sep 2005 00:22:33 +0800 Subject: [PATCH 06/11] java doc style --- .../org/apache/spark/MapOutputTracker.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 5f5ae7eb25d35..6916bce86056c 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -994,22 +994,22 @@ private[spark] object MapOutputTracker extends Logging { } /** - * Given an array of map statuses, the mapId and a range of map output - * partitions, returns a sequence that, lists the shuffle block IDs and corresponding shuffle - * block sizes stored at that block manager. - * - * If the status of the map is null (indicating a missing location due to a failed mapper), - * throws a FetchFailedException. - * - * @param shuffleId Identifier for the shuffle - * @param startPartition Start of map output partition ID range (included in range) - * @param endPartition End of map output partition ID range (excluded from range) - * @param statuses List of map statuses, indexed by map ID. 
- * @param mapId Identifier for the map - * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, - * and the second item is a sequence of (shuffle block ID, shuffle block size, map index) - * tuples describing the shuffle blocks that are stored at that block manager. - */ + * Given an array of map statuses, the mapId and a range of map output + * partitions, returns a sequence that, lists the shuffle block IDs and corresponding shuffle + * block sizes stored at that block manager. + * + * If the status of the map is null (indicating a missing location due to a failed mapper), + * throws a FetchFailedException. + * + * @param shuffleId Identifier for the shuffle + * @param startPartition Start of map output partition ID range (included in range) + * @param endPartition End of map output partition ID range (excluded from range) + * @param statuses List of map statuses, indexed by map ID. + * @param mapId Identifier for the map id + * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, + * and the second item is a sequence of (shuffle block ID, shuffle block size, map index) + * tuples describing the shuffle blocks that are stored at that block manager. + */ def convertMapStatuses( shuffleId: Int, startPartition: Int, From 72ce82e0417aca23e47d97958631e61d73d6aaee Mon Sep 17 00:00:00 2001 From: jiake Date: Fri, 11 Oct 2019 14:14:11 +0800 Subject: [PATCH 07/11] resolve the comments --- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../adaptive/AdaptiveSparkPlanExec.scala | 2 +- .../adaptive/LocalShuffledRowRDD.scala | 10 +-- .../adaptive/OptimizeLocalShuffleReader.scala | 74 ++++++++++++++----- .../adaptive/ReduceNumShufflePartitions.scala | 5 +- .../adaptive/AdaptiveQueryExecSuite.scala | 49 ++++++++++-- 6 files changed, 107 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9db16f75e8fe0..4da7003d77b1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -400,7 +400,7 @@ object SQLConf { " converting the shuffle reader to local shuffle reader for the shuffle exchange" + " of the broadcast hash join in probe side.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val SUBEXPRESSION_ELIMINATION_ENABLED = buildConf("spark.sql.subexpressionElimination.enabled") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 1059895527eab..f45e3560b2cf1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -74,7 +74,7 @@ case class AdaptiveSparkPlanExec( @transient private val optimizer = new RuleExecutor[LogicalPlan] { // TODO add more optimization rules override protected def batches: Seq[Batch] = Seq( - // Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)) + Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala index 
f6605550fadba..b5f7fea814fde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala @@ -57,15 +57,15 @@ class LocalShuffledRowRDD( metrics: Map[String, SQLMetric]) extends RDD[InternalRow](dependency.rdd.context, Nil) { - private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions - private[this] val numPostShufflePartitions = dependency.rdd.partitions.length + private[this] val numReducers = dependency.partitioner.numPartitions + private[this] val numMappers = dependency.rdd.partitions.length override def getDependencies: Seq[Dependency[_]] = List(dependency) override def getPartitions: Array[Partition] = { - Array.tabulate[Partition](numPostShufflePartitions) { i => - new LocalShuffleRowRDDPartition(i, 0, numPreShufflePartitions) + Array.tabulate[Partition](numMappers) { i => + new LocalShuffleRowRDDPartition(i, 0, numReducers) } } @@ -86,7 +86,7 @@ class LocalShuffledRowRDD( val reader = SparkEnv.get.shuffleManager.getMapReader( dependency.shuffleHandle, 0, - numPreShufflePartitions, + numReducers, context, sqlMetricsReporter, mapId) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index 5884b5a0361c5..a84f351b214bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -23,17 +23,29 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} import org.apache.spark.sql.internal.SQLConf case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { - def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { - join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right) + def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { + (join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) || + (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec]) } - def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { - join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left) + def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { + (join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) || + (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec]) + } + + def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = { + plan.transformDown { + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => + join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child) + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => + join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child) + } } override def 
apply(plan: SparkPlan): SparkPlan = { @@ -41,14 +53,28 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { return plan } - plan.transformDown{ - case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) => + val optimizedPlan = plan.transformDown { + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) join.copy(right = localReader) - case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) => + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec]) join.copy(left = localReader) } + + val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan) + + val numExchanges = afterEnsureRequirements.collect { + case e: ShuffleExchangeExec => e + }.length + + if (numExchanges > 0) { + logWarning("OptimizeLocalShuffleReader rule is not applied due" + + " to additional shuffles will be introduced.") + revertLocalShuffleReader(optimizedPlan) + } else { + optimizedPlan + } } } @@ -60,13 +86,23 @@ case class LocalShuffleReaderExec( override def doCanonicalize(): SparkPlan = child.canonicalized override def outputPartitioning: Partitioning = { - val numPartitions = child match { + + def canUseChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = { + val initialPartitioning = stage.plan.asInstanceOf[ShuffleExchangeExec] + .child.outputPartitioning + if (initialPartitioning.isInstanceOf[UnknownPartitioning]) { + UnknownPartitioning(stage.plan.shuffleDependency.rdd.partitions.length) + } else { + initialPartitioning + } + } + + child match { case stage: ShuffleQueryStageExec => - stage.plan.shuffleDependency.rdd.partitions.length + canUseChildPartitioning(stage) case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => - stage.plan.shuffleDependency.rdd.partitions.length + canUseChildPartitioning(stage) } - UnknownPartitioning(numPartitions) } private var cachedShuffleRDD: RDD[InternalRow] = null @@ -84,14 +120,14 @@ case class LocalShuffleReaderExec( } override def generateTreeString( - depth: Int, - lastChildren: Seq[Boolean], - append: String => Unit, - verbose: Boolean, - prefix: String = "", - addSuffix: Boolean = false, - maxFields: Int, - printNodeId: Boolean): Unit = { + depth: Int, + lastChildren: Seq[Boolean], + append: String => Unit, + verbose: Boolean, + prefix: String = "", + addSuffix: Boolean = false, + maxFields: Int, + printNodeId: Boolean): Unit = { super.generateTreeString(depth, lastChildren, append, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala index 3f2b4f0dc39c8..eb37a8251225f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala @@ -19,14 +19,13 @@ package org.apache.spark.sql.execution.adaptive.rule import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration.Duration - import org.apache.spark.MapOutputStatistics import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import 
org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.ThreadUtils @@ -190,7 +189,7 @@ case class CoalescedShuffleReaderExec( UnknownPartitioning(partitionStartIndices.length) } - private var cachedShuffleRDD: RDD[InternalRow] = null + private var cachedShuffleRDD: ShuffledRowRDD = null override protected def doExecute(): RDD[InternalRow] = { if (cachedShuffleRDD == null) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 5a86915222ab4..2c13b8e0a2546 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -87,6 +87,11 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) + + val localReaders = adaptivePlan.collect { + case reader: LocalShuffleReaderExec => reader + } + assert(localReaders.length === 1) } } @@ -103,12 +108,12 @@ class AdaptiveQueryExecSuite val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - val shuffleReaders = adaptivePlan.collect { - case reader: CoalescedShuffleReaderExec => reader + val localReaders = adaptivePlan.collect { + case reader: LocalShuffleReaderExec => reader } - assert(shuffleReaders.length === 1) - // The pre-shuffle partition size is [0, 72, 0, 72, 126] - shuffleReaders.foreach { reader => + assert(localReaders.length === 1) + // Here the reader.outputPartitioning.numPartitions is equal to the number of mappers. 
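
The assertion this comment refers to is simple counting: a local shuffle reader exposes one RDD partition per map output, no matter how many reduce partitions the shuffle was configured with. A tiny illustration in plain Scala, assuming (as the test data suggests) 2 map tasks and 5 shuffle partitions; the names below are made up for the sketch.

    object LocalReaderPartitions {
      // Mirrors the shape of LocalShuffledRowRDD.getPartitions: one partition per
      // map output, indexed by the pre-shuffle (map) partition it will read.
      final case class LocalPartition(mapIndex: Int)

      def partitions(numMappers: Int): Array[LocalPartition] =
        Array.tabulate(numMappers)(i => LocalPartition(i))

      def main(args: Array[String]): Unit = {
        // Data written by 2 map tasks and shuffled into 5 reduce partitions still
        // yields only 2 local-reader partitions, which is what the test asserts.
        assert(partitions(numMappers = 2).length == 2)
      }
    }
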
+ localReaders.foreach { reader => assert(reader.outputPartitioning.numPartitions === 2) } } @@ -125,6 +130,10 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) + val localReaders = adaptivePlan.collect { + case reader: LocalShuffleReaderExec => reader + } + assert(localReaders.length === 1) } } @@ -139,6 +148,11 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) + + val localReaders = adaptivePlan.collect { + case reader: LocalShuffleReaderExec => reader + } + assert(localReaders.length === 1) } } @@ -160,6 +174,11 @@ class AdaptiveQueryExecSuite assert(smj.size == 3) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 3) + + val localReaders = adaptivePlan.collect { + case reader: LocalShuffleReaderExec => reader + } + assert(localReaders.length === 1) } } @@ -183,6 +202,11 @@ class AdaptiveQueryExecSuite assert(smj.size == 3) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 3) + + val localReaders = adaptivePlan.collect { + case reader: LocalShuffleReaderExec => reader + } + assert(localReaders.length === 0) } } @@ -206,6 +230,10 @@ class AdaptiveQueryExecSuite assert(smj.size == 3) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 3) + val localReaders = adaptivePlan.collect { + case reader: LocalShuffleReaderExec => reader + } + assert(localReaders.length === 0) } } @@ -361,13 +389,22 @@ class AdaptiveQueryExecSuite SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30") { - runAdaptiveAndVerifyResult( + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( """ |SELECT * FROM testData t1 join testData2 t2 |ON t1.key = t2.a join testData3 t3 on t2.a = t3.a |where t1.value = 1 """.stripMargin ) + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 2) + val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + val localShuffleReaders = adaptivePlan.collect { + case reader: LocalShuffleReaderExec => reader + } + // additional shuffle exchange introduced, so revert OptimizeLocalShuffleReader rule. 
+ assert(localShuffleReaders.size == 0) } } From f0ffff6665e6c1398000477b5dbb48a8e81bf2c0 Mon Sep 17 00:00:00 2001 From: jiake Date: Fri, 11 Oct 2019 16:03:03 +0800 Subject: [PATCH 08/11] code style --- .../sql/execution/adaptive/ReduceNumShufflePartitions.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala index eb37a8251225f..1a85d5c02075b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive.rule import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration.Duration + import org.apache.spark.MapOutputStatistics import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow From f10e5b2be22999de0fad0a153a8f8ba4fca2f639 Mon Sep 17 00:00:00 2001 From: jiake Date: Sat, 12 Oct 2019 17:24:34 +0800 Subject: [PATCH 09/11] resolve the comments --- .../org/apache/spark/MapOutputTracker.scala | 129 +++++++++--------- .../apache/spark/sql/internal/SQLConf.scala | 4 +- .../adaptive/LocalShuffledRowRDD.scala | 9 +- .../adaptive/OptimizeLocalShuffleReader.scala | 52 +++---- .../adaptive/AdaptiveQueryExecSuite.scala | 57 +++----- 5 files changed, 108 insertions(+), 143 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 6916bce86056c..2c941a283bdb8 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map} +import scala.collection.mutable.{HashMap, ListBuffer, Map} import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.reflect.ClassTag @@ -688,20 +688,17 @@ private[spark] class MapOutputTrackerMaster( * executor id on that host. * * @param dep shuffle dependency object - * @param startMapId the start map id - * @param endMapId the end map id + * @param mapId the map id * @return a sequence of locations where task runs. 
*/ - def getMapLocation(dep: ShuffleDependency[_, _, _], startMapId: Int, endMapId: Int): Seq[String] = + def getMapLocation(dep: ShuffleDependency[_, _, _], mapId: Int): Seq[String] = { val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull if (shuffleStatus != null) { shuffleStatus.withMapStatuses { statuses => - if (startMapId >= 0 && endMapId <= statuses.length) { - val statusesPicked = statuses.slice(startMapId, endMapId).filter(_ != null) - statusesPicked.map { status => - ExecutorCacheTaskLocation(status.location.host, status.location.executorId).toString - } + if (mapId >= 0 && mapId < statuses.length) { + Seq( ExecutorCacheTaskLocation(statuses(mapId).location.host, + statuses(mapId).location.executorId).toString) } else { Nil } @@ -760,7 +757,12 @@ private[spark] class MapOutputTrackerMaster( startPartition, endPartition, statuses, +<<<<<<< HEAD mapId) +======= + useOldFetchProtocol, + Some(mapId)) +>>>>>>> resolve the comments } case None => Iterator.empty @@ -821,7 +823,12 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr s"partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId) try { +<<<<<<< HEAD MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses, mapId) +======= + MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses, + useOldFetchProtocol, Some(mapId)) +>>>>>>> resolve the comments } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: @@ -972,66 +979,60 @@ private[spark] object MapOutputTracker extends Logging { shuffleId: Int, startPartition: Int, endPartition: Int, - statuses: Array[MapStatus]): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + statuses: Array[MapStatus], + useOldFetchProtocol: Boolean, + mapId : Option[Int] = None): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { assert (statuses != null) val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]] - for ((status, mapIndex) <- statuses.iterator.zipWithIndex) { - if (status == null) { - val errorMessage = s"Missing an output location for shuffle $shuffleId" - logError(errorMessage) - throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) - } else { - for (part <- startPartition until endPartition) { - val size = status.getSizeForBlock(part) - if (size != 0) { - splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += - ((ShuffleBlockId(shuffleId, status.mapId, part), size, mapIndex)) + mapId match { + case (Some(mapId)) => + for ((status, mapIndex) <- statuses.iterator.zipWithIndex.filter(_._2 == mapId)) { + if (status == null) { + val errorMessage = s"Missing an output location for shuffle $shuffleId" + logError(errorMessage) + throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) + } else { + for (part <- startPartition until endPartition) { + val size = status.getSizeForBlock(part) + if (size != 0) { + if (useOldFetchProtocol) { + // While we use the old shuffle fetch protocol, we use mapIndex as mapId in the + // ShuffleBlockId. 
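
The getMapLocation change above narrows the preferred-location lookup to a single map output: the local reader partition for map id m should be scheduled on the executor that ran map task m, so the whole map output file can be read without crossing the network. A minimal model follows; the ExecutorLocation class and the exact locality-string format are illustrative assumptions, not Spark's types.

    object MapTaskLocality {
      // Stand-in for the executor that wrote a given map output.
      final case class ExecutorLocation(host: String, executorId: String) {
        // Spark encodes executor-pinned locality hints roughly as "executor_<host>_<id>";
        // treat the exact format here as illustrative.
        def asTaskLocation: String = s"executor_${host}_$executorId"
      }

      // Preferred location of local-reader partition `mapId`: wherever map task
      // `mapId` ran, or no preference if the index is out of range.
      def preferredLocations(mapOutputs: IndexedSeq[ExecutorLocation], mapId: Int): Seq[String] =
        if (mapId >= 0 && mapId < mapOutputs.length) Seq(mapOutputs(mapId).asTaskLocation)
        else Nil

      def main(args: Array[String]): Unit = {
        val outputs = Vector(ExecutorLocation("host1", "1"), ExecutorLocation("host2", "2"))
        assert(preferredLocations(outputs, 1) == Seq("executor_host2_2"))
        assert(preferredLocations(outputs, 5).isEmpty)
      }
    }
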
+ splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += + ((ShuffleBlockId(shuffleId, mapIndex, part), size, mapIndex)) + } else { + splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += + ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex)) + } + } + } + } + } + case None => + for ((status, mapIndex) <- statuses.iterator.zipWithIndex) { + if (status == null) { + val errorMessage = s"Missing an output location for shuffle $shuffleId" + logError(errorMessage) + throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) + } else { + for (part <- startPartition until endPartition) { + val size = status.getSizeForBlock(part) + if (size != 0) { + if (useOldFetchProtocol) { + // While we use the old shuffle fetch protocol, we use mapIndex as mapId in the + // ShuffleBlockId. + splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += + ((ShuffleBlockId(shuffleId, mapIndex, part), size, mapIndex)) + } else { + splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += + ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex)) + } + } + } } } - } } - splitsByAddress.iterator - } - /** - * Given an array of map statuses, the mapId and a range of map output - * partitions, returns a sequence that, lists the shuffle block IDs and corresponding shuffle - * block sizes stored at that block manager. - * - * If the status of the map is null (indicating a missing location due to a failed mapper), - * throws a FetchFailedException. - * - * @param shuffleId Identifier for the shuffle - * @param startPartition Start of map output partition ID range (included in range) - * @param endPartition End of map output partition ID range (excluded from range) - * @param statuses List of map statuses, indexed by map ID. - * @param mapId Identifier for the map id - * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, - * and the second item is a sequence of (shuffle block ID, shuffle block size, map index) - * tuples describing the shuffle blocks that are stored at that block manager. 
- */ - def convertMapStatuses( - shuffleId: Int, - startPartition: Int, - endPartition: Int, - statuses: Array[MapStatus], - mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - assert (statuses != null && statuses.length >= mapId) - val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]] - val status = statuses(mapId) - if (status == null) { - val errorMessage = s"Missing an output location for shuffle $shuffleId" - logError(errorMessage) - throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) - } else { - for (part <- startPartition until endPartition) { - val size = status.getSizeForBlock(part) - if (size != 0) { - splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += - ((ShuffleBlockId(shuffleId, mapId, part), size, mapId)) - } - } - } - splitsByAddress.toIterator + splitsByAddress.iterator } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4da7003d77b1f..f00a4b545ee3e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -394,7 +394,7 @@ object SQLConf { "must be a positive integer.") .createOptional - val OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED = + val OPTIMIZE_LOCAL_SHUFFLE_READER_ENABLED = buildConf("spark.sql.adaptive.optimizedLocalShuffleReader.enabled") .doc("When true and adaptive execution is enabled, this enables the optimization of" + " converting the shuffle reader to local shuffle reader for the shuffle exchange" + @@ -2154,8 +2154,6 @@ class SQLConf extends Serializable with Logging { def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions) - def optimizedLocalShuffleReaderEnabled: Boolean = getConf(OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED) - def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN) def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala index b5f7fea814fde..9ad1ebaf6f376 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala @@ -29,9 +29,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsRe * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, inclusive). 
*/ private final class LocalShuffleRowRDDPartition( - val preShufflePartitionIndex: Int, - val startPostShufflePartitionIndex: Int, - val endPostShufflePartitionIndex: Int) extends Partition { + val preShufflePartitionIndex: Int) extends Partition { override val index: Int = preShufflePartitionIndex } @@ -65,14 +63,13 @@ class LocalShuffledRowRDD( override def getPartitions: Array[Partition] = { Array.tabulate[Partition](numMappers) { i => - new LocalShuffleRowRDDPartition(i, 0, numReducers) + new LocalShuffleRowRDDPartition(i) } } override def getPreferredLocations(partition: Partition): Seq[String] = { val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] - val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]] - tracker.getMapLocation(dep, partition.index, partition.index + 1) + tracker.getMapLocation(dependency, partition.index) } override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index a84f351b214bd..308e65e793d8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -29,57 +29,48 @@ import org.apache.spark.sql.internal.SQLConf case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { - def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { - (join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) || - (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec]) + def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { + join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left) } - def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { - (join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) || - (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec]) - } - - def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = { - plan.transformDown { - case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => - join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child) - case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => - join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child) - } + def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { + join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right) } override def apply(plan: SparkPlan): SparkPlan = { - if (!conf.optimizedLocalShuffleReaderEnabled) { + if (!conf.getConf(SQLConf.OPTIMIZE_LOCAL_SHUFFLE_READER_ENABLED)) { return plan } val optimizedPlan = plan.transformDown { - case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) => val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) join.copy(right = localReader) - case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) => val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec]) join.copy(left = localReader) } - val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan) + def numExchanges(plan: SparkPlan): Int = { + plan.collect { + case e: ShuffleExchangeExec => e + }.length + } - val numExchanges = afterEnsureRequirements.collect { - case e: ShuffleExchangeExec => e - }.length + val numExchangeBefore = numExchanges(EnsureRequirements(conf).apply(plan)) + val numExchangeAfter = numExchanges(EnsureRequirements(conf).apply(optimizedPlan)) - if (numExchanges > 0) { + if (numExchangeAfter > numExchangeBefore) { logWarning("OptimizeLocalShuffleReader rule is not applied due" + " to additional shuffles will be introduced.") - revertLocalShuffleReader(optimizedPlan) + plan } else { optimizedPlan } } } -case class LocalShuffleReaderExec( - child: QueryStageExec) extends LeafExecNode { +case class LocalShuffleReaderExec(child: QueryStageExec) extends LeafExecNode { override def output: Seq[Attribute] = child.output @@ -87,9 +78,8 @@ case class LocalShuffleReaderExec( override def outputPartitioning: Partitioning = { - def canUseChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = { - val initialPartitioning = stage.plan.asInstanceOf[ShuffleExchangeExec] - .child.outputPartitioning + def tryReserveChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = { + val initialPartitioning = stage.plan.child.outputPartitioning if (initialPartitioning.isInstanceOf[UnknownPartitioning]) { UnknownPartitioning(stage.plan.shuffleDependency.rdd.partitions.length) } else { @@ -99,9 +89,9 @@ case class LocalShuffleReaderExec( child match { case stage: ShuffleQueryStageExec => - canUseChildPartitioning(stage) + tryReserveChildPartitioning(stage) case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => - canUseChildPartitioning(stage) + tryReserveChildPartitioning(stage) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 2c13b8e0a2546..1c24e988268e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -77,6 +77,13 @@ class AdaptiveQueryExecSuite } } + private def checkNumLocalShuffleReaders(plan: SparkPlan, expected: Int): Unit = { + val localReaders = plan.collect { + case reader: LocalShuffleReaderExec => reader + } + assert(localReaders.length === expected) + } + test("Change merge join to broadcast join") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", @@ -87,11 +94,7 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - - val localReaders = adaptivePlan.collect { - case reader: LocalShuffleReaderExec => reader - } - assert(localReaders.length === 1) + checkNumLocalShuffleReaders(adaptivePlan, 1) } } @@ -108,14 +111,7 @@ class AdaptiveQueryExecSuite val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - val localReaders = adaptivePlan.collect { - case reader: LocalShuffleReaderExec => reader - } - assert(localReaders.length === 1) - // Here the reader.outputPartitioning.numPartitions is equal to the number of mappers. 
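
The numExchanges comparison above acts as a regression check on the physical plan: plan the distribution requirements with and without the local readers, and keep the optimized version only if it does not end up with more shuffle exchanges than before. Below is a toy model over a minimal tree type; Plan, Exchange, Node and the replan function are stand-ins for illustration, not Spark classes.

    object ExchangeRegressionCheck {
      // A toy plan tree, just enough structure to count exchange nodes.
      sealed trait Plan { def children: Seq[Plan] }
      final case class Exchange(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
      final case class Node(name: String, children: Plan*) extends Plan

      def numExchanges(plan: Plan): Int = plan match {
        case e: Exchange => 1 + numExchanges(e.child)
        case other       => other.children.map(numExchanges).sum
      }

      // Keep the optimized plan only if re-planning its distribution requirements
      // (the role EnsureRequirements plays) does not add shuffle exchanges.
      def chooseSafely(original: Plan, optimized: Plan)(replan: Plan => Plan): Plan = {
        val before = numExchanges(replan(original))
        val after  = numExchanges(replan(optimized))
        if (after > before) original else optimized
      }

      def main(args: Array[String]): Unit = {
        val original  = Node("join", Exchange(Node("scanA")), Node("broadcastB"))
        val optimized = Node("join", Node("localReader", Exchange(Node("scanA"))), Node("broadcastB"))
        // With an identity "replan" the optimized plan adds no exchange, so it is kept.
        assert(chooseSafely(original, optimized)(identity) eq optimized)
      }
    }
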
- localReaders.foreach { reader => - assert(reader.outputPartitioning.numPartitions === 2) - } + checkNumLocalShuffleReaders(adaptivePlan, 1) } } @@ -130,10 +126,7 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - val localReaders = adaptivePlan.collect { - case reader: LocalShuffleReaderExec => reader - } - assert(localReaders.length === 1) + checkNumLocalShuffleReaders(adaptivePlan, 1) } } @@ -149,10 +142,7 @@ class AdaptiveQueryExecSuite val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - val localReaders = adaptivePlan.collect { - case reader: LocalShuffleReaderExec => reader - } - assert(localReaders.length === 1) + checkNumLocalShuffleReaders(adaptivePlan, 1) } } @@ -175,10 +165,7 @@ class AdaptiveQueryExecSuite val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 3) - val localReaders = adaptivePlan.collect { - case reader: LocalShuffleReaderExec => reader - } - assert(localReaders.length === 1) + checkNumLocalShuffleReaders(adaptivePlan, 1) } } @@ -203,10 +190,7 @@ class AdaptiveQueryExecSuite val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 3) - val localReaders = adaptivePlan.collect { - case reader: LocalShuffleReaderExec => reader - } - assert(localReaders.length === 0) + checkNumLocalShuffleReaders(adaptivePlan, 0) } } @@ -230,10 +214,7 @@ class AdaptiveQueryExecSuite assert(smj.size == 3) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 3) - val localReaders = adaptivePlan.collect { - case reader: LocalShuffleReaderExec => reader - } - assert(localReaders.length === 0) + checkNumLocalShuffleReaders(adaptivePlan, 0) } } @@ -249,7 +230,8 @@ class AdaptiveQueryExecSuite val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 2) val ex = findReusedExchange(adaptivePlan) - assert(ex.size == 1) + // The ReusedExchange is hidden in LocalShuffleReaderExec + assert(ex.size == 0) } } @@ -387,7 +369,7 @@ class AdaptiveQueryExecSuite " reader to local shuffle reader") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED.key -> "true", + SQLConf.OPTIMIZE_LOCAL_SHUFFLE_READER_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30") { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( """ @@ -400,11 +382,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 2) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - val localShuffleReaders = adaptivePlan.collect { - case reader: LocalShuffleReaderExec => reader - } // additional shuffle exchange introduced, so revert OptimizeLocalShuffleReader rule. 
- assert(localShuffleReaders.size == 0) + checkNumLocalShuffleReaders(adaptivePlan, 0) } } From 37111fff141556ad5f958ce24fb6fe469c2fd793 Mon Sep 17 00:00:00 2001 From: jiake Date: Tue, 15 Oct 2019 10:51:14 +0800 Subject: [PATCH 10/11] resolve the small comments --- .../org/apache/spark/MapOutputTracker.scala | 65 ++++++------------- .../adaptive/AdaptiveSparkPlanHelper.scala | 1 + .../adaptive/AdaptiveQueryExecSuite.scala | 6 +- 3 files changed, 23 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 2c941a283bdb8..67a9ad3a5c7a3 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -684,7 +684,7 @@ private[spark] class MapOutputTrackerMaster( } /** - * Return the locations where the Mapper(s) ran. The locations each includes both a host and an + * Return the location where the Mapper ran. The locations each includes both a host and an * executor id on that host. * * @param dep shuffle dependency object @@ -984,53 +984,28 @@ private[spark] object MapOutputTracker extends Logging { mapId : Option[Int] = None): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { assert (statuses != null) val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]] - mapId match { - case (Some(mapId)) => - for ((status, mapIndex) <- statuses.iterator.zipWithIndex.filter(_._2 == mapId)) { - if (status == null) { - val errorMessage = s"Missing an output location for shuffle $shuffleId" - logError(errorMessage) - throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) - } else { - for (part <- startPartition until endPartition) { - val size = status.getSizeForBlock(part) - if (size != 0) { - if (useOldFetchProtocol) { - // While we use the old shuffle fetch protocol, we use mapIndex as mapId in the - // ShuffleBlockId. - splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += - ((ShuffleBlockId(shuffleId, mapIndex, part), size, mapIndex)) - } else { - splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += - ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex)) - } - } - } - } - } - case None => - for ((status, mapIndex) <- statuses.iterator.zipWithIndex) { - if (status == null) { - val errorMessage = s"Missing an output location for shuffle $shuffleId" - logError(errorMessage) - throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) - } else { - for (part <- startPartition until endPartition) { - val size = status.getSizeForBlock(part) - if (size != 0) { - if (useOldFetchProtocol) { - // While we use the old shuffle fetch protocol, we use mapIndex as mapId in the - // ShuffleBlockId. 
- splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += - ((ShuffleBlockId(shuffleId, mapIndex, part), size, mapIndex)) - } else { - splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += - ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex)) - } - } + val iter = statuses.iterator.zipWithIndex + for ((status, mapIndex) <- mapId.map(id => iter.filter(_._2 == id)).getOrElse(iter)) { + if (status == null) { + val errorMessage = s"Missing an output location for shuffle $shuffleId" + logError(errorMessage) + throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) + } else { + for (part <- startPartition until endPartition) { + val size = status.getSizeForBlock(part) + if (size != 0) { + if (useOldFetchProtocol) { + // While we use the old shuffle fetch protocol, we use mapIndex as mapId in the + // ShuffleBlockId. + splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += + ((ShuffleBlockId(shuffleId, mapIndex, part), size, mapIndex)) + } else { + splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += + ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex)) } } } + } } splitsByAddress.iterator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala index 0ec8710e4db43..94e66b0c3a430 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala @@ -125,6 +125,7 @@ trait AdaptiveSparkPlanHelper { private def allChildren(p: SparkPlan): Seq[SparkPlan] = p match { case a: AdaptiveSparkPlanExec => Seq(a.executedPlan) case s: QueryStageExec => Seq(s.plan) + case l: LocalShuffleReaderExec => Seq(l.child) case _ => p.children } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 1c24e988268e6..cd0bf726da9aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -230,8 +230,7 @@ class AdaptiveQueryExecSuite val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 2) val ex = findReusedExchange(adaptivePlan) - // The ReusedExchange is hidden in LocalShuffleReaderExec - assert(ex.size == 0) + assert(ex.size == 1) } } @@ -365,8 +364,7 @@ class AdaptiveQueryExecSuite } } - test("Change merge join to broadcast join and optimize the shuffle" + - " reader to local shuffle reader") { + test("Change merge join to broadcast join without local shuffle reader") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.OPTIMIZE_LOCAL_SHUFFLE_READER_ENABLED.key -> "true", From 9c1dc5538afce26c4e693e353d8d4ef4231bb78c Mon Sep 17 00:00:00 2001 From: jiake Date: Tue, 15 Oct 2019 12:48:32 +0800 Subject: [PATCH 11/11] resolve the conflicts --- .../org/apache/spark/MapOutputTracker.scala | 25 +++---------------- .../shuffle/BlockStoreShuffleReader.scala | 2 +- 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 67a9ad3a5c7a3..ef4c421cbf829 100644 --- 
a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -757,12 +757,7 @@ private[spark] class MapOutputTrackerMaster( startPartition, endPartition, statuses, -<<<<<<< HEAD - mapId) -======= - useOldFetchProtocol, Some(mapId)) ->>>>>>> resolve the comments } case None => Iterator.empty @@ -823,12 +818,8 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr s"partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId) try { -<<<<<<< HEAD - MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses, mapId) -======= - MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses, - useOldFetchProtocol, Some(mapId)) ->>>>>>> resolve the comments + MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, + statuses, Some(mapId)) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: @@ -980,7 +971,6 @@ private[spark] object MapOutputTracker extends Logging { startPartition: Int, endPartition: Int, statuses: Array[MapStatus], - useOldFetchProtocol: Boolean, mapId : Option[Int] = None): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { assert (statuses != null) val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]] @@ -994,15 +984,8 @@ private[spark] object MapOutputTracker extends Logging { for (part <- startPartition until endPartition) { val size = status.getSizeForBlock(part) if (size != 0) { - if (useOldFetchProtocol) { - // While we use the old shuffle fetch protocol, we use mapIndex as mapId in the - // ShuffleBlockId. - splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += - ((ShuffleBlockId(shuffleId, mapIndex, part), size, mapIndex)) - } else { - splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += - ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex)) - } + splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += + ((ShuffleBlockId(shuffleId, status.mapId, part), size, mapIndex)) } } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index ae757db2babd6..242442ac9d8f2 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -44,7 +44,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( /** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { - val blocksByAddress = (mapId) match { + val blocksByAddress = mapId match { case (Some(mapId)) => mapOutputTracker.getMapSizesByExecutorId( handle.shuffleId, startPartition,
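With the conflict resolved, read() simply branches on the optional map id: when a map id is present, only that mapper's output is fetched, which is the local shuffle reader case; when it is absent, the whole partition range is fetched from every mapper as before. A self-contained sketch of that dispatch shape, using stand-in types rather than the real MapOutputTracker overloads:

    object MapIdDispatchSketch {
      // Placeholder for the two fetch paths; the real code returns block lists
      // grouped by BlockManagerId instead of a description string.
      def blocksByAddress(
          shuffleId: Int,
          startPartition: Int,
          endPartition: Int,
          mapId: Option[Int]): String = mapId match {
        case Some(id) =>
          s"shuffle $shuffleId: blocks of map $id, partitions [$startPartition, $endPartition)"
        case None =>
          s"shuffle $shuffleId: blocks of all maps, partitions [$startPartition, $endPartition)"
      }

      def main(args: Array[String]): Unit = {
        println(blocksByAddress(0, 0, 4, Some(2))) // local shuffle reader: one mapper
        println(blocksByAddress(0, 0, 4, None))    // regular reduce read: all mappers
      }
    }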