Revert "[GLUTEN-5335][VL] Use common name for both celeborn and uniff…
Browse files Browse the repository at this point in the history
…le (apache#5385)"

This reverts commit 9c15d2e.
loneylee committed Apr 16, 2024
1 parent 13e4422 commit 8a5b4c2
Showing 28 changed files with 96 additions and 97 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build_bundle_package.yml
@@ -79,7 +79,7 @@ jobs:
- name: Build for Spark ${{ github.event.inputs.spark }}
run: |
cd $GITHUB_WORKSPACE/ && \
mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip
mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Prss -DskipTests -Dmaven.source.skip
- name: Upload bundle package
uses: actions/upload-artifact@v2
with:
@@ -110,7 +110,7 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip
mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Prss -DskipTests -Dmaven.source.skip
- name: Upload bundle package
uses: actions/upload-artifact@v2
with:
@@ -145,7 +145,7 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip
mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Prss -DskipTests -Dmaven.source.skip
- name: Upload bundle package
uses: actions/upload-artifact@v2
with:
24 changes: 12 additions & 12 deletions .github/workflows/velox_docker.yml
@@ -369,7 +369,7 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests
mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Prss-uniffle -DskipTests
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0
run: |
export MAVEN_HOME=/usr/lib/maven && \
@@ -395,7 +395,7 @@ jobs:
bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \
bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh
cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2 -Puniffle && \
cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss-uniffle && \
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1
@@ -421,7 +421,7 @@ jobs:
- name: Build for Spark ${{ matrix.spark }}
run: |
cd $GITHUB_WORKSPACE/
mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Pceleborn -DskipTests
mvn clean install -P${{ matrix.spark }} -Pbackends-velox,rss -DskipTests
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with ${{ matrix.celeborn }}
run: |
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
@@ -437,7 +437,7 @@ jobs:
bash -c "echo -e 'CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g' > ./conf/celeborn-env.sh" && \
bash -c "echo -e 'celeborn.worker.commitFiles.threads 128\nceleborn.worker.sortPartition.threads 64' > ./conf/celeborn-defaults.conf" && \
bash ./sbin/start-master.sh && bash ./sbin/start-worker.sh && \
cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2 -Pceleborn ${EXTRA_PROFILE} && \
cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss ${EXTRA_PROFILE} && \
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox-with-celeborn --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=8 --iterations=1 && \
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
@@ -488,7 +488,7 @@ jobs:
export SPARK_SCALA_VERSION=2.12
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp test -Pspark-3.2 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
@@ -536,7 +536,7 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
run-spark-test-spark33:
runs-on: ubuntu-20.04
@@ -579,7 +579,7 @@ jobs:
export SPARK_SCALA_VERSION=2.12 && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp test -Pspark-3.3 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
@@ -623,7 +623,7 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
run-spark-test-spark34:
runs-on: ubuntu-20.04
@@ -666,7 +666,7 @@ jobs:
export SPARK_SCALA_VERSION=2.12 && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp test -Pspark-3.4 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
@@ -710,7 +710,7 @@ jobs:
cd $GITHUB_WORKSPACE/
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
run-spark-test-spark35:
runs-on: ubuntu-20.04
@@ -753,7 +753,7 @@ jobs:
export SPARK_SCALA_VERSION=2.12 && \
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
mvn -ntp test -Pspark-3.5 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
@@ -802,4 +802,4 @@ jobs:
cd $GITHUB_WORKSPACE/
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
2 changes: 1 addition & 1 deletion cpp/core/CMakeLists.txt
@@ -194,7 +194,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
shuffle/Partitioning.cc
shuffle/Payload.cc
shuffle/rss/RemotePartitionWriter.cc
shuffle/rss/RssPartitionWriter.cc
shuffle/rss/CelebornPartitionWriter.cc
shuffle/RoundRobinPartitioner.cc
shuffle/ShuffleMemoryPool.cc
shuffle/ShuffleReader.cc
23 changes: 12 additions & 11 deletions cpp/core/jni/JniCommon.h
@@ -411,28 +411,28 @@ class BacktraceAllocationListener final : public gluten::AllocationListener {
std::atomic_int64_t backtraceBytes_{1L << 30};
};

class JavaRssClient : public RssClient {
class CelebornClient : public RssClient {
public:
JavaRssClient(JavaVM* vm, jobject javaRssShuffleWriter, jmethodID javaPushPartitionDataMethod)
: vm_(vm), javaPushPartitionData_(javaPushPartitionDataMethod) {
CelebornClient(JavaVM* vm, jobject javaCelebornShuffleWriter, jmethodID javaCelebornPushPartitionDataMethod)
: vm_(vm), javaCelebornPushPartitionData_(javaCelebornPushPartitionDataMethod) {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
throw gluten::GlutenException("JNIEnv was not attached to current thread");
}

javaRssShuffleWriter_ = env->NewGlobalRef(javaRssShuffleWriter);
javaCelebornShuffleWriter_ = env->NewGlobalRef(javaCelebornShuffleWriter);
array_ = env->NewByteArray(1024 * 1024);
array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_));
}

~JavaRssClient() {
~CelebornClient() {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
LOG(WARNING) << "JavaRssClient#~JavaRssClient(): "
LOG(WARNING) << "CelebornClient#~CelebornClient(): "
<< "JNIEnv was not attached to current thread";
return;
}
env->DeleteGlobalRef(javaRssShuffleWriter_);
env->DeleteGlobalRef(javaCelebornShuffleWriter_);
jbyte* byteArray = env->GetByteArrayElements(array_, NULL);
env->ReleaseByteArrayElements(array_, byteArray, JNI_ABORT);
env->DeleteGlobalRef(array_);
@@ -452,16 +452,17 @@ class JavaRssClient : public RssClient {
array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_));
}
env->SetByteArrayRegion(array_, 0, size, reinterpret_cast<jbyte*>(bytes));
jint javaBytesSize = env->CallIntMethod(javaRssShuffleWriter_, javaPushPartitionData_, partitionId, array_, size);
jint celebornBytesSize =
env->CallIntMethod(javaCelebornShuffleWriter_, javaCelebornPushPartitionData_, partitionId, array_, size);
checkException(env);
return static_cast<int32_t>(javaBytesSize);
return static_cast<int32_t>(celebornBytesSize);
}

void stop() override {}

private:
JavaVM* vm_;
jobject javaRssShuffleWriter_;
jmethodID javaPushPartitionData_;
jobject javaCelebornShuffleWriter_;
jmethodID javaCelebornPushPartitionData_;
jbyteArray array_;
};
15 changes: 8 additions & 7 deletions cpp/core/jni/JniWrapper.cc
@@ -34,7 +34,7 @@
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "shuffle/Utils.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "shuffle/rss/CelebornPartitionWriter.h"
#include "utils/ArrowStatus.h"
#include "utils/StringUtil.h"

@@ -908,9 +908,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
if (env->GetJavaVM(&vm) != JNI_OK) {
throw gluten::GlutenException("Unable to get JavaVM instance");
}
std::shared_ptr<JavaRssClient> celebornClient =
std::make_shared<JavaRssClient>(vm, partitionPusher, celebornPushPartitionDataMethod);
partitionWriter = std::make_unique<RssPartitionWriter>(
std::shared_ptr<CelebornClient> celebornClient =
std::make_shared<CelebornClient>(vm, partitionPusher, celebornPushPartitionDataMethod);
partitionWriter = std::make_unique<CelebornPartitionWriter>(
numPartitions,
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
@@ -924,9 +924,10 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
if (env->GetJavaVM(&vm) != JNI_OK) {
throw gluten::GlutenException("Unable to get JavaVM instance");
}
std::shared_ptr<JavaRssClient> uniffleClient =
std::make_shared<JavaRssClient>(vm, partitionPusher, unifflePushPartitionDataMethod);
partitionWriter = std::make_unique<RssPartitionWriter>(
// rename CelebornClient RssClient
std::shared_ptr<CelebornClient> uniffleClient =
std::make_shared<CelebornClient>(vm, partitionPusher, unifflePushPartitionDataMethod);
partitionWriter = std::make_unique<CelebornPartitionWriter>(
numPartitions,
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
2 changes: 1 addition & 1 deletion cpp/core/shuffle/Options.h
@@ -33,7 +33,7 @@ static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
static constexpr bool kEnableBufferedWrite = true;

enum PartitionWriterType { kLocal, kRss };
enum PartitionWriterType { kLocal, kCeleborn, kUniffle };

struct ShuffleReaderOptions {
arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
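For orientation: the hunk above restores two distinct RSS values in PartitionWriterType, even though the JniWrapper.cc change earlier in this diff constructs the same Celeborn-named client and writer for both services. The sketch below is illustrative only; the lookup helper and its string values are hypothetical, and only the enum line mirrors the restored Options.h.

// Illustrative sketch only -- not part of this commit. The enum mirrors the
// restored Options.h; the helper and its string values are hypothetical.
#include <stdexcept>
#include <string>

enum PartitionWriterType { kLocal, kCeleborn, kUniffle };

// A caller holding a shuffle-service name (e.g. from Spark configuration)
// could translate it into the restored enum values like this.
inline PartitionWriterType partitionWriterTypeFromName(const std::string& name) {
  if (name == "local") return kLocal;
  if (name == "celeborn") return kCeleborn;
  if (name == "uniffle") return kUniffle;
  throw std::invalid_argument("unknown partition writer type: " + name);
}

As the JniWrapper.cc hunk shows, after this revert both the Celeborn and Uniffle paths still go through CelebornPartitionWriter, which depends only on the generic RssClient interface.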
cpp/core/shuffle/rss/RssPartitionWriter.cc → cpp/core/shuffle/rss/CelebornPartitionWriter.cc (renamed)
@@ -19,20 +19,20 @@

#include "shuffle/Payload.h"
#include "shuffle/Utils.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "shuffle/rss/CelebornPartitionWriter.h"
#include "utils/Timer.h"

namespace gluten {

void RssPartitionWriter::init() {
void CelebornPartitionWriter::init() {
bytesEvicted_.resize(numPartitions_, 0);
rawPartitionLengths_.resize(numPartitions_, 0);
}

arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
arrow::Status CelebornPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
// Push data and collect metrics.
auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(), bytesEvicted_.end(), 0LL);
rssClient_->stop();
celebornClient_->stop();
// Populate metrics.
metrics->totalCompressTime += compressTime_;
metrics->totalEvictTime += spillTime_;
@@ -44,12 +44,12 @@ arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
return arrow::Status::OK();
}

arrow::Status RssPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
arrow::Status CelebornPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
*actual = 0;
return arrow::Status::OK();
}

arrow::Status RssPartitionWriter::evict(
arrow::Status CelebornPartitionWriter::evict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
@@ -64,13 +64,14 @@ arrow::Status RssPartitionWriter::evict(
ARROW_ASSIGN_OR_RAISE(
auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr));
// Copy payload to arrow buffered os.
ARROW_ASSIGN_OR_RAISE(auto rssBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_));
RETURN_NOT_OK(payload->serialize(rssBufferOs.get()));
ARROW_ASSIGN_OR_RAISE(
auto celebornBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_));
RETURN_NOT_OK(payload->serialize(celebornBufferOs.get()));
payload = nullptr; // Invalidate payload immediately.

// Push.
ARROW_ASSIGN_OR_RAISE(auto buffer, rssBufferOs->Finish());
bytesEvicted_[partitionId] += rssClient_->pushPartitionData(
ARROW_ASSIGN_OR_RAISE(auto buffer, celebornBufferOs->Finish());
bytesEvicted_[partitionId] += celebornClient_->pushPartitionData(
partitionId, reinterpret_cast<char*>(const_cast<uint8_t*>(buffer->data())), buffer->size());
return arrow::Status::OK();
}
cpp/core/shuffle/rss/RssPartitionWriter.h → cpp/core/shuffle/rss/CelebornPartitionWriter.h (renamed)
@@ -26,14 +26,14 @@

namespace gluten {

class RssPartitionWriter final : public RemotePartitionWriter {
class CelebornPartitionWriter final : public RemotePartitionWriter {
public:
RssPartitionWriter(
CelebornPartitionWriter(
uint32_t numPartitions,
PartitionWriterOptions options,
arrow::MemoryPool* pool,
std::shared_ptr<RssClient> rssClient)
: RemotePartitionWriter(numPartitions, std::move(options), pool), rssClient_(rssClient) {
std::shared_ptr<RssClient> celebornClient)
: RemotePartitionWriter(numPartitions, std::move(options), pool), celebornClient_(celebornClient) {
init();
}

@@ -51,7 +51,7 @@ class RssPartitionWriter final : public RemotePartitionWriter {
private:
void init();

std::shared_ptr<RssClient> rssClient_;
std::shared_ptr<RssClient> celebornClient_;

std::vector<int64_t> bytesEvicted_;
std::vector<int64_t> rawPartitionLengths_;
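To make that dependency concrete, here is a small, self-contained sketch (not from this commit) of a fake RssClient together with the accounting pattern the restored CelebornPartitionWriter uses: pushPartitionData() returns the number of bytes accepted, the writer accumulates that value per partition in bytesEvicted_, and stop() is called once on shutdown. The InMemoryRssClient class and the main() driver are invented for illustration; only the general shape of the RssClient interface is taken from this diff, and the real signatures may differ.

// Standalone sketch -- not part of this commit.
#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>

class RssClient {
 public:
  virtual ~RssClient() = default;
  // Pushes one partition's serialized bytes; returns the number of bytes accepted.
  virtual int32_t pushPartitionData(uint32_t partitionId, char* bytes, int64_t size) = 0;
  virtual void stop() = 0;
};

class InMemoryRssClient final : public RssClient {
 public:
  explicit InMemoryRssClient(uint32_t numPartitions) : buffers_(numPartitions) {}

  int32_t pushPartitionData(uint32_t partitionId, char* bytes, int64_t size) override {
    buffers_[partitionId].insert(buffers_[partitionId].end(), bytes, bytes + size);
    return static_cast<int32_t>(size);  // bytes accepted
  }

  void stop() override {}  // a real client would flush and close connections here

 private:
  std::vector<std::vector<char>> buffers_;
};

int main() {
  InMemoryRssClient client(/*numPartitions=*/2);
  std::vector<int64_t> bytesEvicted(2, 0);  // mirrors CelebornPartitionWriter::bytesEvicted_

  char payload[] = "example-partition-bytes";
  bytesEvicted[0] += client.pushPartitionData(0, payload, sizeof(payload));
  bytesEvicted[1] += client.pushPartitionData(1, payload, sizeof(payload));
  client.stop();

  auto total = std::accumulate(bytesEvicted.begin(), bytesEvicted.end(), 0LL);
  std::cout << "total bytes pushed: " << total << "\n";  // 48
  return 0;
}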
(Diffs for the remaining changed files are not shown here.)
