Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial structure and parallelization of parser #66

Open
wants to merge 16 commits into
base: 2.13.x
Choose a base branch
from
131 changes: 89 additions & 42 deletions src/compiler/scala/tools/nsc/Global.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ package nsc
import java.io.{File, FileNotFoundException, IOException}
import java.net.URL
import java.nio.charset.{Charset, CharsetDecoder, IllegalCharsetNameException, UnsupportedCharsetException}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.{immutable, mutable}
import io.{AbstractFile, Path, SourceReader}
import reporters.Reporter
import util.{ClassPath, returning}
import reporters.{Reporter, StoreReporter}
import util.{ClassPath, ThreadIdentityAwareThreadLocal, returning}
import scala.reflect.ClassTag
import scala.reflect.internal.util.{BatchSourceFile, NoSourceFile, ScalaClassLoader, ScriptSourceFile, SourceFile, StatisticsStatics}
import scala.reflect.internal.pickling.PickleBuffer
Expand All @@ -26,12 +28,13 @@ import typechecker._
import transform.patmat.PatternMatching
import transform._
import backend.{JavaPlatform, ScalaPrimitives}
import backend.jvm.{GenBCode, BackendStats}
import scala.concurrent.Future
import backend.jvm.{BackendStats, GenBCode}
import scala.concurrent.duration.Duration
import scala.concurrent._
import scala.language.postfixOps
import scala.tools.nsc.ast.{TreeGen => AstTreeGen}
import scala.tools.nsc.classpath._
import scala.tools.nsc.profile.Profiler
import scala.tools.nsc.profile.{Profiler, ThreadPoolFactory}

class Global(var currentSettings: Settings, reporter0: Reporter)
extends SymbolTable
Expand Down Expand Up @@ -75,16 +78,18 @@ class Global(var currentSettings: Settings, reporter0: Reporter)

override def settings = currentSettings

private[this] var currentReporter: Reporter = { reporter = reporter0 ; currentReporter }
private[this] val currentReporter: ThreadIdentityAwareThreadLocal[Reporter] =
ThreadIdentityAwareThreadLocal(new StoreReporter, reporter0)

def reporter: Reporter = { reporter = reporter0 ; currentReporter.get }

def reporter: Reporter = currentReporter
def reporter_=(newReporter: Reporter): Unit =
currentReporter = newReporter match {
currentReporter.set(newReporter match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert Parallel.onMainThread?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't be true.
We are setting new one for every unit now.
It's required to ensure consistent ordering of the logs.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should assert that we are on the main thread I think

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is not the case here, please look into processUnit method.

case _: reporters.ConsoleReporter | _: reporters.LimitingReporter => newReporter
case _ if settings.maxerrs.isSetByUser && settings.maxerrs.value < settings.maxerrs.default =>
new reporters.LimitingReporter(settings, newReporter)
case _ => newReporter
}
})

/** Switch to turn on detailed type logs */
var printTypings = settings.Ytyperdebug.value
Expand Down Expand Up @@ -385,45 +390,83 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
abstract class GlobalPhase(prev: Phase) extends Phase(prev) {
phaseWithId(id) = this

def run(): Unit = {
def apply(unit: CompilationUnit): Unit

def run() {
assertOnMainThread()
echoPhaseSummary(this)
currentRun.units foreach applyPhase
Await.result(Future.sequence(currentRun.units map processUnit), Duration.Inf)
}

def apply(unit: CompilationUnit): Unit
final def applyPhase(unit: CompilationUnit): Unit = Await.result(processUnit(unit), Duration.Inf)

/** Is current phase cancelled on this unit? */
def cancelled(unit: CompilationUnit) = {
// run the typer only if in `createJavadoc` mode
val maxJavaPhase = if (createJavadoc) currentRun.typerPhase.id else currentRun.namerPhase.id
reporter.cancelled || unit.isJava && this.id > maxJavaPhase
implicit val ec: ExecutionContext = {
val threadPoolFactory = ThreadPoolFactory(Global.this, this)
val javaExecutor = threadPoolFactory.newUnboundedQueueFixedThreadPool(parallelThreads, "worker")
scala.concurrent.ExecutionContext.fromExecutorService(javaExecutor, (_) => ())
}

final def withCurrentUnit(unit: CompilationUnit)(task: => Unit): Unit = {
if ((unit ne null) && unit.exists)
lastSeenSourceFile = unit.source

private def processUnit(unit: CompilationUnit)(implicit ec: ExecutionContext): Future[Unit] = {
if (settings.debug && (settings.verbose || currentRun.size < 5))
inform("[running phase " + name + " on " + unit + "]")
if (!cancelled(unit)) {
currentRun.informUnitStarting(this, unit)
try withCurrentUnitNoLog(unit)(task)
finally currentRun.advanceUnit()

def runWithCurrentUnit(): Unit = {
val threadName = Thread.currentThread().getName
if (!threadName.contains("worker")) Thread.currentThread().setName(s"$threadName-worker")
val unit0 = currentUnit

try {
if ((unit ne null) && unit.exists) lastSeenSourceFile = unit.source
currentRun.currentUnit = unit
apply(unit)
} finally {
currentRun.currentUnit = unit0
currentRun.advanceUnit()
Thread.currentThread().setName(threadName)

// If we are on main thread it means there are no worker threads at all.
// That in turn means we were already using main reporter all the time, so there is nothing more to do.
// Otherwise we have to forward messages from worker thread reporter to main one.
reporter match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't work - we need to report in order - that was why I had the local data for this

we should move this to a relay method on the StoreReporter I think, or a new RelayingReporter or some such

case rep: StoreReporter =>
val mainReporter = currentReporter.main
mainReporter.synchronized {
rep.infos.foreach { info =>
info.severity.toString match {
case "INFO" => mainReporter.info(info.pos, info.msg, force = false)
case "WARNING" => mainReporter.warning(info.pos, info.msg)
case "ERROR" => mainReporter.error(info.pos, info.msg)
}
}
}
case _ =>
}
}
}

if (cancelled(unit)) Future.successful(())
else if (isParallel) Future(runWithCurrentUnit())
else Future.fromTry(scala.util.Try(runWithCurrentUnit()))
}

final def withCurrentUnitNoLog(unit: CompilationUnit)(task: => Unit): Unit = {
val unit0 = currentUnit
try {
currentRun.currentUnit = unit
task
} finally {
//assert(currentUnit == unit)
currentRun.currentUnit = unit0
}
private def adjustWorkerThreadName(): Unit = {
val currentThreadName = Thread.currentThread().getName
}

final def applyPhase(unit: CompilationUnit) = withCurrentUnit(unit)(apply(unit))
private def parallelThreads = settings.YparallelThreads.value

private def isParallel = settings.YparallelPhases.containsPhase(this)

/** Tells whether this phase must be skipped for `unit`.
 *
 *  A unit is skipped when the reporter has been cancelled, or when it is a Java
 *  unit and this phase comes after the last phase that applies to Java sources
 *  (typer when `createJavadoc` is set, namer otherwise).
 */
private def cancelled(unit: CompilationUnit) = {
  // the typer only runs on Java units in `createJavadoc` mode
  val lastPhaseIdForJava =
    if (createJavadoc) currentRun.typerPhase.id
    else currentRun.namerPhase.id
  reporter.cancelled || (unit.isJava && this.id > lastPhaseIdForJava)
}

/** Fails with an `AssertionError` unless the calling thread is named exactly "main".
 *
 *  The failure message carries the offending thread's name so the violation can
 *  be traced back to the code that started the phase from the wrong thread.
 */
private def assertOnMainThread(): Unit = {
  val threadName = Thread.currentThread().getName
  assert(threadName == "main", s"expected to run on the main thread but was called on thread '$threadName'")
}
}

// phaseName = "parser"
Expand Down Expand Up @@ -953,7 +996,9 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
* of what file was being compiled when it broke. Since I really
* really want to know, this hack.
*/
protected var lastSeenSourceFile: SourceFile = NoSourceFile
protected var _lastSeenSourceFile: ThreadIdentityAwareThreadLocal[SourceFile] = ThreadIdentityAwareThreadLocal(NoSourceFile)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[this] final val ?

@inline protected def lastSeenSourceFile: SourceFile = _lastSeenSourceFile.get
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will need to be final to @inline

Should be final anyway I think

The pattern is that we replace
<prot> var <name>: <type>
with

private[this] final _<name> = new ThreadIdentityAwareThreadLocal[<type>](...)
<prot> final def <name> : <type> = _<name>.get
<prot> final def <name>_=(newValue: <type>) = _<name>.set(newValue)

@inline protected def lastSeenSourceFile_=(source: SourceFile): Unit = _lastSeenSourceFile.set(source)

/** Let's share a lot more about why we crash all over the place.
* People will be very grateful.
Expand Down Expand Up @@ -1090,7 +1135,9 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
*/
var isDefined = false
/** The currently compiled unit; set from GlobalPhase */
var currentUnit: CompilationUnit = NoCompilationUnit
private final val _currentUnit: ThreadIdentityAwareThreadLocal[CompilationUnit] = ThreadIdentityAwareThreadLocal(NoCompilationUnit, NoCompilationUnit)
def currentUnit: CompilationUnit = _currentUnit.get
def currentUnit_=(unit: CompilationUnit): Unit = _currentUnit.set(unit)

val profiler: Profiler = Profiler(settings)
keepPhaseStack = settings.log.isSetByUser
Expand Down Expand Up @@ -1128,8 +1175,8 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
/** A map from compiled top-level symbols to their picklers */
val symData = new mutable.AnyRefMap[Symbol, PickleBuffer]

private var phasec: Int = 0 // phases completed
private var unitc: Int = 0 // units completed this phase
private var phasec: Int = 0 // phases completed
private final val unitc: AtomicInteger = new AtomicInteger(0) // units completed this phase
Copy link
Collaborator

@mkeskells mkeskells May 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that can be a Parallel.Counter
and AtomicInteger, with get, increment (or maybe +=) and reset
we can also add assertions on the access in the same way


def size = unitbuf.size
override def toString = "scalac Run for:\n " + compiledFiles.toList.sorted.mkString("\n ")
Expand Down Expand Up @@ -1250,22 +1297,22 @@ class Global(var currentSettings: Settings, reporter0: Reporter)
* (for progress reporting)
*/
def advancePhase(): Unit = {
unitc = 0
unitc.set(0)
phasec += 1
refreshProgress()
}
/** take note that a phase on a unit is completed
* (for progress reporting)
*/
def advanceUnit(): Unit = {
unitc += 1
unitc.incrementAndGet()
refreshProgress()
}

// for sbt
def cancel(): Unit = { reporter.cancelled = true }

private def currentProgress = (phasec * size) + unitc
private def currentProgress = (phasec * size) + unitc.get()
private def totalProgress = (phaseDescriptors.size - 1) * size // -1: drops terminal phase
private def refreshProgress() = if (size > 0) progress(currentProgress, totalProgress)

Expand Down
1 change: 0 additions & 1 deletion src/compiler/scala/tools/nsc/ast/Positions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ trait Positions extends scala.reflect.internal.Positions {
self: Global =>

class ValidatingPosAssigner extends PosAssigner {
var pos: Position = _
override def traverse(t: Tree): Unit = {
if (t eq EmptyTree) ()
else if (t.pos == NoPosition) super.traverse(t setPos pos)
Expand Down
5 changes: 4 additions & 1 deletion src/compiler/scala/tools/nsc/settings/ScalaSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ trait ScalaSettings extends AbsScalaSettings
val reporter = StringSetting ("-Xreporter", "classname", "Specify a custom reporter for compiler messages.", "scala.tools.nsc.reporters.ConsoleReporter")
val source = ScalaVersionSetting ("-Xsource", "version", "Treat compiler input as Scala source for the specified version, see scala/bug#8126.", initial = ScalaVersion("2.13"))

val XnoPatmatAnalysis = BooleanSetting ("-Xno-patmat-analysis", "Don't perform exhaustivity/unreachability analysis. Also, ignore @switch annotation.")
val XnoPatmatAnalysis = BooleanSetting ("-Xno-patmat-analysis", "Don't perform exhaustivity/unreachability analysis. Also, ignore @switch annotation.")

val YparallelPhases = PhasesSetting ("-Yparallel-phases", "Which phases to run in parallel")
val YparallelThreads = IntSetting ("-Yparallel-threads", "worker threads for parallel compilation", 4, Some((0,64)), _ => None )

val XmixinForceForwarders = ChoiceSetting(
name = "-Xmixin-force-forwarders",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package scala.tools.nsc.util

object ThreadIdentityAwareThreadLocal {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a object Parallel, with all of the Parallel compilation extras in one place?

def apply[T](valueOnWorker: => T) = new ThreadIdentityAwareThreadLocal[T](valueOnWorker, null.asInstanceOf[T])
def apply[T](valueOnWorker: => T, valueOnMain: => T) = new ThreadIdentityAwareThreadLocal[T](valueOnWorker, valueOnMain)
}

// `ThreadIdentityAwareThreadLocal` allows us to have different (sub)type of values on main and worker threads.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we have 3 modes
admin - running setup and cleardown on the main thread
parallel-worker
single-worker

It may be easier to have this single worker on a separate single thread to keep it simple

The ThreadFactory used for the async executor can provide a subclass of Thread - ParallelWorkerThread - it makes this switch easier

// It's useful in cases like the reporter, where on workers we just want to store messages and on main we want to print them,
// but also in cases where we do not expect some value to be read or written on the main thread,
// and we want to discover violations of that rule.
/** Holds one value for the main thread and a separate, lazily created value per worker thread.
 *
 *  Threads are classified by name: a thread whose name starts with "main" uses the
 *  main value (provided one was supplied), and a thread whose name contains "-worker"
 *  uses its own `ThreadLocal` slot. Access from any other thread, or from the main
 *  thread when no main value was supplied, trips an assertion.
 *
 *  @param valueOnWorker by-name initial value, evaluated once per worker thread on first access
 *                       and again on `reset()`
 *  @param valueOnMain   by-name initial value for the main thread, evaluated at construction
 *                       and again on `reset()`; `null` means "must not be used on main"
 */
class ThreadIdentityAwareThreadLocal[T](valueOnWorker: => T, valueOnMain: => T) {
  /** The value seen by the main thread. */
  var main: T = valueOnMain

  // Decided once from the initial main value: `valueOnMain` is by-name, so testing it on
  // every access would re-run the caller's expression on each `get`/`set`.
  private[this] val mainDefined: Boolean = main != null

  private val worker: ThreadLocal[T] = new ThreadLocal[T] {
    override def initialValue(): T = valueOnWorker
  }

  // That logic may look a little bit funky because we need to consider cases
  // where there is only one thread which is both main and worker at a given time.
  private def isOnMainThread: Boolean = {
    val currentThreadName = Thread.currentThread().getName
    val isMainDefined = mainDefined && currentThreadName.startsWith("main")
    def isWorkerDefined = currentThreadName.contains("-worker")

    assert(
      isMainDefined || isWorkerDefined,
      s"Variable cannot be accessed on thread '$currentThreadName': " +
        "it is neither a worker thread nor a main thread with a main value defined")

    isMainDefined
  }

  /** Returns the main value on the main thread, otherwise the calling worker thread's value. */
  def get: T = if (isOnMainThread) main else worker.get()

  /** Stores `value` in the main slot on the main thread, otherwise in the calling worker thread's slot. */
  def set(value: T): Unit = if (isOnMainThread) main = value else worker.set(value)

  /** Re-evaluates both initial values, resetting the main slot and the calling thread's worker slot. */
  def reset(): Unit = {
    main = valueOnMain
    worker.set(valueOnWorker)
  }
}
13 changes: 9 additions & 4 deletions src/reflect/scala/reflect/api/Trees.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package scala
package reflect
package api

import scala.reflect.runtime.ThreadIdentityAwareThreadLocal

/**
* <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
*
Expand Down Expand Up @@ -49,8 +51,7 @@ package api
* @groupprio Factories 1
* @groupname Copying Tree Copying
* @groupprio Copying 1
*
* @contentDiagram hideNodes "*Api"
* @contentDiagram hideNodes "*Api"
* @group ReflectionAPI
*/
trait Trees { self: Universe =>
Expand Down Expand Up @@ -2463,7 +2464,9 @@ trait Trees { self: Universe =>
* @group Traversal
*/
class Traverser {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if all of these/some of these Traversers are thread local themselves.
Probably a bigger change though and higher risk

protected[scala] var currentOwner: Symbol = rootMirror.RootClass
protected[scala] def currentOwner: Symbol = _currentOwner.get
protected[scala] def currentOwner_=(sym: Symbol): Unit = _currentOwner.set(sym)
private val _currentOwner: ThreadIdentityAwareThreadLocal[Symbol] = ThreadIdentityAwareThreadLocal[Symbol](rootMirror.RootClass)

/** Traverse something which Trees contain, but which isn't a Tree itself. */
def traverseName(name: Name): Unit = ()
Expand Down Expand Up @@ -2535,7 +2538,9 @@ trait Trees { self: Universe =>
val treeCopy: TreeCopier = newLazyTreeCopier

/** The current owner symbol. */
protected[scala] var currentOwner: Symbol = rootMirror.RootClass
protected[scala] def currentOwner: Symbol = _currentOwner.get
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make these final pls

protected[scala] def currentOwner_=(sym: Symbol): Unit = _currentOwner.set(sym)
private val _currentOwner: ThreadIdentityAwareThreadLocal[Symbol] = ThreadIdentityAwareThreadLocal[Symbol](rootMirror.RootClass)

/** The enclosing method of the currently transformed tree. */
protected def currentMethod = {
Expand Down
2 changes: 1 addition & 1 deletion src/reflect/scala/reflect/internal/Names.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ trait Names extends api.Names {
// detect performance regressions.
//
// Discussion: https://groups.google.com/forum/#!search/biased$20scala-internals/scala-internals/0cYB7SkJ-nM/47MLhsgw8jwJ
protected def synchronizeNames: Boolean = false
protected def synchronizeNames: Boolean = true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we do.
Umad reports violation in this class:

[WARN] Method accessed from multiple threads (main, scalac-parser-worker-1): scala.reflect.internal.SymbolTable.scala$reflect$internal$Names$$nc_$eq(int)
    scala.reflect.internal.SymbolTable.scala$reflect$internal$Names$$nc_$eq(SymbolTable.scala)
    scala.reflect.internal.Names.enterChars(Names.scala:78)
    scala.reflect.internal.Names.body$1(Names.scala:116)
    scala.reflect.internal.Names.newTermName(Names.scala:127)
    scala.reflect.internal.Names.newTermName$(Names.scala:96)
    scala.reflect.internal.SymbolTable.newTermName(SymbolTable.scala:18)
    scala.reflect.internal.Names.newTermName(Names.scala:83)
    scala.reflect.internal.Names.newTermName$(Names.scala:82)
    scala.reflect.internal.SymbolTable.newTermName(SymbolTable.scala:18)
    scala.reflect.internal.Names.newTermName(Names.scala:85)

What synchronizeNames does is enable synchronization in newTermName.
It looks like something designed exactly for our use case so I happily used it.
There were some concerns regarding enabling it globally though (mostly performance ones): https://groups.google.com/forum/#!search/biased$20scala-internals/scala-internals/0cYB7SkJ-nM/47MLhsgw8jwJ

private val nameLock: Object = new Object

/** Memory to store all names sequentially. */
Expand Down
6 changes: 4 additions & 2 deletions src/reflect/scala/reflect/internal/Positions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package internal
import scala.collection.mutable
import util._
import scala.collection.mutable.ListBuffer
import scala.reflect.runtime.ThreadIdentityAwareThreadLocal

/** Handling range positions
* atPos, the main method in this trait, will add positions to a tree,
Expand Down Expand Up @@ -278,12 +279,13 @@ trait Positions extends api.Positions { self: SymbolTable =>
}

trait PosAssigner extends InternalTraverser {
var pos: Position
private val _pos = ThreadIdentityAwareThreadLocal[Position](NoPosition)
@inline def pos: Position = _pos.get
@inline def pos_=(position: Position): Unit = _pos.set(position)
}
protected[this] lazy val posAssigner: PosAssigner = new DefaultPosAssigner
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not make this a thread local. BTW it has overrides

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it would require more changes precisely because of those overrides, but when I tried it just now it turned out to be a three-line change. Fixed.


protected class DefaultPosAssigner extends PosAssigner {
var pos: Position = _
override def traverse(t: Tree) {
if (!t.canHaveAttrs) ()
else if (t.pos == NoPosition) {
Expand Down
2 changes: 2 additions & 0 deletions src/reflect/scala/reflect/internal/Reporting.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ import util.Position
abstract class Reporter {
protected def info0(pos: Position, msg: String, severity: Severity, force: Boolean): Unit

def infoRaw(pos: Position, msg: String, severity: Severity, force: Boolean): Unit = info0(pos, msg, severity, force)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this just for relay?


def echo(pos: Position, msg: String): Unit = info0(pos, msg, INFO, force = true)
def warning(pos: Position, msg: String): Unit = info0(pos, msg, WARNING, force = false)
def error(pos: Position, msg: String): Unit = info0(pos, msg, ERROR, force = false)
Expand Down
Loading