[GLUTEN-5335][VL] Use common name for both celeborn and uniffle #5385

Merged · 1 commit · Apr 16, 2024
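This PR replaces Celeborn-specific naming with generic RSS (remote shuffle service) naming shared by Celeborn and Uniffle: the Maven profiles `rss` and `rss-uniffle` become `celeborn` and `uniffle`, the native `CelebornClient` becomes `JavaRssClient`, `CelebornPartitionWriter` becomes `RssPartitionWriter` (with its source files renamed to match), and the `PartitionWriterType` enum collapses `kCeleborn`/`kUniffle` into a single `kRss`.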
6 changes: 3 additions & 3 deletions .github/workflows/build_bundle_package.yml
@@ -79,7 +79,7 @@ jobs:
- name: Build for Spark ${{ github.event.inputs.spark }}
run: |
cd $GITHUB_WORKSPACE/ && \
-mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Prss -DskipTests -Dmaven.source.skip
+mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip
- name: Upload bundle package
uses: actions/upload-artifact@v2
with:
@@ -110,7 +110,7 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
-mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Prss -DskipTests -Dmaven.source.skip
+mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip
- name: Upload bundle package
uses: actions/upload-artifact@v2
with:
@@ -145,7 +145,7 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
-mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Prss -DskipTests -Dmaven.source.skip
+mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip
- name: Upload bundle package
uses: actions/upload-artifact@v2
with:
24 changes: 12 additions & 12 deletions .github/workflows/velox_docker.yml
@@ -369,7 +369,7 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
-mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Prss-uniffle -DskipTests
+mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0
run: |
export MAVEN_HOME=/usr/lib/maven && \
@@ -395,7 +395,7 @@ jobs:
bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \
bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh
-cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss-uniffle && \
+cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2 -Puniffle && \
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1

@@ -421,7 +421,7 @@ jobs:
- name: Build for Spark ${{ matrix.spark }}
run: |
cd $GITHUB_WORKSPACE/
-mvn clean install -P${{ matrix.spark }} -Pbackends-velox,rss -DskipTests
+mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Pceleborn -DskipTests
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with ${{ matrix.celeborn }}
run: |
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
@@ -437,7 +437,7 @@ jobs:
bash -c "echo -e 'CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g' > ./conf/celeborn-env.sh" && \
bash -c "echo -e 'celeborn.worker.commitFiles.threads 128\nceleborn.worker.sortPartition.threads 64' > ./conf/celeborn-defaults.conf" && \
bash ./sbin/start-master.sh && bash ./sbin/start-worker.sh && \
-cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss ${EXTRA_PROFILE} && \
+cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2 -Pceleborn ${EXTRA_PROFILE} && \
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox-with-celeborn --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=8 --iterations=1 && \
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
@@ -488,7 +488,7 @@ jobs:
export SPARK_SCALA_VERSION=2.12
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
-mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp test -Pspark-3.2 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
@@ -536,7 +536,7 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
-mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest

run-spark-test-spark33:
runs-on: ubuntu-20.04
@@ -579,7 +579,7 @@ jobs:
export SPARK_SCALA_VERSION=2.12 && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
-mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp test -Pspark-3.3 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
@@ -623,7 +623,7 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
-mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest

run-spark-test-spark34:
runs-on: ubuntu-20.04
@@ -666,7 +666,7 @@ jobs:
export SPARK_SCALA_VERSION=2.12 && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
-mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp test -Pspark-3.4 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
@@ -710,7 +710,7 @@ jobs:
cd $GITHUB_WORKSPACE/
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
-mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest

run-spark-test-spark35:
runs-on: ubuntu-20.04
@@ -753,7 +753,7 @@ jobs:
export SPARK_SCALA_VERSION=2.12 && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
-mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp test -Pspark-3.5 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
@@ -802,4 +802,4 @@ jobs:
cd $GITHUB_WORKSPACE/
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
-mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
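Across both workflow files the change is confined to Maven profile spelling: `-Prss` becomes `-Pceleborn`, `-Prss-uniffle` becomes `-Puniffle`, and comma-joined profile lists such as `-Pspark-3.2,rss` are split into separate `-P` flags.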
2 changes: 1 addition & 1 deletion cpp/core/CMakeLists.txt
@@ -194,7 +194,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
shuffle/Partitioning.cc
shuffle/Payload.cc
shuffle/rss/RemotePartitionWriter.cc
-shuffle/rss/CelebornPartitionWriter.cc
+shuffle/rss/RssPartitionWriter.cc
shuffle/RoundRobinPartitioner.cc
shuffle/ShuffleMemoryPool.cc
shuffle/ShuffleReader.cc
23 changes: 11 additions & 12 deletions cpp/core/jni/JniCommon.h
@@ -411,28 +411,28 @@ class BacktraceAllocationListener final : public gluten::AllocationListener {
std::atomic_int64_t backtraceBytes_{1L << 30};
};

-class CelebornClient : public RssClient {
+class JavaRssClient : public RssClient {
public:
-CelebornClient(JavaVM* vm, jobject javaCelebornShuffleWriter, jmethodID javaCelebornPushPartitionDataMethod)
-    : vm_(vm), javaCelebornPushPartitionData_(javaCelebornPushPartitionDataMethod) {
+JavaRssClient(JavaVM* vm, jobject javaRssShuffleWriter, jmethodID javaPushPartitionDataMethod)
+    : vm_(vm), javaPushPartitionData_(javaPushPartitionDataMethod) {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
throw gluten::GlutenException("JNIEnv was not attached to current thread");
}

-javaCelebornShuffleWriter_ = env->NewGlobalRef(javaCelebornShuffleWriter);
+javaRssShuffleWriter_ = env->NewGlobalRef(javaRssShuffleWriter);
array_ = env->NewByteArray(1024 * 1024);
array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_));
}

-~CelebornClient() {
+~JavaRssClient() {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
LOG(WARNING) << "CelebornClient#~CelebornClient(): "
LOG(WARNING) << "JavaRssClient#~JavaRssClient(): "
<< "JNIEnv was not attached to current thread";
return;
}
-env->DeleteGlobalRef(javaCelebornShuffleWriter_);
+env->DeleteGlobalRef(javaRssShuffleWriter_);
jbyte* byteArray = env->GetByteArrayElements(array_, NULL);
env->ReleaseByteArrayElements(array_, byteArray, JNI_ABORT);
env->DeleteGlobalRef(array_);
@@ -452,17 +452,16 @@ class CelebornClient : public RssClient {
array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_));
}
env->SetByteArrayRegion(array_, 0, size, reinterpret_cast<jbyte*>(bytes));
-jint celebornBytesSize =
-    env->CallIntMethod(javaCelebornShuffleWriter_, javaCelebornPushPartitionData_, partitionId, array_, size);
+jint javaBytesSize = env->CallIntMethod(javaRssShuffleWriter_, javaPushPartitionData_, partitionId, array_, size);
checkException(env);
-return static_cast<int32_t>(celebornBytesSize);
+return static_cast<int32_t>(javaBytesSize);
}

void stop() override {}

private:
JavaVM* vm_;
-jobject javaCelebornShuffleWriter_;
-jmethodID javaCelebornPushPartitionData_;
+jobject javaRssShuffleWriter_;
+jmethodID javaPushPartitionData_;
jbyteArray array_;
};
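The renamed client is service-agnostic: every Celeborn- or Uniffle-specific detail lives on the Java side, and the native class only keeps a global reference to the Java shuffle writer plus the `jmethodID` of its push method. A minimal wiring sketch follows; the helper function, the method name `pushPartitionData`, and the `(I[BI)I` descriptor are illustrative assumptions, not taken from this PR:

```cpp
#include <jni.h>
#include <memory>

// Sketch: constructing a JavaRssClient the way JniWrapper.cc does, from a
// Java-side pusher object. JavaRssClient and gluten::GlutenException come
// from JniCommon.h; the method name and "(I[BI)I" descriptor (i.e.
// int pushPartitionData(int partitionId, byte[] data, int length)) are
// assumed for illustration.
std::shared_ptr<JavaRssClient> makeRssClient(JNIEnv* env, jobject partitionPusher) {
  JavaVM* vm;
  if (env->GetJavaVM(&vm) != JNI_OK) {
    throw gluten::GlutenException("Unable to get JavaVM instance");
  }
  jclass pusherClass = env->GetObjectClass(partitionPusher);
  jmethodID pushMethod = env->GetMethodID(pusherClass, "pushPartitionData", "(I[BI)I");
  // The constructor takes its own global reference to partitionPusher, so
  // passing a local reference here is fine.
  return std::make_shared<JavaRssClient>(vm, partitionPusher, pushMethod);
}
```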
15 changes: 7 additions & 8 deletions cpp/core/jni/JniWrapper.cc
@@ -34,7 +34,7 @@
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "shuffle/Utils.h"
#include "shuffle/rss/CelebornPartitionWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/ArrowStatus.h"
#include "utils/StringUtil.h"

@@ -908,9 +908,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
if (env->GetJavaVM(&vm) != JNI_OK) {
throw gluten::GlutenException("Unable to get JavaVM instance");
}
-std::shared_ptr<CelebornClient> celebornClient =
-    std::make_shared<CelebornClient>(vm, partitionPusher, celebornPushPartitionDataMethod);
-partitionWriter = std::make_unique<CelebornPartitionWriter>(
+std::shared_ptr<JavaRssClient> celebornClient =
+    std::make_shared<JavaRssClient>(vm, partitionPusher, celebornPushPartitionDataMethod);
+partitionWriter = std::make_unique<RssPartitionWriter>(
numPartitions,
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
@@ -924,10 +924,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
if (env->GetJavaVM(&vm) != JNI_OK) {
throw gluten::GlutenException("Unable to get JavaVM instance");
}
-// rename CelebornClient RssClient
-std::shared_ptr<CelebornClient> uniffleClient =
-    std::make_shared<CelebornClient>(vm, partitionPusher, unifflePushPartitionDataMethod);
-partitionWriter = std::make_unique<CelebornPartitionWriter>(
+std::shared_ptr<JavaRssClient> uniffleClient =
+    std::make_shared<JavaRssClient>(vm, partitionPusher, unifflePushPartitionDataMethod);
+partitionWriter = std::make_unique<RssPartitionWriter>(
numPartitions,
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
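After this change the Celeborn and Uniffle branches of the JNI wrapper are structurally identical: each obtains the `JavaVM`, wraps the Java-side pusher in a `JavaRssClient`, and hands it to an `RssPartitionWriter`; the only remaining difference is which Java push method's `jmethodID` is passed in. The stale `// rename CelebornClient RssClient` comment is resolved by the rename itself.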
2 changes: 1 addition & 1 deletion cpp/core/shuffle/Options.h
@@ -33,7 +33,7 @@ static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
static constexpr bool kEnableBufferedWrite = true;

-enum PartitionWriterType { kLocal, kCeleborn, kUniffle };
+enum PartitionWriterType { kLocal, kRss };

struct ShuffleReaderOptions {
arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
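Collapsing the enum to `kLocal`/`kRss` lets call sites pick a writer by transport rather than by vendor. The factory below is a sketch under assumptions: the function shape and the `PartitionWriter` base type are illustrative, and only the enum values and the `RssPartitionWriter` constructor signature come from this diff:

```cpp
#include <cstdint>
#include <memory>

#include "shuffle/Options.h"                 // PartitionWriterType, PartitionWriterOptions
#include "shuffle/rss/RssPartitionWriter.h"  // RssPartitionWriter, RssClient

// Sketch: a single dispatch point for any remote shuffle service. Assumes
// RssPartitionWriter ultimately derives from a PartitionWriter base class.
std::unique_ptr<gluten::PartitionWriter> createPartitionWriter(
    gluten::PartitionWriterType type,
    uint32_t numPartitions,
    gluten::PartitionWriterOptions options,
    arrow::MemoryPool* pool,
    std::shared_ptr<gluten::RssClient> rssClient) {
  switch (type) {
    case gluten::kLocal:
      // Local shuffle: partition files go to local disk (writer not shown).
      return nullptr;
    case gluten::kRss:
      // Remote shuffle: Celeborn and Uniffle share this writer; the concrete
      // service is decided by the Java-side client wrapped in rssClient.
      return std::make_unique<gluten::RssPartitionWriter>(
          numPartitions, std::move(options), pool, std::move(rssClient));
  }
  return nullptr;
}
```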
cpp/core/shuffle/rss/CelebornPartitionWriter.cc → cpp/core/shuffle/rss/RssPartitionWriter.cc (file renamed)
@@ -19,20 +19,20 @@

#include "shuffle/Payload.h"
#include "shuffle/Utils.h"
#include "shuffle/rss/CelebornPartitionWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/Timer.h"

namespace gluten {

-void CelebornPartitionWriter::init() {
+void RssPartitionWriter::init() {
bytesEvicted_.resize(numPartitions_, 0);
rawPartitionLengths_.resize(numPartitions_, 0);
}

-arrow::Status CelebornPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
+arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
// Push data and collect metrics.
auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(), bytesEvicted_.end(), 0LL);
-celebornClient_->stop();
+rssClient_->stop();
// Populate metrics.
metrics->totalCompressTime += compressTime_;
metrics->totalEvictTime += spillTime_;
@@ -44,12 +44,12 @@ arrow::Status CelebornPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
return arrow::Status::OK();
}

-arrow::Status CelebornPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
+arrow::Status RssPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
*actual = 0;
return arrow::Status::OK();
}

-arrow::Status CelebornPartitionWriter::evict(
+arrow::Status RssPartitionWriter::evict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
@@ -64,14 +64,13 @@ arrow::Status CelebornPartitionWriter::evict(
ARROW_ASSIGN_OR_RAISE(
auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr));
// Copy payload to arrow buffered os.
-ARROW_ASSIGN_OR_RAISE(
-    auto celebornBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_));
-RETURN_NOT_OK(payload->serialize(celebornBufferOs.get()));
+ARROW_ASSIGN_OR_RAISE(auto rssBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_));
+RETURN_NOT_OK(payload->serialize(rssBufferOs.get()));
payload = nullptr; // Invalidate payload immediately.

// Push.
-ARROW_ASSIGN_OR_RAISE(auto buffer, celebornBufferOs->Finish());
-bytesEvicted_[partitionId] += celebornClient_->pushPartitionData(
+ARROW_ASSIGN_OR_RAISE(auto buffer, rssBufferOs->Finish());
+bytesEvicted_[partitionId] += rssClient_->pushPartitionData(
partitionId, reinterpret_cast<char*>(const_cast<uint8_t*>(buffer->data())), buffer->size());
return arrow::Status::OK();
}
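The evict path is unchanged apart from naming: each in-memory payload is serialized into an Arrow `BufferOutputStream` sized by `pushBufferMaxSize`, pushed through the `RssClient`, and the returned byte count is accumulated per partition in `bytesEvicted_`; `reclaimFixedSize` reports zero reclaimable bytes because data is pushed to the remote service rather than spilled locally.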
cpp/core/shuffle/rss/CelebornPartitionWriter.h → cpp/core/shuffle/rss/RssPartitionWriter.h (file renamed)
@@ -26,14 +26,14 @@

namespace gluten {

-class CelebornPartitionWriter final : public RemotePartitionWriter {
+class RssPartitionWriter final : public RemotePartitionWriter {
public:
-CelebornPartitionWriter(
+RssPartitionWriter(
uint32_t numPartitions,
PartitionWriterOptions options,
arrow::MemoryPool* pool,
-    std::shared_ptr<RssClient> celebornClient)
-    : RemotePartitionWriter(numPartitions, std::move(options), pool), celebornClient_(celebornClient) {
+    std::shared_ptr<RssClient> rssClient)
+    : RemotePartitionWriter(numPartitions, std::move(options), pool), rssClient_(rssClient) {
init();
}

Expand All @@ -51,7 +51,7 @@ class CelebornPartitionWriter final : public RemotePartitionWriter {
private:
void init();

-std::shared_ptr<RssClient> celebornClient_;
+std::shared_ptr<RssClient> rssClient_;

std::vector<int64_t> bytesEvicted_;
std::vector<int64_t> rawPartitionLengths_;