diff --git a/common/src/main/java/org/apache/uniffle/common/RemoteStorageInfo.java b/common/src/main/java/org/apache/uniffle/common/RemoteStorageInfo.java index 113602b1f5..306d729e84 100644 --- a/common/src/main/java/org/apache/uniffle/common/RemoteStorageInfo.java +++ b/common/src/main/java/org/apache/uniffle/common/RemoteStorageInfo.java @@ -55,7 +55,7 @@ public RemoteStorageInfo(String path, String confString) { if (!ArrayUtils.isEmpty(items)) { for (String item : items) { String[] kv = item.split(Constants.EQUAL_SPLIT_CHAR); - if (kv != null && kv.length == 2) { + if (kv.length == 2) { this.confItems.put(kv[0], kv[1]); } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractSelectStorageStrategy.java new file mode 100644 index 0000000000..d76ceea336 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractSelectStorageStrategy.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.uniffle.common.exception.RssException; + +import static org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue; + +/** + * This is a simple implementation class, which provides some methods to check whether the path is normal + */ +public abstract class AbstractSelectStorageStrategy implements SelectStorageStrategy { + /** + * store remote path -> application count for assignment strategy + */ + protected final Map remoteStoragePathRankValue; + protected final int fileSize; + + public AbstractSelectStorageStrategy( + Map remoteStoragePathRankValue, + CoordinatorConf conf) { + this.remoteStoragePathRankValue = remoteStoragePathRankValue; + fileSize = conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE); + } + + public void readAndWriteHdfsStorage(FileSystem fs, Path testPath, + String uri, RankValue rankValue) throws IOException { + byte[] data = RandomUtils.nextBytes(fileSize); + try (FSDataOutputStream fos = fs.create(testPath)) { + fos.write(data); + fos.flush(); + } + byte[] readData = new byte[fileSize]; + int readBytes; + try (FSDataInputStream fis = fs.open(testPath)) { + int hasReadBytes = 0; + do { + readBytes = fis.read(readData); + if (hasReadBytes < fileSize) { + for (int i = 0; i < readBytes; i++) { + if (data[hasReadBytes + i] != readData[i]) { + remoteStoragePathRankValue.put(uri, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get())); + throw new RssException("The content of reading and writing is inconsistent."); + } + } + } + hasReadBytes += readBytes; + } while (readBytes != -1); + } + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java index b77c7f937d..764b854fff 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java @@ -21,124 +21,114 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider; import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue; /** * AppBalanceSelectStorageStrategy will consider the number of apps allocated on each remote path is balanced. */ -public class AppBalanceSelectStorageStrategy implements SelectStorageStrategy { +public class AppBalanceSelectStorageStrategy extends AbstractSelectStorageStrategy { private static final Logger LOG = LoggerFactory.getLogger(AppBalanceSelectStorageStrategy.class); - /** - * store appId -> remote path to make sure all shuffle data of the same application - * will be written to the same remote storage - */ - private final Map appIdToRemoteStorageInfo; /** * store remote path -> application count for assignment strategy */ - private final Map remoteStoragePathCounter; + private final Map appIdToRemoteStorageInfo; private final Map availableRemoteStorageInfo; - - public AppBalanceSelectStorageStrategy() { - this.appIdToRemoteStorageInfo = Maps.newConcurrentMap(); - this.remoteStoragePathCounter = Maps.newConcurrentMap(); - this.availableRemoteStorageInfo = Maps.newHashMap(); + private final Configuration hdfsConf; + private List> uris; + + public AppBalanceSelectStorageStrategy( + Map remoteStoragePathRankValue, + Map appIdToRemoteStorageInfo, + Map availableRemoteStorageInfo, + CoordinatorConf conf) { + super(remoteStoragePathRankValue, conf); + this.appIdToRemoteStorageInfo = appIdToRemoteStorageInfo; + this.availableRemoteStorageInfo = availableRemoteStorageInfo; + this.hdfsConf = new Configuration(); } - /** - * the strategy of pick remote storage is according to assignment count - */ - @Override - public RemoteStorageInfo pickRemoteStorage(String appId) { - if (appIdToRemoteStorageInfo.containsKey(appId)) { - return appIdToRemoteStorageInfo.get(appId); - } - - // create list for sort - List> sizeList = - Lists.newArrayList(remoteStoragePathCounter.entrySet()).stream().filter(Objects::nonNull) - .sorted(Comparator.comparingInt(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList()); - - for (Map.Entry entry : sizeList) { - String storagePath = entry.getKey(); - if (availableRemoteStorageInfo.containsKey(storagePath)) { - appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath)); - incRemoteStorageCounter(storagePath); - break; - } - } - return appIdToRemoteStorageInfo.get(appId); - } - - @Override @VisibleForTesting - public synchronized void incRemoteStorageCounter(String remoteStoragePath) { - RankValue counter = remoteStoragePathCounter.get(remoteStoragePath); - if (counter != null) { - counter.getAppNum().incrementAndGet(); - } else { - // it may be happened when assignment remote storage - // and refresh remote storage at the same time - LOG.warn("Remote storage path lost during assignment: {} doesn't exist, reset it to 1", - remoteStoragePath); - remoteStoragePathCounter.put(remoteStoragePath, new RankValue(1)); + public void sortPathByRankValue(String path, String test) { + RankValue rankValue = remoteStoragePathRankValue.get(path); + try { + FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path), hdfsConf); + fs.delete(new Path(test),true); + if (rankValue.getHealthy().get()) { + rankValue.setCostTime(new AtomicLong(0)); + } + } catch (Exception e) { + rankValue.setCostTime(new AtomicLong(Long.MAX_VALUE)); + LOG.error("Failed to sort, we will not use this remote path {}.", path, e); } + uris = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream() + .filter(Objects::nonNull).collect(Collectors.toList()); } @Override - @VisibleForTesting - public synchronized void decRemoteStorageCounter(String storagePath) { - if (!StringUtils.isEmpty(storagePath)) { - RankValue atomic = remoteStoragePathCounter.get(storagePath); - if (atomic != null) { - double count = atomic.getAppNum().decrementAndGet(); - if (count < 0) { - LOG.warn("Unexpected counter for remote storage: {}, which is {}, reset to 0", - storagePath, count); - atomic.getAppNum().set(0); + public void detectStorage() { + uris = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()); + if (remoteStoragePathRankValue.size() > 1) { + for (Map.Entry uri : uris) { + if (uri.getKey().startsWith(ApplicationManager.REMOTE_PATH_SCHEMA.get(0))) { + RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey()); + rankValue.setHealthy(new AtomicBoolean(true)); + Path remotePath = new Path(uri.getKey()); + String rssTest = uri.getKey() + "/rssTest"; + Path testPath = new Path(rssTest); + try { + FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath, hdfsConf); + readAndWriteHdfsStorage(fs, testPath, uri.getKey(), rankValue); + } catch (Exception e) { + rankValue.setHealthy(new AtomicBoolean(false)); + LOG.error("Storage read and write error, we will not use this remote path {}.", uri, e); + } finally { + sortPathByRankValue(uri.getKey(), rssTest); + } } - } else { - LOG.warn("Can't find counter for remote storage: {}", storagePath); - remoteStoragePathCounter.putIfAbsent(storagePath, new RankValue(0)); - } - if (remoteStoragePathCounter.get(storagePath).getAppNum().get() == 0 - && !availableRemoteStorageInfo.containsKey(storagePath)) { - remoteStoragePathCounter.remove(storagePath); } } } + /** + * When choosing the AppBalance strategy, each time you select a path, + * you should know the number of the latest apps in different paths + */ @Override - public synchronized void removePathFromCounter(String storagePath) { - RankValue atomic = remoteStoragePathCounter.get(storagePath); - if (atomic != null && atomic.getAppNum().get() == 0) { - remoteStoragePathCounter.remove(storagePath); + public synchronized RemoteStorageInfo pickStorage(String appId) { + boolean isUnhealthy = + uris.stream().noneMatch(rv -> rv.getValue().getCostTime().get() != Long.MAX_VALUE); + if (!isUnhealthy) { + // If there is only one unhealthy path, then filter that path + uris = uris.stream().filter(rv -> rv.getValue().getCostTime().get() != Long.MAX_VALUE).sorted( + Comparator.comparingInt(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList()); + } else { + // If all paths are unhealthy, assign paths according to the number of apps + uris = uris.stream().sorted(Comparator.comparingInt( + entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList()); } - } - - @Override - public Map getAppIdToRemoteStorageInfo() { - return appIdToRemoteStorageInfo; - } - - @Override - public Map getRemoteStoragePathRankValue() { - return remoteStoragePathCounter; - } - - @Override - public Map getAvailableRemoteStorageInfo() { - return availableRemoteStorageInfo; + LOG.info("The sorted remote path list is: {}", uris); + for (Map.Entry entry : uris) { + String storagePath = entry.getKey(); + if (availableRemoteStorageInfo.containsKey(storagePath)) { + return appIdToRemoteStorageInfo.computeIfAbsent(appId, x -> availableRemoteStorageInfo.get(storagePath)); + } + } + LOG.warn("No remote storage is available, we will default to the first."); + return availableRemoteStorageInfo.values().iterator().next(); } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java index 6fcd6b56f9..0b79b5067b 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java @@ -19,6 +19,7 @@ import java.net.URI; import java.net.URISyntaxException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -42,39 +43,48 @@ public class ApplicationManager { private static final Logger LOG = LoggerFactory.getLogger(ApplicationManager.class); - private long expired; - private StrategyName storageStrategy; - private Map appIds = Maps.newConcurrentMap(); - private SelectStorageStrategy selectStorageStrategy; + // TODO: Add anomaly detection for other storage + public static final List REMOTE_PATH_SCHEMA = Arrays.asList("hdfs"); + private final long expired; + private final StrategyName storageStrategy; + private final Map appIds = Maps.newConcurrentMap(); + private final SelectStorageStrategy selectStorageStrategy; // store appId -> remote path to make sure all shuffle data of the same application // will be written to the same remote storage - private Map appIdToRemoteStorageInfo; + private final Map appIdToRemoteStorageInfo; // store remote path -> application count for assignment strategy - private Map remoteStoragePathRankValue; - private Map remoteStorageToHost = Maps.newConcurrentMap(); - private Map availableRemoteStorageInfo; - private ScheduledExecutorService scheduledExecutorService; + private final Map remoteStoragePathRankValue; + private final Map remoteStorageToHost = Maps.newConcurrentMap(); + private final Map availableRemoteStorageInfo; // it's only for test case to check if status check has problem private boolean hasErrorInStatusCheck = false; public ApplicationManager(CoordinatorConf conf) { storageStrategy = conf.get(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY); + appIdToRemoteStorageInfo = Maps.newConcurrentMap(); + remoteStoragePathRankValue = Maps.newConcurrentMap(); + availableRemoteStorageInfo = Maps.newConcurrentMap(); if (StrategyName.IO_SAMPLE == storageStrategy) { - selectStorageStrategy = new LowestIOSampleCostSelectStorageStrategy(conf); + selectStorageStrategy = new LowestIOSampleCostSelectStorageStrategy(remoteStoragePathRankValue, + appIdToRemoteStorageInfo, availableRemoteStorageInfo, conf); } else if (StrategyName.APP_BALANCE == storageStrategy) { - selectStorageStrategy = new AppBalanceSelectStorageStrategy(); + selectStorageStrategy = new AppBalanceSelectStorageStrategy(remoteStoragePathRankValue, + appIdToRemoteStorageInfo, availableRemoteStorageInfo, conf); } else { throw new UnsupportedOperationException("Unsupported selected storage strategy."); } - appIdToRemoteStorageInfo = selectStorageStrategy.getAppIdToRemoteStorageInfo(); - remoteStoragePathRankValue = selectStorageStrategy.getRemoteStoragePathRankValue(); - availableRemoteStorageInfo = selectStorageStrategy.getAvailableRemoteStorageInfo(); expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED); // the thread for checking application status - scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( ThreadUtils.getThreadFactory("ApplicationManager-%d")); scheduledExecutorService.scheduleAtFixedRate( - () -> statusCheck(), expired / 2, expired / 2, TimeUnit.MILLISECONDS); + this::statusCheck, expired / 2, expired / 2, TimeUnit.MILLISECONDS); + // the thread for checking if the storage is normal + ScheduledExecutorService detectStorageScheduler = Executors.newSingleThreadScheduledExecutor( + ThreadUtils.getThreadFactory("detectStoragesScheduler-%d")); + // should init later than the refreshRemoteStorage init + detectStorageScheduler.scheduleAtFixedRate(selectStorageStrategy::detectStorage, 1000, + conf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME), TimeUnit.MILLISECONDS); } public void refreshAppId(String appId) { @@ -127,17 +137,57 @@ public void refreshRemoteStorage(String remoteStoragePath, String remoteStorageC // the strategy of pick remote storage is according to assignment count // todo: better strategy with workload balance public RemoteStorageInfo pickRemoteStorage(String appId) { - selectStorageStrategy.pickRemoteStorage(appId); + if (appIdToRemoteStorageInfo.containsKey(appId)) { + return appIdToRemoteStorageInfo.get(appId); + } + RemoteStorageInfo pickStorage = selectStorageStrategy.pickStorage(appId); + incRemoteStorageCounter(pickStorage.getPath()); return appIdToRemoteStorageInfo.get(appId); } @VisibleForTesting - protected synchronized void decRemoteStorageCounter(String storagePath) { - selectStorageStrategy.decRemoteStorageCounter(storagePath); + public synchronized void incRemoteStorageCounter(String remoteStoragePath) { + RankValue counter = remoteStoragePathRankValue.get(remoteStoragePath); + if (counter != null) { + counter.getAppNum().incrementAndGet(); + } else { + // it may be happened when assignment remote storage + // and refresh remote storage at the same time + LOG.warn("Remote storage path lost during assignment: {} doesn't exist, reset it to 1", + remoteStoragePath); + remoteStoragePathRankValue.put(remoteStoragePath, new RankValue(1)); + } + } + + @VisibleForTesting + public synchronized void decRemoteStorageCounter(String storagePath) { + if (!StringUtils.isEmpty(storagePath)) { + RankValue atomic = remoteStoragePathRankValue.get(storagePath); + if (atomic != null) { + double count = atomic.getAppNum().decrementAndGet(); + if (count < 0) { + LOG.warn("Unexpected counter for remote storage: {}, which is {}, reset to 0", + storagePath, count); + atomic.getAppNum().set(0); + } + } else { + LOG.warn("Can't find counter for remote storage: {}", storagePath); + remoteStoragePathRankValue.putIfAbsent(storagePath, new RankValue(0)); + } + if (remoteStoragePathRankValue.get(storagePath).getAppNum().get() == 0 + && !availableRemoteStorageInfo.containsKey(storagePath)) { + remoteStoragePathRankValue.remove(storagePath); + } + } } - private synchronized void removePathFromCounter(String storagePath) { - selectStorageStrategy.removePathFromCounter(storagePath); + public synchronized void removePathFromCounter(String storagePath) { + RankValue atomic = remoteStoragePathRankValue.get(storagePath); + // The time spent reading and writing cannot be used to determine whether the current path is still used by apps. + // Therefore, determine whether the HDFS path is still used by the number of apps + if (atomic != null && atomic.getAppNum().get() == 0) { + remoteStoragePathRankValue.remove(storagePath); + } } public Set getAppIds() { @@ -159,6 +209,11 @@ public Map getAvailableRemoteStorageInfo() { return availableRemoteStorageInfo; } + @VisibleForTesting + public Map getAppIdToRemoteStorageInfo() { + return appIdToRemoteStorageInfo; + } + @VisibleForTesting protected boolean hasErrorInStatusCheck() { return hasErrorInStatusCheck; diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index 96053ab1a9..b343ac64cc 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -138,18 +138,18 @@ public class CoordinatorConf extends RssBaseConf { .enumType(ApplicationManager.StrategyName.class) .defaultValue(APP_BALANCE) .withDescription("Strategy for selecting the remote path"); - public static final ConfigOption COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME = ConfigOptions - .key("rss.coordinator.remote.storage.io.sample.schedule.time") + public static final ConfigOption COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME = ConfigOptions + .key("rss.coordinator.remote.storage.schedule.time") .longType() .defaultValue(60 * 1000L) .withDescription("The time of scheduling the read and write time of the paths to obtain different HDFS"); - public static final ConfigOption COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_FILE_SIZE = ConfigOptions - .key("rss.coordinator.remote.storage.io.sample.file.size") + public static final ConfigOption COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE = ConfigOptions + .key("rss.coordinator.remote.storage.schedule.file.size") .intType() .defaultValue(204800 * 1000) .withDescription("The size of the file that the scheduled thread reads and writes"); - public static final ConfigOption COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_ACCESS_TIMES = ConfigOptions - .key("rss.coordinator.remote.storage.io.sample.access.times") + public static final ConfigOption COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES = ConfigOptions + .key("rss.coordinator.remote.storage.schedule.access.times") .intType() .defaultValue(3) .withDescription("The number of times to read and write HDFS files"); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java index 291ce43d4c..8917233408 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java @@ -17,25 +17,17 @@ package org.apache.uniffle.coordinator; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.lang3.RandomUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -43,224 +35,156 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider; -import org.apache.uniffle.common.util.ThreadUtils; /** * LowestIOSampleCostSelectStorageStrategy considers that when allocating apps to different remote paths, * remote paths that can write and read. Therefore, it may occur that all apps are written to the same cluster. * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster. */ -public class LowestIOSampleCostSelectStorageStrategy implements SelectStorageStrategy { +public class LowestIOSampleCostSelectStorageStrategy extends AbstractSelectStorageStrategy { private static final Logger LOG = LoggerFactory.getLogger(LowestIOSampleCostSelectStorageStrategy.class); - /** - * store appId -> remote path to make sure all shuffle data of the same application - * will be written to the same remote storage - */ - private final Map appIdToRemoteStorageInfo; /** * store remote path -> application count for assignment strategy */ - private final Map remoteStoragePathRankValue; + private final Map appIdToRemoteStorageInfo; private final Map availableRemoteStorageInfo; - private List> sizeList; - private FileSystem fs; - private Configuration conf; - private final int fileSize; + private final Configuration hdfsConf; private final int readAndWriteTimes; - - public LowestIOSampleCostSelectStorageStrategy(CoordinatorConf cf) { - conf = new Configuration(); - fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_FILE_SIZE); - readAndWriteTimes = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_ACCESS_TIMES); - this.appIdToRemoteStorageInfo = Maps.newConcurrentMap(); - this.remoteStoragePathRankValue = Maps.newConcurrentMap(); - this.availableRemoteStorageInfo = Maps.newHashMap(); - this.sizeList = Lists.newCopyOnWriteArrayList(); - ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor( - ThreadUtils.getThreadFactory("readWriteRankScheduler-%d")); - // should init later than the refreshRemoteStorage init - readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000, - cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME), TimeUnit.MILLISECONDS); - } - - public void checkReadAndWrite() { - if (remoteStoragePathRankValue.size() > 1) { - for (Map.Entry entry : remoteStoragePathRankValue.entrySet()) { - final String path = entry.getKey(); - final RankValue rankValue = entry.getValue(); - Path remotePath = new Path(path); - Path testPath = new Path(path + "/rssTest"); - long startWriteTime = System.currentTimeMillis(); - try { - fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf); - for (int j = 0; j < readAndWriteTimes; j++) { - byte[] data = RandomUtils.nextBytes(fileSize); - try (FSDataOutputStream fos = fs.create(testPath)) { - fos.write(data); - fos.flush(); - } - byte[] readData = new byte[fileSize]; - int readBytes; - try (FSDataInputStream fis = fs.open(testPath)) { - int hasReadBytes = 0; - do { - readBytes = fis.read(readData); - if (hasReadBytes < fileSize) { - for (int i = 0; i < readBytes; i++) { - if (data[hasReadBytes + i] != readData[i]) { - remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get())); - } - } - } - hasReadBytes += readBytes; - } while (readBytes != -1); - } - } - } catch (Exception e) { - LOG.error("Storage read and write error, we will not use this remote path {}.", path, e); - remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get())); - } finally { - sortPathByRankValue(path, testPath, startWriteTime); - } - } - } else { - sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()); - } + private List> uris; + + public LowestIOSampleCostSelectStorageStrategy( + Map remoteStoragePathRankValue, + Map appIdToRemoteStorageInfo, + Map availableRemoteStorageInfo, + CoordinatorConf conf) { + super(remoteStoragePathRankValue, conf); + this.appIdToRemoteStorageInfo = appIdToRemoteStorageInfo; + this.availableRemoteStorageInfo = availableRemoteStorageInfo; + this.hdfsConf = new Configuration(); + readAndWriteTimes = conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES); } @VisibleForTesting - public void sortPathByRankValue(String path, Path testPath, long startWrite) { + public void sortPathByRankValue( + String path, String testPath, long startWrite) { + RankValue rankValue = remoteStoragePathRankValue.get(path); try { - fs.delete(testPath, true); - long totalTime = System.currentTimeMillis() - startWrite; - RankValue rankValue = remoteStoragePathRankValue.get(path); - remoteStoragePathRankValue.put(path, new RankValue(totalTime, rankValue.getAppNum().get())); + FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path), hdfsConf); + fs.delete(new Path(testPath), true); + if (rankValue.getHealthy().get()) { + rankValue.setCostTime(new AtomicLong(System.currentTimeMillis() - startWrite)); + } } catch (Exception e) { - RankValue rankValue = remoteStoragePathRankValue.get(path); - remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get())); + rankValue.setCostTime(new AtomicLong(Long.MAX_VALUE)); LOG.error("Failed to sort, we will not use this remote path {}.", path, e); - } finally { - sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull) - .sorted(Comparator.comparingDouble( - entry -> entry.getValue().getReadAndWriteTime().get())).collect(Collectors.toList()); - } - } - - /** - * the strategy of pick remote storage is based on whether the remote path can be read or written - */ - @Override - public RemoteStorageInfo pickRemoteStorage(String appId) { - if (appIdToRemoteStorageInfo.containsKey(appId)) { - return appIdToRemoteStorageInfo.get(appId); - } - - for (Map.Entry entry : sizeList) { - String storagePath = entry.getKey(); - if (availableRemoteStorageInfo.containsKey(storagePath)) { - appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath)); - incRemoteStorageCounter(storagePath); - break; - } - } - return appIdToRemoteStorageInfo.get(appId); - } - - @Override - @VisibleForTesting - public synchronized void incRemoteStorageCounter(String remoteStoragePath) { - RankValue counter = remoteStoragePathRankValue.get(remoteStoragePath); - if (counter != null) { - counter.getAppNum().incrementAndGet(); - } else { - remoteStoragePathRankValue.put(remoteStoragePath, new RankValue(1)); - // it may be happened when assignment remote storage - // and refresh remote storage at the same time - LOG.warn("Remote storage path lost during assignment: {} doesn't exist, " - + "reset the rank value to 0 and app size to 1.", remoteStoragePath); } + uris = Lists.newCopyOnWriteArrayList( + remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull).sorted((x, y) -> { + final long xReadAndWriteTime = x.getValue().getCostTime().get(); + final long yReadAndWriteTime = y.getValue().getCostTime().get(); + if (xReadAndWriteTime > yReadAndWriteTime) { + return 1; + } else if (xReadAndWriteTime < yReadAndWriteTime) { + return -1; + } else { + return Integer.compare(x.getValue().getAppNum().get(), y.getValue().getAppNum().get()); + } + }).collect(Collectors.toList()); + LOG.info("The sorted remote path list is: {}", uris); } @Override - @VisibleForTesting - public synchronized void decRemoteStorageCounter(String storagePath) { - if (!StringUtils.isEmpty(storagePath)) { - RankValue atomic = remoteStoragePathRankValue.get(storagePath); - if (atomic != null) { - double count = atomic.getAppNum().decrementAndGet(); - if (count < 0) { - LOG.warn("Unexpected counter for remote storage: {}, which is {}, reset to 0", - storagePath, count); - atomic.getAppNum().set(0); + public void detectStorage() { + uris = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()); + if (remoteStoragePathRankValue.size() > 1) { + for (Map.Entry uri : uris) { + if (uri.getKey().startsWith(ApplicationManager.REMOTE_PATH_SCHEMA.get(0))) { + Path remotePath = new Path(uri.getKey()); + String rssTest = uri.getKey() + "/rssTest"; + Path testPath = new Path(rssTest); + RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey()); + rankValue.setHealthy(new AtomicBoolean(true)); + long startWriteTime = System.currentTimeMillis(); + try { + FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath, hdfsConf); + for (int j = 0; j < readAndWriteTimes; j++) { + readAndWriteHdfsStorage(fs, testPath, uri.getKey(), rankValue); + } + } catch (Exception e) { + LOG.error("Storage read and write error, we will not use this remote path {}.", uri, e); + rankValue.setHealthy(new AtomicBoolean(false)); + } finally { + sortPathByRankValue(uri.getKey(), rssTest, startWriteTime); + } } - } else { - remoteStoragePathRankValue.putIfAbsent(storagePath, new RankValue(1)); - LOG.warn("Can't find counter for remote storage: {}", storagePath); - } - - if (remoteStoragePathRankValue.get(storagePath).getAppNum().get() == 0 - && !availableRemoteStorageInfo.containsKey(storagePath)) { - remoteStoragePathRankValue.remove(storagePath); } } } @Override - public synchronized void removePathFromCounter(String storagePath) { - RankValue rankValue = remoteStoragePathRankValue.get(storagePath); - // The time spent reading and writing cannot be used to determine whether the current path is still used by apps. - // Therefore, determine whether the HDFS path is still used by the number of apps - if (rankValue != null && rankValue.getAppNum().get() == 0) { - remoteStoragePathRankValue.remove(storagePath); + public synchronized RemoteStorageInfo pickStorage(String appId) { + LOG.info("The sorted remote path list is: {}", uris); + for (Map.Entry uri : uris) { + String storagePath = uri.getKey(); + if (availableRemoteStorageInfo.containsKey(storagePath)) { + return appIdToRemoteStorageInfo.computeIfAbsent(appId, x -> availableRemoteStorageInfo.get(storagePath)); + } } - } - - @Override - public Map getAppIdToRemoteStorageInfo() { - return appIdToRemoteStorageInfo; - } - - @Override - public Map getRemoteStoragePathRankValue() { - return remoteStoragePathRankValue; - } - - @Override - public Map getAvailableRemoteStorageInfo() { - return availableRemoteStorageInfo; - } - - @VisibleForTesting - public void setFs(FileSystem fs) { - this.fs = fs; - } - - @VisibleForTesting - public void setConf(Configuration conf) { - this.conf = conf; + LOG.warn("No remote storage is available, we will default to the first."); + return availableRemoteStorageInfo.values().iterator().next(); } static class RankValue { - AtomicLong readAndWriteTime; + AtomicLong costTime; AtomicInteger appNum; + AtomicBoolean isHealthy; RankValue(int appNum) { - this.readAndWriteTime = new AtomicLong(0); + this.costTime = new AtomicLong(0); this.appNum = new AtomicInteger(appNum); + this.isHealthy = new AtomicBoolean(true); } - RankValue(long ratioValue, int appNum) { - this.readAndWriteTime = new AtomicLong(ratioValue); + RankValue(long costTime, int appNum) { + this.costTime = new AtomicLong(costTime); this.appNum = new AtomicInteger(appNum); + this.isHealthy = new AtomicBoolean(true); } - public AtomicLong getReadAndWriteTime() { - return readAndWriteTime; + public AtomicLong getCostTime() { + return costTime; } public AtomicInteger getAppNum() { return appNum; } + + public AtomicBoolean getHealthy() { + return isHealthy; + } + + public void setCostTime(AtomicLong readAndWriteTime) { + this.costTime = readAndWriteTime; + } + + public void setAppNum(AtomicInteger appNum) { + this.appNum = appNum; + } + + public void setHealthy(AtomicBoolean isHealthy) { + this.isHealthy = isHealthy; + if (!isHealthy.get()) { + this.costTime.set(Long.MAX_VALUE); + } + } + + @Override + public String toString() { + return "RankValue{" + + "costTime=" + costTime + + ", appNum=" + appNum + + '}'; + } } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java index 654ec8a07a..52f7073cc6 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java @@ -17,24 +17,11 @@ package org.apache.uniffle.coordinator; -import java.util.Map; - import org.apache.uniffle.common.RemoteStorageInfo; -import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue; public interface SelectStorageStrategy { - RemoteStorageInfo pickRemoteStorage(String appId); - - void incRemoteStorageCounter(String remoteStoragePath); - - void decRemoteStorageCounter(String storagePath); - - void removePathFromCounter(String storagePath); - - Map getAvailableRemoteStorageInfo(); - - Map getAppIdToRemoteStorageInfo(); + void detectStorage(); - Map getRemoteStoragePathRankValue(); + RemoteStorageInfo pickStorage(String appId); } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java index d52d65aac7..8ea88bcf89 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java @@ -32,7 +32,6 @@ public class AppBalanceSelectStorageStrategyTest { - private AppBalanceSelectStorageStrategy appBalanceSelectStorageStrategy; private ApplicationManager applicationManager; private long appExpiredTime = 2000L; private String remotePath1 = "hdfs://path1"; @@ -50,63 +49,64 @@ public static void clear() { } @BeforeEach - public void setUp() { + public void setUp() throws InterruptedException { CoordinatorConf conf = new CoordinatorConf(); conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime); conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, APP_BALANCE); + conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 1000); applicationManager = new ApplicationManager(conf); - appBalanceSelectStorageStrategy = (AppBalanceSelectStorageStrategy) applicationManager.getSelectStorageStrategy(); } @Test public void selectStorageTest() throws Exception { String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2; applicationManager.refreshRemoteStorage(remoteStoragePath, ""); - assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath1).getAppNum().get()); - assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); + assertEquals(0, applicationManager.getRemoteStoragePathRankValue().get(remotePath1).getAppNum().get()); + assertEquals(0, applicationManager.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); String storageHost1 = "path1"; assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5); String storageHost2 = "path2"; assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5); - + // init readWriteRankScheduler + Thread.sleep(2000); // do inc for remotePath1 to make sure pick storage will be remotePath2 in next call - appBalanceSelectStorageStrategy.incRemoteStorageCounter(remotePath1); - appBalanceSelectStorageStrategy.incRemoteStorageCounter(remotePath1); + applicationManager.incRemoteStorageCounter(remotePath1); + applicationManager.incRemoteStorageCounter(remotePath1); String testApp1 = "testApp1"; applicationManager.refreshAppId(testApp1); - assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath()); - assertEquals(remotePath2, appBalanceSelectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1).getPath()); - assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); + assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath()); + assertEquals(remotePath2, applicationManager.getAppIdToRemoteStorageInfo().get(testApp1).getPath()); + assertEquals(1, applicationManager.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); // return the same value if did the assignment already - assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath()); - assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); + assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath()); + assertEquals(1, applicationManager.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); Thread.sleep(appExpiredTime + 2000); - assertNull(appBalanceSelectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1)); - assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); + assertNull(applicationManager.getAppIdToRemoteStorageInfo().get(testApp1)); + assertEquals(0, applicationManager.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5); // refresh app1, got remotePath2, then remove remotePath2, // it should be existed in counter until it expired applicationManager.refreshAppId(testApp1); - assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath()); + assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath()); remoteStoragePath = remotePath1; applicationManager.refreshRemoteStorage(remoteStoragePath, ""); assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1, remotePath2)), - appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().keySet()); - assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); + applicationManager.getRemoteStoragePathRankValue().keySet()); + assertEquals(1, applicationManager.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); // app1 is expired, remotePath2 is removed because of counter = 0 Thread.sleep(appExpiredTime + 2000); assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1)), - appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().keySet()); + applicationManager.getRemoteStoragePathRankValue().keySet()); // restore previous manually inc for next test case - appBalanceSelectStorageStrategy.decRemoteStorageCounter(remotePath1); - appBalanceSelectStorageStrategy.decRemoteStorageCounter(remotePath1); + applicationManager.decRemoteStorageCounter(remotePath1); + applicationManager.decRemoteStorageCounter(remotePath1); // remove all remote storage applicationManager.refreshRemoteStorage("", ""); - assertEquals(0, appBalanceSelectStorageStrategy.getAvailableRemoteStorageInfo().size()); - assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().size()); + assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size()); + assertEquals(0, applicationManager.getRemoteStoragePathRankValue().size()); assertFalse(applicationManager.hasErrorInStatusCheck()); } @@ -116,12 +116,13 @@ public void storageCounterMulThreadTest() throws Exception { + Constants.COMMA_SPLIT_CHAR + remotePath3; applicationManager.refreshRemoteStorage(remoteStoragePath, ""); String appPrefix = "testAppId"; - + // init detectStorageScheduler + Thread.sleep(2000); Thread pickThread1 = new Thread(() -> { for (int i = 0; i < 1000; i++) { String appId = appPrefix + i; applicationManager.refreshAppId(appId); - appBalanceSelectStorageStrategy.pickRemoteStorage(appId); + applicationManager.pickRemoteStorage(appId); } }); @@ -129,7 +130,7 @@ public void storageCounterMulThreadTest() throws Exception { for (int i = 1000; i < 2000; i++) { String appId = appPrefix + i; applicationManager.refreshAppId(appId); - appBalanceSelectStorageStrategy.pickRemoteStorage(appId); + applicationManager.pickRemoteStorage(appId); } }); @@ -137,7 +138,7 @@ public void storageCounterMulThreadTest() throws Exception { for (int i = 2000; i < 3000; i++) { String appId = appPrefix + i; applicationManager.refreshAppId(appId); - appBalanceSelectStorageStrategy.pickRemoteStorage(appId); + applicationManager.pickRemoteStorage(appId); } }); pickThread1.start(); @@ -149,8 +150,8 @@ public void storageCounterMulThreadTest() throws Exception { Thread.sleep(appExpiredTime + 2000); applicationManager.refreshRemoteStorage("", ""); - assertEquals(0, appBalanceSelectStorageStrategy.getAvailableRemoteStorageInfo().size()); - assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().size()); + assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size()); + assertEquals(0, applicationManager.getRemoteStoragePathRankValue().size()); assertFalse(applicationManager.hasErrorInStatusCheck()); } } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java index 7a6181740f..3dd7bf625e 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java @@ -29,8 +29,6 @@ import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -52,7 +50,7 @@ public class ClientConfManagerTest { @TempDir private final File remotePath = new File("hdfs://rss"); private static MiniDFSCluster cluster; - private Configuration hdfsConf = new Configuration(); + private final Configuration hdfsConf = new Configuration(); @BeforeEach public void setUp() { @@ -172,11 +170,14 @@ public void dynamicRemoteByAppNumStrategyStorageTest() throws Exception { CoordinatorConf conf = new CoordinatorConf(); conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, updateIntervalSec); conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile.toURI().toString()); + conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 200); + conf.setInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES, 1); conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true); ApplicationManager applicationManager = new ApplicationManager(conf); final ClientConfManager clientConfManager = new ClientConfManager(conf, new Configuration(), applicationManager); - Thread.sleep(500); + // the reason for sleep here is to ensure that threads can be scheduled normally, the same below + Thread.sleep(1000); Set expectedAvailablePath = Sets.newHashSet(remotePath1); assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet()); RemoteStorageInfo remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId1"); @@ -186,6 +187,7 @@ public void dynamicRemoteByAppNumStrategyStorageTest() throws Exception { writeRemoteStorageConf(cfgFile, remotePath3); expectedAvailablePath = Sets.newHashSet(remotePath3); waitForUpdate(expectedAvailablePath, applicationManager); + Thread.sleep(1000); remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId2"); assertEquals(remotePath3, remoteStorageInfo.getPath()); @@ -193,6 +195,7 @@ public void dynamicRemoteByAppNumStrategyStorageTest() throws Exception { writeRemoteStorageConf(cfgFile, remotePath2 + Constants.COMMA_SPLIT_CHAR + remotePath3, confItems); expectedAvailablePath = Sets.newHashSet(remotePath2, remotePath3); waitForUpdate(expectedAvailablePath, applicationManager); + Thread.sleep(1000); remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId3"); assertEquals(remotePath2, remoteStorageInfo.getPath()); assertEquals(2, remoteStorageInfo.getConfItems().size()); @@ -232,19 +235,19 @@ public void dynamicRemoteByHealthStrategyStorageTest() throws Exception { conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, updateIntervalSec); conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile.toURI().toString()); conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true); + conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 200); + conf.setInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES, 1); conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE); ApplicationManager applicationManager = new ApplicationManager(conf); - // init IORankScheduler - Thread.sleep(2000); + // init readWriteRankScheduler LowestIOSampleCostSelectStorageStrategy selectStorageStrategy = (LowestIOSampleCostSelectStorageStrategy) applicationManager.getSelectStorageStrategy(); - Path testPath = new Path("/test"); - FileSystem fs = testPath.getFileSystem(hdfsConf); - selectStorageStrategy.setFs(fs); + String testPath = "/test"; final ClientConfManager clientConfManager = new ClientConfManager(conf, new Configuration(), applicationManager); - Thread.sleep(500); + // the reason for sleep here is to ensure that threads can be scheduled normally, the same below + Thread.sleep(1000); Set expectedAvailablePath = Sets.newHashSet(remotePath1); assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet()); selectStorageStrategy.sortPathByRankValue(remotePath1, testPath, System.currentTimeMillis()); @@ -255,9 +258,9 @@ public void dynamicRemoteByHealthStrategyStorageTest() throws Exception { writeRemoteStorageConf(cfgFile, remotePath3); expectedAvailablePath = Sets.newHashSet(remotePath3); waitForUpdate(expectedAvailablePath, applicationManager); - // The reason for setting the filesystem here is to trigger the execution of sortPathByRankValue - selectStorageStrategy.setFs(fs); + selectStorageStrategy.sortPathByRankValue(remotePath3, testPath, System.currentTimeMillis()); + Thread.sleep(1000); remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId2"); assertEquals(remotePath3, remoteStorageInfo.getPath()); @@ -266,10 +269,9 @@ public void dynamicRemoteByHealthStrategyStorageTest() throws Exception { writeRemoteStorageConf(cfgFile, remotePath2 + Constants.COMMA_SPLIT_CHAR + remotePath3, confItems); expectedAvailablePath = Sets.newHashSet(remotePath2, remotePath3); waitForUpdate(expectedAvailablePath, applicationManager); - selectStorageStrategy.setFs(fs); selectStorageStrategy.sortPathByRankValue(remotePath2, testPath, current); - selectStorageStrategy.setFs(fs); selectStorageStrategy.sortPathByRankValue(remotePath3, testPath, current); + Thread.sleep(1000); remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId3"); assertEquals(remotePath2, remoteStorageInfo.getPath()); assertEquals(2, remoteStorageInfo.getConfItems().size()); diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java index 41eec3e758..e0c3dde829 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java @@ -48,7 +48,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest { private final String remoteStorage1 = "hdfs://p1"; private final String remoteStorage2 = "hdfs://p2"; private final String remoteStorage3 = "hdfs://p3"; - private final Path testFile = new Path("test"); + private final String testFile = "test"; @TempDir private static File remotePath = new File("hdfs://rss"); @@ -77,96 +77,95 @@ public void setUpHdfs(String hdfsPath) throws Exception { Thread.sleep(500L); CoordinatorConf conf = new CoordinatorConf(); conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime); - conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME, 5000); + conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 200); conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE); applicationManager = new ApplicationManager(conf); selectStorageStrategy = (LowestIOSampleCostSelectStorageStrategy) applicationManager.getSelectStorageStrategy(); - selectStorageStrategy.setConf(hdfsConf); Thread.sleep(1000); } @Test public void selectStorageTest() throws Exception { - FileSystem fs = testFile.getFileSystem(hdfsConf); - selectStorageStrategy.setFs(fs); + final Path path = new Path(testFile); + final FileSystem fs = path.getFileSystem(hdfsConf); String remoteStoragePath = remoteStorage1 + Constants.COMMA_SPLIT_CHAR + remoteStorage2; applicationManager.refreshRemoteStorage(remoteStoragePath, ""); //default value is 0 assertEquals(0, - selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage1).getReadAndWriteTime().get()); + applicationManager.getRemoteStoragePathRankValue().get(remoteStorage1).getCostTime().get()); assertEquals(0, - selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getReadAndWriteTime().get()); + applicationManager.getRemoteStoragePathRankValue().get(remoteStorage2).getCostTime().get()); String storageHost1 = "p1"; assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5); String storageHost2 = "p2"; assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5); // compare with two remote path - selectStorageStrategy.incRemoteStorageCounter(remoteStorage1); - selectStorageStrategy.incRemoteStorageCounter(remoteStorage1); + applicationManager.incRemoteStorageCounter(remoteStorage1); + applicationManager.incRemoteStorageCounter(remoteStorage1); String testApp1 = "testApp1"; + Thread.sleep(1000); final long current = System.currentTimeMillis(); applicationManager.refreshAppId(testApp1); - fs.create(testFile); + fs.create(path); selectStorageStrategy.sortPathByRankValue(remoteStorage2, testFile, current); - fs.create(testFile); + fs.create(path); selectStorageStrategy.sortPathByRankValue(remoteStorage1, testFile, current); - assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath()); - assertEquals(remoteStorage2, selectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1).getPath()); + assertEquals(remoteStorage2, applicationManager.pickRemoteStorage(testApp1).getPath()); + assertEquals(remoteStorage2, applicationManager.getAppIdToRemoteStorageInfo().get(testApp1).getPath()); assertEquals(1, - selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); + applicationManager.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); // return the same value if did the assignment already - assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath()); + assertEquals(remoteStorage2, applicationManager.pickRemoteStorage(testApp1).getPath()); assertEquals(1, - selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); + applicationManager.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); // when the expiration time is reached, the app was removed Thread.sleep(appExpiredTime + 2000); - assertNull(selectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1)); + assertNull(applicationManager.getAppIdToRemoteStorageInfo().get(testApp1)); assertEquals(0, - selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); + applicationManager.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); // refresh app1, got remotePath2, then remove remotePath2, // it should be existed in counter until it expired applicationManager.refreshAppId(testApp1); - assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath()); + assertEquals(remoteStorage2, applicationManager.pickRemoteStorage(testApp1).getPath()); remoteStoragePath = remoteStorage1; applicationManager.refreshRemoteStorage(remoteStoragePath, ""); assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remoteStorage1, remoteStorage2)), - selectStorageStrategy.getRemoteStoragePathRankValue().keySet()); - assertTrue(selectStorageStrategy.getRemoteStoragePathRankValue() - .get(remoteStorage2).getReadAndWriteTime().get() > 0); + applicationManager.getRemoteStoragePathRankValue().keySet()); + assertTrue(applicationManager.getRemoteStoragePathRankValue() + .get(remoteStorage2).getCostTime().get() > 0); assertEquals(1, - selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); + applicationManager.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); // app1 is expired, p2 is removed because of counter = 0 Thread.sleep(appExpiredTime + 2000); assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remoteStorage1)), - selectStorageStrategy.getRemoteStoragePathRankValue().keySet()); + applicationManager.getRemoteStoragePathRankValue().keySet()); // restore previous manually inc for next test case - selectStorageStrategy.decRemoteStorageCounter(remoteStorage1); - selectStorageStrategy.decRemoteStorageCounter(remoteStorage1); + applicationManager.decRemoteStorageCounter(remoteStorage1); + applicationManager.decRemoteStorageCounter(remoteStorage1); // remove all remote storage applicationManager.refreshRemoteStorage("", ""); - assertEquals(0, selectStorageStrategy.getAvailableRemoteStorageInfo().size()); - assertEquals(0, selectStorageStrategy.getRemoteStoragePathRankValue().size()); + assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size()); + assertEquals(0, applicationManager.getRemoteStoragePathRankValue().size()); assertFalse(applicationManager.hasErrorInStatusCheck()); } @Test public void selectStorageMulThreadTest() throws Exception { - FileSystem fs = testFile.getFileSystem(hdfsConf); - selectStorageStrategy.setFs(fs); String remoteStoragePath = remoteStorage1 + Constants.COMMA_SPLIT_CHAR + remoteStorage2 + Constants.COMMA_SPLIT_CHAR + remoteStorage3; applicationManager.refreshRemoteStorage(remoteStoragePath, ""); String appPrefix = "testAppId"; - + // init detectStorageScheduler + Thread.sleep(2000); Thread pickThread1 = new Thread(() -> { for (int i = 0; i < 1000; i++) { String appId = appPrefix + i; applicationManager.refreshAppId(appId); - selectStorageStrategy.pickRemoteStorage(appId); + applicationManager.pickRemoteStorage(appId); } }); @@ -174,7 +173,7 @@ public void selectStorageMulThreadTest() throws Exception { for (int i = 1000; i < 2000; i++) { String appId = appPrefix + i; applicationManager.refreshAppId(appId); - selectStorageStrategy.pickRemoteStorage(appId); + applicationManager.pickRemoteStorage(appId); } }); @@ -182,7 +181,7 @@ public void selectStorageMulThreadTest() throws Exception { for (int i = 2000; i < 3000; i++) { String appId = appPrefix + i; applicationManager.refreshAppId(appId); - selectStorageStrategy.pickRemoteStorage(appId); + applicationManager.pickRemoteStorage(appId); } }); pickThread1.start(); @@ -194,8 +193,8 @@ public void selectStorageMulThreadTest() throws Exception { Thread.sleep(appExpiredTime + 2000); applicationManager.refreshRemoteStorage("", ""); - assertEquals(0, selectStorageStrategy.getAvailableRemoteStorageInfo().size()); - assertEquals(0, selectStorageStrategy.getRemoteStoragePathRankValue().size()); + assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size()); + assertEquals(0, applicationManager.getRemoteStoragePathRankValue().size()); assertFalse(applicationManager.hasErrorInStatusCheck()); } } diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md index 7a98f708f8..79f9209db5 100644 --- a/docs/coordinator_guide.md +++ b/docs/coordinator_guide.md @@ -96,9 +96,9 @@ This document will introduce how to deploy Uniffle coordinators. |rss.rpc.server.port|-|RPC port for coordinator| |rss.jetty.http.port|-|Http port for coordinator| |rss.coordinator.remote.storage.select.strategy|APP_BALANCE|Strategy for selecting the remote path| -|rss.coordinator.remote.storage.io.sample.schedule.time|60000|The time of scheduling the read and write time of the paths to obtain different HDFS| -|rss.coordinator.remote.storage.io.sample.file.size|204800000|The size of the file that the scheduled thread reads and writes| -|rss.coordinator.remote.storage.io.sample.access.times|3|The number of times to read and write HDFS files| +|rss.coordinator.remote.storage.schedule.time|60000|The time of scheduling the read and write time of the paths to obtain different HDFS| +|rss.coordinator.remote.storage.schedule.file.size|204800000|The size of the file that the scheduled thread reads and writes| +|rss.coordinator.remote.storage.schedule.access.times|3|The number of times to read and write HDFS files| ### AccessClusterLoadChecker settings |Property Name|Default| Description| diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java index 73450d7deb..86faac4d0f 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java @@ -110,6 +110,8 @@ public void testFetchRemoteStorageByApp(@TempDir File tempDir) throws Exception coordinatorConf.setBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true); coordinatorConf.setString(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile.toURI().toString()); coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 3); + coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 200); + coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES, 1); createCoordinatorServer(coordinatorConf); startServers(); @@ -127,6 +129,7 @@ public void testFetchRemoteStorageByApp(@TempDir File tempDir) throws Exception writeRemoteStorageConf(cfgFile, dynamicConf); waitForUpdate(Sets.newHashSet(remotePath2), coordinators.get(0).getApplicationManager()); request = new RssFetchRemoteStorageRequest(appId); + Thread.sleep(1500); response = coordinatorClient.fetchRemoteStorage(request); // remotePath1 will be return because (appId -> remote storage path) is in cache remoteStorageInfo = response.getRemoteStorageInfo(); @@ -158,7 +161,8 @@ public void testFetchRemoteStorageByIO(@TempDir File tempDir) throws Exception { coordinatorConf.setBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true); coordinatorConf.setString(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile.toURI().toString()); coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 2); - coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME, 500); + coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 100); + coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES, 1); coordinatorConf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE); createCoordinatorServer(coordinatorConf); startServers(); diff --git a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java index 6acb4f9559..b7a6bec37a 100644 --- a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java +++ b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java @@ -79,6 +79,8 @@ public void test() throws Exception { coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec", 1); coordinatorConf.setInteger("rss.coordinator.access.candidates.updateIntervalSec", 1); coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold", 1); + coordinatorConf.setLong("rss.coordinator.remote.storage.schedule.time", 200); + coordinatorConf.setInteger("rss.coordinator.remote.storage.schedule.access.times", 1); createCoordinatorServer(coordinatorConf); ShuffleServerConf shuffleServerConf = getShuffleServerConf(); @@ -120,6 +122,8 @@ public void test() throws Exception { JavaPairRDD> javaPairRDD = TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc2)); ShuffleDependency shuffleDependency = (ShuffleDependency) javaPairRDD.rdd().dependencies().head(); rssShuffleHandle = (RssShuffleHandle) shuffleDependency.shuffleHandle(); + // the reason for sleep here is to ensure that threads can be scheduled normally + Thread.sleep(500); RemoteStorageInfo remoteStorageInfo3 = rssShuffleHandle.getRemoteStorage(); assertEquals(remoteStorage1, remoteStorageInfo1.getPath()); assertEquals(2, remoteStorageInfo3.getConfItems().size()); diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java index 827e793c89..01b4d35f0d 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java @@ -89,6 +89,8 @@ public void test() throws Exception { coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec", 1); coordinatorConf.setInteger("rss.coordinator.access.candidates.updateIntervalSec", 1); coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold", 1); + coordinatorConf.setLong("rss.coordinator.remote.storage.schedule.time", 200); + coordinatorConf.setInteger("rss.coordinator.remote.storage.schedule.access.times", 1); createCoordinatorServer(coordinatorConf); ShuffleServerConf shuffleServerConf = getShuffleServerConf(); @@ -130,6 +132,8 @@ public void test() throws Exception { JavaPairRDD> javaPairRDD = TestUtils.combineByKeyRDD(TestUtils.getRDD(jsc2)); ShuffleDependency shuffleDependency = (ShuffleDependency) javaPairRDD.rdd().dependencies().head(); rssShuffleHandle = (RssShuffleHandle) shuffleDependency.shuffleHandle(); + // the reason for sleep here is to ensure that threads can be scheduled normally + Thread.sleep(500); RemoteStorageInfo remoteStorageInfo3 = rssShuffleHandle.getRemoteStorage(); assertEquals(remoteStorage1, remoteStorageInfo1.getPath()); assertEquals(2, remoteStorageInfo3.getConfItems().size());