Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object ScaldingBuild extends Build {
val avroVersion = "1.7.4"
val bijectionVersion = "0.8.1"
val cascadingAvroVersion = "2.1.2"
val chillVersion = "0.7.0"
val chillVersion = "0.7.1-t1439774596000-77b8a73d7643263249f76ae03c0d3f03db330fc3"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to be clear, we wouldn't merge this yet, right? This is just to get comments?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes exactly, this version isn't on maven central, so this won't pass tests. -- branch hasn't merged in Chill either, and no rls there.

val dfsDatastoresVersion = "1.3.4"
val elephantbirdVersion = "4.8"
val hadoopLzoVersion = "0.4.19"
Expand Down Expand Up @@ -301,6 +301,7 @@ object ScaldingBuild extends Build {
"cascading" % "cascading-local" % cascadingVersion,
"com.twitter" % "chill-hadoop" % chillVersion,
"com.twitter" % "chill-java" % chillVersion,
"com.twitter" %% "chill-bijection" % chillVersion,
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "algebird-test" % algebirdVersion % "test",
"com.twitter" %% "bijection-core" % bijectionVersion,
Expand Down
67 changes: 65 additions & 2 deletions scalding-core/src/main/scala/com/twitter/scalding/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ limitations under the License.
package com.twitter.scalding

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.serializer.{ Serialization => HSerialization }
import com.twitter.chill.KryoInstantiator
import com.twitter.chill.{ ExternalizerCodec, ExternalizerInjection, Externalizer, KryoInstantiator }
import com.twitter.chill.config.{ ScalaMapConfig, ConfiguredInstantiator }
import com.twitter.bijection.{ Base64String, Injection }

import cascading.pipe.assembly.AggregateBy
import cascading.flow.FlowProps
import cascading.flow.{ FlowListener, FlowStepListener, FlowProps, FlowStepStrategy }
import cascading.property.AppProps
import cascading.tuple.collect.SpillableProps

Expand Down Expand Up @@ -298,6 +300,57 @@ trait Config extends Serializable {
def setReducerEstimators(clsList: String): Config =
this + (Config.ReducerEstimators -> clsList)

/**
* configure flow listeneres for observability
*/
def addFlowListener(flowListenerProvider: (Mode, Config) => FlowListener): Config = {
val serializedListener = flowListenerSerializer(flowListenerProvider)
update(Config.FlowListeners) {
case None => (Some(serializedListener), ())
case Some(lst) => (Some(s"$serializedListener,$lst"), ())
}._2
}

def getFlowListeners: List[Try[(Mode, Config) => FlowListener]] =
get(Config.FlowListeners)
.toIterable
.flatMap(s => StringUtility.fastSplit(s, ","))
.map(flowListenerSerializer.invert(_))
.toList

def addFlowStepListener(flowListenerProvider: (Mode, Config) => FlowStepListener): Config = {
val serializedListener = flowStepListenerSerializer(flowListenerProvider)
update(Config.FlowStepListeners) {
case None => (Some(serializedListener), ())
case Some(lst) => (Some(s"$serializedListener,$lst"), ())
}._2
}

def getFlowStepListeners: List[Try[(Mode, Config) => FlowStepListener]] =
get(Config.FlowStepListeners)
.toIterable
.flatMap(s => StringUtility.fastSplit(s, ","))
.map(flowStepListenerSerializer.invert(_))
.toList

def addFlowStepStrategy(flowStrategyProvider: (Mode, Config) => FlowStepStrategy[JobConf]): Config = {
val serializedListener = flowStepStrategiesSerializer(flowStrategyProvider)
update(Config.FlowStepStrategies) {
case None => (Some(serializedListener), ())
case Some(lst) => (Some(s"$serializedListener,$lst"), ())
}._2
}

def clearFlowStepStrategies: Config =
this.-(Config.FlowStepStrategies)

def getFlowStepStrategies: List[Try[(Mode, Config) => FlowStepStrategy[JobConf]]] =
get(Config.FlowStepStrategies)
.toIterable
.flatMap(s => StringUtility.fastSplit(s, ","))
.map(flowStepStrategiesSerializer.invert(_))
.toList

/** Get the number of reducers (this is the parameter Hadoop will use) */
def getNumReducers: Option[Int] = get(Config.HadoopNumReducers).map(_.toInt)
def setNumReducers(n: Int): Config = this + (Config.HadoopNumReducers -> n.toString)
Expand Down Expand Up @@ -326,6 +379,9 @@ object Config {
val ScaldingVersion: String = "scalding.version"
val HRavenHistoryUserName: String = "hraven.history.user.name"
val ScaldingRequireOrderedSerialization: String = "scalding.require.orderedserialization"
val FlowListeners: String = "scalding.observability.flowlisteners"
val FlowStepListeners: String = "scalding.observability.flowsteplisteners"
val FlowStepStrategies: String = "scalding.strategies.flowstepstrategies"

/**
* Parameter that actually controls the number of reduce tasks.
Expand Down Expand Up @@ -476,4 +532,11 @@ object Config {
is.close()
md5Hex(bytes)
}

private[this] def buildInj[T: ExternalizerInjection: ExternalizerCodec]: Injection[T, String] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while we're at it, why not stack in GzippedBase64String:

https://github.com/twitter/bijection/blob/develop/bijection-core/src/main/scala/com/twitter/bijection/BinaryBijections.scala#L33

Might make the jobconfs slightly smaller.

Injection.connect[T, Externalizer[T], Array[Byte], Base64String, String]

@transient private[scalding] lazy val flowStepListenerSerializer = buildInj[(Mode, Config) => FlowStepListener]
@transient private[scalding] lazy val flowListenerSerializer = buildInj[(Mode, Config) => FlowListener]
@transient private[scalding] lazy val flowStepStrategiesSerializer = buildInj[(Mode, Config) => FlowStepStrategy[JobConf]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.
package com.twitter.scalding

import cascading.flow.hadoop.HadoopFlow
import cascading.flow.{ FlowDef, Flow }
import cascading.flow.{ Flow, FlowDef, FlowListener, FlowStepListener, FlowStepStrategy }
import cascading.flow.planner.BaseFlowStep
import cascading.pipe.Pipe
import com.twitter.scalding.reducer_estimation.ReducerEstimatorStepStrategy
Expand All @@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import org.slf4j.{ Logger, LoggerFactory }

/*
* This has all the state needed to build a single flow
Expand All @@ -36,6 +37,8 @@ trait ExecutionContext {
def flowDef: FlowDef
def mode: Mode

import ExecutionContext._

private def getIdentifierOpt(descriptions: Seq[String]): Option[String] = {
if (descriptions.nonEmpty) Some(descriptions.distinct.mkString(", ")) else None
}
Expand Down Expand Up @@ -67,8 +70,8 @@ trait ExecutionContext {
name.foreach(flowDef.setName)

// identify the flowDef
val withId = config.addUniqueId(UniqueID.getIDFor(flowDef))
val flow = mode.newFlowConnector(withId).connect(flowDef)
val configWithId = config.addUniqueId(UniqueID.getIDFor(flowDef))
val flow = mode.newFlowConnector(configWithId).connect(flowDef)
if (config.getRequireOrderedSerialization) {
// This will throw, but be caught by the outer try if
// we have groupby/cogroupby not using OrderedSerializations
Expand All @@ -89,8 +92,30 @@ trait ExecutionContext {
// which instantiates and runs them
mode match {
case _: HadoopMode =>
config.get(Config.ReducerEstimators)
.foreach(_ => flow.setFlowStepStrategy(ReducerEstimatorStepStrategy))
val reducerEstimatorStrategy: Seq[FlowStepStrategy[JobConf]] = config.get(Config.ReducerEstimators).toList.map(_ => ReducerEstimatorStepStrategy)


val otherStrategies: Seq[FlowStepStrategy[JobConf]] = config.getFlowStepStrategies.map { tTry: Try[(Mode, Config) => FlowStepStrategy[JobConf]] =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why take the mode as a parameter if we only pass it a HadoopMode? Why not do all these in any case and let the user attach an empty FlowListener/etc... (which we can even provide, as there is a monoid on these things).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can move it to hadoop mode sure.

You mean have a default strategy so its not an option below?

val t: (Mode, Config) => FlowStepStrategy[JobConf] = tTry.getOrElse(throw new Exception(s"Failed to decode flow step strategy $tTry when submitting job"))
t(mode, configWithId)
}

val optionalFinalStrategy = FlowStepStrategies().sumOption(reducerEstimatorStrategy ++ otherStrategies)

optionalFinalStrategy.foreach { strategy =>
flow.setFlowStepStrategy(strategy)
}

config.getFlowListeners.foreach { tTry: Try[(Mode, Config) => FlowListener] =>
val t: (Mode, Config) => FlowListener = tTry.getOrElse(throw new Exception(s"Failed to decode flow listener $tTry when submitting job"))
flow.addListener(t(mode, configWithId))
}

config.getFlowStepListeners.foreach { tTry: Try[(Mode, Config) => FlowStepListener] =>
val t: (Mode, Config) => FlowStepListener = tTry.getOrElse(throw new Exception(s"Failed to decode flow step listener $tTry when submitting job"))
flow.addStepListener(t(mode, configWithId))
}

case _ => ()
}

Expand Down Expand Up @@ -124,6 +149,8 @@ trait ExecutionContext {
* modeFromImplicit, etc... below.
*/
object ExecutionContext {
private val LOG: Logger = LoggerFactory.getLogger(ExecutionContext.getClass)

private[scalding] def getDesc[T](baseFlowStep: BaseFlowStep[T]): Seq[String] = {
baseFlowStep.getGraph.vertexSet.asScala.toSeq.flatMap(_ match {
case pipe: Pipe => RichPipe.getPipeDescriptions(pipe)
Expand Down
8 changes: 4 additions & 4 deletions scalding-core/src/main/scala/com/twitter/scalding/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.
package com.twitter.scalding

import com.twitter.algebird.monad.Reader

import com.twitter.algebird.Semigroup
import cascading.flow.{ Flow, FlowDef, FlowListener, FlowStep, FlowStepListener, FlowSkipStrategy, FlowStepStrategy }
import cascading.pipe.Pipe
import cascading.property.AppProps
Expand Down Expand Up @@ -233,7 +233,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
if (existing == null)
strategy
else
FlowStepStrategies.plus(
FlowStepStrategies[Any].plus(
existing.asInstanceOf[FlowStepStrategy[Any]],
strategy.asInstanceOf[FlowStepStrategy[Any]])
flow.setFlowStepStrategy(composed)
Expand Down Expand Up @@ -521,11 +521,11 @@ trait CounterVerification extends Job {
}
}

private[scalding] object FlowStepStrategies {
private[scalding] case class FlowStepStrategies[A]() extends Semigroup[FlowStepStrategy[A]] {
/**
* Returns a new FlowStepStrategy that runs both strategies in sequence.
*/
def plus[A](l: FlowStepStrategy[A], r: FlowStepStrategy[A]): FlowStepStrategy[A] =
def plus(l: FlowStepStrategy[A], r: FlowStepStrategy[A]): FlowStepStrategy[A] =
new FlowStepStrategy[A] {
override def apply(
flow: Flow[A],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ object LineNumber {
* easier). Otherwise it just gets the most direct
* caller for methods that have all the callers in the scalding package
*/
def tryNonScaldingCaller: StackTraceElement = {
def tryNonScaldingCaller: Option[StackTraceElement] =
tryNonScaldingCaller(Thread.currentThread().getStackTrace)

def tryNonScaldingCaller(stack: Array[StackTraceElement]): Option[StackTraceElement] = {
/* depth = 1:
* depth 0 => tryNonScaldingCaller
* depth 1 => caller of this method
*/
val stack = Thread.currentThread().getStackTrace

// user code is never in our package, or in scala, but
// since internal methods often recurse we ignore these
// in our attempt to get a good line number for the user.
Expand All @@ -73,10 +76,7 @@ object LineNumber {
jobClass.isAssignableFrom(cls)
})

val directCaller = getCurrent(1, stack)

scaldingJobCaller
.orElse(nonScalding)
.getOrElse(directCaller)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ object RichPipe extends java.io.Serializable {
p
}

def setPipeDescriptionFrom(p: Pipe, ste: Option[StackTraceElement]): Pipe = {
ste.foreach { ste =>
setPipeDescriptions(p, List(ste.toString))
}
p
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ package com.twitter.scalding.typed
*/
sealed trait NoStackAndThen[-A, +B] extends java.io.Serializable {
def apply(a: A): B
final def andThen[C](fn: B => C): NoStackAndThen[A, C] = NoStackAndThen.NoStackMore(this, fn)
final def andThen[C](that: NoStackAndThen[B, C]): NoStackAndThen[A, C] = {
def andThen[C](fn: B => C): NoStackAndThen[A, C] = NoStackAndThen.NoStackMore(this, fn)
def andThen[C](that: NoStackAndThen[B, C]): NoStackAndThen[A, C] = {
import NoStackAndThen._
@annotation.tailrec
def push(front: NoStackAndThen[A, Any],
Expand All @@ -46,11 +46,24 @@ sealed trait NoStackAndThen[-A, +B] extends java.io.Serializable {
}

object NoStackAndThen {
def apply[A, B](fn: A => B): NoStackAndThen[A, B] = NoStackWrap(fn)
private[typed] def buildStackEntry: Array[StackTraceElement] = Thread.currentThread().getStackTrace

def apply[A, B](fn: A => B): NoStackAndThen[A, B] = WithStackTrace(NoStackWrap(fn), buildStackEntry)

private sealed trait ReversedStack[-A, +B]
private case class EmptyStack[-A, +B](fn: A => B) extends ReversedStack[A, B]
private case class NonEmpty[-A, B, +C](head: A => B, rest: ReversedStack[B, C]) extends ReversedStack[A, C]

private[scalding] case class WithStackTrace[A, B](inner: NoStackAndThen[A, B], stackEntry: Array[StackTraceElement]) extends NoStackAndThen[A, B] {
override def apply(a: A): B = inner(a)

override def andThen[C](fn: B => C): NoStackAndThen[A, C] =
WithStackTrace[A, C](inner.andThen(fn), stackEntry ++ buildStackEntry)

override def andThen[C](that: NoStackAndThen[B, C]): NoStackAndThen[A, C] =
WithStackTrace[A, C](inner.andThen(that), stackEntry ++ buildStackEntry)
}

// Just wraps a function
private case class NoStackWrap[A, B](fn: A => B) extends NoStackAndThen[A, B] {
def apply(a: A) = fn(a)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object TypedPipe extends Serializable {
def mapped = pipe
def keyOrdering = ord
def reducers = None
val descriptions: Seq[String] = List(LineNumber.tryNonScaldingCaller.toString)
val descriptions: Seq[String] = LineNumber.tryNonScaldingCaller.map(_.toString).toList
def joinFunction = CoGroupable.castingJoinFunction[V]
}

Expand Down Expand Up @@ -161,7 +161,7 @@ trait TypedPipe[+T] extends Serializable {
import Dsl._
// Ensure we hook into all pipes coming out of the typed API to apply the FlowState's properties on their pipes
val pipe = asPipe[U](fieldNames).applyFlowConfigProperties(flowDef)
RichPipe.setPipeDescriptions(pipe, List(LineNumber.tryNonScaldingCaller.toString))
RichPipe.setPipeDescriptionFrom(pipe, LineNumber.tryNonScaldingCaller)
}

/**
Expand Down Expand Up @@ -379,7 +379,7 @@ trait TypedPipe[+T] extends Serializable {
//the ev is not needed for the cast. In fact, you can do the cast with ev(t) and it will return
//it as (K,V), but the problem is, ev is not serializable. So we do the cast, which due to ev
//being present, will always pass.
Grouped(raiseTo[(K, V)]).withDescription(LineNumber.tryNonScaldingCaller.toString)
Grouped(raiseTo[(K, V)]).withDescription(LineNumber.tryNonScaldingCaller.map(_.toString))

/** Send all items to a single reducer */
def groupAll: Grouped[Unit, T] = groupBy(x => ())(ordSer[Unit]).withReducers(1)
Expand Down Expand Up @@ -894,23 +894,32 @@ class TypedPipeFactory[T] private (@transient val next: NoStackAndThen[(FlowDef,
override def sumByLocalKeys[K, V](implicit ev: T <:< (K, V), sg: Semigroup[V]) =
andThen(_.sumByLocalKeys[K, V])

override def asPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]) =
override def asPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]) = {
// unwrap in a loop, without recursing
unwrap(this).asPipe[U](fieldNames)(flowDef, mode, setter)
val (unwrapped, st) = unwrap(this, Array())
val pipe = unwrapped.asPipe[U](fieldNames)(flowDef, mode, setter)
RichPipe.setPipeDescriptionFrom(pipe, LineNumber.tryNonScaldingCaller(st))
pipe
}

override def toIterableExecution: Execution[Iterable[T]] = Execution.getConfigMode.flatMap {
case (conf, mode) =>
// This can only terminate in TypedPipeInst, which will
// keep the reference to this flowDef
val flowDef = new FlowDef
val nextPipe = unwrap(this)(flowDef, mode)
val (nextPipe, stackTraces) = unwrap(this, Array())(flowDef, mode)
nextPipe.toIterableExecution
}

@annotation.tailrec
private def unwrap(pipe: TypedPipe[T])(implicit flowDef: FlowDef, mode: Mode): TypedPipe[T] = pipe match {
case TypedPipeFactory(n) => unwrap(n(flowDef, mode))
case tp => tp
private def unwrap(pipe: TypedPipe[T], st: Array[StackTraceElement])(implicit flowDef: FlowDef, mode: Mode): (TypedPipe[T], Array[StackTraceElement]) = pipe match {
case TypedPipeFactory(n) =>
val fullTrace = n match {
case NoStackAndThen.WithStackTrace(_, st) => st
case _ => Array[StackTraceElement]()
}
unwrap(n(flowDef, mode), st ++ fullTrace)
case tp => (tp, st)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ trait HasDescription {
/**
* Used for objects that may _set_ a description to be used in .dot and MR step names.
*/
trait WithDescription[+This <: WithDescription[This]] extends HasDescription {
trait WithDescription[+This <: WithDescription[This]] extends HasDescription { self: This =>
/** never mutates this, instead returns a new item. */
def withDescription(description: String): This

def withDescription(descriptionOpt: Option[String]): This =
descriptionOpt match {
case Some(description) => withDescription(description)
case None => self
}
}
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.15.1-SNAPSHOT"
version in ThisBuild := "0.15.1-exec.1"