Skip to content

Commit

Permalink
[GLUTEN-5335][VL] Use common name for both celeborn and uniffle
Browse files Browse the repository at this point in the history
  • Loading branch information
summaryzb committed Apr 13, 2024
1 parent a06a2fd commit 3c3b5ae
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
shuffle/Partitioning.cc
shuffle/Payload.cc
shuffle/rss/RemotePartitionWriter.cc
shuffle/rss/CelebornPartitionWriter.cc
shuffle/rss/RssPartitionWriter.cc
shuffle/RoundRobinPartitioner.cc
shuffle/ShuffleMemoryPool.cc
shuffle/ShuffleReader.cc
Expand Down
24 changes: 12 additions & 12 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,28 +411,28 @@ class BacktraceAllocationListener final : public gluten::AllocationListener {
std::atomic_int64_t backtraceBytes_{1L << 30};
};

class CelebornClient : public RssClient {
class JavaRssClient : public RssClient {
public:
CelebornClient(JavaVM* vm, jobject javaCelebornShuffleWriter, jmethodID javaCelebornPushPartitionDataMethod)
: vm_(vm), javaCelebornPushPartitionData_(javaCelebornPushPartitionDataMethod) {
JavaRssClient(JavaVM* vm, jobject javaShuffleWriter, jmethodID javaPushPartitionDataMethod)
: vm_(vm), javaPushPartitionData_(javaPushPartitionDataMethod) {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
throw gluten::GlutenException("JNIEnv was not attached to current thread");
}

javaCelebornShuffleWriter_ = env->NewGlobalRef(javaCelebornShuffleWriter);
javaShuffleWriter_ = env->NewGlobalRef(javaShuffleWriter);
array_ = env->NewByteArray(1024 * 1024);
array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_));
}

~CelebornClient() {
~JavaRssClient() {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
LOG(WARNING) << "CelebornClient#~CelebornClient(): "
LOG(WARNING) << "JavaRssClient#~JavaRssClient(): "
<< "JNIEnv was not attached to current thread";
return;
}
env->DeleteGlobalRef(javaCelebornShuffleWriter_);
env->DeleteGlobalRef(javaShuffleWriter_);
jbyte* byteArray = env->GetByteArrayElements(array_, NULL);
env->ReleaseByteArrayElements(array_, byteArray, JNI_ABORT);
env->DeleteGlobalRef(array_);
Expand All @@ -452,17 +452,17 @@ class CelebornClient : public RssClient {
array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_));
}
env->SetByteArrayRegion(array_, 0, size, reinterpret_cast<jbyte*>(bytes));
jint celebornBytesSize =
env->CallIntMethod(javaCelebornShuffleWriter_, javaCelebornPushPartitionData_, partitionId, array_, size);
jint javaBytesSize =
env->CallIntMethod(javaShuffleWriter_, javaPushPartitionData_, partitionId, array_, size);
checkException(env);
return static_cast<int32_t>(celebornBytesSize);
return static_cast<int32_t>(javaBytesSize);
}

void stop() override {}

private:
JavaVM* vm_;
jobject javaCelebornShuffleWriter_;
jmethodID javaCelebornPushPartitionData_;
jobject javaShuffleWriter_;
jmethodID javaPushPartitionData_;
jbyteArray array_;
};
15 changes: 7 additions & 8 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "shuffle/Utils.h"
#include "shuffle/rss/CelebornPartitionWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/ArrowStatus.h"
#include "utils/StringUtil.h"

Expand Down Expand Up @@ -908,9 +908,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
if (env->GetJavaVM(&vm) != JNI_OK) {
throw gluten::GlutenException("Unable to get JavaVM instance");
}
std::shared_ptr<CelebornClient> celebornClient =
std::make_shared<CelebornClient>(vm, partitionPusher, celebornPushPartitionDataMethod);
partitionWriter = std::make_unique<CelebornPartitionWriter>(
std::shared_ptr<JavaRssClient> celebornClient =
std::make_shared<JavaRssClient>(vm, partitionPusher, celebornPushPartitionDataMethod);
partitionWriter = std::make_unique<RssPartitionWriter>(
numPartitions,
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
Expand All @@ -924,10 +924,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
if (env->GetJavaVM(&vm) != JNI_OK) {
throw gluten::GlutenException("Unable to get JavaVM instance");
}
// rename CelebornClient RssClient
std::shared_ptr<CelebornClient> uniffleClient =
std::make_shared<CelebornClient>(vm, partitionPusher, unifflePushPartitionDataMethod);
partitionWriter = std::make_unique<CelebornPartitionWriter>(
std::shared_ptr<JavaRssClient> uniffleClient =
std::make_shared<JavaRssClient>(vm, partitionPusher, unifflePushPartitionDataMethod);
partitionWriter = std::make_unique<RssPartitionWriter>(
numPartitions,
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
static constexpr bool kEnableBufferedWrite = true;

enum PartitionWriterType { kLocal, kCeleborn, kUniffle };
// Identifies a partition writer kind. Enumerator values are implicit:
// kLocal == 0, kRss == 1.
// NOTE(review): kRss appears to cover every remote shuffle service backend
// (Celeborn and Uniffle) -- confirm against the JNI wrapper.
enum PartitionWriterType {
  kLocal,
  kRss,
};

struct ShuffleReaderOptions {
arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@

#include "shuffle/Payload.h"
#include "shuffle/Utils.h"
#include "shuffle/rss/CelebornPartitionWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/Timer.h"

namespace gluten {

void CelebornPartitionWriter::init() {
// Resizes both per-partition vectors to numPartitions_ entries. Any entries
// created by the resize are initialized to 0.
void RssPartitionWriter::init() {
  rawPartitionLengths_.resize(numPartitions_, 0);
  bytesEvicted_.resize(numPartitions_, 0);
}

arrow::Status CelebornPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
// Push data and collect metrics.
auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(), bytesEvicted_.end(), 0LL);
celebornClient_->stop();
rssClient_->stop();
// Populate metrics.
metrics->totalCompressTime += compressTime_;
metrics->totalEvictTime += spillTime_;
Expand All @@ -44,12 +44,12 @@ arrow::Status CelebornPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
return arrow::Status::OK();
}

arrow::Status CelebornPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
// Reports that nothing was reclaimed: stores 0 in *actual and returns OK.
// `actual` is dereferenced unconditionally, so it must not be null.
arrow::Status RssPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
  static_cast<void>(size); // The requested size is not consulted.
  *actual = 0;
  return arrow::Status::OK();
}

arrow::Status CelebornPartitionWriter::evict(
arrow::Status RssPartitionWriter::evict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
Expand All @@ -64,14 +64,13 @@ arrow::Status CelebornPartitionWriter::evict(
ARROW_ASSIGN_OR_RAISE(
auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr));
// Copy payload to arrow buffered os.
ARROW_ASSIGN_OR_RAISE(
auto celebornBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_));
RETURN_NOT_OK(payload->serialize(celebornBufferOs.get()));
ARROW_ASSIGN_OR_RAISE(auto rssBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_));
RETURN_NOT_OK(payload->serialize(rssBufferOs.get()));
payload = nullptr; // Invalidate payload immediately.

// Push.
ARROW_ASSIGN_OR_RAISE(auto buffer, celebornBufferOs->Finish());
bytesEvicted_[partitionId] += celebornClient_->pushPartitionData(
ARROW_ASSIGN_OR_RAISE(auto buffer, rssBufferOs->Finish());
bytesEvicted_[partitionId] += rssClient_->pushPartitionData(
partitionId, reinterpret_cast<char*>(const_cast<uint8_t*>(buffer->data())), buffer->size());
return arrow::Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@

namespace gluten {

class CelebornPartitionWriter final : public RemotePartitionWriter {
class RssPartitionWriter final : public RemotePartitionWriter {
public:
CelebornPartitionWriter(
RssPartitionWriter(
uint32_t numPartitions,
PartitionWriterOptions options,
arrow::MemoryPool* pool,
std::shared_ptr<RssClient> celebornClient)
: RemotePartitionWriter(numPartitions, std::move(options), pool), celebornClient_(celebornClient) {
std::shared_ptr<RssClient> rssClient)
: RemotePartitionWriter(numPartitions, std::move(options), pool), rssClient_(rssClient) {
init();
}

Expand All @@ -51,7 +51,7 @@ class CelebornPartitionWriter final : public RemotePartitionWriter {
private:
void init();

std::shared_ptr<RssClient> celebornClient_;
std::shared_ptr<RssClient> rssClient_;

std::vector<int64_t> bytesEvicted_;
std::vector<int64_t> rawPartitionLengths_;
Expand Down
8 changes: 4 additions & 4 deletions cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#include "config/GlutenConfig.h"
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "shuffle/rss/CelebornPartitionWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/StringUtil.h"
#include "utils/VeloxArrowUtils.h"
#include "utils/exception.h"
Expand All @@ -46,7 +46,7 @@ DEFINE_bool(print_result, true, "Print result for execution");
DEFINE_string(save_output, "", "Path to parquet file for saving the task output iterator");
DEFINE_bool(with_shuffle, false, "Add shuffle split at end.");
DEFINE_string(partitioning, "rr", "Short partitioning name. Valid options are rr, hash, range, single");
DEFINE_bool(celeborn, false, "Mocking celeborn shuffle.");
DEFINE_bool(rss, false, "Mocking rss.");
DEFINE_bool(zstd, false, "Use ZSTD as shuffle compression codec");
DEFINE_bool(qat_gzip, false, "Use QAT GZIP as shuffle compression codec");
DEFINE_bool(qat_zstd, false, "Use QAT ZSTD as shuffle compression codec");
Expand Down Expand Up @@ -90,9 +90,9 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
}

std::unique_ptr<PartitionWriter> partitionWriter;
if (FLAGS_celeborn) {
if (FLAGS_rss) {
auto rssClient = std::make_unique<LocalRssClient>(dataFile);
partitionWriter = std::make_unique<CelebornPartitionWriter>(
partitionWriter = std::make_unique<RssPartitionWriter>(
FLAGS_shuffle_partitions,
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "shuffle/rss/CelebornPartitionWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/TestUtils.h"
#include "utils/VeloxArrowUtils.h"
#include "utils/tests/MemoryPoolUtils.h"
Expand Down Expand Up @@ -73,7 +73,7 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
params.push_back(
ShuffleTestParams{PartitionWriterType::kLocal, compression, compressionThreshold, mergeBufferSize});
}
params.push_back(ShuffleTestParams{PartitionWriterType::kCeleborn, compression, compressionThreshold, 0});
params.push_back(ShuffleTestParams{PartitionWriterType::kRss, compression, compressionThreshold, 0});
}
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ std::unique_ptr<PartitionWriter> createPartitionWriter(
const std::vector<std::string>& localDirs,
const PartitionWriterOptions& options,
arrow::MemoryPool* pool) {
if (partitionWriterType == PartitionWriterType::kCeleborn) {
if (partitionWriterType == PartitionWriterType::kRss) {
auto rssClient = std::make_unique<LocalRssClient>(dataFile);
return std::make_unique<CelebornPartitionWriter>(numPartitions, options, pool, std::move(rssClient));
return std::make_unique<RssPartitionWriter>(numPartitions, options, pool, std::move(rssClient));
}
return std::make_unique<LocalPartitionWriter>(numPartitions, options, pool, dataFile, localDirs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private boolean isDriver() {

public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
super(conf, isDriver);
conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED, "false");
conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(), "false");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def isUseUniffleShuffleManager: Boolean =
conf
.getConfString("spark.shuffle.manager", "sort")
.contains("RssShuffleManager")
.contains("UniffleShuffleManager")

def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED)

Expand Down

0 comments on commit 3c3b5ae

Please sign in to comment.