Java library to integrate Apache Kudu and Apache Flink. The main goal is to read and write data from/to Kudu using Flink's DataSet and DataStream APIs.
Data flow patterns:
- Batch
  - Kudu -> DataSet&lt;RowSerializable&gt; -> Kudu
  - Kudu -> DataSet&lt;RowSerializable&gt; -> other sink
  - Other source -> DataSet&lt;RowSerializable&gt; -> other sink
- Stream
  - Other source -> DataStream&lt;RowSerializable&gt; -> Kudu
/* Batch mode - DataSet API */
DataSet<RowSerializable> input = KuduInputBuilder.build(TABLE_SOURCE, KUDU_MASTER);
// DataSet operations --> .map(), .filter(), .reduce(), etc.
// DataSet<RowSerializable> result = input.map(...); (see the sketch below)
result.output(new KuduOutputFormat(KUDU_MASTER, TABLE_SINK, columnNames, KuduOutputFormat.CREATE));
KuduInputBuilder.env.execute();
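The commented map step above stands for any DataSet transformation over RowSerializable. A minimal identity-map sketch (hypothetical; the pass-through body is just a placeholder for real per-row logic):

import org.apache.flink.api.common.functions.MapFunction;

// Placeholder transformation: emits each row unchanged
DataSet<RowSerializable> result = input.map(new MapFunction<RowSerializable, RowSerializable>() {
    @Override
    public RowSerializable map(RowSerializable row) throws Exception {
        return row; // replace with the actual per-row logic
    }
});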
/* Streaming mode - DataStream API */
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromElements("data1 data2 data3");
DataStream<RowSerializable> stream2 = stream.map(new MapToRowSerializable());
stream2.addSink(new KuduSink(KUDU_MASTER, DEST_TABLE, columnNames));
env.execute();
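MapToRowSerializable above converts each incoming String into a RowSerializable. A minimal sketch of such a mapper is shown below; it assumes RowSerializable offers a Row-style constructor taking a field count and a setField(index, value) method, so verify it against the actual class in the repository:

import org.apache.flink.api.common.functions.MapFunction;

// Illustrative mapper: splits the input on spaces and stores each token as a field.
// The RowSerializable(int) constructor and setField(int, Object) are assumptions here.
public class MapToRowSerializable implements MapFunction<String, RowSerializable> {
    @Override
    public RowSerializable map(String value) throws Exception {
        String[] tokens = value.split(" ");
        RowSerializable row = new RowSerializable(tokens.length);
        for (int i = 0; i < tokens.length; i++) {
            row.setField(i, tokens[i]);
        }
        return row;
    }
}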
- An OS compatible with Flink and Kudu
- Java (version 8)
- Scala (version 2.12.1)
- Apache Flink (version 1.1.3)
- Apache Kudu (version 1.2.0)
- Apache Kafka (version 0.10.1.1) (only for the streaming example)
- Maven (version 3.3.9) (only for building)
git clone https://github.com/rubencasado/Flink-Kudu.git
cd Flink-Kudu
mvn clean install -DskipTests
mvn package -DskipTests
The generated JAR will be located at "Flink-Kudu/target/flink-kudu-1.0-SNAPSHOT.jar".
You can also add this connector as a Maven dependency:
<dependency>
  <groupId>es.accenture</groupId>
  <artifactId>flink-kudu-connector</artifactId>
  <version>1.0</version>
</dependency>
Start Flink Job Manager
<Flink-installation-folder>/bin/start-local.sh
Submit the job
<Flink-installation-folder>/bin/flink run -c <Job-package-path> target/flink-kudu-1.0-SNAPSHOT.jar param1 param2 ...
Example:
/opt/flink-1.1.3/bin/flink run -c es.accenture.flink.Job.JobBatchSink target/flink-kudu-1.0-SNAPSHOT.jar mytable CREATE localhost
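The positional parameters after the JAR path are handed to the job's main method as args. A hypothetical skeleton of such an entry point (class name and argument handling are illustrative, not the repository's actual code):

// Hypothetical job class; the argument order mirrors the JobBatchSink example above
public class MyKuduJob {
    public static void main(String[] args) throws Exception {
        String tableName = args[0]; // e.g. "mytable"
        String tableMode = args[1]; // CREATE, APPEND or OVERRIDE
        String host = args[2];      // Kudu master address, e.g. "localhost"
        // build and execute the Flink pipeline here, as in the snippets above
    }
}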
Included examples
- JobBatchSink (es.accenture.flink.Job.JobBatchSink): Saves data from a DataSet into a Kudu table (DataSet API, Batch Mode). Input parameters: table_name, table_mode (CREATE, APPEND or OVERRIDE) and host.
- JobSource (es.accenture.flink.Job.JobSource): Reads data from a Kudu table and prints it (DataSet API, Batch Mode).
- JobBatchInputOutput (es.accenture.flink.Job.JobBatchInputOutput): Reads data from a Kudu table, executes some basic operations using the DataSet API and saves the result into a Kudu table (DataSet API, Batch Mode).
- JobStreamingInputOutput (es.accenture.flink.Job.JobStreamingInputOutput): Reads data from Kafka, executes some basic operations using the DataStream API and saves the results into a Kudu table (DataStream API, Streaming Mode).
- JobStreamingSink (es.accenture.flink.Job.JobStreamingSink): Saves data from a DataStream into a Kudu table (DataStream API, Streaming Mode). Input parameters: table_name and host.