From ded36fa3a8d4728f1590280224d1638125310227 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Mon, 28 Nov 2016 16:21:36 +0530 Subject: [PATCH] [SNAP-1136] Kryo closure serialtization support and optimizations (#27) - added back configurable closure serializer in Spark which was removed in SPARK-12414; some minor changes taken from closed Spark PR https://github.com/apache/spark/pull/6361 - added optimized Kryo serialization for multiple classes; currently registration and string sharing fix for kryo (https://github.com/EsotericSoftware/kryo/issues/128) is only in the SnappyData layer PooledKryoSerializer implementation; classes providing maximum benefit have added KryoSerializable notably Accumulators and *Metrics - use closureSerializer for Netty messaging too instead of fixed JavaSerializer - set ordering field with kryo serialization in GenerateOrdering - fixing scalastyle and compile errors --- build.gradle | 1 - .../scala/org/apache/spark/SparkEnv.scala | 58 +++++++--- .../apache/spark/executor/InputMetrics.scala | 20 +++- .../apache/spark/executor/OutputMetrics.scala | 20 +++- .../spark/executor/ShuffleReadMetrics.scala | 30 ++++- .../spark/executor/ShuffleWriteMetrics.scala | 22 +++- .../apache/spark/executor/TaskMetrics.scala | 43 ++++++- .../netty/NettyBlockTransferService.scala | 5 +- .../main/scala/org/apache/spark/rdd/RDD.scala | 5 +- .../spark/rdd/ZippedPartitionsRDD.scala | 30 ++++- .../org/apache/spark/rpc/RpcEndpointRef.scala | 6 +- .../apache/spark/rpc/netty/NettyRpcEnv.scala | 47 +++++--- .../apache/spark/scheduler/ResultTask.scala | 31 ++++- .../spark/scheduler/ShuffleMapTask.scala | 21 +++- .../org/apache/spark/scheduler/Task.scala | 84 +++++++++++--- .../cluster/CoarseGrainedClusterMessage.scala | 28 ++++- .../org/apache/spark/storage/BlockId.scala | 18 ++- .../spark/storage/BlockManagerMessages.scala | 55 ++++++++- .../org/apache/spark/ui/JettyUtils.scala | 17 --- .../scala/org/apache/spark/ui/UIUtils.scala | 4 +- .../org/apache/spark/util/AccumulatorV2.scala | 108 +++++++++++++++++- .../spark/util/SerializableBuffer.scala | 23 +++- .../apache/spark/util/collection/BitSet.scala | 32 +++++- .../apache/spark/executor/ExecutorSuite.scala | 4 +- .../org/apache/spark/scheduler/FakeTask.scala | 2 +- .../codegen/GenerateOrdering.scala | 5 +- .../codegen/GenerateSafeProjection.scala | 25 ++-- .../codegen/GenerateUnsafeProjection.scala | 25 ++-- .../sql/execution/metric/SQLMetrics.scala | 22 +++- 29 files changed, 637 insertions(+), 154 deletions(-) diff --git a/build.gradle b/build.gradle index 7662da5ca942e..75335dbb8a576 100644 --- a/build.gradle +++ b/build.gradle @@ -52,7 +52,6 @@ allprojects { group = 'io.snappydata' version = snappySparkVersion - productName = productName ext { vendorName = 'TIBCO Software Inc.' diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index abac62e5b0c64..36d23fe589b2d 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -170,6 +170,43 @@ object SparkEnv extends Logging { env } + // Create an instance of the class with the given name, possibly initializing it with our conf + def instantiateClass[T](className: String, conf: SparkConf, + isDriver: Boolean): T = { + val cls = Utils.classForName(className) + // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just + // SparkConf, then one taking no arguments + try { + cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE) + .newInstance(conf, new java.lang.Boolean(isDriver)) + .asInstanceOf[T] + } catch { + case _: NoSuchMethodException => + try { + cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T] + } catch { + case _: NoSuchMethodException => + cls.getConstructor().newInstance().asInstanceOf[T] + } + } + } + + def getClosureSerializer(conf: SparkConf, doLog: Boolean = false): Serializer = { + val defaultClosureSerializerClass = classOf[JavaSerializer].getName + val closureSerializerClass = conf.get("spark.closure.serializer", + defaultClosureSerializerClass) + val closureSerializer = instantiateClass[Serializer]( + closureSerializerClass, conf, isDriver = false) + if (doLog) { + if (closureSerializerClass != defaultClosureSerializerClass) { + logInfo(s"Using non-default closure serializer: $closureSerializerClass") + } else { + logDebug(s"Using closure serializer: $closureSerializerClass") + } + } + closureSerializer + } + /** * Create a SparkEnv for the driver. */ @@ -272,26 +309,9 @@ object SparkEnv extends Logging { conf.set("spark.driver.port", rpcEnv.address.port.toString) } - // Create an instance of the class with the given name, possibly initializing it with our conf def instantiateClass[T](className: String): T = { - val cls = Utils.classForName(className) - // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just - // SparkConf, then one taking no arguments - try { - cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE) - .newInstance(conf, new java.lang.Boolean(isDriver)) - .asInstanceOf[T] - } catch { - case _: NoSuchMethodException => - try { - cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T] - } catch { - case _: NoSuchMethodException => - cls.getConstructor().newInstance().asInstanceOf[T] - } - } + SparkEnv.instantiateClass(className, conf, isDriver) } - // Create an instance of the class named by the given SparkConf property, or defaultClassName // if the property is not set, possibly initializing it with our conf def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = { @@ -304,7 +324,7 @@ object SparkEnv extends Logging { val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey) - val closureSerializer = new JavaSerializer(conf) + val closureSerializer = getClosureSerializer(conf, doLog = true) def registerOrLookupEndpoint( name: String, endpointCreator: => RpcEndpoint): diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala index 3d15f3a0396e1..2029f8d3973e6 100644 --- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala @@ -17,6 +17,10 @@ package org.apache.spark.executor +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + +import org.apache.spark.TaskContext import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.LongAccumulator @@ -39,7 +43,7 @@ object DataReadMethod extends Enumeration with Serializable { * A collection of accumulators that represents metrics about reading data from external systems. */ @DeveloperApi -class InputMetrics private[spark] () extends Serializable { +class InputMetrics private[spark] () extends Serializable with KryoSerializable { private[executor] val _bytesRead = new LongAccumulator private[executor] val _recordsRead = new LongAccumulator @@ -56,4 +60,18 @@ class InputMetrics private[spark] () extends Serializable { private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v) private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v) private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v) + + override def write(kryo: Kryo, output: Output): Unit = { + _bytesRead.write(kryo, output) + _recordsRead.write(kryo, output) + } + + override def read(kryo: Kryo, input: Input): Unit = { + read(kryo, input, context = null) + } + + final def read(kryo: Kryo, input: Input, context: TaskContext): Unit = { + _bytesRead.read(kryo, input, context) + _recordsRead.read(kryo, input, context) + } } diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala index dada9697c1cf9..114be5e880690 100644 --- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala @@ -17,6 +17,10 @@ package org.apache.spark.executor +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + +import org.apache.spark.TaskContext import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.LongAccumulator @@ -38,7 +42,7 @@ object DataWriteMethod extends Enumeration with Serializable { * A collection of accumulators that represents metrics about writing data to external systems. */ @DeveloperApi -class OutputMetrics private[spark] () extends Serializable { +class OutputMetrics private[spark] () extends Serializable with KryoSerializable { private[executor] val _bytesWritten = new LongAccumulator private[executor] val _recordsWritten = new LongAccumulator @@ -54,4 +58,18 @@ class OutputMetrics private[spark] () extends Serializable { private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v) private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v) + + override def write(kryo: Kryo, output: Output): Unit = { + _bytesWritten.write(kryo, output) + _recordsWritten.write(kryo, output) + } + + override def read(kryo: Kryo, input: Input): Unit = { + read(kryo, input, context = null) + } + + final def read(kryo: Kryo, input: Input, context: TaskContext): Unit = { + _bytesWritten.read(kryo, input, context) + _recordsWritten.read(kryo, input, context) + } } diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index 5a435f2f9a1ba..ee5b38695a941 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -17,6 +17,10 @@ package org.apache.spark.executor +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + +import org.apache.spark.TaskContext import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.{DoubleAccumulator, LongAccumulator} @@ -27,7 +31,7 @@ import org.apache.spark.util.{DoubleAccumulator, LongAccumulator} * Operations are not thread-safe. */ @DeveloperApi -class ShuffleReadMetrics private[spark] () extends Serializable { +class ShuffleReadMetrics private[spark] () extends Serializable with KryoSerializable { private[executor] val _remoteBlocksFetched = new LongAccumulator private[executor] val _localBlocksFetched = new LongAccumulator private[executor] val _remoteBytesRead = new LongAccumulator @@ -121,6 +125,30 @@ class ShuffleReadMetrics private[spark] () extends Serializable { _recordsRead.add(metric.recordsRead) } } + + override def write(kryo: Kryo, output: Output): Unit = { + _remoteBlocksFetched.write(kryo, output) + _localBlocksFetched.write(kryo, output) + _remoteBytesRead.write(kryo, output) + _remoteBytesReadToDisk.write(kryo, output) + _localBytesRead.write(kryo, output) + _fetchWaitTime.write(kryo, output) + _recordsRead.write(kryo, output) + } + + override def read(kryo: Kryo, input: Input): Unit = { + read(kryo, input, context = null) + } + + final def read(kryo: Kryo, input: Input, context: TaskContext): Unit = { + _remoteBlocksFetched.read(kryo, input, context) + _localBlocksFetched.read(kryo, input, context) + _remoteBytesRead.read(kryo, input, context) + _remoteBytesReadToDisk.read(kryo, input, context) + _localBytesRead.read(kryo, input, context) + _fetchWaitTime.read(kryo, input, context) + _recordsRead.read(kryo, input, context) + } } /** diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index ada2e1bc08593..2ba2e62777ac4 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -17,6 +17,10 @@ package org.apache.spark.executor +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + +import org.apache.spark.TaskContext import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.LongAccumulator @@ -27,7 +31,7 @@ import org.apache.spark.util.LongAccumulator * Operations are not thread-safe. */ @DeveloperApi -class ShuffleWriteMetrics private[spark] () extends Serializable { +class ShuffleWriteMetrics private[spark] () extends Serializable with KryoSerializable { private[executor] val _bytesWritten = new LongAccumulator private[executor] val _recordsWritten = new LongAccumulator private[executor] val _writeTime = new LongAccumulator @@ -57,6 +61,22 @@ class ShuffleWriteMetrics private[spark] () extends Serializable { _recordsWritten.setValue(recordsWritten - v) } + override def write(kryo: Kryo, output: Output): Unit = { + _bytesWritten.write(kryo, output) + _recordsWritten.write(kryo, output) + _writeTime.write(kryo, output) + } + + override def read(kryo: Kryo, input: Input): Unit = { + read(kryo, input, context = null) + } + + final def read(kryo: Kryo, input: Input, context: TaskContext): Unit = { + _bytesWritten.read(kryo, input, context) + _recordsWritten.read(kryo, input, context) + _writeTime.read(kryo, input, context) + } + // Legacy methods for backward compatibility. // TODO: remove these once we make this class private. @deprecated("use bytesWritten instead", "2.0.0") diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index e798e83af229c..bf73754554077 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -20,6 +20,9 @@ package org.apache.spark.executor import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap} +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging @@ -42,7 +45,7 @@ import org.apache.spark.util._ * be sent to the driver. */ @DeveloperApi -class TaskMetrics private[spark] () extends Serializable { +class TaskMetrics private[spark] () extends Serializable with KryoSerializable { // Each metric is internally represented as an accumulator private val _executorDeserializeTime = new DoubleAccumulator private val _executorDeserializeCpuTime = new DoubleAccumulator @@ -263,6 +266,44 @@ class TaskMetrics private[spark] () extends Serializable { // value will be updated at driver side. internalAccums.filter(a => !a.isZero || a == _resultSize) } + + override def write(kryo: Kryo, output: Output): Unit = { + _executorDeserializeTime.write(kryo, output) + _executorDeserializeCpuTime.write(kryo, output) + _executorRunTime.write(kryo, output) + _executorCpuTime.write(kryo, output) + _resultSize.write(kryo, output) + _jvmGCTime.write(kryo, output) + _resultSerializationTime.write(kryo, output) + _memoryBytesSpilled.write(kryo, output) + _diskBytesSpilled.write(kryo, output) + _peakExecutionMemory.write(kryo, output) + _updatedBlockStatuses.write(kryo, output) + inputMetrics.write(kryo, output) + outputMetrics.write(kryo, output) + shuffleReadMetrics.write(kryo, output) + shuffleWriteMetrics.write(kryo, output) + } + + override def read(kryo: Kryo, input: Input): Unit = { + // read the TaskContext thread-local once + val taskContext = TaskContext.get() + _executorDeserializeTime.read(kryo, input, taskContext) + _executorDeserializeCpuTime.read(kryo, input, taskContext) + _executorRunTime.read(kryo, input, taskContext) + _executorCpuTime.read(kryo, input, taskContext) + _resultSize.read(kryo, input, taskContext) + _jvmGCTime.read(kryo, input, taskContext) + _resultSerializationTime.read(kryo, input, taskContext) + _memoryBytesSpilled.read(kryo, input, taskContext) + _diskBytesSpilled.read(kryo, input, taskContext) + _peakExecutionMemory.read(kryo, input, taskContext) + _updatedBlockStatuses.read(kryo, input, taskContext) + inputMetrics.read(kryo, input, taskContext) + outputMetrics.read(kryo, input, taskContext) + shuffleReadMetrics.read(kryo, input, taskContext) + shuffleWriteMetrics.read(kryo, input, taskContext) + } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index dc55685b1e7bd..db0034d5fad2c 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import com.codahale.metrics.{Metric, MetricSet} -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.{SecurityManager, SparkConf, SparkEnv} import org.apache.spark.internal.config import org.apache.spark.network._ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} @@ -36,7 +36,6 @@ import org.apache.spark.network.server._ import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher} import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream} import org.apache.spark.network.util.JavaUtils -import org.apache.spark.serializer.JavaSerializer import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.util.Utils @@ -53,7 +52,7 @@ private[spark] class NettyBlockTransferService( extends BlockTransferService { // TODO: Don't use Java serialization, use a more cross-version compatible serialization format. - private val serializer = new JavaSerializer(conf) + private val serializer = SparkEnv.getClosureSerializer(conf) private val authEnabled = securityManager.isAuthenticationEnabled() private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 31c6bc1666ea2..454c03f40e2ad 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -147,7 +147,8 @@ abstract class RDD[T: ClassTag]( def sparkContext: SparkContext = sc /** A unique ID for this RDD (within its SparkContext). */ - val id: Int = sc.newRddId() + protected var _id: Int = sc.newRddId() + def id: Int = _id /** A friendly name for this RDD */ @transient var name: String = _ @@ -1728,7 +1729,7 @@ abstract class RDD[T: ClassTag]( // Other internal methods and fields // ======================================================================= - private var storageLevel: StorageLevel = StorageLevel.NONE + protected var storageLevel: StorageLevel = StorageLevel.NONE /** User code that created this RDD (e.g. `textFile`, `parallelize`). */ @transient private[spark] val creationSite = sc.getCallSite() diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 3cb1231bd3477..7d4e5595fe860 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -21,16 +21,19 @@ import java.io.{IOException, ObjectOutputStream} import scala.reflect.ClassTag +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext} import org.apache.spark.util.Utils private[spark] class ZippedPartitionsPartition( - idx: Int, + private var idx: Int, @transient private val rdds: Seq[RDD[_]], @transient val preferredLocations: Seq[String]) - extends Partition { + extends Partition with KryoSerializable { - override val index: Int = idx + override def index: Int = idx var partitionValues = rdds.map(rdd => rdd.partitions(idx)) def partitions: Seq[Partition] = partitionValues @@ -40,6 +43,27 @@ private[spark] class ZippedPartitionsPartition( partitionValues = rdds.map(rdd => rdd.partitions(idx)) oos.defaultWriteObject() } + + override def write(kryo: Kryo, output: Output): Unit = { + // Update the reference to parent split at the time of task serialization + partitionValues = rdds.map(rdd => rdd.partitions(idx)) + output.writeVarInt(idx, true) + output.writeVarInt(partitionValues.length, true) + for (p <- partitionValues) { + kryo.writeClassAndObject(output, p) + } + } + + override def read(kryo: Kryo, input: Input): Unit = { + idx = input.readVarInt(true) + var numPartitions = input.readVarInt(true) + val partitionBuilder = Seq.newBuilder[Partition] + while (numPartitions > 0) { + partitionBuilder += kryo.readClassAndObject(input).asInstanceOf[Partition] + numPartitions -= 1 + } + partitionValues = partitionBuilder.result() + } } private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag]( diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala index 4d39f144dd198..99215fd316d9d 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala @@ -30,9 +30,9 @@ import org.apache.spark.util.RpcUtils private[spark] abstract class RpcEndpointRef(conf: SparkConf) extends Serializable with Logging { - private[this] val maxRetries = RpcUtils.numRetries(conf) - private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf) - private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf) + protected var maxRetries: Int = RpcUtils.numRetries(conf) + protected var retryWaitMs: Long = RpcUtils.retryWaitMs(conf) + protected var defaultAskTimeout: RpcTimeout = RpcUtils.askRpcTimeout(conf) /** * return the address for the [[RpcEndpointRef]] diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 47576959322d1..ee9dd6b1d4f77 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -29,7 +29,10 @@ import scala.reflect.ClassTag import scala.util.{DynamicVariable, Failure, Success, Try} import scala.util.control.NonFatal -import org.apache.spark.{SecurityManager, SparkConf} +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + +import org.apache.spark.{SecurityManager, SparkConf, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.network.TransportContext import org.apache.spark.network.client._ @@ -37,12 +40,12 @@ import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.server._ import org.apache.spark.rpc._ -import org.apache.spark.serializer.{JavaSerializer, JavaSerializerInstance, SerializationStream} -import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, ThreadUtils, Utils} +import org.apache.spark.serializer.{SerializationStream, Serializer, SerializerInstance} +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, RpcUtils, ThreadUtils, Utils} private[netty] class NettyRpcEnv( val conf: SparkConf, - javaSerializerInstance: JavaSerializerInstance, + serializer: Serializer, host: String, securityManager: SecurityManager, numUsableCores: Int) extends RpcEnv(conf) with Logging { @@ -52,6 +55,10 @@ private[netty] class NettyRpcEnv( "rpc", conf.getInt("spark.rpc.io.threads", numUsableCores)) + private val serializerInstance = new ThreadLocal[SerializerInstance] { + override def initialValue(): SerializerInstance = serializer.newInstance() + } + private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores) private val streamManager = new NettyStreamManager(this) @@ -255,20 +262,20 @@ private[netty] class NettyRpcEnv( } private[netty] def serialize(content: Any): ByteBuffer = { - javaSerializerInstance.serialize(content) + serializerInstance.get().serialize(content) } /** * Returns [[SerializationStream]] that forwards the serialized bytes to `out`. */ private[netty] def serializeStream(out: OutputStream): SerializationStream = { - javaSerializerInstance.serializeStream(out) + serializerInstance.get().serializeStream(out) } private[netty] def deserialize[T: ClassTag](client: TransportClient, bytes: ByteBuffer): T = { NettyRpcEnv.currentClient.withValue(client) { deserialize { () => - javaSerializerInstance.deserialize[T](bytes) + serializerInstance.get().deserialize[T](bytes) } } } @@ -453,12 +460,9 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { def create(config: RpcEnvConfig): RpcEnv = { val sparkConf = config.conf - // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support - // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance - val javaSerializerInstance = - new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance] + val serializer = SparkEnv.getClosureSerializer(sparkConf) val nettyEnv = - new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, + new NettyRpcEnv(sparkConf, serializer, config.advertiseAddress, config.securityManager, config.numUsableCores) if (!config.clientMode) { val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => @@ -499,8 +503,9 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { */ private[netty] class NettyRpcEndpointRef( @transient private val conf: SparkConf, - private val endpointAddress: RpcEndpointAddress, - @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) { + private var endpointAddress: RpcEndpointAddress, + @transient @volatile private var nettyEnv: NettyRpcEnv) + extends RpcEndpointRef(conf) with KryoSerializable { @transient @volatile var client: TransportClient = _ @@ -517,6 +522,20 @@ private[netty] class NettyRpcEndpointRef( out.defaultWriteObject() } + override def read(kryo: Kryo, input: Input): Unit = { + endpointAddress = kryo.readObject(input, classOf[RpcEndpointAddress]) + nettyEnv = NettyRpcEnv.currentEnv.value + client = NettyRpcEnv.currentClient.value + + maxRetries = RpcUtils.numRetries(nettyEnv.conf) + retryWaitMs = RpcUtils.retryWaitMs(nettyEnv.conf) + defaultAskTimeout = RpcUtils.askRpcTimeout(nettyEnv.conf) + } + + override def write(kryo: Kryo, output: Output): Unit = { + kryo.writeObject(output, endpointAddress) + } + override def name: String = endpointAddress.name override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 5b4070560eae0..e6eb15fe90a9e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -22,6 +22,9 @@ import java.lang.management.ManagementFactory import java.nio.ByteBuffer import java.util.Properties +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD @@ -38,8 +41,8 @@ import org.apache.spark.rdd.RDD * (RDD[T], (TaskContext, Iterator[T]) => U). * @param partition partition of the RDD this task is associated with * @param locs preferred task execution locations for locality scheduling - * @param outputId index of the task in this job (a job can launch tasks on only a subset of the - * input RDD's partitions). + * @param _outputId index of the task in this job (a job can launch tasks on only a subset of the + * input RDD's partitions). * @param localProperties copy of thread-local properties set by the user on the driver side. * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side * and sent to executor side. @@ -54,10 +57,10 @@ import org.apache.spark.rdd.RDD private[spark] class ResultTask[T, U]( stageId: Int, stageAttemptId: Int, - taskBinary: Broadcast[Array[Byte]], - partition: Partition, + private var taskBinary: Broadcast[Array[Byte]], + private var partition: Partition, locs: Seq[TaskLocation], - val outputId: Int, + private var _outputId: Int, localProperties: Properties, serializedTaskMetrics: Array[Byte], jobId: Option[Int] = None, @@ -66,7 +69,9 @@ private[spark] class ResultTask[T, U]( isBarrier: Boolean = false) extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier) - with Serializable { + with Serializable with KryoSerializable { + + final def outputId: Int = _outputId @transient private[this] val preferredLocs: Seq[TaskLocation] = { if (locs == null) Nil else locs.toSet.toSeq @@ -94,4 +99,18 @@ private[spark] class ResultTask[T, U]( override def preferredLocations: Seq[TaskLocation] = preferredLocs override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")" + + override def write(kryo: Kryo, output: Output): Unit = { + super.writeKryo(kryo, output) + kryo.writeClassAndObject(output, taskBinary) + kryo.writeClassAndObject(output, partition) + output.writeInt(_outputId) + } + + override def read(kryo: Kryo, input: Input): Unit = { + super.readKryo(kryo, input) + taskBinary = kryo.readClassAndObject(input).asInstanceOf[Broadcast[Array[Byte]]] + partition = kryo.readClassAndObject(input).asInstanceOf[Partition] + _outputId = input.readInt() + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 4d261789b81bb..9de32ddaac980 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -23,6 +23,9 @@ import java.util.Properties import scala.language.existentials +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging @@ -55,8 +58,8 @@ import org.apache.spark.shuffle.ShuffleWriter private[spark] class ShuffleMapTask( stageId: Int, stageAttemptId: Int, - taskBinary: Broadcast[Array[Byte]], - partition: Partition, + private var taskBinary: Broadcast[Array[Byte]], + private var partition: Partition, @transient private var locs: Seq[TaskLocation], localProperties: Properties, serializedTaskMetrics: Array[Byte], @@ -66,7 +69,7 @@ private[spark] class ShuffleMapTask( isBarrier: Boolean = false) extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier) - with Logging { + with KryoSerializable with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. */ def this(partitionId: Int) { @@ -115,4 +118,16 @@ private[spark] class ShuffleMapTask( override def preferredLocations: Seq[TaskLocation] = preferredLocs override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId) + + override def write(kryo: Kryo, output: Output): Unit = { + super.writeKryo(kryo, output) + kryo.writeClassAndObject(output, taskBinary) + kryo.writeClassAndObject(output, partition) + } + + override def read(kryo: Kryo, input: Input): Unit = { + super.readKryo(kryo, input) + taskBinary = kryo.readClassAndObject(input).asInstanceOf[Broadcast[Array[Byte]]] + partition = kryo.readClassAndObject(input).asInstanceOf[Partition] + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 4c251fb5f9bf8..b6b6a1890aadf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -20,6 +20,9 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer import java.util.Properties +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.config.APP_CALLER_CONTEXT @@ -39,36 +42,50 @@ import org.apache.spark.util._ * and sends the task output back to the driver application. A ShuffleMapTask executes the task * and divides the task output to multiple buckets (based on the task's partitioner). * - * @param stageId id of the stage this task belongs to - * @param stageAttemptId attempt id of the stage this task belongs to - * @param partitionId index of the number in the RDD + * @param _stageId id of the stage this task belongs to + * @param _stageAttemptId attempt id of the stage this task belongs to + * @param _partitionId index of the number in the RDD * @param localProperties copy of thread-local properties set by the user on the driver side. * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side * and sent to executor side. * * The parameters below are optional: - * @param jobId id of the job this task belongs to - * @param appId id of the app this task belongs to - * @param appAttemptId attempt id of the app this task belongs to - * @param isBarrier whether this task belongs to a barrier stage. Spark must launch all the tasks - * at the same time for a barrier stage. + * @param _jobId id of the job this task belongs to + * @param _appId id of the app this task belongs to + * @param _appAttemptId attempt id of the app this task belongs to + * @param _isBarrier whether this task belongs to a barrier stage. Spark must launch all the tasks + * at the same time for a barrier stage. */ private[spark] abstract class Task[T]( - val stageId: Int, - val stageAttemptId: Int, - val partitionId: Int, + private var _stageId: Int, + private var _stageAttemptId: Int, + private var _partitionId: Int, @transient var localProperties: Properties = new Properties, // The default value is only used in tests. - serializedTaskMetrics: Array[Byte] = + private var serializedTaskMetrics: Array[Byte] = SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(), - val jobId: Option[Int] = None, - val appId: Option[String] = None, - val appAttemptId: Option[String] = None, - val isBarrier: Boolean = false) extends Serializable { + private var _jobId: Option[Int] = None, + private var _appId: Option[String] = None, + private var _appAttemptId: Option[String] = None, + private var _isBarrier: Boolean = false) extends Serializable { @transient lazy val metrics: TaskMetrics = SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics)) + final def stageId: Int = _stageId + + final def stageAttemptId: Int = _stageAttemptId + + final def partitionId: Int = _partitionId + + final def jobId: Option[Int] = _jobId + + final def appId: Option[String] = _appId + + final def appAttemptId: Option[String] = _appAttemptId + + final def isBarrier: Boolean = _isBarrier + /** * Called by [[org.apache.spark.executor.Executor]] to run this task. * @@ -162,7 +179,7 @@ private[spark] abstract class Task[T]( } } - private var taskMemoryManager: TaskMemoryManager = _ + @transient private var taskMemoryManager: TaskMemoryManager = _ def setTaskMemoryManager(taskMemoryManager: TaskMemoryManager): Unit = { this.taskMemoryManager = taskMemoryManager @@ -231,4 +248,37 @@ private[spark] abstract class Task[T]( taskThread.interrupt() } } + + protected final def writeKryo(kryo: Kryo, output: Output): Unit = { + output.writeInt(_stageId) + output.writeVarInt(_stageAttemptId, true) + output.writeVarInt(_partitionId, true) + output.writeVarInt(serializedTaskMetrics.length, true) + output.write(serializedTaskMetrics) + output.writeVarInt(if (_jobId.isDefined) _jobId.get else -1, true) + output.writeString(if (_appId.isDefined) _appId.get else null) + output.writeString(if (_appAttemptId.isDefined) _appAttemptId.get else null) + output.writeBoolean(_isBarrier) + output.writeLong(epoch) + output.writeLong(_executorDeserializeTime) + output.writeLong(_executorDeserializeCpuTime) + } + + protected final def readKryo(kryo: Kryo, input: Input): Unit = { + _stageId = input.readInt() + _stageAttemptId = input.readVarInt(true) + _partitionId = input.readVarInt(true) + val len = input.readVarInt(true) + serializedTaskMetrics = input.readBytes(len) + _jobId = input.readVarInt(true) match { + case -1 => None + case v => Some(v) + } + _appId = Option(input.readString()) + _appAttemptId = Option(input.readString()) + _isBarrier = input.readBoolean() + epoch = input.readLong() + _executorDeserializeTime = input.readLong() + _executorDeserializeCpuTime = input.readLong() + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index e8b7fc0ef100a..fc5115c57fb03 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -19,10 +19,13 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.TaskState.TaskState import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.ExecutorLossReason -import org.apache.spark.util.SerializableBuffer +import org.apache.spark.util.{SerializableBuffer, Utils} private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable @@ -66,8 +69,27 @@ private[spark] object CoarseGrainedClusterMessages { logUrls: Map[String, String]) extends CoarseGrainedClusterMessage - case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, - data: SerializableBuffer) extends CoarseGrainedClusterMessage + case class StatusUpdate(var executorId: String, var taskId: Long, + var state: TaskState, var data: SerializableBuffer) + extends CoarseGrainedClusterMessage with KryoSerializable { + + override def write(kryo: Kryo, output: Output): Unit = { + output.writeString(executorId) + output.writeLong(taskId) + output.writeVarInt(state.id, true) + val buffer = data.buffer + output.writeInt(buffer.remaining()) + Utils.writeByteBuffer(buffer, output) + } + + override def read(kryo: Kryo, input: Input): Unit = { + executorId = input.readString() + taskId = input.readLong() + state = org.apache.spark.TaskState(input.readVarInt(true)) + val len = input.readInt() + data = new SerializableBuffer(ByteBuffer.wrap(input.readBytes(len))) + } + } object StatusUpdate { /** Alternate factory method that takes a ByteBuffer directly for the data field */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 7ac2c71c18eb3..3f0a0976c252c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -19,6 +19,9 @@ package org.apache.spark.storage import java.util.UUID +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.SparkException import org.apache.spark.annotation.DeveloperApi @@ -45,8 +48,19 @@ sealed abstract class BlockId { } @DeveloperApi -case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { - override def name: String = "rdd_" + rddId + "_" + splitIndex +case class RDDBlockId(var rddId: Int, var splitIndex: Int) + extends BlockId with KryoSerializable { + @transient override lazy val name: String = "rdd_" + rddId + "_" + splitIndex + + override def write(kryo: Kryo, output: Output): Unit = { + output.writeInt(rddId) + output.writeVarInt(splitIndex, true) + } + + override def read(kryo: Kryo, input: Input): Unit = { + rddId = input.readInt() + splitIndex = input.readVarInt(true) + } } // Format of the shuffle block ids (including data and index) should be kept in sync with diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 1bbe7a5b39509..b6f45c4f894c6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -19,6 +19,9 @@ package org.apache.spark.storage import java.io.{Externalizable, ObjectInput, ObjectOutput} +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Utils @@ -30,21 +33,63 @@ private[spark] object BlockManagerMessages { // Remove a block from the slaves that have it. This can only be used to remove // blocks that the master knows about. - case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave + case class RemoveBlock(private var blockId: BlockId) extends ToBlockManagerSlave + with KryoSerializable { + + override def write(kryo: Kryo, output: Output): Unit = { + output.writeString(blockId.name) + } + + override def read(kryo: Kryo, input: Input): Unit = { + blockId = BlockId(input.readString()) + } + } // Replicate blocks that were lost due to executor failure case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int) extends ToBlockManagerSlave // Remove all blocks belonging to a specific RDD. - case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave + case class RemoveRdd(private var rddId: Int) extends ToBlockManagerSlave + with KryoSerializable { + + override def write(kryo: Kryo, output: Output): Unit = { + output.writeInt(rddId) + } + + override def read(kryo: Kryo, input: Input): Unit = { + rddId = input.readInt() + } + } // Remove all blocks belonging to a specific shuffle. - case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave + case class RemoveShuffle(private var shuffleId: Int) extends ToBlockManagerSlave + with KryoSerializable { + + override def write(kryo: Kryo, output: Output): Unit = { + output.writeInt(shuffleId) + } + + override def read(kryo: Kryo, input: Input): Unit = { + shuffleId = input.readInt() + } + } // Remove all blocks belonging to a specific broadcast. - case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true) - extends ToBlockManagerSlave + case class RemoveBroadcast(private var broadcastId: Long, + private var removeFromDriver: Boolean = true) + extends ToBlockManagerSlave with KryoSerializable { + + override def write(kryo: Kryo, output: Output): Unit = { + output.writeLong(broadcastId) + output.writeBoolean(removeFromDriver) + } + + override def read(kryo: Kryo, input: Input): Unit = { + broadcastId = input.readLong() + removeFromDriver = input.readBoolean() + } + } /** * Driver to Executor message to trigger a thread dump. diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 0924cddc72492..e429de9293bd9 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -13,23 +13,6 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * - * Changes for SnappyData data platform. - * - * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. */ package org.apache.spark.ui diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index d1c4f214db599..7f41bd62043d9 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -23,7 +23,6 @@ import java.util.{Date, Locale, TimeZone} import javax.servlet.http.HttpServletRequest import javax.ws.rs.core.{MediaType, Response} -import scala.collection.mutable.HashMap import scala.util.control.NonFatal import scala.xml._ import scala.xml.transform.{RewriteRule, RuleTransformer} @@ -192,7 +191,8 @@ private[spark] object UIUtils extends Logging { def commonHeaderNodesSnappy(request: HttpServletRequest): Seq[Node] = { + href={prependBaseUri(request, "/static/snappydata/snappy-dashboard.css")} + type="text/css"/> diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index bf618b4afbce0..8cd961a98fdd5 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -23,6 +23,9 @@ import java.util.{ArrayList, Collections} import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler.AccumulableInfo @@ -42,7 +45,7 @@ private[spark] case class AccumulatorMetadata( */ abstract class AccumulatorV2[IN, OUT] extends Serializable { private[spark] var metadata: AccumulatorMetadata = _ - private[this] var atDriverSide = true + private[spark] var atDriverSide = true private[spark] def register( sc: SparkContext, @@ -210,6 +213,63 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { } } +abstract class AccumulatorV2Kryo[IN, OUT] + extends AccumulatorV2[IN, OUT] with KryoSerializable { + + /** + * Child classes cannot override this and must instead implement + * writeKryo/readKryo for consistent writeReplace() behavior. + */ + override final def write(kryo: Kryo, output: Output): Unit = { + var instance = this + if (atDriverSide) { + instance = copyAndReset().asInstanceOf[AccumulatorV2Kryo[IN, OUT]] + assert(instance.isZero, "copyAndReset must return a zero value copy") + instance.metadata = this.metadata + } + val metadata = instance.metadata + output.writeLong(metadata.id) + metadata.name match { + case None => output.writeString(null) + case Some(name) => output.writeString(name) + } + output.writeBoolean(metadata.countFailedValues) + output.writeBoolean(instance.atDriverSide) + + instance.writeKryo(kryo, output) + } + + /** + * Child classes must implement readKryo() and cannot override this. + */ + override final def read(kryo: Kryo, input: Input): Unit = { + read(kryo, input, context = null) + } + + final def read(kryo: Kryo, input: Input, context: TaskContext): Unit = { + val id = input.readLong() + val name = input.readString() + metadata = AccumulatorMetadata(id, Option(name), input.readBoolean()) + atDriverSide = input.readBoolean() + if (atDriverSide) { + atDriverSide = false + // Automatically register the accumulator when it is deserialized with the task closure. + // This is for external accumulators and internal ones that do not represent task level + // metrics, e.g. internal SQL metrics, which are per-operator. + val taskContext = if (context != null) context else TaskContext.get() + if (taskContext != null) { + taskContext.registerAccumulator(this) + } + } else { + atDriverSide = true + } + + readKryo(kryo, input) + } + + def writeKryo(kryo: Kryo, output: Output): Unit + def readKryo(kryo: Kryo, input: Input): Unit +} /** * An internal class used to track accumulators by Spark itself. @@ -291,7 +351,8 @@ private[spark] object AccumulatorContext extends Logging { * * @since 2.0.0 */ -class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { +class LongAccumulator extends AccumulatorV2Kryo[jl.Long, jl.Long] + with KryoSerializable { private var _sum = 0L private var _count = 0L @@ -362,6 +423,16 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { private[spark] def setValue(newValue: Long): Unit = _sum = newValue override def value: jl.Long = _sum + + override def writeKryo(kryo: Kryo, output: Output): Unit = { + output.writeLong(_sum) + output.writeLong(_count) + } + + override def readKryo(kryo: Kryo, input: Input): Unit = { + _sum = input.readLong() + _count = input.readLong() + } } @@ -371,7 +442,8 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { * * @since 2.0.0 */ -class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { +class DoubleAccumulator extends AccumulatorV2Kryo[jl.Double, jl.Double] + with KryoSerializable { private var _sum = 0.0 private var _count = 0L @@ -440,6 +512,16 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { private[spark] def setValue(newValue: Double): Unit = _sum = newValue override def value: jl.Double = _sum + + override def writeKryo(kryo: Kryo, output: Output): Unit = { + output.writeDouble(_sum) + output.writeVarLong(_count, true) + } + + override def readKryo(kryo: Kryo, input: Input): Unit = { + _sum = input.readDouble() + _count = input.readVarLong(true) + } } @@ -448,7 +530,8 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { * * @since 2.0.0 */ -class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { +class CollectionAccumulator[T] extends AccumulatorV2Kryo[T, java.util.List[T]] + with KryoSerializable { private val _list: java.util.List[T] = Collections.synchronizedList(new ArrayList[T]()) /** @@ -484,6 +567,23 @@ class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { _list.clear() _list.addAll(newValue) } + + override def writeKryo(kryo: Kryo, output: Output): Unit = { + output.writeVarInt(_list.size(), true) + val iter = _list.iterator() + while (iter.hasNext) { + kryo.writeClassAndObject(output, iter.next()) + } + } + + override def readKryo(kryo: Kryo, input: Input): Unit = { + var len = input.readVarInt(true) + if (!_list.isEmpty) _list.clear() + while (len > 0) { + _list.add(kryo.readClassAndObject(input).asInstanceOf[T]) + len -= 1 + } + } } diff --git a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala index a06b6f84ef11b..5b27fe5cdc6eb 100644 --- a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala @@ -21,12 +21,17 @@ import java.io.{EOFException, IOException, ObjectInputStream, ObjectOutputStream import java.nio.ByteBuffer import java.nio.channels.Channels +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + /** * A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make * it easier to pass ByteBuffers in case class messages. */ private[spark] -class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable { +class SerializableBuffer(@transient var buffer: ByteBuffer) + extends Serializable with KryoSerializable { + def value: ByteBuffer = buffer private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { @@ -51,4 +56,20 @@ class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable } buffer.rewind() // Allow us to write it again later } + + override def write(kryo: Kryo, output: Output) { + if (buffer.position() != 0) { + throw new IOException(s"Unexpected buffer position ${buffer.position()}") + } + output.writeInt(buffer.limit()) + output.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.limit()) + } + + override def read(kryo: Kryo, input: Input) { + val length = input.readInt() + val b = new Array[Byte](length) + input.readBytes(b) + buffer = ByteBuffer.wrap(b) + buffer.rewind() // Allow us to read it later + } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index e63e0e3e1f68f..953699fe37b7a 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ -19,14 +19,17 @@ package org.apache.spark.util.collection import java.util.Arrays +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + /** * A simple, fixed-size bit set implementation. This implementation is fast because it avoids * safety/bound checking. */ -class BitSet(numBits: Int) extends Serializable { +class BitSet(numBits: Int) extends Serializable with KryoSerializable { - private val words = new Array[Long](bit2words(numBits)) - private val numWords = words.length + private var words = new Array[Long](bit2words(numBits)) + private var numWords = words.length /** * Compute the capacity (number of bits) that can be represented @@ -238,4 +241,27 @@ class BitSet(numBits: Int) extends Serializable { /** Return the number of longs it would take to hold numBits. */ private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1 + + override def write(kryo: Kryo, output: Output): Unit = { + val words = this.words + val numWords = this.numWords + output.writeVarInt(numWords, true) + var i = 0 + while (i < numWords) { + output.writeLong(words(i)) + i += 1 + } + } + + override def read(kryo: Kryo, input: Input): Unit = { + val numWords = input.readVarInt(true) + val words = new Array[Long](numWords) + var i = 0 + while (i < numWords) { + words(i) = input.readLong() + i += 1 + } + this.words = words + this.numWords = numWords + } } diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 77a7668d3a1d1..dbb37f6e619df 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -149,7 +149,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug taskBinary = taskBinary, partition = secondRDD.partitions(0), locs = Seq(), - outputId = 0, + _outputId = 0, localProperties = new Properties(), serializedTaskMetrics = serializedTaskMetrics ) @@ -224,7 +224,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug taskBinary = taskBinary, partition = secondRDD.partitions(0), locs = Seq(), - outputId = 0, + _outputId = 0, localProperties = new Properties(), serializedTaskMetrics = serializedTaskMetrics ) diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala index 2f517dd7391c9..1bd86f824d105 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala @@ -30,7 +30,7 @@ class FakeTask( SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(), isBarrier: Boolean = false) extends Task[Int](stageId, 0, partitionId, new Properties, serializedTaskMetrics, - isBarrier = isBarrier) { + _isBarrier = isBarrier) { override def runTask(context: TaskContext): Int = 0 override def preferredLocations: Seq[TaskLocation] = prefLocs diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala index 9a51be6ed5aeb..0e3a11308406c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala @@ -197,7 +197,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR /** * A lazily generated row ordering comparator. */ -class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) +class LazilyGeneratedOrdering(private var ordering: Seq[SortOrder]) extends Ordering[InternalRow] with KryoSerializable { def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = @@ -220,7 +220,8 @@ class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) } override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException { - generatedOrdering = GenerateOrdering.generate(kryo.readObject(in, classOf[Array[SortOrder]])) + ordering = kryo.readObject(in, classOf[Array[SortOrder]]).toSeq + generatedOrdering = GenerateOrdering.generate(ordering) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala index 9fb6997c3f0e3..63a805a0e6b18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala @@ -53,26 +53,17 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] val values = ctx.freshName("values") val rowClass = classOf[GenericInternalRow].getName - - val isHomogenousStruct = { - var i = 1 - val ref = CodeGenerator.javaType(schema.fields(0).dataType) - var broken = !CodeGenerator.isPrimitiveType(ref) || schema.length <= 1 - while (!broken && i < schema.length) { - if (CodeGenerator.javaType(schema.fields(i).dataType) != ref) { - broken = true - } - i += 1 - } - !broken - } - val allFields = if (isHomogenousStruct) { + var ref: DataType = null + val isHomogeneousStruct = if (schema.length > 0) { + ref = schema.fields(0).dataType + !schema.tail.exists(_.dataType != ref) + } else false + val allFields = if (isHomogeneousStruct) { val counter = ctx.freshName("counter") - val dt = schema.fields(0).dataType val converter = convertToSafe( ctx, - JavaCode.expression(CodeGenerator.getValue(tmpInput, dt, i.toString), dt), - dt) + JavaCode.expression(CodeGenerator.getValue(tmpInput, ref, counter), ref), + ref) s""" for (int $counter = 0; $counter < ${schema.length}; ++$counter) { if (!$tmpInput.isNullAt($counter)) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 4946489206fe8..851a09948f239 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -67,22 +67,15 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro v => s"$v = new $rowWriterClass($rowWriter, ${fieldEvals.length});") val previousCursor = ctx.freshName("previousCursor") - val isHomogenousStruct = { - var i = 1 - val ref = CodeGenerator.javaType(t.fields(0).dataType) - var broken = !CodeGenerator.isPrimitiveType(ref) || t.length <=1 - while (!broken && i < t.length) { - if (CodeGenerator.javaType(t.fields(i).dataType) != ref) { - broken = true - } - i +=1 - } - !broken - } - if (isHomogenousStruct) { + var ref: DataType = null + val isHomogeneousStruct = if (schemas.nonEmpty) { + ref = schemas.head.dataType + CodeGenerator.isPrimitiveType(ref) && !schemas.tail.exists(_.dataType != ref) + } else false + if (isHomogeneousStruct) { val counter = ctx.freshName("counter") val rowWriterChild = ctx.freshName("rowWriterChild") - val dt = t.fields(0).dataType + val dt = schemas.head.dataType s""" |final InternalRow $tmpInput = $input; @@ -92,9 +85,9 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro | // Remember the current cursor so that we can calculate how many bytes are | // written later. | final int $previousCursor = $rowWriter.cursor(); - | $rowWriterClass $rowWriterChild = new $rowWriterClass($rowWriter, ${t.length}); + | $rowWriterClass $rowWriterChild = new $rowWriterClass($rowWriter, ${schemas.length}); | $rowWriterChild.reset(); - | for (int $counter = 0; $counter < ${t.length}; $counter++) { + | for (int $counter = 0; $counter < ${schemas.length}; $counter++) { | if ($tmpInput.isNullAt($index)) { | $rowWriterChild.setNullAt($index); | } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 075785c3865e2..c2b2ee593c870 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -20,10 +20,13 @@ package org.apache.spark.sql.execution.metric import java.text.NumberFormat import java.util.Locale +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.SparkContext import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates -import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils} +import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, AccumulatorV2Kryo, Utils} /** @@ -31,12 +34,13 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils} * the executor side are automatically propagated and shown in the SQL UI through metrics. Updates * on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]]. */ -class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] { +final class SQLMetric(var metricType: String, initValue: Long = 0L) + extends AccumulatorV2Kryo[Long, Long] with KryoSerializable { // This is a workaround for SPARK-11013. // We may use -1 as initial value of the accumulator, if the accumulator is valid, we will // update it at the end of task and the value will be at least 0. Then we can filter out the -1 // values before calculate max, min, etc. - private[this] var _value = initValue + private var _value = initValue private var _zeroValue = initValue override def copy(): SQLMetric = { @@ -77,6 +81,18 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato new AccumulableInfo( id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) } + + override def writeKryo(kryo: Kryo, output: Output): Unit = { + output.writeString(metricType) + output.writeLong(_value) + output.writeLong(_zeroValue) + } + + override def readKryo(kryo: Kryo, input: Input): Unit = { + metricType = input.readString() + _value = input.readLong() + _zeroValue = input.readLong() + } } object SQLMetrics {