diff --git a/flink-kudu.iml b/flink-kudu.iml
index c903810..05a360a 100644
--- a/flink-kudu.iml
+++ b/flink-kudu.iml
@@ -28,6 +28,40 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/pom.xml b/pom.xml
index 4ab94f1..0d7bbbf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,11 @@
junit
4.12
+
+ java.es.accenture.flink
+ flink-kudu
+ 1.0-SNAPSHOT
+
diff --git a/src/main/java/es/accenture/flink/Job/JobBatchInputOutput.java b/src/main/java/es/accenture/flink/Job/JobBatchInputOutput.java
index d6169f9..3e0ad3d 100644
--- a/src/main/java/es/accenture/flink/Job/JobBatchInputOutput.java
+++ b/src/main/java/es/accenture/flink/Job/JobBatchInputOutput.java
@@ -1,6 +1,7 @@
package es.accenture.flink.Job;
import es.accenture.flink.Sink.KuduOutputFormat;
+import es.accenture.flink.Sources.KuduInputBuilder;
import es.accenture.flink.Sources.KuduInputFormat;
import es.accenture.flink.Utils.RowSerializable;
import es.accenture.flink.Utils.Utils;
@@ -32,15 +33,15 @@ public static void main(String[] args) throws Exception {
}
// Params of program
- String tableSourceName = args[0];
- String tableSinkName = args[1];
+ String TABLE_SOURCE = args[0];
+ String TABLE_SINK = args[1];
String mode = args[2];
- String host = args[3];
+ String KUDU_MASTER = args[3];
System.out.println("-----------------------------------------------");
- System.out.println("1. Read data from a Kudu DB (" + tableSourceName + ").\n" +
+ System.out.println("1. Read data from a Kudu DB (" + TABLE_SOURCE + ").\n" +
"2. Change field 'value' to uppercase.\n" +
- "3. Write back in a new Kudu DB (" + tableSinkName + ").");
+ "3. Write back in a new Kudu DB (" + TABLE_SINK + ").");
System.out.println("-----------------------------------------------\n");
String [] columnNames = new String[2];
@@ -61,19 +62,12 @@ public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
- KuduInputFormat KuduInputTest = new KuduInputFormat(tableSourceName, host);
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet input = KuduInputBuilder.build(TABLE_SOURCE, KUDU_MASTER).map(new MyMapFunction());
- TypeInformation typeInformation = TypeInformation.of(RowSerializable.class);
+ input.output(new KuduOutputFormat(KUDU_MASTER, TABLE_SINK, columnNames, MODE));
- DataSet source = env.createInput(KuduInputTest, typeInformation);
-
- DataSet input = source.map(new MyMapFunction());
-
- input.output(new KuduOutputFormat(host, tableSinkName, columnNames, MODE));
-
- env.execute();
+ KuduInputBuilder.env.execute();
long endTime = System.currentTimeMillis();
diff --git a/src/main/java/es/accenture/flink/Job/JobSource.java b/src/main/java/es/accenture/flink/Job/JobSource.java
index 686d1ba..3f08016 100644
--- a/src/main/java/es/accenture/flink/Job/JobSource.java
+++ b/src/main/java/es/accenture/flink/Job/JobSource.java
@@ -1,5 +1,6 @@
package es.accenture.flink.Job;
+import es.accenture.flink.Sources.KuduInputBuilder;
import es.accenture.flink.Sources.KuduInputFormat;
import es.accenture.flink.Utils.RowSerializable;
import org.apache.flink.api.common.functions.MapFunction;
@@ -39,21 +40,17 @@ public static void main(String[] args) throws Exception {
"3. Write data as text file.");
System.out.println("-----------------------------------------------");
- KuduInputFormat prueba = new KuduInputFormat(TABLE_NAME, KUDU_MASTER);
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- TypeInformation typeInformation = TypeInformation.of(RowSerializable.class);
- DataSet source = env.createInput(prueba, typeInformation);
-
- /*Comment or uncomment to modify dataset using a map function*/
- DataSet sourceaux = source.map(new MyMapFunction());
+ DataSet sourceaux = KuduInputBuilder.build(TABLE_NAME, KUDU_MASTER)
+ .map(new MyMapFunction());
if(!deleteFiles()){
LOG.error("Error deleting files, exiting.");
}
sourceaux.writeAsText("tmp/test");
- env.execute();
+
+ KuduInputBuilder.env.execute();
+
LOG.info("Created files at: " + System.getProperty("user.dir") + "/tmp/test");
}
diff --git a/src/main/java/es/accenture/flink/Sources/KuduInputBuilder.java b/src/main/java/es/accenture/flink/Sources/KuduInputBuilder.java
new file mode 100644
index 0000000..37eb328
--- /dev/null
+++ b/src/main/java/es/accenture/flink/Sources/KuduInputBuilder.java
@@ -0,0 +1,21 @@
+package es.accenture.flink.Sources;
+
+import es.accenture.flink.Utils.RowSerializable;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+public class KuduInputBuilder {
+ public static ExecutionEnvironment env = null;
+
+ public static DataSet build(String table_name, String master_add){
+ KuduInputFormat prueba = new KuduInputFormat(table_name, master_add);
+
+ env = ExecutionEnvironment.getExecutionEnvironment();
+
+ TypeInformation typeInformation = TypeInformation.of(RowSerializable.class);
+ DataSet source = env.createInput(prueba, typeInformation);
+ return source;
+ }
+
+}
diff --git a/src/main/java/es/accenture/flink/Utils/CreateKuduTable.java b/src/main/java/es/accenture/flink/Utils/CreateKuduTable.java
index 512ab9b..e69a4e9 100644
--- a/src/main/java/es/accenture/flink/Utils/CreateKuduTable.java
+++ b/src/main/java/es/accenture/flink/Utils/CreateKuduTable.java
@@ -9,9 +9,7 @@
import java.util.ArrayList;
import java.util.List;
-/**
- * Created by sergiy on 17/02/17.
- */
+
public class CreateKuduTable {
public static void main(String[] args) {
diff --git a/src/main/java/es/accenture/flink/Utils/DeleteKuduTable.java b/src/main/java/es/accenture/flink/Utils/DeleteKuduTable.java
index fea4a5a..7559697 100644
--- a/src/main/java/es/accenture/flink/Utils/DeleteKuduTable.java
+++ b/src/main/java/es/accenture/flink/Utils/DeleteKuduTable.java
@@ -2,9 +2,7 @@
import org.apache.kudu.client.KuduClient;
-/**
- * Created by sergiy on 17/02/17.
- */
+
public class DeleteKuduTable {
public static void main(String[] args) {
diff --git a/src/main/java/es/accenture/flink/Utils/InsertKuduTable.java b/src/main/java/es/accenture/flink/Utils/InsertKuduTable.java
index 27952ce..70ea308 100644
--- a/src/main/java/es/accenture/flink/Utils/InsertKuduTable.java
+++ b/src/main/java/es/accenture/flink/Utils/InsertKuduTable.java
@@ -2,9 +2,7 @@
import org.apache.kudu.client.*;
-/**
- * Created by sergiy on 17/02/17.
- */
+
public class InsertKuduTable {
public static void main(String[] args) {
@@ -25,7 +23,7 @@ private static void insertToKudu (KuduClient client, String tableName){
KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
- for (int i = 0; i < 1500000; i++) {
+ for (int i = 0; i < 10000; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addInt(0, i);
diff --git a/src/main/java/es/accenture/flink/Utils/ReadKuduTable.java b/src/main/java/es/accenture/flink/Utils/ReadKuduTable.java
index df58927..ae838e9 100644
--- a/src/main/java/es/accenture/flink/Utils/ReadKuduTable.java
+++ b/src/main/java/es/accenture/flink/Utils/ReadKuduTable.java
@@ -3,9 +3,7 @@
import es.accenture.flink.Utils.Exceptions.KuduClientException;
import org.apache.kudu.client.KuduException;
-/**
- * Created by sergiy on 16/02/17.
- */
+
public class ReadKuduTable {
public static void main(String[] args) {