Getting Started for Clojure Users
Most of the usual operators you use on seqs (map, mapcat, filter, reduce, group-by, into, take) have PigPen equivalents. Check out the API docs for the whole list of what is supported.
Note: The operators don't return the actual data. They return an expression tree that can later be either translated into a script or run locally.
Start by requiring pigpen.core:
(require '[pigpen.core :as pig])
Here's what's different and/or new in PigPen:
The load operators in PigPen are kind of like a reader plus line-seq: they return a lazy sequence that's disposed of when you finish reading. Conceptually, you can think of them as returning a lazy seq you don't have to worry about closing.
(pig/load-tsv "input.tsv")
In this example, we're reading tab-delimited data from the file input.tsv. If our input looked like this (values separated by tabs):
1 2 3
4 5 6
Our output would be this:
'(["1" "2" "3"]
["4" "5" "6"])
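For intuition, here is a plain-Clojure sketch of the "reader plus line-seq" analogy: open a reader, walk it with line-seq, split each line on tabs, and fully realize the result before the reader closes. It reads from an in-memory string instead of a file, and the helper name read-tsv is hypothetical; this only mimics the shape of the output and is not how PigPen implements its loader.

```clojure
(require '[clojure.string :as str])
(import '(java.io BufferedReader StringReader))

;; Hypothetical helper: mimics the shape of pig/load-tsv's local output.
(defn read-tsv
  "Reads tab-delimited text into a vector of rows, each a vector of strings."
  [text]
  ;; with-open closes the reader when we're done, which is the bookkeeping
  ;; PigPen's loaders take off your hands.
  (with-open [rdr (BufferedReader. (StringReader. text))]
    ;; mapv forces the lazy line-seq before the reader is closed.
    (mapv #(str/split % #"\t") (line-seq rdr))))

(read-tsv "1\t2\t3\n4\t5\t6")
;; => [["1" "2" "3"] ["4" "5" "6"]]
```

Note the mapv: returning the lazy seq from inside with-open would fail once the reader is closed, which is exactly the lifecycle problem the PigPen loaders hide.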
There are also loaders that will read Clojure and other formats. Check out the docs for more examples.
Storing data works like a line-by-line writer, similar to clojure.core/spit. There are a few storage options:
- (pig/store-clj "output.clj" my-relation) - Stores the relation as EDN.
- (pig/store-tsv "output.tsv" my-relation) - Stores the relation as a TSV file. Each element must be a seq, and each of its values becomes a column.
- (pig/store-json "output.json" my-relation) - Stores the relation as a JSON file.
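To make the store-tsv rule concrete (each element is a seq, each value a column), here is a plain-Clojure sketch of the text that rule implies, built with clojure.string/join. The helper name ->tsv is hypothetical, and PigPen's actual writer may differ in details such as escaping and trailing newlines.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper: renders a relation the way the store-tsv rule describes.
(defn ->tsv
  "Each element of rel is a seq; its values become tab-separated columns,
  one element per line."
  [rel]
  (str/join "\n" (map #(str/join "\t" %) rel)))

(->tsv [[1 2 3] ["a" "b" "c"]])
;; => "1\t2\t3\na\tb\tc"
```

Passing that string to clojure.core/spit would produce a file of the same shape locally; the point of the store operators is that you don't build the whole string in memory yourself.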
Check out the docs for more examples.
List comprehensions (clojure.core/for) aren't supported because they don't translate very well into map-reduce. Instead, we opted to go with pig/join:
(def foo (pig/return [{:a "xyz"}]))
(def bar (pig/return [[42 'xyz]]))
(pig/join [(foo on :a)
(bar on (fn [bar] (-> bar second str)))]
(fn [foo bar] (vector foo bar)))
- foo and bar are the relations we're joining
- :a is the key selector for foo
- bar uses a slightly more complex key selector; any function is allowed here
- The last argument is a function of two arguments that is used to combine each joined foo with each joined bar
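Since clojure.core/for is the construct pig/join stands in for, it can help to see the same inner join written with for over plain vectors. This is a local sketch of the semantics only (the nested loops are exactly what doesn't scale in map-reduce), and join-local is a hypothetical name, not part of PigPen.

```clojure
;; Hypothetical helper: an inner equi-join over two in-memory seqs.
(defn join-local
  [foos foo-key bars bar-key f]
  (for [foo foos
        bar bars
        ;; keep only pairs whose keys match, like the "on" selectors in pig/join
        :when (= (foo-key foo) (bar-key bar))]
    (f foo bar)))

;; Same data and key selectors as the pig/join example above.
(join-local [{:a "xyz"}] :a
            [[42 'xyz]] (fn [bar] (-> bar second str))
            vector)
;; => ([{:a "xyz"} [42 xyz]])
```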
There are many options and variants for joins. Also take a look at pig/cogroup, which is like a join without the flatten. Check out the full docs for more examples.
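"A join without the flatten" means that instead of one output per matching pair, you get one output per key, carrying every foo and every bar that share it. Here is a rough plain-Clojure sketch of that grouping using group-by. The helper cogroup-local and its [key foos bars] output shape are illustrative assumptions; check the pig/cogroup docs for its real signature and for how it treats keys missing from one relation (this sketch keeps every key seen on either side).

```clojure
;; Hypothetical helper: groups two seqs by key without pairing them up.
(defn cogroup-local
  [foos foo-key bars bar-key]
  (let [foo-groups (group-by foo-key foos)
        bar-groups (group-by bar-key bars)
        ;; every key seen on either side
        ks (distinct (concat (keys foo-groups) (keys bar-groups)))]
    (for [k ks]
      ;; one triple per key: the key, all matching foos, all matching bars
      [k (get foo-groups k []) (get bar-groups k [])])))

(cogroup-local [{:a 1} {:a 1} {:a 2}] :a
               [[1 :x] [3 :y]] first)
;; one triple per key, e.g. [1 [{:a 1} {:a 1}] [[1 :x]]] and [3 [] [[3 :y]]]
```

A join is then what you get by taking, within each triple, every combination of one foo and one bar.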
Clojure has clojure.set, but it doesn't handle multiset operations. PigPen has both set and multiset operations:
- pig/union - The distinct set of all elements in all relations. Similar to clojure.set/union
- pig/union-multiset - Similar to clojure.core/concat, but there's no sense of order in most of map-reduce
- pig/intersection - Similar to clojure.set/intersection
- pig/intersection-multiset - Computes the multiset intersection
- pig/difference - Similar to clojure.set/difference
- pig/difference-multiset - Computes the multiset difference
Check out the docs for examples of how multiset operations are different.
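The difference between the set and multiset variants is easiest to see with duplicates. Here's a plain-Clojure sketch using clojure.set and frequencies, assuming the usual multiset definitions (concatenation for union, the minimum count for intersection, and count subtraction floored at zero for difference); we're assuming, not asserting, that PigPen's multiset operators match these. The three helper names are hypothetical, and since map-reduce output is unordered, the multiset results are expressed as frequency maps.

```clojure
(require '[clojure.set :as set])

(def xs [1 1 2 3])
(def ys [1 2 2 4])

;; Set semantics: duplicates collapse.
(set/union (set xs) (set ys))        ; equals #{1 2 3 4}
(set/intersection (set xs) (set ys)) ; equals #{1 2}
(set/difference (set xs) (set ys))   ; equals #{3}

;; Hypothetical multiset helpers, expressed as element -> count maps.
(defn union-multiset [a b]
  (frequencies (concat a b)))

(defn intersection-multiset [a b]
  (let [fb (frequencies b)]
    (into {} (for [[x n] (frequencies a)
                   :let [m (min n (get fb x 0))]
                   :when (pos? m)]
               [x m]))))

(defn difference-multiset [a b]
  (let [fb (frequencies b)]
    (into {} (for [[x n] (frequencies a)
                   :let [m (- n (get fb x 0))]
                   :when (pos? m)]
               [x m]))))

(union-multiset xs ys)        ; => {1 3, 2 3, 3 1, 4 1}
(intersection-multiset xs ys) ; => {1 1, 2 1}
(difference-multiset xs ys)   ; => {1 1, 3 1}
```

Note how 1 survives the multiset difference (xs has two of them, ys only one) but not the set difference.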
When you want to restrict the amount of data used in a map-reduce job, it's often preferable to sample the data instead of limiting it. This is due to the distributed nature of the execution. If you ask for 1000 rows and you're running the job on 1000 machines, each of those machines must output 1000 rows and send them to a single node that actually limits the output to 1000 rows. This is because any of those 1000 nodes may return 0 rows; none of them knows what the others are doing.
A better approach is to sample the data by taking a random percentage of it.
(def data (pig/return (range 100000)))
(pig/sample 0.01 data)
This command will take roughly 1% of the data at random.
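Locally, clojure.core/random-sample gives a feel for this kind of sampling: each element is kept independently with the given probability, so no node needs to coordinate with any other, and the output size is only approximately 1%. It's a stand-in for illustration; we're assuming, not asserting, that pig/sample selects rows the same way. The first few lines spell out the arithmetic of the 1000-machine limit example.

```clojure
;; Limiting on a cluster: each node may output up to `limit` rows,
;; all of which go to the single node that applies the final limit.
(def nodes 1000)
(def limit 1000)
(* nodes limit) ; => 1000000 rows shipped to one node, to keep just 1000

;; Sampling: each element is kept independently with probability 0.01,
;; so every node can decide on its own, with no coordination.
(def data (range 100000))
(def sampled (random-sample 0.01 data))

(count sampled) ; roughly 1000; varies from run to run
```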
Note: PigPen also supports pig/take, which is much more useful when running locally to debug a script.