Skip to content

Commit

Permalink
Adjusted HDFS implementation for handling undirected graphs.
Browse files Browse the repository at this point in the history
Addressed the questions raised in PR #252.
  • Loading branch information
muthumala19 committed Dec 14, 2024
1 parent db4c398 commit 2ebdd1d
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 89 deletions.
166 changes: 96 additions & 70 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
KafkaConnector *&kstream, thread &input_stream_handler_thread,
vector<DataPublisher *> &workerClients, int numberOfPartitions,
SQLiteDBInterface *sqlite, bool *loop_exit_p);
static void add_stream_hdfs_command(std::string masterIP,int connFd, std::string &hdfs_server_IP, std::thread &input_stream_handler_thread,
static void addStreamHDFSCommand(std::string masterIP,int connFd, std::string &hdfsServerIp, std::thread &inputStreamHandlerThread,
int numberOfPartitions, SQLiteDBInterface *sqlite, bool *loop_exit_p);
static void stop_stream_kafka_command(int connFd, KafkaConnector *kstream, bool *loop_exit_p);
static void process_dataset_command(int connFd, bool *loop_exit_p);
Expand Down Expand Up @@ -142,7 +142,7 @@ void *frontendservicesesion(void *dummyPt) {
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH);

//Initiate HDFS parameters
std::string hdfs_server_IP;
std::string hdfsServerIp;
hdfsFS fileSystem;

vector<DataPublisher *> workerClients;
Expand Down Expand Up @@ -198,11 +198,7 @@ void *frontendservicesesion(void *dummyPt) {
add_stream_kafka_command(connFd, kafka_server_IP, configs, kstream, input_stream_handler, workerClients,
numberOfPartitions, sqlite, &loop_exit);
} else if (line.compare(ADD_STREAM_HDFS) == 0) {
if (!workerClientsInitialized) {
workerClients = getWorkerClients(sqlite);
workerClientsInitialized = true;
}
add_stream_hdfs_command(masterIP,connFd, hdfs_server_IP, input_stream_handler, numberOfPartitions,
addStreamHDFSCommand(masterIP,connFd, hdfsServerIp, input_stream_handler, numberOfPartitions,
sqlite, &loop_exit);
} else if (line.compare(STOP_STREAM_KAFKA) == 0) {
stop_stream_kafka_command(connFd, kstream, &loop_exit);
Expand Down Expand Up @@ -1267,68 +1263,68 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
input_stream_handler_thread = thread(&StreamHandler::listen_to_kafka_topic, stream_handler);
}

void add_stream_hdfs_command(std::string masterIP,int connFd, std::string &hdfs_server_IP, std::thread &input_stream_handler_thread, int numberOfPartitions,
void addStreamHDFSCommand(std::string masterIP,int connFd, std::string &hdfsServerIp,
std::thread &inputStreamHandlerThread, int numberOfPartitions,
SQLiteDBInterface *sqlite, bool *loop_exit_p) {
std::string hdfs_port;
std::string msg_1 = "Do you want to use the default HDFS server(y/n)?";
int result_wr = write(connFd, msg_1.c_str(), msg_1.length());
if (result_wr < 0) {
std::string hdfsPort;
std::string msg1 = "Do you want to use the default HDFS server(y/n)?";
int resultWr = write(connFd, msg1.c_str(), msg1.length());
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, "\r\n", 2);
if (result_wr < 0) {
resultWr = write(connFd, "\r\n", 2);
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

char user_res[FRONTEND_DATA_LENGTH + 1];
bzero(user_res, FRONTEND_DATA_LENGTH + 1);
read(connFd, user_res, FRONTEND_DATA_LENGTH);
std::string user_res_s(user_res);
user_res_s = Utils::trim_copy(user_res_s);
for (char &c: user_res_s) {
char userRes[FRONTEND_DATA_LENGTH + 1];
bzero(userRes, FRONTEND_DATA_LENGTH + 1);
read(connFd, userRes, FRONTEND_DATA_LENGTH);
std::string userResS(userRes);
userResS = Utils::trim_copy(userResS);
for (char &c: userResS) {
c = tolower(c);
}

if (user_res_s == "y") {
hdfs_server_IP = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.hdfs.host");
hdfs_port = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.hdfs.port");
if (userResS == "y") {
hdfsServerIp = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.hdfs.host");
hdfsPort = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.hdfs.port");
} else {
std::string message = "Send the file path to the HDFS configuration file.";
result_wr = write(connFd, message.c_str(), message.length());
if (result_wr < 0) {
resultWr = write(connFd, message.c_str(), message.length());
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, "\r\n", 2);
if (result_wr < 0) {
resultWr = write(connFd, "\r\n", 2);
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

char file_path[FRONTEND_DATA_LENGTH + 1];
bzero(file_path, FRONTEND_DATA_LENGTH + 1);
read(connFd, file_path, FRONTEND_DATA_LENGTH);
std::string file_path_s(file_path);
file_path_s = Utils::trim_copy(file_path_s);
char filePath[FRONTEND_DATA_LENGTH + 1];
bzero(filePath, FRONTEND_DATA_LENGTH + 1);
read(connFd, filePath, FRONTEND_DATA_LENGTH);
std::string filePathS(filePath);
filePathS = Utils::trim_copy(filePathS);

frontend_logger.info("Reading HDFS configuration file: " + file_path_s);
frontend_logger.info("Reading HDFS configuration file: " + filePathS);

std::vector<std::string> vec = Utils::getFileContent(file_path_s);
std::vector<std::string> vec = Utils::getFileContent(filePathS);
for (const auto &item : vec) {
frontend_logger.info("Item: " + item);
if (item.length() > 0 && !(item.rfind("#", 0) == 0)) {
std::vector<std::string> vec2 = Utils::split(item, '=');
if (vec2.size() == 2) {
if (vec2.at(0).compare("hdfs.host") == 0) {
hdfs_server_IP = vec2.at(1);
hdfsServerIp = vec2.at(1);
} else if (vec2.at(0).compare("hdfs.port") == 0) {
hdfs_port = vec2.at(1);
hdfsPort = vec2.at(1);
}
} else {
frontend_logger.error("Invalid line in configuration file: " + item);
Expand All @@ -1337,39 +1333,37 @@ void add_stream_hdfs_command(std::string masterIP,int connFd, std::string &hdfs_
}
}

if (hdfs_server_IP.empty() || hdfs_port.empty()) {
frontend_logger.error("HDFS server IP or port not set correctly");
*loop_exit_p = true;
return;
if (hdfsServerIp.empty()) {
frontend_logger.error("HDFS server IP is not set or empty.");
}
if (hdfsPort.empty()) {
frontend_logger.error("HDFS server port is not set or empty.");
}

frontend_logger.info("HDFS server IP: " + hdfs_server_IP + " HDFS port: " + hdfs_port);

frontend_logger.debug("Start serving HDFS command");
std::string message = "Send HDFS file path";
result_wr = write(connFd, message.c_str(), message.length());
if (result_wr < 0) {
std::string message = "HDFS file path: ";
resultWr = write(connFd, message.c_str(), message.length());
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, "\r\n", 2);
if (result_wr < 0) {
resultWr = write(connFd, "\r\n", 2);
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

char hdfs_file_path[FRONTEND_DATA_LENGTH + 1];
bzero(hdfs_file_path, FRONTEND_DATA_LENGTH + 1);
read(connFd, hdfs_file_path, FRONTEND_DATA_LENGTH);
std::string hdfs_file_path_s(hdfs_file_path);
hdfs_file_path_s = Utils::trim_copy(hdfs_file_path_s);
char hdfsFilePath[FRONTEND_DATA_LENGTH + 1];
bzero(hdfsFilePath, FRONTEND_DATA_LENGTH + 1);
read(connFd, hdfsFilePath, FRONTEND_DATA_LENGTH);
std::string hdfsFilePathS(hdfsFilePath);
hdfsFilePathS = Utils::trim_copy(hdfsFilePathS);

HDFSConnector *hdfsConnector = new HDFSConnector(hdfs_server_IP, hdfs_port);
HDFSConnector *hdfsConnector = new HDFSConnector(hdfsServerIp, hdfsPort);

if (!hdfsConnector->isPathValid(hdfs_file_path_s)) {
frontend_logger.error("Invalid HDFS file path: " + hdfs_file_path_s);
if (!hdfsConnector->isPathValid(hdfsFilePathS)) {
frontend_logger.error("Invalid HDFS file path: " + hdfsFilePathS);
std::string error_message = "The provided HDFS path is invalid.";
write(connFd, error_message.c_str(), error_message.length());
write(connFd, "\r\n", 2);
Expand All @@ -1378,38 +1372,70 @@ void add_stream_hdfs_command(std::string masterIP,int connFd, std::string &hdfs_
return;
}

std::string con_message = "Received the HDFS file path";
int con_result_wr = write(connFd, con_message.c_str(), con_message.length());
if (con_result_wr < 0) {
/*get directionality*/
std::string isDirectedGraph = "Is this a directed graph(y/n)?";
resultWr = write(connFd, isDirectedGraph.c_str(), isDirectedGraph.length());
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, "\r\n", 2);
if (result_wr < 0) {
resultWr = write(connFd, "\r\n", 2);
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

std::string path = "hdfs:\\" + hdfs_file_path_s;
char isDirectedRes[FRONTEND_DATA_LENGTH + 1];
bzero(isDirectedRes, FRONTEND_DATA_LENGTH + 1);
read(connFd, isDirectedRes, FRONTEND_DATA_LENGTH);
std::string isDirectedS(isDirectedRes);
isDirectedS = Utils::trim_copy(isDirectedS);
/**/

bool directed;
if (isDirectedS == "y") {
directed = true;
} else {
directed = false;
}

std::string conMsg = "Received the HDFS file path";
int conResultWr = write(connFd, conMsg.c_str(), conMsg.length());
if (conResultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
resultWr = write(connFd, "\r\n", 2);
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

std::string path = "hdfs:\\" + hdfsFilePathS;

std::time_t time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
std::string uploadStartTime = ctime(&time);
std::string sqlStatement =
"INSERT INTO graph (name, upload_path, upload_start_time, upload_end_time, graph_status_idgraph_status, "
"vertexcount, centralpartitioncount, edgecount) VALUES(\"" +
hdfs_file_path_s + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\",\"" +
std::to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\")";
"vertexcount, centralpartitioncount, edgecount, is_directed) VALUES(\"" +
hdfsFilePathS + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\", \"" +
std::to_string(Conts::GRAPH_STATUS::NONOPERATIONAL) + "\", \"\", \"\", \"\", \"" +
(directed ? "TRUE" : "FALSE") + "\")";

int newGraphID = sqlite->runInsert(sqlStatement);
frontend_logger.info("Created graph ID: " + std::to_string(newGraphID));
HDFSStreamHandler *stream_handler = new HDFSStreamHandler(hdfsConnector->getFileSystem(), hdfs_file_path_s, numberOfPartitions, newGraphID,sqlite,masterIP);
frontend_logger.info("Start listening to " + hdfs_file_path_s);
input_stream_handler_thread = std::thread(&HDFSStreamHandler::startStreamingFromBufferToPartitions, stream_handler);
HDFSStreamHandler *streamHandler = new HDFSStreamHandler(hdfsConnector->getFileSystem(), hdfsFilePathS,
numberOfPartitions, newGraphID, sqlite, masterIP);
frontend_logger.info("Started listening to " + hdfsFilePathS);
inputStreamHandlerThread = std::thread(&HDFSStreamHandler::startStreamingFromBufferToPartitions, streamHandler);
}

static void stop_stream_kafka_command(int connFd, KafkaConnector *kstream, bool *loop_exit_p) {
frontend_logger.info("Start serving `" + STOP_STREAM_KAFKA + "` command");
frontend_logger.info("Started serving `" + STOP_STREAM_KAFKA + "` command");
// Unsubscribe the kafka consumer.
kstream->Unsubscribe();
string message = "Successfully stop `" + stream_topic_name + "` input kafka stream";
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/JasmineGraphFrontEndProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const string SHTDN = "shdn";
const string SEND = "send";
const string ERROR = "error";
const string ADD_STREAM_KAFKA = "adstrmk";
const string ADD_STREAM_HDFS="adstrmhdfs";
const string ADD_STREAM_HDFS="adhdfs";
const string ADD_STREAM_KAFKA_CSV = "adstrmkcsv";
const string STOP_STREAM_KAFKA = "stopstrm";
const string STOP_STRIAN = "stopstrian";
Expand Down
12 changes: 2 additions & 10 deletions src/partitioner/stream/HashPartitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

class HashPartitioner {

std::vector<Partition> partitions; // Holds partition objects
std::vector<Partition> partitions;

std::atomic<bool> terminateConsumers; // Termination flag
std::atomic<bool> terminateConsumers;
std::vector<std::thread> localEdgeThreads;
std::vector<std::thread> edgeCutThreads;

Expand Down Expand Up @@ -52,14 +52,6 @@ class HashPartitioner {
long vertexCount;
long edgeCount;
std::string outputFilePath;
std::map<int, std::string> partitionFileMap;
std::map<int, std::string> centralStoreFileList;
std::map<int, std::string> centralStoreDuplicateFileList;
std::map<int, std::string> partitionAttributeFileList;
std::map<int, std::string> centralStoreAttributeFileList;
std::map<int, std::string> compositeCentralStoreFileList;
std::vector<std::map<int, std::string>> fullFileList;
std::map<int, std::map<int, std::vector<int>>> partitionedLocalGraphStorageMap;

void consumeLocalEdges(int partitionIndex,JasmineGraphServer::worker worker);
void consumeEdgeCuts(int partitionIndex,JasmineGraphServer::worker worker);
Expand Down
3 changes: 0 additions & 3 deletions src/partitioner/stream/Partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

#include "Partition.h"

#include <iostream>
#include <sstream>
#include <vector>
#include <string>
Expand Down Expand Up @@ -84,12 +83,10 @@ double Partition::getVertextCount() {
}
return edgeListVetices + edgeCutVertices;
}
/*Other functions seems to be incorrect. Therefore added this functionality to get edgeCutCount for hdfs implementation*/
long Partition::edgeCutC() {
return edgeCutCount;
}

/*Other functions seems to be incorrect. Therefore added this functionality to get edgeCount for hdfs implementation*/
long Partition::edgeC() {
return edgeCount;
}
Expand Down
10 changes: 5 additions & 5 deletions src/util/hdfs/HDFSStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ void HDFSStreamHandler::startStreamingFromBufferToPartitions() {
long vertices = partitioner.getVertexCount();
long edges = partitioner.getEdgeCount();

string sqlStatement = "UPDATE graph SET vertexcount = '" + std::to_string(vertices) +
"' ,centralpartitioncount = '" + std::to_string(this->numberOfPartitions) +
"' ,edgecount = '" +
std::to_string(edges) + "' WHERE idgraph = '" +
std::to_string(this->graphId) + "'";
std::string sqlStatement = "UPDATE graph SET vertexcount = '" + std::to_string(vertices) +
"', centralpartitioncount = '" + std::to_string(this->numberOfPartitions) +
"', edgecount = '" + std::to_string(edges) +
"', graph_status_idgraph_status = '" + std::to_string(Conts::GRAPH_STATUS::OPERATIONAL) +
"' WHERE idgraph = '" + std::to_string(this->graphId) + "'";

dbLock.lock();
this->sqlite->runUpdate(sqlStatement);
Expand Down

0 comments on commit 2ebdd1d

Please sign in to comment.