diff --git a/checkstyle.xml b/checkstyle.xml
index f4c3a8d9e4..b1f5487169 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -251,26 +251,12 @@
             value="GenericWhitespace ''{0}'' is not preceded with whitespace."/>
-
-
-
-
-
-
-
-
-
-
-
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 03825384d2..4995cc1cf1 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -176,7 +176,7 @@ public void addRecord(int partitionId, K key, V value) throws IOException,Interr
       }
     }
     if (memoryUsedSize.get() > maxMemSize * memoryThreshold
-        && inSendListBytes.get() <= maxMemSize * sendThreshold) {
+        && inSendListBytes.get() <= maxMemSize * sendThreshold) {
       sendBuffersToServers();
     }
     mapOutputRecordCounter.increment(1);
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 97bd7166c0..f2e7984a87 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -95,13 +95,13 @@ public static ShuffleWriteClient createShuffleClient(JobConf jobConf) {
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
             replica, replicaWrite, replicaRead, replicaSkipEnabled,
-            dataTransferPoolSize);
+            dataTransferPoolSize);
     return client;
   }

   public static Set getAssignedServers(JobConf jobConf, int reduceID) {
     String servers = jobConf.get(RssMRConfig.RSS_ASSIGNMENT_PREFIX
-        + String.valueOf(reduceID));
+        + String.valueOf(reduceID));
     String[] splitServers = servers.split(",");
     Set assignServers = Sets.newHashSet();
     for (String splitServer : splitServers) {
@@ -110,7 +110,7 @@ public static Set getAssignedServers(JobConf jobConf, int red
         throw new RssException("partition " + reduceID + " server info isn't right");
       }
       ShuffleServerInfo sever = new ShuffleServerInfo(StringUtils.join(serverInfo, "-"),
-          serverInfo[0], Integer.parseInt(serverInfo[1]));
+          serverInfo[0], Integer.parseInt(serverInfo[1]));
       assignServers.add(sever);
     }
     return assignServers;
@@ -118,7 +118,7 @@ public static Set getAssignedServers(JobConf jobConf, int red

   public static ApplicationAttemptId getApplicationAttemptId() {
     String containerIdStr =
-        System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+        System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
     ContainerId containerId = ContainerId.fromString(containerIdStr);
     return containerId.getApplicationAttemptId();
   }
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
index d30a19299f..1619678fd6 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
@@ -67,8 +67,8 @@ public Roaring64NavigableMap fetchAllRssTaskIds() {
       acceptMapCompletionEvents();
     } catch (Exception e) {
       throw new RssException("Reduce: " + reduce
-          + " fails to accept completion events due to: "
-          + e.getMessage());
+          + " fails to accept completion events due to: "
+          + e.getMessage());
     }

     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -117,13 +117,13 @@ public void resolve(TaskCompletionEvent event) {
       case OBSOLETE:
         obsoleteMaps.add(event.getTaskAttemptId());
         LOG.info("Ignoring obsolete output of "
-            + event.getTaskStatus() + " map-task: '" + event.getTaskAttemptId() + "'");
+            + event.getTaskStatus() + " map-task: '" + event.getTaskAttemptId() + "'");
         break;

       case TIPFAILED:
         tipFailedCount++;
         LOG.info("Ignoring output of failed map TIP: '"
-            + event.getTaskAttemptId() + "'");
+            + event.getTaskAttemptId() + "'");
         break;

       default:
@@ -138,14 +138,14 @@ public void acceptMapCompletionEvents() throws IOException {

     do {
       MapTaskCompletionEventsUpdate update =
-          umbilical.getMapCompletionEvents(
-              (org.apache.hadoop.mapred.JobID) reduce.getJobID(),
-              fromEventIdx,
-              maxEventsToFetch,
-              (org.apache.hadoop.mapred.TaskAttemptID) reduce);
+          umbilical.getMapCompletionEvents(
+              (org.apache.hadoop.mapred.JobID) reduce.getJobID(),
+              fromEventIdx,
+              maxEventsToFetch,
+              (org.apache.hadoop.mapred.TaskAttemptID) reduce);
       events = update.getMapTaskCompletionEvents();
       LOG.debug("Got " + events.length + " map completion events from "
-          + fromEventIdx);
+          + fromEventIdx);

       assert !update.shouldReset() : "Unexpected legacy state";
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
index ba9a1c8378..128bfb9ff0 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
@@ -100,17 +100,17 @@ private enum ShuffleErrors {
     this.metrics = metrics;
     this.reduceId = reduceId;
     ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-        RssFetcher.ShuffleErrors.IO_ERROR.toString());
+        RssFetcher.ShuffleErrors.IO_ERROR.toString());
     wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-        RssFetcher.ShuffleErrors.WRONG_LENGTH.toString());
+        RssFetcher.ShuffleErrors.WRONG_LENGTH.toString());
     badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-        RssFetcher.ShuffleErrors.BAD_ID.toString());
+        RssFetcher.ShuffleErrors.BAD_ID.toString());
     wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-        RssFetcher.ShuffleErrors.WRONG_MAP.toString());
+        RssFetcher.ShuffleErrors.WRONG_MAP.toString());
     connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-        RssFetcher.ShuffleErrors.CONNECTION.toString());
+        RssFetcher.ShuffleErrors.CONNECTION.toString());
     wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-        RssFetcher.ShuffleErrors.WRONG_REDUCE.toString());
+        RssFetcher.ShuffleErrors.WRONG_REDUCE.toString());

     this.shuffleReadClient = shuffleReadClient;
     this.totalBlockCount = totalBlockCount;
@@ -151,7 +151,7 @@ public void copyFromRssServer() throws IOException {
     if (!hasPendingData && compressedData != null) {
       final long startDecompress = System.currentTimeMillis();
       uncompressedData = RssShuffleUtils.decompressData(
-          compressedData, compressedBlock.getUncompressLength(), false).array();
+          compressedData, compressedBlock.getUncompressLength(), false).array();
       unCompressionLength += compressedBlock.getUncompressLength();
       long decompressDuration = System.currentTimeMillis() - startDecompress;
       decompressTime += decompressDuration;
@@ -187,9 +187,9 @@ public void copyFromRssServer() throws IOException {
       shuffleReadClient.logStatics();
       metrics.inputBytes(unCompressionLength);
       LOG.info("reduce task " + reduceId.toString() + " cost " + readTime + " ms to fetch and "
-          + decompressTime + " ms to decompress with unCompressionLength["
-          + unCompressionLength + "] and " + serializeTime + " ms to serialize and "
-          + waitTime + " ms to wait resource");
+          + decompressTime + " ms to decompress with unCompressionLength["
+          + unCompressionLength + "] and " + serializeTime + " ms to serialize and "
+          + waitTime + " ms to wait resource");
       stopFetch();
     }
   }
@@ -226,13 +226,13 @@ private boolean issueMapOutputMerge() throws IOException {
       mapOutput.commit();
       if (mapOutput instanceof OnDiskMapOutput) {
         LOG.info("Reduce: " + reduceId + " allocates disk to accept block "
-            + " with byte sizes: " + uncompressedData.length);
+            + " with byte sizes: " + uncompressedData.length);
       }
     } catch (Throwable t) {
       ioErrs.increment(1);
       mapOutput.abort();
       throw new RssException("Reduce: " + reduceId + " cannot write block to "
-          + mapOutput.getClass().getSimpleName() + " due to: " + t.getClass().getName());
+          + mapOutput.getClass().getSimpleName() + " due to: " + t.getClass().getName());
     }
     return true;
   }
@@ -258,7 +258,7 @@ private void updateStatus() {
     double transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
     progress.setStatus("copy(" + copyBlockCount + " of " + totalBlockCount + " at "
-        + mbpsFormat.format(transferRate) + " MB/s)");
+        + mbpsFormat.format(transferRate) + " MB/s)");
   }

   @VisibleForTesting
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 618f80b7c8..8f3efd33ec 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -105,34 +105,34 @@ public void init(ShuffleConsumerPlugin.Context context) {
     this.appAttemptId = RssMRUtils.getApplicationAttemptId().getAttemptId();
     this.storageType = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_STORAGE_TYPE);
     this.replicaWrite = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_DATA_REPLICA_WRITE,
-        RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
+        RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
     this.replicaRead = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_DATA_REPLICA_READ,
-        RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+        RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
     this.replica = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_DATA_REPLICA,
-        RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
+        RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);

     this.partitionNum = mrJobConf.getNumReduceTasks();
     this.partitionNumPerRange = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_PARTITION_NUM_PER_RANGE,
-        RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
+        RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
     this.basePath = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_REMOTE_STORAGE_PATH);
     this.indexReadLimit = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_INDEX_READ_LIMIT,
-        RssMRConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE);
+        RssMRConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE);
     this.readBufferSize = (int)UnitConverter.byteStringAsBytes(
-        RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE,
-            RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE));
+        RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE,
+            RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE));
     String remoteStorageConf = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_REMOTE_STORAGE_CONF, "");
     this.remoteStorageInfo = new RemoteStorageInfo(basePath, remoteStorageConf);
-  }
+  }

   protected MergeManager createMergeManager(
-      ShuffleConsumerPlugin.Context context) {
+      ShuffleConsumerPlugin.Context context) {
     return new MergeManagerImpl(reduceId, mrJobConf, context.getLocalFS(),
-        context.getLocalDirAllocator(), reporter, context.getCodec(),
-        context.getCombinerClass(), context.getCombineCollector(),
-        context.getSpilledRecordsCounter(),
-        context.getReduceCombineInputCounter(),
-        context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
-        context.getMapOutputFile());
+        context.getLocalDirAllocator(), reporter, context.getCodec(),
+        context.getCombinerClass(), context.getCombineCollector(),
+        context.getSpilledRecordsCounter(),
+        context.getReduceCombineInputCounter(),
+        context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
+        context.getMapOutputFile());
   }

   @Override
@@ -149,21 +149,21 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
     // just get blockIds from RSS servers
     ShuffleWriteClient writeClient = RssMRUtils.createShuffleClient(mrJobConf);
     Roaring64NavigableMap blockIdBitmap = writeClient.getShuffleResult(
-        clientType, serverInfoSet, appId, 0, reduceId.getTaskID().getId());
+        clientType, serverInfoSet, appId, 0, reduceId.getTaskID().getId());
     writeClient.close();

     // get map-completion events to generate RSS taskIDs
     final RssEventFetcher eventFetcher =
-        new RssEventFetcher(appAttemptId, reduceId, umbilical, mrJobConf, MAX_EVENTS_TO_FETCH);
+        new RssEventFetcher(appAttemptId, reduceId, umbilical, mrJobConf, MAX_EVENTS_TO_FETCH);
     Roaring64NavigableMap taskIdBitmap = eventFetcher.fetchAllRssTaskIds();

     LOG.info("In reduce: " + reduceId
-        + ", RSS MR client has fetched blockIds and taskIds successfully");
+        + ", RSS MR client has fetched blockIds and taskIds successfully");

     // start fetcher to fetch blocks from RSS servers
     if (!taskIdBitmap.isEmpty()) {
       LOG.info("In reduce: " + reduceId
-          + ", Rss MR client starts to fetch blocks from RSS server");
+          + ", Rss MR client starts to fetch blocks from RSS server");
       JobConf readerJobConf = new JobConf((mrJobConf));
       if (!remoteStorageInfo.isEmpty()) {
         for (Map.Entry entry : remoteStorageInfo.getConfItems().entrySet()) {
@@ -171,15 +171,15 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
         }
       }
       CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
-          appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
-          partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
-          readerJobConf, new MRIdHelper());
+          appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
+          partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
+          readerJobConf, new MRIdHelper());
       ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
       RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus, merger, copyPhase, reporter, metrics,
-          shuffleReadClient, blockIdBitmap.getLongCardinality());
+          shuffleReadClient, blockIdBitmap.getLongCardinality());
       fetcher.fetchAllRssBlocks();
       LOG.info("In reduce: " + reduceId
-          + ", Rss MR client fetches blocks from RSS server successfully");
+          + ", Rss MR client fetches blocks from RSS server successfully");
     }

     copyPhase.complete();
@@ -198,12 +198,12 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
     synchronized (this) {
       if (throwable != null) {
         throw new Shuffle.ShuffleError("error in shuffle in " + throwingThreadName,
-            throwable);
+            throwable);
       }
     }

     LOG.info("In reduce: " + reduceId
-        + ", Rss MR client returns sorted data to reduce successfully");
+        + ", Rss MR client returns sorted data to reduce successfully");
     return kvIter;
   }
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index db8d7b9c44..e15351b93e 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -194,7 +194,7 @@ public Thread newThread(Runnable r) {
     RemoteStorageInfo defaultRemoteStorage =
         new RemoteStorageInfo(conf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH, ""));
     RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(
-        appId, defaultRemoteStorage, dynamicConfEnabled, storageType, client);
+        appId, defaultRemoteStorage, dynamicConfEnabled, storageType, client);
     // set the remote storage with actual value
     extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
     extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index c5da1ac2b6..705d66b39f 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -147,7 +147,7 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
     this.dataReplicaRead = sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_READ,
         RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
     this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
-        RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
     this.dataReplicaSkipEnabled = sparkConf.getBoolean(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
         RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
     LOG.info("Check quorum config ["
@@ -171,7 +171,7 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-            dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
+            dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index be347a3200..19012584aa 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -287,23 +287,23 @@ protected void checkBlockSendResult(Set blockIds) throws RuntimeException
   public Option stop(boolean success) {
     try {
       if (success) {
-        // fill partitionLengths with non zero dummy value so map output tracker could work correctly
-        long[] partitionLengths = new long[partitioner.numPartitions()];
-        Arrays.fill(partitionLengths, 1);
-        final BlockManagerId blockManagerId =
-            createDummyBlockManagerId(appId + "_" + taskId, taskAttemptId);
+        // fill partitionLengths with non zero dummy value so map output tracker could work correctly
+        long[] partitionLengths = new long[partitioner.numPartitions()];
+        Arrays.fill(partitionLengths, 1);
+        final BlockManagerId blockManagerId =
+            createDummyBlockManagerId(appId + "_" + taskId, taskAttemptId);

-        Map> ptb = Maps.newHashMap();
-        for (Map.Entry> entry : partitionToBlockIds.entrySet()) {
-          ptb.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
-        }
-        long start = System.currentTimeMillis();
-        shuffleWriteClient.reportShuffleResult(partitionToServers, appId, shuffleId,
-            taskAttemptId, ptb, bitmapSplitNum);
-        LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
-            taskAttemptId, bitmapSplitNum, (System.currentTimeMillis() - start));
-        MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId, partitionLengths);
-        return Option.apply(mapStatus);
+        Map> ptb = Maps.newHashMap();
+        for (Map.Entry> entry : partitionToBlockIds.entrySet()) {
+          ptb.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
+        }
+        long start = System.currentTimeMillis();
+        shuffleWriteClient.reportShuffleResult(partitionToServers, appId, shuffleId,
+            taskAttemptId, ptb, bitmapSplitNum);
+        LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
+            taskAttemptId, bitmapSplitNum, (System.currentTimeMillis() - start));
+        MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId, partitionLengths);
+        return Option.apply(mapStatus);
       } else {
         return Option.empty();
       }
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 6b8a77e630..6ec8901013 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -172,7 +172,7 @@ public ShuffleReader getReader(
       TaskContext context,
       ShuffleReadMetricsReporter metrics) {
     return delegate.getReader(handle,
-        startPartition, endPartition, context, metrics);
+        startPartition, endPartition, context, metrics);
   }

   // The interface is only used for compatibility with spark 3.1.2
@@ -187,21 +187,21 @@ public ShuffleReader getReader(
     ShuffleReader reader = null;
     try {
       reader = (ShuffleReader)delegate.getClass().getDeclaredMethod(
-          "getReader",
-          ShuffleHandle.class,
-          int.class,
-          int.class,
-          int.class,
-          int.class,
-          TaskContext.class,
-          ShuffleReadMetricsReporter.class).invoke(
-              handle,
-              startMapIndex,
-              endMapIndex,
-              startPartition,
-              endPartition,
-              context,
-              metrics);
+          "getReader",
+          ShuffleHandle.class,
+          int.class,
+          int.class,
+          int.class,
+          int.class,
+          TaskContext.class,
+          ShuffleReadMetricsReporter.class).invoke(
+              handle,
+              startMapIndex,
+              endMapIndex,
+              startPartition,
+              endPartition,
+              context,
+              metrics);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -210,31 +210,31 @@ public ShuffleReader getReader(
   // The interface is only used for compatibility with spark 3.0.1
   public ShuffleReader getReaderForRange(
-      ShuffleHandle handle,
-      int startMapIndex,
-      int endMapIndex,
-      int startPartition,
-      int endPartition,
-      TaskContext context,
-      ShuffleReadMetricsReporter metrics) {
+      ShuffleHandle handle,
+      int startMapIndex,
+      int endMapIndex,
+      int startPartition,
+      int endPartition,
+      TaskContext context,
+      ShuffleReadMetricsReporter metrics) {
     ShuffleReader reader = null;
     try {
       reader = (ShuffleReader)delegate.getClass().getDeclaredMethod(
-          "getReaderForRange",
-          ShuffleHandle.class,
-          int.class,
-          int.class,
-          int.class,
-          int.class,
-          TaskContext.class,
-          ShuffleReadMetricsReporter.class).invoke(
-              handle,
-              startMapIndex,
-              endMapIndex,
-              startPartition,
-              endPartition,
-              context,
-              metrics);
+          "getReaderForRange",
+          ShuffleHandle.class,
+          int.class,
+          int.class,
+          int.class,
+          int.class,
+          TaskContext.class,
+          ShuffleReadMetricsReporter.class).invoke(
+              handle,
+              startMapIndex,
+              endMapIndex,
+              startPartition,
+              endPartition,
+              context,
+              metrics);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 032767a943..b71f1b3b30 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -168,12 +168,12 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
         RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);

     this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
-        RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);

     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-            dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
+            dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
@@ -221,28 +221,28 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
     this.dataReplica = sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA,
         RssSparkConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
     this.dataReplicaWrite = sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_WRITE,
-        RssSparkConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
     this.dataReplicaRead = sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_READ,
-        RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
     this.dataReplicaSkipEnabled = sparkConf.getBoolean(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
-        RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
     LOG.info("Check quorum config ["
-        + dataReplica + ":" + dataReplicaWrite + ":" + dataReplicaRead + ":" + dataReplicaSkipEnabled + "]");
+        + dataReplica + ":" + dataReplicaWrite + ":" + dataReplicaRead + ":" + dataReplicaSkipEnabled + "]");
     RssUtils.checkQuorumSetting(dataReplica, dataReplicaWrite, dataReplicaRead);

     int retryMax = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_RETRY_MAX,
-        RssSparkConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+        RssSparkConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
     long retryIntervalMax = sparkConf.getLong(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
-        RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
+        RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
     int heartBeatThreadNum = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
-        RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
+        RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
     this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
-        RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);

-    shuffleWriteClient = ShuffleClientFactory
+    shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-            dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
+            dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     this.taskToSuccessBlockIds = taskToSuccessBlockIds;
     this.taskToFailedBlockIds = taskToFailedBlockIds;
     if (loop != null) {
@@ -339,7 +339,7 @@ public ShuffleReader getReader(
       TaskContext context,
       ShuffleReadMetricsReporter metrics) {
     return getReader(handle, 0, Integer.MAX_VALUE, startPartition, endPartition,
-        context, metrics);
+        context, metrics);
   }

   // The interface is used for compatibility with spark 3.0.1
@@ -353,14 +353,14 @@ public ShuffleReader getReader(
       ShuffleReadMetricsReporter metrics) {
     long start = System.currentTimeMillis();
     Roaring64NavigableMap taskIdBitmap = getExpectedTasksByExecutorId(
-        handle.shuffleId(),
-        startPartition,
-        endPartition,
-        startMapIndex,
-        endMapIndex);
+        handle.shuffleId(),
+        startPartition,
+        endPartition,
+        startMapIndex,
+        endMapIndex);
     LOG.info("Get taskId cost " + (System.currentTimeMillis() - start) + " ms, and request expected blockIds from "
-        + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" + handle.shuffleId() + "], partitionId["
-        + startPartition + ", " + endPartition + "]");
+        + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" + handle.shuffleId() + "], partitionId["
+        + startPartition + ", " + endPartition + "]");
     return getReaderImpl(handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics, taskIdBitmap);
   }
@@ -376,14 +376,14 @@ public ShuffleReader getReaderForRange(
       ShuffleReadMetricsReporter metrics) {
     long start = System.currentTimeMillis();
     Roaring64NavigableMap taskIdBitmap = getExpectedTasksByRange(
-        handle.shuffleId(),
-        startPartition,
-        endPartition,
-        startMapIndex,
-        endMapIndex);
+        handle.shuffleId(),
+        startPartition,
+        endPartition,
+        startMapIndex,
+        endMapIndex);
     LOG.info("Get taskId cost " + (System.currentTimeMillis() - start) + " ms, and request expected blockIds from "
-        + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" + handle.shuffleId() + "], partitionId["
-        + startPartition + ", " + endPartition + "]");
+        + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" + handle.shuffleId() + "], partitionId["
+        + startPartition + ", " + endPartition + "]");
     return getReaderImpl(handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics, taskIdBitmap);
   }
@@ -507,24 +507,24 @@ private Roaring64NavigableMap getExpectedTasksByExecutorId(
   // This API is only used by Spark3.0 and removed since 3.1,
   // so we extract it from getExpectedTasksByExecutorId.
   private Roaring64NavigableMap getExpectedTasksByRange(
-      int shuffleId,
-      int startPartition,
-      int endPartition,
-      int startMapIndex,
-      int endMapIndex) {
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      int startMapIndex,
+      int endMapIndex) {
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
     Iterator>>> mapStatusIter = null;
     try {
       mapStatusIter = (Iterator>>>)
-          SparkEnv.get().mapOutputTracker().getClass()
-              .getDeclaredMethod("getMapSizesByRange",
-                  int.class, int.class, int.class, int.class, int.class)
-              .invoke(SparkEnv.get().mapOutputTracker(),
-                  shuffleId,
-                  startMapIndex,
-                  endMapIndex,
-                  startPartition,
-                  endPartition);
+          SparkEnv.get().mapOutputTracker().getClass()
+              .getDeclaredMethod("getMapSizesByRange",
+                  int.class, int.class, int.class, int.class, int.class)
+              .invoke(SparkEnv.get().mapOutputTracker(),
+                  shuffleId,
+                  startMapIndex,
+                  endMapIndex,
+                  startPartition,
+                  endPartition);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 669431cbb2..ce4c24731a 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -121,7 +121,7 @@ private boolean sendShuffleDataAsync(
         Map>> shuffleIdToBlocks = entry.getValue();
         // todo: compact unnecessary blocks that reach replicaWrite
         RssSendShuffleDataRequest request = new RssSendShuffleDataRequest(
-            appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
+            appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
         long s = System.currentTimeMillis();
         RssSendShuffleDataResponse response = getShuffleServerClient(ssi).sendShuffleData(request);
         LOG.info("ShuffleWriteClientImpl sendShuffleData cost:" + (System.currentTimeMillis() - s) + "(ms)");
@@ -130,11 +130,11 @@ private boolean sendShuffleDataAsync(
           // mark a replica of block that has been sent
           serverToBlockIds.get(ssi).forEach(block -> blockIdsTracker.get(block).incrementAndGet());
           LOG.info("Send: " + serverToBlockIds.get(ssi).size()
-              + " blocks to [" + ssi.getId() + "] successfully");
+              + " blocks to [" + ssi.getId() + "] successfully");
         } else {
           isAllServersSuccess.set(false);
           LOG.warn("Send: " + serverToBlockIds.get(ssi).size() + " blocks to [" + ssi.getId()
-              + "] failed with statusCode[" + response.getStatusCode() + "], ");
+              + "] failed with statusCode[" + response.getStatusCode() + "], ");
         }
       } catch (Exception e) {
         isAllServersSuccess.set(false);
@@ -146,9 +146,8 @@ private boolean sendShuffleDataAsync(
   }

   private void genServerToBlocks(ShuffleBlockInfo sbi, List serverList,
-      Map>>> serverToBlocks,
-      Map> serverToBlockIds) {
+      Map>>> serverToBlocks,
+      Map> serverToBlockIds) {
     int partitionId = sbi.getPartitionId();
     int shuffleId = sbi.getShuffleId();
     for (ShuffleServerInfo ssi : serverList) {
@@ -180,7 +179,7 @@ public SendShuffleDataResult sendShuffleData(String appId, List
     Map>>> primaryServerToBlocks = Maps.newHashMap();
-    Map>>> secondaryServerToBlocks = Maps.newHashMap();
+    Map>>> secondaryServerToBlocks = Maps.newHashMap();
     Map> primaryServerToBlockIds = Maps.newHashMap();
     Map> secondaryServerToBlockIds = Maps.newHashMap();
@@ -198,23 +197,23 @@ public SendShuffleDataResult sendShuffleData(String appId, List
       allServers = sbi.getShuffleServerInfos();
       if (replicaSkipEnabled) {
         genServerToBlocks(sbi, allServers.subList(0, replicaWrite),
-            primaryServerToBlocks, primaryServerToBlockIds);
+            primaryServerToBlocks, primaryServerToBlockIds);
         genServerToBlocks(sbi, allServers.subList(replicaWrite, replica),
-            secondaryServerToBlocks, secondaryServerToBlockIds);
+            secondaryServerToBlocks, secondaryServerToBlockIds);
       } else {
         // When replicaSkip is disabled, we send data to all replicas within one round.
         genServerToBlocks(sbi, allServers,
-            primaryServerToBlocks, primaryServerToBlockIds);
+            primaryServerToBlocks, primaryServerToBlockIds);
       }
     }

     // maintain the count of blocks that have been sent to the server
     Map blockIdsTracker = Maps.newConcurrentMap();
     primaryServerToBlockIds.values().forEach(
-        blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0)))
+        blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0)))
     );
     secondaryServerToBlockIds.values().forEach(
-        blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0)))
+        blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0)))
     );

     Set failedBlockIds = Sets.newConcurrentHashSet();
@@ -237,15 +236,14 @@ public SendShuffleDataResult sendShuffleData(String appId, List
         {
-          long blockId = blockCt.getKey();
-          int count = blockCt.getValue().get();
-          if (count >= replicaWrite) {
-            successBlockIds.add(blockId);
-          } else {
-            failedBlockIds.add(blockId);
-          }
+          long blockId = blockCt.getKey();
+          int count = blockCt.getValue().get();
+          if (count >= replicaWrite) {
+            successBlockIds.add(blockId);
+          } else {
+            failedBlockIds.add(blockId);
           }
-    );
+        });

     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
@@ -420,7 +418,7 @@ public void reportShuffleResult(
     for (Map.Entry entry: partitionReportTracker.entrySet()) {
       if (entry.getValue() < replicaWrite) {
         throw new RssException("Quorum check of report shuffle result is failed for appId["
-            + appId + "], shuffleId[" + shuffleId + "]");
+            + appId + "], shuffleId[" + shuffleId + "]");
       }
     }
   }
diff --git a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index a9f989a3e3..3de569279a 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -83,8 +83,8 @@ static Integer convertToInt(Object o) {
         return (int) value;
       } else {
         throw new IllegalArgumentException(String.format(
-            "Configuration value %s overflows/underflows the integer type.",
-            value));
+            "Configuration value %s overflows/underflows the integer type.",
+            value));
       }
     }
     return Integer.parseInt(o.toString());
@@ -126,8 +126,8 @@ static Boolean convertToBoolean(Object o) {
         return false;
       default:
         throw new IllegalArgumentException(String.format(
-            "Unrecognized option for boolean: %s. Expected either true or false(case insensitive)",
-            o));
+            "Unrecognized option for boolean: %s. Expected either true or false(case insensitive)",
+            o));
     }
   }

@@ -137,13 +137,13 @@ static Float convertToFloat(Object o) {
     } else if (o.getClass() == Double.class) {
       double value = ((Double) o);
       if (value == 0.0
-          || (value >= Float.MIN_VALUE && value <= Float.MAX_VALUE)
-          || (value >= -Float.MAX_VALUE && value <= -Float.MIN_VALUE)) {
+          || (value >= Float.MIN_VALUE && value <= Float.MAX_VALUE)
+          || (value >= -Float.MAX_VALUE && value <= -Float.MIN_VALUE)) {
         return (float) value;
       } else {
         throw new IllegalArgumentException(String.format(
-            "Configuration value %s overflows/underflows the float type.",
-            value));
+            "Configuration value %s overflows/underflows the float type.",
+            value));
       }
     }

@@ -189,5 +189,5 @@ && isFinal(modifiers) && field.getType().isAssignableFrom(ConfigOption.class)) {
   public static final Function PERCENTAGE_DOUBLE_VALIDATOR =
       (Function) value -> Double.compare(value, 100.0) <= 0 && Double.compare(value, 0.0) >= 0;
-
+
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java b/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
index 57347fb16f..97cc769732 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
@@ -29,30 +29,30 @@ public class UnitConverter {

   private static final Map byteSuffixes =
-      ImmutableMap.builder()
-          .put("b", ByteUnit.BYTE)
-          .put("k", ByteUnit.KiB)
-          .put("kb", ByteUnit.KiB)
-          .put("m", ByteUnit.MiB)
-          .put("mb", ByteUnit.MiB)
-          .put("g", ByteUnit.GiB)
-          .put("gb", ByteUnit.GiB)
-          .put("t", ByteUnit.TiB)
-          .put("tb", ByteUnit.TiB)
-          .put("p", ByteUnit.PiB)
-          .put("pb", ByteUnit.PiB)
-          .build();
+      ImmutableMap.builder()
+          .put("b", ByteUnit.BYTE)
+          .put("k", ByteUnit.KiB)
+          .put("kb", ByteUnit.KiB)
+          .put("m", ByteUnit.MiB)
+          .put("mb", ByteUnit.MiB)
+          .put("g", ByteUnit.GiB)
+          .put("gb", ByteUnit.GiB)
+          .put("t", ByteUnit.TiB)
+          .put("tb", ByteUnit.TiB)
+          .put("p", ByteUnit.PiB)
+          .put("pb", ByteUnit.PiB)
+          .build();

   private static final Map timeSuffixes =
-      ImmutableMap.builder()
-          .put("us", TimeUnit.MICROSECONDS)
-          .put("ms", TimeUnit.MILLISECONDS)
-          .put("s", TimeUnit.SECONDS)
-          .put("m", TimeUnit.MINUTES)
-          .put("min", TimeUnit.MINUTES)
-          .put("h", TimeUnit.HOURS)
-          .put("d", TimeUnit.DAYS)
-          .build();
+      ImmutableMap.builder()
+          .put("us", TimeUnit.MICROSECONDS)
+          .put("ms", TimeUnit.MILLISECONDS)
+          .put("s", TimeUnit.SECONDS)
+          .put("m", TimeUnit.MINUTES)
+          .put("min", TimeUnit.MINUTES)
+          .put("h", TimeUnit.HOURS)
+          .put("d", TimeUnit.DAYS)
+          .build();

   public static boolean isByteString(String str) {
     String strLower = str.toLowerCase();
@@ -94,14 +94,14 @@ public static long byteStringAs(String str, ByteUnit unit) {
         return unit.convertFrom(val, suffix != null ? byteSuffixes.get(suffix) : unit);
       } else if (fractionMatcher.matches()) {
         throw new NumberFormatException("Fractional values are not supported. Input was: "
-            + fractionMatcher.group(1));
+            + fractionMatcher.group(1));
       } else {
         throw new NumberFormatException("Failed to parse byte string: " + str);
       }
     } catch (NumberFormatException e) {
       String byteError = "Size must be specified as bytes (b), "
-          + "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or pebibytes(p). "
-          + "E.g. 50b, 100k, or 250m.";
+          + "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or pebibytes(p). "
+          + "E.g. 50b, 100k, or 250m.";

       throw new NumberFormatException(byteError + "\n" + e.getMessage());
     }
   }
@@ -171,8 +171,8 @@ public static long timeStringAs(String str, TimeUnit unit) {
       return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
     } catch (NumberFormatException e) {
       String timeError = "Time must be specified as seconds (s), "
-          + "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). "
-          + "E.g. 50s, 100ms, or 250us.";
+          + "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). "
+          + "E.g. 50s, 100ms, or 250us.";

       throw new NumberFormatException(timeError + "\n" + e.getMessage());
     }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
index 3dffb8b0e9..27d8e93f76 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
@@ -74,58 +74,58 @@ public PartitionRangeAssignment assign(
     SortedMap> assignments = new TreeMap<>();
     synchronized (this) {
-      List nodes = clusterManager.getServerList(requiredTags);
-      Map newPartitionInfos = Maps.newConcurrentMap();
-      for (ServerNode node : nodes) {
-        PartitionAssignmentInfo partitionInfo;
-        if (serverToPartitions.containsKey(node)) {
-          partitionInfo = serverToPartitions.get(node);
-          if (partitionInfo.getTimestamp() < node.getTimestamp()) {
-            partitionInfo.resetPartitionNum();
-            partitionInfo.setTimestamp(node.getTimestamp());
-          }
-        } else {
-          partitionInfo = new PartitionAssignmentInfo();
+      List nodes = clusterManager.getServerList(requiredTags);
+      Map newPartitionInfos = Maps.newConcurrentMap();
+      for (ServerNode node : nodes) {
+        PartitionAssignmentInfo partitionInfo;
+        if (serverToPartitions.containsKey(node)) {
+          partitionInfo = serverToPartitions.get(node);
+          if (partitionInfo.getTimestamp() < node.getTimestamp()) {
+            partitionInfo.resetPartitionNum();
+            partitionInfo.setTimestamp(node.getTimestamp());
           }
-        newPartitionInfos.putIfAbsent(node, partitionInfo);
+        } else {
+          partitionInfo = new PartitionAssignmentInfo();
         }
-      serverToPartitions = newPartitionInfos;
-      int averagePartitions = totalPartitionNum * replica / clusterManager.getShuffleNodesMax();
-      int assignPartitions = averagePartitions < 1 ? 1 : averagePartitions;
-      nodes.sort(new Comparator() {
-        @Override
-        public int compare(ServerNode o1, ServerNode o2) {
-          PartitionAssignmentInfo partitionInfo1 = serverToPartitions.get(o1);
-          PartitionAssignmentInfo partitionInfo2 = serverToPartitions.get(o2);
-          double v1 = o1.getAvailableMemory() * 1.0 / (partitionInfo1.getPartitionNum() + assignPartitions);
-          double v2 = o2.getAvailableMemory() * 1.0 / (partitionInfo2.getPartitionNum() + assignPartitions);
-          return -Double.compare(v1, v2);
-        }
-      });
-
-      if (nodes.isEmpty() || nodes.size() < replica) {
-        throw new RuntimeException("There isn't enough shuffle servers");
-      }
-
-      int expectNum = clusterManager.getShuffleNodesMax();
-      if (nodes.size() < clusterManager.getShuffleNodesMax()) {
-        LOG.warn("Can't get expected servers [" + expectNum + "] and found only [" + nodes.size() + "]");
-        expectNum = nodes.size();
+        newPartitionInfos.putIfAbsent(node, partitionInfo);
+      }
+      serverToPartitions = newPartitionInfos;
+      int averagePartitions = totalPartitionNum * replica / clusterManager.getShuffleNodesMax();
+      int assignPartitions = averagePartitions < 1 ? 1 : averagePartitions;
+      nodes.sort(new Comparator() {
+        @Override
+        public int compare(ServerNode o1, ServerNode o2) {
+          PartitionAssignmentInfo partitionInfo1 = serverToPartitions.get(o1);
+          PartitionAssignmentInfo partitionInfo2 = serverToPartitions.get(o2);
+          double v1 = o1.getAvailableMemory() * 1.0 / (partitionInfo1.getPartitionNum() + assignPartitions);
+          double v2 = o2.getAvailableMemory() * 1.0 / (partitionInfo2.getPartitionNum() + assignPartitions);
+          return -Double.compare(v1, v2);
         }
-
-      List candidatesNodes = nodes.subList(0, expectNum);
-      int idx = 0;
-      List ranges = CoordinatorUtils.generateRanges(totalPartitionNum, 1);
-      for (PartitionRange range : ranges) {
-        List assignNodes = Lists.newArrayList();
-        for (int rc = 0; rc < replica; rc++) {
-          ServerNode node = candidatesNodes.get(idx);
-          idx = CoordinatorUtils.nextIdx(idx, candidatesNodes.size());
-          serverToPartitions.get(node).incrementPartitionNum();
-          assignNodes.add(node);
-        }
-        assignments.put(range, assignNodes);
+      });
+
+      if (nodes.isEmpty() || nodes.size() < replica) {
+        throw new RuntimeException("There isn't enough shuffle servers");
+      }
+
+      int expectNum = clusterManager.getShuffleNodesMax();
+      if (nodes.size() < clusterManager.getShuffleNodesMax()) {
+        LOG.warn("Can't get expected servers [" + expectNum + "] and found only [" + nodes.size() + "]");
+        expectNum = nodes.size();
+      }
+
+      List candidatesNodes = nodes.subList(0, expectNum);
+      int idx = 0;
+      List ranges = CoordinatorUtils.generateRanges(totalPartitionNum, 1);
+      for (PartitionRange range : ranges) {
+        List assignNodes = Lists.newArrayList();
+        for (int rc = 0; rc < replica; rc++) {
+          ServerNode node = candidatesNodes.get(idx);
+          idx = CoordinatorUtils.nextIdx(idx, candidatesNodes.size());
+          serverToPartitions.get(node).incrementPartitionNum();
+          assignNodes.add(node);
         }
+        assignments.put(range, assignNodes);
+      }
     }
     return new PartitionRangeAssignment(assignments);
   }
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 79f7bd5b18..7c5beafe5e 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -416,8 +416,8 @@ public RssGetShuffleResultResponse getShuffleResult(RssGetShuffleResultRequest r
         .setPartitionId(request.getPartitionId())
         .build();
     GetShuffleResultResponse rpcResponse = blockingStub
-        .withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS)
-        .getShuffleResult(rpcRequest);
+        .withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS)
+        .getShuffleResult(rpcRequest);
     StatusCode statusCode = rpcResponse.getStatus();

     RssGetShuffleResultResponse response;
diff --git a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index e6a1811acf..1a54ee2f2d 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -165,18 +165,18 @@ boolean checkStorageReadAndWrite() {
       byte[] readData = new byte[1024];
       int readBytes = -1;
       try (FileInputStream fis = new FileInputStream(writeFile)) {
-        int hasReadBytes = 0;
-        do {
-          readBytes = fis.read(readData);
-          if (hasReadBytes < 1024) {
-            for (int i = 0; i < readBytes; i++) {
-              if (data[hasReadBytes + i] != readData[i]) {
-                return false;
-              }
+        int hasReadBytes = 0;
+        do {
+          readBytes = fis.read(readData);
+          if (hasReadBytes < 1024) {
+            for (int i = 0; i < readBytes; i++) {
+              if (data[hasReadBytes + i] != readData[i]) {
+                return false;
               }
             }
-          hasReadBytes += readBytes;
-        } while (readBytes != -1);
+          }
+          hasReadBytes += readBytes;
+        } while (readBytes != -1);
       }
     } catch (Exception e) {
       LOG.error("Storage read and write error ", e);
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 99efc50179..bc3ec39957 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -290,8 +290,8 @@ public void releaseReadMemory(long size) {

   // flush the buffer with required map which is shuffleId>
   public synchronized void flush(Map> requiredFlush) {
-    for (Map.Entry>> appIdToBuffers : bufferPool.entrySet()) {
+    for (Map.Entry>>
+        appIdToBuffers : bufferPool.entrySet()) {
       String appId = appIdToBuffers.getKey();
       if (requiredFlush.containsKey(appId)) {
         for (Map.Entry> shuffleIdToBuffers :
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index a49b12ea09..117c3479b8 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -133,16 +133,16 @@ public boolean unlockShuffleExcluded(String shuffleKey) {

   @Override
   public void updateWriteMetrics(StorageWriteMetrics metrics) {
-    updateWrite(RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId()),
-        metrics.getDataSize(),
-        metrics.getPartitions());
+    updateWrite(RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId()),
+        metrics.getDataSize(),
+        metrics.getPartitions());
   }

   @Override
   public void updateReadMetrics(StorageReadMetrics metrics) {
-    String shuffleKey = RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId());
-    prepareStartRead(shuffleKey);
-    updateShuffleLastReadTs(shuffleKey);
+    String shuffleKey = RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId());
+    prepareStartRead(shuffleKey);
+    updateShuffleLastReadTs(shuffleKey);
   }

   @Override
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index 2505cd1893..a27136b5ca 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -55,11 +55,11 @@ public List getSortedShuffleKeys(boolean checkRead, int hint) {
         .filter(e -> (!checkRead || e.getValue().isStartRead.get()) && e.getValue().getNotUploadedSize() > 0)
         .collect(Collectors.toList());

-    shuffleMetaList.sort((Entry o1, Entry o2) -> {
-      long sz1 = o1.getValue().getSize().longValue();
-      long sz2 = o2.getValue().getSize().longValue();
-      return -Long.compare(sz1, sz2);
-    });
+    shuffleMetaList.sort((Entry o1, Entry o2) -> {
+      long sz1 = o1.getValue().getSize().longValue();
+      long sz2 = o2.getValue().getSize().longValue();
+      return -Long.compare(sz1, sz2);
+    });

     return shuffleMetaList
         .subList(0, Math.min(shuffleMetaList.size(), hint))
@@ -82,7 +82,7 @@ public RoaringBitmap getNotUploadedPartitions(String shuffleKey) {
       uploadedPartitionBitmap = shuffleMeta.uploadedPartitionBitmap.clone();
     }
     for (int partition : uploadedPartitionBitmap) {
-      partitionBitmap.remove(partition);
+      partitionBitmap.remove(partition);
     }
     return partitionBitmap;
   }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index 9819c01a60..f990c383f2 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -231,30 +231,30 @@ public void logConsumedBlockInfo() {
   @VisibleForTesting
   public String getReadBlokNumInfo() {
     long totalBlockNum = hotReadBlockNum + warmReadBlockNum
-        + coldReadBlockNum + frozenReadBlockNum;
+        + coldReadBlockNum + frozenReadBlockNum;
     return "Client read " + totalBlockNum + " blocks ["
-        + " hot:" + hotReadBlockNum + " warm:" + warmReadBlockNum
-        + " cold:" + coldReadBlockNum + " frozen:" + frozenReadBlockNum + " ]";
+        + " hot:" + hotReadBlockNum + " warm:" + warmReadBlockNum
+        + " cold:" + coldReadBlockNum + " frozen:" + frozenReadBlockNum + " ]";
   }

   @VisibleForTesting
   public String getReadLengthInfo() {
     long totalReadLength = hotReadLength + warmReadLength
-        + coldReadLength + frozenReadLength;
+        + coldReadLength + frozenReadLength;
     return "Client read " + totalReadLength + " bytes ["
-        + " hot:" + hotReadLength + " warm:" + warmReadLength
-        + " cold:" + coldReadLength + " frozen:" + frozenReadLength + " ]";
+        + " hot:" + hotReadLength + " warm:" + warmReadLength
+        + " cold:" + coldReadLength + " frozen:" + frozenReadLength + " ]";
   }

   @VisibleForTesting
   public String getReadUncompressLengthInfo() {
     long totalReadUncompressLength = hotReadUncompressLength + warmReadUncompressLength
-        + coldReadUncompressLength + frozenReadUncompressLength;
+        + coldReadUncompressLength + frozenReadUncompressLength;
     return "Client read " + totalReadUncompressLength + " uncompressed bytes ["
-        + " hot:" + hotReadUncompressLength
-        + " warm:" + warmReadUncompressLength
-        + " cold:" + coldReadUncompressLength
-        + " frozen:" + frozenReadUncompressLength + " ]";
+        + " hot:" + hotReadUncompressLength
+        + " warm:" + warmReadUncompressLength
+        + " cold:" + coldReadUncompressLength
+        + " frozen:" + frozenReadUncompressLength + " ]";
   }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
index 6bc11f175e..ce7a515f61 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
@@ -39,12 +39,12 @@ public abstract class DataSkippableReadHandler extends AbstractClientReadHandler
   protected Roaring64NavigableMap processBlockIds;

   public DataSkippableReadHandler(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds) {
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int readBufferSize,
+      Roaring64NavigableMap expectBlockIds,
+      Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index 7151d94b51..f0e74911f4 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -122,8 +122,8 @@ public ShuffleDataResult readShuffleData() {
     // init lazily like LocalFileClientRead
     if (readHandlers.isEmpty()) {
       String fullShufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
-          ShuffleStorageUtils.getShuffleDataPathWithRange(appId,
-              shuffleId, partitionId, partitionNumPerRange, partitionNum));
+          ShuffleStorageUtils.getShuffleDataPathWithRange(appId,
+              shuffleId, partitionId, partitionNumPerRange, partitionNum));
       init(fullShufflePath);
     }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
index 75dae67313..955bd5f8d4 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
@@ -109,12 +109,12 @@ public void writeHeader(List partitionList,
     fsDataOutputStream.writeInt(partitionList.size());
     headerContentBuf.putInt(partitionList.size());
     for (int i = 0; i < partitionList.size(); i++) {
-      fsDataOutputStream.writeInt(partitionList.get(i));
-      fsDataOutputStream.writeLong(indexFileSizeList.get(i));
-      fsDataOutputStream.writeLong(dataFileSizeList.get(i));
-      headerContentBuf.putInt(partitionList.get(i));
-      headerContentBuf.putLong(indexFileSizeList.get(i));
-      headerContentBuf.putLong(dataFileSizeList.get(i));
+      fsDataOutputStream.writeInt(partitionList.get(i));
+      fsDataOutputStream.writeLong(indexFileSizeList.get(i));
+      fsDataOutputStream.writeLong(dataFileSizeList.get(i));
+      headerContentBuf.putInt(partitionList.get(i));
+      headerContentBuf.putLong(indexFileSizeList.get(i));
+      headerContentBuf.putLong(dataFileSizeList.get(i));
     }
     headerContentBuf.flip();
     fsDataOutputStream.writeLong(ChecksumUtils.getCrc32(headerContentBuf));
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index f8a58f6946..de5424160f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -82,14 +82,14 @@ protected ShuffleDataResult readShuffleData(ShuffleDataSegmen
     byte[] data = readShuffleData(shuffleDataSegment.getOffset(), expectedLength);
     if (data.length == 0) {
       LOG.warn("Fail to read expected[{}] data, actual[{}] and segment is {} from file {}.data",
-          expectedLength, data.length, shuffleDataSegment, filePrefix);
+          expectedLength, data.length, shuffleDataSegment, filePrefix);
       return null;
     }

     ShuffleDataResult shuffleDataResult = new ShuffleDataResult(data, shuffleDataSegment.getBufferSegments());
     if (shuffleDataResult.isEmpty()) {
       LOG.warn("Shuffle data is empty, expected length {}, data length {}, segment {} in file {}.data",
-          expectedLength, data.length, shuffleDataSegment, filePrefix);
+          expectedLength, data.length, shuffleDataSegment, filePrefix);
       return null;
     }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
index 1c9c048aa3..8cf1fe2ae3 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
@@ -40,22 +40,22 @@ public class LocalFileQuorumClientReadHandler extends AbstractClientReadHandler
   private long readUncompressLength = 0L;

   public LocalFileQuorumClientReadHandler(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      int indexReadLimit,
-      int partitionNumPerRange,
-      int partitionNum,
-      int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds,
-      List shuffleServerClients) {
-    this.appId = appId;
-    this.shuffleId = shuffleId;
-    this.partitionId = partitionId;
-    this.readBufferSize = readBufferSize;
-    for (ShuffleServerClient client: shuffleServerClients) {
-      handlers.add(new LocalFileClientReadHandler(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int indexReadLimit,
+      int partitionNumPerRange,
+      int partitionNum,
+      int readBufferSize,
+      Roaring64NavigableMap expectBlockIds,
+      Roaring64NavigableMap processBlockIds,
+      List shuffleServerClients) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.partitionId = partitionId;
+    this.readBufferSize = readBufferSize;
+    for (ShuffleServerClient client: shuffleServerClients) {
+      handlers.add(new LocalFileClientReadHandler(
           appId,
           shuffleId,
           partitionId,
@@ -66,8 +66,8 @@ public LocalFileQuorumClientReadHandler(
           expectBlockIds,
           processBlockIds,
           client
-      ));
-    }
+      ));
+    }
   }

   @Override
@@ -85,7 +85,7 @@ public ShuffleDataResult readShuffleData() {
     }
     if (!readSuccessful) {
       throw new RssException("Failed to read all replicas for appId[" + appId + "], shuffleId["
-          + shuffleId + "], partitionId[" + partitionId + "]");
+          + shuffleId + "], partitionId[" + partitionId + "]");
     }
     return result;
   }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
index 5f0db19b19..73bcc46ddc 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
@@ -54,7 +54,7 @@ public ShuffleDataResult readShuffleData() {
     ShuffleDataResult result = null;

     RssGetInMemoryShuffleDataRequest request = new RssGetInMemoryShuffleDataRequest(
-        appId,shuffleId, partitionId, lastBlockId, readBufferSize);
+        appId,shuffleId, partitionId, lastBlockId, readBufferSize);

     try {
       RssGetInMemoryShuffleDataResponse response =
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
index a3e6a5bee4..5fbe8ec684 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
@@ -45,8 +45,8 @@ public MemoryQuorumClientReadHandler(
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
-        handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+        handlers.add(new MemoryClientReadHandler(
+            appId, shuffleId, partitionId, readBufferSize, client))
     );
   }
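The Java hunks above all re-indent wrapped (continuation) lines to a single consistent offset, alongside the checkstyle.xml edit whose removed lines are not preserved in this patch excerpt. As a hedged illustration of the kind of Checkstyle rule such a cleanup usually targets: the Indentation module and its property names below are real Checkstyle configuration, but the values are assumptions chosen for illustration and are not taken from this change.

<module name="Indentation">
  <!-- assumed values, shown for illustration only -->
  <property name="basicOffset" value="2"/>
  <property name="lineWrappingIndentation" value="4"/>
  <property name="forceStrictCondition" value="false"/>
</module>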