From d282a58aae68dfe1cec16aff1ae2a3572670a088 Mon Sep 17 00:00:00 2001 From: AlvaroVadillo Date: Wed, 1 Mar 2017 09:26:20 +0100 Subject: [PATCH] Added class KuduInputBuilder --- flink-kudu.iml | 34 +++++++++++++++++++ pom.xml | 5 +++ .../flink/Job/JobBatchInputOutput.java | 24 +++++-------- .../es/accenture/flink/Job/JobSource.java | 15 ++++---- .../flink/Sources/KuduInputBuilder.java | 21 ++++++++++++ .../flink/Utils/CreateKuduTable.java | 4 +-- .../flink/Utils/DeleteKuduTable.java | 4 +-- .../flink/Utils/InsertKuduTable.java | 6 ++-- .../accenture/flink/Utils/ReadKuduTable.java | 4 +-- 9 files changed, 80 insertions(+), 37 deletions(-) create mode 100644 src/main/java/es/accenture/flink/Sources/KuduInputBuilder.java diff --git a/flink-kudu.iml b/flink-kudu.iml index c903810..05a360a 100644 --- a/flink-kudu.iml +++ b/flink-kudu.iml @@ -28,6 +28,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 4ab94f1..0d7bbbf 100644 --- a/pom.xml +++ b/pom.xml @@ -85,6 +85,11 @@ junit 4.12 + + java.es.accenture.flink + flink-kudu + 1.0-SNAPSHOT + diff --git a/src/main/java/es/accenture/flink/Job/JobBatchInputOutput.java b/src/main/java/es/accenture/flink/Job/JobBatchInputOutput.java index d6169f9..3e0ad3d 100644 --- a/src/main/java/es/accenture/flink/Job/JobBatchInputOutput.java +++ b/src/main/java/es/accenture/flink/Job/JobBatchInputOutput.java @@ -1,6 +1,7 @@ package es.accenture.flink.Job; import es.accenture.flink.Sink.KuduOutputFormat; +import es.accenture.flink.Sources.KuduInputBuilder; import es.accenture.flink.Sources.KuduInputFormat; import es.accenture.flink.Utils.RowSerializable; import es.accenture.flink.Utils.Utils; @@ -32,15 +33,15 @@ public static void main(String[] args) throws Exception { } // Params of program - String tableSourceName = args[0]; - String tableSinkName = args[1]; + String TABLE_SOURCE = args[0]; + String TABLE_SINK = args[1]; String mode = args[2]; - String host = args[3]; + String KUDU_MASTER = args[3]; System.out.println("-----------------------------------------------"); - System.out.println("1. Read data from a Kudu DB (" + tableSourceName + ").\n" + + System.out.println("1. Read data from a Kudu DB (" + TABLE_SOURCE + ").\n" + "2. Change field 'value' to uppercase.\n" + - "3. Write back in a new Kudu DB (" + tableSinkName + ")."); + "3. Write back in a new Kudu DB (" + TABLE_SINK + ")."); System.out.println("-----------------------------------------------\n"); String [] columnNames = new String[2]; @@ -61,19 +62,12 @@ public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); - KuduInputFormat KuduInputTest = new KuduInputFormat(tableSourceName, host); - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet input = KuduInputBuilder.build(TABLE_SOURCE, KUDU_MASTER).map(new MyMapFunction()); - TypeInformation typeInformation = TypeInformation.of(RowSerializable.class); + input.output(new KuduOutputFormat(KUDU_MASTER, TABLE_SINK, columnNames, MODE)); - DataSet source = env.createInput(KuduInputTest, typeInformation); - - DataSet input = source.map(new MyMapFunction()); - - input.output(new KuduOutputFormat(host, tableSinkName, columnNames, MODE)); - - env.execute(); + KuduInputBuilder.env.execute(); long endTime = System.currentTimeMillis(); diff --git a/src/main/java/es/accenture/flink/Job/JobSource.java b/src/main/java/es/accenture/flink/Job/JobSource.java index 686d1ba..3f08016 100644 --- a/src/main/java/es/accenture/flink/Job/JobSource.java +++ b/src/main/java/es/accenture/flink/Job/JobSource.java @@ -1,5 +1,6 @@ package es.accenture.flink.Job; +import es.accenture.flink.Sources.KuduInputBuilder; import es.accenture.flink.Sources.KuduInputFormat; import es.accenture.flink.Utils.RowSerializable; import org.apache.flink.api.common.functions.MapFunction; @@ -39,21 +40,17 @@ public static void main(String[] args) throws Exception { "3. Write data as text file."); System.out.println("-----------------------------------------------"); - KuduInputFormat prueba = new KuduInputFormat(TABLE_NAME, KUDU_MASTER); - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - TypeInformation typeInformation = TypeInformation.of(RowSerializable.class); - DataSet source = env.createInput(prueba, typeInformation); - - /*Comment or uncomment to modify dataset using a map function*/ - DataSet sourceaux = source.map(new MyMapFunction()); + DataSet sourceaux = KuduInputBuilder.build(TABLE_NAME, KUDU_MASTER) + .map(new MyMapFunction()); if(!deleteFiles()){ LOG.error("Error deleting files, exiting."); } sourceaux.writeAsText("tmp/test"); - env.execute(); + + KuduInputBuilder.env.execute(); + LOG.info("Created files at: " + System.getProperty("user.dir") + "/tmp/test"); } diff --git a/src/main/java/es/accenture/flink/Sources/KuduInputBuilder.java b/src/main/java/es/accenture/flink/Sources/KuduInputBuilder.java new file mode 100644 index 0000000..37eb328 --- /dev/null +++ b/src/main/java/es/accenture/flink/Sources/KuduInputBuilder.java @@ -0,0 +1,21 @@ +package es.accenture.flink.Sources; + +import es.accenture.flink.Utils.RowSerializable; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; + +public class KuduInputBuilder { + public static ExecutionEnvironment env = null; + + public static DataSet build(String table_name, String master_add){ + KuduInputFormat prueba = new KuduInputFormat(table_name, master_add); + + env = ExecutionEnvironment.getExecutionEnvironment(); + + TypeInformation typeInformation = TypeInformation.of(RowSerializable.class); + DataSet source = env.createInput(prueba, typeInformation); + return source; + } + +} diff --git a/src/main/java/es/accenture/flink/Utils/CreateKuduTable.java b/src/main/java/es/accenture/flink/Utils/CreateKuduTable.java index 512ab9b..e69a4e9 100644 --- a/src/main/java/es/accenture/flink/Utils/CreateKuduTable.java +++ b/src/main/java/es/accenture/flink/Utils/CreateKuduTable.java @@ -9,9 +9,7 @@ import java.util.ArrayList; import java.util.List; -/** - * Created by sergiy on 17/02/17. - */ + public class CreateKuduTable { public static void main(String[] args) { diff --git a/src/main/java/es/accenture/flink/Utils/DeleteKuduTable.java b/src/main/java/es/accenture/flink/Utils/DeleteKuduTable.java index fea4a5a..7559697 100644 --- a/src/main/java/es/accenture/flink/Utils/DeleteKuduTable.java +++ b/src/main/java/es/accenture/flink/Utils/DeleteKuduTable.java @@ -2,9 +2,7 @@ import org.apache.kudu.client.KuduClient; -/** - * Created by sergiy on 17/02/17. - */ + public class DeleteKuduTable { public static void main(String[] args) { diff --git a/src/main/java/es/accenture/flink/Utils/InsertKuduTable.java b/src/main/java/es/accenture/flink/Utils/InsertKuduTable.java index 27952ce..70ea308 100644 --- a/src/main/java/es/accenture/flink/Utils/InsertKuduTable.java +++ b/src/main/java/es/accenture/flink/Utils/InsertKuduTable.java @@ -2,9 +2,7 @@ import org.apache.kudu.client.*; -/** - * Created by sergiy on 17/02/17. - */ + public class InsertKuduTable { public static void main(String[] args) { @@ -25,7 +23,7 @@ private static void insertToKudu (KuduClient client, String tableName){ KuduTable table = client.openTable(tableName); KuduSession session = client.newSession(); session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); - for (int i = 0; i < 1500000; i++) { + for (int i = 0; i < 10000; i++) { Insert insert = table.newInsert(); PartialRow row = insert.getRow(); row.addInt(0, i); diff --git a/src/main/java/es/accenture/flink/Utils/ReadKuduTable.java b/src/main/java/es/accenture/flink/Utils/ReadKuduTable.java index df58927..ae838e9 100644 --- a/src/main/java/es/accenture/flink/Utils/ReadKuduTable.java +++ b/src/main/java/es/accenture/flink/Utils/ReadKuduTable.java @@ -3,9 +3,7 @@ import es.accenture.flink.Utils.Exceptions.KuduClientException; import org.apache.kudu.client.KuduException; -/** - * Created by sergiy on 16/02/17. - */ + public class ReadKuduTable { public static void main(String[] args) {