From 2b06bf815975247f0b3f07c03b7139899ecdab92 Mon Sep 17 00:00:00 2001 From: NLGithubWP Date: Sun, 29 May 2022 23:44:37 +0800 Subject: [PATCH 1/5] Enable pass .dz fileName to dataLoader, writer and Test --- .../cool/core/util/writer/CliDataWriter.java | 2 +- .../nus/cool/core/util/writer/DataWriter.java | 4 ++-- .../cool/core/util/writer/ListDataWriter.java | 2 +- .../core/util/writer/NativeDataWriter.java | 15 ++++++++---- .../java/com/nus/cool/loader/DataLoader.java | 4 ++-- .../java/com/nus/cool/model/CoolLoader.java | 23 ++++++++++++++----- .../nus/cool/core/model/CoolModelTest.java | 5 ++++ 7 files changed, 39 insertions(+), 16 deletions(-) diff --git a/cool-core/src/main/java/com/nus/cool/core/util/writer/CliDataWriter.java b/cool-core/src/main/java/com/nus/cool/core/util/writer/CliDataWriter.java index ec84774f..e3bc427b 100644 --- a/cool-core/src/main/java/com/nus/cool/core/util/writer/CliDataWriter.java +++ b/cool-core/src/main/java/com/nus/cool/core/util/writer/CliDataWriter.java @@ -9,7 +9,7 @@ public class CliDataWriter implements DataWriter { @Override - public boolean Initialize() throws IOException { + public boolean Initialize(String... fileName) throws IOException { System.out.println("------Display data set------"); return true; } diff --git a/cool-core/src/main/java/com/nus/cool/core/util/writer/DataWriter.java b/cool-core/src/main/java/com/nus/cool/core/util/writer/DataWriter.java index ecb04701..48e56d40 100644 --- a/cool-core/src/main/java/com/nus/cool/core/util/writer/DataWriter.java +++ b/cool-core/src/main/java/com/nus/cool/core/util/writer/DataWriter.java @@ -7,11 +7,11 @@ public interface DataWriter extends Closeable { /** * Initialize the writer - * + * @Params fileName: file name of dz * @return successful initialization * @throws IOException */ - boolean Initialize() throws IOException; + boolean Initialize(String... fileName) throws IOException; /** * Add a tuple to the data set diff --git a/cool-core/src/main/java/com/nus/cool/core/util/writer/ListDataWriter.java b/cool-core/src/main/java/com/nus/cool/core/util/writer/ListDataWriter.java index e3a95f12..38aa7ac6 100644 --- a/cool-core/src/main/java/com/nus/cool/core/util/writer/ListDataWriter.java +++ b/cool-core/src/main/java/com/nus/cool/core/util/writer/ListDataWriter.java @@ -40,7 +40,7 @@ public ListDataWriter(List out){ } @Override - public boolean Initialize() throws IOException { + public boolean Initialize(String... fileName) throws IOException { return true; } diff --git a/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java b/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java index de560dcc..a4cfe423 100644 --- a/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java +++ b/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java @@ -66,7 +66,7 @@ public class NativeDataWriter implements DataWriter { private DataOutputStream out = null; @Override - public boolean Initialize() throws IOException { + public boolean Initialize(String... fileName) throws IOException { if (initalized) return true; this.userKeyIndex = tableSchema.getUserKeyField(); // when there is no user key, using any field for the additional condition on chunk switch is ok. @@ -74,7 +74,7 @@ public boolean Initialize() throws IOException { // cublet // create metaChunk instance, default offset to be 0, update offset when write later. this.metaChunk = MetaChunkWS.newMetaChunkWS(this.tableSchema, 0); - this.out = newCublet(); + this.out = newCublet(fileName); // chunk this.tupleCount = 0; this.offset = 0; @@ -137,8 +137,15 @@ private void finishCublet() throws IOException { * @return DataOutputStream * @throws IOException */ - private DataOutputStream newCublet() throws IOException { - String fileName = Long.toHexString(System.currentTimeMillis()) + ".dz"; + private DataOutputStream newCublet(String... fileNameParams) throws IOException { + + // if file name is provided, then use it. + String fileName; + if (fileNameParams.length == 1){ + fileName = fileNameParams[0]; + }else{ + fileName = Long.toHexString(System.currentTimeMillis()) + ".dz"; + } System.out.println("[*] A new cublet "+ fileName + " is created!"); File cublet = new File(outputDir, fileName); DataOutputStream out = new DataOutputStream(new FileOutputStream(cublet)); diff --git a/cool-core/src/main/java/com/nus/cool/loader/DataLoader.java b/cool-core/src/main/java/com/nus/cool/loader/DataLoader.java index 6403e5e3..538685a5 100644 --- a/cool-core/src/main/java/com/nus/cool/loader/DataLoader.java +++ b/cool-core/src/main/java/com/nus/cool/loader/DataLoader.java @@ -70,9 +70,9 @@ public static Builder builder(String dataSourceName, /** * load data into cool native format */ - public void load() throws IOException { + public void load(String... fileName) throws IOException { // write dataChunk first - writer.Initialize(); + writer.Initialize(fileName); // read the data while (reader.hasNext()) { writer.Add(parser.parse(reader.next())); diff --git a/cool-core/src/main/java/com/nus/cool/model/CoolLoader.java b/cool-core/src/main/java/com/nus/cool/model/CoolLoader.java index e1f887e7..d7aec7e2 100644 --- a/cool-core/src/main/java/com/nus/cool/model/CoolLoader.java +++ b/cool-core/src/main/java/com/nus/cool/model/CoolLoader.java @@ -40,12 +40,13 @@ public CoolLoader(DataLoaderConfig config){ /** * - * @param dataSourceName output cube name. Need to be specified when loading from the repository - * @param schemaFileName path to the table.yaml - * @param dataFileName path to the data.csv - * @param cubeRepo the name of the output cube repository + * @param dataSourceName output cube name. Need to be specified when loading from the repository, eg, sogamo + * @param schemaFileName path to the table.yaml, eg. sogamo/table.yaml + * @param dataFileName path to the data.csv, eg. sogamo/test.csv + * @param cubeRepo the name of the output cube repository. eg. datasetSource + * @return dz file path, table.yaml path. */ - public synchronized void load(String dataSourceName, String schemaFileName, String dataFileName, String cubeRepo) throws IOException{ + public synchronized String[] load(String dataSourceName, String schemaFileName, String dataFileName, String cubeRepo, String... fileName) throws IOException{ // check the existence of the data repository File root = new File(cubeRepo); @@ -85,9 +86,19 @@ public synchronized void load(String dataSourceName, String schemaFileName, Stri System.out.println("[*] New version " + outputCubeVersionDir.getName() + " is created!"); } DataLoader loader = DataLoader.builder(dataSourceName, schema, dataFile, outputCubeVersionDir, this.loaderConfig).build(); - loader.load(); + loader.load(fileName); // copy the table.yaml to new version folder Files.copy(schemaFile, new File(outputCubeVersionDir, "table.yaml")); + + String dzPath; + String tablePath; + if (fileName.length == 1){ + dzPath = cubeRepo+"/"+ dataSourceName+"/"+outputCubeVersionDir.getName()+"/"+fileName[0]; + tablePath = cubeRepo+"/"+ dataSourceName+"/"+outputCubeVersionDir.getName()+"/table.yaml"; + return new String[]{dzPath, tablePath}; + }else{ + return new String[]{}; + } } diff --git a/cool-core/src/test/java/com/nus/cool/core/model/CoolModelTest.java b/cool-core/src/test/java/com/nus/cool/core/model/CoolModelTest.java index 8d55b99f..7b532721 100644 --- a/cool-core/src/test/java/com/nus/cool/core/model/CoolModelTest.java +++ b/cool-core/src/test/java/com/nus/cool/core/model/CoolModelTest.java @@ -35,6 +35,11 @@ public static void CsvLoaderTest() throws IOException { DataLoaderConfig config = new CsvDataLoaderConfig(); CoolLoader loader = new CoolLoader(config); + + // load with dz name + String fileName = Long.toHexString(System.currentTimeMillis()) + ".dz"; + loader.load(cube, schemaFileName, dataFileName, cubeRepo, fileName); + // load without dz name loader.load(cube, schemaFileName, dataFileName, cubeRepo); cube = "tpc-h-10g"; From d579b21a9f2073f99dfa59298c49add46ba2ce0b Mon Sep 17 00:00:00 2001 From: NLGithubWP Date: Mon, 30 May 2022 00:15:34 +0800 Subject: [PATCH 2/5] Loader Return only data-type related dz path and table.yaml path --- cool-core/src/main/java/com/nus/cool/model/CoolLoader.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cool-core/src/main/java/com/nus/cool/model/CoolLoader.java b/cool-core/src/main/java/com/nus/cool/model/CoolLoader.java index d7aec7e2..4b9e6e55 100644 --- a/cool-core/src/main/java/com/nus/cool/model/CoolLoader.java +++ b/cool-core/src/main/java/com/nus/cool/model/CoolLoader.java @@ -93,8 +93,8 @@ public synchronized String[] load(String dataSourceName, String schemaFileName, String dzPath; String tablePath; if (fileName.length == 1){ - dzPath = cubeRepo+"/"+ dataSourceName+"/"+outputCubeVersionDir.getName()+"/"+fileName[0]; - tablePath = cubeRepo+"/"+ dataSourceName+"/"+outputCubeVersionDir.getName()+"/table.yaml"; + dzPath = dataSourceName+"/"+outputCubeVersionDir.getName()+"/"+fileName[0]; + tablePath = dataSourceName+"/"+outputCubeVersionDir.getName()+"/table.yaml"; return new String[]{dzPath, tablePath}; }else{ return new String[]{}; From 50847369edde5315ce67aef79fa61c4694a207f8 Mon Sep 17 00:00:00 2001 From: NLGithubWP Date: Mon, 30 May 2022 00:16:32 +0800 Subject: [PATCH 3/5] Add API to upload data and table into HDFS --- .../java/com/nus/cool/loader/LoadQuery.java | 4 ++ .../queryserver/handler/BrokerController.java | 46 ++++++++++++++++++- .../queryserver/model/QueryServerModel.java | 11 ++++- 3 files changed, 57 insertions(+), 4 deletions(-) diff --git a/cool-core/src/main/java/com/nus/cool/loader/LoadQuery.java b/cool-core/src/main/java/com/nus/cool/loader/LoadQuery.java index 6b67f7a7..7b395522 100644 --- a/cool-core/src/main/java/com/nus/cool/loader/LoadQuery.java +++ b/cool-core/src/main/java/com/nus/cool/loader/LoadQuery.java @@ -33,6 +33,10 @@ public class LoadQuery { private String outputPath; private String configPath; + // after writing dz, table file, records the path inside the server. + private String dzFilePath; + private String TableFilePath; + public boolean isValid() throws IOException { boolean f = true; if (dataFileType == "AVRO") f = isExist(configPath); diff --git a/cool-queryserver/src/main/java/com/nus/cool/queryserver/handler/BrokerController.java b/cool-queryserver/src/main/java/com/nus/cool/queryserver/handler/BrokerController.java index 78efb6b2..ff8fc4da 100644 --- a/cool-queryserver/src/main/java/com/nus/cool/queryserver/handler/BrokerController.java +++ b/cool-queryserver/src/main/java/com/nus/cool/queryserver/handler/BrokerController.java @@ -1,14 +1,24 @@ package com.nus.cool.queryserver.handler; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.nus.cool.core.iceberg.query.IcebergQuery; +import com.nus.cool.loader.LoadQuery; import com.nus.cool.queryserver.model.QueryInfo; +import com.nus.cool.queryserver.model.QueryServerModel; import com.nus.cool.queryserver.singleton.HDFSConnection; import com.nus.cool.queryserver.singleton.QueryIndex; import com.nus.cool.queryserver.singleton.TaskQueue; +import com.nus.cool.queryserver.utils.Util; import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import com.nus.cool.queryserver.model.Parameter; +import org.springframework.web.multipart.MultipartFile; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; import java.util.List; import java.util.Map; @@ -16,8 +26,40 @@ @RequestMapping("/broker") public class BrokerController { - @PostMapping(value = "/load-dfs") - public ResponseEntity loadToDfs(){ + /** + * Assume the csv file already at the server side. this + * Load the local CSV file and upload it to hdfs + * eg. input: '{"dataFileType": "CSV", "cubeName": "sogamo", "schemaPath": "sogamo/table.yaml", + * "dimPath": "sogamo/dim.csv", "dataPath": "sogamo/test.csv", "outputPath": "datasetSource"}' + * @param req request parsed from json. + * @return response + * @throws URISyntaxException exception + * @throws IOException exception + */ + @PostMapping(value = "/load-data-to-hdfs", + produces = MediaType.APPLICATION_JSON_VALUE, + consumes = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity loadDataToDfs(@RequestBody LoadQuery req) throws URISyntaxException, IOException { + + Util.getTimeClock(); + + // file name of the .dz + String fileName = Long.toHexString(System.currentTimeMillis()) + ".dz"; + + QueryServerModel.loadCube(req, fileName); + + // 1. connect to hdfs, get data Source Name, cohort or iceberg + HDFSConnection fs = HDFSConnection.getInstance(); + + String localPath1 = req.getOutputPath() + "/" + req.getDzFilePath();; + String dfsPath1 = "/cube/" + req.getDzFilePath(); + fs.uploadToDfs(localPath1, dfsPath1); + + String localPath2 = req.getOutputPath() + "/" + req.getTableFilePath(); + String dfsPath2 = "/cube/" + req.getTableFilePath(); + fs.uploadToDfs(localPath2, dfsPath2); + + System.out.println("[*] Data and file loaded"); return null; } diff --git a/cool-queryserver/src/main/java/com/nus/cool/queryserver/model/QueryServerModel.java b/cool-queryserver/src/main/java/com/nus/cool/queryserver/model/QueryServerModel.java index 948ecc38..54aa022a 100644 --- a/cool-queryserver/src/main/java/com/nus/cool/queryserver/model/QueryServerModel.java +++ b/cool-queryserver/src/main/java/com/nus/cool/queryserver/model/QueryServerModel.java @@ -52,9 +52,10 @@ public class QueryServerModel { /** * Load a new cube * @param q query instance + * @param fileName optional, dz name. * @return Response */ - public static ResponseEntity loadCube(LoadQuery q) { + public static ResponseEntity loadCube(LoadQuery q, String... fileName) { try { q.isValid(); String fileType = q.getDataFileType().toUpperCase(); @@ -77,7 +78,13 @@ public static ResponseEntity loadCube(LoadQuery q) { } System.out.println(config.getClass().getName()); CoolLoader coolLoader = new CoolLoader(config); - coolLoader.load(q.getCubeName(),q.getSchemaPath(), q.getDataPath(),q.getOutputPath()); + String[] dzTableNames = coolLoader.load(q.getCubeName(),q.getSchemaPath(), q.getDataPath(),q.getOutputPath(), fileName); + + if (dzTableNames.length == 2){ + q.setDzFilePath(dzTableNames[0]); + q.setTableFilePath(dzTableNames[1]); + } + String resStr = "Cube " + q.getCubeName() + " has already been loaded."; return ResponseEntity.ok().headers(HttpHeaders.EMPTY).body(resStr); } catch (Exception e){ From 0a55ff03aa77c370e47b8a6da0f918bf8d66174b Mon Sep 17 00:00:00 2001 From: NLGithubWP Date: Mon, 30 May 2022 00:16:53 +0800 Subject: [PATCH 4/5] Add API to accept query file, store locally, and table into HDFS --- .../queryserver/handler/BrokerController.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/cool-queryserver/src/main/java/com/nus/cool/queryserver/handler/BrokerController.java b/cool-queryserver/src/main/java/com/nus/cool/queryserver/handler/BrokerController.java index ff8fc4da..c0bd7fdd 100644 --- a/cool-queryserver/src/main/java/com/nus/cool/queryserver/handler/BrokerController.java +++ b/cool-queryserver/src/main/java/com/nus/cool/queryserver/handler/BrokerController.java @@ -63,6 +63,40 @@ public ResponseEntity loadDataToDfs(@RequestBody LoadQuery req) throws U return null; } + /** + * Receive query file from client, and store to local as temp_query.json, and then upload to hdfs. + * @param queryFile query file + * @return response + * @throws URISyntaxException exception + * @throws IOException exception + */ + @PostMapping(value = "/load-query-to-hdfs", + produces = MediaType.APPLICATION_JSON_VALUE, + consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + public ResponseEntity loadQueryToDfs(@RequestParam("queryFile") MultipartFile queryFile) throws URISyntaxException, IOException { + + // 1. connect to hdfs, get data Source Name, cohort or iceberg + HDFSConnection fs = HDFSConnection.getInstance(); + + Util.getTimeClock(); + System.out.println("[*] This query is for iceberg query: " + queryFile); + String queryContent = new String(queryFile.getBytes()); + ObjectMapper mapper = new ObjectMapper(); + IcebergQuery q = mapper.readValue(queryContent, IcebergQuery.class); + + try { + // Writing to a file + mapper.writeValue(new File("temp_query.json"), q ); + String localPath3 = "temp_query.json"; + String dfsPath3 = "/tmp/1/query.json"; + fs.uploadToDfs(localPath3, dfsPath3); + + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + @GetMapping(value = "/execute") public ResponseEntity handler(@RequestParam Map params){ System.out.println(params); From 1c90fd5724106efd807df75b66eddd7336000ba6 Mon Sep 17 00:00:00 2001 From: NLGithubWP Date: Mon, 30 May 2022 00:29:33 +0800 Subject: [PATCH 5/5] Add Client package to help user to send request to broker. --- .../java/com/nus/cool/clientservice/Main.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 cool-queryserver/src/main/java/com/nus/cool/clientservice/Main.java diff --git a/cool-queryserver/src/main/java/com/nus/cool/clientservice/Main.java b/cool-queryserver/src/main/java/com/nus/cool/clientservice/Main.java new file mode 100644 index 00000000..c5d56dc2 --- /dev/null +++ b/cool-queryserver/src/main/java/com/nus/cool/clientservice/Main.java @@ -0,0 +1,52 @@ +package com.nus.cool.clientservice; + +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URL; +import java.util.Properties; + +public class Main { + /** + * Client package to send request to server. it will get broker's ip and send related execute request. + * @param args query type, + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + if (args.length != 1) { + System.err.println("Pass in query id (Example: q1)"); + return; + } + + CloseableHttpClient client = HttpClients.createDefault(); + String params; + // todo, add more apis. + if (args[0].equals("q1")) { + params = "queryId=1&type=cohort"; + } else if (args[0].equals("q2")) { + params = "queryId=2&type=iceberg"; + } else { + System.err.println("Unrecognized query id"); + return; + } + String ip = ""; + try (InputStream input = new FileInputStream("conf/app.properties")) { + Properties prop = new Properties(); + prop.load(input); + ip = prop.getProperty("server.host"); + } catch (IOException ex) { + ex.printStackTrace(); + } + String request = "http://" + ip + ":9013/broker/execute?" + params; + URL url = new URL(request); + URI uri = new URI(url.getProtocol(), null, url.getHost(), url.getPort(), url.getPath(), url.getQuery(), null); + HttpGet get = new HttpGet(uri); + client.execute(get); + } +} \ No newline at end of file