
Design and Features

Matt Bossenbroek edited this page May 7, 2015 · 3 revisions

PigPen was designed to match Clojure as closely as possible. Map-reduce is functional programming, so why not use an awesome functional programming language that already exists? Not only is the learning curve gentler, but most of the concepts translate very easily to big data.

In PigPen, queries are manipulated as expression trees. Each operation is represented as a map of information about what behavior is desired. These maps can be nested together to build a tree representation of a complex query. Each command also contains references to its ancestor commands. When executed, that query tree is converted into a directed acyclic query graph. This allows for easy merging of duplicate commands, optimizing sequences of related commands, and instrumenting the query with debug information.
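As a rough illustration of the idea, here is a minimal sketch in plain Clojure. The map keys (`:type`, `:location`, `:func`, `:ancestors`) and the `tree->commands` helper are hypothetical names chosen for this example; they are not PigPen's actual command schema.

```clojure
;; Hypothetical command maps; these keys are made up for illustration
;; and are not PigPen's real representation.
(def load-cmd
  {:type :load, :location "input.clj", :ancestors []})

(def map-cmd
  {:type :map, :func '(fn [x] (* x x)), :ancestors [load-cmd]})

(def filter-cmd
  {:type :filter, :func 'even?, :ancestors [map-cmd]})

;; Walk the tree depth-first and list every command, ancestors first.
(defn tree->commands [cmd]
  (concat (mapcat tree->commands (:ancestors cmd)) [cmd]))

(map :type (tree->commands filter-cmd))
;; => (:load :map :filter)
```

Because each command is plain data, the whole query can be inspected, compared, and rewritten with ordinary Clojure functions.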

Optimization

De-duping

When we represent our query as a graph of operations, de-duping them is trivial. Clojure provides value-equality, meaning that if two objects have the same content, they are equal. If any two operations have the same representation, then they are in fact identical. No care has to be taken to avoid duplicating commands when writing the query - they're all optimized before executing it.
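Value-equality is easy to see at the REPL. The maps below are hypothetical stand-ins for commands, not PigPen's real representation:

```clojure
;; Two separately constructed maps with the same content.
(def load-a {:type :load, :location "input.clj"})
(def load-b {:type :load, :location "input.clj"})

(= load-a load-b)
;; => true

;; A set keeps only one of them, because they are the same value.
(count (set [load-a load-b]))
;; => 1

(= load-a {:type :load, :location "other.clj"})
;; => false
```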

For example, say we have the following query:

(require '[pigpen.core :as pig])

(let [even-squares (->>
                     (pig/load-clj "input.clj")
                     (pig/map (fn [x] (* x x)))
                     (pig/filter even?)
                     (pig/store-clj "even-squares.clj"))
      odd-squares (->>
                    (pig/load-clj "input.clj")
                    (pig/map (fn [x] (* x x)))
                    (pig/filter odd?)
                    (pig/store-clj "odd-squares.clj"))]
  (pig/store-many even-squares odd-squares))

In this query, we load data from a file, compute the square of each number, and then split it into even and odd numbers. A graph of this operation has two separate chains of load, map, filter, and store commands, one per output file, joined at the end by store-many.

This matches our query, but it's doing some extra work. It's loading the same input.clj file twice and computing the squares of all of the numbers twice. This might not seem like a lot of work, but when you do it on a lot of data, simple operations really add up. To optimize this query, we look for operations that are identical. At first glance it looks like our operation to compute squares might be a good candidate, but they actually have different parents so we can't merge them yet. We can, however, merge the load functions because they don't have any parents and they load the same file.

Now our graph looks like this: a single load command feeds both map commands.

Now we're loading the data once, which will save some time, but we're still doing the squares computation twice. Since we now have a single load command, our map operations are now identical and can be merged, leaving one load and one map whose output feeds both filters.

Now we have an optimized query where each operation is unique. Because we always merge commands one at a time, we know that we're not going to change the logic of the query. You can easily generate queries within loops without worrying about duplicated execution - PigPen will only execute the unique parts of the query.
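The effect of de-duping can be sketched in plain Clojure. Everything here is an assumption made for illustration: the `cmd`, `branch`, and `all-commands` helpers and the map keys are hypothetical, and functions are stored as quoted code because Clojure function objects do not compare by value. Since each map embeds its ancestors, equality covers a command's whole ancestry, so this sketch collapses duplicates in a single pass rather than merging one command at a time as described above.

```clojure
;; Hypothetical command constructor: a plain map holding its ancestors.
(defn cmd [type args & ancestors]
  {:type type, :args args, :ancestors (vec ancestors)})

;; One branch of the example query: load -> map -> filter -> store.
(defn branch [pred out]
  (->> (cmd :load "input.clj")
       (cmd :map '(fn [x] (* x x)))
       (cmd :filter pred)
       (cmd :store out)))

(def query
  (cmd :store-many nil
       (branch 'even? "even-squares.clj")
       (branch 'odd? "odd-squares.clj")))

;; Every command in the tree, ancestors first, duplicates included.
(defn all-commands [c]
  (concat (mapcat all-commands (:ancestors c)) [c]))

;; Maps compare by value, so `distinct` drops the repeated load and map.
(def unique-commands (distinct (all-commands query)))

(count (all-commands query))
;; => 9
(count unique-commands)
;; => 7
```

The two commands that disappear are the second load and the second map; the filters and stores differ between branches and are kept.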

Serialization

After we're done processing data in Clojure, our data must be serialized into a binary blob so that Pig or Cascading can move it around between machines in a cluster. This is an expensive, but essential, step for PigPen. Luckily, many consecutive operations in a script can often be packed together into a single operation. This saves a lot of time by not serializing and deserializing the data when we don't need to. For example, any consecutive map, filter, and mapcat operations can be re-written as a single mapcat operation.

Let's look at some examples to illustrate this:

In this example, we start with a serialized (blue) value, 4, deserialize it (orange), apply our map function, and re-serialize the value.

Now let's try a slightly more complex (and realistic) example. In this example, we apply a map, mapcat, and filter operation.

If you haven't used it before, mapcat is an operation where we apply a function to a value and that function returns a sequence of values. That sequence is then 'flattened' and each single value is fed into the next step. In Clojure, it's the result of combining map and concat. In Scala, this is called flatMap and in C# it's called SelectMany.
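A quick comparison in plain Clojure (no PigPen involved) shows the difference between map and mapcat; the `neighbors` function is just an example:

```clojure
;; Returns a sequence of three values for each input.
(defn neighbors [x]
  [(dec x) x (inc x)])

;; map keeps one nested sequence per input.
(map neighbors [16 25])
;; => ([15 16 17] [24 25 26])

;; mapcat concatenates those sequences into one flat sequence.
(mapcat neighbors [16 25])
;; => (15 16 17 24 25 26)
```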

In the diagram below, the flow on the left is our query before the optimization; the right is after the optimization. We start with the same value, 4, and calculate the square of the value, just as in the first example. Then we take our value and apply a function that returns three values: the value decremented, the value itself, and the value incremented. Pig then takes this set of values and flattens them, making each one an input to the next step. Note that we had to serialize and deserialize the data when interacting with Pig. The third and final step is to filter the data; in this example we're retaining only odd values. As you can see, we end up serializing and deserializing the data in between each step.

The right-hand side shows the result of the optimization. Put simply, each operation now returns a sequence of elements. Our map operation returns a sequence of one element, 16, our mapcat remains the same, and our filter returns a sequence of zero or one elements. By making these commands more uniform, we can merge them more easily. We end up flattening more sequences of values within the set of commands, but there is no serialization cost between steps. While it looks more complex, this optimization results in much faster execution of each of these steps.
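The rewrite can be sketched in plain Clojure. This is only an illustration of the idea, not PigPen's implementation; the helper names `map->mapcat`, `filter->mapcat`, and `fuse` are hypothetical.

```clojure
;; A map step becomes a function returning a one-element sequence.
(defn map->mapcat [f]
  (fn [x] [(f x)]))

;; A filter step becomes a function returning zero or one elements.
(defn filter->mapcat [pred]
  (fn [x] (if (pred x) [x] [])))

(def square    (map->mapcat (fn [x] (* x x))))
(def neighbors (fn [x] [(dec x) x (inc x)]))   ; already mapcat-shaped
(def keep-odd  (filter->mapcat odd?))

;; Chain sequence-returning steps, left to right, into one function
;; that can itself be passed to mapcat.
(defn fuse [& steps]
  (fn [x]
    (reduce (fn [values step] (mapcat step values)) [x] steps)))

;; One mapcat now does the work of map, mapcat, and filter.
(mapcat (fuse square neighbors keep-odd) [4])
;; => (15 17)
```

All three steps run inside a single function call per input value, which is what lets the intermediate serialization be skipped.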

As of PigPen 0.3.0, this is now implemented using something that resembles transducers.
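For comparison, the same pipeline written with Clojure's own transducers (Clojure 1.7 or later) looks like the sketch below. It uses only clojure.core and is meant to show the style of composition, not PigPen's internal code.

```clojure
;; Compose the three steps into a single transformation.
;; With transducers, comp applies the steps left to right.
(def xf
  (comp (map (fn [x] (* x x)))
        (mapcat (fn [x] [(dec x) x (inc x)]))
        (filter odd?)))

;; No intermediate sequences are built between the steps.
(into [] xf [4])
;; => [15 17]

(into [] xf [1 2 3])
;; => [1 3 5 9]
```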

Testing, Local Execution, and Debugging

Iterative development, testing, and debuggability are key tenets of PigPen. When you have jobs that can run for days at a time, the last thing you need is an unexpected bug to show up in the eleventh hour. PigPen has a local execution mode that runs your code in the REPL, which allows us to write unit tests for our queries. We can then know with very high confidence that something will not crash when run and will actually return the expected results. Even better, this feature allows for iterative development of queries.

Typically, we start with just a few records of the source data and use that to populate a unit test. Because PigPen returns data in the REPL, we don't have to go elsewhere to build our test data. Then, using the REPL, we add commands to map, filter, join, and reduce the mock data as required, verifying at each step that the result is what we expect. This approach produces more reliable results than building a giant monolithic script and crossing your fingers.

Another useful pattern is to break up large queries into smaller functional units. Map-reduce queries tend to explode and contract the source data by orders of magnitude. When you try to test the script as a whole, you often have to start with a very large amount of data to end up with just a few rows. By breaking the query into smaller parts, you can test the first part, which may take 100 rows to produce two, and then test the second part by using those two rows as a template to simulate 100 more fake ones.
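A unit test for a small piece of a query might look like the sketch below. It assumes PigPen is on the classpath and uses pig/return to supply mock data and pig/dump to run the query locally; the namespace and the even-squares function are hypothetical names for this example. The result is sorted before comparing so the test does not depend on output order.

```clojure
(ns my-project.query-test   ; hypothetical namespace
  (:require [clojure.test :refer [deftest is]]
            [pigpen.core :as pig]))

;; A small, separately testable unit of a larger query.
(defn even-squares [data]
  (->> data
       (pig/map (fn [x] (* x x)))
       (pig/filter even?)))

(deftest test-even-squares
  ;; pig/return wraps mock data as a query input.
  (let [data (pig/return [1 2 3 4])]
    ;; pig/dump runs the query locally and returns the results.
    (is (= [4 16] (sort (pig/dump (even-squares data)))))))
```

Because even-squares takes its input as an argument, the same function can be fed mock data in a test and a real load command in production.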

Debug mode has proven to be really useful for fixing the unexpected. When enabled, it will write to disk the result of every operation in the script, in addition to the normal outputs. This is very useful in an environment such as Hadoop, where you can't step through code and hours may pass in between operations. Debug mode can also be coupled with a Graphviz visualization of the execution graph. You can then visually associate what it plans to do with the actual output of each operation.

To enable debug mode, see the options for pigpen.pig/write-script and pigpen.pig/generate-script. It will write the extra debug output to the folder specified.

Example of debug mode enabled:

(defn my-pigpen-query []
  (->>
    (pig/load-tsv "input.tsv")
    (pig/map count)))

(require '[pigpen.pig])

(pigpen.pig/write-script "my-debug-script.pig" {:debug "/debug-output/"} (my-pigpen-query))

To enable visualization, take a look at pigpen.viz/show.

Example of visualization:

(require '[pigpen.viz])

(pigpen.viz/show (my-pigpen-query))        ;; Shows a graph of the query