diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index f9f8c56eb86c4..f06d3d001261f 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -21,9 +21,10 @@ import java.io.{DataInputStream, DataOutputStream}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Time, Timestamp}
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.spark.util.collection.Utils
+
 /**
  * Utility functions to serialize, deserialize objects to / from R
  */
@@ -236,7 +237,7 @@ private[spark] object SerDe {
         val keys = readArray(in, jvmObjectTracker).asInstanceOf[Array[Object]]
         val values = readList(in, jvmObjectTracker)
 
-        keys.zip(values).toMap.asJava
+        Utils.toJavaMap(keys, values)
       } else {
         new java.util.HashMap[Object, Object]()
       }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index aef79c7882ca1..e525b45f7ded3 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -74,6 +74,7 @@ import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
 import org.apache.spark.status.api.v1.{StackTrace, ThreadStackTrace}
+import org.apache.spark.util.collection.{Utils => CUtils}
 import org.apache.spark.util.io.ChunkedByteBufferOutputStream
 
 /** CallSite represents a place in user code. It can have a short and a long form. */
@@ -1718,7 +1719,8 @@ private[spark] object Utils extends Logging {
     assert(files.length == fileLengths.length)
     val startIndex = math.max(start, 0)
     val endIndex = math.min(end, fileLengths.sum)
-    val fileToLength = files.zip(fileLengths).toMap
+    val fileToLength = CUtils.toMap(files, fileLengths)
+
     logDebug("Log files: \n" + fileToLength.mkString("\n"))
 
     val stringBuffer = new StringBuffer((endIndex - startIndex).toInt)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
index 989ff69141750..f9bc76de6ef94 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.util.collection
 
+import java.util.Collections
+
 import scala.collection.JavaConverters._
+import scala.collection.immutable
 
 import com.google.common.collect.{Iterators => GuavaIterators, Ordering => GuavaOrdering}
 
@@ -62,4 +65,30 @@ private[spark] object Utils {
    */
  def sequenceToOption[T](input: Seq[Option[T]]): Option[Seq[T]] =
    if (input.forall(_.isDefined)) Some(input.flatten) else None
+
+  /**
+   * Same function as `keys.zip(values).toMap`, but has perf gain.
+   */
+  def toMap[K, V](keys: Iterable[K], values: Iterable[V]): Map[K, V] = {
+    val builder = immutable.Map.newBuilder[K, V]
+    val keyIter = keys.iterator
+    val valueIter = values.iterator
+    while (keyIter.hasNext && valueIter.hasNext) {
+      builder += (keyIter.next(), valueIter.next()).asInstanceOf[(K, V)]
+    }
+    builder.result()
+  }
+
+  /**
+   * Same function as `keys.zip(values).toMap.asJava`, but has perf gain.
+   */
+  def toJavaMap[K, V](keys: Iterable[K], values: Iterable[V]): java.util.Map[K, V] = {
+    val map = new java.util.HashMap[K, V]()
+    val keyIter = keys.iterator
+    val valueIter = values.iterator
+    while (keyIter.hasNext && valueIter.hasNext) {
+      map.put(keyIter.next(), valueIter.next())
+    }
+    Collections.unmodifiableMap(map)
+  }
 }
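A quick standalone sketch (not part of the patch) of the semantics the new helpers are expected to preserve. The `toMap` and `toJavaMap` bodies below only mirror the helpers added above for illustration; like `keys.zip(values).toMap`, they stop at the shorter of the two inputs, and the Java-facing result stays read-only just as the previous `.toMap.asJava` wrapper was.

```scala
import java.util.Collections

object ToMapSketch {
  // Mirrors the new Utils.toMap: builds the Map directly instead of first
  // materializing the intermediate collection of tuples that zip would create.
  def toMap[K, V](keys: Iterable[K], values: Iterable[V]): Map[K, V] = {
    val builder = Map.newBuilder[K, V]
    val keyIter = keys.iterator
    val valueIter = values.iterator
    while (keyIter.hasNext && valueIter.hasNext) {
      builder += ((keyIter.next(), valueIter.next()))
    }
    builder.result()
  }

  // Mirrors the new Utils.toJavaMap: an unmodifiable java.util.Map, matching
  // the read-only behavior of the old keys.zip(values).toMap.asJava result.
  def toJavaMap[K, V](keys: Iterable[K], values: Iterable[V]): java.util.Map[K, V] = {
    val map = new java.util.HashMap[K, V]()
    val keyIter = keys.iterator
    val valueIter = values.iterator
    while (keyIter.hasNext && valueIter.hasNext) {
      map.put(keyIter.next(), valueIter.next())
    }
    Collections.unmodifiableMap(map)
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq("a", "b", "c")
    val values = Seq(1, 2, 3)
    // Same result as the old zip-based code.
    assert(toMap(keys, values) == keys.zip(values).toMap)
    // Length mismatch: both forms silently drop the unpaired trailing element.
    assert(toMap(keys, Seq(1, 2)) == Map("a" -> 1, "b" -> 2))
    // The Java-facing variant remains read-only.
    val jMap = toJavaMap(keys, values)
    assert(jMap.get("b") == 2)
    try { jMap.put("d", 4); assert(false) } catch { case _: UnsupportedOperationException => () }
  }
}
```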
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala
index 6ff8262c4983f..37e57736574eb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala
@@ -26,6 +26,7 @@ import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.collection.Utils
 
 /**
  * Evaluator for ranking algorithms.
@@ -155,7 +156,7 @@ class RankingMetrics[T: ClassTag] @Since("1.2.0") (predictionAndLabels: RDD[_ <:
     rdd.map { case (pred, lab, rel) =>
       val useBinary = rel.isEmpty
       val labSet = lab.toSet
-      val relMap = lab.zip(rel).toMap
+      val relMap = Utils.toMap(lab, rel)
       if (useBinary && lab.size != rel.size) {
         logWarning(
           "# of ground truth set and # of relevance value set should be equal, " +
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 263d3734217bc..fcb8a5d0545aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.YearMonthIntervalType._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.Utils
 
 /**
  * Functions to convert Scala types to Catalyst types and vice versa.
@@ -229,7 +230,7 @@ object CatalystTypeConverters {
       val convertedValues =
         if (isPrimitive(valueType)) values else values.map(valueConverter.toScala)
 
-      convertedKeys.zip(convertedValues).toMap
+      Utils.toMap(convertedKeys, convertedValues)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6fc9d756c998d..7c00b469e7b39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -58,6 +58,7 @@ import org.apache.spark.sql.types.DayTimeIntervalType.DAY
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.{Utils => CUtils}
 
 /**
  * A trivial [[Analyzer]] with a dummy [[SessionCatalog]], [[EmptyFunctionRegistry]] and
@@ -3457,7 +3458,7 @@ class Analyzer(override val catalogManager: CatalogManager)
           throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
             cols.size, query.output.size, query)
         }
-        val nameToQueryExpr = cols.zip(query.output).toMap
+        val nameToQueryExpr = CUtils.toMap(cols, query.output)
         // Static partition columns in the table output should not appear in the column list
         // they will be handled in another rule ResolveInsertInto
         val reordered = tableOutput.flatMap { nameToQueryExpr.get(_).orElse(None) }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index fe957342180a3..e61ad8e1fab7d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.util.collection.Utils
 
 /**
  * Decorrelate the inner query by eliminating outer references and create domain joins.
@@ -346,7 +347,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
       val domains = attributes.map(_.newInstance())
       // A placeholder to be rewritten into domain join.
       val domainJoin = DomainJoin(domains, plan)
-      val outerReferenceMap = attributes.zip(domains).toMap
+      val outerReferenceMap = Utils.toMap(attributes, domains)
       // Build join conditions between domain attributes and outer references.
       // EqualNullSafe is used to make sure null key can be joined together. Note
       // outer referenced attributes can be changed during the outer query optimization.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
index fde22b249b690..f351ba0b39af9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.CTE
+import org.apache.spark.util.collection.Utils
 
 /**
  * Infer predicates and column pruning for [[CTERelationDef]] from its reference points, and push
@@ -71,7 +72,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
     case PhysicalOperation(projects, predicates, ref: CTERelationRef) =>
       val (cteDef, precedence, preds, attrs) = cteMap(ref.cteId)
-      val attrMapping = ref.output.zip(cteDef.output).map{ case (r, d) => r -> d }.toMap
+      val attrMapping = Utils.toMap(ref.output, cteDef.output)
       val newPredicates = if (isTruePredicate(preds)) {
         preds
       } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
index 4511b3038f029..1a58f45c07a29 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPl
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.AGGREGATE
 import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.util.collection.Utils
 
 /**
  * This rule rewrites an aggregate query with distinct aggregations into an expanded double
@@ -265,7 +266,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
     // Setup expand & aggregate operators for distinct aggregate expressions.
     val distinctAggChildAttrLookup = distinctAggChildAttrMap.toMap
-    val distinctAggFilterAttrLookup = distinctAggFilters.zip(maxConds.map(_.toAttribute)).toMap
+    val distinctAggFilterAttrLookup = Utils.toMap(distinctAggFilters, maxConds.map(_.toAttribute))
     val distinctAggOperatorMap = distinctAggGroups.toSeq.zipWithIndex.map {
       case ((group, expressions), i) =>
         val id = Literal(i + 1)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
index 3768f7a1824f1..dd72ff4d3d083 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util
 
 import java.util.{Map => JavaMap}
 
+import org.apache.spark.util.collection.Utils
+
 /**
  * A simple `MapData` implementation which is backed by 2 arrays.
 *
@@ -129,20 +131,19 @@ object ArrayBasedMapData {
   def toScalaMap(map: ArrayBasedMapData): Map[Any, Any] = {
     val keys = map.keyArray.asInstanceOf[GenericArrayData].array
     val values = map.valueArray.asInstanceOf[GenericArrayData].array
-    keys.zip(values).toMap
+    Utils.toMap(keys, values)
   }
 
   def toScalaMap(keys: Array[Any], values: Array[Any]): Map[Any, Any] = {
-    keys.zip(values).toMap
+    Utils.toMap(keys, values)
   }
 
   def toScalaMap(
       keys: scala.collection.Seq[Any],
       values: scala.collection.Seq[Any]): Map[Any, Any] = {
-    keys.zip(values).toMap
+    Utils.toMap(keys, values)
   }
 
   def toJavaMap(keys: Array[Any], values: Array[Any]): java.util.Map[Any, Any] = {
-    import scala.collection.JavaConverters._
-    keys.zip(values).toMap.asJava
+    Utils.toJavaMap(keys, values)
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
index 146c5e6f714cd..bff96019e97fb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.YearMonthIntervalType.YEAR
 import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.collection.Utils
 
 /**
  * Random data generators for Spark SQL DataTypes. These generators do not generate uniformly random
 * values; instead, they're biased to return "interesting" values (such as maximum / minimum values)
@@ -340,7 +341,7 @@ object RandomDataGenerator {
           count += 1
         }
         val values = Seq.fill(keys.size)(valueGenerator())
-        keys.zip(values).toMap
+        Utils.toMap(keys, values)
       }
     }
     case StructType(fields) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 0ce139baac109..3acadee5fb451 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.util.collection.Utils
 
 object ExternalRDD {
 
@@ -106,7 +107,7 @@ case class LogicalRDD(
     session :: originStats :: originConstraints :: Nil
 
   override def newInstance(): LogicalRDD.this.type = {
-    val rewrite = output.zip(output.map(_.newInstance())).toMap
+    val rewrite = Utils.toMap(output, output.map(_.newInstance()))
 
     val rewrittenPartitioning = outputPartitioning match {
       case p: Expression =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index 3e5846bcdfd77..0849ab59f64d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.{Utils => CUtils}
 
 /**
  * Utility functions used by the query planner to convert our plan to new aggregation code path.
@@ -218,7 +219,7 @@ object AggUtils {
     }
 
     // 3. Create an Aggregate operator for partial aggregation (for distinct)
-    val distinctColumnAttributeLookup = distinctExpressions.zip(distinctAttributes).toMap
+    val distinctColumnAttributeLookup = CUtils.toMap(distinctExpressions, distinctAttributes)
     val rewrittenDistinctFunctions = functionsWithDistinct.map {
       // Children of an AggregateFunction with DISTINCT keyword has already
       // been evaluated. At here, we need to replace original children
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 433f9fcdd0898..dc0857383e79b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.util.PartitioningUtils
+import org.apache.spark.util.collection.Utils
 
 /**
  * Analyzes a given set of partitions to generate per-partition statistics, which will be used in
@@ -147,7 +148,7 @@ case class AnalyzePartitionCommand(
           r.get(i).toString
         }
       }
-      val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
+      val spec = Utils.toMap(tableMeta.partitionColumnNames, partitionColumnValues)
      val count = BigInt(r.getLong(partitionColumns.size))
      (spec, count)
    }.toMap
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index 53a34a8f5f153..eb37a27fd7ca3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.collection.Utils
 
 object PushDownUtils {
   /**
@@ -203,7 +204,7 @@ object PushDownUtils {
   def toOutputAttrs(
       schema: StructType,
       relation: DataSourceV2Relation): Seq[AttributeReference] = {
-    val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
+    val nameToAttr = Utils.toMap(relation.output.map(_.name), relation.output)
     val cleaned = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     cleaned.toAttributes.map {
       // we have to keep the attribute id during transformation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
index 31b7df1abd012..0f19f14576b97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.Utils
 
 /**
  * Holds common logic for window operators
@@ -69,7 +70,7 @@ trait WindowExecBase extends UnaryExecNode {
         // Results of window expressions will be on the right side of child's output
         BoundReference(child.output.size + i, e.dataType, e.nullable)
       }
-      val unboundToRefMap = expressions.zip(references).toMap
+      val unboundToRefMap = Utils.toMap(expressions, references)
       val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
       UnsafeProjection.create(
         child.output ++ patchedWindowExpression,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index ce89d62346797..02eee36d4bee6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable
 
 import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.collection.Utils
 
 /**
  * A class that tries to schedule receivers with evenly distributed. There are two phases for
@@ -135,7 +136,7 @@ private[streaming] class ReceiverSchedulingPolicy {
       leastScheduledExecutors += executor
     }
 
-    receivers.map(_.streamId).zip(scheduledLocations.map(_.toSeq)).toMap
+    Utils.toMap(receivers.map(_.streamId), scheduledLocations.map(_.toSeq))
   }
 
   /**
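For reviewers who want to sanity-check the motivation locally, a rough standalone timing sketch (not part of the patch, not a proper benchmark, and with arbitrary sizes) could look like the following. It only compares the old `zip(...).toMap` pattern against a builder-based loop with the same shape as the new helper; no numbers are claimed here.

```scala
object ZipToMapTiming {
  // Same shape as the new helper: skips the intermediate Seq[(K, V)] that zip allocates.
  def toMap[K, V](keys: Iterable[K], values: Iterable[V]): Map[K, V] = {
    val builder = Map.newBuilder[K, V]
    val keyIter = keys.iterator
    val valueIter = values.iterator
    while (keyIter.hasNext && valueIter.hasNext) {
      builder += ((keyIter.next(), valueIter.next()))
    }
    builder.result()
  }

  private def time[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    println(f"$label%-22s ${(System.nanoTime() - start) / 1e6}%.1f ms")
    result
  }

  def main(args: Array[String]): Unit = {
    val n = 1000000
    val keys = (0 until n).map(i => s"key$i")
    val values = 0 until n
    // One warm-up pass for each variant before the timed pass.
    keys.zip(values).toMap
    toMap(keys, values)
    val viaZip = time("zip + toMap")(keys.zip(values).toMap)
    val viaBuilder = time("builder-based toMap")(toMap(keys, values))
    assert(viaZip == viaBuilder)
  }
}
```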