Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[PMEM-SHUFFLE-39] Fix the bug that pmem-shuffle without RPMP fails to pass Terasort benchmark due to latest patch. #40

Merged
merged 3 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
**/target/**
*.class
*.iml
*.sh

# logs
*.log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ private[spark] class BaseShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
fetchContinuousBlocksInBatch
).toCompletionIterator

/**
* Force iterator to traverse itself and update internal counter
**/
wrappedStreams.size

val serializerInstance = dep.serializer.newInstance()

// Create a key/value iterator for each stream
Expand Down
10 changes: 4 additions & 6 deletions rpmp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,11 @@ file(COPY bin/stop-rpmp.sh DESTINATION ${CMAKE_RUNTIME_OUTPUT_DIRECTORY})

add_executable(data-server main.cc)
add_executable(chashtest test/chash_test.cc)
add_executable(rocksdbtest test/rocksdb_test.cc)
add_executable(proxy-server ProxyMain.cc)
add_executable(client Client.cc)
target_link_libraries(chashtest pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(data-server pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(proxy-server pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(client pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(rocksdbtest spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(chashtest pmpool spdlog hiredis jsoncpp redis++)
target_link_libraries(data-server pmpool spdlog hiredis jsoncpp redis++)
target_link_libraries(proxy-server pmpool spdlog hiredis jsoncpp redis++)
target_link_libraries(client pmpool spdlog hiredis jsoncpp redis++)

file(COPY ${PROJECT_SOURCE_DIR}/config DESTINATION .)
2 changes: 1 addition & 1 deletion rpmp/pmpool/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ add_library(pmpool_client_jni SHARED Event.cc ProxyEvent.cc client/PmPoolClient.
target_link_libraries(pmpool_client_jni LINK_PUBLIC ${Boost_LIBRARIES} hpnl)
set_target_properties(pmpool_client_jni PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib")

add_library(pmpool SHARED DataServer.cc Protocol.cc Event.cc ProxyEvent.cc NetworkServer.cc proxy/metastore/MetastoreFacade.cc proxy/metastore/ConnectionFacade.cc proxy/metastore/redis/Redis.cc proxy/metastore/rocksdb/Rocks.cc proxy/metastore/rocksdb/RocksConnection.cc HeartbeatClient.cc HeartbeatEvent.cc hash/xxhash.cc client/PmPoolClient.cc client/NetworkClient.cc client/ProxyClient.cc proxy/clientService/ClientService.cc Proxy.cc proxy/replicaService/ReplicaService.cc DataService/DataServerService.cc proxy/NodeManager.cc proxy/tracker/Tracker.cc)
add_library(pmpool SHARED DataServer.cc Protocol.cc Event.cc ProxyEvent.cc NetworkServer.cc proxy/metastore/MetastoreFacade.cc proxy/metastore/ConnectionFacade.cc proxy/metastore/redis/Redis.cc HeartbeatClient.cc HeartbeatEvent.cc hash/xxhash.cc client/PmPoolClient.cc client/NetworkClient.cc client/ProxyClient.cc proxy/clientService/ClientService.cc Proxy.cc proxy/replicaService/ReplicaService.cc DataService/DataServerService.cc proxy/NodeManager.cc proxy/tracker/Tracker.cc)

target_link_libraries(pmpool LINK_PUBLIC ${Boost_LIBRARIES} hpnl pmemobj)
set_target_properties(pmpool PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib")
Expand Down
63 changes: 5 additions & 58 deletions rpmp/pmpool/proxy/metastore/ConnectionFacade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,87 +4,34 @@
#include <iostream>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"

#include "redis/Redis.h"

using namespace std;
using namespace ROCKSDB_NAMESPACE;

ConnectionFacade::ConnectionFacade(std::shared_ptr<Config> config, std::shared_ptr<RLog> log,string type){
config_ = config;
log_ = log;
type_ = type;
}

//RocksDB
int ConnectionFacade::connect(string DBPath){
Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;

// open DB
Status s = DB::Open(options, DBPath, &db_);

if (s.ok() == true){
setConnected(true);
return 0;
}
setConnected(false);
return -1;
}

string ConnectionFacade::put(string key, string value){
if(type_ == ROCKS){
Status s = db_->Put(WriteOptions(), key, value);
return s.ToString();
}else{
return redis_->set(key, value);
}
return redis_->set(key, value);
}

string ConnectionFacade::get(string key){
if(type_ == ROCKS){
string value;
Status s = db_->Get(ReadOptions(), key, &value);
return value;
}else{
return redis_->get(key);
}
return redis_->get(key);
}

int ConnectionFacade::exists(string key){
if(type_ == ROCKS){
string value;
Status s = db_->Get(ReadOptions(), key, &value);
if(s.ok()){
return 1;
}
return 0;
}else{
return redis_->exists(key);
}
return redis_->exists(key);
}

std::unordered_set<std::string> ConnectionFacade::scanAll(){
if(type_ == ROCKS){
//Do nothing
}else{
return redis_->scanAll();
}
return redis_->scanAll();
}

std::unordered_set<std::string> ConnectionFacade::scan(std::string pattern){
if(type_ == ROCKS){
//Do nothing
}else{
return redis_->scan(pattern);
}
return redis_->scan(pattern);
}

//Redis
Expand Down
12 changes: 2 additions & 10 deletions rpmp/pmpool/proxy/metastore/ConnectionFacade.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include "redis/Redis.h"

#include "pmpool/Config.h"
#include "pmpool/RLog.h"

using namespace std;
using namespace ROCKSDB_NAMESPACE;

/**
* Facade for connection to Redis and RocksDB
* Facade for connection to Redis
*
**/
class ConnectionFacade: public std::enable_shared_from_this<ConnectionFacade>{
public:
// RocksDB
ConnectionFacade(std::shared_ptr<Config> config, std::shared_ptr<RLog> log, string type);
ConnectionFacade(std::shared_ptr<Config> config, std::shared_ptr<RLog> log,string type);
// Redis
int connect();
// Common
Expand All @@ -40,10 +35,7 @@ class ConnectionFacade: public std::enable_shared_from_this<ConnectionFacade>{
bool connected_;
int setConnected(bool connected);
string type_;
string ROCKS = "ROCKS";
string REDIS = "REDIS";
// RocksDB
DB *db_;
// Redis
shared_ptr<Redis> redis_;
};
Expand Down
8 changes: 1 addition & 7 deletions rpmp/pmpool/proxy/metastore/MetastoreFacade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include "../../Config.h"

#include "MetastoreFacade.h"
#include "rocksdb/Rocks.h"
#include "redis/Redis.h"
#include "json/json.h"

Expand All @@ -17,12 +16,7 @@ MetastoreFacade::MetastoreFacade(std::shared_ptr<Config> config, std::shared_ptr

bool MetastoreFacade::connect() {
int res = 0;
if(type_ == ROCKS){
string DBPath = "/tmp/rocksdb_simple_example";
res = connection_->connect(DBPath);
}else if(type_ == REDIS){
res = connection_->connect();
}
res = connection_->connect();
if (res == 0) {
log_->get_console_log()->info("Successfully connected to metastore database");
return true;
Expand Down
3 changes: 1 addition & 2 deletions rpmp/pmpool/proxy/metastore/MetastoreFacade.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class Config;

/**
* Facade for metastore, either Redis or RocksDB
* Facade for metastore
*
**/
class MetastoreFacade: public std::enable_shared_from_this<MetastoreFacade>{
Expand All @@ -31,7 +31,6 @@ class MetastoreFacade: public std::enable_shared_from_this<MetastoreFacade>{
std::string port_;
std::string type_;
std::string REDIS = "REDIS";
std::string ROCKS = "ROCKS";
};

#endif
35 changes: 0 additions & 35 deletions rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc

This file was deleted.

28 changes: 0 additions & 28 deletions rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h

This file was deleted.

60 changes: 0 additions & 60 deletions rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.cc

This file was deleted.

27 changes: 0 additions & 27 deletions rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.h

This file was deleted.

Loading