Skip to content

Commit

Permalink
Synchronize evaluateGroupCached to avoid concurrent access to cache (#…
Browse files Browse the repository at this point in the history
…2980)

With this change, we synchronize evaluations of the same terminal task.
This isn't necessarily needed for normal Mill executions, but in a BSP
context, where multiple requests where handled concurrently in the same
Mill instance, evaluating the same task concurrently can happen.

We don't synchronize multiple Mill-instances (e.g. run in two shells) or
multiple evaluator-instances (which should have different `out`-dirs
anyway).

Fix: #2818

Pull request: #2980
  • Loading branch information
lefou authored Jan 21, 2024
1 parent 2821d6f commit a866173
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
34 changes: 32 additions & 2 deletions main/eval/src/mill/eval/GroupEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import mill.util._
import java.io.PrintStream
import scala.collection.mutable
import scala.reflect.NameTransformer.encode
import scala.util.DynamicVariable
import scala.util.control.NonFatal
import scala.util.{DynamicVariable, Using}

/**
* Logic around evaluating a single group, which is a collection of [[Task]]s
Expand Down Expand Up @@ -41,6 +41,32 @@ private[mill] trait GroupEvaluator {
val effectiveThreadCount: Int =
this.threadCount.getOrElse(Runtime.getRuntime().availableProcessors())

/**
* Synchronize evaluations of the same terminal task.
* This isn't necessarily needed for normal Mill executions,
* but in an BSP context, where multiple requests where handled concurrently in the same Mill instance,
* evaluating the same task concurrently can happen.
*
* We don't synchronize multiple Mill-instances (e.g. run in two shells)
* or multiple evaluator-instances (which should have different `out`-dirs anyway.
*/
private object synchronizedEval {
private val keyLock = new KeyLock[Segments]()
def apply[T](terminal: Terminal, onCollision: Option[() => Unit] = None)(f: => T): T =
terminal match {
case t: Terminal.Task[_] =>
// A un-labelled terminal task won't be synchronized
// as there is no filesystem cache region assigned to it
f
case Terminal.Labelled(_, segments) =>
// A labelled terminal needs synchronization due to
// a shared cache region in the filesystem
Using.resource(keyLock.lock(segments, onCollision)) { _ =>
f
}
}
}

// those result which are inputs but not contained in this terminal group
def evaluateGroupCached(
terminal: Terminal,
Expand All @@ -50,7 +76,11 @@ private[mill] trait GroupEvaluator {
zincProblemReporter: Int => Option[CompileProblemReporter],
testReporter: TestReporter,
logger: ColorLogger
): GroupEvaluator.Results = {
): GroupEvaluator.Results = synchronizedEval(
terminal,
onCollision =
Some(() => logger.debug(s"Waiting for concurrently executing task ${terminal.render}"))
) {

val externalInputsHash = scala.util.hashing.MurmurHash3.orderedHash(
group.items.flatMap(_.inputs).filter(!group.contains(_))
Expand Down
26 changes: 26 additions & 0 deletions main/eval/src/mill/eval/KeyLock.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package mill.eval

private class KeyLock[K]() {
private[this] val lockKeys = new java.util.HashSet[K]()

def lock(key: K, onCollision: Option[() => Unit] = None): AutoCloseable = {
lockKeys.synchronized {
var shown = false
while (!lockKeys.add(key)) {
if (!shown) {
onCollision.foreach(_())
shown = true
}
lockKeys.wait();
}
}
() => unlock(key)
}

private def unlock(key: K): Unit = {
lockKeys.synchronized {
lockKeys.remove(key)
lockKeys.notifyAll()
}
}
}

0 comments on commit a866173

Please sign in to comment.