Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

The producer api

Ian O'Connell edited this page Oct 24, 2013 · 8 revisions

TODO: Intro to the Producer API.

Data Sourcing

You get data into Summingbird by instantiating a Source, defined by the given execution platform, and passing it to Producer.source:

def source[P <: Platform[P], T: Manifest](s: P#Source[T]): Producer[P, T]

Producer.source returns a leaf node in the graph. Calling methods on the resulting producer won’t actually do anything! Summingbird’s DSL is declarative. You’re building up a computation that will only execute later, when you use some particular Platform to plan the topology.

Map-like Functions

The map-like functions available to a producer operate on individual elements in the stream represented by the producer. In each of the examples below, producer is of type Producer[P <: Platform[P], T] and represents a stream of T using whatever representation is required by the abstract execution platform P.

producer.map { t => u: U }

The map method takes a function from T => U and performs the transformation on each item in the stream. The result is a Producer[P, U].

val birds: Producer[P, Bird] = ???
val speeds: Producer[P, Int] =
  birds.map { bird: Bird => bird.getSpeed }

producer.filter { t => <boolean> }

The filter method takes a function from T => Boolean and filters out all items from the stream for which the predicate returns false. The result is a Producer[P, T]; a stream of the same type, with potentially fewer items.

val quickBirds: Producer[P, Bird] =
  birds.filter { bird: Bird => bird.getSpeed > MilesPerHour(10) }

producer.flatMap { t => <sequence of U> }

flatMap maps each T in the producer’s stream to a TraversableOnce[U] and then flattens together all returned lists. The return value of flatMap is a Producer[P, U].

val chicks: Producer[P, Chick] =
  birds.flatMap { bird: Bird => bird.getChildren: Seq[Chick] }

The producer returned by the flatMap function may have the same, greater or fewer items than the original stream.

Misc Functions

producer.name(“flowName”)

Calling name(identifier) on a producer assigns a name to every node above the call. Platform implementations use this feature to allow the user to pass in options at planning time that are meant for sections of a given producer graph. For example:

birds.name("preOperations")
  .filter { bird: Bird => bird.getSpeed > MilesPerHour(10) }
  .name("filtered")
  .flatMap { bird: Bird => bird.getChildren: Seq[Chick] }
  .name("flatMapped")

The original birds branch is now named “preOperations”. The filter call occurs in a branch named “filtered”, and the flatMap call occurs in a branch named flatMapped.

Join/Merge Functions

producer.merge(otherProducer)

TODO. Merge combines two streams without any uniqueing or joining.

producer.either(otherProducer)

TODO. either is a way to merge with a producer of another type.

producer.leftJoin(service)

leftJoin takes a service and performs a left join, or a lookup join, in some way that makes sense to the underlying platform. In realtime, this looks like a key-value lookup on a random access key value store. In offline mode, we do a special kind of join that simulates the kv pair lookup at a specific time. More information on that later, after we describe how to write out streams of data.

def leftJoin[RightV](service: P#Service[K, RightV]): KeyedProducer[P, K, (V, Option[RightV])] =

Sink Functions

There are two ways to get data out of the system. You can write into a Sink, or you can sumByKey into a Store. The sink accepts a stream of data; each instance is associated with a time. In realtime, this just looks like a queue. Another job might consume this.

producer.write(sink)

write writes out a stream of the data.

producer.sumByKey(store)

sumByKey allows you to write out a snapshot of data.

CacheSize

The CacheSize option controls Summingbird’s map-side aggregation. If you apply a CacheSize(n) option on a flatMap phase, Summingbird will buffer n key-value pairs on each flatMap bolt executor before it does an emit. For example:

val tail = source.flatMap { t => ... }.name("my FM")
val s = Storm.local(Map("my FM" -> Options.set(CacheSize(100))))
s.plan(tail)

In this scenario, each of the 20 executors will buffer 100 key-value pairs. When each executor hits 100, it’ll sum all pairs up by key and emit the aggregated values. This is Summingbird’s realtime version of map-side aggregation, and can be really useful when your key space isn’t that large, or when you have huge skew in your data (since the skewed key will be pre-aggregated instead of kicking out large numbers of values).

If you apply a CacheSize option to a sumByKey, Summingbird will do the same sort of buffering in the SummerBolt on its commit out to the underlying Store. Same exact idea as the flatMap cache size, just another knob that you get to control.