This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

[REMOTE-SHUFFLE-18] upgrade to Spark-3.1.1 #19

Merged · 8 commits · May 20, 2021
4 changes: 2 additions & 2 deletions README.md
@@ -43,8 +43,8 @@ following configurations in spark-defaults.conf or Spark submit command line arg
Note: For DAOS users, DAOS Hadoop/Java API jars should also be included in the classpath as we leverage DAOS Hadoop filesystem.

```
-spark.executor.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/remote-shuffle-<version>.jar
-spark.driver.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/remote-shuffle-<version>.jar
+spark.executor.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/shuffle-hadoop-<version>.jar
+spark.driver.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/shuffle-hadoop-<version>.jar
```

Enable the remote shuffle manager and specify the Hadoop storage system URI holding shuffle data.
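For reference, the two settings that sentence points at typically go into spark-defaults.conf roughly as sketched below. The manager class and the storage-URI key are quoted from memory of the project documentation rather than from this diff, so treat both as assumptions and confirm them against the full README; the HDFS URI is only a placeholder.

```
# Illustrative sketch only -- verify the exact keys in the full README.
spark.shuffle.manager                  org.apache.spark.shuffle.remote.RemoteShuffleManager
spark.shuffle.remote.storageMasterUri  hdfs://namenode:9000/spark-shuffle
```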
8 changes: 4 additions & 4 deletions pom.xml
@@ -6,7 +6,7 @@

<groupId>com.intel.oap</groupId>
<artifactId>remote-shuffle-parent</artifactId>
-<version>1.2.0</version>
+<version>1.2.1</version>
<name>OAP Remote Shuffle Parent POM</name>
<packaging>pom</packaging>

@@ -16,7 +16,7 @@
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
-<spark.version>3.0.0</spark.version>
+<spark.version>3.1.1</spark.version>
</properties>

<build>
@@ -225,8 +225,8 @@
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
-<artifactId>scalatest_2.12</artifactId>
-<version>3.0.8</version>
+<artifactId>scalatest_${scala.binary.version}</artifactId>
+<version>3.2.3</version>
<scope>test</scope>
</dependency>
<dependency>
10 changes: 8 additions & 2 deletions shuffle-daos/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>com.intel.oap</groupId>
<artifactId>remote-shuffle-parent</artifactId>
-<version>1.2.0</version>
+<version>1.2.1</version>
</parent>
<artifactId>shuffle-daos</artifactId>
<name>OAP Remote Shuffle Based on DAOS Object API</name>
@@ -205,7 +205,7 @@
<dependency>
<groupId>io.daos</groupId>
<artifactId>daos-java</artifactId>
-<version>1.2.2</version>
+<version>1.2.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -241,4 +241,10 @@
</dependency>
</dependencies>

+<repositories>
+<repository>
+<id>maven-snapshots</id>
+<url>http://oss.sonatype.org/content/repositories/snapshots</url>
+</repository>
+</repositories>
</project>
shuffle-daos: DaosShuffleManager.scala
@@ -135,20 +135,6 @@ class DaosShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
}

override def getReader[K, C](
handle: ShuffleHandle,
-startPartition: Int,
-endPartition: Int,
-context: TaskContext,
-metrics: ShuffleReadMetricsReporter): DaosShuffleReader[K, C]
-= {
-val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-handle.shuffleId, startPartition, endPartition)
-new DaosShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context,
-metrics, daosShuffleIO, SparkEnv.get.serializerManager,
-shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
-}
-
-override def getReaderForRange[K, C](
-handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
@@ -157,7 +143,7 @@ class DaosShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
context: TaskContext,
metrics: ShuffleReadMetricsReporter): DaosShuffleReader[K, C]
= {
-val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange(
+val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
new DaosShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context,
metrics, daosShuffleIO, SparkEnv.get.serializerManager,
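For orientation: this hunk follows the Spark 3.1 change (SPARK-32055, if memory serves for the ticket number) that removed ShuffleManager.getReaderForRange and folded the map-index range into a single getReader, with MapOutputTracker.getMapSizesByRange likewise absorbed into getMapSizesByExecutorId. The sketch below reconstructs what the consolidated method plausibly looks like after this patch, pieced together from the context lines kept in the hunk; treat it as an approximation rather than the merged source.

```
// Reconstructed sketch, assembled from the unchanged lines shown above.
override def getReader[K, C](
    handle: ShuffleHandle,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): DaosShuffleReader[K, C] = {
  // In Spark 3.1 the map-index range is a parameter of getMapSizesByExecutorId;
  // getMapSizesByRange no longer exists on MapOutputTracker.
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
  new DaosShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context,
    metrics, daosShuffleIO, SparkEnv.get.serializerManager,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
```

Callers that still want every map output pass the full range (typically 0 to Int.MaxValue), so the old two-method split collapses into this single entry point.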
shuffle-daos: package.scala (package object daos)
@@ -170,7 +170,7 @@ package object daos {
.intConf
.checkValue(v => v > 0,
s"async write batch size must be positive")
-.createWithDefault(1)
+.createWithDefault(30)

val SHUFFLE_DAOS_READ_BATCH_SIZE =
ConfigBuilder("spark.shuffle.daos.read.batch")
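The functional change here is the async write batch size default going from 1 to 30. Entries defined through ConfigBuilder in this package object are ordinary Spark configuration keys, so the new default can still be overridden per job, roughly as below. The read-side key is quoted verbatim from the hunk above; the write-side key name is a placeholder, since the real key is defined earlier in this file and is not visible in the diff.

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Read-side batch entry, key shown verbatim in the hunk above.
  .set("spark.shuffle.daos.read.batch", "10")
  // Placeholder key name (assumption): the actual async-write batch entry is defined
  // earlier in this file; its default changes from 1 to 30 in this PR.
  .set("spark.shuffle.daos.write.batch", "30")
```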
@@ -128,19 +128,19 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
public void testTwoEntries() throws Exception {
LinkedHashMap<Tuple2<Long, Integer>, Tuple3<Long, BlockId, BlockManagerId>> partSizeMap;
partSizeMap = new LinkedHashMap<>();
-long mapIds[] = new long[] {12345, 12346};
+long[] mapIds = new long[] {12345, 12346};
int reduceId = 6789;
-long lens[] = new long[] {2 * 1024 * 1024, 1023};
+long[] lens = new long[] {2 * 1024 * 1024, 1023};
int shuffleId = 1000;
long eqHandle = 1111L;
-IOSimpleDDAsync descs[] = new IOSimpleDDAsync[] {Mockito.mock(IOSimpleDDAsync.class),
+IOSimpleDDAsync[] descs = new IOSimpleDDAsync[] {Mockito.mock(IOSimpleDDAsync.class),
Mockito.mock(IOSimpleDDAsync.class)};
-IOSimpleDDAsync.AsyncEntry entries[] = new IOSimpleDDAsync.AsyncEntry[] {
+IOSimpleDDAsync.AsyncEntry[] entries = new IOSimpleDDAsync.AsyncEntry[] {
Mockito.mock(IOSimpleDDAsync.AsyncEntry.class),
Mockito.mock(IOSimpleDDAsync.AsyncEntry.class)
};
-ByteBuf bufs[] = new ByteBuf[] {Mockito.mock(ByteBuf.class), Mockito.mock(ByteBuf.class)};
-boolean readAlready[] = new boolean[] {false, false};
+ByteBuf[] bufs = new ByteBuf[] {Mockito.mock(ByteBuf.class), Mockito.mock(ByteBuf.class)};
+boolean[] readAlready = new boolean[] {false, false};
for (int i = 0; i < 2; i++) {
Mockito.when(entries[i].getFetchedData()).thenReturn(bufs[i]);
Mockito.when(entries[i].getKey()).thenReturn(String.valueOf(mapIds[i]));
@@ -162,7 +162,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}
Mockito.when(eq.getEqWrapperHdl()).thenReturn(eqHandle);

-int times[] = new int[] {0};
+int[] times = new int[] {0};
Mockito.doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -176,7 +176,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}
}).when(eq).pollCompleted(Mockito.any(), Mockito.anyInt(), Mockito.anyLong());

-int times2[] = new int[] {0};
+int[] times2 = new int[] {0};
Mockito.doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -33,7 +33,6 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
-import org.mockito.internal.stubbing.answers.DoesNothing;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
shuffle-daos: DaosShuffleReaderSuite.scala
@@ -144,7 +144,7 @@ class DaosShuffleReaderSuite extends SparkFunSuite with LocalSparkContext {
val mapOutputTracker = mock(classOf[MapOutputTracker])
val localBlockManagerId = BlockManagerId("test-client", "test-client", 1)
when(mapOutputTracker.getMapSizesByExecutorId(
-shuffleId, reduceId, reduceId + 1)).thenReturn {
+shuffleId, reduceId)).thenReturn {
// Test a scenario where all data is local, to avoid creating a bunch of additional mocks
// for the code to read data over the network.
val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId =>
@@ -172,7 +172,7 @@ class DaosShuffleReaderSuite extends SparkFunSuite with LocalSparkContext {
val taskContext = TaskContext.empty()
val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId(
-shuffleId, reduceId, reduceId + 1)
+shuffleId, reduceId)

val (daosReader, shuffleIO, daosObject) =
if (singleCall) {
shuffle-daos: DaosShuffleWriterPerf.scala
@@ -26,7 +26,6 @@ package org.apache.spark.shuffle.daos
import org.mockito.{Mock, Mockito, MockitoAnnotations}
import org.mockito.Answers._
import org.mockito.Mockito.{mock, when}
-import org.scalatest.Matchers
import scala.collection.mutable
import scala.util.Random

Expand All @@ -36,7 +35,7 @@ import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.shuffle.BaseShuffleHandle
import org.apache.spark.util.Utils

-class DaosShuffleWriterPerf extends SparkFunSuite with SharedSparkContext with Matchers {
+class DaosShuffleWriterPerf extends SparkFunSuite with SharedSparkContext {

@Mock(answer = RETURNS_SMART_NULLS)
private var shuffleIO: DaosShuffleIO = _
shuffle-daos: DaosShuffleWriterSuite.scala
@@ -27,7 +27,6 @@ import org.mockito.{Mock, Mockito, MockitoAnnotations}
import org.mockito.Answers._
import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito.{mock, never, when}
-import org.scalatest.Matchers
import scala.collection.mutable

import org.apache.spark.{Partitioner, SharedSparkContext, ShuffleDependency, SparkFunSuite}
@@ -36,7 +35,7 @@ import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.shuffle.BaseShuffleHandle
import org.apache.spark.util.Utils

-class DaosShuffleWriterSuite extends SparkFunSuite with SharedSparkContext with Matchers {
+class DaosShuffleWriterSuite extends SparkFunSuite with SharedSparkContext {

@Mock(answer = RETURNS_SMART_NULLS)
private var shuffleIO: DaosShuffleIO = _
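The two suites above drop the Matchers mixin because of the scalatest bump to 3.2.3 in the parent pom: org.scalatest.Matchers moved to org.scalatest.matchers.should.Matchers in scalatest 3.1 and, as far as I recall, was removed outright in the 3.2 line. Dropping the mixin is the simplest fix when no matcher syntax is used; a suite that still wanted shouldBe-style assertions could instead switch to the relocated trait, roughly as in the illustrative sketch below (the suite name is made up).

```
package org.apache.spark.shuffle.daos

// scalatest 3.1+: the should-style matchers live here, not in org.scalatest.Matchers.
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite

class MatchersStillWorkSuite extends SparkFunSuite with Matchers {
  test("matcher syntax after the scalatest 3.2.3 upgrade") {
    Seq(1, 2, 3).sum shouldBe 6
  }
}
```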
2 changes: 1 addition & 1 deletion shuffle-hadoop/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>com.intel.oap</groupId>
<artifactId>remote-shuffle-parent</artifactId>
-<version>1.2.0</version>
+<version>1.2.1</version>
</parent>

<artifactId>shuffle-hadoop</artifactId>
shuffle-hadoop: RemoteShuffleTransferService.scala
@@ -22,9 +22,8 @@ import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.reflect.ClassTag

import com.codahale.metrics.{Metric, MetricSet}

+import org.apache.spark.internal.Logging
import org.apache.spark.{SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext}
import org.apache.spark.network.buffer.ManagedBuffer
@@ -46,7 +45,7 @@ private[spark] class RemoteShuffleTransferService(
bindAddress: String,
override val hostName: String,
_port: Int,
-numCores: Int) extends BlockTransferService {
+numCores: Int) extends BlockTransferService with Logging {

// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
private val serializer = new JavaSerializer(conf)
Expand Down