@davies
Contributor

@davies davies commented Oct 6, 2014

For most tasks, the serialized data will be small (for example, less than 8k), so we can avoid the RPC entirely if the data is embedded in the Broadcast object itself.

With this patch, the size of a task will be similar to what it was before we used broadcast for tasks, with no RPC (the value is still cached, so there is only one deserialization per executor).
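
The idea can be sketched roughly as follows. This is a minimal Python illustration, not the patch itself: `SketchBroadcast`, `BlockStore`, and `EMBED_THRESHOLD` are hypothetical names, and the 8 KiB cut-off is only an assumption based on the "less than 8k" figure above. A small payload travels inside the broadcast object, a large one is fetched from the store (standing in for the RPC), and in both cases the value is deserialized once per cache.

```python
import pickle

# Hypothetical cut-off in bytes; assumed from the "less than 8k" figure.
EMBED_THRESHOLD = 8 * 1024


class BlockStore:
    """Stand-in for a remote block store; each get() models one RPC."""

    def __init__(self):
        self._blocks = {}
        self.fetches = 0

    def put(self, key, data):
        self._blocks[key] = data

    def get(self, key):
        self.fetches += 1
        return self._blocks[key]


class SketchBroadcast:
    """Carries small payloads inline and leaves large ones in the store."""

    def __init__(self, key, value, store):
        data = pickle.dumps(value)
        self.key = key
        if len(data) <= EMBED_THRESHOLD:
            self.embedded = data   # shipped together with the task
        else:
            self.embedded = None   # must be fetched on first use
            store.put(key, data)

    def value(self, store, cache):
        # The cache models the per-executor cache: one deserialization per key.
        if self.key not in cache:
            if self.embedded is not None:
                data = self.embedded
            else:
                data = store.get(self.key)
            cache[self.key] = pickle.loads(data)
        return cache[self.key]


store, cache = BlockStore(), {}
small = SketchBroadcast("b0", list(range(10)), store)
small.value(store, cache)
print(store.fetches)  # the small value never touched the store
```

In this sketch the trade-off is visible in the constructor: the embedded bytes make every serialized copy of the broadcast object larger, in exchange for skipping the fetch.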

It will increase the bandwidth used while scheduling tasks: for example, if we schedule 10k tasks per second, it will need an extra 40 MB of bandwidth for the embedded objects in the worst case.
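
As a quick check of that estimate, here is the arithmetic in Python. The per-task size of 4,000 bytes is an assumption: it is the value that reproduces the 40 MB figure, and the comment does not state it explicitly. The function name is hypothetical.

```python
def extra_bandwidth_bytes_per_sec(tasks_per_sec, embedded_bytes_per_task):
    """Extra scheduling traffic if every task carries an embedded object."""
    return tasks_per_sec * embedded_bytes_per_task


# Assumed worst case: 10k tasks per second, about 4 kB embedded in each task.
worst_case = extra_bandwidth_bytes_per_sec(10_000, 4_000)
print(worst_case / 1e6)  # 40.0 (MB per second)
```

Under the same assumptions, an 8 KiB embedded object per task would roughly double this, to about 82 MB per second.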

@davies davies changed the title from "embed small object in broadcast to avoid RPC" to "[SPARK-3133] embed small object in broadcast to avoid RPC" on Oct 6, 2014
@SparkQA

SparkQA commented Oct 6, 2014

QA tests have started for PR 2681 at commit 3fd051d.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 6, 2014

QA tests have finished for PR 2681 at commit 3fd051d.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21341/
Test FAILed.

@SparkQA

SparkQA commented Oct 6, 2014

QA tests have started for PR 2681 at commit 55a40fa.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 6, 2014

QA tests have finished for PR 2681 at commit 55a40fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21349/
Test PASSed.

Contributor

when will this ever happen?

Contributor Author

This will happen when multiple SparkEnv instances exist in different threads; in that case the BlockManager will also return invalid results.

Contributor

Can you add an inline comment explaining this? Thanks.

@davies
Contributor Author

davies commented Oct 7, 2014

Should we change the default EMBEDDED_SIZE to 8k? Maybe we could do it later.

@SparkQA

SparkQA commented Oct 7, 2014

QA tests have started for PR 2681 at commit fc9d2c4.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 7, 2014

QA tests have finished for PR 2681 at commit fc9d2c4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21405/
Test PASSed.

@SparkQA

SparkQA commented Oct 10, 2014

QA tests have started for PR 2681 at commit fc9d2c4.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 10, 2014

QA tests have finished for PR 2681 at commit fc9d2c4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor Author

davies commented Oct 14, 2014

I sometimes hit this bug during PySpark testing:

Py4JJavaError: An error occurred while calling o55.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 8, localhost): java.io.IOException: PARSING_ERROR(2)
[info]         org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
[info]         org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
[info]         org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
[info]         org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
[info]         org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
[info]         org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
[info]         org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
[info]         org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
[info]         org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170)
[info]         sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
[info]         sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info]         java.lang.reflect.Method.invoke(Method.java:606)
[info]         java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
[info]         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
[info]         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
[info]         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
[info]         java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
[info]         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
[info]         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
[info]         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
[info]         java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
[info]         org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
[info]         org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
[info]         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
[info]         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]         java.lang.Thread.run(Thread.java:745)
[info] Driver stacktrace:
[info]  at 

The test is really simple:

sc.parallelize(range(10)).count()

The serialized PythonRDD is about 4.1k, so I would like to increase EMBED_SIZE to 8k. Then most simple PythonRDDs will be embedded, which should make the tests stable.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21740/
Test FAILed.

@JoshRosen
Contributor

@davies: that exception that you hit is very helpful; it looks like you've been able to reproduce SPARK-3958 in local tests. That JIRA describes some reports of cases where we've seen TorrentBroadcast throw similar errors, but I've had a really hard time debugging that issue because I've had trouble reliably reproducing this bug. Why do you think EMBED_SIZE seems to affect the reliability of TorrentBroadcast? If it does, that could be a valuable debugging clue for this issue.

@davies
Contributor Author

davies commented Oct 17, 2014

This error did not happen in the tests of this PR; it happened in the tests of our product, which has a pattern similar to streaming: the jobs are submitted via py4j.

The PR also checks the number of blocks in readBlocks(), and will throw a meaningful exception if it fails to get the cached object in local mode.
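
A rough Python sketch of that kind of check is below. It is an illustration under assumptions, not the code in this PR: `blockify` and `unblockify` are hypothetical helpers, and `zlib` stands in for Snappy. The point is that counting the blocks before decompression gives an error that names the real problem.

```python
import zlib


def blockify(data, block_size):
    """Compress data and split the result into fixed-size blocks."""
    compressed = zlib.compress(data)
    return [compressed[i:i + block_size]
            for i in range(0, len(compressed), block_size)]


def unblockify(blocks, expected_num_blocks):
    """Reassemble blocks, checking the count before decompressing."""
    present = [b for b in blocks if b is not None]
    if len(present) != expected_num_blocks:
        raise RuntimeError(
            "Failed to get all broadcast blocks: expected %d, got %d"
            % (expected_num_blocks, len(present)))
    return zlib.decompress(b"".join(present))


payload = bytes(range(256)) * 100
blocks = blockify(payload, block_size=64)
assert unblockify(blocks, len(blocks)) == payload
```

Without the count check, a truncated block list would reach `zlib.decompress` and fail there with a low-level `zlib.error`, which says nothing about a missing block.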

TorrentBroadcast is so complicated (it involves several RPCs) that it is not as stable as HTTPBroadcast or as running without broadcast (we have seen some cases reported by users on the mailing list). The motivation of this PR is to remove that complexity for the most common case (a small serialized task), which should make it more stable.

@SparkQA

SparkQA commented Oct 17, 2014

QA tests have started for PR 2681 at commit 732949f.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 17, 2014

QA tests have finished for PR 2681 at commit 732949f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

Based on some offline discussions, we think that we may have identified the cause of the Snappy issues that we've seen with TorrentBroadcast. I'm going to merge this PR and then work on some refactoring that may fix the underlying bug (or at least add additional log information to aid in debugging). I also have some thoughts on how to improve the test coverage of our broadcast implementation, but I'll address this in my PR.

Thanks @davies!

@JoshRosen
Contributor

Actually, I changed my mind: I'm going to hold off on merging this because I don't want to backport the embedding of small objects into branch-1.1 but I do want to backport the fix for the TorrentBroadcast Snappy issue. I'll try to merge my patch in first, then rebase this on top of it. Sorry in advance for the merge conflict.

@davies
Contributor Author

davies commented Oct 19, 2014

I hope that we can have this in 1.1. Some people see regressions in 1.1 because of TorrentBroadcast, and this patch will help them.

Conflicts:
	core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@SparkQA

SparkQA commented Oct 24, 2014

Test build #22151 timed out for PR 2681 at commit 823302c after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22151/
Test FAILed.

@davies
Contributor Author

davies commented Oct 24, 2014

Even if we merge #2933, I would still like to have this, because people may use broadcast for small datasets (such as in MLlib), and this patch can improve those cases.

@SparkQA

SparkQA commented Oct 24, 2014

Test build #419 has started for PR 2681 at commit 823302c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 24, 2014

Test build #419 has finished for PR 2681 at commit 823302c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 25, 2014

Test build #429 has started for PR 2681 at commit 823302c.

  • This patch does not merge cleanly.

@SparkQA

SparkQA commented Oct 25, 2014

Test build #429 timed out for PR 2681 at commit 823302c after a configured wait of 120m.

@SparkQA

SparkQA commented Oct 28, 2014

Test build #481 has started for PR 2681 at commit 823302c.

  • This patch does not merge cleanly.

@SparkQA

SparkQA commented Oct 28, 2014

Test build #481 has finished for PR 2681 at commit 823302c.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 28, 2014

Test build #483 has started for PR 2681 at commit 823302c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 28, 2014

Test build #483 has finished for PR 2681 at commit 823302c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@SparkQA

SparkQA commented Oct 28, 2014

Test build #22367 has started for PR 2681 at commit 247b53b.

  • This patch merges cleanly.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22366/
Test FAILed.

@SparkQA

SparkQA commented Oct 28, 2014

Test build #487 has started for PR 2681 at commit 247b53b.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 28, 2014

Test build #22367 has finished for PR 2681 at commit 247b53b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22367/
Test PASSed.

@SparkQA

SparkQA commented Oct 28, 2014

Test build #487 has finished for PR 2681 at commit 247b53b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class VectorTransformer(object):
    • class Normalizer(VectorTransformer):
    • class JavaModelWrapper(VectorTransformer):
    • class StandardScalerModel(JavaModelWrapper):
    • class StandardScaler(object):
    • class HashingTF(object):
    • class IDFModel(JavaModelWrapper):
    • class IDF(object):
    • class Word2VecModel(JavaModelWrapper):

@SparkQA

SparkQA commented Oct 31, 2014

Test build #500 has started for PR 2681 at commit 247b53b.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 1, 2014

Test build #500 has finished for PR 2681 at commit 247b53b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor Author

davies commented Nov 3, 2014

ping dashboard

@davies
Contributor Author

davies commented Nov 13, 2014

@JoshRosen @pwendell @rxin This is an optimization for TorrentBroadcast (it avoids all RPCs for small objects), which can reduce latency for streaming (not benchmarked). Would you mind merging this into 1.2?

@JoshRosen
Contributor

I've pushed a commit to spark-perf which allows us to benchmark task launching throughput using different sized closures: databricks/spark-perf@79e615a

@SparkQA

SparkQA commented Nov 25, 2014

Test build #23808 has started for PR 2681 at commit 1ffd763.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 25, 2014

Test build #23808 has finished for PR 2681 at commit 1ffd763.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class CompressedSerializer(FramedSerializer):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23808/
Test PASSed.

@davies
Contributor Author

davies commented Jan 29, 2015

I'd like to close this PR now; I will re-open it if needed.

@davies davies closed this Jan 29, 2015