Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize evaluateGroupCached to avoid concurrent access to cache #2980

Merged
merged 5 commits into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions main/eval/src/mill/eval/GroupEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import mill.util._
import java.io.PrintStream
import scala.collection.mutable
import scala.reflect.NameTransformer.encode
import scala.util.DynamicVariable
import scala.util.control.NonFatal
import scala.util.{DynamicVariable, Using}

/**
* Logic around evaluating a single group, which is a collection of [[Task]]s
Expand Down Expand Up @@ -41,6 +41,32 @@ private[mill] trait GroupEvaluator {
val effectiveThreadCount: Int =
this.threadCount.getOrElse(Runtime.getRuntime().availableProcessors())

/**
* Synchronize evaluations of the same terminal task.
* This isn't necessarily needed for normal Mill executions,
* but in an BSP context, where multiple requests where handled concurrently in the same Mill instance,
* evaluating the same task concurrently can happen.
*
* We don't synchronize multiple Mill-instances (e.g. run in two shells)
* or multiple evaluator-instances (which should have different `out`-dirs anyway.
*/
private object synchronizedEval {
private val keyLock = new KeyLock[Segments]()
def apply[T](terminal: Terminal, onCollision: Option[() => Unit] = None)(f: => T): T =
terminal match {
case t: Terminal.Task[_] =>
// A un-labelled terminal task won't be synchronized
// as there is no filesystem cache region assigned to it
f
case Terminal.Labelled(_, segments) =>
// A labelled terminal needs synchronization due to
// a shared cache region in the filesystem
Using.resource(keyLock.lock(segments, onCollision)) { _ =>
f
}
}
}

// those result which are inputs but not contained in this terminal group
def evaluateGroupCached(
terminal: Terminal,
Expand All @@ -50,7 +76,11 @@ private[mill] trait GroupEvaluator {
zincProblemReporter: Int => Option[CompileProblemReporter],
testReporter: TestReporter,
logger: ColorLogger
): GroupEvaluator.Results = {
): GroupEvaluator.Results = synchronizedEval(
terminal,
onCollision =
Some(() => logger.debug(s"Waiting for concurrently executing task ${terminal.render}"))
) {

val externalInputsHash = scala.util.hashing.MurmurHash3.orderedHash(
group.items.flatMap(_.inputs).filter(!group.contains(_))
Expand Down
26 changes: 26 additions & 0 deletions main/eval/src/mill/eval/KeyLock.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package mill.eval

private class KeyLock[K]() {
private[this] val lockKeys = new java.util.HashSet[K]()

def lock(key: K, onCollision: Option[() => Unit] = None): AutoCloseable = {
lockKeys.synchronized {
var shown = false
while (!lockKeys.add(key)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be a while? I thought we're inside lockKeys.synchronized, there should be no need to "try again" since nobody else else going to be able to mutate lockKeys

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think yes. At least, the Javadoc of the Object.wait method suggests to do so:

As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:

synchronized (obj) {
    while (<condition does not hold>)
        obj.wait();
    ... // Perform action appropriate to condition
}

if (!shown) {
onCollision.foreach(_())
shown = true
}
lockKeys.wait();
}
}
() => unlock(key)
}

private def unlock(key: K): Unit = {
lockKeys.synchronized {
lockKeys.remove(key)
lockKeys.notifyAll()
}
}
}
Loading