From 259e6ee1e7e32fb4bc76774a6a70c174e84e799b Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Tue, 29 Dec 2020 16:05:28 +0100 Subject: [PATCH] Refactor Slurm tasks into slurm jobs Previously, a job would be broken into its tasks, and a new tapasco job would be created for each task. These jobs were then executed on the SLURM cluster. Refactor this, such that the original job is executed on the SLURM cluster as-is, which simplifies the SLURM logic. --- .../tapasco/jobs/executors/Compose.scala | 57 ++++++++++++++++--- .../jobs/executors/HighLevelSynthesis.scala | 44 +++++++++++++- .../main/scala/tapasco/task/ComposeTask.scala | 47 +-------------- .../tapasco/task/HighLevelSynthesisTask.scala | 36 +----------- 4 files changed, 96 insertions(+), 88 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala b/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala index 50ff9005..db42774d 100644 --- a/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala +++ b/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala @@ -28,18 +28,20 @@ package tapasco.jobs.executors import java.util.concurrent.Semaphore +import tapasco.activity.composers.Composer import tapasco.base._ import tapasco.filemgmt._ import tapasco.jobs.{ComposeJob, HighLevelSynthesisJob} +import tapasco.slurm.Slurm.Completed import tapasco.task._ +import tapasco.slurm._ private object Compose extends Executor[ComposeJob] { private implicit val logger = tapasco.Logging.logger(getClass) + private[this] val _slurm = Slurm.enabled def execute(job: ComposeJob) (implicit cfg: Configuration, tsk: Tasks): Boolean = { - val signal = new Semaphore(0) - logger.trace("composition: {}", job.composition) // first, collect all kernels and trigger HLS if not built yet @@ -74,7 +76,18 @@ private object Compose extends Executor[ComposeJob] { logger.info("all HLS tasks finished successfully, beginning compose run...") logger.debug("job: {}", job) - val composeTasks = for { + if (!_slurm) nodeExecution(job) else slurmExecution(job) + } else { + logger.error("HLS tasks failed, aborting composition") + false + } + } + + private def nodeExecution(job: ComposeJob) + (implicit cfg: Configuration, tsk: Tasks): Boolean = { + val signal = new Semaphore(0) + + val composeTasks = for { p <- job.platforms a <- job.architectures t = Target(a, p) @@ -104,10 +117,40 @@ private object Compose extends Executor[ComposeJob] { // successful, if all successful (composeTasks map (_.result) fold true) (_ && _) - } else { - logger.error("HLS tasks failed, aborting composition") - false + } + + private def slurmExecution(job: ComposeJob) + (implicit cfg: Configuration, tsk: Tasks): Boolean = { + + val ComposeJob(c, f, i, _, _, _, _, _, _, _) = job + val name = c.composition.map(_.kernel).fold("compose")(_ ++ "-" ++ _) + val outDir = FileAssetManager.TAPASCO_WORK_DIR.resolve("Slurm").resolve("Compose").resolve(name) + // needed for resource-based scheduling + val consumer = new ComposeTask( + composition = c, + designFrequency = f, + implementation = Composer.Implementation(i), + target = Target(job.architectures.head, job.platforms.head), + onComplete = _ => () + ) + + // define SLURM job + val sjob = Slurm.Job( + name = name, + log = outDir.resolve("tapasco.log"), + slurmLog = outDir.resolve("slurm-compose.log"), + errorLog = outDir.resolve("slurm-compose.errors.log"), + consumer = consumer, + maxHours = ComposeTask.MAX_COMPOSE_HOURS, + comment = Some(outDir.toString), + job = job, + cfg_file = outDir.resolve("slurm-compose.cfg") + ) + + // start slurm job and wait for finish + Slurm(sjob)(cfg) match { + case Some(id) => Slurm.waitFor(id) == Completed() + case None => false } } } - diff --git a/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala b/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala index 80fbe765..7f2d04f4 100644 --- a/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala +++ b/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala @@ -22,7 +22,6 @@ package tapasco.jobs.executors import java.util.concurrent.Semaphore - import tapasco.Logging import tapasco.activity.hls.HighLevelSynthesizer import tapasco.activity.hls.HighLevelSynthesizer.Implementation._ @@ -30,12 +29,18 @@ import tapasco.activity.hls.HighLevelSynthesizer._ import tapasco.base._ import tapasco.filemgmt.FileAssetManager import tapasco.jobs._ +import tapasco.slurm.Slurm +import tapasco.slurm.Slurm.Completed import tapasco.task._ protected object HighLevelSynthesis extends Executor[HighLevelSynthesisJob] { private implicit final val logger = Logging.logger(getClass) + private[this] val _slurm = Slurm.enabled + + def execute(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = + if (!_slurm) nodeExecution(job) else slurmExecution(job) - def execute(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = { + def nodeExecution(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = { val signal = new Semaphore(0) val runs: Seq[(Kernel, Target)] = for { a <- job.architectures.toSeq.sortBy(_.name) @@ -94,4 +99,39 @@ protected object HighLevelSynthesis extends Executor[HighLevelSynthesisJob] { // success, if all tasks were successful ((tasks ++ importTasks) map (_.result) fold true) (_ && _) } + + def slurmExecution(job: HighLevelSynthesisJob) + (implicit cfg: Configuration, tsk: Tasks): Boolean = { + + val name = job.kernels.map(_.name).fold("hls")(_++"-"++_) + val outDir = FileAssetManager.TAPASCO_WORK_DIR.resolve("Slurm").resolve("HLS").resolve(name) + // needed for resource-based scheduling + val consumer = new HighLevelSynthesisTask( + job.kernels.head, + Target(job.architectures.head, job.platforms.head), + cfg, + VivadoHLS, + _ => () + ) + + // define SLURM job + val sjob = Slurm.Job( + name = name, + log = outDir.resolve("tapasco.log"), + slurmLog = outDir.resolve("slurm-hls.log"), + errorLog = outDir.resolve("hls-slurm.errors.log"), + consumer = consumer, + maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS, + job = job, + cfg_file = outDir.resolve("slurm-hls.cfg") + ) + + // execute sbatch to enqueue job, then wait for it + val r = Slurm(sjob)(cfg) match { + case Some(id) => Slurm.waitFor(id) == Completed() + case None => false + } + FileAssetManager.reset() + r + } } diff --git a/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala b/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala index 94450972..d07fab1f 100644 --- a/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala +++ b/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala @@ -26,10 +26,7 @@ import java.nio.file._ import tapasco.Logging._ import tapasco.activity.composers._ import tapasco.base._ -import tapasco.base.json._ import tapasco.dse.Heuristics -import tapasco.jobs._ -import tapasco.slurm._ import tapasco.util._ import scala.util.Properties.{lineSeparator => NL} @@ -52,7 +49,6 @@ class ComposeTask(composition: Composition, val onComplete: Boolean => Unit) (implicit cfg: Configuration) extends Task with LogTracking { private[this] implicit val _logger = tapasco.Logging.logger(getClass) - private[this] val _slurm = Slurm.enabled private[this] var _composerResult: Option[Composer.Result] = None private[this] val _outDir = cfg.outputDir(composition, target, designFrequency, features getOrElse Seq()) private[this] val _logFile = logFile getOrElse _outDir.resolve("tapasco.log").toString @@ -62,9 +58,7 @@ class ComposeTask(composition: Composition, def composerResult: Option[Composer.Result] = _composerResult /** @inheritdoc**/ - def job: Boolean = if (!_slurm) nodeExecution else slurmExecution - - private def nodeExecution: Boolean = { + def job: Boolean = { val appender = LogFileTracker.setupLogFileAppender(_logFile.toString) val composer = Composer(implementation)(cfg) _logger.debug("launching compose run for {}@{} [current thread: {}], logfile {}", @@ -106,43 +100,6 @@ class ComposeTask(composition: Composition, result } - private def slurmExecution: Boolean = { - - val l = Paths.get(_logFile).toAbsolutePath().normalize() - val cfgFile = l.resolveSibling("slurm-compose.cfg") // Configuration Json - val slgFile = l.resolveSibling("slurm-compose.log") // SLURM job stdout log - val errFile = Paths.get(_logFile).resolveSibling("slurm-compose.errors.log") - - val cmpsJob = ComposeJob( - composition, designFrequency, implementation.toString, Some(Seq(target.ad.name)), Some(Seq(target.pd.name)), - features, debugMode - ) - - // define SLURM job - val job = Slurm.Job( - name = l.getParent.getParent.getFileName.resolve(l.getParent.getFileName).toString, - log = l, - slurmLog = slgFile, - errorLog = errFile, - consumer = this, - maxHours = ComposeTask.MAX_COMPOSE_HOURS, - commands = Seq("tapasco --configFile %s".format(cfgFile.toString)), - comment = Some(_outDir.toString), - job = cmpsJob, - cfg_file = cfgFile - ) - - Slurm(job)(cfg) foreach (Slurm.waitFor(_)) // execute and wait - - _composerResult = if (debugMode.isEmpty) { - ComposeTask.parseResultInLog(l.toString) - } else { - ComposeTask.makeDebugResult(debugMode.get) - } - (_composerResult map (_.result) getOrElse false) == ComposeResult.Success - - } - private def elementdesc = "%s [F=%2.2f]".format(logformat(composition), designFrequency.toDouble) /** @inheritdoc*/ @@ -175,7 +132,7 @@ object ComposeTask { import scala.io._ - private final val MAX_COMPOSE_HOURS = 23 + final val MAX_COMPOSE_HOURS = 23 private final val RE_RESULT = """compose run .*result: ([^,]+)""".r.unanchored private final val RE_LOG = """compose run .*result: \S+.*logfile: '([^']+)'""".r.unanchored private final val RE_TIMING = """compose run .*result: \S+.*timing report: '([^']+)'""".r.unanchored diff --git a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala index 90d6e5d9..3d89ae0c 100644 --- a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala +++ b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala @@ -37,7 +37,6 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio val onComplete: Boolean => Unit) extends Task with LogTracking { private[this] implicit val logger = tapasco.Logging.logger(getClass) private[this] var result: Option[HighLevelSynthesizer.Result] = None - private[this] val slurm = Slurm.enabled private[this] val r = HighLevelSynthesizer(hls) private[this] val l = r.logFile(k, t)(cfg).resolveSibling("hls.log") @@ -48,43 +47,12 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio def description: String = "High-Level-Synthesis for '%s' with target %s @ %s".format(k.name, t.pd.name, t.ad.name) - def job: Boolean = if (!slurm) { + def job: Boolean = { val appender = LogFileTracker.setupLogFileAppender(l.toString) logger.trace("current thread name: {}", Thread.currentThread.getName()) result = Some(r.synthesize(k, t)(cfg)) LogFileTracker.stopLogFileAppender(appender) result map (_.toBoolean) getOrElse false - } else { - - val cfgFile = l.resolveSibling("slurm-hls.cfg") // Configuration Json - val slurmLog = l.resolveSibling("slurm-hls.log") // raw log file (stdout w/colors) - val e = l.resolveSibling("hls-slurm.errors.log") - - val hlsJob = HighLevelSynthesisJob( - hls.toString, - Some(Seq(t.ad.name)), - Some(Seq(t.pd.name)), - Some(Seq(k.name)), - Some(true) // skip Evaluation on cluster - ) - - // define SLURM job - val job = Slurm.Job( - name = "hls-%s-%s-%s".format(t.ad.name, t.pd.name, k.name), - log = l, - slurmLog = slurmLog, - errorLog = e, - consumer = this, - maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS, - commands = Seq("tapasco --configFile %s".format(cfgFile.toString)), - job = hlsJob, - cfg_file = cfgFile - ) - - // execute sbatch to enqueue job, then wait for it - val r = (Slurm(job)(cfg) map Slurm.waitFor).getOrElse(false) == Slurm.Completed() - FileAssetManager.reset() - r } def logFiles: Set[String] = Set(l.toString) @@ -100,6 +68,6 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio ) } -private object HighLevelSynthesisTask { +object HighLevelSynthesisTask { final val MAX_SYNTH_HOURS = 8 }