Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Add uniffle integration #3767

Merged
merged 4 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions .github/workflows/velox_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ on:
- '.github/workflows/velox_docker.yml'
- 'pom.xml'
- 'backends-velox/**'
- 'gluten-uniffle/**'
- 'gluten-celeborn/common/**'
- 'gluten-celeborn/package/**'
- 'gluten-celeborn/velox/**'
Expand Down Expand Up @@ -326,6 +327,66 @@ jobs:
--local --preset=velox --benchmark-type=ds --error-on-memleak -s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen --random-kill-tasks

run-tpc-test-centos8-uniffle:
needs: build-native-lib
strategy:
fail-fast: false
matrix:
spark: ["spark-3.2"]
runs-on: ubuntu-20.04
container: centos:8
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-${{github.sha}}
path: ./cpp/build/releases
- name: Update mirror list
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
- name: Setup java and maven
run: |
yum update -y && yum install -y java-1.8.0-openjdk-devel wget git
wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
tar -xvf apache-maven-3.8.8-bin.tar.gz
mv apache-maven-3.8.8 /usr/lib/maven
- name: Build for Spark ${{ matrix.spark }}
run: |
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Prss-uniffle -DskipTests
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0
run: |
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
export export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

export export -> export

cd /opt && \
git clone -b branch-0.8 https://github.com/apache/incubator-uniffle.git && \
cd incubator-uniffle && \
sed -i '250d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
sed -i '228d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
sed -i '226d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
mvn clean install -Phadoop2.8 -DskipTests
cd /opt && \
wget -nv https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz && \
tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \
wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz && \
tar xzf hadoop-2.8.5.tar.gz -C /opt/
rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you want to remove shuffle-server-0.8.0.jar instead of shuffle-server-0.8.0-SNAPSHOT.jar, was this a typo or something?

cp /opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar /opt/uniffle/jars/server/
rm -rf /opt/incubator-uniffle
cd /opt/uniffle && mkdir shuffle_data && \
bash -c "echo -e 'XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5' > ./bin/rss-env.sh" && \
bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \
bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh
cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss-uniffle && \
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1

run-tpc-test-ubuntu-2204-celeborn:
needs: build-native-lib
strategy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
val constructor =
clazz.getConstructor(classOf[SQLMetric], classOf[SQLMetric], classOf[SQLMetric])
constructor.newInstance(readBatchNumRows, numOutputRows, dataSize).asInstanceOf[Serializer]
} else if (GlutenConfig.getConf.isUseUniffleShuffleManager) {
throw new UnsupportedOperationException("temporarily uniffle not support ch ")
} else {
new CHColumnarBatchSerializer(readBatchNumRows, numOutputRows, dataSize)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ object BackendSettings extends BackendSettingsApi {

override def supportColumnarShuffleExec(): Boolean = {
GlutenConfig.getConf.isUseColumnarShuffleManager ||
GlutenConfig.getConf.isUseCelebornShuffleManager
GlutenConfig.getConf.isUseCelebornShuffleManager ||
GlutenConfig.getConf.isUseUniffleShuffleManager
}

override def enableJoinKeysRewrite(): Boolean = false
Expand Down
17 changes: 17 additions & 0 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,23 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
std::move(celebornClient));
} else if (partitionWriterType == "uniffle") {
jclass unifflePartitionPusherClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/spark/shuffle/writer/PartitionPusher;");
jmethodID unifflePushPartitionDataMethod =
getMethodIdOrError(env, unifflePartitionPusherClass, "pushPartitionData", "(I[BI)I");
JavaVM* vm;
if (env->GetJavaVM(&vm) != JNI_OK) {
throw gluten::GlutenException("Unable to get JavaVM instance");
}
// rename CelebornClient RssClient
std::shared_ptr<CelebornClient> uniffleClient =
std::make_shared<CelebornClient>(vm, partitionPusher, unifflePushPartitionDataMethod);
summaryzb marked this conversation as resolved.
Show resolved Hide resolved
partitionWriter = std::make_unique<CelebornPartitionWriter>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like the above comment, better to use a common name if they indeed can be shared by both celeborn & uniffle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a common name relate to all the logic of rss in jni layer, but not limit to this PartitionWriter, maybe it's better to resolve this in a separate pr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@summaryzb, ok to me. Please create an issue to track this. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow this

numPartitions,
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
std::move(uniffleClient));
} else {
throw gluten::GlutenException("Unrecognizable partition writer type: " + partitionWriterType);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
static constexpr bool kEnableBufferedWrite = true;

enum PartitionWriterType { kLocal, kCeleborn };
enum PartitionWriterType { kLocal, kCeleborn, kUniffle };

struct ShuffleReaderOptions {
arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
Expand Down
29 changes: 29 additions & 0 deletions gluten-uniffle/package/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>gluten-uniffle</artifactId>
<groupId>org.apache.gluten</groupId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>gluten-uniffle-package</artifactId>
<packaging>jar</packaging>
<name>Gluten Uniffle Package</name>

<profiles>
<profile>
<id>backends-velox</id>
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-uniffle-velox</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
89 changes: 89 additions & 0 deletions gluten-uniffle/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>gluten-parent</artifactId>
<groupId>org.apache.gluten</groupId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>gluten-uniffle</artifactId>
<packaging>pom</packaging>
<name>Gluten Uniffle</name>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client-spark${spark.major.version}-shaded</artifactId>
<version>${uniffle.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>backends-velox</id>
<properties>
</properties>
<modules>
<module>velox</module>
<module>package</module>
</modules>
</profile>
</profiles>
</project>
62 changes: 62 additions & 0 deletions gluten-uniffle/velox/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>gluten-uniffle</artifactId>
<groupId>org.apache.gluten</groupId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>gluten-uniffle-velox</artifactId>
<packaging>jar</packaging>
<name>Gluten Uniffle Velox</name>

<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>backends-velox</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-data</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle.gluten.uniffle;

import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.ColumnarShuffleDependency;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.RssShuffleManager;
import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.ShuffleHandle;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.shuffle.writer.VeloxUniffleColumnarShuffleWriter;
import org.apache.uniffle.common.exception.RssException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UniffleShuffleManager extends RssShuffleManager {
private static final Logger LOG = LoggerFactory.getLogger(UniffleShuffleManager.class);

private boolean isDriver() {
return "driver".equals(SparkEnv.get().executorId());
}

public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
super(conf, isDriver);
conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED, "false");
}

@Override
public <K, V, C> ShuffleHandle registerShuffle(
int shuffleId, ShuffleDependency<K, V, C> dependency) {
return super.registerShuffle(shuffleId, dependency);
}

@Override
public <K, V> ShuffleWriter<K, V> getWriter(
ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
if (!(handle instanceof RssShuffleHandle)) {
throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
}
RssShuffleHandle<K, V, V> rssHandle = (RssShuffleHandle<K, V, V>) handle;
if (rssHandle.getDependency() instanceof ColumnarShuffleDependency) {
setPusherAppId(rssHandle);
String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
ShuffleWriteMetrics writeMetrics;
if (metrics != null) {
writeMetrics = new WriteMetrics(metrics);
} else {
writeMetrics = context.taskMetrics().shuffleWriteMetrics();
}
return new VeloxUniffleColumnarShuffleWriter<>(
context.partitionId(),
rssHandle.getAppId(),
rssHandle.getShuffleId(),
taskId,
context.taskAttemptId(),
writeMetrics,
this,
sparkConf,
shuffleWriteClient,
rssHandle,
this::markFailedTask,
context);
} else {
return super.getWriter(handle, mapId, context, metrics);
}
}
}
Loading
Loading