diff --git a/project/Build.scala b/project/Build.scala index 3ca4058dd6..51d520590c 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -25,7 +25,7 @@ object ScaldingBuild extends Build { val avroVersion = "1.7.4" val bijectionVersion = "0.8.1" val cascadingAvroVersion = "2.1.2" - val chillVersion = "0.7.0" + val chillVersion = "0.7.1-t1439774596000-77b8a73d7643263249f76ae03c0d3f03db330fc3" val dfsDatastoresVersion = "1.3.4" val elephantbirdVersion = "4.8" val hadoopLzoVersion = "0.4.19" @@ -301,6 +301,7 @@ object ScaldingBuild extends Build { "cascading" % "cascading-local" % cascadingVersion, "com.twitter" % "chill-hadoop" % chillVersion, "com.twitter" % "chill-java" % chillVersion, + "com.twitter" %% "chill-bijection" % chillVersion, "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "algebird-test" % algebirdVersion % "test", "com.twitter" %% "bijection-core" % bijectionVersion, diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala index 0d628f78a8..bf0c37d7f4 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala @@ -16,12 +16,14 @@ limitations under the License. package com.twitter.scalding import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.io.serializer.{ Serialization => HSerialization } -import com.twitter.chill.KryoInstantiator +import com.twitter.chill.{ ExternalizerCodec, ExternalizerInjection, Externalizer, KryoInstantiator } import com.twitter.chill.config.{ ScalaMapConfig, ConfiguredInstantiator } +import com.twitter.bijection.{ Base64String, Injection } import cascading.pipe.assembly.AggregateBy -import cascading.flow.FlowProps +import cascading.flow.{ FlowListener, FlowStepListener, FlowProps, FlowStepStrategy } import cascading.property.AppProps import cascading.tuple.collect.SpillableProps @@ -298,6 +300,57 @@ trait Config extends Serializable { def setReducerEstimators(clsList: String): Config = this + (Config.ReducerEstimators -> clsList) + /** + * configure flow listeneres for observability + */ + def addFlowListener(flowListenerProvider: (Mode, Config) => FlowListener): Config = { + val serializedListener = flowListenerSerializer(flowListenerProvider) + update(Config.FlowListeners) { + case None => (Some(serializedListener), ()) + case Some(lst) => (Some(s"$serializedListener,$lst"), ()) + }._2 + } + + def getFlowListeners: List[Try[(Mode, Config) => FlowListener]] = + get(Config.FlowListeners) + .toIterable + .flatMap(s => StringUtility.fastSplit(s, ",")) + .map(flowListenerSerializer.invert(_)) + .toList + + def addFlowStepListener(flowListenerProvider: (Mode, Config) => FlowStepListener): Config = { + val serializedListener = flowStepListenerSerializer(flowListenerProvider) + update(Config.FlowStepListeners) { + case None => (Some(serializedListener), ()) + case Some(lst) => (Some(s"$serializedListener,$lst"), ()) + }._2 + } + + def getFlowStepListeners: List[Try[(Mode, Config) => FlowStepListener]] = + get(Config.FlowStepListeners) + .toIterable + .flatMap(s => StringUtility.fastSplit(s, ",")) + .map(flowStepListenerSerializer.invert(_)) + .toList + + def addFlowStepStrategy(flowStrategyProvider: (Mode, Config) => FlowStepStrategy[JobConf]): Config = { + val serializedListener = flowStepStrategiesSerializer(flowStrategyProvider) + update(Config.FlowStepStrategies) { + case None => (Some(serializedListener), ()) + case Some(lst) => (Some(s"$serializedListener,$lst"), ()) + }._2 + } + + def clearFlowStepStrategies: Config = + this.-(Config.FlowStepStrategies) + + def getFlowStepStrategies: List[Try[(Mode, Config) => FlowStepStrategy[JobConf]]] = + get(Config.FlowStepStrategies) + .toIterable + .flatMap(s => StringUtility.fastSplit(s, ",")) + .map(flowStepStrategiesSerializer.invert(_)) + .toList + /** Get the number of reducers (this is the parameter Hadoop will use) */ def getNumReducers: Option[Int] = get(Config.HadoopNumReducers).map(_.toInt) def setNumReducers(n: Int): Config = this + (Config.HadoopNumReducers -> n.toString) @@ -326,6 +379,9 @@ object Config { val ScaldingVersion: String = "scalding.version" val HRavenHistoryUserName: String = "hraven.history.user.name" val ScaldingRequireOrderedSerialization: String = "scalding.require.orderedserialization" + val FlowListeners: String = "scalding.observability.flowlisteners" + val FlowStepListeners: String = "scalding.observability.flowsteplisteners" + val FlowStepStrategies: String = "scalding.strategies.flowstepstrategies" /** * Parameter that actually controls the number of reduce tasks. @@ -476,4 +532,11 @@ object Config { is.close() md5Hex(bytes) } + + private[this] def buildInj[T: ExternalizerInjection: ExternalizerCodec]: Injection[T, String] = + Injection.connect[T, Externalizer[T], Array[Byte], Base64String, String] + + @transient private[scalding] lazy val flowStepListenerSerializer = buildInj[(Mode, Config) => FlowStepListener] + @transient private[scalding] lazy val flowListenerSerializer = buildInj[(Mode, Config) => FlowListener] + @transient private[scalding] lazy val flowStepStrategiesSerializer = buildInj[(Mode, Config) => FlowStepStrategy[JobConf]] } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala index 14a4147d3b..5c8060199b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala @@ -16,7 +16,7 @@ limitations under the License. package com.twitter.scalding import cascading.flow.hadoop.HadoopFlow -import cascading.flow.{ FlowDef, Flow } +import cascading.flow.{ Flow, FlowDef, FlowListener, FlowStepListener, FlowStepStrategy } import cascading.flow.planner.BaseFlowStep import cascading.pipe.Pipe import com.twitter.scalding.reducer_estimation.ReducerEstimatorStepStrategy @@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobConf import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.util.{ Failure, Success, Try } +import org.slf4j.{ Logger, LoggerFactory } /* * This has all the state needed to build a single flow @@ -36,6 +37,8 @@ trait ExecutionContext { def flowDef: FlowDef def mode: Mode + import ExecutionContext._ + private def getIdentifierOpt(descriptions: Seq[String]): Option[String] = { if (descriptions.nonEmpty) Some(descriptions.distinct.mkString(", ")) else None } @@ -67,8 +70,8 @@ trait ExecutionContext { name.foreach(flowDef.setName) // identify the flowDef - val withId = config.addUniqueId(UniqueID.getIDFor(flowDef)) - val flow = mode.newFlowConnector(withId).connect(flowDef) + val configWithId = config.addUniqueId(UniqueID.getIDFor(flowDef)) + val flow = mode.newFlowConnector(configWithId).connect(flowDef) if (config.getRequireOrderedSerialization) { // This will throw, but be caught by the outer try if // we have groupby/cogroupby not using OrderedSerializations @@ -89,8 +92,30 @@ trait ExecutionContext { // which instantiates and runs them mode match { case _: HadoopMode => - config.get(Config.ReducerEstimators) - .foreach(_ => flow.setFlowStepStrategy(ReducerEstimatorStepStrategy)) + val reducerEstimatorStrategy: Seq[FlowStepStrategy[JobConf]] = config.get(Config.ReducerEstimators).toList.map(_ => ReducerEstimatorStepStrategy) + + + val otherStrategies: Seq[FlowStepStrategy[JobConf]] = config.getFlowStepStrategies.map { tTry: Try[(Mode, Config) => FlowStepStrategy[JobConf]] => + val t: (Mode, Config) => FlowStepStrategy[JobConf] = tTry.getOrElse(throw new Exception(s"Failed to decode flow step strategy $tTry when submitting job")) + t(mode, configWithId) + } + + val optionalFinalStrategy = FlowStepStrategies().sumOption(reducerEstimatorStrategy ++ otherStrategies) + + optionalFinalStrategy.foreach { strategy => + flow.setFlowStepStrategy(strategy) + } + + config.getFlowListeners.foreach { tTry: Try[(Mode, Config) => FlowListener] => + val t: (Mode, Config) => FlowListener = tTry.getOrElse(throw new Exception(s"Failed to decode flow listener $tTry when submitting job")) + flow.addListener(t(mode, configWithId)) + } + + config.getFlowStepListeners.foreach { tTry: Try[(Mode, Config) => FlowStepListener] => + val t: (Mode, Config) => FlowStepListener = tTry.getOrElse(throw new Exception(s"Failed to decode flow step listener $tTry when submitting job")) + flow.addStepListener(t(mode, configWithId)) + } + case _ => () } @@ -124,6 +149,8 @@ trait ExecutionContext { * modeFromImplicit, etc... below. */ object ExecutionContext { + private val LOG: Logger = LoggerFactory.getLogger(ExecutionContext.getClass) + private[scalding] def getDesc[T](baseFlowStep: BaseFlowStep[T]): Seq[String] = { baseFlowStep.getGraph.vertexSet.asScala.toSeq.flatMap(_ match { case pipe: Pipe => RichPipe.getPipeDescriptions(pipe) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala index 613d5c2290..787986e5a6 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala @@ -16,7 +16,7 @@ limitations under the License. package com.twitter.scalding import com.twitter.algebird.monad.Reader - +import com.twitter.algebird.Semigroup import cascading.flow.{ Flow, FlowDef, FlowListener, FlowStep, FlowStepListener, FlowSkipStrategy, FlowStepStrategy } import cascading.pipe.Pipe import cascading.property.AppProps @@ -233,7 +233,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable { if (existing == null) strategy else - FlowStepStrategies.plus( + FlowStepStrategies[Any].plus( existing.asInstanceOf[FlowStepStrategy[Any]], strategy.asInstanceOf[FlowStepStrategy[Any]]) flow.setFlowStepStrategy(composed) @@ -521,11 +521,11 @@ trait CounterVerification extends Job { } } -private[scalding] object FlowStepStrategies { +private[scalding] case class FlowStepStrategies[A]() extends Semigroup[FlowStepStrategy[A]] { /** * Returns a new FlowStepStrategy that runs both strategies in sequence. */ - def plus[A](l: FlowStepStrategy[A], r: FlowStepStrategy[A]): FlowStepStrategy[A] = + def plus(l: FlowStepStrategy[A], r: FlowStepStrategy[A]): FlowStepStrategy[A] = new FlowStepStrategy[A] { override def apply( flow: Flow[A], diff --git a/scalding-core/src/main/scala/com/twitter/scalding/LineNumber.scala b/scalding-core/src/main/scala/com/twitter/scalding/LineNumber.scala index 8f6439a468..85ee1dc93f 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/LineNumber.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/LineNumber.scala @@ -46,12 +46,15 @@ object LineNumber { * easier). Otherwise it just gets the most direct * caller for methods that have all the callers in the scalding package */ - def tryNonScaldingCaller: StackTraceElement = { + def tryNonScaldingCaller: Option[StackTraceElement] = + tryNonScaldingCaller(Thread.currentThread().getStackTrace) + + def tryNonScaldingCaller(stack: Array[StackTraceElement]): Option[StackTraceElement] = { /* depth = 1: * depth 0 => tryNonScaldingCaller * depth 1 => caller of this method */ - val stack = Thread.currentThread().getStackTrace + // user code is never in our package, or in scala, but // since internal methods often recurse we ignore these // in our attempt to get a good line number for the user. @@ -73,10 +76,7 @@ object LineNumber { jobClass.isAssignableFrom(cls) }) - val directCaller = getCurrent(1, stack) - scaldingJobCaller .orElse(nonScalding) - .getOrElse(directCaller) } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala index 0e8541b655..1f10e39860 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala @@ -88,6 +88,13 @@ object RichPipe extends java.io.Serializable { p } + def setPipeDescriptionFrom(p: Pipe, ste: Option[StackTraceElement]): Pipe = { + ste.foreach { ste => + setPipeDescriptions(p, List(ste.toString)) + } + p + } + } /** diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/NoStackAndThen.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/NoStackAndThen.scala index 9ef4eaee39..940d24b994 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/NoStackAndThen.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/NoStackAndThen.scala @@ -24,8 +24,8 @@ package com.twitter.scalding.typed */ sealed trait NoStackAndThen[-A, +B] extends java.io.Serializable { def apply(a: A): B - final def andThen[C](fn: B => C): NoStackAndThen[A, C] = NoStackAndThen.NoStackMore(this, fn) - final def andThen[C](that: NoStackAndThen[B, C]): NoStackAndThen[A, C] = { + def andThen[C](fn: B => C): NoStackAndThen[A, C] = NoStackAndThen.NoStackMore(this, fn) + def andThen[C](that: NoStackAndThen[B, C]): NoStackAndThen[A, C] = { import NoStackAndThen._ @annotation.tailrec def push(front: NoStackAndThen[A, Any], @@ -46,11 +46,24 @@ sealed trait NoStackAndThen[-A, +B] extends java.io.Serializable { } object NoStackAndThen { - def apply[A, B](fn: A => B): NoStackAndThen[A, B] = NoStackWrap(fn) + private[typed] def buildStackEntry: Array[StackTraceElement] = Thread.currentThread().getStackTrace + + def apply[A, B](fn: A => B): NoStackAndThen[A, B] = WithStackTrace(NoStackWrap(fn), buildStackEntry) + private sealed trait ReversedStack[-A, +B] private case class EmptyStack[-A, +B](fn: A => B) extends ReversedStack[A, B] private case class NonEmpty[-A, B, +C](head: A => B, rest: ReversedStack[B, C]) extends ReversedStack[A, C] + private[scalding] case class WithStackTrace[A, B](inner: NoStackAndThen[A, B], stackEntry: Array[StackTraceElement]) extends NoStackAndThen[A, B] { + override def apply(a: A): B = inner(a) + + override def andThen[C](fn: B => C): NoStackAndThen[A, C] = + WithStackTrace[A, C](inner.andThen(fn), stackEntry ++ buildStackEntry) + + override def andThen[C](that: NoStackAndThen[B, C]): NoStackAndThen[A, C] = + WithStackTrace[A, C](inner.andThen(that), stackEntry ++ buildStackEntry) + } + // Just wraps a function private case class NoStackWrap[A, B](fn: A => B) extends NoStackAndThen[A, B] { def apply(a: A) = fn(a) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index 72e8ff2d7b..22a873b48e 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -94,7 +94,7 @@ object TypedPipe extends Serializable { def mapped = pipe def keyOrdering = ord def reducers = None - val descriptions: Seq[String] = List(LineNumber.tryNonScaldingCaller.toString) + val descriptions: Seq[String] = LineNumber.tryNonScaldingCaller.map(_.toString).toList def joinFunction = CoGroupable.castingJoinFunction[V] } @@ -161,7 +161,7 @@ trait TypedPipe[+T] extends Serializable { import Dsl._ // Ensure we hook into all pipes coming out of the typed API to apply the FlowState's properties on their pipes val pipe = asPipe[U](fieldNames).applyFlowConfigProperties(flowDef) - RichPipe.setPipeDescriptions(pipe, List(LineNumber.tryNonScaldingCaller.toString)) + RichPipe.setPipeDescriptionFrom(pipe, LineNumber.tryNonScaldingCaller) } /** @@ -379,7 +379,7 @@ trait TypedPipe[+T] extends Serializable { //the ev is not needed for the cast. In fact, you can do the cast with ev(t) and it will return //it as (K,V), but the problem is, ev is not serializable. So we do the cast, which due to ev //being present, will always pass. - Grouped(raiseTo[(K, V)]).withDescription(LineNumber.tryNonScaldingCaller.toString) + Grouped(raiseTo[(K, V)]).withDescription(LineNumber.tryNonScaldingCaller.map(_.toString)) /** Send all items to a single reducer */ def groupAll: Grouped[Unit, T] = groupBy(x => ())(ordSer[Unit]).withReducers(1) @@ -894,23 +894,32 @@ class TypedPipeFactory[T] private (@transient val next: NoStackAndThen[(FlowDef, override def sumByLocalKeys[K, V](implicit ev: T <:< (K, V), sg: Semigroup[V]) = andThen(_.sumByLocalKeys[K, V]) - override def asPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]) = + override def asPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]) = { // unwrap in a loop, without recursing - unwrap(this).asPipe[U](fieldNames)(flowDef, mode, setter) + val (unwrapped, st) = unwrap(this, Array()) + val pipe = unwrapped.asPipe[U](fieldNames)(flowDef, mode, setter) + RichPipe.setPipeDescriptionFrom(pipe, LineNumber.tryNonScaldingCaller(st)) + pipe + } override def toIterableExecution: Execution[Iterable[T]] = Execution.getConfigMode.flatMap { case (conf, mode) => // This can only terminate in TypedPipeInst, which will // keep the reference to this flowDef val flowDef = new FlowDef - val nextPipe = unwrap(this)(flowDef, mode) + val (nextPipe, stackTraces) = unwrap(this, Array())(flowDef, mode) nextPipe.toIterableExecution } @annotation.tailrec - private def unwrap(pipe: TypedPipe[T])(implicit flowDef: FlowDef, mode: Mode): TypedPipe[T] = pipe match { - case TypedPipeFactory(n) => unwrap(n(flowDef, mode)) - case tp => tp + private def unwrap(pipe: TypedPipe[T], st: Array[StackTraceElement])(implicit flowDef: FlowDef, mode: Mode): (TypedPipe[T], Array[StackTraceElement]) = pipe match { + case TypedPipeFactory(n) => + val fullTrace = n match { + case NoStackAndThen.WithStackTrace(_, st) => st + case _ => Array[StackTraceElement]() + } + unwrap(n(flowDef, mode), st ++ fullTrace) + case tp => (tp, st) } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/WithDescription.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/WithDescription.scala index b471e262e3..816c068739 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/WithDescription.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/WithDescription.scala @@ -25,7 +25,13 @@ trait HasDescription { /** * Used for objects that may _set_ a description to be used in .dot and MR step names. */ -trait WithDescription[+This <: WithDescription[This]] extends HasDescription { +trait WithDescription[+This <: WithDescription[This]] extends HasDescription { self: This => /** never mutates this, instead returns a new item. */ def withDescription(description: String): This + + def withDescription(descriptionOpt: Option[String]): This = + descriptionOpt match { + case Some(description) => withDescription(description) + case None => self + } } diff --git a/version.sbt b/version.sbt index 6be2063a81..c999268b4d 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.15.1-SNAPSHOT" +version in ThisBuild := "0.15.1-exec.1"