Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added HDFS functionality for streaming graphs from HDFS into Jasminegraph partitions. #252

Open
wants to merge 16 commits into
base: master
Choose a base branch
from

Conversation

muthumala19
Copy link

Added functionality to stream graph data from hdfs files and persist them in the Jasminegraph partitions.

Copy link

Quality Gate Failed Quality Gate failed

Failed conditions
14.2% Duplication on New Code (required ≤ 3%)

See analysis details on SonarCloud

@@ -136,6 +141,10 @@ void *frontendservicesesion(void *dummyPt) {
KafkaConnector *kstream;
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH);

//Initiate HDFS parameters
std::string hdfs_server_IP;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::string hdfs_server_IP;
std::string hdfsServerIP;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed variable names, and method names into camel case notation.

@@ -1251,6 +1267,137 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
input_stream_handler_thread = thread(&StreamHandler::listen_to_kafka_topic, stream_handler);
}

void add_stream_hdfs_command(std::string masterIP,int connFd, std::string &hdfs_server_IP, std::thread &input_stream_handler_thread,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void add_stream_hdfs_command(std::string masterIP,int connFd, std::string &hdfs_server_IP, std::thread &input_stream_handler_thread,
void addStreamHdfsCommand(std::string masterIP,int connFd, std::string &hdfsServerIP, std::thread &inputStreamHandlerThread,

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is issue I see with function names and variable names. Please follow the approach I have used in my comments.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed variable names, and method names into camel case notation.

std::string file_path_s(file_path);
file_path_s = Utils::trim_copy(file_path_s);

frontend_logger.info("Reading HDFS configuration file: " + file_path_s);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this file part of JasmineGraph or is it something owned by the HDFS?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this hdfs implementation, there are two types of HDFS servers.

  1. Jasminegraph's Default HDFS server.
  2. Custom HDFS servers from users.
    users can use either one of those two types. if a user wants to refer to a custom hdfs server, then he needs to provide config file to that HDFS server. Here it refers to the custom hdfs server config file. currently this file should be put into the jasminegraph docker container and give the respective path.


std::vector<std::string> vec = Utils::getFileContent(file_path_s);
for (const auto &item : vec) {
frontend_logger.info("Item: " + item);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see a point of having log line here for the Item. Please remove it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

if (item.length() > 0 && !(item.rfind("#", 0) == 0)) {
std::vector<std::string> vec2 = Utils::split(item, '=');
if (vec2.size() == 2) {
if (vec2.at(0).compare("hdfs.host") == 0) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This configuration file reading logic must be moved to some different class and different function. Do not clutter those details here.

}

if (hdfs_server_IP.empty() || hdfs_port.empty()) {
frontend_logger.error("HDFS server IP or port not set correctly");
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error message does not have any meaning. You need to point out the erroneous value and the reason for why you say there is an error.l

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more specific error handling with below code,

if (hdfsServerIp.empty()) {
    frontend_logger.error("HDFS server IP is not set or empty.");
}
if (hdfsPort.empty()) {
    frontend_logger.error("HDFS server port is not set or empty.");
}

hdfs_port = vec2.at(1);
}
} else {
frontend_logger.error("Invalid line in configuration file: " + item);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which configuration file are you referring to here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this hdfs implementation, there are two types of HDFS servers.

  1. Jasminegraph's Default HDFS server.
  2. Custom HDFS servers from users.
    users can use either one of those two types. if a user wants to refer to a custom hdfs server, then he needs to provide config file to that HDFS server. Here it refers to the custom hdfs server config file.


frontend_logger.info("HDFS server IP: " + hdfs_server_IP + " HDFS port: " + hdfs_port);

frontend_logger.info("Start serving HDFS command");
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No meaning in this info log

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

frontend_logger.info("HDFS server IP: " + hdfs_server_IP + " HDFS port: " + hdfs_port);

frontend_logger.info("Start serving HDFS command");
std::string message = "Send HDFS file path";
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should have short word commands than having this kind of lengthy sentences as the commands.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shortened as "HDFS file path: "

this->graphID = graphID;
}

void MetisPartitioner::loadFileChunks(string inputFilePath) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function name is misleading. It seems it does not do any meaningful processing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is no longer there in the updated hdfs implementation.

@@ -1009,3 +1009,125 @@ void MetisPartitioner::writeSerializedCompositeMasterFiles(std::string part) {
masterFileMutex.unlock();
partitioner_logger.log("Serializing done for central part " + part, "info");
}

void MetisPartitioner::setPathAndGraphID(int graphID) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is not useful. I do not see any point of having such function.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is no longer there in the updated hdfs implementation.

@miyurud
Copy link
Owner

miyurud commented Dec 13, 2024

Please fix the commit messages in this PR following this blog

muthumala19 added a commit to muthumala19/jasminegraph that referenced this pull request Dec 19, 2024
Copy link

Quality Gate Failed Quality Gate failed

Failed conditions
7.7% Duplication on New Code (required ≤ 3%)
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

Catch issues before they fail your Quality Gate with our IDE extension SonarQube for IDE

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants