An experimental execution environment to execute a Kettle transformation as a Storm topology.
Kettle Storm is an experimental execution environment to execute a Kettle transformation across a Storm cluster. This decomposes a transformation into a topology and wraps all steps in either a Storm Spout or a Bolt. The topology is then submitted to the cluster and is automatically killed once the transformation has finished processing all data.
Many things are not implemented yet. I've only tested this with the included transformation files on a small cluster. Known gaps include:
- Steps that do not emit at least one message for every input. Kettle has no message id to correlate with Storm messages, so we cannot guarantee that a message has been completely processed until we see a record emitted from a given step. For the same reason, we cannot determine which messages are produced for a given input unless they are emitted immediately as part of the same processRow() call. As such, we can only guarantee message processing when one input message produces at least one output message. The following classes of steps will not work until that is fixed:
  - Sampling
  - Aggregation
  - Sorting
  - Filtering
- First-class Spoon support
- Repository-based transformations
- Error handling
- Conditional hops
- Sub-transformations
- Metrics: Kettle timing, throughput, logging
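The first limitation above can be illustrated with a toy model. The sketch below is not Kettle Storm's actual bookkeeping; the class OneToOneAckSketch and its methods are hypothetical and only assume that, without a message id on each row, an emitted row can be matched to nothing better than the oldest input still pending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model, not part of Kettle Storm.
final class OneToOneAckSketch {
    private final Deque<String> pending = new ArrayDeque<>();
    private final List<String> acked = new ArrayList<>();

    // A Storm message arrives at the step and waits to be acknowledged.
    void inputReceived(String messageId) {
        pending.addLast(messageId);
    }

    // The step emitted a row. The row carries no id, so the only available
    // guess is that it belongs to the oldest pending input.
    void rowEmitted() {
        String id = pending.pollFirst();
        if (id != null) {
            acked.add(id);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> acked() {
        return acked;
    }

    public static void main(String[] args) {
        // A filter-like step: three inputs arrive, only the last one passes.
        OneToOneAckSketch filter = new OneToOneAckSketch();
        filter.inputReceived("a");
        filter.inputReceived("b");
        filter.inputReceived("c");
        filter.rowEmitted(); // the row actually came from "c"

        // "a" is acknowledged by mistake; "b" and "c" never complete.
        System.out.println("acked:   " + filter.acked());
        System.out.println("pending: " + filter.pendingCount());
    }
}
```

With a step that emits exactly one row per input, the same model drains the pending queue in order, which is why the one-in, one-out case is the only one that can be guaranteed.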
The following commands will execute a transformation using a local in-memory test cluster.
A Kettle transformation can be submitted as a topology using the included KettleStorm command-line application. To invoke it from Maven simply use the maven exec target with the Kettle transformation you wish to execute:
mvn package
mvn exec:java -Dexec.args=src/main/resources/test.ktr -Dkettle-storm-local-mode=true
Extract the release and run:
java -Dkettle-storm-local-mode=true -jar kettle-engine-storm-${version}-assembly.jar path/to/my.ktr
The following instructions are meant to be executed using the artifacts packaged in a release.
To execute a transformation on a Storm cluster running on the same host simply run:
java -jar kettle-engine-storm-${version}-assembly.jar path/to/my.ktr
To submit the transformation to a remote Nimbus host, include the host and port via the storm.options System property:
java -Dstorm.options=nimbus.host=my-nimbus,nimbus.thrift.port=6628 -jar kettle-engine-storm-${version}-assembly.jar path/to/my.ktr
If additional options are required, they can be provided as System properties via the command line in the format -Dargument=value.
They are all optional and will be translated into StormExecutionEngineConfig properties:
- kettle-storm-local-mode: Flag indicating if you wish to execute the transformation as a Storm topology on an in-memory "local cluster" or remotely to an external Storm cluster. Defaults to false.
- kettle-storm-debug: Flag indicating you wish to enable debug messaging from Storm for the submitted topology. Defaults to false.
- kettle-storm-topology-jar: The path to the jar file to submit with the Storm topology. This is only required if you have created a custom jar with additional classes you wish to make available to the Kettle transformation without having to manually install plugins or configure the environment of each Storm host.
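As a rough illustration of how these -D flags and their defaults could be read, here is a minimal sketch. The class LaunchFlagsSketch is hypothetical and is not the project's configuration class; only the three property names and the false defaults come from the list above.

```java
import java.util.Properties;

// Hypothetical holder for the three launch flags; the real configuration
// class in Kettle Storm may be shaped differently.
final class LaunchFlagsSketch {
    final boolean localMode;
    final boolean debug;
    final String topologyJar; // null when the property is not supplied

    LaunchFlagsSketch(Properties props) {
        // Both boolean flags fall back to false when absent.
        this.localMode = Boolean.parseBoolean(props.getProperty("kettle-storm-local-mode", "false"));
        this.debug = Boolean.parseBoolean(props.getProperty("kettle-storm-debug", "false"));
        this.topologyJar = props.getProperty("kettle-storm-topology-jar");
    }

    public static void main(String[] args) {
        // Values passed as -Dname=value end up in the JVM system properties.
        LaunchFlagsSketch flags = new LaunchFlagsSketch(System.getProperties());
        System.out.println("local mode:   " + flags.localMode);
        System.out.println("debug:        " + flags.debug);
        System.out.println("topology jar: " + flags.topologyJar);
    }
}
```

Running it with -Dkettle-storm-local-mode=true flips only the first flag; the others keep their defaults.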
By default, Kettle Storm will submit topologies to a Nimbus host running on localhost with the default connection settings included with Storm. If you'd like to use a specific storm.yaml file, declare a System property on the command line:
mvn exec:java -Dstorm.conf.file=/path/to/storm.yaml -Dexec.args=src/main/resources/test.ktr
Storm configuration properties can be overridden by specifying them on the command line in the format:
-Dstorm.options=nimbus.host=my-nimbus,nimbus.thrift.port=6628
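The storm.options value is a comma-separated list of key=value pairs. The following sketch shows one way such a string could be split into individual settings. The class StormOptionsSketch is a hypothetical helper, not code from Kettle Storm or Storm, and it assumes that keys and values contain no commas.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kettle Storm or Storm.
final class StormOptionsSketch {

    // Splits "k1=v1,k2=v2" into an ordered map of option names to values.
    static Map<String, String> parse(String options) {
        Map<String, String> result = new LinkedHashMap<>();
        if (options == null || options.trim().isEmpty()) {
            return result;
        }
        for (String pair : options.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // Reads the same system property the command line above sets.
        String raw = System.getProperty("storm.options", "");
        parse(raw).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Values are kept as strings here; converting a port such as nimbus.thrift.port to a number is left to whatever consumes the map.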
The Kettle execution engine that submits topologies can be embedded in a Java application using StormExecutionEngine and StormExecutionEngineConfig.
StormExecutionEngine provides convenience methods for integrating within multithreaded environments:
- StormExecutionEngine.isComplete: Blocks for up to the provided duration and returns true if the topology has completed successfully.
- StormExecutionEngine.stop: Kills the topology running the transformation if it's still executing.
StormExecutionEngineConfig config = new StormExecutionEngineConfig();
config.setTransformationFile("/path/to/my.ktr");
StormExecutionEngine engine = new StormExecutionEngine(config);
engine.init();
engine.execute();
engine.isComplete(10, TimeUnit.MINUTES); // Block for up to 10 minutes while the topology executes.
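A common way to combine the two convenience methods is to wait with a timeout and kill the topology if it has not finished. The sketch below shows that pattern against a hypothetical Engine interface that stands in for StormExecutionEngine; the exact signatures, including whether isComplete throws InterruptedException, are assumptions and should be checked against the real class.

```java
import java.util.concurrent.TimeUnit;

final class AwaitOrStopSketch {

    // Hypothetical stand-in for the two StormExecutionEngine methods
    // described above; the real signatures may differ.
    interface Engine {
        boolean isComplete(long timeout, TimeUnit unit) throws InterruptedException;
        void stop();
    }

    // Waits up to the given duration and stops the topology if it did not finish.
    static boolean awaitOrStop(Engine engine, long timeout, TimeUnit unit) {
        boolean completed = false;
        try {
            completed = engine.isComplete(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        } finally {
            if (!completed) {
                engine.stop();
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        // Fake engine that never completes, to exercise the stop path.
        Engine neverDone = new Engine() {
            @Override
            public boolean isComplete(long timeout, TimeUnit unit) {
                return false;
            }

            @Override
            public void stop() {
                System.out.println("stop() called");
            }
        };
        System.out.println("completed: " + awaitOrStop(neverDone, 10, TimeUnit.MILLISECONDS));
    }
}
```

Stopping in the finally block means the topology is not left running on the cluster when the wait times out or the waiting thread is interrupted.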
Execute mvn clean package to produce the release artifacts. The jars will be stored in target/.
Multiple artifacts are produced via the mvn package target:
kettle-engine-storm-0.0.1-SNAPSHOT-assembly.jar
kettle-engine-storm-0.0.1-SNAPSHOT-for-remote-topology.jar
kettle-engine-storm-0.0.1-SNAPSHOT.jar
The -assembly.jar is used to schedule execution of a transformation and contains all dependencies. The -for-remote-topology.jar contains code to be submitted to the cluster with the topology and all dependencies. The plain jar is this project's compilation without additional dependencies.
Kettle: http://kettle.pentaho.com
Storm: http://storm-project.net