regression: parallel collections hang #8955

Closed
scabug opened this issue Nov 4, 2014 · 15 comments

scabug commented Nov 4, 2014

This hangs reliably for me:

import scala.collection.parallel.immutable.ParSet

object Test {
  def main(args: Array[String]): Unit = {
    for (i <- 1 to 10000) test()
    println("")
  }

  def test() {
    ParSet[Int]((1 to 10000): _*) foreach (x => ())
    print(".")
  }
}  

I'm bisecting to find the point of regression.

scabug commented Nov 4, 2014

Imported From: https://issues.scala-lang.org/browse/SI-8955?orig=1
Reporter: @retronym
Affected Versions: 2.12.0-M1

scabug commented Nov 4, 2014

@retronym said:
"As Seen in PR Validation!" scala/scala#4082

/cc @viktorklang

scabug commented Nov 4, 2014

@retronym said:
Backing out the part of Viktor's SIP-14++ that caps the number of created threads is sufficient to get this working again:

% git diff -U50
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 32f30b9..627161a 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -1,94 +1,94 @@
 /*                     __                                               *\
 **     ________ ___   / /  ___     Scala API                            **
 **    / __/ __// _ | / /  / _ |    (c) 2003-2013, LAMP/EPFL             **
 **  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
 ** /____/\___/_/ |_/____/_/ | |                                         **
 **                          |/                                          **
 \*                                                                      */
 
 package scala.concurrent.impl
 
 
 
 import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit, ThreadPoolExecutor }
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.Collection
 import scala.concurrent.forkjoin._
 import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService }
 import scala.util.control.NonFatal
 import scala.annotation.tailrec
 
 
 private[scala] class ExecutionContextImpl private[impl] (val executor: Executor, val reporter: Throwable => Unit) extends ExecutionContextExecutor {
   require(executor ne null, "Executor must not be null")
   override def execute(runnable: Runnable) = executor execute runnable
   override def reportFailure(t: Throwable) = reporter(t)
 }
 
 
 private[concurrent] object ExecutionContextImpl {
 
   // Implement BlockContext on FJP threads
   final class DefaultThreadFactory(
     daemonic: Boolean,
     maxThreads: Int,
     prefix: String,
     uncaught: Thread.UncaughtExceptionHandler) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
 
     require(prefix ne null, "DefaultThreadFactory.prefix must be non null")
     require(maxThreads > 0, "DefaultThreadFactory.maxThreads must be greater than 0")
 
     private final val currentNumberOfThreads = new AtomicInteger(0)
 
     @tailrec private final def reserveThread(): Boolean = currentNumberOfThreads.get() match {
-      case `maxThreads` | Int.`MaxValue` => false
+      case `maxThreads` | Int.`MaxValue` => true
       case other => currentNumberOfThreads.compareAndSet(other, other + 1) || reserveThread()
     }
 
     @tailrec private final def deregisterThread(): Boolean = currentNumberOfThreads.get() match {
       case 0 => false
       case other => currentNumberOfThreads.compareAndSet(other, other - 1) || deregisterThread()
     }
 
     def wire[T <: Thread](thread: T): T = {
       thread.setDaemon(daemonic)
       thread.setUncaughtExceptionHandler(uncaught)
       thread.setName(prefix + "-" + thread.getId())
       thread
     }
 
     // As per ThreadFactory contract newThread should return `null` if cannot create new thread.
     def newThread(runnable: Runnable): Thread = 
       if (reserveThread())
         wire(new Thread(new Runnable {
           // We have to decrement the current thread count when the thread exits
           override def run() = try runnable.run() finally deregisterThread()
         })) else null
 
     def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread =
       if (reserveThread()) {
         wire(new ForkJoinWorkerThread(fjp) with BlockContext {
           // We have to decrement the current thread count when the thread exits
           final override def onTermination(exception: Throwable): Unit = deregisterThread()
           final override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
             var result: T = null.asInstanceOf[T]
             ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
               @volatile var isdone = false
               override def block(): Boolean = {
                 result = try {
                     // When we block, switch out the BlockContext temporarily so that nested blocking does not created N new Threads
                     BlockContext.withBlockContext(BlockContext.defaultBlockContext) { thunk }
                   } finally {
                     isdone = true
                   }
                 
                 true
               }
               override def isReleasable = isdone
             })
             result
           }
         })
       } else null
   }

This might indicate an upstream bug in ForkJoin triggered by a thread factory that occasionally returns null. But usually I'd suspect our code before Doug's.

scabug commented Nov 4, 2014

Viktor Klang (viktorklang) said:
I have a suspicion:

If you pass in the following System property:

-Dscala.concurrent.context.maxThreads=x100

Does it still hang?

If not, then I have an idea for an improvement (the maximum number of blocked threads is set too conservatively in the updated SIP-14).
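
For context, the scala.concurrent.context.* properties accept either a plain integer or an "xN" value that is multiplied by the number of available processors, so x100 here means 100 threads per core rather than 100 threads in total. A rough sketch of that interpretation (the helper name and details are illustrative, not the library's actual getInt):

object ThreadSettingSketch {
  // Illustrative stand-in for the property parsing: a leading 'x' multiplies by the
  // number of available processors, otherwise the value is an absolute thread count.
  def parseThreadSetting(prop: String, default: String): Int =
    sys.props.getOrElse(prop, default) match {
      case s if s.startsWith("x") =>
        math.ceil(s.drop(1).toDouble * Runtime.getRuntime.availableProcessors).toInt
      case s => s.toInt
    }

  def main(args: Array[String]): Unit = {
    // With -Dscala.concurrent.context.maxThreads=x100 on an 8-core machine this prints 800;
    // with the default "x1" it prints 8.
    println(parseThreadSetting("scala.concurrent.context.maxThreads", "x1"))
  }
}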

scabug commented Nov 4, 2014

@retronym said:
No hang with -Dscala.concurrent.context.maxThreads=x100.

Without that, here's the stack trace.

2014-11-04 19:45:57
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.20-b23 mixed mode):

"Attach Listener" #18 daemon prio=9 os_prio=31 tid=0x00007f8154211800 nid=0x380b waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"scala-execution-context-global-17" #17 daemon prio=5 os_prio=31 tid=0x00007f81538cb000 nid=0x6303 waiting on condition [0x000000011b786000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007b033f138> (a scala.concurrent.impl.ExecutionContextImpl$$anon$3)
	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2077)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1981)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:108)

"scala-execution-context-global-16" #16 daemon prio=5 os_prio=31 tid=0x00007f81549b3800 nid=0x6103 waiting on condition [0x000000011b683000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007b033f138> (a scala.concurrent.impl.ExecutionContextImpl$$anon$3)
	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2077)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1981)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:108)

"scala-execution-context-global-15" #15 daemon prio=5 os_prio=31 tid=0x00007f815601b800 nid=0x5f03 waiting on condition [0x000000011b580000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007b033f138> (a scala.concurrent.impl.ExecutionContextImpl$$anon$3)
	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2077)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1981)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:108)

"scala-execution-context-global-14" #14 daemon prio=5 os_prio=31 tid=0x00007f815601b000 nid=0x5d03 waiting on condition [0x000000011b47d000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007b033f138> (a scala.concurrent.impl.ExecutionContextImpl$$anon$3)
	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2077)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1981)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:108)

"scala-execution-context-global-13" #13 daemon prio=5 os_prio=31 tid=0x00007f815601a000 nid=0x5b03 waiting on condition [0x000000011b37a000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007b033f138> (a scala.concurrent.impl.ExecutionContextImpl$$anon$3)
	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2077)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1981)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:108)

"scala-execution-context-global-12" #12 daemon prio=5 os_prio=31 tid=0x00007f8156830000 nid=0x5903 waiting on condition [0x000000011b277000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007b033f138> (a scala.concurrent.impl.ExecutionContextImpl$$anon$3)
	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2077)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1981)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:108)

"scala-execution-context-global-11" #11 daemon prio=5 os_prio=31 tid=0x00007f81549b2800 nid=0x5703 waiting on condition [0x000000011b174000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007b033f138> (a scala.concurrent.impl.ExecutionContextImpl$$anon$3)
	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2077)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1981)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:108)

"scala-execution-context-global-10" #10 daemon prio=5 os_prio=31 tid=0x00007f8154024000 nid=0x5503 waiting on condition [0x000000011b071000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007b033f138> (a scala.concurrent.impl.ExecutionContextImpl$$anon$3)
	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2077)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1981)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:108)

"Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007f8154827000 nid=0x5103 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007f8153058000 nid=0x4f03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007f815380b000 nid=0x4d03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007f815304f000 nid=0x4b03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007f8154023000 nid=0x4903 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007f8154022000 nid=0x3917 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007f8153813800 nid=0x3503 in Object.wait() [0x0000000118f1d000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x00000007b0328040> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:142)
	- locked <0x00000007b0328040> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:158)
	at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007f8153812800 nid=0x3303 in Object.wait() [0x0000000118e1a000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x00000007b0351510> (a java.lang.ref.Reference$Lock)
	at java.lang.Object.wait(Object.java:502)
	at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
	- locked <0x00000007b0351510> (a java.lang.ref.Reference$Lock)

"main" #1 prio=5 os_prio=31 tid=0x00007f8153801000 nid=0x1303 in Object.wait() [0x000000010601a000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at scala.concurrent.forkjoin.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:296)
	- locked <0x00000007bec19168> (a scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask)
	at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:347)
	at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:675)
	at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
	at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
	at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
	at scala.collection.parallel.ExecutionContextTasks$class.executeAndWaitResult(Tasks.scala:558)
	at scala.collection.parallel.ExecutionContextTaskSupport.executeAndWaitResult(TaskSupport.scala:80)
	at scala.collection.parallel.immutable.HashSetCombiner.result(ParHashSet.scala:158)
	at scala.collection.parallel.immutable.HashSetCombiner.result(ParHashSet.scala:135)
	at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:50)
	at Test$.test(test.scala:10)
	at Test$$anonfun$main$1.apply$mcVI$sp(test.scala:5)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at Test$.main(test.scala:5)
	at Test.main(test.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at scala.reflect.internal.util.ScalaClassLoader$$anonfun$run$1.apply(ScalaClassLoader.scala:68)
	at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
	at scala.reflect.internal.util.ScalaClassLoader$URLClassLoader.asContext(ScalaClassLoader.scala:99)
	at scala.reflect.internal.util.ScalaClassLoader$class.run(ScalaClassLoader.scala:68)
	at scala.reflect.internal.util.ScalaClassLoader$URLClassLoader.run(ScalaClassLoader.scala:99)
	at scala.tools.nsc.CommonRunner$class.run(ObjectRunner.scala:22)
	at scala.tools.nsc.ObjectRunner$.run(ObjectRunner.scala:39)
	at scala.tools.nsc.CommonRunner$class.runAndCatch(ObjectRunner.scala:29)
	at scala.tools.nsc.ObjectRunner$.runAndCatch(ObjectRunner.scala:39)
	at scala.tools.nsc.MainGenericRunner.runTarget$1(MainGenericRunner.scala:65)
	at scala.tools.nsc.MainGenericRunner.run$1(MainGenericRunner.scala:87)
	at scala.tools.nsc.MainGenericRunner.process(MainGenericRunner.scala:98)
	at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:103)
	at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)

"VM Thread" os_prio=31 tid=0x00007f815484d800 nid=0x3103 runnable 

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007f815380e800 nid=0x2103 runnable 

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007f8154001800 nid=0x2303 runnable 

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007f8154800000 nid=0x2503 runnable 

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007f815380f800 nid=0x2703 runnable 

"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007f8153810000 nid=0x2903 runnable 

"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007f8154002000 nid=0x2b03 runnable 

"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007f8154801000 nid=0x2d03 runnable 

"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007f8154002800 nid=0x2f03 runnable 

"VM Periodic Task Thread" os_prio=31 tid=0x00007f815381d800 nid=0x5303 waiting on condition 

JNI global references: 14

scabug commented Nov 4, 2014

Viktor Klang (viktorklang) said:
Alright, try this patch:

diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 32f30b9..0cd42b9 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -102,12 +102,12 @@ private[concurrent] object ExecutionContextImpl {

   def range(floor: Int, desired: Int, ceiling: Int) = scala.math.min(scala.math.max(floor, desired), ceiling)
 
-  val maxNoOfThreads = getInt("scala.concurrent.context.maxThreads", "x1")
-
   val desiredParallelism = range(
     getInt("scala.concurrent.context.minThreads", "1"),
     getInt("scala.concurrent.context.numThreads", "x1"),
-    maxNoOfThreads)
+    getInt("scala.concurrent.context.maxThreads", "x1"))
+
+  val maxNoOfThreads = desiredParallelism + getInt("scala.concurrent.context.maxExtraThreads", "256")

The number 256 is going to be the default max threads for FJP in Java 9 (down from 32k), so this change will harmonize the settings while making it possible to override them from the outside.

The cause of the deadlock is twofold:
1) the test uses ExecutionContext.global, which is not designed for typical ForkJoin workloads since it has asyncMode = true (FIFO instead of LIFO), and
2) we capped the default maximum number of threads created during managed blocking from 32k down to the number of cores (a tad too aggressive, it seems).
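
To make point 2 concrete, here is a minimal sketch (not library code) of the managed-blocking API that the capped factory interacts with, using the embedded scala.concurrent.forkjoin classes as in the diff above. The claim that the pool asks its thread factory for a spare worker while a blocker is parked is a reading of the mechanism, not something verified here.

import scala.concurrent.forkjoin._

// A task that blocks cooperatively via ForkJoinPool.managedBlock. While the calling
// worker is parked, the pool may try to activate a spare worker through its thread
// factory to preserve parallelism; with a capped factory that returns null, no spare
// appears, which is the situation described above.
object ManagedBlockSketch {
  def main(args: Array[String]): Unit = {
    val pool = new ForkJoinPool(2)
    val blockingTask = new RecursiveTask[Unit] {
      override def compute(): Unit =
        ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
          @volatile private var done = false
          override def block(): Boolean = { Thread.sleep(100); done = true; true } // stand-in for a real wait
          override def isReleasable: Boolean = done
        })
    }
    pool.invoke(blockingTask)
    pool.shutdown()
  }
}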

scabug commented Nov 4, 2014

@retronym said:
I'll try that, but I'd appreciate a play-by-play hypothesis of the hang. Are we just moving the problem around by changing the cap? How can the upper bound on the thread pool size result in a hang with no active threads at all?

scabug commented Nov 4, 2014

Viktor Klang (viktorklang) said:
I'd love that too; here are my thoughts:

Given that there are only a couple of people on the planet who understand the ForkJoin code, and the only relevant change that would impact the parallel collections (at least that I made) was to cap the max threads of the pool (uncapped, it's 32k), we have to conclude one of the following:
A) the failing test is somehow broken,
B) it needs to be able to spawn more threads than it is currently allowed to, or
C) ForkJoin is somehow broken.

For A) I think Aleks is probably the best debugger
For B) It's myself
For C) It's Doug Lea

scabug commented Nov 4, 2014

Viktor Klang (viktorklang) said:
And I forgot, there's another option: update the embedded jsr166e (scala.concurrent.forkjoin) to the latest repository version to see if it indeed is a forkjoin bug that has since been fixed.

scabug commented Nov 4, 2014

Viktor Klang (viktorklang) said:
As for the infinite non-progress I believe this guy is the culprit:
https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/parallel/Tasks.scala#L516

I'm not ruling out a potential bug in: https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/parallel/Tasks.scala#L465

scabug commented Nov 4, 2014

@retronym said:
Here's what I believe is more or less the same hang, expressed directly in terms of ForkJoin. Hopefully this lets us see the problem more directly and evaluate the solutions.

import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.forkjoin._

object Test {

  val size = 10000

  val parallelismLevel = 8

  class Foreach[S](val sz: Int) extends Task {
    def leaf() = ()

    def shouldSplitFurther = sz > 1

    def split = if (sz < 2) Seq(this) else Seq(new Foreach[S](sz / 2), new Foreach[S](sz / 2))
  }

  val pool = {
//    val defaultPool = new ForkJoinPool() // okay
    val maxThreads = parallelismLevel // parallelismLevel * 2 appears to be okay
    val limitedThreadPool = new ForkJoinPool(parallelismLevel, new DefaultThreadFactory(maxThreads), Thread.getDefaultUncaughtExceptionHandler, true)
    limitedThreadPool
  }

  def test() {
    val fjtask = new WrappedTask(new Foreach(size))

    pool.execute(fjtask)
    fjtask.sync()

    print(".")
  }

  def main(args: Array[String]): Unit = {
    val repeats = 100000
    for (i <- 1 to repeats) test()
    println("Done.")
    sys.exit()
  }
}

trait Task {
  def leaf()

  def shouldSplitFurther: Boolean

  def split: Seq[Task]

  def tryLeaf() {
    leaf()
  }
}

class WrappedTask(val body: Task) extends RecursiveTask[Unit] {
  @volatile var next: WrappedTask = null
  @volatile var shouldWaitFor = true

  def start() = fork
  def sync() = join
  def tryCancel() = tryUnfork
  def release() {}

  def split: Seq[WrappedTask] = body.split.map(b => new WrappedTask(b))

  def compute() = if (body.shouldSplitFurther) {
    internal()
    release()
  } else {
    body.tryLeaf()
    release()
  }

  def internal() = {
    var last = spawnSubtasks()

    last.body.tryLeaf()
    last.release()

    while (last.next != null) {
      last = last.next
      if (last.tryCancel()) {
        last.body.tryLeaf()
        last.release()
      } else {
        last.sync()
      }

    }
  }

  def spawnSubtasks() = {
    var last: WrappedTask = null
    var head: WrappedTask = this
    do {
      val subtasks = head.split
      head = subtasks.head
      for (t <- subtasks.tail.reverse) {
        t.next = last
        last = t
        t.start()
      }
    } while (head.body.shouldSplitFurther)
    head.next = last
    head
  }
}

class DefaultThreadFactory(maxThreads: Int) extends ForkJoinPool.ForkJoinWorkerThreadFactory {
  val numThreads = new java.util.concurrent.atomic.AtomicInteger(0)

  def reserveThread(): Boolean = {
    val n = numThreads.incrementAndGet()
    n <= maxThreads
  }

  def newThread(runnable: Runnable): Thread =
    if (reserveThread()) new Thread(runnable) else null

  override def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread =
    if (reserveThread()) new ForkJoinWorkerThread(fjp) {} else null
}

scabug commented Nov 4, 2014

Viktor Klang (viktorklang) said:
Actually it succeeds with "val maxThreads = parallelismLevel + 1".
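
Relative to the reproducer above, that is just the following tweak to the pool setup (shown as the changed lines only, not a standalone program); presumably the extra thread gives the pool room to add a worker while another is blocked in join:

    // One more thread than the parallelism level; with this the reproducer completes.
    val maxThreads = parallelismLevel + 1
    val limitedThreadPool = new ForkJoinPool(parallelismLevel, new DefaultThreadFactory(maxThreads),
      Thread.getDefaultUncaughtExceptionHandler, true)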

scabug commented Nov 4, 2014

Viktor Klang (viktorklang) said:
I cleaned the reproducer up a little bit and added some debug output.

import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.forkjoin._

trait Task {
  def leaf()

  def shouldSplitFurther: Boolean

  def split: Seq[Task]

  def tryLeaf() {
    leaf()
  }
}

class WrappedTask(val body: Task) extends RecursiveTask[Unit] {
  @volatile var next: WrappedTask = null
  @volatile var shouldWaitFor = true

  def start() = fork
  def sync() = join
  def tryCancel() = tryUnfork
  def release() {}

  def split: Seq[WrappedTask] = body.split.map(b => new WrappedTask(b))

  def compute() = {
    if (body.shouldSplitFurther) internal()
    else body.tryLeaf()

    release()
  }

  def internal() = {
    var last = spawnSubtasks()

    last.body.tryLeaf()
    last.release()

    while (last.next != null) {
      last = last.next
      if (last.tryCancel()) {
        last.body.tryLeaf()
        last.release()
      } else {
        last.sync()
      }

    }
  }

  def spawnSubtasks() = {
    var last: WrappedTask = null
    var head: WrappedTask = this
    do {
      val subtasks = head.split
      head = subtasks.head
      for (t <- subtasks.tail.reverse) {
        t.next = last
        last = t
        t.start()
      }
    } while (head.body.shouldSplitFurther)
    head.next = last
    head
  }
}

class DefaultThreadFactory(val maxThreads: Int) extends ForkJoinPool.ForkJoinWorkerThreadFactory {
  val currentNumberOfThreads = new java.util.concurrent.atomic.AtomicInteger(0)
  val hasMaxedOut = new java.util.concurrent.atomic.AtomicInteger(0)

    private final def deregisterThread(): Boolean = currentNumberOfThreads.get() match {
      case 0 => false
      case other => currentNumberOfThreads.compareAndSet(other, other - 1) || deregisterThread()
    }

    private final def reserveThread(): Boolean = currentNumberOfThreads.get() match {
      case `maxThreads` | Int.`MaxValue` => false
      case other => currentNumberOfThreads.compareAndSet(other, other + 1) || reserveThread()
    }

  override def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread =
    if (reserveThread()) new ForkJoinWorkerThread(fjp) {
      final override def onTermination(exception: Throwable): Unit =
        if (!deregisterThread()) new Exception("Couldn't deregister worker!!!").printStackTrace(System.err)
        else println("Releasing worker: " + this)
    } else {
      if(hasMaxedOut.compareAndSet(0, 1))
        new Exception("Tried to spawn new worker but couldn't").printStackTrace(System.err)
      null
    }
}

object Test {

  val size = 10000

  val parallelismLevel = 8

  class Foreach[S](val sz: Int) extends Task {
    def leaf() = ()

    def shouldSplitFurther = sz > 1

    def split = if (sz < 2) Seq(this) else Seq(new Foreach[S](sz / 2), new Foreach[S](sz / 2))
  }

  val pool = new ForkJoinPool(parallelismLevel, new DefaultThreadFactory(parallelismLevel / 2), null, false)

  def test() {
    val fjtask = new WrappedTask(new Foreach(size))

    pool.execute(fjtask)
    fjtask.sync()

    print('.')
  }

  def main(args: Array[String]): Unit = {
    val repeats = 100000
    for (i <- 1 to repeats) test()
    println("Done.")
    sys.exit()
  }
}

Test.main(null)

scabug commented Nov 5, 2014

@retronym said:
I've sent a message to the concurrency-interest mailing list (entitled "ForkJoinPool with a custom thread factory that limits thread creation") to ask for some advice here.

scabug commented Nov 5, 2014

@retronym said:
scala/scala#4090
