diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala index 0d628f78a8..5b671188a9 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala @@ -21,7 +21,7 @@ import com.twitter.chill.KryoInstantiator import com.twitter.chill.config.{ ScalaMapConfig, ConfiguredInstantiator } import cascading.pipe.assembly.AggregateBy -import cascading.flow.FlowProps +import cascading.flow.{ FlowListener, FlowProps } import cascading.property.AppProps import cascading.tuple.collect.SpillableProps @@ -298,6 +298,29 @@ trait Config extends Serializable { def setReducerEstimators(clsList: String): Config = this + (Config.ReducerEstimators -> clsList) + /** + * configure flow listeneres for observability + */ + def addFlowListener[T](cls: Class[T]): Config = + addFlowListener(cls.getName) + def addFlowListener(clsName: String): Config = + update(Config.FlowListeners) { + case None => (Some(clsName), ()) + case Some(lst) => (Some(s"$clsName,$lst"), ()) + }._2 + def setFlowListeners(clsList: String): Config = + this + (Config.FlowListeners -> clsList) + + def addFlowStepListener[T](cls: Class[T]): Config = + addFlowStepListener(cls.getName) + def addFlowStepListener(clsName: String): Config = + update(Config.FlowStepListeners) { + case None => (Some(clsName), ()) + case Some(lst) => (Some(s"$clsName,$lst"), ()) + }._2 + def setFlowStepListeners(clsList: String): Config = + this + (Config.FlowStepListeners -> clsList) + /** Get the number of reducers (this is the parameter Hadoop will use) */ def getNumReducers: Option[Int] = get(Config.HadoopNumReducers).map(_.toInt) def setNumReducers(n: Int): Config = this + (Config.HadoopNumReducers -> n.toString) @@ -326,6 +349,8 @@ object Config { val ScaldingVersion: String = "scalding.version" val HRavenHistoryUserName: String = "hraven.history.user.name" val ScaldingRequireOrderedSerialization: String = "scalding.require.orderedserialization" + val FlowListeners: String = "scalding.observability.flowlisteners.classes" + val FlowStepListeners: String = "scalding.observability.flowsteplisteners.classes" /** * Parameter that actually controls the number of reduce tasks. diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala index 14a4147d3b..770474ae40 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala @@ -16,7 +16,7 @@ limitations under the License. package com.twitter.scalding import cascading.flow.hadoop.HadoopFlow -import cascading.flow.{ FlowDef, Flow } +import cascading.flow.{ Flow, FlowDef, FlowListener, FlowStepListener } import cascading.flow.planner.BaseFlowStep import cascading.pipe.Pipe import com.twitter.scalding.reducer_estimation.ReducerEstimatorStepStrategy @@ -91,6 +91,23 @@ trait ExecutionContext { case _: HadoopMode => config.get(Config.ReducerEstimators) .foreach(_ => flow.setFlowStepStrategy(ReducerEstimatorStepStrategy)) + + config.get(Config.FlowListeners).foreach { clsNames: String => + println(s"XXX adding FlowListeners $clsNames") + val clsLoader = Thread.currentThread.getContextClassLoader + StringUtility.fastSplit(clsNames, ",") + .map { clsLoader.loadClass(_).newInstance.asInstanceOf[FlowListener] } + .map { flow.addListener(_) } + } + + config.get(Config.FlowStepListeners).foreach { clsNames: String => + println(s"XXX adding FlowStepListeners $clsNames") + val clsLoader = Thread.currentThread.getContextClassLoader + StringUtility.fastSplit(clsNames, ",") + .map { clsLoader.loadClass(_).newInstance.asInstanceOf[FlowStepListener] } + .map { flow.addStepListener(_) } + } + case _ => () } diff --git a/version.sbt b/version.sbt index 6be2063a81..c999268b4d 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.15.1-SNAPSHOT" +version in ThisBuild := "0.15.1-exec.1"