Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", streamChunkId, getRemoteAddress(channel),
timeTaken);
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
Expand Down Expand Up @@ -193,8 +193,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request for {} to {} took {} ms", streamId, getRemoteAddress(channel),
timeTaken);
logger.trace("Sending request for {} to {} took {} ms", streamId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
Expand Down Expand Up @@ -236,7 +236,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", requestId, getRemoteAddress(channel), timeTaken);
logger.trace("Sending request {} to {} took {} ms", requestId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public void handle(RequestMessage request) {

private void processFetchRequest(final ChunkFetchRequest req) {
if (logger.isTraceEnabled()) {
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), req.streamChunkId);
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
req.streamChunkId);
}

ManagedBuffer buf;
Expand All @@ -125,8 +126,8 @@ private void processFetchRequest(final ChunkFetchRequest req) {
streamManager.registerChannel(channel, req.streamChunkId.streamId);
buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
} catch (Exception e) {
logger.error(String.format(
"Error opening block %s for request from %s", req.streamChunkId, getRemoteAddress(channel)), e);
logger.error(String.format("Error opening block %s for request from %s",
req.streamChunkId, getRemoteAddress(channel)), e);
respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public static void storeVersion(DB db, StoreVersion version, ObjectMapper mapper

public static class StoreVersion {

final static byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);

public final int major;
public final int minor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* A central location that tracks all the settings we expose to users.
*/
public class TransportConf {

static {
// Set this due to Netty PR #5661 for Netty 4.0.37+ to work
System.setProperty("io.netty.maxDirectMemory", "0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ protected void serviceInit(Configuration conf) throws Exception {
private void createSecretManager() throws IOException {
secretManager = new ShuffleSecretManager();
secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);

// Make sure this is protected in case its not in the NM recovery dir
FileSystem fs = FileSystem.getLocal(_conf);
fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
Expand Down Expand Up @@ -306,7 +306,7 @@ protected void serviceStop() {
}
if (db != null) {
db.close();
}
}
} catch (Exception e) {
logger.error("Exception when stopping service", e);
}
Expand All @@ -329,7 +329,7 @@ public void setRecoveryPath(Path recoveryPath) {

/**
* Get the path specific to this auxiliary service to use for recovery.
*/
*/
protected Path getRecoveryPath(String fileName) {
return _recoveryPath;
}
Expand All @@ -345,7 +345,7 @@ protected File initRecoveryDb(String dbFileName) {
if (recoveryFile.exists()) {
return recoveryFile;
}
}
}
// db doesn't exist in recovery path go check local dirs for it
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
for (String dir : localDirs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,26 @@ private PrefixComparators() {}
public static final PrefixComparator STRING = new UnsignedPrefixComparator();
public static final PrefixComparator STRING_DESC = new UnsignedPrefixComparatorDesc();
public static final PrefixComparator STRING_NULLS_LAST = new UnsignedPrefixComparatorNullsLast();
public static final PrefixComparator STRING_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst();
public static final PrefixComparator STRING_DESC_NULLS_FIRST =
new UnsignedPrefixComparatorDescNullsFirst();

public static final PrefixComparator BINARY = new UnsignedPrefixComparator();
public static final PrefixComparator BINARY_DESC = new UnsignedPrefixComparatorDesc();
public static final PrefixComparator BINARY_NULLS_LAST = new UnsignedPrefixComparatorNullsLast();
public static final PrefixComparator BINARY_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst();
public static final PrefixComparator BINARY_DESC_NULLS_FIRST =
new UnsignedPrefixComparatorDescNullsFirst();

public static final PrefixComparator LONG = new SignedPrefixComparator();
public static final PrefixComparator LONG_DESC = new SignedPrefixComparatorDesc();
public static final PrefixComparator LONG_NULLS_LAST = new SignedPrefixComparatorNullsLast();
public static final PrefixComparator LONG_DESC_NULLS_FIRST = new SignedPrefixComparatorDescNullsFirst();
public static final PrefixComparator LONG_DESC_NULLS_FIRST =
new SignedPrefixComparatorDescNullsFirst();

public static final PrefixComparator DOUBLE = new UnsignedPrefixComparator();
public static final PrefixComparator DOUBLE_DESC = new UnsignedPrefixComparatorDesc();
public static final PrefixComparator DOUBLE_NULLS_LAST = new UnsignedPrefixComparatorNullsLast();
public static final PrefixComparator DOUBLE_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst();
public static final PrefixComparator DOUBLE_DESC_NULLS_FIRST =
new UnsignedPrefixComparatorDescNullsFirst();

public static final class StringPrefixComparator {
public static long computePrefix(UTF8String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,4 @@ public UnsafeSorterIterator getSortedIterator() {
return new SortedIterator(pos / 2, offset);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public UnsafeSorterSpillReader(
if (bufferSizeBytes > MAX_BUFFER_SIZE_BYTES || bufferSizeBytes < DEFAULT_BUFFER_SIZE_BYTES) {
// fall back to a sane default value
logger.warn("Value of config \"spark.unsafe.sorter.spill.reader.buffer.size\" = {} not in " +
"allowed range [{}, {}). Falling back to default value : {} bytes", bufferSizeBytes,
DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES, DEFAULT_BUFFER_SIZE_BYTES);
"allowed range [{}, {}). Falling back to default value : {} bytes", bufferSizeBytes,
DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES, DEFAULT_BUFFER_SIZE_BYTES);
bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
}

Expand Down