Joins and Cogroup

Matt Bossenbroek edited this page Mar 5, 2014 · 1 revision

Joining data is a widely used operation in map-reduce. In plain Clojure, you could use for to join data like this:

(def xs [{:a 1} {:a 2}])
(def ys [{:b 1} {:b 2}])

(defn f [x] (:a x))
(defn g [x] (:b x))

=> (for [x xs
         y ys
         :when (= (f x) (g y))]
     [x y])

([{:a 1} {:b 1}]
 [{:a 2} {:b 2}])

The downside of this approach is that we're actually computing a cross join of xs and ys, only to filter out most of the results with our :when clause. If we're joining large datasets, this can be prohibitively expensive. A better approach would be to specify the key that we want to join on and partition the data according to that key.

Note: we could also use :a and :b directly instead of f and g. Functions were used here just to show that any Clojure function can serve as a key selector.
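To make the partitioning idea above concrete, here is a minimal sketch in plain Clojure that indexes one side by its join key before pairing records. The helper name hash-join is hypothetical and not part of PigPen; it only illustrates why keying the data avoids the cross join.

```clojure
(def xs [{:a 1} {:a 2}])
(def ys [{:b 1} {:b 2}])

;; Hypothetical helper for illustration only; not a PigPen function.
(defn hash-join
  "Inner join of xs and ys where (f x) equals (g y)."
  [f xs g ys]
  (let [ys-by-key (group-by g ys)]   ; partition ys by join key in one pass
    (for [x xs
          y (get ys-by-key (f x))]   ; visit only the ys that share x's key
      [x y])))

(hash-join :a xs :b ys)
;; => ([{:a 1} {:b 1}] [{:a 2} {:b 2}])
```

Each x is now compared only against the ys in its own partition, which is roughly what a map-reduce join does when it shuffles records by key.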

The same join in PigPen looks like this:

(require '[pigpen.core :as pig])

(def pig-xs (pig/return [{:a 1} {:a 2}]))
(def pig-ys (pig/return [{:b 1} {:b 2}]))

=> (set (pig/dump
     (pig/join [(pig-xs :on f)
                (pig-ys :on g)]
       (fn [x y] [x y]))))

#{[{:a 1} {:b 1}]
  [{:a 2} {:b 2}]}

As you can see, it's mostly the same information, just rearranged slightly. Our key selectors, f and g, are specified alongside the relations pig-xs and pig-ys. The last argument is a function that produces a result for each matching combination of x and y.

Note: pig/return and pig/dump are used to mock data and execute the command locally.

You can also join more than two relations:

=> (let [pig-ws (pig/return [{:a 1} {:a 2}])
         pig-xs (pig/return [{:b 1} {:b 2}])
         pig-ys (pig/return [{:c 1} {:c 2}])
         pig-zs (pig/return [{:d 1} {:d 2}])]

     (set (pig/dump
       (pig/join [(pig-ws :on :a)
                  (pig-xs :on :b)
                  (pig-ys :on :c)
                  (pig-zs :on :d)]
         (fn [w x y z] [w x y z])))))

#{[{:a 1} {:b 1} {:c 1} {:d 1}]
  [{:a 2} {:b 2} {:c 2} {:d 2}]}

You can also mark relations as optional to get outer joins:

=> (let [pig-xs (pig/return [{:a 1} {:a 2} {:a 3}])
         pig-ys (pig/return [{:b 1} {:b 2} {:b 4}])]

     (set (pig/dump
       (pig/join [(pig-xs :on :a :type :required)
                  (pig-ys :on :b :type :optional)]
         (fn [x y] [x y])))))

#{[{:a 1} {:b 1}]
  [{:a 2} {:b 2}]
  [{:a 3} nil]}

By default, relations in a join are required, while relations in a cogroup are optional.
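As a rough mental model of required versus optional relations, the sketch below reproduces the outer join result above in plain Clojure. The helper left-outer-join is a hypothetical name used for illustration and says nothing about how PigPen implements it.

```clojure
;; Hypothetical helper for illustration only; not a PigPen function.
(defn left-outer-join
  "xs is required, ys is optional: every x appears at least once."
  [f xs g ys]
  (let [ys-by-key (group-by g ys)]
    (for [x xs
          ;; when no y shares x's key, pair x with a single nil
          y (get ys-by-key (f x) [nil])]
      [x y])))

(set (left-outer-join :a [{:a 1} {:a 2} {:a 3}]
                      :b [{:b 1} {:b 2} {:b 4}]))
;; => #{[{:a 1} {:b 1}] [{:a 2} {:b 2}] [{:a 3} nil]}
```

Note that {:b 4} never appears: it has no partner in the required relation, so no output row is produced for it.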

Here's another example, written as a unit test, that shows how a join works. Take a look at command:

(use '[clojure.test])

(deftest test-join
  (let [left  (pig/return [{:a 1 :b 2} {:a 1 :b 3} {:a 2 :b 4}])
        right (pig/return [{:c 1 :d "foo"} {:c 2 :d "bar"} {:c 2 :d "baz"}])

        command (pig/join [(left :on :a)
                           (right :on :c)]
                          (fn [l r] [(:b l) (:d r)]))]

    (is (= (set (pig/dump command))
           #{[2 "foo"]
             [3 "foo"]
             [4 "bar"]
             [4 "baz"]}))))

In this example, left and right are the relations we want to join. The key selectors are :a and :c, respectively. The last argument is the consolidation function: for each pair of records from left and right that share a key, this function is called to merge them into a single result.

Note that the return type of this function isn't a map. You can return anything from a PigPen function - vectors, strings, anything. Maps are generally the easiest to destructure in the next operator, but it's entirely up to you.
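To illustrate why maps are convenient downstream, here is a small plain-Clojure sketch. It assumes the consolidation function had returned maps such as {:b 2 :d "foo"} instead of vectors, and uses clojure.core/map as a stand-in for whatever operator comes next; the names joined and label are made up for this example.

```clojure
;; Assumed shape of the join output if the consolidation function
;; returned maps, e.g. (fn [l r] {:b (:b l) :d (:d r)}).
(def joined [{:b 2 :d "foo"} {:b 3 :d "foo"}])

;; The next step can pick fields by name with map destructuring.
(defn label [{:keys [b d]}]
  (str d "-" b))

(map label joined)
;; => ("foo-2" "foo-3")
```

With vectors, the next function would have to rely on positional destructuring such as [b d], which is easier to get wrong as records grow.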

Also noteworthy is the way nil keys are handled. By default, nil keys from different relations are treated as different and are not joined. You can add the option :join-nils true to the join to change this behavior.

For example:

(deftest test-join-nils
  (let [data1 (pig/return [{:k nil, :v 1}
                           {:k nil, :v 3}
                           {:k :i, :v 5}
                           {:k :i, :v 7}
                           {:k :l, :v 9}
                           {:k :l, :v 11}])
        data2 (pig/return [{:k nil, :v 2}
                           {:k nil, :v 4}
                           {:k :i, :v 6}
                           {:k :i, :v 8}
                           {:k :r, :v 10}
                           {:k :r, :v 12}])]

    ; normal join
    (is (= (set (pig/dump
             (pig/join [(data1 :on :k)
                        (data2 :on :k)]
                       vector)))
           '#{[{:k :i, :v 5} {:k :i, :v 6}]
              [{:k :i, :v 5} {:k :i, :v 8}]
              [{:k :i, :v 7} {:k :i, :v 6}]
              [{:k :i, :v 7} {:k :i, :v 8}]}))

    ; join nils
    (is (= (set (pig/dump
             (pig/join [(data1 :on :k)
                        (data2 :on :k)]
                       vector
                       {:join-nils true})))
           '#{[{:k nil, :v 1} {:k nil, :v 2}]
              [{:k nil, :v 3} {:k nil, :v 2}]
              [{:k nil, :v 1} {:k nil, :v 4}]
              [{:k nil, :v 3} {:k nil, :v 4}]
              [{:k :i, :v 5} {:k :i, :v 6}]
              [{:k :i, :v 5} {:k :i, :v 8}]
              [{:k :i, :v 7} {:k :i, :v 6}]
              [{:k :i, :v 7} {:k :i, :v 8}]}))))
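The nil handling in the test above can be modelled in plain Clojure as follows. This is only a sketch of the semantics described here; join-model is a hypothetical helper, and its option map simply mirrors the :join-nils option name.

```clojure
;; Hypothetical helper for illustration only; not a PigPen function.
(defn join-model
  "Inner join that skips nil keys unless :join-nils is truthy."
  [f xs g ys {:keys [join-nils]}]
  (let [ys-by-key (group-by g ys)]
    (for [x xs
          :let [k (f x)]
          :when (or join-nils (some? k))   ; drop nil keys by default
          y (get ys-by-key k)]
      [x y])))

(def data1 [{:k nil, :v 1} {:k :i, :v 5}])
(def data2 [{:k nil, :v 2} {:k :i, :v 6}])

(join-model :k data1 :k data2 {})
;; => ([{:k :i, :v 5} {:k :i, :v 6}])

(join-model :k data1 :k data2 {:join-nils true})
;; => ([{:k nil, :v 1} {:k nil, :v 2}] [{:k :i, :v 5} {:k :i, :v 6}])
```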

Next is a common map-reduce pattern, the cogroup:

(deftest test-cogroup
  (let [left  (pig/return [{:a 1 :b 2} {:a 1 :b 3} {:a 2 :b 4}])
        right (pig/return [{:c 1 :d "foo"} {:c 2 :d "bar"} {:c 2 :d "baz"}])

        command (pig/cogroup [(left :on :a)
                              (right :on :c)]
                             (fn [k l r] [k (map :b l) (map :d r)]))]

    (is (= (set (pig/dump command))
           #{[1 [2 3] ["foo"]]
             [2 [4]   ["bar" "baz"]]}))))

A cogroup is similar to a join, but instead of flattening the matching rows, all of the values for a key are passed to the consolidation function at once. Note that our function takes 3 arguments: the first is the key that was joined on, and the rest are collections of the values that match that key, one collection per relation.
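For comparison with the join, the cogroup described above can be sketched in plain Clojure by grouping both sides and calling the consolidation function once per key. The name cogroup-model is hypothetical; in this model a key missing from one side yields nil for that side, which is an assumption of the sketch rather than a statement about PigPen's behaviour.

```clojure
;; Hypothetical helper for illustration only; not a PigPen function.
(defn cogroup-model
  "Calls (consolidate k xs-for-k ys-for-k) once for every distinct key."
  [f xs g ys consolidate]
  (let [xs-by-key (group-by f xs)
        ys-by-key (group-by g ys)
        ks        (distinct (concat (keys xs-by-key) (keys ys-by-key)))]
    (for [k ks]
      (consolidate k (get xs-by-key k) (get ys-by-key k)))))

(def left  [{:a 1 :b 2} {:a 1 :b 3} {:a 2 :b 4}])
(def right [{:c 1 :d "foo"} {:c 2 :d "bar"} {:c 2 :d "baz"}])

(set (cogroup-model :a left :c right
                    (fn [k l r] [k (mapv :b l) (mapv :d r)])))
;; => #{[1 [2 3] ["foo"]] [2 [4] ["bar" "baz"]]}
```

The join from the earlier test would produce four rows from the same data; the cogroup produces one row per key, with the matching values still grouped.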

It is quite common in map-reduce for the individual groups in a group-by or cogroup to be very large. In these cases, we may need to incrementally aggregate the data in parallel. To accomplish this, we use the functions in pigpen.fold. Check out Folding Data for more info and usage.