Skip to content

Conversation

@mengxr
Copy link
Contributor

@mengxr mengxr commented Nov 16, 2014

Spark hangs with the following code:

sc.parallelize(1 to 10).zipWithIndex.repartition(10).count()

This is because ZippedWithIndexRDD triggers a job in getPartitions and it causes a deadlock in DAGScheduler.getPreferredLocs (synced). The fix is to compute startIndices during construction.

This should be applied to branch-1.0, branch-1.1, and branch-1.2.

@pwendell

@SparkQA
Copy link

SparkQA commented Nov 16, 2014

Test build #23433 has started for PR 3291 at commit c284d9f.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Nov 16, 2014

Test build #23433 has finished for PR 3291 at commit c284d9f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23433/
Test PASSed.

@rxin
Copy link
Contributor

rxin commented Nov 16, 2014

This makes the whole thing eager, isn't it?

@mengxr
Copy link
Contributor Author

mengxr commented Nov 17, 2014

One way to do this lazily is via shuffle:

      val identityPartitioner = new Partitioner {
        override def numPartitions: Int = p
        override def getPartition(key: Any): Int = key.asInstanceOf[Int]
      }
      val startIndices = PartitionPruningRDD.create(this, _ < p - 1) // skip the last partition
        .mapPartitionsWithIndex { (split, iter) =>
          val size = Utils.getIteratorSize(iter)
          Iterator.range(split + 1, p).map { i =>
            (i, size)
          }
        }.reduceByKey(identityPartitioner, _ + _)
        .values
      this.zipPartitions(startIndices) { (iter, startIndexIter) =>
        val startIndex = if (startIndexIter.hasNext) startIndexIter.next() else 0L
        iter.zipWithIndex.map { case (item, localIndex) =>
          (item, startIndex + localIndex)
        }
      }

But I think this is more expensive.

@mengxr
Copy link
Contributor Author

mengxr commented Nov 18, 2014

@rxin Does it look good to you? I hope that this fix can get into 1.1.1.

@rxin
Copy link
Contributor

rxin commented Nov 18, 2014

lgtm

@mengxr
Copy link
Contributor Author

mengxr commented Nov 19, 2014

Thanks! Merged into master, branch-1.2, 1.1, and 1.0.

davies pushed a commit to davies/spark that referenced this pull request Nov 19, 2014
Spark hangs with the following code:

~~~
sc.parallelize(1 to 10).zipWithIndex.repartition(10).count()
~~~

This is because ZippedWithIndexRDD triggers a job in getPartitions and it causes a deadlock in DAGScheduler.getPreferredLocs (synced). The fix is to compute `startIndices` during construction.

This should be applied to branch-1.0, branch-1.1, and branch-1.2.

pwendell

Author: Xiangrui Meng <meng@databricks.com>

Closes apache#3291 from mengxr/SPARK-4433 and squashes the following commits:

c284d9f [Xiangrui Meng] fix a racing condition in zipWithIndex
guavuslabs-builder pushed a commit to ThalesGroup/spark that referenced this pull request Nov 19, 2014
Spark hangs with the following code:

~~~
sc.parallelize(1 to 10).zipWithIndex.repartition(10).count()
~~~

This is because ZippedWithIndexRDD triggers a job in getPartitions and it causes a deadlock in DAGScheduler.getPreferredLocs (synced). The fix is to compute `startIndices` during construction.

This should be applied to branch-1.0, branch-1.1, and branch-1.2.

pwendell

Author: Xiangrui Meng <meng@databricks.com>

Closes apache#3291 from mengxr/SPARK-4433 and squashes the following commits:

c284d9f [Xiangrui Meng] fix a racing condition in zipWithIndex

(cherry picked from commit bb46046)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
mengxr added a commit that referenced this pull request Nov 19, 2014
Spark hangs with the following code:

~~~
sc.parallelize(1 to 10).zipWithIndex.repartition(10).count()
~~~

This is because ZippedWithIndexRDD triggers a job in getPartitions and it causes a deadlock in DAGScheduler.getPreferredLocs (synced). The fix is to compute `startIndices` during construction.

This should be applied to branch-1.0, branch-1.1, and branch-1.2.

pwendell

Author: Xiangrui Meng <meng@databricks.com>

Closes #3291 from mengxr/SPARK-4433 and squashes the following commits:

c284d9f [Xiangrui Meng] fix a racing condition in zipWithIndex

(cherry picked from commit bb46046)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
@andrewor14
Copy link
Contributor

Hey @mengxr can you close this

@mengxr mengxr closed this Nov 20, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants