Add new API to upload CSV, table.yaml, and query into HDFS #48

Open: wants to merge 5 commits into base: dev
CliDataWriter.java
@@ -9,7 +9,7 @@
public class CliDataWriter implements DataWriter {

@Override
-public boolean Initialize() throws IOException {
+public boolean Initialize(String... fileName) throws IOException {
Contributor: Please use String[] instead of String... (varargs).

System.out.println("------Display data set------");
return true;
}
@@ -7,11 +7,11 @@ public interface DataWriter extends Closeable {

/**
* Initialize the writer
*
* @param fileName file name of the .dz file
* @return successful initialization
* @throws IOException
*/
-boolean Initialize() throws IOException;
+boolean Initialize(String... fileName) throws IOException;

/**
* Add a tuple to the data set
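As an aside to the String[] review comment above, a minimal sketch of what the suggested signature could look like, assuming a default overload is acceptable for existing no-arg callers (the default method is an assumption, not part of this PR):

import java.io.Closeable;
import java.io.IOException;

public interface DataWriter extends Closeable {
    // Explicit array instead of varargs, as the review comment suggests.
    boolean Initialize(String[] fileNames) throws IOException;

    // Convenience overload so existing no-arg call sites keep compiling.
    default boolean Initialize() throws IOException {
        return Initialize(new String[0]);
    }
}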
@@ -40,7 +40,7 @@ public ListDataWriter(List<String> out){
}

@Override
-public boolean Initialize() throws IOException {
+public boolean Initialize(String... fileName) throws IOException {
return true;
}

@@ -66,15 +66,15 @@ public class NativeDataWriter implements DataWriter {
private DataOutputStream out = null;

@Override
-public boolean Initialize() throws IOException {
+public boolean Initialize(String... fileName) throws IOException {
if (initalized) return true;
this.userKeyIndex = tableSchema.getUserKeyField();
// when there is no user key, using any field for the additional condition on chunk switch is ok.
if (this.userKeyIndex == -1) this.userKeyIndex = 0;
// cublet
// create metaChunk instance, default offset to be 0, update offset when write later.
this.metaChunk = MetaChunkWS.newMetaChunkWS(this.tableSchema, 0);
-this.out = newCublet();
+this.out = newCublet(fileName);
// chunk
this.tupleCount = 0;
this.offset = 0;
@@ -137,8 +137,15 @@ private void finishCublet() throws IOException {
* @return DataOutputStream
* @throws IOException
*/
-private DataOutputStream newCublet() throws IOException {
-String fileName = Long.toHexString(System.currentTimeMillis()) + ".dz";
+private DataOutputStream newCublet(String... fileNameParams) throws IOException {
Contributor: Why do we need to give a file name?


// If a file name is provided, use it; otherwise generate one from the current timestamp.
String fileName;
if (fileNameParams.length == 1) {
fileName = fileNameParams[0];
} else {
fileName = Long.toHexString(System.currentTimeMillis()) + ".dz";
}
System.out.println("[*] A new cublet "+ fileName + " is created!");
File cublet = new File(outputDir, fileName);
DataOutputStream out = new DataOutputStream(new FileOutputStream(cublet));
4 changes: 2 additions & 2 deletions cool-core/src/main/java/com/nus/cool/loader/DataLoader.java
@@ -70,9 +70,9 @@ public static Builder builder(String dataSourceName,
/**
* load data into cool native format
*/
-public void load() throws IOException {
+public void load(String... fileName) throws IOException {
// write dataChunk first
-writer.Initialize();
+writer.Initialize(fileName);
// read the data
while (reader.hasNext()) {
writer.Add(parser.parse(reader.next()));
4 changes: 4 additions & 0 deletions cool-core/src/main/java/com/nus/cool/loader/LoadQuery.java
@@ -33,6 +33,10 @@ public class LoadQuery {
private String outputPath;
private String configPath;

// After writing the dz and table files, records their paths inside the server.
private String dzFilePath;
Contributor: Why do we need the dzFilePath?

private String TableFilePath;
Contributor: TableFilePath should be the same as the schemaPath.


public boolean isValid() throws IOException {
boolean f = true;
if (dataFileType == "AVRO") f = isExist(configPath);
23 changes: 17 additions & 6 deletions cool-core/src/main/java/com/nus/cool/model/CoolLoader.java
@@ -40,12 +40,13 @@ public CoolLoader(DataLoaderConfig config){

/**
*
-* @param dataSourceName output cube name. Need to be specified when loading from the repository
-* @param schemaFileName path to the table.yaml
-* @param dataFileName path to the data.csv
-* @param cubeRepo the name of the output cube repository
+* @param dataSourceName output cube name. Needs to be specified when loading from the repository, e.g. sogamo
+* @param schemaFileName path to the table.yaml, e.g. sogamo/table.yaml
+* @param dataFileName path to the data.csv, e.g. sogamo/test.csv
+* @param cubeRepo the name of the output cube repository, e.g. datasetSource
Contributor: It needs a @param for fileName.

* @return dz file path, table.yaml path.
*/
-public synchronized void load(String dataSourceName, String schemaFileName, String dataFileName, String cubeRepo) throws IOException{
+public synchronized String[] load(String dataSourceName, String schemaFileName, String dataFileName, String cubeRepo, String... fileName) throws IOException{

// check the existence of the data repository
File root = new File(cubeRepo);
@@ -85,9 +86,19 @@ public synchronized String[] load(String dataSourceName, String schemaFileName, Stri
System.out.println("[*] New version " + outputCubeVersionDir.getName() + " is created!");
}
DataLoader loader = DataLoader.builder(dataSourceName, schema, dataFile, outputCubeVersionDir, this.loaderConfig).build();
-loader.load();
+loader.load(fileName);
// copy the table.yaml to new version folder
Files.copy(schemaFile, new File(outputCubeVersionDir, "table.yaml"));

String dzPath;
String tablePath;
if (fileName.length == 1) {
dzPath = dataSourceName + "/" + outputCubeVersionDir.getName() + "/" + fileName[0];
tablePath = dataSourceName + "/" + outputCubeVersionDir.getName() + "/table.yaml";
return new String[]{dzPath, tablePath};
} else {
return new String[]{};
}
}


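For reference, a caller-side sketch of the new return contract. This is a hedged example, not code from the PR: the cube, schema, and data paths follow the javadoc examples above, and the explicit dz name is illustrative.

CoolLoader loader = new CoolLoader(new CsvDataLoaderConfig());
// With an explicit dz name, load returns {dz file path, table.yaml path}.
String[] paths = loader.load("sogamo", "sogamo/table.yaml", "sogamo/test.csv", "datasetSource", "demo.dz");
if (paths.length == 2) {
    System.out.println("dz: " + paths[0] + ", table: " + paths[1]);
}
// Without an explicit name, the current implementation returns an empty array.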
@@ -35,6 +35,11 @@ public static void CsvLoaderTest() throws IOException {

DataLoaderConfig config = new CsvDataLoaderConfig();
CoolLoader loader = new CoolLoader(config);

// load with dz name
String fileName = Long.toHexString(System.currentTimeMillis()) + ".dz";
loader.load(cube, schemaFileName, dataFileName, cubeRepo, fileName);
// load without dz name
loader.load(cube, schemaFileName, dataFileName, cubeRepo);

cube = "tpc-h-10g";
@@ -0,0 +1,52 @@
package com.nus.cool.clientservice;

import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.util.Properties;

public class Main {
/**
* Client utility that sends requests to the server. It reads the broker's IP and sends the corresponding execute request.
* @param args query type (e.g. q1)
* @throws Exception on request failure
*/
public static void main(String[] args) throws Exception {

if (args.length != 1) {
System.err.println("Pass in query id (Example: q1)");
Contributor: This is hard-coded.

return;
}

CloseableHttpClient client = HttpClients.createDefault();
String params;
// TODO: add more APIs.
if (args[0].equals("q1")) {
params = "queryId=1&type=cohort";
} else if (args[0].equals("q2")) {
params = "queryId=2&type=iceberg";
} else {
System.err.println("Unrecognized query id");
return;
}
String ip = "";
try (InputStream input = new FileInputStream("conf/app.properties")) {
Properties prop = new Properties();
prop.load(input);
ip = prop.getProperty("server.host");
} catch (IOException ex) {
ex.printStackTrace();
}
String request = "http://" + ip + ":9013/broker/execute?" + params;
URL url = new URL(request);
URI uri = new URI(url.getProtocol(), null, url.getHost(), url.getPort(), url.getPath(), url.getQuery(), null);
HttpGet get = new HttpGet(uri);
client.execute(get);
}
}
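For reference, a minimal conf/app.properties this client would read. The address is an assumed example; server.host is the only key Main uses:

# conf/app.properties (assumed example value)
server.host=127.0.0.1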
@@ -1,23 +1,99 @@
package com.nus.cool.queryserver.handler;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.nus.cool.core.iceberg.query.IcebergQuery;
import com.nus.cool.loader.LoadQuery;
import com.nus.cool.queryserver.model.QueryInfo;
import com.nus.cool.queryserver.model.QueryServerModel;
import com.nus.cool.queryserver.singleton.HDFSConnection;
import com.nus.cool.queryserver.singleton.QueryIndex;
import com.nus.cool.queryserver.singleton.TaskQueue;
import com.nus.cool.queryserver.utils.Util;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import com.nus.cool.queryserver.model.Parameter;
import org.springframework.web.multipart.MultipartFile;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/broker")
public class BrokerController {

@PostMapping(value = "/load-dfs")
public ResponseEntity<String> loadToDfs(){
/**
* Assumes the CSV file is already at the server side.
* Loads the local CSV file and uploads the result to HDFS.
* e.g. input: '{"dataFileType": "CSV", "cubeName": "sogamo", "schemaPath": "sogamo/table.yaml",
* "dimPath": "sogamo/dim.csv", "dataPath": "sogamo/test.csv", "outputPath": "datasetSource"}'
* @param req request parsed from json.
* @return response
* @throws URISyntaxException exception
* @throws IOException exception
*/
@PostMapping(value = "/load-data-to-hdfs",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> loadDataToDfs(@RequestBody LoadQuery req) throws URISyntaxException, IOException {

Util.getTimeClock();

// file name of the .dz
String fileName = Long.toHexString(System.currentTimeMillis()) + ".dz";

QueryServerModel.loadCube(req, fileName);

// 1. connect to HDFS
HDFSConnection fs = HDFSConnection.getInstance();

String localPath1 = req.getOutputPath() + "/" + req.getDzFilePath();
String dfsPath1 = "/cube/" + req.getDzFilePath();
Contributor: What is the cube path for?

fs.uploadToDfs(localPath1, dfsPath1);

String localPath2 = req.getOutputPath() + "/" + req.getTableFilePath();
String dfsPath2 = "/cube/" + req.getTableFilePath();
fs.uploadToDfs(localPath2, dfsPath2);

System.out.println("[*] Data and file loaded");
return null;
}
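A hedged client-side sketch of calling this endpoint with the example payload from the javadoc. The host is an assumption, the port (9013) follows the Main client above, and the class name is hypothetical:

import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class LoadDataExample {
    public static void main(String[] args) throws Exception {
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpPost post = new HttpPost("http://127.0.0.1:9013/broker/load-data-to-hdfs");
            post.setHeader("Content-Type", "application/json");
            // Same example payload as the javadoc above.
            post.setEntity(new StringEntity(
                "{\"dataFileType\": \"CSV\", \"cubeName\": \"sogamo\","
                + " \"schemaPath\": \"sogamo/table.yaml\", \"dimPath\": \"sogamo/dim.csv\","
                + " \"dataPath\": \"sogamo/test.csv\", \"outputPath\": \"datasetSource\"}"));
            client.execute(post).close();
        }
    }
}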

/**
* Receives a query file from the client, stores it locally as temp_query.json, and then uploads it to HDFS.
* @param queryFile query file
* @return response
* @throws URISyntaxException exception
* @throws IOException exception
*/
@PostMapping(value = "/load-query-to-hdfs",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public ResponseEntity<String> loadQueryToDfs(@RequestParam("queryFile") MultipartFile queryFile) throws URISyntaxException, IOException {

// 1. connect to HDFS
HDFSConnection fs = HDFSConnection.getInstance();

Util.getTimeClock();
System.out.println("[*] This query is for iceberg query: " + queryFile);
String queryContent = new String(queryFile.getBytes());
ObjectMapper mapper = new ObjectMapper();
IcebergQuery q = mapper.readValue(queryContent, IcebergQuery.class);

try {
// Writing to a file
mapper.writeValue(new File("temp_query.json"), q);
String localPath3 = "temp_query.json";
String dfsPath3 = "/tmp/1/query.json";
fs.uploadToDfs(localPath3, dfsPath3);

} catch (IOException e) {
e.printStackTrace();
}
return null;
}

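Similarly, a hedged sketch of uploading a query file to /broker/load-query-to-hdfs. It assumes Apache HttpClient's httpmime module is on the classpath, and the local query.json path and class name are illustrative:

import java.io.File;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class LoadQueryExample {
    public static void main(String[] args) throws Exception {
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpPost post = new HttpPost("http://127.0.0.1:9013/broker/load-query-to-hdfs");
            // The part name must match @RequestParam("queryFile") in the controller.
            post.setEntity(MultipartEntityBuilder.create()
                .addBinaryBody("queryFile", new File("query.json"))
                .build());
            client.execute(post).close();
        }
    }
}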
@@ -52,9 +52,10 @@ public class QueryServerModel {
/**
* Load a new cube
* @param q query instance
* @param fileName optional .dz file name
* @return Response
*/
-public static ResponseEntity<String> loadCube(LoadQuery q) {
+public static ResponseEntity<String> loadCube(LoadQuery q, String... fileName) {
try {
q.isValid();
String fileType = q.getDataFileType().toUpperCase();
@@ -77,7 +78,13 @@ public static ResponseEntity<String> loadCube(LoadQuery q) {
}
System.out.println(config.getClass().getName());
CoolLoader coolLoader = new CoolLoader(config);
-coolLoader.load(q.getCubeName(),q.getSchemaPath(), q.getDataPath(),q.getOutputPath());
+String[] dzTableNames = coolLoader.load(q.getCubeName(), q.getSchemaPath(), q.getDataPath(), q.getOutputPath(), fileName);

if (dzTableNames.length == 2) {
q.setDzFilePath(dzTableNames[0]);
q.setTableFilePath(dzTableNames[1]);
}

String resStr = "Cube " + q.getCubeName() + " has already been loaded.";
return ResponseEntity.ok().headers(HttpHeaders.EMPTY).body(resStr);
} catch (Exception e){