Skip to content

Commit

Permalink
Refactor Slurm tasks into slurm jobs
Browse files Browse the repository at this point in the history
Previously, a job would be broken into its tasks, and a new tapasco
job would be created for each task. These jobs were then executed on the
SLURM cluster. Refactor this, such that the original job is executed on
the SLURM cluster as-is, which simplifies the SLURM logic.
  • Loading branch information
mhrtmnn committed Dec 29, 2020
1 parent b7c0a20 commit 259e6ee
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 88 deletions.
57 changes: 50 additions & 7 deletions toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,20 @@ package tapasco.jobs.executors

import java.util.concurrent.Semaphore

import tapasco.activity.composers.Composer
import tapasco.base._
import tapasco.filemgmt._
import tapasco.jobs.{ComposeJob, HighLevelSynthesisJob}
import tapasco.slurm.Slurm.Completed
import tapasco.task._
import tapasco.slurm._

private object Compose extends Executor[ComposeJob] {
private implicit val logger = tapasco.Logging.logger(getClass)
private[this] val _slurm = Slurm.enabled

def execute(job: ComposeJob)
(implicit cfg: Configuration, tsk: Tasks): Boolean = {
val signal = new Semaphore(0)

logger.trace("composition: {}", job.composition)

// first, collect all kernels and trigger HLS if not built yet
Expand Down Expand Up @@ -74,7 +76,18 @@ private object Compose extends Executor[ComposeJob] {
logger.info("all HLS tasks finished successfully, beginning compose run...")
logger.debug("job: {}", job)

val composeTasks = for {
if (!_slurm) nodeExecution(job) else slurmExecution(job)
} else {
logger.error("HLS tasks failed, aborting composition")
false
}
}

private def nodeExecution(job: ComposeJob)
(implicit cfg: Configuration, tsk: Tasks): Boolean = {
val signal = new Semaphore(0)

val composeTasks = for {
p <- job.platforms
a <- job.architectures
t = Target(a, p)
Expand Down Expand Up @@ -104,10 +117,40 @@ private object Compose extends Executor[ComposeJob] {

// successful, if all successful
(composeTasks map (_.result) fold true) (_ && _)
} else {
logger.error("HLS tasks failed, aborting composition")
false
}

private def slurmExecution(job: ComposeJob)
                          (implicit cfg: Configuration, tsk: Tasks): Boolean = {
  // Only composition, design frequency and implementation are used directly here;
  // the remaining fields travel along inside `job` itself.
  val ComposeJob(c, f, i, _, _, _, _, _, _, _) = job

  // Human-readable job name: all kernel names joined with dashes ("compose" when empty).
  val jobName = c.composition.map(_.kernel).fold("compose")(_ ++ "-" ++ _)
  val workDir = FileAssetManager.TAPASCO_WORK_DIR.resolve("Slurm").resolve("Compose").resolve(jobName)

  // A ComposeTask instance is required by the resource-based scheduling of the task system.
  val resourceConsumer = new ComposeTask(
    composition = c,
    designFrequency = f,
    implementation = Composer.Implementation(i),
    target = Target(job.architectures.head, job.platforms.head),
    onComplete = _ => ()
  )

  // Assemble the SLURM job description: log locations, consumer, time limit and
  // the configuration file that will be re-read on the cluster node.
  val slurmJob = Slurm.Job(
    name = jobName,
    log = workDir.resolve("tapasco.log"),
    slurmLog = workDir.resolve("slurm-compose.log"),
    errorLog = workDir.resolve("slurm-compose.errors.log"),
    consumer = resourceConsumer,
    maxHours = ComposeTask.MAX_COMPOSE_HOURS,
    comment = Some(workDir.toString),
    job = job,
    cfg_file = workDir.resolve("slurm-compose.cfg")
  )

  // Enqueue via sbatch and block until the job finishes; success only if the job
  // was accepted (Some(id)) and terminated with SLURM state Completed.
  // NOTE(review): unlike the HLS SLURM path, there is no FileAssetManager.reset()
  // here — confirm whether compose results need a cache refresh after the run.
  Slurm(slurmJob)(cfg).exists(id => Slurm.waitFor(id) == Completed())
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,25 @@
package tapasco.jobs.executors

import java.util.concurrent.Semaphore

import tapasco.Logging
import tapasco.activity.hls.HighLevelSynthesizer
import tapasco.activity.hls.HighLevelSynthesizer.Implementation._
import tapasco.activity.hls.HighLevelSynthesizer._
import tapasco.base._
import tapasco.filemgmt.FileAssetManager
import tapasco.jobs._
import tapasco.slurm.Slurm
import tapasco.slurm.Slurm.Completed
import tapasco.task._

protected object HighLevelSynthesis extends Executor[HighLevelSynthesisJob] {
private implicit final val logger = Logging.logger(getClass)
private[this] val _slurm = Slurm.enabled

def execute(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean =
if (!_slurm) nodeExecution(job) else slurmExecution(job)

def execute(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = {
def nodeExecution(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = {
val signal = new Semaphore(0)
val runs: Seq[(Kernel, Target)] = for {
a <- job.architectures.toSeq.sortBy(_.name)
Expand Down Expand Up @@ -94,4 +99,39 @@ protected object HighLevelSynthesis extends Executor[HighLevelSynthesisJob] {
// success, if all tasks were successful
((tasks ++ importTasks) map (_.result) fold true) (_ && _)
}

/** Executes the given HLS job on the SLURM cluster as a whole: the job is serialized
  * into a configuration file and re-run via `tapasco --configFile` on a cluster node.
  * Blocks until the SLURM job terminates.
  *
  * @param job HLS job to enqueue.
  * @param cfg implicit TaPaSCo configuration.
  * @param tsk implicit task scheduler (required by the executor interface).
  * @return true iff the SLURM job was accepted and completed successfully.
  */
def slurmExecution(job: HighLevelSynthesisJob)
                  (implicit cfg: Configuration, tsk: Tasks): Boolean = {

  // Job name: all kernel names joined with dashes ("hls" for an empty kernel list).
  val name = job.kernels.map(_.name).fold("hls")(_ ++ "-" ++ _)
  val outDir = FileAssetManager.TAPASCO_WORK_DIR.resolve("Slurm").resolve("HLS").resolve(name)
  // needed for resource-based scheduling
  val consumer = new HighLevelSynthesisTask(
    job.kernels.head,
    Target(job.architectures.head, job.platforms.head),
    cfg,
    VivadoHLS,
    _ => ()
  )

  // define SLURM job
  val sjob = Slurm.Job(
    name = name,
    log = outDir.resolve("tapasco.log"),
    slurmLog = outDir.resolve("slurm-hls.log"),
    // fixed: was "hls-slurm.errors.log" — inconsistent with the sibling
    // artifacts "slurm-hls.log" / "slurm-hls.cfg" and with the Compose path
    // ("slurm-compose.errors.log"); keep all SLURM HLS files under one prefix.
    errorLog = outDir.resolve("slurm-hls.errors.log"),
    consumer = consumer,
    maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS,
    job = job,
    cfg_file = outDir.resolve("slurm-hls.cfg")
  )

  // execute sbatch to enqueue job, then wait for it
  val r = Slurm(sjob)(cfg) match {
    case Some(id) => Slurm.waitFor(id) == Completed()
    case None => false
  }
  // Drop cached file-system state: the SLURM run produced artifacts out-of-process.
  FileAssetManager.reset()
  r
}
}
47 changes: 2 additions & 45 deletions toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ import java.nio.file._
import tapasco.Logging._
import tapasco.activity.composers._
import tapasco.base._
import tapasco.base.json._
import tapasco.dse.Heuristics
import tapasco.jobs._
import tapasco.slurm._
import tapasco.util._

import scala.util.Properties.{lineSeparator => NL}
Expand All @@ -52,7 +49,6 @@ class ComposeTask(composition: Composition,
val onComplete: Boolean => Unit)
(implicit cfg: Configuration) extends Task with LogTracking {
private[this] implicit val _logger = tapasco.Logging.logger(getClass)
private[this] val _slurm = Slurm.enabled
private[this] var _composerResult: Option[Composer.Result] = None
private[this] val _outDir = cfg.outputDir(composition, target, designFrequency, features getOrElse Seq())
private[this] val _logFile = logFile getOrElse _outDir.resolve("tapasco.log").toString
Expand All @@ -62,9 +58,7 @@ class ComposeTask(composition: Composition,
def composerResult: Option[Composer.Result] = _composerResult

/** @inheritdoc**/
def job: Boolean = if (!_slurm) nodeExecution else slurmExecution

private def nodeExecution: Boolean = {
def job: Boolean = {
val appender = LogFileTracker.setupLogFileAppender(_logFile.toString)
val composer = Composer(implementation)(cfg)
_logger.debug("launching compose run for {}@{} [current thread: {}], logfile {}",
Expand Down Expand Up @@ -106,43 +100,6 @@ class ComposeTask(composition: Composition,
result
}

private def slurmExecution: Boolean = {

val l = Paths.get(_logFile).toAbsolutePath().normalize()
val cfgFile = l.resolveSibling("slurm-compose.cfg") // Configuration Json
val slgFile = l.resolveSibling("slurm-compose.log") // SLURM job stdout log
val errFile = Paths.get(_logFile).resolveSibling("slurm-compose.errors.log")

val cmpsJob = ComposeJob(
composition, designFrequency, implementation.toString, Some(Seq(target.ad.name)), Some(Seq(target.pd.name)),
features, debugMode
)

// define SLURM job
val job = Slurm.Job(
name = l.getParent.getParent.getFileName.resolve(l.getParent.getFileName).toString,
log = l,
slurmLog = slgFile,
errorLog = errFile,
consumer = this,
maxHours = ComposeTask.MAX_COMPOSE_HOURS,
commands = Seq("tapasco --configFile %s".format(cfgFile.toString)),
comment = Some(_outDir.toString),
job = cmpsJob,
cfg_file = cfgFile
)

Slurm(job)(cfg) foreach (Slurm.waitFor(_)) // execute and wait

_composerResult = if (debugMode.isEmpty) {
ComposeTask.parseResultInLog(l.toString)
} else {
ComposeTask.makeDebugResult(debugMode.get)
}
(_composerResult map (_.result) getOrElse false) == ComposeResult.Success

}

private def elementdesc = "%s [F=%2.2f]".format(logformat(composition), designFrequency.toDouble)

/** @inheritdoc*/
Expand Down Expand Up @@ -175,7 +132,7 @@ object ComposeTask {

import scala.io._

private final val MAX_COMPOSE_HOURS = 23
final val MAX_COMPOSE_HOURS = 23
private final val RE_RESULT = """compose run .*result: ([^,]+)""".r.unanchored
private final val RE_LOG = """compose run .*result: \S+.*logfile: '([^']+)'""".r.unanchored
private final val RE_TIMING = """compose run .*result: \S+.*timing report: '([^']+)'""".r.unanchored
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio
val onComplete: Boolean => Unit) extends Task with LogTracking {
private[this] implicit val logger = tapasco.Logging.logger(getClass)
private[this] var result: Option[HighLevelSynthesizer.Result] = None
private[this] val slurm = Slurm.enabled
private[this] val r = HighLevelSynthesizer(hls)
private[this] val l = r.logFile(k, t)(cfg).resolveSibling("hls.log")

Expand All @@ -48,43 +47,12 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio
def description: String =
"High-Level-Synthesis for '%s' with target %s @ %s".format(k.name, t.pd.name, t.ad.name)

def job: Boolean = if (!slurm) {
def job: Boolean = {
val appender = LogFileTracker.setupLogFileAppender(l.toString)
logger.trace("current thread name: {}", Thread.currentThread.getName())
result = Some(r.synthesize(k, t)(cfg))
LogFileTracker.stopLogFileAppender(appender)
result map (_.toBoolean) getOrElse false
} else {

val cfgFile = l.resolveSibling("slurm-hls.cfg") // Configuration Json
val slurmLog = l.resolveSibling("slurm-hls.log") // raw log file (stdout w/colors)
val e = l.resolveSibling("hls-slurm.errors.log")

val hlsJob = HighLevelSynthesisJob(
hls.toString,
Some(Seq(t.ad.name)),
Some(Seq(t.pd.name)),
Some(Seq(k.name)),
Some(true) // skip Evaluation on cluster
)

// define SLURM job
val job = Slurm.Job(
name = "hls-%s-%s-%s".format(t.ad.name, t.pd.name, k.name),
log = l,
slurmLog = slurmLog,
errorLog = e,
consumer = this,
maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS,
commands = Seq("tapasco --configFile %s".format(cfgFile.toString)),
job = hlsJob,
cfg_file = cfgFile
)

// execute sbatch to enqueue job, then wait for it
val r = (Slurm(job)(cfg) map Slurm.waitFor).getOrElse(false) == Slurm.Completed()
FileAssetManager.reset()
r
}

def logFiles: Set[String] = Set(l.toString)
Expand All @@ -100,6 +68,6 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio
)
}

private object HighLevelSynthesisTask {
/** Companion object for [[HighLevelSynthesisTask]]. */
object HighLevelSynthesisTask {
  // Maximum wall-clock hours for a synthesis run; passed as `maxHours` to Slurm.Job
  // when the HLS job is executed on the SLURM cluster.
  final val MAX_SYNTH_HOURS = 8
}

0 comments on commit 259e6ee

Please sign in to comment.