From a06b106b25639b9ed972174afe4c9dbf22fa716c Mon Sep 17 00:00:00 2001 From: jiafu zhang Date: Mon, 20 Dec 2021 06:16:43 +0000 Subject: [PATCH] [REMOTE-SHUFFLE-52] Update shuffle write to not encode/decode parameters Signed-off-by: jiafu zhang --- pom.xml | 9 +- .../shuffle/daos/DaosParallelReaderAsync.java | 3 +- .../spark/shuffle/daos/DaosReaderAsync.java | 2 +- .../apache/spark/shuffle/daos/DaosWriter.java | 45 ++++-- .../spark/shuffle/daos/DaosWriterAsync.java | 118 +++++++++++++--- .../apache/spark/shuffle/daos/package.scala | 63 +++++---- .../shuffle/daos/DaosWriterAsyncTest.java | 128 ++++++++++++++++++ shuffle-hadoop/pom.xml | 5 - 8 files changed, 308 insertions(+), 65 deletions(-) create mode 100644 shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosWriterAsyncTest.java diff --git a/pom.xml b/pom.xml index a57a61d1..31862918 100644 --- a/pom.xml +++ b/pom.xml @@ -11,8 +11,8 @@ pom - 2.12.10 - 2.12 + 2.12.10 + 2.12 1.8 ${java.version} ${java.version} @@ -214,8 +214,9 @@ org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} + provided junit @@ -231,7 +232,7 @@ org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} tests test-jar diff --git a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosParallelReaderAsync.java b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosParallelReaderAsync.java index ea5b023a..d7aecae6 100644 --- a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosParallelReaderAsync.java +++ b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosParallelReaderAsync.java @@ -114,8 +114,7 @@ protected ByteBuf readFromDaos() throws IOException { } private void releaseDescSet() { - descSet.forEach(desc -> desc.release()); - descSet.clear(); + descSet.forEach(desc -> desc.discard()); } @Override diff --git a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosReaderAsync.java b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosReaderAsync.java index f2acfc84..6403df69 100644 --- a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosReaderAsync.java +++ b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosReaderAsync.java @@ -204,7 +204,7 @@ public void close(boolean force) { e = new IllegalStateException(sb.toString()); } readyList.forEach(desc -> desc.release()); - runningDescSet.forEach(desc -> desc.release()); + runningDescSet.forEach(desc -> desc.discard()); // to be released when poll if (currentDesc != null) { currentDesc.release(); currentDesc = null; diff --git a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriter.java b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriter.java index 7396a394..c2449e08 100644 --- a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriter.java +++ b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriter.java @@ -26,6 +26,7 @@ import io.daos.BufferAllocator; import io.daos.obj.DaosObject; import io.daos.obj.IODataDescSync; +import io.daos.obj.IODescUpdAsync; import io.daos.obj.IOSimpleDDAsync; import io.netty.buffer.ByteBuf; import org.apache.spark.SparkConf; @@ -141,6 +142,17 @@ public interface DaosWriter { */ List getSpillInfo(int partitionId); + interface ObjectCache { + + T get(); + + T newObject(); + + void put(T object); + + boolean isFull(); + } + /** * Write parameters, including mapId, shuffleId, number of partitions and write config. */ @@ -345,16 +357,17 @@ public List createUpdateDescs(boolean fullBufferOnly) throws IOE } /** - * create list of {@link IOSimpleDDAsync} each of them has only one akey entry. + * create list of {@link IODescUpdAsync} each of them has only one akey entry. * DAOS has a constraint that same akey cannot be referenced twice in one IO. * * @param eqHandle - * @return list of {@link IOSimpleDDAsync} + * @return list of {@link IODescUpdAsync} * @throws IOException */ - public List createUpdateDescAsyncs(long eqHandle) throws IOException { + public List createUpdateDescAsyncs(long eqHandle, ObjectCache cache) + throws IOException { // make sure each spilled data don't span multiple mapId_s. - return createUpdateDescAsyncs(eqHandle, true); + return createUpdateDescAsyncs(eqHandle, cache, true); } /** @@ -367,21 +380,31 @@ public List createUpdateDescAsyncs(long eqHandle) throws IOExce * @return list of {@link IOSimpleDDAsync} * @throws IOException */ - public List createUpdateDescAsyncs(long eqHandle, boolean fullBufferOnly) throws IOException { + public List createUpdateDescAsyncs(long eqHandle, ObjectCache cache, + boolean fullBufferOnly) throws IOException { int nbrOfBuf = bufList.size(); if ((nbrOfBuf == 0) | (fullBufferOnly & (nbrOfBuf <= 1))) { return Collections.emptyList(); } nbrOfBuf -= fullBufferOnly ? 1 : 0; - List descList = new ArrayList<>(nbrOfBuf); + List descList = new ArrayList<>(nbrOfBuf); String cmapId = currentMapId(); long bufSize = 0; long offset = needSpill ? 0 : totalSize; for (int i = 0; i < nbrOfBuf; i++) { - IOSimpleDDAsync desc = object.createAsyncDataDescForUpdate(partitionIdKey, eqHandle); + IODescUpdAsync desc; ByteBuf buf = bufList.get(i); - desc.addEntryForUpdate(cmapId, offset + bufSize, buf); + if (!cache.isFull()) { + desc = cache.get(); + desc.reuse(); + desc.setDkey(partitionIdKey); + desc.setAkey(cmapId); + desc.setOffset(offset + bufSize); + desc.setDataBuffer(buf); + } else { + desc = new IODescUpdAsync(partitionIdKey, cmapId, offset + bufSize, buf); + } bufSize += buf.readableBytes(); descList.add(desc); } @@ -485,6 +508,7 @@ class WriterConfig { private int totalSubmittedLimit; private int threads; private boolean fromOtherThreads; + private int ioDescCaches; private SparkConf conf; private static final Logger logger = LoggerFactory.getLogger(WriterConfig.class); @@ -504,6 +528,7 @@ class WriterConfig { totalInMemSize = (long)conf.get(package$.MODULE$.SHUFFLE_DAOS_WRITE_MAX_BYTES_IN_FLIGHT()) * 1024; totalSubmittedLimit = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_WRITE_SUBMITTED_LIMIT()); threads = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_WRITE_THREADS()); + ioDescCaches = (int)conf.get(package$.MODULE$.SHUFFLE_DAOS_WRITE_ASYNC_DESC_CACHES()); fromOtherThreads = (boolean)conf .get(package$.MODULE$.SHUFFLE_DAOS_WRITE_IN_OTHER_THREAD()); if (logger.isDebugEnabled()) { @@ -544,6 +569,10 @@ public int getThreads() { return threads; } + public int getIoDescCaches() { + return ioDescCaches; + } + public boolean isFromOtherThreads() { return fromOtherThreads; } diff --git a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriterAsync.java b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriterAsync.java index 3d596ae5..ae9d4c89 100644 --- a/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriterAsync.java +++ b/shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosWriterAsync.java @@ -26,11 +26,12 @@ import io.daos.DaosEventQueue; import io.daos.TimedOutException; import io.daos.obj.DaosObject; -import io.daos.obj.IOSimpleDDAsync; +import io.daos.obj.IODescUpdAsync; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -41,15 +42,18 @@ public class DaosWriterAsync extends DaosWriterBase { private DaosEventQueue eq; - private Set descSet = new LinkedHashSet<>(); + private Set descSet = new LinkedHashSet<>(); private List completedList = new LinkedList<>(); + private AsyncDescCache cache; + private static Logger log = LoggerFactory.getLogger(DaosWriterAsync.class); public DaosWriterAsync(DaosObject object, WriteParam param) throws IOException { super(object, param); eq = DaosEventQueue.getInstance(0); + cache = new AsyncDescCache(param.getConfig().getIoDescCaches()); } @Override @@ -58,7 +62,7 @@ public void flush(int partitionId) throws IOException { if (buffer == null) { return; } - List descList = buffer.createUpdateDescAsyncs(eq.getEqWrapperHdl()); + List descList = buffer.createUpdateDescAsyncs(eq.getEqWrapperHdl(), cache); flush(buffer, descList); } @@ -68,24 +72,33 @@ public void flushAll(int partitionId) throws IOException { if (buffer == null) { return; } - List descList = buffer.createUpdateDescAsyncs(eq.getEqWrapperHdl(), false); + List descList = buffer.createUpdateDescAsyncs(eq.getEqWrapperHdl(), cache, false); flush(buffer, descList); } - private void flush(NativeBuffer buffer, List descList) throws IOException { + private void cacheOrRelease(IODescUpdAsync desc) { + if (desc.isReusable()) { + cache.put(desc); + } else { + desc.release(); + } + } + + private void flush(NativeBuffer buffer, List descList) throws IOException { if (!descList.isEmpty()) { assert Thread.currentThread().getId() == eq.getThreadId() : "current thread " + Thread.currentThread().getId() + "(" + Thread.currentThread().getName() + "), is not expected " + eq.getThreadId() + "(" + eq.getThreadName() + ")"; - for (IOSimpleDDAsync desc : descList) { + for (IODescUpdAsync desc : descList) { DaosEventQueue.Event event = acquireEvent(); descSet.add(desc); desc.setEvent(event); try { object.updateAsync(desc); } catch (Exception e) { - desc.release(); + cacheOrRelease(desc); + desc.discard(); descSet.remove(desc); throw e; } @@ -104,7 +117,7 @@ public void flushAll() throws IOException { if (buffer == null) { continue; } - List descList = buffer.createUpdateDescAsyncs(eq.getEqWrapperHdl(), false); + List descList = buffer.createUpdateDescAsyncs(eq.getEqWrapperHdl(), cache, false); flush(buffer, descList); } waitCompletion(); @@ -116,9 +129,9 @@ protected void waitCompletion() throws IOException { try { long dur; long start = System.currentTimeMillis(); - while ((left=descSet.size()) > 0 & ((dur = System.currentTimeMillis() - start) < config.getWaitTimeMs())) { + while ((left = descSet.size()) > 0 & ((dur = System.currentTimeMillis() - start) < config.getWaitTimeMs())) { completedList.clear(); - eq.pollCompleted(completedList, IOSimpleDDAsync.class, descSet, left, config.getWaitTimeMs() - dur); + eq.pollCompleted(completedList, IODescUpdAsync.class, descSet, left, config.getWaitTimeMs() - dur); verifyCompleted(); } if (!descSet.isEmpty()) { @@ -126,9 +139,6 @@ protected void waitCompletion() throws IOException { } } catch (IOException e) { throw new IllegalStateException("failed to complete all running updates. ", e); - } finally { - descSet.forEach(desc -> desc.release()); - descSet.clear(); } super.flushAll(); } @@ -137,7 +147,7 @@ private DaosEventQueue.Event acquireEvent() throws IOException { completedList.clear(); try { DaosEventQueue.Event event = eq.acquireEventBlocking(config.getWaitTimeMs(), completedList, - IOSimpleDDAsync.class, descSet); + IODescUpdAsync.class, descSet); verifyCompleted(); return event; } catch (IOException e) { @@ -147,11 +157,11 @@ private DaosEventQueue.Event acquireEvent() throws IOException { } private void verifyCompleted() throws IOException { - IOSimpleDDAsync failed = null; + IODescUpdAsync failed = null; int failedCnt = 0; for (DaosEventQueue.Attachment attachment : completedList) { descSet.remove(attachment); - IOSimpleDDAsync desc = (IOSimpleDDAsync) attachment; + IODescUpdAsync desc = (IODescUpdAsync) attachment; if (!desc.isSucceeded()) { failedCnt++; if (failed == null) { @@ -162,12 +172,12 @@ private void verifyCompleted() throws IOException { if (log.isDebugEnabled()) { log.debug("written desc: " + desc); } - desc.release(); + cacheOrRelease(desc); } if (failedCnt > 0) { IOException e = new IOException("failed to write " + failedCnt + " IOSimpleDDAsync. Return code is " + failed.getReturnCode() + ". First failed is " + failed); - failed.release(); + cacheOrRelease(failed); throw e; } } @@ -183,6 +193,14 @@ public void close() { completedList.clear(); completedList = null; } + + if (descSet.isEmpty()) { // all descs polled + cache.release(); + } else { + descSet.forEach(d -> d.discard()); // to be released when poll + cache.release(descSet); + descSet.clear(); + } super.close(); } @@ -190,4 +208,68 @@ public void setWriterMap(Map writerMap) { writerMap.put(this, 0); this.writerMap = writerMap; } + + static class AsyncDescCache implements ObjectCache { + private int idx; + private int total; + private IODescUpdAsync[] array; + + public AsyncDescCache(int maxNbr) { + this.array = new IODescUpdAsync[maxNbr]; + } + + @Override + public IODescUpdAsync get() { + if (idx < total) { + return array[idx++]; + } + if (idx < array.length) { + array[idx] = newObject(); + total++; + return array[idx++]; + } + throw new IllegalStateException("cache is full, " + total); + } + + @Override + public IODescUpdAsync newObject() { + return new IODescUpdAsync(32); + } + + @Override + public void put(IODescUpdAsync desc) { + if (idx <= 0) { + throw new IllegalStateException("more than actual number of IODescUpdAsyncs put back"); + } + if (desc.isDiscarded()) { + desc.release(); + desc = newObject(); + } + array[--idx] = desc; + } + + @Override + public boolean isFull() { + return total == array.length; + } + + public void release() { + release(Collections.emptySet()); + } + + private void release(Set filterSet) { + for (int i = 0; i < Math.min(total, array.length); i++) { + IODescUpdAsync desc = array[i]; + if (desc != null && !filterSet.contains(desc)) { + desc.release(); + } + } + array = null; + idx = 0; + } + + protected int getIdx() { + return idx; + } + } } diff --git a/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala b/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala index d7a12016..4fca6293 100644 --- a/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala +++ b/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala @@ -32,27 +32,27 @@ package object daos { val SHUFFLE_DAOS_POOL_UUID = ConfigBuilder("spark.shuffle.daos.pool.uuid") - .version("3.0.0") + .version("3.1.1") .stringConf .createWithDefault(null) val SHUFFLE_DAOS_CONTAINER_UUID = ConfigBuilder("spark.shuffle.daos.container.uuid") - .version("3.0.0") + .version("3.1.1") .stringConf .createWithDefault(null) val SHUFFLE_DAOS_REMOVE_SHUFFLE_DATA = ConfigBuilder("spark.shuffle.remove.shuffle.data") .doc("remove shuffle data from DAOS after shuffle completed. Default is true") - .version("3.0.0") + .version("3.1.1") .booleanConf .createWithDefault(true) val SHUFFLE_DAOS_WRITE_BUFFER_INITIAL_SIZE = ConfigBuilder("spark.shuffle.daos.write.buffer.initial") .doc("initial size of total in-memory buffer for each map output, in MiB") - .version("3.0.0") + .version("3.1.1") .bytesConf(ByteUnit.MiB) .checkValue(v => v > 0, s"The initial total buffer size must be bigger than 0.") @@ -62,7 +62,7 @@ package object daos { ConfigBuilder("spark.shuffle.daos.write.buffer.percentage") .doc("percentage of spark.shuffle.daos.buffer. Force write some buffer data out when size is bigger than " + "spark.shuffle.daos.buffer * (this percentage)") - .version("3.0.0") + .version("3.1.1") .doubleConf .checkValue(v => v >= 0.5 && v <= 0.9, s"The percentage must be no less than 0.5 and less than or equal to 0.9") @@ -72,7 +72,7 @@ package object daos { ConfigBuilder("spark.shuffle.daos.write.minimum") .doc("minimum size when write to DAOS, in KiB. A warning will be generated when size is less than this value" + " and spark.shuffle.daos.write.warn.small is set to true") - .version("3.0.0") + .version("3.1.1") .bytesConf(ByteUnit.KiB) .checkValue(v => v > 0, s"The DAOS write minimum size must be positive") @@ -82,14 +82,14 @@ package object daos { ConfigBuilder("spark.shuffle.daos.write.warn.small") .doc("log warning message when the size of written data is smaller than spark.shuffle.daos.write.minimum." + " Default is false") - .version("3.0.0") + .version("3.1.1") .booleanConf .createWithDefault(false) val SHUFFLE_DAOS_WRITE_SINGLE_BUFFER_SIZE = ConfigBuilder("spark.shuffle.daos.write.buffer.single") .doc("size of single buffer for holding data to be written to DAOS") - .version("3.0.0") + .version("3.1.1") .bytesConf(ByteUnit.MiB) .checkValue(v => v >= 1, s"The single DAOS write buffer must be at least 1m") @@ -98,7 +98,7 @@ package object daos { val SHUFFLE_DAOS_WRITE_FLUSH_RECORDS = ConfigBuilder("spark.shuffle.daos.write.flush.records") .doc("per how many number of records to flush data in buffer to DAOS") - .version("3.0.0") + .version("3.1.1") .intConf .checkValue(v => v >= 100, s"number of records to flush should be more than 100") @@ -107,7 +107,7 @@ package object daos { val SHUFFLE_DAOS_READ_MINIMUM_SIZE = ConfigBuilder("spark.shuffle.daos.read.minimum") .doc("minimum size when read from DAOS, in KiB. ") - .version("3.0.0") + .version("3.1.1") .bytesConf(ByteUnit.KiB) .checkValue(v => v > 0, s"The DAOS read minimum size must be positive") @@ -116,7 +116,7 @@ package object daos { val SHUFFLE_DAOS_READ_MAX_BYTES_IN_FLIGHT = ConfigBuilder("spark.shuffle.daos.read.maxbytes.inflight") .doc("maximum size of requested data when read from DAOS, in KiB. ") - .version("3.0.0") + .version("3.1.1") .bytesConf(ByteUnit.KiB) .checkValue(v => v > 0, s"The DAOS read max bytes in flight must be positive") @@ -125,7 +125,7 @@ package object daos { val SHUFFLE_DAOS_WRITE_MAX_BYTES_IN_FLIGHT = ConfigBuilder("spark.shuffle.daos.write.maxbytes.inflight") .doc("maximum size of requested data when write to DAOS, in KiB. ") - .version("3.0.0") + .version("3.1.1") .bytesConf(ByteUnit.KiB) .checkValue(v => v > 0, s"The DAOS write max bytes in flight must be positive") @@ -134,7 +134,7 @@ package object daos { val SHUFFLE_DAOS_IO_ASYNC = ConfigBuilder("spark.shuffle.daos.io.async") .doc("perform shuffle IO asynchronously. Default is true") - .version("3.0.0") + .version("3.1.1") .booleanConf .createWithDefault(true) @@ -142,7 +142,7 @@ package object daos { ConfigBuilder("spark.shuffle.daos.read.threads") .doc("number of threads for each executor to read shuffle data concurrently. -1 means use number of executor " + "cores. sync IO only.") - .version("3.0.0") + .version("3.1.1") .intConf .createWithDefault(1) @@ -150,14 +150,14 @@ package object daos { ConfigBuilder("spark.shuffle.daos.write.threads") .doc("number of threads for each executor to write shuffle data concurrently. -1 means use number of executor " + "cores. sync IO only.") - .version("3.0.0") + .version("3.1.1") .intConf .createWithDefault(1) val SHUFFLE_DAOS_ASYNC_WRITE_BATCH_SIZE = ConfigBuilder("spark.shuffle.daos.async.write.batch") .doc("number of async write before flush") - .version("3.0.0") + .version("3.1.1") .intConf .checkValue(v => v > 0, s"async write batch size must be positive") @@ -166,7 +166,7 @@ package object daos { val SHUFFLE_DAOS_READ_BATCH_SIZE = ConfigBuilder("spark.shuffle.daos.read.batch") .doc("number of read tasks to submit at most at each time. sync IO only.") - .version("3.0.0") + .version("3.1.1") .intConf .checkValue(v => v > 0, s"read batch size must be positive") @@ -175,16 +175,25 @@ package object daos { val SHUFFLE_DAOS_WRITE_SUBMITTED_LIMIT = ConfigBuilder("spark.shuffle.daos.write.submitted.limit") .doc("limit of number of write tasks to submit. sync IO only.") - .version("3.0.0") + .version("3.1.1") .intConf .checkValue(v => v > 0, s"limit of submitted task must be positive") .createWithDefault(20) + val SHUFFLE_DAOS_WRITE_ASYNC_DESC_CACHES = + ConfigBuilder("spark.shuffle.daos.write.async.desc.caches") + .doc("number of cached I/O description objects for async write.") + .version("3.1.1") + .intConf + .checkValue(v => v >= 0, + s"number of cached I/O description objects must be no less than 0") + .createWithDefault(20) + val SHUFFLE_DAOS_READ_WAIT_MS = ConfigBuilder("spark.shuffle.daos.read.wait.ms") .doc("number of milliseconds to wait data being read before timed out") - .version("3.0.0") + .version("3.1.1") .intConf .checkValue(v => v > 0, s"wait data time must be positive") @@ -193,7 +202,7 @@ package object daos { val SHUFFLE_DAOS_WRITE_WAIT_MS = ConfigBuilder("spark.shuffle.daos.write.wait.ms") .doc("number of milliseconds to wait data being written before timed out") - .version("3.0.0") + .version("3.1.1") .intConf .checkValue(v => v > 0, s"wait data time must be positive") @@ -203,7 +212,7 @@ package object daos { ConfigBuilder("spark.shuffle.daos.write.wait.timeout.times") .doc("number of wait timeout (spark.shuffle.daos.write.waitdata.ms) after which shuffle write task fails." + "sync IO only.") - .version("3.0.0") + .version("3.1.1") .intConf .checkValue(v => v > 0, s"wait data timeout times must be positive") @@ -212,21 +221,21 @@ package object daos { val SHUFFLE_DAOS_READ_FROM_OTHER_THREAD = ConfigBuilder("spark.shuffle.daos.read.from.other.threads") .doc("whether read shuffled data from other threads or not. true by default. sync IO only.") - .version("3.0.0") + .version("3.1.1") .booleanConf .createWithDefault(true) val SHUFFLE_DAOS_WRITE_IN_OTHER_THREAD = ConfigBuilder("spark.shuffle.daos.write.in.other.threads") .doc("whether write shuffled data in other threads or not. true by default. sync IO only.") - .version("3.0.0") + .version("3.1.1") .booleanConf .createWithDefault(true) val SHUFFLE_DAOS_WRITE_PARTITION_MOVE_INTERVAL = ConfigBuilder("spark.shuffle.daos.write.partition.move.interval") .doc("move partition at every this interval (number of records). 1000 records by default.") - .version("3.0.0") + .version("3.1.1") .intConf .checkValue(v => v >= 10, "partition move interval should be at least 10.") .createWithDefault(1000) @@ -236,7 +245,7 @@ package object daos { .doc("check total size of partitions and write some partitions at every this interval (number of records)." + " This value should be no less than spark.shuffle.daos.write.partition.move.interval." + " 10000 records by default.") - .version("3.0.0") + .version("3.1.1") .intConf .checkValue(v => v > 0, "total interval should be bigger than 0.") .createWithDefault(32) @@ -248,7 +257,7 @@ package object daos { "spark.shuffle.daos.spill.grant.pct. The shuffle manager will also spill if there are equal or more than two" + " consecutive lowly granted memory (granted memory < requested memory). When it's false, the shuffle manager " + "will spill once there is lowly granted memory.") - .version("3.0.0") + .version("3.1.1") .booleanConf .createWithDefault(true) @@ -258,7 +267,7 @@ package object daos { " cores\"). It takes effect only if spark.shuffle.daos.spill.first is true. When granted memory from" + " TaskMemoryManager is less than task heap memory * this percentage, spill data to DAOS. Default is 0.1. It " + "should be less than 0.5.") - .version("3.0.0") + .version("3.1.1") .doubleConf .checkValue(v => v > 0 & v < 0.5, "spill grant percentage should be greater than 0 and no more" + " than 0.5 .") diff --git a/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosWriterAsyncTest.java b/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosWriterAsyncTest.java new file mode 100644 index 00000000..6c704696 --- /dev/null +++ b/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosWriterAsyncTest.java @@ -0,0 +1,128 @@ +package org.apache.spark.shuffle.daos; + +import io.daos.obj.IODescUpdAsync; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +public class DaosWriterAsyncTest { + + @Test + public void testDescCacheNotFull() { + DaosWriterAsync.AsyncDescCache cache = new DaosWriterAsync.AsyncDescCache(10) { + public IODescUpdAsync newObject() { + IODescUpdAsync desc = Mockito.mock(IODescUpdAsync.class); + Mockito.when(desc.isReusable()).thenReturn(true); + return desc; + } + }; + List list = new ArrayList<>(); + try { + for (int i = 0; i < 5; i++) { + IODescUpdAsync desc = cache.get(); + Assert.assertTrue(desc.isReusable()); + Assert.assertEquals(i + 1, cache.getIdx()); + list.add(desc); + } + // test reuse + IODescUpdAsync desc = list.remove(0); + cache.put(desc); + Assert.assertEquals(4, cache.getIdx()); + Assert.assertEquals(desc, cache.get()); + cache.put(desc); + for (IODescUpdAsync d : list) { + cache.put(d); + } + Assert.assertEquals(0, cache.getIdx()); + } finally { + cache.release(); + list.forEach(d -> d.release()); + } + } + + @Test + public void testDescCacheFull() { + DaosWriterAsync.AsyncDescCache cache = new DaosWriterAsync.AsyncDescCache(10) { + public IODescUpdAsync newObject() { + IODescUpdAsync desc = Mockito.mock(IODescUpdAsync.class); + Mockito.when(desc.isReusable()).thenReturn(true); + return desc; + } + }; + List list = new ArrayList<>(); + Exception ee = null; + try { + for (int i = 0; i < 11; i++) { + IODescUpdAsync desc = cache.get(); + Assert.assertTrue(desc.isReusable()); + Assert.assertEquals(i + 1, cache.getIdx()); + list.add(desc); + } + } catch (IllegalStateException e) { + ee = e; + } + Assert.assertTrue(ee instanceof IllegalStateException); + Assert.assertTrue(ee.getMessage().contains("cache is full")); + Assert.assertTrue(cache.isFull()); + + try { + // test reuse + IODescUpdAsync desc = list.remove(0); + cache.put(desc); + Assert.assertEquals(9, cache.getIdx()); + Assert.assertEquals(desc, cache.get()); + cache.put(desc); + for (IODescUpdAsync d : list) { + cache.put(d); + } + Assert.assertEquals(0, cache.getIdx()); + desc = cache.get(); + Assert.assertEquals(1, cache.getIdx()); + cache.put(desc); + Assert.assertEquals(0, cache.getIdx()); + } finally { + cache.release(); + list.forEach(d -> d.release()); + } + } + + @Test + public void testDescCachePut() { + DaosWriterAsync.AsyncDescCache cache = new DaosWriterAsync.AsyncDescCache(10) { + public IODescUpdAsync newObject() { + IODescUpdAsync desc = Mockito.mock(IODescUpdAsync.class); + Mockito.when(desc.isReusable()).thenReturn(true); + return desc; + } + }; + List list = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + IODescUpdAsync desc = cache.get(); + Assert.assertTrue(desc.isReusable()); + Assert.assertEquals(i + 1, cache.getIdx()); + list.add(desc); + } + Assert.assertTrue(cache.isFull()); + + IODescUpdAsync desc = null; + while (!list.isEmpty()) { + desc = list.remove(0); + cache.put(desc); + } + Exception ee = null; + try { + cache.put(desc); + } catch (Exception e) { + ee = e; + } finally { + cache.release(); + list.forEach(d -> d.release()); + } + Assert.assertTrue(ee instanceof IllegalStateException); + Assert.assertTrue(ee.getMessage().contains("more than actual")); + } +} diff --git a/shuffle-hadoop/pom.xml b/shuffle-hadoop/pom.xml index c96e19b1..a402f4d3 100644 --- a/shuffle-hadoop/pom.xml +++ b/shuffle-hadoop/pom.xml @@ -46,11 +46,6 @@ ${spark.version} test - - org.apache.hadoop - hadoop-client - 2.7.4 - org.scalatest scalatest_${scala.binary.version}