cascading_ext is a collection of tools built on top of the Cascading platform which make it easy to build, debug, and run simple and high-performance data workflows.
Some of the most interesting public classes in the project (so far):
BloomJoin
BloomJoin is designed to be a drop-in replacement for CoGroup which achieves significant performance improvements on certain datasets by filtering the LHS pipe against a bloom filter built from the keys on the RHS. Using a BloomJoin can improve the performance of a job when:
- joining a large LHS store against a relatively small RHS store
- most reduce input data does not make it into the output store
- the job is a good candidate for HashJoin, but the RHS tuples don't fit in memory
Internally, we have cut the reduce time of jobs by up to 90% when a BloomJoin lets the job only reduce over the small subset of the data that makes it past the bloom filter.
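The filtering idea above can be sketched in plain Java, without Cascading or Hadoop. The sketch builds a Bloom filter from the RHS join keys, drops LHS records whose key is definitely absent, and runs an exact join only over the survivors. The class name, bit-array size, number of hashes, and double-hashing scheme are illustrative assumptions, not what BloomJoin does internally. Step 3 also uses an in-memory map, which only stands in for the reduce-side CoGroup; BloomJoin targets exactly the case where the RHS does not fit in memory.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration of a Bloom-filter-assisted join; not BloomJoin's implementation.
final class BloomSemiJoinSketch {

  // A tiny Bloom filter over String keys: numBits bits, numHashes probe positions per key.
  static final class StringBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    StringBloomFilter(int numBits, int numHashes) {
      this.bits = new BitSet(numBits);
      this.numBits = numBits;
      this.numHashes = numHashes;
    }

    // Double hashing: the i-th probe is h1 + i * h2 (mod numBits).
    private int probe(String key, int i) {
      int h1 = key.hashCode();
      int h2 = Objects.hash(key, 0x9E3779B9) | 1; // force an odd second hash
      return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(String key) {
      for (int i = 0; i < numHashes; i++) {
        bits.set(probe(key, i));
      }
    }

    // false means "definitely absent"; true means "possibly present".
    boolean mightContain(String key) {
      for (int i = 0; i < numHashes; i++) {
        if (!bits.get(probe(key, i))) {
          return false;
        }
      }
      return true;
    }
  }

  // Each record is {key, value}. Returns joined rows {key, lhsValue, rhsValue}.
  static List<String[]> join(List<String[]> lhs, List<String[]> rhs) {
    // 1. Build the filter from the (small) RHS keys.
    StringBloomFilter filter = new StringBloomFilter(1 << 16, 3);
    Map<String, List<String>> rhsByKey = new HashMap<>();
    for (String[] r : rhs) {
      filter.add(r[0]);
      rhsByKey.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
    }
    // 2. Cheap pre-filter of the (large) LHS; only survivors reach the exact join.
    List<String[]> survivors = new ArrayList<>();
    for (String[] l : lhs) {
      if (filter.mightContain(l[0])) {
        survivors.add(l);
      }
    }
    // 3. Exact join (in-memory stand-in for the reduce-side CoGroup) removes false positives.
    List<String[]> out = new ArrayList<>();
    for (String[] l : survivors) {
      for (String rv : rhsByKey.getOrDefault(l[0], Collections.emptyList())) {
        out.add(new String[] {l[0], l[1], rv});
      }
    }
    return out;
  }
}
```

For example, joining LHS records {a,1}, {b,2}, {c,3} with RHS records {a,x}, {c,y} yields {a,1,x} and {c,3,y}. The record for b is dropped either by the filter or, if it happens to be a false positive, by the exact join.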
The constructor signature mirrors CoGroup:
Pipe source1 = new Pipe("source1");
Pipe source2 = new Pipe("source2");
Pipe joined = new BloomJoin(source1, new Fields("field1"), source2, new Fields("field3"));
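// sources: a Map<String, Tap> from head pipe name ("source1", "source2") to its input Tap; sink: the output Tap
CascadingUtil.get().getFlowConnector().connect("Example flow", sources, sink, joined).complete();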
See BloomJoinExample and BloomJoinExampleWithoutCascadingUtil for example usages.
For more details on how BloomJoin works, check out our blog post.
BloomFilter
BloomFilter is similar to BloomJoin, but can be used when no fields from the RHS are needed in the output. This allows for simpler field algebra (duplicate field names are not a problem):
Pipe source1 = new Pipe("source1");
Pipe source2 = new Pipe("source2");
Pipe joined = new BloomFilter(source1, new Fields("field1"), source2, new Fields("field1"), true);
CascadingUtil.get().getFlowConnector().connect("Example flow", sources, sink, joined).complete();
Another feature of BloomFilter is the ability to perform an inexact filter, which avoids reducing over the LHS entirely. In an inexact filter, the LHS is filtered against the Bloom filter built from the RHS keys, but the final exact CoGroup is skipped, so the output contains both true and false positives. See TestBloomFilter for more examples.
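The difference between exact and inexact filtering can be sketched in plain Java. This is a hypothetical illustration, not BloomFilter's code: the two hash positions per key are arbitrary choices, and the `exact` flag is a parameter of this sketch only (the source does not say what the boolean argument in the constructor above controls).

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of exact vs. inexact Bloom filtering; not the library's code.
final class InexactFilterSketch {

  // Two cheap bit positions per key (illustrative hash choices).
  private static int[] positions(String key, int numBits) {
    int h = key.hashCode();
    return new int[] {Math.floorMod(h, numBits), Math.floorMod(h * 31 + 17, numBits)};
  }

  // exact == false: keep every LHS key the filter says "might be present",
  //                 so false positives survive into the output.
  // exact == true:  also confirm real membership, playing the role of the final CoGroup.
  static List<String> filter(List<String> lhsKeys, Set<String> rhsKeys, int numBits, boolean exact) {
    BitSet bits = new BitSet(numBits);
    for (String k : rhsKeys) {
      for (int p : positions(k, numBits)) {
        bits.set(p);
      }
    }
    List<String> out = new ArrayList<>();
    for (String k : lhsKeys) {
      boolean mightContain = true;
      for (int p : positions(k, numBits)) {
        mightContain &= bits.get(p);
      }
      if (mightContain && (!exact || rhsKeys.contains(k))) {
        out.add(k);
      }
    }
    return out;
  }
}
```

With `numBits = 1` every key maps to the same bit, so filtering LHS keys a, b, c against RHS key a returns all three keys in inexact mode and only a in exact mode. A real filter sizes its bit array so that false positives are rare, which is what makes skipping the exact step acceptable for some workloads.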
MultiGroupBy
MultiGroupBy allows the user to easily group two or more pipes on a common field without first performing a full inner or outer join (which can lead to an explosion in the number of tuples if keys are not distinct). The MultiBuffer interface gives a user-defined function access to all tuples sharing a common key, across all input pipes:
Pipe s1 = new Pipe("s1");
Pipe s2 = new Pipe("s2");
Pipe results = new MultiGroupBy(s1, new Fields("key"), s2, new Fields("key"),
new Fields("key-rename"), new CustomBuffer());
CascadingUtil.get().getFlowConnector().connect(sources, sink, results).complete();
See TestMultiGroupBy for example usage.
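The reason to group rather than join first can be shown with a small in-memory model. In this hypothetical sketch (not the library's MultiGroupBy or MultiBuffer API), each input is a list of {key, value} records; `groupAll` collects, per key, one list of values per input, and a callback stands in for the user-defined buffer by receiving all of them at once. For a key with n tuples on one side and m on the other, an inner join would materialize n * m rows, while the grouped view holds only n + m values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of grouping several inputs on a shared key.
final class MultiGroupSketch {

  // Stand-in for a user-defined buffer: sees every input's values for one key.
  interface GroupCallback<R> {
    R apply(String key, List<List<String>> valuesPerInput);
  }

  // inputs.get(i) is the i-th input; each record is {key, value}.
  static <R> List<R> groupAll(List<List<String[]>> inputs, GroupCallback<R> callback) {
    Map<String, List<List<String>>> groups = new TreeMap<>(); // sorted keys, like a reduce phase
    for (int i = 0; i < inputs.size(); i++) {
      for (String[] record : inputs.get(i)) {
        List<List<String>> perInput = groups.computeIfAbsent(record[0], k -> {
          // One (possibly empty) value list per input, so inputs stay distinguishable.
          List<List<String>> lists = new ArrayList<>();
          for (int j = 0; j < inputs.size(); j++) {
            lists.add(new ArrayList<>());
          }
          return lists;
        });
        perInput.get(i).add(record[1]);
      }
    }
    List<R> results = new ArrayList<>();
    for (Map.Entry<String, List<List<String>>> e : groups.entrySet()) {
      results.add(callback.apply(e.getKey(), e.getValue()));
    }
    return results;
  }
}
```

With three values for key x in the first input and two in the second, a callback that counts values sees 5 tuples for x, whereas an inner join on x would have produced 3 * 2 = 6 rows.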
CascadingUtil
CascadingUtil is a utility class that makes it easy to add default properties and strategies to all jobs run in a codebase, and that adds some useful logging and debugging information. For a simple example of how to use this class, see SimpleFlowExample:
CascadingUtil.get().getFlowConnector().connect("Example flow", sources, sink, pipe).complete();
By default, CascadingUtil will:
- print and format counters for each step
- retrieve and log map or reduce task errors if the job fails
- extend Cascading's job naming scheme with improved naming for some taps which use UUID identifiers.
See FlowWithCustomCascadingUtil for examples of how CascadingUtil can be extended to include custom default properties. Subclasses can easily:
- set default properties in the job Configuration
- add serialization classes
- add serialization tokens
- add flow step strategies
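The "defaults for every job" pattern can be sketched in plain Java. This is a hypothetical illustration of the idea only: `JobDefaults`, `MyJobDefaults`, `setDefaultProperty`, and `propertiesFor` are names invented for this sketch and do not reproduce CascadingUtil's actual methods. The property keys are Hadoop 1 configuration names used purely as example values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical base class holding codebase-wide default job properties.
class JobDefaults {
  private final Map<String, String> defaultProperties = new LinkedHashMap<>();

  protected void setDefaultProperty(String key, String value) {
    defaultProperties.put(key, value);
  }

  // Start from the defaults, then let per-flow overrides win.
  Map<String, String> propertiesFor(Map<String, String> overrides) {
    Map<String, String> props = new LinkedHashMap<>(defaultProperties);
    props.putAll(overrides);
    return props;
  }
}

// A codebase-specific subclass registers its defaults once, behind a singleton get().
final class MyJobDefaults extends JobDefaults {
  private static final MyJobDefaults INSTANCE = new MyJobDefaults();

  static MyJobDefaults get() {
    return INSTANCE;
  }

  private MyJobDefaults() {
    setDefaultProperty("mapred.reduce.tasks", "10");            // example value only
    setDefaultProperty("mapred.child.java.opts", "-Xmx1024m");   // example value only
  }
}
```

Calling `MyJobDefaults.get().propertiesFor(overrides)` returns every default with the flow's own settings applied on top, so individual jobs only need to state what differs.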
FunctionStats, FilterStats, BufferStats, AggregatorStats
FunctionStats, FilterStats, BufferStats, and AggregatorStats are wrapper classes that make it easier to debug a complex flow with many Filters, Functions, and other operations. They add counters recording the number of tuples a given operation receives as input and emits as output. See TestAggregatorStats, TestFilterStats, TestFunctionStats, and TestBufferStats for usage examples.
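The wrapper idea is a decorator around the wrapped operation. The plain-Java sketch below uses a hypothetical `TupleFunction` interface (not Cascading's `Function`) and in-memory `AtomicLong` counters in place of Hadoop counters, to show where the input and output counts are taken.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical operation interface: one input tuple may emit zero or more output tuples.
interface TupleFunction {
  void operate(List<Object> input, Consumer<List<Object>> emit);
}

// Decorator that counts tuples flowing into and out of the wrapped operation.
final class CountingFunction implements TupleFunction {
  private final TupleFunction delegate;
  final AtomicLong inputs = new AtomicLong();
  final AtomicLong outputs = new AtomicLong();

  CountingFunction(TupleFunction delegate) {
    this.delegate = delegate;
  }

  @Override
  public void operate(List<Object> input, Consumer<List<Object>> emit) {
    inputs.incrementAndGet(); // one count per tuple received
    delegate.operate(input, out -> {
      outputs.incrementAndGet(); // one count per tuple emitted
      emit.accept(out);
    });
  }
}
```

Because the wrapper intercepts the emit callback, it counts outputs without the wrapped operation knowing it is being measured; comparing the two counts shows how many tuples an operation drops or fans out.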
You can either build cascading_ext from source as described below, or pull the latest jar from the LiveRamp Maven repository:
<repository>
<id>repository.liveramp.com</id>
<name>liveramp-repositories</name>
<url>http://repository.liveramp.com/artifactory/liveramp-repositories</url>
</repository>
Version 0.1 is built against Cloudera Hadoop 3 (CDH3u3). The current snapshot version (0.2) is built against CDH4.1.2. Both are available via Maven:
<dependency>
<groupId>com.liveramp</groupId>
<artifactId>cascading_ext</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>com.liveramp</groupId>
<artifactId>cascading_ext</artifactId>
<version>0.2-SNAPSHOT</version>
</dependency>
To build cascading_ext.jar from source, run
> mvn package
which will generate build/cascading_ext.jar. To run the test suite locally, run
> mvn test
See usage instructions here for running Cascading with Apache Hadoop. Everything should work fine if cascading_ext.jar and all third-party jars in lib/ are in your jobjar.
To try out any of the code in the com.liveramp.cascading_ext.example package in production, a jobjar task for cascading_ext itself is available:
> mvn assembly:single
Bug reports or feature requests are welcome: https://github.com/liveramp/cascading_ext/issues
Changes you'd like us to merge in? We love pull requests.
Most of the code here has been moved from our internal repositories, so much of the original authorship has been lost in the git history. Contributors include:
- Andre Rodriguez
- Ben Pastel
- Ben Podgursky
- Bryan Duxbury
- Chris Mullins
- Diane Yap
- Eddie Siegel
- Emily Leathers
- Nathan Marz
- Piotr Kozikowski
- Porter Westling
- Sean Carr
- Takashi Yonebayashi
- Thomas Kielbus
Copyright 2013 LiveRamp
Licensed under the Apache License, Version 2.0