add more observability to Execution#1422

Closed
declerambaul wants to merge 1 commit into twitter:develop from declerambaul:dcl/exec_obs

@declerambaul

This branch would need to be cleaned up, but I would be interested in hearing whether this is a feasible approach to adding more observability to Execution-based jobs.

The PR adds support for attaching cascading flowlisteners to Flows created using Execution (this can already be done when using Job with https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Job.scala#L304). I am not a fan of Config in general, though other options seem sparse - this code replicates the approach taken for configuring reducer estimators.

@MansurAshraf

.foreach(_ => flow.setFlowStepStrategy(ReducerEstimatorStepStrategy))

config.get(Config.FlowListeners).foreach { clsNames: String =>
println(s"XXX adding FlowListeners $clsNames")
Collaborator

we should use proper logging here (same as cascading, not sure which framework they use).

@johnynek
Collaborator

This seems reasonable to me, but it is limited in that you can't give any arguments to your flowlistener.

Does this cover most of the cases you have? The good part about this API is that it doesn't clutter up the Execution type, but the downside is that it uses reflection and has the limitation mentioned above.
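The limitation mentioned above can be illustrated with a minimal sketch of class-name-based loading. This is not the PR's code: the object and method names are hypothetical, and JDK classes stand in for listener classes so the example runs without cascading on the classpath. It assumes, as the PR appears to, that each class is created through a no-argument constructor.

```scala
import scala.util.Try

object ReflectiveLoadingSketch {
  // Instantiate every class named in a comma-separated string via its
  // no-argument constructor. Each result is wrapped in Try so that a class
  // without such a constructor shows up as a Failure instead of throwing.
  def instantiateAll(clsNames: String): Seq[Try[AnyRef]] = {
    val loader = Option(Thread.currentThread.getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    clsNames.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { name =>
      Try(loader.loadClass(name).getDeclaredConstructor().newInstance().asInstanceOf[AnyRef])
    }
  }
}
```

In this sketch `java.util.ArrayList` loads fine, while `java.lang.Integer` fails because it has no no-argument constructor. A listener that needs constructor arguments would hit the same wall.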

/**
* configure flow listeners for observability
*/
def addFlowListener[T](cls: Class[T]): Config =
Collaborator

Can we put a better type on this? Is there no interface/root class for flow listeners?
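One way to read this request is an upper type bound on the class parameter. The sketch below is an assumption, not the PR's code: `FlowListener` here is a local stand-in trait rather than the cascading interface, `Config` is a simplified immutable map wrapper, and the key name is hypothetical.

```scala
object TypedConfigSketch {
  // Stand-in for the real listener interface (assumption).
  trait FlowListener
  final class NoopListener extends FlowListener

  // Hypothetical key; the actual constant in the PR is Config.FlowListeners.
  val FlowListenersKey = "example.flow.listeners"

  final case class Config(values: Map[String, String]) {
    // The bound T <: FlowListener rejects unrelated classes at compile time,
    // e.g. addFlowListener(classOf[String]) would not compile.
    def addFlowListener[T <: FlowListener](cls: Class[T]): Config = {
      val updated = values.get(FlowListenersKey) match {
        case Some(existing) => existing + "," + cls.getName
        case None           => cls.getName
      }
      Config(values.updated(FlowListenersKey, updated))
    }
  }
}
```

The stored value is still a comma-separated list of class names, so the reflective loading step is unchanged. Only the entry point becomes stricter.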

@ianoc
Collaborator

ianoc commented Aug 16, 2015

What do we think about maybe having the type be (Config, Mode) => FlowListener, and then we externalize and base64 the closure we get so we can store it inside the config. It would keep it configurable.
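The idea of storing a closure inside a string-valued config can be sketched as follows. This is only an illustration under loud assumptions: it uses plain Java serialization plus `java.util.Base64`, which may differ from whatever serializer scalding would actually use, and `Config`, `Mode` and `Listener` are simplified stand-ins.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

object ClosureCodecSketch {
  // Stand-ins for scalding's Config and Mode and for a listener (assumptions).
  final case class Config(values: Map[String, String])
  sealed trait Mode
  case object Local extends Mode
  final case class Listener(label: String)

  // An explicit serializable function class, used instead of a lambda so the
  // sketch does not depend on how the compiler encodes closures.
  final class LabelFromConfig extends Function2[Config, Mode, Listener] with Serializable {
    def apply(c: Config, m: Mode): Listener =
      Listener(c.values.getOrElse("label", "default") + "@" + m)
  }

  // Serialize the function and encode the bytes as a Base64 string that can
  // be stored as a config value.
  def encode(fn: (Config, Mode) => Listener): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(fn) finally out.close()
    Base64.getEncoder.encodeToString(bytes.toByteArray)
  }

  // Reverse of encode: Base64 string back to a callable function.
  def decode(s: String): (Config, Mode) => Listener = {
    val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(s)))
    try in.readObject().asInstanceOf[(Config, Mode) => Listener] finally in.close()
  }
}
```

Because the function receives the Config when it is invoked, the listener it builds can be parameterized, which the class-name approach cannot do.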

@johnynek
Collaborator

@ianoc we could add that later if we needed. The class based one could always be a version of the above:

(c: Config, m: Mode) => clazz.newInstance()

so, we have a weaker API that we could lift into the new API as needed. What do you think about starting more restricted then adding if needed?
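The lifting described here can be sketched generically. The names below are hypothetical, `Config` and `Mode` are stand-ins, and `getDeclaredConstructor().newInstance()` is used in place of the deprecated `Class.newInstance`. In real code the type parameter would presumably be bounded by the listener interface.

```scala
object LiftSketch {
  // Simplified stand-ins for scalding's Config and Mode (assumptions).
  final case class Config(values: Map[String, String])
  final case class Mode(name: String)

  // Turn a class-based registration into the closure-based shape by ignoring
  // both arguments and calling the no-argument constructor.
  def lift[T](cls: Class[T]): (Config, Mode) => T =
    (_: Config, _: Mode) => cls.getDeclaredConstructor().newInstance()
}
```

Note that every call of the lifted function creates a fresh instance, so two registrations built this way do not share one object.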

@ianoc
Collaborator

ianoc commented Aug 18, 2015

@johnynek Not opposed, no. I had added the changes as mentioned, plus a few more options, in #1426. We can merge this and then look at that, or something similar.

@declerambaul
Author

I had a look at #1426 which makes sense to me - I meant to keep it as simple as possible.

In terms of Oscar's comment about covering use cases, I see how this could lead to some confusion and frustration - especially since in the case of the flowstep listeners we are sort of abusing the Config class. I.e. the serialized config classes never leave the submitter (aka the config is not used on the cluster, it only registers a callback for events related to the job), but there is currently no other way of changing a Flow created by Execution (that would require changes to Execution itself, e.g. something nasty like ExecutionConfig in this old branch https://github.com/twitter/scalding/compare/dcl/exec_listener).

Taking my current use case, getting ambrose to work nicely with scalding: one way to do it would require adding the same instance of a class that extends both FlowListener and FlowStepListener to the Flow. With the current approach, that doesn't work. I tried getting around this using something like

case object X {
  val mutableAmbroseThingy = xxx
}

class HelperClass extends AmbroseFlowListener(X.mutableAmbroseThingy)

but strangely (or not), that only works if I add the exact same instance of HelperClass to the Flow (i.e. the current PR does not work in that scenario).

If we are going that route, what prevents us from generalizing this more and providing a config method that modifies a Flow directly?

def addFlowConfiguration(flowConfigFn: ((Mode, Config), Flow) => Flow) = {
  val serializedFlowConfig = flowConfigurationSerializer(flowConfigFn)
  update(Config.FlowConfigurations) { xxx }
}
@transient private[scalding] def flowConfigurationSerializer = buildInj[((Mode, Config), Flow) => Flow]

And then in the ExecutionContext just fold the flow configurations into a single one

config.getFlowConfigurations.foldLeft(flow) { case (currentFlow, flowConfigTry) =>
  val configFn: ((Mode, Config), Flow) => Flow = flowConfigTry.get
  configFn((mode, configWithId), currentFlow)
}
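The fold above can be tried out in isolation with a small sketch. Everything here is a stand-in: `Flow` is an immutable case class rather than the mutable cascading Flow, and `Mode` and `Config` are simplified. The point is only how a sequence of deserialized configuration functions collapses into one pass over the flow.

```scala
import scala.util.Try

object FlowConfigFoldSketch {
  // Simplified stand-ins (assumptions), not the scalding/cascading types.
  final case class Mode(name: String)
  final case class Config(values: Map[String, String])
  final case class Flow(listeners: Vector[String])

  type FlowConfigFn = ((Mode, Config), Flow) => Flow

  // Apply each configuration function in order, threading the flow through.
  // Each function arrives wrapped in Try because deserialization may have
  // failed; calling .get rethrows that failure and aborts the fold.
  def applyAll(mode: Mode, config: Config, fns: Seq[Try[FlowConfigFn]], flow: Flow): Flow =
    fns.foldLeft(flow) { (currentFlow, fnTry) =>
      val fn = fnTry.get
      fn((mode, config), currentFlow)
    }
}
```

A failed deserialization stops the whole fold here. Whether that is the right behavior, versus skipping the broken entry, is a design choice the thread leaves open.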

@ianoc
Collaborator

ianoc commented Aug 20, 2015

One reason to avoid generalizing, for better or worse, is that it leaves less scope to shoot yourself in the foot. Tightly controlled APIs built around use cases tend to leak less and to be less restrictive on future changes. Ideally, I think we wouldn't expose these APIs beyond platform helpers either. It's exposing a lot of internal cascading stuff we'd rather keep hidden.

Which one doesn't work with ambrose? The other PR works anyway; I fired up your ambrose code with it and it worked fine.

@declerambaul
Author

Agreed about limiting scope.

Though my argument is more about DRY code, as every new config will require a couple of new methods that pretty much do the same thing (and since the serialized config itself is not human readable, I don't see much value in having separate hadoop options for the different flow configurations).

My main point is that this PR would allow you to configure a flowlistener using a hadoop option, e.g. -Dscalding.obs.flowlistener=com.twitter.xx.AmbroseListener, but as soon as you move to closures (your PR), (Mode, Config) => T, you are forced to do the config in code anyway.

So to limit scope, you could also make the suggested def addFlowConfiguration private[scalding] and force the addition of new flow configurations to be in oss scalding. The advantage would be that you can keep the Config class clean and move the individual configs (reducer estimation / observability config) to separate classes/files (since you have to modify the Config in code anyway).

@declerambaul
Author

Re: it generally seems to work well - sadly it is not that easy to tell (also, I don't know ambrose well enough to know how it should look). I was talking to @MansurAshraf about this - there are changes that need to be done to ambrose, e.g. to make it work with Execution: currently only an Execution resulting in a single FlowDef works out of the box. I also suspect there are some bugs with how state is kept in ambrose.

In addition, this PR seems to work for the FlowListener only, not the FlowStepListener - but if you change ExecutionContext to add the same instance as a FlowStepListener (which is only true for Ambrose) - individual job progress events (aka step X is 24% done) also start working.

config.get(Config.FlowListeners).foreach { clsNames: String =>
  println(s"XXX adding FlowListeners $clsNames")
  val clsLoader = Thread.currentThread.getContextClassLoader
  StringUtility.fastSplit(clsNames, ",")
    .map { clsLoader.loadClass(_).newInstance.asInstanceOf[FlowListener] }
    .map { x =>
      flow.addListener(x)
      flow.addStepListener(x.asInstanceOf[FlowStepListener]) //hacky
    }
} 
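The cast marked hacky in the snippet above throws a ClassCastException for any listener that is not also a step listener. A pattern match avoids that. The sketch below is an assumption using local stand-ins for `FlowListener`, `FlowStepListener` and `Flow`; only the method names `addListener` and `addStepListener` are taken from the snippet.

```scala
object ListenerRegistrationSketch {
  // Local stand-ins for the two listener interfaces and the flow (assumptions).
  trait FlowListener
  trait FlowStepListener

  final class Flow {
    private var flowListeners = Vector.empty[FlowListener]
    private var stepListeners = Vector.empty[FlowStepListener]
    def addListener(l: FlowListener): Unit = flowListeners :+= l
    def addStepListener(l: FlowStepListener): Unit = stepListeners :+= l
    def flowListenerCount: Int = flowListeners.size
    def stepListenerCount: Int = stepListeners.size
  }

  // Always register as a flow listener; register the same instance as a step
  // listener only when it actually implements that interface.
  def register(flow: Flow, listener: FlowListener): Unit = {
    flow.addListener(listener)
    listener match {
      case step: FlowStepListener => flow.addStepListener(step)
      case _                      => ()
    }
  }
}
```

With this shape, a listener like the Ambrose one that implements both interfaces is attached once as the same instance in both roles, and plain flow listeners are left alone.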

@ianoc
Collaborator

ianoc commented Aug 20, 2015

You do lose the ability to set a flow listener on the command line, but on the flip side you don't need to have a fully pre-configured class for everything. You could attach a basic ambrose to everything, then enable it, or indeed configure it, from those same hadoop options. So you can kind of do more with them, per se.

Which PR do you mean only works with the flow listener vs. the flow step listener? These comments are on your PR, not mine; I'm just a little confused.

@declerambaul
Author

I meant that my PR (the one this thread is on) does not work for the FlowStepListener - see the snippet about the case object X above. It is not quite clear to me why not, because even if you instantiate two instances of the config (one for FlowListener, one for the FlowStepListener), I would think they should still refer to the same mutable state on the same thread/jvm.

If it works using the closures in your PR, then hooray and even more reason to go with that approach.

@declerambaul
Author

Closing this in favor of #1426 - let's also move the discussion about scope to that pr.
