From 9c15d2e1e2f5cac4cf65c169c099f81c1ba1bc2e Mon Sep 17 00:00:00 2001 From: summaryzb Date: Tue, 16 Apr 2024 10:56:38 +0800 Subject: [PATCH] [GLUTEN-5335][VL] Use common name for both celeborn and uniffle (#5385) --- .github/workflows/build_bundle_package.yml | 6 ++--- .github/workflows/velox_docker.yml | 24 +++++++++---------- cpp/core/CMakeLists.txt | 2 +- cpp/core/jni/JniCommon.h | 23 +++++++++--------- cpp/core/jni/JniWrapper.cc | 15 ++++++------ cpp/core/shuffle/Options.h | 2 +- ...rtitionWriter.cc => RssPartitionWriter.cc} | 21 ++++++++-------- ...PartitionWriter.h => RssPartitionWriter.h} | 10 ++++---- cpp/velox/benchmarks/GenericBenchmark.cc | 8 +++---- cpp/velox/tests/VeloxShuffleWriterTest.cc | 4 ++-- .../utils/tests/VeloxShuffleWriterTestBase.h | 4 ++-- dev/buildbundle-veloxbe.sh | 8 +++---- dev/package-vcpkg.sh | 8 +++---- dev/package.sh | 8 +++---- docs/developers/HowTo.md | 4 ++-- docs/developers/MicroBenchmarks.md | 4 ++-- docs/developers/NewToGluten.md | 4 ++-- docs/get-started/Velox.md | 6 ++--- ep/build-clickhouse/src/package.sh | 4 ++-- .../gluten/uniffle/UniffleShuffleManager.java | 2 +- package/pom.xml | 4 ++-- pom.xml | 4 ++-- .../org/apache/gluten/GlutenConfig.scala | 2 +- tools/gluten-it/package/pom.xml | 4 ++-- tools/gluten-te/centos/dockerfile-build | 3 ++- tools/gluten-te/ubuntu/dockerfile-build | 3 ++- .../buildhere-veloxbe-dev/scripts/all.sh | 3 ++- .../ubuntu/examples/buildhere-veloxbe/run.sh | 3 ++- 28 files changed, 97 insertions(+), 96 deletions(-) rename cpp/core/shuffle/rss/{CelebornPartitionWriter.cc => RssPartitionWriter.cc} (78%) rename cpp/core/shuffle/rss/{CelebornPartitionWriter.h => RssPartitionWriter.h} (87%) diff --git a/.github/workflows/build_bundle_package.yml b/.github/workflows/build_bundle_package.yml index 4fc45f098dac..01ddd6f43857 100644 --- a/.github/workflows/build_bundle_package.yml +++ b/.github/workflows/build_bundle_package.yml @@ -79,7 +79,7 @@ jobs: - name: Build for Spark ${{ github.event.inputs.spark }} run: | cd $GITHUB_WORKSPACE/ && \ - mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Prss -DskipTests -Dmaven.source.skip + mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip - name: Upload bundle package uses: actions/upload-artifact@v2 with: @@ -110,7 +110,7 @@ jobs: cd $GITHUB_WORKSPACE/ && \ export MAVEN_HOME=/usr/lib/maven && \ export PATH=${PATH}:${MAVEN_HOME}/bin && \ - mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Prss -DskipTests -Dmaven.source.skip + mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip - name: Upload bundle package uses: actions/upload-artifact@v2 with: @@ -145,7 +145,7 @@ jobs: cd $GITHUB_WORKSPACE/ && \ export MAVEN_HOME=/usr/lib/maven && \ export PATH=${PATH}:${MAVEN_HOME}/bin && \ - mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Prss -DskipTests -Dmaven.source.skip + mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip - name: Upload bundle package uses: actions/upload-artifact@v2 with: diff --git a/.github/workflows/velox_docker.yml 
b/.github/workflows/velox_docker.yml index 8515d79d2ba5..271daf679ae5 100644 --- a/.github/workflows/velox_docker.yml +++ b/.github/workflows/velox_docker.yml @@ -369,7 +369,7 @@ jobs: cd $GITHUB_WORKSPACE/ && \ export MAVEN_HOME=/usr/lib/maven && \ export PATH=${PATH}:${MAVEN_HOME}/bin && \ - mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Prss-uniffle -DskipTests + mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0 run: | export MAVEN_HOME=/usr/lib/maven && \ @@ -395,7 +395,7 @@ jobs: bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \ bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \ bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh - cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss-uniffle && \ + cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2 -Puniffle && \ GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \ --local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 @@ -421,7 +421,7 @@ jobs: - name: Build for Spark ${{ matrix.spark }} run: | cd $GITHUB_WORKSPACE/ - mvn clean install -P${{ matrix.spark }} -Pbackends-velox,rss -DskipTests + mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Pceleborn -DskipTests - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with ${{ matrix.celeborn }} run: | export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 @@ -437,7 +437,7 @@ jobs: bash -c "echo -e 'CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g' > ./conf/celeborn-env.sh" && \ bash -c "echo -e 'celeborn.worker.commitFiles.threads 128\nceleborn.worker.sortPartition.threads 64' > ./conf/celeborn-defaults.conf" && \ bash ./sbin/start-master.sh && bash ./sbin/start-worker.sh && \ - cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss ${EXTRA_PROFILE} && \ + cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2 -Pceleborn ${EXTRA_PROFILE} && \ GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \ --local --preset=velox-with-celeborn --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=8 --iterations=1 && \ GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \ @@ -488,7 +488,7 @@ jobs: export SPARK_SCALA_VERSION=2.12 export MAVEN_HOME=/usr/lib/maven export PATH=${PATH}:${MAVEN_HOME}/bin - mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ + mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ mvn -ntp test -Pspark-3.2 
-Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest - name: Upload golden files if: failure() @@ -536,7 +536,7 @@ jobs: cd $GITHUB_WORKSPACE/ && \ export MAVEN_HOME=/usr/lib/maven export PATH=${PATH}:${MAVEN_HOME}/bin - mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest + mvn -ntp clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest run-spark-test-spark33: runs-on: ubuntu-20.04 @@ -579,7 +579,7 @@ jobs: export SPARK_SCALA_VERSION=2.12 && \ export MAVEN_HOME=/usr/lib/maven export PATH=${PATH}:${MAVEN_HOME}/bin - mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ + mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ mvn -ntp test -Pspark-3.3 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest - name: Upload golden files if: failure() @@ -623,7 +623,7 @@ jobs: cd $GITHUB_WORKSPACE/ && \ export MAVEN_HOME=/usr/lib/maven export PATH=${PATH}:${MAVEN_HOME}/bin - mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest + mvn -ntp clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest run-spark-test-spark34: runs-on: ubuntu-20.04 @@ -666,7 +666,7 @@ jobs: export SPARK_SCALA_VERSION=2.12 && \ export MAVEN_HOME=/usr/lib/maven export PATH=${PATH}:${MAVEN_HOME}/bin - mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ + mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ mvn -ntp test -Pspark-3.4 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest - name: Upload golden files if: failure() @@ -710,7 +710,7 @@ jobs: cd $GITHUB_WORKSPACE/ export MAVEN_HOME=/usr/lib/maven export PATH=${PATH}:${MAVEN_HOME}/bin - mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest + mvn -ntp clean install -Pspark-3.4 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest run-spark-test-spark35: runs-on: ubuntu-20.04 @@ -753,7 +753,7 @@ jobs: export SPARK_SCALA_VERSION=2.12 && \ export MAVEN_HOME=/usr/lib/maven export PATH=${PATH}:${MAVEN_HOME}/bin - mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ + mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ mvn -ntp test -Pspark-3.5 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest - name: Upload golden files if: failure() @@ -802,4 +802,4 @@ jobs: cd $GITHUB_WORKSPACE/ export MAVEN_HOME=/usr/lib/maven export PATH=${PATH}:${MAVEN_HOME}/bin - mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Prss -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest \ No newline at end of file + mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest \ No newline at end of file diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 7e3e1ca00f4a..c369a5bc38eb 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -194,7 +194,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS shuffle/Partitioning.cc shuffle/Payload.cc shuffle/rss/RemotePartitionWriter.cc - shuffle/rss/CelebornPartitionWriter.cc + shuffle/rss/RssPartitionWriter.cc shuffle/RoundRobinPartitioner.cc shuffle/ShuffleMemoryPool.cc shuffle/ShuffleReader.cc diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index ad21d442c1a5..bda5fc1dfcb9 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -411,28 +411,28 @@ class BacktraceAllocationListener final : public gluten::AllocationListener { std::atomic_int64_t backtraceBytes_{1L << 30}; }; -class CelebornClient : public RssClient { +class JavaRssClient : public RssClient { public: - CelebornClient(JavaVM* vm, jobject javaCelebornShuffleWriter, jmethodID javaCelebornPushPartitionDataMethod) - : vm_(vm), javaCelebornPushPartitionData_(javaCelebornPushPartitionDataMethod) { + JavaRssClient(JavaVM* vm, jobject javaRssShuffleWriter, jmethodID javaPushPartitionDataMethod) + : vm_(vm), javaPushPartitionData_(javaPushPartitionDataMethod) { JNIEnv* env; if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) { throw gluten::GlutenException("JNIEnv was not attached to current thread"); } - javaCelebornShuffleWriter_ = env->NewGlobalRef(javaCelebornShuffleWriter); + javaRssShuffleWriter_ = env->NewGlobalRef(javaRssShuffleWriter); array_ = env->NewByteArray(1024 * 1024); array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_)); } - ~CelebornClient() { + ~JavaRssClient() { JNIEnv* env; if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) { - LOG(WARNING) << "CelebornClient#~CelebornClient(): " + LOG(WARNING) << "JavaRssClient#~JavaRssClient(): 
" << "JNIEnv was not attached to current thread"; return; } - env->DeleteGlobalRef(javaCelebornShuffleWriter_); + env->DeleteGlobalRef(javaRssShuffleWriter_); jbyte* byteArray = env->GetByteArrayElements(array_, NULL); env->ReleaseByteArrayElements(array_, byteArray, JNI_ABORT); env->DeleteGlobalRef(array_); } @@ -452,17 +452,16 @@ class CelebornClient : public RssClient { array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_)); } env->SetByteArrayRegion(array_, 0, size, reinterpret_cast<jbyte*>(bytes)); - jint celebornBytesSize = - env->CallIntMethod(javaCelebornShuffleWriter_, javaCelebornPushPartitionData_, partitionId, array_, size); + jint javaBytesSize = env->CallIntMethod(javaRssShuffleWriter_, javaPushPartitionData_, partitionId, array_, size); checkException(env); - return static_cast<int32_t>(celebornBytesSize); + return static_cast<int32_t>(javaBytesSize); } void stop() override {} private: JavaVM* vm_; - jobject javaCelebornShuffleWriter_; - jmethodID javaCelebornPushPartitionData_; + jobject javaRssShuffleWriter_; + jmethodID javaPushPartitionData_; jbyteArray array_; }; diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 6f5b2e332e37..748eb58319c9 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -34,7 +34,7 @@ #include "shuffle/ShuffleReader.h" #include "shuffle/ShuffleWriter.h" #include "shuffle/Utils.h" -#include "shuffle/rss/CelebornPartitionWriter.h" +#include "shuffle/rss/RssPartitionWriter.h" #include "utils/ArrowStatus.h" #include "utils/StringUtil.h" @@ -908,9 +908,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe if (env->GetJavaVM(&vm) != JNI_OK) { throw gluten::GlutenException("Unable to get JavaVM instance"); } - std::shared_ptr<CelebornClient> celebornClient = - std::make_shared<CelebornClient>(vm, partitionPusher, celebornPushPartitionDataMethod); - partitionWriter = std::make_unique<CelebornPartitionWriter>( + std::shared_ptr<JavaRssClient> celebornClient = + std::make_shared<JavaRssClient>(vm, partitionPusher, celebornPushPartitionDataMethod); + partitionWriter = std::make_unique<RssPartitionWriter>( numPartitions, std::move(partitionWriterOptions), memoryManager->getArrowMemoryPool(), @@ -924,10 +924,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe if (env->GetJavaVM(&vm) != JNI_OK) { throw gluten::GlutenException("Unable to get JavaVM instance"); } - // rename CelebornClient RssClient - std::shared_ptr<CelebornClient> uniffleClient = - std::make_shared<CelebornClient>(vm, partitionPusher, unifflePushPartitionDataMethod); - partitionWriter = std::make_unique<CelebornPartitionWriter>( + std::shared_ptr<JavaRssClient> uniffleClient = + std::make_shared<JavaRssClient>(vm, partitionPusher, unifflePushPartitionDataMethod); + partitionWriter = std::make_unique<RssPartitionWriter>( numPartitions, std::move(partitionWriterOptions), memoryManager->getArrowMemoryPool(), diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 6a793daa3488..d8fe1c802bff 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -33,7 +33,7 @@ static constexpr double kDefaultBufferReallocThreshold = 0.25; static constexpr double kDefaultMergeBufferThreshold = 0.25; static constexpr bool kEnableBufferedWrite = true; -enum PartitionWriterType { kLocal, kCeleborn, kUniffle }; +enum PartitionWriterType { kLocal, kRss }; struct ShuffleReaderOptions { arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME; diff --git a/cpp/core/shuffle/rss/CelebornPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc similarity index 78% rename from cpp/core/shuffle/rss/CelebornPartitionWriter.cc rename to cpp/core/shuffle/rss/RssPartitionWriter.cc index 
b1ac1a1e3cee..72bcf1d3a4b0 100644 --- a/cpp/core/shuffle/rss/CelebornPartitionWriter.cc +++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc @@ -19,20 +19,20 @@ #include "shuffle/Payload.h" #include "shuffle/Utils.h" -#include "shuffle/rss/CelebornPartitionWriter.h" +#include "shuffle/rss/RssPartitionWriter.h" #include "utils/Timer.h" namespace gluten { -void CelebornPartitionWriter::init() { +void RssPartitionWriter::init() { bytesEvicted_.resize(numPartitions_, 0); rawPartitionLengths_.resize(numPartitions_, 0); } -arrow::Status CelebornPartitionWriter::stop(ShuffleWriterMetrics* metrics) { +arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) { // Push data and collect metrics. auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(), bytesEvicted_.end(), 0LL); - celebornClient_->stop(); + rssClient_->stop(); // Populate metrics. metrics->totalCompressTime += compressTime_; metrics->totalEvictTime += spillTime_; @@ -44,12 +44,12 @@ arrow::Status CelebornPartitionWriter::stop(ShuffleWriterMetrics* metrics) { return arrow::Status::OK(); } -arrow::Status CelebornPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) { +arrow::Status RssPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) { *actual = 0; return arrow::Status::OK(); } -arrow::Status CelebornPartitionWriter::evict( +arrow::Status RssPartitionWriter::evict( uint32_t partitionId, std::unique_ptr<InMemoryPayload> inMemoryPayload, Evict::type evictType, @@ -64,14 +64,13 @@ arrow::Status CelebornPartitionWriter::evict( ARROW_ASSIGN_OR_RAISE( auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr)); // Copy payload to arrow buffered os. - ARROW_ASSIGN_OR_RAISE( - auto celebornBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_)); - RETURN_NOT_OK(payload->serialize(celebornBufferOs.get())); + ARROW_ASSIGN_OR_RAISE(auto rssBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_)); + RETURN_NOT_OK(payload->serialize(rssBufferOs.get())); payload = nullptr; // Invalidate payload immediately. // Push. 
- ARROW_ASSIGN_OR_RAISE(auto buffer, celebornBufferOs->Finish()); - bytesEvicted_[partitionId] += celebornClient_->pushPartitionData( + ARROW_ASSIGN_OR_RAISE(auto buffer, rssBufferOs->Finish()); + bytesEvicted_[partitionId] += rssClient_->pushPartitionData( partitionId, reinterpret_cast<char*>(const_cast<uint8_t*>(buffer->data())), buffer->size()); return arrow::Status::OK(); } diff --git a/cpp/core/shuffle/rss/CelebornPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h similarity index 87% rename from cpp/core/shuffle/rss/CelebornPartitionWriter.h rename to cpp/core/shuffle/rss/RssPartitionWriter.h index 612979c024cd..ef43017fcf64 100644 --- a/cpp/core/shuffle/rss/CelebornPartitionWriter.h +++ b/cpp/core/shuffle/rss/RssPartitionWriter.h @@ -26,14 +26,14 @@ namespace gluten { -class CelebornPartitionWriter final : public RemotePartitionWriter { +class RssPartitionWriter final : public RemotePartitionWriter { public: - CelebornPartitionWriter( + RssPartitionWriter( uint32_t numPartitions, PartitionWriterOptions options, arrow::MemoryPool* pool, - std::shared_ptr<RssClient> celebornClient) - : RemotePartitionWriter(numPartitions, std::move(options), pool), celebornClient_(celebornClient) { + std::shared_ptr<RssClient> rssClient) + : RemotePartitionWriter(numPartitions, std::move(options), pool), rssClient_(rssClient) { init(); } @@ -51,7 +51,7 @@ class CelebornPartitionWriter final : public RemotePartitionWriter { private: void init(); - std::shared_ptr<RssClient> celebornClient_; + std::shared_ptr<RssClient> rssClient_; std::vector<int64_t> bytesEvicted_; std::vector<int64_t> rawPartitionLengths_; diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index f2e275911a02..ef88da2c3abf 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -31,7 +31,7 @@ #include "config/GlutenConfig.h" #include "shuffle/LocalPartitionWriter.h" #include "shuffle/VeloxShuffleWriter.h" -#include "shuffle/rss/CelebornPartitionWriter.h" +#include "shuffle/rss/RssPartitionWriter.h" #include "utils/StringUtil.h" #include "utils/VeloxArrowUtils.h" #include "utils/exception.h" @@ -46,7 +46,7 @@ DEFINE_bool(print_result, true, "Print result for execution"); DEFINE_string(save_output, "", "Path to parquet file for saving the task output iterator"); DEFINE_bool(with_shuffle, false, "Add shuffle split at end."); DEFINE_string(partitioning, "rr", "Short partitioning name. 
Valid options are rr, hash, range, single"); -DEFINE_bool(celeborn, false, "Mocking celeborn shuffle."); +DEFINE_bool(rss, false, "Mocking rss."); DEFINE_bool(zstd, false, "Use ZSTD as shuffle compression codec"); DEFINE_bool(qat_gzip, false, "Use QAT GZIP as shuffle compression codec"); DEFINE_bool(qat_zstd, false, "Use QAT ZSTD as shuffle compression codec"); @@ -90,9 +90,9 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter( } std::unique_ptr<PartitionWriter> partitionWriter; - if (FLAGS_celeborn) { + if (FLAGS_rss) { auto rssClient = std::make_unique<LocalRssClient>(dataFile); - partitionWriter = std::make_unique<CelebornPartitionWriter>( + partitionWriter = std::make_unique<RssPartitionWriter>( FLAGS_shuffle_partitions, std::move(partitionWriterOptions), memoryManager->getArrowMemoryPool(), diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index 3f370207dae7..2a8f12afbbfe 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -20,7 +20,7 @@ #include "shuffle/LocalPartitionWriter.h" #include "shuffle/VeloxShuffleWriter.h" -#include "shuffle/rss/CelebornPartitionWriter.h" +#include "shuffle/rss/RssPartitionWriter.h" #include "utils/TestUtils.h" #include "utils/VeloxArrowUtils.h" #include "utils/tests/MemoryPoolUtils.h" @@ -73,7 +73,7 @@ std::vector<ShuffleTestParams> createShuffleTestParams() { params.push_back( ShuffleTestParams{PartitionWriterType::kLocal, compression, compressionThreshold, mergeBufferSize}); } - params.push_back(ShuffleTestParams{PartitionWriterType::kCeleborn, compression, compressionThreshold, 0}); + params.push_back(ShuffleTestParams{PartitionWriterType::kRss, compression, compressionThreshold, 0}); } } diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index bef131981e2d..972c0cb25850 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -53,9 +53,9 @@ std::unique_ptr<PartitionWriter> createPartitionWriter( const std::vector<std::string>& localDirs, const PartitionWriterOptions& options, arrow::MemoryPool* pool) { - if (partitionWriterType == PartitionWriterType::kCeleborn) { + if (partitionWriterType == PartitionWriterType::kRss) { auto rssClient = std::make_unique<LocalRssClient>(dataFile); - return std::make_unique<CelebornPartitionWriter>(numPartitions, options, pool, std::move(rssClient)); + return std::make_unique<RssPartitionWriter>(numPartitions, options, pool, std::move(rssClient)); } return std::make_unique<LocalPartitionWriter>(numPartitions, options, pool, dataFile, localDirs); } diff --git a/dev/buildbundle-veloxbe.sh b/dev/buildbundle-veloxbe.sh index 1c77b52a6ada..10c8e61e2424 100755 --- a/dev/buildbundle-veloxbe.sh +++ b/dev/buildbundle-veloxbe.sh @@ -4,7 +4,7 @@ BASEDIR=$(dirname $0) source "$BASEDIR/builddeps-veloxbe.sh" cd $GLUTEN_DIR -mvn clean package -Pbackends-velox -Prss -Pspark-3.2 -DskipTests -mvn clean package -Pbackends-velox -Prss -Pspark-3.3 -DskipTests -mvn clean package -Pbackends-velox -Prss -Pspark-3.4 -DskipTests -mvn clean package -Pbackends-velox -Prss -Pspark-3.5 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.2 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.3 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.4 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.5 -DskipTests diff --git a/dev/package-vcpkg.sh b/dev/package-vcpkg.sh index 5c3e95759803..3f14f1c902d1 100755 --- a/dev/package-vcpkg.sh +++ b/dev/package-vcpkg.sh @@ -8,7 +8,7 @@ GLUTEN_DIR="$CURRENT_DIR/.."
cd "$GLUTEN_DIR" source ./dev/vcpkg/env.sh ./dev/buildbundle-veloxbe.sh --build_tests=ON --build_benchmarks=ON --enable_s3=ON --enable_hdfs=ON -mvn clean package -Pbackends-velox -Prss -Pspark-3.2 -DskipTests -mvn clean package -Pbackends-velox -Prss -Pspark-3.3 -DskipTests -mvn clean package -Pbackends-velox -Prss -Pspark-3.4 -DskipTests -mvn clean package -Pbackends-velox -Prss -Pspark-3.5 -DskipTests \ No newline at end of file +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.2 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.3 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.4 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.5 -DskipTests \ No newline at end of file diff --git a/dev/package.sh b/dev/package.sh index 2e33fcd85db5..1b9ca85e9590 100755 --- a/dev/package.sh +++ b/dev/package.sh @@ -11,10 +11,10 @@ ARCH=`uname -m` # compile gluten jar $GLUTEN_DIR/dev/builddeps-veloxbe.sh --build_tests=ON --build_benchmarks=ON --enable_s3=ON --enable_hdfs=ON -mvn clean package -Pbackends-velox -Prss -Pspark-3.2 -DskipTests -mvn clean package -Pbackends-velox -Prss -Pspark-3.3 -DskipTests -mvn clean package -Pbackends-velox -Prss -Pspark-3.4 -DskipTests -mvn clean package -Pbackends-velox -Prss -Pspark-3.5 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.2 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.3 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.4 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.5 -DskipTests mkdir -p $THIRDPARTY_LIB function process_setup_ubuntu_2004 { diff --git a/docs/developers/HowTo.md b/docs/developers/HowTo.md index ff036a060b06..a13bf02ebede 100644 --- a/docs/developers/HowTo.md +++ b/docs/developers/HowTo.md @@ -54,8 +54,8 @@ ${GLUTEN_HOME}/dev/builddeps-veloxbe.sh --build_tests=ON --build_benchmarks=ON - ``` cd ${GLUTEN_HOME} -mvn clean package -Pspark-3.2 -Pbackends-velox -Prss -mvn test -Pspark-3.2 -Pbackends-velox -Prss -pl backends-velox \ +mvn clean package -Pspark-3.2 -Pbackends-velox -Pceleborn -Puniffle +mvn test -Pspark-3.2 -Pbackends-velox -Pceleborn -pl backends-velox \ -am -DtagsToInclude="org.apache.gluten.tags.GenerateExample" \ -Dtest=none -DfailIfNoTests=false \ -Dexec.skip diff --git a/docs/developers/MicroBenchmarks.md b/docs/developers/MicroBenchmarks.md index f6c95d593337..dc0c3b2a019b 100644 --- a/docs/developers/MicroBenchmarks.md +++ b/docs/developers/MicroBenchmarks.md @@ -36,8 +36,8 @@ generate example input files: cd /path/to/gluten/ ./dev/buildbundle-veloxbe.sh --build_tests=ON --build_benchmarks=ON -# Run test to generate input data files. If you are using spark 3.3, replace -Pspark-3.2 with -Pspark-3.3 -mvn test -Pspark-3.2 -Pbackends-velox -Prss -pl backends-velox -am \ +# Run test to generate input data files. 
If you are using spark 3.3, replace -Pspark-3.2 with -Pspark-3.3. If you are using uniffle, replace -Pceleborn with -Puniffle +mvn test -Pspark-3.2 -Pbackends-velox -Pceleborn -pl backends-velox -am \ -DtagsToInclude="org.apache.gluten.tags.GenerateExample" -Dtest=none -DfailIfNoTests=false -Dexec.skip ``` diff --git a/docs/developers/NewToGluten.md b/docs/developers/NewToGluten.md index abaabc699e8d..b3f05a64b3e8 100644 --- a/docs/developers/NewToGluten.md +++ b/docs/developers/NewToGluten.md @@ -120,8 +120,8 @@ To generate a fix for Java/Scala code style, you can run one or more of the belo For Velox backend: ``` -mvn spotless:apply -Pbackends-velox -Prss -Pspark-3.2 -Pspark-ut -DskipTests -mvn spotless:apply -Pbackends-velox -Prss -Pspark-3.3 -Pspark-ut -DskipTests +mvn spotless:apply -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.2 -Pspark-ut -DskipTests +mvn spotless:apply -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.3 -Pspark-ut -DskipTests ``` For Clickhouse backend: ``` diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 81693a13471e..426b56c6b4cb 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -101,9 +101,9 @@ cd /path/to/gluten/cpp ## compile Gluten java module and create package jar cd /path/to/gluten # For spark3.2.x -mvn clean package -Pbackends-velox -Prss -Pspark-3.2 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.2 -DskipTests # For spark3.3.x -mvn clean package -Pbackends-velox -Prss -Pspark-3.3 -DskipTests +mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-3.3 -DskipTests ``` notes:The compilation of `Velox` using the script of `build_velox.sh` may fail caused by `oom`, you can prevent this failure by using the user command of `export NUM_THREADS=4` before executing the above scripts. @@ -214,7 +214,7 @@ First refer to this URL(https://github.com/apache/celeborn) to setup a celeborn When compiling the Gluten Java module, it's required to enable `rss` profile, as follows: ``` -mvn clean package -Pbackends-velox -Pspark-3.3 -Prss -DskipTests +mvn clean package -Pbackends-velox -Pspark-3.3 -Pceleborn -DskipTests ``` Then add the Gluten and Spark Celeborn Client packages to your Spark application's classpath(usually add them into `$SPARK_HOME/jars`). 
diff --git a/ep/build-clickhouse/src/package.sh b/ep/build-clickhouse/src/package.sh index a1cf9733ce36..3741e9ae0411 100755 --- a/ep/build-clickhouse/src/package.sh +++ b/ep/build-clickhouse/src/package.sh @@ -79,7 +79,7 @@ cp "${GLUTEN_SOURCE}"/LICENSE "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}" cp "${GLUTEN_SOURCE}"/README.md "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}" # build gluten with spark32 -mvn clean install -Pbackends-clickhouse -Pspark-3.2 -Prss -DskipTests -Dcheckstyle.skip +mvn clean install -Pbackends-clickhouse -Pspark-3.2 -Pceleborn -Puniffle -DskipTests -Dcheckstyle.skip cp "${GLUTEN_SOURCE}"/backends-clickhouse/target/gluten-*-spark-3.2-jar-with-dependencies.jar "${PACKAGE_DIR_PATH}"/jars/spark32/gluten.jar cp "${GLUTEN_SOURCE}"/gluten-celeborn/clickhouse/target/gluten-celeborn-clickhouse-${PROJECT_VERSION}-jar-with-dependencies.jar "${PACKAGE_DIR_PATH}"/jars/spark32 delta_version_32=$(mvn -q -Dexec.executable="echo" -Dexec.args='${delta.version}' -Pspark-3.2 --non-recursive exec:exec) @@ -87,7 +87,7 @@ wget https://repo1.maven.org/maven2/io/delta/delta-core_2.12/${delta_version_32} wget https://repo1.maven.org/maven2/io/delta/delta-storage/${delta_version_32}/delta-storage-${delta_version_32}.jar -P "${PACKAGE_DIR_PATH}"/jars/spark32 # build gluten with spark33 -mvn clean install -Pbackends-clickhouse -Pspark-3.3 -Prss -DskipTests -Dcheckstyle.skip +mvn clean install -Pbackends-clickhouse -Pspark-3.3 -Pceleborn -Puniffle -DskipTests -Dcheckstyle.skip cp "${GLUTEN_SOURCE}"/backends-clickhouse/target/gluten-*-spark-3.3-jar-with-dependencies.jar "${PACKAGE_DIR_PATH}"/jars/spark33/gluten.jar cp "${GLUTEN_SOURCE}"/gluten-celeborn/clickhouse/target/gluten-celeborn-clickhouse-${PROJECT_VERSION}-jar-with-dependencies.jar "${PACKAGE_DIR_PATH}"/jars/spark33 delta_version_33=$(mvn -q -Dexec.executable="echo" -Dexec.args='${delta.version}' -Pspark-3.3 --non-recursive exec:exec) diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java index 9ae62f8b8a23..8e0f4f085af5 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java @@ -42,7 +42,7 @@ private boolean isDriver() { public UniffleShuffleManager(SparkConf conf, boolean isDriver) { super(conf, isDriver); - conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED, "false"); + conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(), "false"); } @Override diff --git a/package/pom.xml b/package/pom.xml index 913621ee1bb0..52fac53e580d 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -71,7 +71,7 @@ <profile> - <id>rss</id> + <id>celeborn</id> <dependencies> <dependency> <groupId>org.apache.gluten</groupId> @@ -81,7 +81,7 @@ <profile> - <id>rss-uniffle</id> + <id>uniffle</id> <dependencies> <dependency> <groupId>org.apache.gluten</groupId> diff --git a/pom.xml b/pom.xml index 0707d348d78a..6ba36121d1c8 100644 --- a/pom.xml +++ b/pom.xml @@ -235,7 +235,7 @@ <profile> - <id>rss</id> + <id>celeborn</id> <activation> <activeByDefault>false</activeByDefault> </activation> @@ -244,7 +244,7 @@ <profile> - <id>rss-uniffle</id> + <id>uniffle</id> <activation> <activeByDefault>false</activeByDefault> </activation> diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index cfaf54cdda5d..60ff95a7ebb4 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -124,7 +124,7 @@ class GlutenConfig(conf: SQLConf) extends 
Logging { def isUseUniffleShuffleManager: Boolean = conf .getConfString("spark.shuffle.manager", "sort") - .contains("RssShuffleManager") + .contains("UniffleShuffleManager") def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED) diff --git a/tools/gluten-it/package/pom.xml b/tools/gluten-it/package/pom.xml index 3beef2df7c1d..1f86ee723240 100644 --- a/tools/gluten-it/package/pom.xml +++ b/tools/gluten-it/package/pom.xml @@ -95,7 +95,7 @@ <profile> - <id>rss</id> + <id>celeborn</id> <activation> <activeByDefault>false</activeByDefault> </activation> @@ -119,7 +119,7 @@ <profile> - <id>rss-uniffle</id> + <id>uniffle</id> <activation> <activeByDefault>false</activeByDefault> </activation> diff --git a/tools/gluten-te/centos/dockerfile-build b/tools/gluten-te/centos/dockerfile-build index 02bc8b6a9523..fa496613b168 100644 --- a/tools/gluten-te/centos/dockerfile-build +++ b/tools/gluten-te/centos/dockerfile-build @@ -65,7 +65,8 @@ RUN if [ "$BUILD_BACKEND_TYPE" == "velox" ]; \ --build_type=$GLUTEN_BUILD_TYPE --enable_ep_cache=ON"; \ EXTRA_MAVEN_OPTIONS="-Pspark-3.2 \ -Pbackends-velox \ - -Prss \ + -Pceleborn \ + -Puniffle \ -Piceberg \ -Pdelta \ -DskipTests \ diff --git a/tools/gluten-te/ubuntu/dockerfile-build b/tools/gluten-te/ubuntu/dockerfile-build index 606e14715a57..59736c98694a 100644 --- a/tools/gluten-te/ubuntu/dockerfile-build +++ b/tools/gluten-te/ubuntu/dockerfile-build @@ -65,7 +65,8 @@ RUN if [ "$BUILD_BACKEND_TYPE" == "velox" ]; \ --build_type=$GLUTEN_BUILD_TYPE --enable_ep_cache=ON"; \ EXTRA_MAVEN_OPTIONS="-Pspark-3.2 \ -Pbackends-velox \ - -Prss \ + -Pceleborn \ + -Puniffle \ -Piceberg \ -Pdelta \ -DskipTests \ diff --git a/tools/gluten-te/ubuntu/examples/buildhere-veloxbe-dev/scripts/all.sh b/tools/gluten-te/ubuntu/examples/buildhere-veloxbe-dev/scripts/all.sh index 0e1634b3f360..f795b5ba9b9b 100755 --- a/tools/gluten-te/ubuntu/examples/buildhere-veloxbe-dev/scripts/all.sh +++ b/tools/gluten-te/ubuntu/examples/buildhere-veloxbe-dev/scripts/all.sh @@ -19,7 +19,8 @@ set -ex # Build Gluten EXTRA_MAVEN_OPTIONS="-Pspark-3.2 \ -Pbackends-velox \ - -Prss \ + -Pceleborn \ + -Puniffle \ -DskipTests \ -Dscalastyle.skip=true \ -Dcheckstyle.skip=true" diff --git a/tools/gluten-te/ubuntu/examples/buildhere-veloxbe/run.sh b/tools/gluten-te/ubuntu/examples/buildhere-veloxbe/run.sh index ec4a1d628b07..a7ba6c5261f1 100755 --- a/tools/gluten-te/ubuntu/examples/buildhere-veloxbe/run.sh +++ b/tools/gluten-te/ubuntu/examples/buildhere-veloxbe/run.sh @@ -20,7 +20,8 @@ BASEDIR=$(dirname $0) EXTRA_MAVEN_OPTIONS="-Pspark-3.2 \ -Pbackends-velox \ - -Prss \ + -Pceleborn \ + -Puniffle \ -DskipTests \ -Dscalastyle.skip=true \ -Dcheckstyle.skip=true"
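Note on the resulting structure (reviewer sketch, not part of the diff above): the commit collapses the two JNI shuffle-writer paths into one. Both the Celeborn and the Uniffle entry points in JniWrapper.cc now wrap the Java-side partition pusher in a JavaRssClient and hand it to a backend-agnostic RssPartitionWriter. The C++ sketch below illustrates that composition; the helper name makeRssPartitionWriter is invented for illustration, and its parameters are assumed to be supplied the way JniWrapper.cc supplies them (the pusher object plus the celeborn/uniffle pushPartitionData method id).

```cpp
// Illustrative sketch only -- this helper does not exist in the patch.
// JavaRssClient (cpp/core/jni/JniCommon.h) adapts a Java-side shuffle writer,
// Celeborn's or Uniffle's, behind the abstract RssClient interface, and
// RssPartitionWriter (cpp/core/shuffle/rss/RssPartitionWriter.h) only ever
// calls pushPartitionData() and stop() on whatever client it is handed.
std::unique_ptr<PartitionWriter> makeRssPartitionWriter(
    JavaVM* vm, // obtained via env->GetJavaVM(), as in JniWrapper.cc
    jobject partitionPusher, // Java partition pusher (Celeborn or Uniffle)
    jmethodID pushPartitionDataMethod, // method id of its pushPartitionData
    uint32_t numPartitions,
    PartitionWriterOptions options,
    arrow::MemoryPool* pool) {
  std::shared_ptr<RssClient> client =
      std::make_shared<JavaRssClient>(vm, partitionPusher, pushPartitionDataMethod);
  return std::make_unique<RssPartitionWriter>(numPartitions, std::move(options), pool, client);
}
```

Build-side, the same unification shows up only in the profile names: -Prss becomes -Pceleborn and -Prss-uniffle becomes -Puniffle, and the updated scripts enable both profiles together where the old ones enabled -Prss alone.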