checkstyle.xml: 18 changes (2 additions & 16 deletions)
@@ -251,26 +251,12 @@
value="GenericWhitespace ''{0}'' is not preceded with whitespace."/>
</module>
<module name="Indentation">
<!--
<property name="basicOffset" value="4"/>
<property name="basicOffset" value="2"/>
<property name="braceAdjustment" value="0"/>
<property name="caseIndent" value="4"/>
<property name="caseIndent" value="2"/>
<property name="throwsIndent" value="4"/>
<property name="lineWrappingIndentation" value="4"/>
<property name="arrayInitIndent" value="4"/>
-->
<!-- new add: checkstyle has not yet fixed some indentation edge cases, so disable this rule for now;
see https://github.com/checkstyle/checkstyle/issues/3342
-->

<property name="severity" value="ignore"/>

<property name="basicOffset" value="4"/>
<property name="braceAdjustment" value="0"/>
<property name="caseIndent" value="4"/>
<property name="throwsIndent" value="8"/>
<property name="lineWrappingIndentation" value="8"/>
<property name="arrayInitIndent" value="4"/>
</module>
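
For reference, a short Java fragment (hypothetical code, not from this PR) that the re-enabled Indentation settings above would accept: members indent by 4 (basicOffset), wrapped expressions and throws clauses by 8 (lineWrappingIndentation, throwsIndent), and array initializers by 4 (arrayInitIndent):

public class IndentationDemo {
    private static final int[] RETRY_BACKOFF_MS = {
        100, 200, 400,                          // arrayInitIndent = 4
    };

    public void send(String host, int port)
            throws java.io.IOException {        // throwsIndent = 8
        boolean reachable = host != null
                && port > 0;                    // lineWrappingIndentation = 8
        if (!reachable) {                       // basicOffset = 4, braceAdjustment = 0
            throw new java.io.IOException("bad server address: " + host);
        }
    }
}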
<module name="AbbreviationAsWordInName">
<!-- new add -->
@@ -176,7 +176,7 @@ public void addRecord(int partitionId, K key, V value) throws IOException,Interr
}
}
if (memoryUsedSize.get() > maxMemSize * memoryThreshold
&& inSendListBytes.get() <= maxMemSize * sendThreshold) {
&& inSendListBytes.get() <= maxMemSize * sendThreshold) {
sendBuffersToServers();
}
mapOutputRecordCounter.increment(1);
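Context for the condition above: addRecord flushes buffered records once used memory exceeds memoryThreshold, but only while the bytes already queued for sending stay below sendThreshold, so flushes do not pile up. A self-contained sketch of that back-pressure pattern (field names mirror the diff; the threshold values are assumptions, not the project's defaults):

import java.util.concurrent.atomic.AtomicLong;

class BufferFlushGuard {
    private final AtomicLong memoryUsedSize = new AtomicLong();
    private final AtomicLong inSendListBytes = new AtomicLong();
    private final long maxMemSize = 64L * 1024 * 1024; // assumed 64 MB budget
    private final double memoryThreshold = 0.8;        // assumed value
    private final double sendThreshold = 0.2;          // assumed value

    // Mirrors the guard in addRecord: buffer nearly full AND send queue drained enough.
    boolean shouldFlush() {
        return memoryUsedSize.get() > maxMemSize * memoryThreshold
                && inSendListBytes.get() <= maxMemSize * sendThreshold;
    }
}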
@@ -95,13 +95,13 @@ public static ShuffleWriteClient createShuffleClient(JobConf jobConf) {
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum, replica, replicaWrite, replicaRead, replicaSkipEnabled,
dataTransferPoolSize);
dataTransferPoolSize);
return client;
}

public static Set<ShuffleServerInfo> getAssignedServers(JobConf jobConf, int reduceID) {
String servers = jobConf.get(RssMRConfig.RSS_ASSIGNMENT_PREFIX
+ String.valueOf(reduceID));
+ String.valueOf(reduceID));
String[] splitServers = servers.split(",");
Set<ShuffleServerInfo> assignServers = Sets.newHashSet();
for (String splitServer : splitServers) {
@@ -110,15 +110,15 @@ public static Set<ShuffleServerInfo> getAssignedServers(JobConf jobConf, int red
throw new RssException("partition " + reduceID + " server info isn't right");
}
ShuffleServerInfo sever = new ShuffleServerInfo(StringUtils.join(serverInfo, "-"),
serverInfo[0], Integer.parseInt(serverInfo[1]));
serverInfo[0], Integer.parseInt(serverInfo[1]));
assignServers.add(sever);
}
return assignServers;
}

public static ApplicationAttemptId getApplicationAttemptId() {
String containerIdStr =
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
ContainerId containerId = ContainerId.fromString(containerIdStr);
return containerId.getApplicationAttemptId();
}
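getAssignedServers above decodes one configuration value per reduce: servers are comma-separated, and each entry is a host and port joined by '-'. A standalone sketch of that decoding with stand-in types (ServerInfo here is hypothetical, not the project's ShuffleServerInfo):

import java.util.HashSet;
import java.util.Set;

class AssignedServerParser {
    record ServerInfo(String id, String host, int port) { }

    // Parses values like "host1-19999,host2-19999".
    static Set<ServerInfo> parse(String servers) {
        Set<ServerInfo> result = new HashSet<>();
        for (String entry : servers.split(",")) {
            String[] parts = entry.split("-");
            if (parts.length < 2) {
                throw new IllegalArgumentException("server info isn't right: " + entry);
            }
            result.add(new ServerInfo(entry, parts[0], Integer.parseInt(parts[1])));
        }
        return result;
    }
}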
@@ -67,8 +67,8 @@ public Roaring64NavigableMap fetchAllRssTaskIds() {
acceptMapCompletionEvents();
} catch (Exception e) {
throw new RssException("Reduce: " + reduce
+ " fails to accept completion events due to: "
+ e.getMessage());
+ " fails to accept completion events due to: "
+ e.getMessage());
}

Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -117,13 +117,13 @@ public void resolve(TaskCompletionEvent event) {
case OBSOLETE:
obsoleteMaps.add(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of "
+ event.getTaskStatus() + " map-task: '" + event.getTaskAttemptId() + "'");
+ event.getTaskStatus() + " map-task: '" + event.getTaskAttemptId() + "'");
break;

case TIPFAILED:
tipFailedCount++;
LOG.info("Ignoring output of failed map TIP: '"
+ event.getTaskAttemptId() + "'");
+ event.getTaskAttemptId() + "'");
break;

default:
@@ -138,14 +138,14 @@ public void acceptMapCompletionEvents() throws IOException {

do {
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(
(org.apache.hadoop.mapred.JobID) reduce.getJobID(),
fromEventIdx,
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID) reduce);
umbilical.getMapCompletionEvents(
(org.apache.hadoop.mapred.JobID) reduce.getJobID(),
fromEventIdx,
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID) reduce);
events = update.getMapTaskCompletionEvents();
LOG.debug("Got " + events.length + " map completion events from "
+ fromEventIdx);
+ fromEventIdx);

assert !update.shouldReset() : "Unexpected legacy state";

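fetchAllRssTaskIds above drains map-completion events and keeps only succeeded attempts in a Roaring64NavigableMap, which the read path later uses to filter blocks. A minimal sketch of that bitmap accumulation (the RoaringBitmap calls are real; the surrounding class is illustrative):

import org.roaringbitmap.longlong.Roaring64NavigableMap;

class TaskIdBitmapBuilder {
    // Collect succeeded task attempt ids into a 64-bit Roaring bitmap.
    static Roaring64NavigableMap build(long[] succeededTaskIds) {
        Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
        for (long taskId : succeededTaskIds) {
            taskIdBitmap.addLong(taskId);
        }
        return taskIdBitmap;
    }
}

Membership checks (taskIdBitmap.contains(id)) and getLongCardinality() are what make the bitmap cheaper than a HashSet<Long> at this scale.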
@@ -100,17 +100,17 @@ private enum ShuffleErrors {
this.metrics = metrics;
this.reduceId = reduceId;
ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
RssFetcher.ShuffleErrors.IO_ERROR.toString());
RssFetcher.ShuffleErrors.IO_ERROR.toString());
wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
RssFetcher.ShuffleErrors.WRONG_LENGTH.toString());
RssFetcher.ShuffleErrors.WRONG_LENGTH.toString());
badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
RssFetcher.ShuffleErrors.BAD_ID.toString());
RssFetcher.ShuffleErrors.BAD_ID.toString());
wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
RssFetcher.ShuffleErrors.WRONG_MAP.toString());
RssFetcher.ShuffleErrors.WRONG_MAP.toString());
connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
RssFetcher.ShuffleErrors.CONNECTION.toString());
RssFetcher.ShuffleErrors.CONNECTION.toString());
wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
RssFetcher.ShuffleErrors.WRONG_REDUCE.toString());
RssFetcher.ShuffleErrors.WRONG_REDUCE.toString());

this.shuffleReadClient = shuffleReadClient;
this.totalBlockCount = totalBlockCount;
@@ -151,7 +151,7 @@ public void copyFromRssServer() throws IOException {
if (!hasPendingData && compressedData != null) {
final long startDecompress = System.currentTimeMillis();
uncompressedData = RssShuffleUtils.decompressData(
compressedData, compressedBlock.getUncompressLength(), false).array();
compressedData, compressedBlock.getUncompressLength(), false).array();
unCompressionLength += compressedBlock.getUncompressLength();
long decompressDuration = System.currentTimeMillis() - startDecompress;
decompressTime += decompressDuration;
@@ -187,9 +187,9 @@ public void copyFromRssServer() throws IOException {
shuffleReadClient.logStatics();
metrics.inputBytes(unCompressionLength);
LOG.info("reduce task " + reduceId.toString() + " cost " + readTime + " ms to fetch and "
+ decompressTime + " ms to decompress with unCompressionLength["
+ unCompressionLength + "] and " + serializeTime + " ms to serialize and "
+ waitTime + " ms to wait resource");
+ decompressTime + " ms to decompress with unCompressionLength["
+ unCompressionLength + "] and " + serializeTime + " ms to serialize and "
+ waitTime + " ms to wait resource");
stopFetch();
}
}
@@ -226,13 +226,13 @@ private boolean issueMapOutputMerge() throws IOException {
mapOutput.commit();
if (mapOutput instanceof OnDiskMapOutput) {
LOG.info("Reduce: " + reduceId + " allocates disk to accept block "
+ " with byte sizes: " + uncompressedData.length);
+ " with byte sizes: " + uncompressedData.length);
}
} catch (Throwable t) {
ioErrs.increment(1);
mapOutput.abort();
throw new RssException("Reduce: " + reduceId + " cannot write block to "
+ mapOutput.getClass().getSimpleName() + " due to: " + t.getClass().getName());
+ mapOutput.getClass().getSimpleName() + " due to: " + t.getClass().getName());
}
return true;
}
@@ -258,7 +258,7 @@ private void updateStatus() {
double transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;

progress.setStatus("copy(" + copyBlockCount + " of " + totalBlockCount + " at "
+ mbpsFormat.format(transferRate) + " MB/s)");
+ mbpsFormat.format(transferRate) + " MB/s)");
}

@VisibleForTesting
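updateStatus above converts bytes-per-millisecond into MB/s for the progress line. A minimal, self-contained version of that arithmetic (the constant's value is an assumption consistent with the unit conversion; the project defines its own):

import java.text.DecimalFormat;

class TransferRate {
    // bytes/ms -> MB/s: multiply by 1000 ms/s, divide by 1024*1024 bytes/MB.
    private static final double BYTES_PER_MILLIS_TO_MBS = 1000d / (1024 * 1024);
    private static final DecimalFormat MBPS_FORMAT = new DecimalFormat("0.00");

    static String status(long copiedBytes, long elapsedMillis,
            long copyBlockCount, long totalBlockCount) {
        double bytesPerMillis = (double) copiedBytes / Math.max(1, elapsedMillis);
        double transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
        return "copy(" + copyBlockCount + " of " + totalBlockCount + " at "
                + MBPS_FORMAT.format(transferRate) + " MB/s)";
    }
}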
@@ -105,34 +105,34 @@ public void init(ShuffleConsumerPlugin.Context context) {
this.appAttemptId = RssMRUtils.getApplicationAttemptId().getAttemptId();
this.storageType = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_STORAGE_TYPE);
this.replicaWrite = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_DATA_REPLICA_WRITE,
RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
this.replicaRead = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_DATA_REPLICA_READ,
RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
this.replica = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_DATA_REPLICA,
RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);

this.partitionNum = mrJobConf.getNumReduceTasks();
this.partitionNumPerRange = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_PARTITION_NUM_PER_RANGE,
RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
this.basePath = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_REMOTE_STORAGE_PATH);
this.indexReadLimit = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_INDEX_READ_LIMIT,
RssMRConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE);
RssMRConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE);
this.readBufferSize = (int)UnitConverter.byteStringAsBytes(
RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE,
RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE));
RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE,
RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE));
String remoteStorageConf = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_REMOTE_STORAGE_CONF, "");
this.remoteStorageInfo = new RemoteStorageInfo(basePath, remoteStorageConf);
}
}

protected MergeManager<K, V> createMergeManager(
ShuffleConsumerPlugin.Context context) {
ShuffleConsumerPlugin.Context context) {
return new MergeManagerImpl<K, V>(reduceId, mrJobConf, context.getLocalFS(),
context.getLocalDirAllocator(), reporter, context.getCodec(),
context.getCombinerClass(), context.getCombineCollector(),
context.getSpilledRecordsCounter(),
context.getReduceCombineInputCounter(),
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
context.getMapOutputFile());
context.getLocalDirAllocator(), reporter, context.getCodec(),
context.getCombinerClass(), context.getCombineCollector(),
context.getSpilledRecordsCounter(),
context.getReduceCombineInputCounter(),
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
context.getMapOutputFile());
}

@Override
@@ -149,37 +149,37 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
// just get blockIds from RSS servers
ShuffleWriteClient writeClient = RssMRUtils.createShuffleClient(mrJobConf);
Roaring64NavigableMap blockIdBitmap = writeClient.getShuffleResult(
clientType, serverInfoSet, appId, 0, reduceId.getTaskID().getId());
clientType, serverInfoSet, appId, 0, reduceId.getTaskID().getId());
writeClient.close();

// get map-completion events to generate RSS taskIDs
final RssEventFetcher<K,V> eventFetcher =
new RssEventFetcher<K,V>(appAttemptId, reduceId, umbilical, mrJobConf, MAX_EVENTS_TO_FETCH);
new RssEventFetcher<K,V>(appAttemptId, reduceId, umbilical, mrJobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap taskIdBitmap = eventFetcher.fetchAllRssTaskIds();

LOG.info("In reduce: " + reduceId
+ ", RSS MR client has fetched blockIds and taskIds successfully");
+ ", RSS MR client has fetched blockIds and taskIds successfully");

// start fetcher to fetch blocks from RSS servers
if (!taskIdBitmap.isEmpty()) {
LOG.info("In reduce: " + reduceId
+ ", Rss MR client starts to fetch blocks from RSS server");
+ ", Rss MR client starts to fetch blocks from RSS server");
JobConf readerJobConf = new JobConf((mrJobConf));
if (!remoteStorageInfo.isEmpty()) {
for (Map.Entry<String, String> entry : remoteStorageInfo.getConfItems().entrySet()) {
readerJobConf.set(entry.getKey(), entry.getValue());
}
}
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
readerJobConf, new MRIdHelper());
appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
readerJobConf, new MRIdHelper());
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus, merger, copyPhase, reporter, metrics,
shuffleReadClient, blockIdBitmap.getLongCardinality());
shuffleReadClient, blockIdBitmap.getLongCardinality());
fetcher.fetchAllRssBlocks();
LOG.info("In reduce: " + reduceId
+ ", Rss MR client fetches blocks from RSS server successfully");
+ ", Rss MR client fetches blocks from RSS server successfully");
}

copyPhase.complete();
@@ -198,12 +198,12 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
synchronized (this) {
if (throwable != null) {
throw new Shuffle.ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
throwable);
}
}

LOG.info("In reduce: " + reduceId
+ ", Rss MR client returns sorted data to reduce successfully");
+ ", Rss MR client returns sorted data to reduce successfully");

return kvIter;
}
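The run() method above reduces to three decisions: ask the shuffle servers which block ids exist for this partition, ask the MR application master which map attempts succeeded, and start the fetcher only when some succeeded attempt wrote data. A control-flow sketch under those assumptions (stand-ins for the real client objects):

import org.roaringbitmap.longlong.Roaring64NavigableMap;

class ReadPathSketch {
    static void run(Roaring64NavigableMap blockIdBitmap,
            Roaring64NavigableMap taskIdBitmap, Runnable fetchAllRssBlocks) {
        if (!taskIdBitmap.isEmpty()) {
            // RssFetcher pulls blockIdBitmap.getLongCardinality() blocks in total.
            fetchAllRssBlocks.run();
        }
        // copyPhase.complete() and the merge into a sorted iterator follow either way.
    }
}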
@@ -194,7 +194,7 @@ public Thread newThread(Runnable r) {
RemoteStorageInfo defaultRemoteStorage =
new RemoteStorageInfo(conf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH, ""));
RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(
appId, defaultRemoteStorage, dynamicConfEnabled, storageType, client);
appId, defaultRemoteStorage, dynamicConfEnabled, storageType, client);
// set the remote storage with actual value
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
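For context on the hunk above: when dynamic configuration is enabled, the coordinator may assign the remote storage, and the locally configured path is only a fallback. A hedged sketch of that resolution (a hypothetical helper; the real logic lives in ClientUtils.fetchRemoteStorage):

class RemoteStorageResolver {
    // Prefer a coordinator-assigned path when dynamic conf is on; else the default.
    static String resolve(boolean dynamicConfEnabled, String coordinatorPath,
            String defaultPath) {
        if (dynamicConfEnabled && coordinatorPath != null && !coordinatorPath.isEmpty()) {
            return coordinatorPath;
        }
        return defaultPath;
    }
}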
@@ -147,7 +147,7 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
this.dataReplicaRead = sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_READ,
RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
this.dataReplicaSkipEnabled = sparkConf.getBoolean(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
LOG.info("Check quorum config ["
@@ -171,7 +171,7 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
registerCoordinator();
// fetch client conf and apply them if necessary and disable ESS
if (isDriver && dynamicConfEnabled) {
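The replica settings read above form a quorum: with N copies written in total, a write quorum W and a read quorum R only guarantee an overlap when W + R > N. A validation sketch of that invariant (the exact check in the project may differ):

class QuorumCheck {
    // Standard quorum intersection: write + read must exceed the replica count.
    static void check(int dataReplica, int dataReplicaWrite, int dataReplicaRead) {
        if (dataReplicaWrite + dataReplicaRead <= dataReplica) {
            throw new IllegalArgumentException("Unsafe quorum: write("
                    + dataReplicaWrite + ") + read(" + dataReplicaRead
                    + ") must be greater than replica(" + dataReplica + ")");
        }
    }
}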
@@ -287,23 +287,23 @@ protected void checkBlockSendResult(Set<Long> blockIds) throws RuntimeException
public Option<MapStatus> stop(boolean success) {
try {
if (success) {
// fill partitionLengths with non zero dummy value so map output tracker could work correctly
long[] partitionLengths = new long[partitioner.numPartitions()];
Arrays.fill(partitionLengths, 1);
final BlockManagerId blockManagerId =
createDummyBlockManagerId(appId + "_" + taskId, taskAttemptId);
// fill partitionLengths with non zero dummy value so map output tracker could work correctly
long[] partitionLengths = new long[partitioner.numPartitions()];
Arrays.fill(partitionLengths, 1);
final BlockManagerId blockManagerId =
createDummyBlockManagerId(appId + "_" + taskId, taskAttemptId);

Map<Integer, List<Long>> ptb = Maps.newHashMap();
for (Map.Entry<Integer, Set<Long>> entry : partitionToBlockIds.entrySet()) {
ptb.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
}
long start = System.currentTimeMillis();
shuffleWriteClient.reportShuffleResult(partitionToServers, appId, shuffleId,
taskAttemptId, ptb, bitmapSplitNum);
LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
taskAttemptId, bitmapSplitNum, (System.currentTimeMillis() - start));
MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId, partitionLengths);
return Option.apply(mapStatus);
Map<Integer, List<Long>> ptb = Maps.newHashMap();
for (Map.Entry<Integer, Set<Long>> entry : partitionToBlockIds.entrySet()) {
ptb.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
}
long start = System.currentTimeMillis();
shuffleWriteClient.reportShuffleResult(partitionToServers, appId, shuffleId,
taskAttemptId, ptb, bitmapSplitNum);
LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
taskAttemptId, bitmapSplitNum, (System.currentTimeMillis() - start));
MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId, partitionLengths);
return Option.apply(mapStatus);
} else {
return Option.empty();
}
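On the dummy lengths in stop(true) above: Spark's map output tracker treats a reported length of 0 as "no output for this partition", so reducers would never ask for data that actually lives on the RSS servers. Filling every slot with 1 keeps all partitions fetchable; a minimal sketch:

import java.util.Arrays;

class DummyMapStatus {
    // Non-zero placeholder lengths so the map output tracker keeps every partition.
    static long[] dummyPartitionLengths(int numPartitions) {
        long[] partitionLengths = new long[numPartitions];
        Arrays.fill(partitionLengths, 1);
        return partitionLengths;
    }
}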