[SNAP-1136] Kryo closure serialization support and optimizations (#27)
- added back the configurable closure serializer in Spark that was removed in SPARK-12414;
  some minor changes were taken from the closed Spark PR apache#6361 (see the configuration
  sketch below)
- added optimized Kryo serialization for multiple classes; currently, registration and the
  string-sharing fix for Kryo (EsotericSoftware/kryo#128) live
  only in the SnappyData-layer PooledKryoSerializer implementation;
  the classes providing the maximum benefit now implement KryoSerializable, notably the
  accumulators and the *Metrics classes
- use the closure serializer for Netty messaging too, instead of a fixed JavaSerializer
- set the ordering field with Kryo serialization in GenerateOrdering
- fix scalastyle and compile errors
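
For illustration, a minimal sketch of how the restored setting is meant to be used. The property name spark.closure.serializer comes from the SparkEnv change in this commit; choosing Spark's KryoSerializer as the value is only a stand-in here (the commit message points at the SnappyData-layer PooledKryoSerializer, whose fully qualified name is not shown in this diff).

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer

// JavaSerializer remains the default when spark.closure.serializer is not set.
val conf = new SparkConf()
  .setAppName("closure-serializer-example")
  .setMaster("local[2]")
  .set("spark.closure.serializer", classOf[KryoSerializer].getName)

val sc = new SparkContext(conf)
// Closures shipped with tasks are now serialized by the configured serializer.
val sum = sc.parallelize(1 to 100).map(_ * 2).reduce(_ + _)
sc.stop()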
Sumedh Wale authored and sumwale committed Jul 11, 2021
1 parent 440899f commit ded36fa
Showing 29 changed files with 637 additions and 154 deletions.
1 change: 0 additions & 1 deletion build.gradle
@@ -52,7 +52,6 @@ allprojects {

group = 'io.snappydata'
version = snappySparkVersion
productName = productName

ext {
vendorName = 'TIBCO Software Inc.'
58 changes: 39 additions & 19 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -170,6 +170,43 @@ object SparkEnv extends Logging {
env
}

// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String, conf: SparkConf,
isDriver: Boolean): T = {
val cls = Utils.classForName(className)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
try {
cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
.newInstance(conf, new java.lang.Boolean(isDriver))
.asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
try {
cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
cls.getConstructor().newInstance().asInstanceOf[T]
}
}
}

def getClosureSerializer(conf: SparkConf, doLog: Boolean = false): Serializer = {
val defaultClosureSerializerClass = classOf[JavaSerializer].getName
val closureSerializerClass = conf.get("spark.closure.serializer",
defaultClosureSerializerClass)
val closureSerializer = instantiateClass[Serializer](
closureSerializerClass, conf, isDriver = false)
if (doLog) {
if (closureSerializerClass != defaultClosureSerializerClass) {
logInfo(s"Using non-default closure serializer: $closureSerializerClass")
} else {
logDebug(s"Using closure serializer: $closureSerializerClass")
}
}
closureSerializer
}

/**
* Create a SparkEnv for the driver.
*/
@@ -272,26 +309,9 @@
conf.set("spark.driver.port", rpcEnv.address.port.toString)
}

// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String): T = {
val cls = Utils.classForName(className)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
try {
cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
.newInstance(conf, new java.lang.Boolean(isDriver))
.asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
try {
cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
cls.getConstructor().newInstance().asInstanceOf[T]
}
}
SparkEnv.instantiateClass(className, conf, isDriver)
}

// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
@@ -304,7 +324,7 @@

val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

val closureSerializer = new JavaSerializer(conf)
val closureSerializer = getClosureSerializer(conf, doLog = true)

def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
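
To make the constructor lookup order documented in instantiateClass above concrete ((SparkConf, isDriver) first, then (SparkConf), then no-arg), here is a hedged sketch of a custom serializer matching the first form. The class, its package, and its name are hypothetical and not part of this change; it simply delegates to JavaSerializer.

package com.example.serializer  // hypothetical package, not part of this commit

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerInstance}

// The (SparkConf, Boolean) constructor is the one SparkEnv.instantiateClass tries first,
// so the serializer can tell whether it is created on the driver or on an executor.
class DriverAwareSerializer(conf: SparkConf, isDriver: Boolean) extends Serializer {
  // Delegate the actual work to JavaSerializer; only the constructor shape matters here.
  private val delegate = new JavaSerializer(conf)
  override def newInstance(): SerializerInstance = delegate.newInstance()
}

Setting spark.closure.serializer to com.example.serializer.DriverAwareSerializer would then make getClosureSerializer instantiate it through that two-argument constructor (with isDriver = false, as in the code above).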
20 changes: 19 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -17,6 +17,10 @@

package org.apache.spark.executor

import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.TaskContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.LongAccumulator

@@ -39,7 +43,7 @@ object DataReadMethod extends Enumeration with Serializable {
* A collection of accumulators that represents metrics about reading data from external systems.
*/
@DeveloperApi
class InputMetrics private[spark] () extends Serializable {
class InputMetrics private[spark] () extends Serializable with KryoSerializable {
private[executor] val _bytesRead = new LongAccumulator
private[executor] val _recordsRead = new LongAccumulator

@@ -56,4 +60,18 @@ class InputMetrics private[spark] () extends Serializable {
private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)

override def write(kryo: Kryo, output: Output): Unit = {
_bytesRead.write(kryo, output)
_recordsRead.write(kryo, output)
}

override def read(kryo: Kryo, input: Input): Unit = {
read(kryo, input, context = null)
}

final def read(kryo: Kryo, input: Input, context: TaskContext): Unit = {
_bytesRead.read(kryo, input, context)
_recordsRead.read(kryo, input, context)
}
}
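
As a quick illustration of the KryoSerializable pattern these metrics classes now follow, a hedged round-trip sketch. It assumes compilation inside this fork's org.apache.spark.executor package (the InputMetrics constructor is private[spark]) and that LongAccumulator here provides the Kryo write/read methods invoked above, including a read that tolerates a null TaskContext.

package org.apache.spark.executor

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object InputMetricsKryoRoundTrip {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    // Registration avoids writing class names, as the commit message recommends.
    kryo.register(classOf[InputMetrics])

    val metrics = new InputMetrics()
    metrics.incBytesRead(1024L)
    metrics.incRecordsRead(10L)

    // Kryo detects KryoSerializable and dispatches to the write()/read() methods above.
    val output = new Output(4096, -1)  // growable buffer
    kryo.writeObject(output, metrics)
    output.close()

    val input = new Input(output.toBytes)
    val copy = kryo.readObject(input, classOf[InputMetrics])
    input.close()

    println(s"bytesRead=${copy.bytesRead}, recordsRead=${copy.recordsRead}")
  }
}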
core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -17,6 +17,10 @@

package org.apache.spark.executor

import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.TaskContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.LongAccumulator

@@ -38,7 +42,7 @@ object DataWriteMethod extends Enumeration with Serializable {
* A collection of accumulators that represents metrics about writing data to external systems.
*/
@DeveloperApi
class OutputMetrics private[spark] () extends Serializable {
class OutputMetrics private[spark] () extends Serializable with KryoSerializable {
private[executor] val _bytesWritten = new LongAccumulator
private[executor] val _recordsWritten = new LongAccumulator

@@ -54,4 +58,18 @@ class OutputMetrics private[spark] () extends Serializable {

private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)

override def write(kryo: Kryo, output: Output): Unit = {
_bytesWritten.write(kryo, output)
_recordsWritten.write(kryo, output)
}

override def read(kryo: Kryo, input: Input): Unit = {
read(kryo, input, context = null)
}

final def read(kryo: Kryo, input: Input, context: TaskContext): Unit = {
_bytesWritten.read(kryo, input, context)
_recordsWritten.read(kryo, input, context)
}
}
core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -17,6 +17,10 @@

package org.apache.spark.executor

import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.TaskContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.{DoubleAccumulator, LongAccumulator}

@@ -27,7 +31,7 @@ import org.apache.spark.util.{DoubleAccumulator, LongAccumulator}
* Operations are not thread-safe.
*/
@DeveloperApi
class ShuffleReadMetrics private[spark] () extends Serializable {
class ShuffleReadMetrics private[spark] () extends Serializable with KryoSerializable {
private[executor] val _remoteBlocksFetched = new LongAccumulator
private[executor] val _localBlocksFetched = new LongAccumulator
private[executor] val _remoteBytesRead = new LongAccumulator
@@ -121,6 +125,30 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
_recordsRead.add(metric.recordsRead)
}
}

override def write(kryo: Kryo, output: Output): Unit = {
_remoteBlocksFetched.write(kryo, output)
_localBlocksFetched.write(kryo, output)
_remoteBytesRead.write(kryo, output)
_remoteBytesReadToDisk.write(kryo, output)
_localBytesRead.write(kryo, output)
_fetchWaitTime.write(kryo, output)
_recordsRead.write(kryo, output)
}

override def read(kryo: Kryo, input: Input): Unit = {
read(kryo, input, context = null)
}

final def read(kryo: Kryo, input: Input, context: TaskContext): Unit = {
_remoteBlocksFetched.read(kryo, input, context)
_localBlocksFetched.read(kryo, input, context)
_remoteBytesRead.read(kryo, input, context)
_remoteBytesReadToDisk.read(kryo, input, context)
_localBytesRead.read(kryo, input, context)
_fetchWaitTime.read(kryo, input, context)
_recordsRead.read(kryo, input, context)
}
}

/**
core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -17,6 +17,10 @@

package org.apache.spark.executor

import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.TaskContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.LongAccumulator

@@ -27,7 +31,7 @@ import org.apache.spark.util.LongAccumulator
* Operations are not thread-safe.
*/
@DeveloperApi
class ShuffleWriteMetrics private[spark] () extends Serializable {
class ShuffleWriteMetrics private[spark] () extends Serializable with KryoSerializable {
private[executor] val _bytesWritten = new LongAccumulator
private[executor] val _recordsWritten = new LongAccumulator
private[executor] val _writeTime = new LongAccumulator
@@ -57,6 +61,22 @@ class ShuffleWriteMetrics private[spark] () extends Serializable {
_recordsWritten.setValue(recordsWritten - v)
}

override def write(kryo: Kryo, output: Output): Unit = {
_bytesWritten.write(kryo, output)
_recordsWritten.write(kryo, output)
_writeTime.write(kryo, output)
}

override def read(kryo: Kryo, input: Input): Unit = {
read(kryo, input, context = null)
}

final def read(kryo: Kryo, input: Input, context: TaskContext): Unit = {
_bytesWritten.read(kryo, input, context)
_recordsWritten.read(kryo, input, context)
_writeTime.read(kryo, input, context)
}

// Legacy methods for backward compatibility.
// TODO: remove these once we make this class private.
@deprecated("use bytesWritten instead", "2.0.0")
43 changes: 42 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -20,6 +20,9 @@ package org.apache.spark.executor
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}

import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
@@ -42,7 +45,7 @@ import org.apache.spark.util._
* be sent to the driver.
*/
@DeveloperApi
class TaskMetrics private[spark] () extends Serializable {
class TaskMetrics private[spark] () extends Serializable with KryoSerializable {
// Each metric is internally represented as an accumulator
private val _executorDeserializeTime = new DoubleAccumulator
private val _executorDeserializeCpuTime = new DoubleAccumulator
@@ -263,6 +266,44 @@ class TaskMetrics private[spark] () extends Serializable {
// value will be updated at driver side.
internalAccums.filter(a => !a.isZero || a == _resultSize)
}

override def write(kryo: Kryo, output: Output): Unit = {
_executorDeserializeTime.write(kryo, output)
_executorDeserializeCpuTime.write(kryo, output)
_executorRunTime.write(kryo, output)
_executorCpuTime.write(kryo, output)
_resultSize.write(kryo, output)
_jvmGCTime.write(kryo, output)
_resultSerializationTime.write(kryo, output)
_memoryBytesSpilled.write(kryo, output)
_diskBytesSpilled.write(kryo, output)
_peakExecutionMemory.write(kryo, output)
_updatedBlockStatuses.write(kryo, output)
inputMetrics.write(kryo, output)
outputMetrics.write(kryo, output)
shuffleReadMetrics.write(kryo, output)
shuffleWriteMetrics.write(kryo, output)
}

override def read(kryo: Kryo, input: Input): Unit = {
// read the TaskContext thread-local once
val taskContext = TaskContext.get()
_executorDeserializeTime.read(kryo, input, taskContext)
_executorDeserializeCpuTime.read(kryo, input, taskContext)
_executorRunTime.read(kryo, input, taskContext)
_executorCpuTime.read(kryo, input, taskContext)
_resultSize.read(kryo, input, taskContext)
_jvmGCTime.read(kryo, input, taskContext)
_resultSerializationTime.read(kryo, input, taskContext)
_memoryBytesSpilled.read(kryo, input, taskContext)
_diskBytesSpilled.read(kryo, input, taskContext)
_peakExecutionMemory.read(kryo, input, taskContext)
_updatedBlockStatuses.read(kryo, input, taskContext)
inputMetrics.read(kryo, input, taskContext)
outputMetrics.read(kryo, input, taskContext)
shuffleReadMetrics.read(kryo, input, taskContext)
shuffleWriteMetrics.read(kryo, input, taskContext)
}
}


core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag

import com.codahale.metrics.{Metric, MetricSet}

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.{SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.internal.config
import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -36,7 +36,6 @@ import org.apache.spark.network.server._
import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher}
import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.Utils

@@ -53,7 +52,7 @@ private[spark] class NettyBlockTransferService(
extends BlockTransferService {

// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
private val serializer = new JavaSerializer(conf)
private val serializer = SparkEnv.getClosureSerializer(conf)
private val authEnabled = securityManager.isAuthenticationEnabled()
private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores)

5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -147,7 +147,8 @@ abstract class RDD[T: ClassTag](
def sparkContext: SparkContext = sc

/** A unique ID for this RDD (within its SparkContext). */
val id: Int = sc.newRddId()
protected var _id: Int = sc.newRddId()
def id: Int = _id

/** A friendly name for this RDD */
@transient var name: String = _
@@ -1728,7 +1729,7 @@ abstract class RDD[T: ClassTag](
// Other internal methods and fields
// =======================================================================

private var storageLevel: StorageLevel = StorageLevel.NONE
protected var storageLevel: StorageLevel = StorageLevel.NONE

/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
@transient private[spark] val creationSite = sc.getCallSite()
(diffs for the remaining changed files are not shown)
