
Conversation

@sitalkedia

What changes were proposed in this pull request?

While running a Spark job that spills a lot of data in the reduce phase, we see that a significant amount of CPU is consumed in the native Snappy arrayCopy method (see the stack trace below).
Stack trace:
org.xerial.snappy.SnappyNative.$$YJP$$arrayCopy(Native Method)
org.xerial.snappy.SnappyNative.arrayCopy(SnappyNative.java)
org.xerial.snappy.Snappy.arrayCopy(Snappy.java:85)
org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:190)
org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:163)
java.io.DataInputStream.readFully(DataInputStream.java:195)
java.io.DataInputStream.readLong(DataInputStream.java:416)
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:71)
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$2.loadNext(UnsafeSorterSpillMerger.java:79)
org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:136)
org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:123)
The reason is that the SpillReader does many small reads from the underlying Snappy-compressed stream, and each of those small reads pays the cost of a JNI call. The SpillReader should instead do buffered reads from the underlying stream.
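
The idea is to interpose an in-heap buffer between the DataInputStream and the Snappy stream, so that the many small readLong()/readFully() calls are served from that buffer instead of each reaching the JNI-backed stream. A minimal sketch of the idea (the helper name, wiring, and 1 MB buffer size are illustrative, not the exact patch):

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.InputStream;

// Illustrative sketch: wrap the Snappy-compressed spill stream in a buffer so
// that small reads hit the in-heap buffer instead of the JNI-backed stream.
static DataInputStream openBufferedSpillReader(InputStream compressedSpillStream) {
  InputStream buffered = new BufferedInputStream(compressedSpillStream, 1 << 20); // 1 MB, illustrative
  return new DataInputStream(buffered);
}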

How was this patch tested?

Tested by running the job; we saw more than 10% CPU savings.


…derlying compression stream

@JoshRosen
Contributor

Is this the only place where buffering helps or would it make sense to do buffered reads from Snappy streams in other circumstances as well? In other words, should this buffering perhaps either be done at more call-sites of wrapForCompression or in wrapForCompression itself? (Note that pushing this into wrapForCompression risks accidental double-buffering, which might be undesirable).
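
To make the double-buffering risk concrete, a tiny illustrative sketch (not real Spark code) of what stacking two buffers looks like; every byte then gets copied through both of them:

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Illustrative only: if wrapForCompression buffered internally, a call-site
// that already buffers its stream would end up with two stacked buffers.
InputStream raw = new ByteArrayInputStream(new byte[0]);  // stand-in for the file stream
InputStream inner = new BufferedInputStream(raw);         // buffer added inside wrapForCompression
InputStream outer = new BufferedInputStream(inner);       // buffer added again at the call-site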

@JoshRosen
Contributor

Also, /cc @xerial, who may be able to comment on whether snappy-java performs any of its own buffering.

@sitalkedia
Author

@JoshRosen - There might be other places where buffering would help, but I did not notice any other hotspot during my job run. Also, as you mentioned, pushing this into wrapForCompression has the undesirable effect of double buffering.

@JoshRosen
Contributor

Jenkins, this is ok to test.

@xerial

xerial commented Mar 30, 2016

The reason snappy-java's SnappyInputStream uses Snappy.arrayCopy (a JNI method) is to load the uncompressed data into primitive-type arrays (e.g., float[], int[]), since there is no standard Java method for doing this.

When writing data to a byte[], replacing the implementation with a non-JNI-based one (using System.arraycopy) would be possible.
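
A minimal sketch of that distinction (the decompressed buffer here is purely illustrative):

import java.nio.ByteBuffer;

byte[] uncompressed = new byte[1024];  // stands in for a decompressed Snappy block
int len = uncompressed.length;

// Copying into a byte[] destination needs no JNI; the JDK covers it:
byte[] dst = new byte[len];
System.arraycopy(uncompressed, 0, dst, 0, len);

// There is no single standard byte-offset copy into a primitive array such as
// long[], which is why snappy-java goes through the native Snappy.arrayCopy.
// A pure-Java route is a ByteBuffer view:
long[] longs = new long[len / Long.BYTES];
ByteBuffer.wrap(uncompressed, 0, len).asLongBuffer().get(longs);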

@sitalkedia
Author

Thanks @xerial, this is going to fix the Snappy read/write inefficiency caused by small reads and writes.

@xerial

xerial commented Mar 31, 2016

I have just deployed snappy-java-1.1.2.3 with this fix; it will be synchronized to Maven Central soon.

@sitalkedia
Author

That's great. Thanks a lot for the quick fix.

@SparkQA

SparkQA commented Mar 31, 2016

Test build #54564 has finished for PR 12074 at commit 5ad27f4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

@JoshRosen - I guess after @xerial's change we won't need this change, right?

@JoshRosen
Contributor

@sitalkedia, if you confirm that the updated snappy-java fixes the performance issue for you, then I'd open a different pull request to upgrade Spark to the newer version.

@sitalkedia
Author

@JoshRosen - thanks, working on it. Will update soon.

@sitalkedia
Author

@xerial - I am seeing a similar issue on the Snappy write path as well. Can we fix the write code path too? (A sketch of the kind of change follows the stack trace below.)

Stack trace:

org.xerial.snappy.SnappyNative.arrayCopy(Native Method)
org.xerial.snappy.Snappy.arrayCopy(Snappy.java:85)
org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:273)
org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:115)
org.apache.spark.io.SnappyOutputStreamWrapper.write(CompressionCodec.scala:202)
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:220)
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:126)
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:192)
org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:175)
org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:249)
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:83)
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:298)
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:338)
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93)
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:179)
org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
org.apache.spark.scheduler.Task.run(Task.scala:89)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
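
A rough sketch of the kind of change the write path would need (illustrative, not the actual snappy-java source): when the source is a plain byte[], the copy into the stream's output block can stay in pure Java instead of crossing JNI on every small write.

// Illustrative only: append a byte[] source to the output block with a
// pure-Java copy, avoiding a JNI call per small write.
static int appendToOutputBlock(byte[] src, int srcOff, int len,
                               byte[] outputBlock, int cursor) {
  System.arraycopy(src, srcOff, outputBlock, cursor, len);
  return cursor + len;
}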

@xerial

xerial commented Mar 31, 2016

@sitalkedia Sure. I'll do that.

xerial added three commits to xerial/snappy-java that referenced this pull request Mar 31, 2016
@xerial

xerial commented Mar 31, 2016

Released snappy-java-1.1.2.4 with this fix. Thanks for letting me know.

@JoshRosen
Contributor

Thanks @xerial! @sitalkedia, feel free to open a new PR for the dep. bump after you finish testing this new version.

@sitalkedia
Author

Thanks @xerial. I tested the change and saw 7.5% CPU savings. Opened PR #12096 to upgrade Snappy.

@JoshRosen
Contributor

Great! @sitalkedia, do you mind closing this PR in favor of #12096 and updating the SPARK-14277 JIRA's description to match your new PR so that it accurately describes the change that we're going to commit? Thanks!

@sitalkedia
Author

Changed the SPARK-14277 JIRA's description, closing this PR.

@sitalkedia closed this Mar 31, 2016