diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/CustomType1.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/CustomType1.java index 92b643b0cb92..ebb5c2c5ed55 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/CustomType1.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/CustomType1.java @@ -17,8 +17,6 @@ package org.apache.spark.util.kvstore; -import com.google.common.base.Objects; - public class CustomType1 { @KVIndex @@ -52,12 +50,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) - .add("key", key) - .add("id", id) - .add("name", name) - .add("num", num) - .toString(); + return "CustomType1[key=" + key + ",id=" + id + ",name=" + name + ",num=" + num + "]"; } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index 45fee541a4f5..66566b67870f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -26,9 +26,10 @@ import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; -import com.google.common.base.Objects; import com.google.common.io.ByteStreams; import io.netty.channel.DefaultFileRegion; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.LimitedInputStream; @@ -144,10 +145,10 @@ public Object convertToNetty() throws IOException { @Override public String toString() { - return Objects.toStringHelper(this) - .add("file", file) - .add("offset", offset) - .add("length", length) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("file", file) + .append("offset", offset) + .append("length", length) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java index acc49d968c18..b42977c7cb7f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java @@ -21,9 +21,10 @@ import java.io.InputStream; import java.nio.ByteBuffer; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; /** * A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
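The pattern repeated throughout this patch replaces Guava's Objects.toStringHelper with Commons Lang's ToStringBuilder. ToStringStyle.SHORT_PREFIX_STYLE renders ClassName[field=value,field=value], close to Guava's ClassName{field=value, field=value} but not byte-identical, so anything that parses these strings sees a cosmetic change. A minimal sketch of the new idiom (the Point class here is hypothetical):

    import org.apache.commons.lang3.builder.ToStringBuilder;
    import org.apache.commons.lang3.builder.ToStringStyle;

    public class Point {
      private final int x = 1;
      private final int y = 2;

      @Override
      public String toString() {
        // SHORT_PREFIX_STYLE yields "Point[x=1,y=2]"
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
          .append("x", x)
          .append("y", y)
          .toString();
      }
    }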
@@ -69,8 +70,8 @@ public Object convertToNetty() throws IOException { @Override public String toString() { - return Objects.toStringHelper(this) - .add("buf", buf) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("buf", buf) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java index 631d76771525..084f89d2611c 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java @@ -21,9 +21,10 @@ import java.io.InputStream; import java.nio.ByteBuffer; -import com.google.common.base.Objects; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; /** * A {@link ManagedBuffer} backed by {@link ByteBuffer}. @@ -67,8 +68,8 @@ public Object convertToNetty() throws IOException { @Override public String toString() { - return Objects.toStringHelper(this) - .add("buf", buf) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("buf", buf) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index b018197deaf2..6dcc703e9266 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -27,13 +27,14 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.SettableFuture; import io.netty.channel.Channel; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -301,10 +302,10 @@ public void close() { @Override public String toString() { - return Objects.toStringHelper(this) - .add("remoteAdress", channel.remoteAddress()) - .add("clientId", clientId) - .add("isActive", isActive()) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("remoteAddress", channel.remoteAddress()) + .append("clientId", clientId) + .append("isActive", isActive()) .toString(); } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java index a7afbfa8621c..0f1781cbf1f2 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java @@ -17,8 +17,11 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; /** * Response to {@link ChunkFetchRequest} when there is
an error fetching the chunk. @@ -54,7 +57,7 @@ public static ChunkFetchFailure decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(streamChunkId, errorString); + return Objects.hash(streamChunkId, errorString); } @Override @@ -68,9 +71,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamChunkId", streamChunkId) - .add("errorString", errorString) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamChunkId", streamChunkId) + .append("errorString", errorString) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java index fe54fcc50dc8..7b034d5c2f59 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java @@ -17,8 +17,9 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; /** * Request to fetch a sequence of a single chunk of a stream. This will correspond to a single @@ -64,8 +65,8 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamChunkId", streamChunkId) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamChunkId", streamChunkId) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java index d5c9a9b3202f..eaad143fc3f5 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java @@ -17,8 +17,11 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NettyManagedBuffer; @@ -67,7 +70,7 @@ public static ChunkFetchSuccess decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(streamChunkId, body()); + return Objects.hash(streamChunkId, body()); } @Override @@ -81,9 +84,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamChunkId", streamChunkId) - .add("buffer", body()) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamChunkId", streamChunkId) + .append("buffer", body()) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/OneWayMessage.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/OneWayMessage.java index 1632fb9e0368..719f6c64c5de 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/OneWayMessage.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/OneWayMessage.java @@ -17,8 +17,11 @@ package 
org.apache.spark.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NettyManagedBuffer; @@ -72,8 +75,8 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("body", body()) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("body", body()) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java index 61061903de23..6e4f5687d16c 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java @@ -17,8 +17,11 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; /** Response to {@link RpcRequest} for a failed RPC. */ public final class RpcFailure extends AbstractMessage implements ResponseMessage { @@ -52,7 +55,7 @@ public static RpcFailure decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(requestId, errorString); + return Objects.hash(requestId, errorString); } @Override @@ -66,9 +69,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("requestId", requestId) - .add("errorString", errorString) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("requestId", requestId) + .append("errorString", errorString) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java index cc1bb95d2d56..f2609ce2dbdb 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java @@ -17,8 +17,11 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NettyManagedBuffer; @@ -64,7 +67,7 @@ public static RpcRequest decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(requestId, body()); + return Objects.hash(requestId, body()); } @Override @@ -78,9 +81,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("requestId", requestId) - .add("body", body()) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("requestId", requestId) + .append("body", body()) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java index 
c03291e9c0b2..51b36ea18336 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java @@ -17,8 +17,11 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NettyManagedBuffer; @@ -64,7 +67,7 @@ public static RpcResponse decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(requestId, body()); + return Objects.hash(requestId, body()); } @Override @@ -78,9 +81,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("requestId", requestId) - .add("body", body()) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("requestId", requestId) + .append("body", body()) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamChunkId.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamChunkId.java index d46a26388480..75c6d630b9c3 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamChunkId.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamChunkId.java @@ -17,8 +17,11 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; /** * Encapsulates a request for a particular chunk of a stream. @@ -51,7 +54,7 @@ public static StreamChunkId decode(ByteBuf buffer) { @Override public int hashCode() { - return Objects.hashCode(streamId, chunkIndex); + return Objects.hash(streamId, chunkIndex); } @Override @@ -65,9 +68,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamId", streamId) - .add("chunkIndex", chunkIndex) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamId", streamId) + .append("chunkIndex", chunkIndex) .toString(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamFailure.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamFailure.java index 68fcfa774861..06836f5eea39 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamFailure.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamFailure.java @@ -17,8 +17,11 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; /** * Message indicating an error when transferring a stream. 
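The hashCode/equals migrations in these message classes are drop-in: both java.util.Objects.hash(Object...) and Guava's Objects.hashCode(Object...) delegate to Arrays.hashCode over the boxed arguments, so the computed values are unchanged, and Objects.equals matches Guava's null-safe Objects.equal. A small sketch with a hypothetical key class:

    import java.util.Objects;

    public class StreamKey {
      private final long streamId;
      private final int chunkIndex;

      public StreamKey(long streamId, int chunkIndex) {
        this.streamId = streamId;
        this.chunkIndex = chunkIndex;
      }

      @Override
      public int hashCode() {
        // Boxes the primitives and delegates to Arrays.hashCode,
        // matching the old Guava-based result.
        return Objects.hash(streamId, chunkIndex);
      }

      @Override
      public boolean equals(Object other) {
        if (!(other instanceof StreamKey)) return false;
        StreamKey o = (StreamKey) other;
        return streamId == o.streamId && chunkIndex == o.chunkIndex;
      }
    }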
@@ -54,7 +57,7 @@ public static StreamFailure decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(streamId, error); + return Objects.hash(streamId, error); } @Override @@ -68,9 +71,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamId", streamId) - .add("error", error) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamId", streamId) + .append("error", error) .toString(); } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamRequest.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamRequest.java index 1b135af752bd..3d035e5c94f2 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamRequest.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamRequest.java @@ -17,8 +17,11 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; /** * Request to stream data from the remote end. @@ -67,8 +70,8 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamId", streamId) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamId", streamId) .toString(); } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamResponse.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamResponse.java index 568108c4fe5e..f30605ce836f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamResponse.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamResponse.java @@ -17,8 +17,11 @@ package org.apache.spark.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.buffer.ManagedBuffer; @@ -67,7 +70,7 @@ public static StreamResponse decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(byteCount, streamId); + return Objects.hash(byteCount, streamId); } @Override @@ -81,10 +84,10 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamId", streamId) - .add("byteCount", byteCount) - .add("body", body()) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamId", streamId) + .append("byteCount", byteCount) + .append("body", body()) .toString(); } diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java index 7d21151e0107..fb50801a51ba 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java @@ -20,8 +20,9 @@ import java.io.IOException; import java.nio.ByteBuffer; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import 
org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NettyManagedBuffer; @@ -99,9 +100,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("requestId", requestId) - .add("body", body()) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("requestId", requestId) + .append("body", body()) .toString(); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 657774c1b468..ba1a17bf7e5e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -27,12 +27,13 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.commons.lang3.tuple.Pair; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -401,19 +402,19 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; AppExecId appExecId = (AppExecId) o; - return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId); + return Objects.equals(appId, appExecId.appId) && Objects.equals(execId, appExecId.execId); } @Override public int hashCode() { - return Objects.hashCode(appId, execId); + return Objects.hash(appId, execId); } @Override public String toString() { - return Objects.toStringHelper(this) - .add("appId", appId) - .add("execId", execId) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) + .append("execId", execId) .toString(); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlocksRemoved.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlocksRemoved.java index 723b2f75c6fc..a4d6035df807 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlocksRemoved.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlocksRemoved.java @@ -17,8 +17,11 @@ package org.apache.spark.network.shuffle.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; // Needed by ScalaDoc. 
See SPARK-7726 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; @@ -41,8 +44,8 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) - .add("numRemovedBlocks", numRemovedBlocks) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("numRemovedBlocks", numRemovedBlocks) .toString(); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java index 540ecd09a7e3..b4e7bc409d3b 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java @@ -18,11 +18,13 @@ package org.apache.spark.network.shuffle.protocol; import java.util.Arrays; +import java.util.Objects; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.protocol.Encodable; import org.apache.spark.network.protocol.Encoders; @@ -48,15 +50,15 @@ public ExecutorShuffleInfo( @Override public int hashCode() { - return Objects.hashCode(subDirsPerLocalDir, shuffleManager) * 41 + Arrays.hashCode(localDirs); + return Objects.hash(subDirsPerLocalDir, shuffleManager) * 41 + Arrays.hashCode(localDirs); } @Override public String toString() { - return Objects.toStringHelper(this) - .add("localDirs", Arrays.toString(localDirs)) - .add("subDirsPerLocalDir", subDirsPerLocalDir) - .add("shuffleManager", shuffleManager) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("localDirs", Arrays.toString(localDirs)) + .append("subDirsPerLocalDir", subDirsPerLocalDir) + .append("shuffleManager", shuffleManager) .toString(); } @@ -66,7 +68,7 @@ public boolean equals(Object other) { ExecutorShuffleInfo o = (ExecutorShuffleInfo) other; return Arrays.equals(localDirs, o.localDirs) && subDirsPerLocalDir == o.subDirsPerLocalDir - && Objects.equal(shuffleManager, o.shuffleManager); + && Objects.equals(shuffleManager, o.shuffleManager); } return false; } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java index c0f307af042e..98057d58f7ab 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java @@ -19,8 +19,9 @@ import java.util.Arrays; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.protocol.Encoders; @@ -68,13 +69,13 @@ public FetchShuffleBlocks( @Override public String toString() { - return Objects.toStringHelper(this) - .add("appId", appId) - .add("execId", execId) - .add("shuffleId", shuffleId) - .add("mapIds", Arrays.toString(mapIds)) - .add("reduceIds", Arrays.deepToString(reduceIds)) - .add("batchFetchEnabled", batchFetchEnabled) + return new 
ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) + .append("execId", execId) + .append("shuffleId", shuffleId) + .append("mapIds", Arrays.toString(mapIds)) + .append("reduceIds", Arrays.deepToString(reduceIds)) + .append("batchFetchEnabled", batchFetchEnabled) .toString(); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/GetLocalDirsForExecutors.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/GetLocalDirsForExecutors.java index 90c416acc69a..47f617c5e0a0 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/GetLocalDirsForExecutors.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/GetLocalDirsForExecutors.java @@ -18,9 +18,11 @@ package org.apache.spark.network.shuffle.protocol; import java.util.Arrays; +import java.util.Objects; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.protocol.Encoders; @@ -47,9 +49,9 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) - .add("appId", appId) - .add("execIds", Arrays.toString(execIds)) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) + .append("execIds", Arrays.toString(execIds)) .toString(); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/LocalDirsForExecutors.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/LocalDirsForExecutors.java index 0c3aa6a46114..9e2f0668cbd2 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/LocalDirsForExecutors.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/LocalDirsForExecutors.java @@ -19,8 +19,9 @@ import java.util.*; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.protocol.Encoders; @@ -63,10 +64,10 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) - .add("execIds", Arrays.toString(execIds)) - .add("numLocalDirsByExec", Arrays.toString(numLocalDirsByExec)) - .add("allLocalDirs", Arrays.toString(allLocalDirs)) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("execIds", Arrays.toString(execIds)) + .append("numLocalDirsByExec", Arrays.toString(numLocalDirsByExec)) + .append("allLocalDirs", Arrays.toString(allLocalDirs)) .toString(); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java index ce954b8a289e..771e17b3233e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java @@ -18,9 +18,11 @@ package org.apache.spark.network.shuffle.protocol; import java.util.Arrays; +import java.util.Objects; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import 
org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.protocol.Encoders; @@ -44,15 +46,15 @@ public OpenBlocks(String appId, String execId, String[] blockIds) { @Override public int hashCode() { - return Objects.hashCode(appId, execId) * 41 + Arrays.hashCode(blockIds); + return Objects.hash(appId, execId) * 41 + Arrays.hashCode(blockIds); } @Override public String toString() { - return Objects.toStringHelper(this) - .add("appId", appId) - .add("execId", execId) - .add("blockIds", Arrays.toString(blockIds)) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) + .append("execId", execId) + .append("blockIds", Arrays.toString(blockIds)) .toString(); } @@ -60,8 +62,8 @@ public String toString() { public boolean equals(Object other) { if (other != null && other instanceof OpenBlocks) { OpenBlocks o = (OpenBlocks) other; - return Objects.equal(appId, o.appId) - && Objects.equal(execId, o.execId) + return Objects.equals(appId, o.appId) + && Objects.equals(execId, o.execId) && Arrays.equals(blockIds, o.blockIds); } return false; diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java index 167ef3310422..f6af755cd9cd 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java @@ -17,8 +17,11 @@ package org.apache.spark.network.shuffle.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.protocol.Encoders; @@ -48,15 +51,15 @@ public RegisterExecutor( @Override public int hashCode() { - return Objects.hashCode(appId, execId, executorInfo); + return Objects.hash(appId, execId, executorInfo); } @Override public String toString() { - return Objects.toStringHelper(this) - .add("appId", appId) - .add("execId", execId) - .add("executorInfo", executorInfo) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) + .append("execId", execId) + .append("executorInfo", executorInfo) .toString(); } @@ -64,9 +67,9 @@ public String toString() { public boolean equals(Object other) { if (other != null && other instanceof RegisterExecutor) { RegisterExecutor o = (RegisterExecutor) other; - return Objects.equal(appId, o.appId) - && Objects.equal(execId, o.execId) - && Objects.equal(executorInfo, o.executorInfo); + return Objects.equals(appId, o.appId) + && Objects.equals(execId, o.execId) + && Objects.equals(executorInfo, o.executorInfo); } return false; } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveBlocks.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveBlocks.java index 1c718d307753..ade838bd4286 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveBlocks.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveBlocks.java @@ -17,11 +17,14 @@ package org.apache.spark.network.shuffle.protocol; -import com.google.common.base.Objects; +import java.util.Arrays; +import java.util.Objects; + import io.netty.buffer.ByteBuf; 
-import org.apache.spark.network.protocol.Encoders; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; -import java.util.Arrays; +import org.apache.spark.network.protocol.Encoders; // Needed by ScalaDoc. See SPARK-7726 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; @@ -43,15 +46,15 @@ public RemoveBlocks(String appId, String execId, String[] blockIds) { @Override public int hashCode() { - return Objects.hashCode(appId, execId) * 41 + Arrays.hashCode(blockIds); + return Objects.hash(appId, execId) * 41 + Arrays.hashCode(blockIds); } @Override public String toString() { - return Objects.toStringHelper(this) - .add("appId", appId) - .add("execId", execId) - .add("blockIds", Arrays.toString(blockIds)) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) + .append("execId", execId) + .append("blockIds", Arrays.toString(blockIds)) .toString(); } @@ -59,8 +62,8 @@ public String toString() { public boolean equals(Object other) { if (other != null && other instanceof RemoveBlocks) { RemoveBlocks o = (RemoveBlocks) other; - return Objects.equal(appId, o.appId) - && Objects.equal(execId, o.execId) + return Objects.equals(appId, o.appId) + && Objects.equals(execId, o.execId) && Arrays.equals(blockIds, o.blockIds); } return false; diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java index 1915295aa6cc..dd7715a4e82d 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java @@ -17,8 +17,11 @@ package org.apache.spark.network.shuffle.protocol; -import com.google.common.base.Objects; +import java.util.Objects; + import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; // Needed by ScalaDoc. 
See SPARK-7726 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; @@ -41,14 +44,14 @@ public StreamHandle(long streamId, int numChunks) { @Override public int hashCode() { - return Objects.hashCode(streamId, numChunks); + return Objects.hash(streamId, numChunks); } @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamId", streamId) - .add("numChunks", numChunks) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamId", streamId) + .append("numChunks", numChunks) .toString(); } @@ -56,8 +59,8 @@ public String toString() { public boolean equals(Object other) { if (other != null && other instanceof StreamHandle) { StreamHandle o = (StreamHandle) other; - return Objects.equal(streamId, o.streamId) - && Objects.equal(numChunks, o.numChunks); + return Objects.equals(streamId, o.streamId) + && Objects.equals(numChunks, o.numChunks); } return false; } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java index 3caed59d508f..a5bc3f7009b4 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java @@ -18,9 +18,11 @@ package org.apache.spark.network.shuffle.protocol; import java.util.Arrays; +import java.util.Objects; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.protocol.Encoders; @@ -60,18 +62,18 @@ public UploadBlock( @Override public int hashCode() { - int objectsHashCode = Objects.hashCode(appId, execId, blockId); + int objectsHashCode = Objects.hash(appId, execId, blockId); return (objectsHashCode * 41 + Arrays.hashCode(metadata)) * 41 + Arrays.hashCode(blockData); } @Override public String toString() { - return Objects.toStringHelper(this) - .add("appId", appId) - .add("execId", execId) - .add("blockId", blockId) - .add("metadata size", metadata.length) - .add("block size", blockData.length) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) + .append("execId", execId) + .append("blockId", blockId) + .append("metadata size", metadata.length) + .append("block size", blockData.length) .toString(); } @@ -79,9 +81,9 @@ public String toString() { public boolean equals(Object other) { if (other != null && other instanceof UploadBlock) { UploadBlock o = (UploadBlock) other; - return Objects.equal(appId, o.appId) - && Objects.equal(execId, o.execId) - && Objects.equal(blockId, o.blockId) + return Objects.equals(appId, o.appId) + && Objects.equals(execId, o.execId) + && Objects.equals(blockId, o.blockId) && Arrays.equals(metadata, o.metadata) && Arrays.equals(blockData, o.blockData); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java index 9df30967d5bb..958a84e516c8 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java @@ -18,9 +18,11 @@ package 
org.apache.spark.network.shuffle.protocol; import java.util.Arrays; +import java.util.Objects; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.spark.network.protocol.Encoders; @@ -53,9 +55,9 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) - .add("blockId", blockId) - .add("metadata size", metadata.length) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("blockId", blockId) + .append("metadata size", metadata.length) .toString(); } @@ -63,7 +65,7 @@ public String toString() { public boolean equals(Object other) { if (other != null && other instanceof UploadBlockStream) { UploadBlockStream o = (UploadBlockStream) other; - return Objects.equal(blockId, o.blockId) + return Objects.equals(blockId, o.blockId) && Arrays.equals(metadata, o.metadata); } return false; diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/CleanupNonShuffleServiceServedFilesSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/CleanupNonShuffleServiceServedFilesSuite.java index e38442327e22..b37d8620a57f 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/CleanupNonShuffleServiceServedFilesSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/CleanupNonShuffleServiceServedFilesSuite.java @@ -30,7 +30,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.MoreExecutors; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -42,7 +41,7 @@ public class CleanupNonShuffleServiceServedFilesSuite { // Same-thread Executor used to ensure cleanup happens synchronously in test thread. - private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor(); + private Executor sameThreadExecutor = Runnable::run; private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"; diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index 47c087088a8a..48b73e32216c 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -24,7 +24,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.util.concurrent.MoreExecutors; import org.junit.Test; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -35,7 +34,7 @@ public class ExternalShuffleCleanupSuite { // Same-thread Executor used to ensure cleanup happens synchronously in test thread. 
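In the tests, Guava's MoreExecutors.sameThreadExecutor() is replaced by Runnable::run: Executor is a single-method interface, so a method reference to Runnable.run is a valid implementation that executes each task inline on the calling thread. Call sites that need a full ExecutorService instead use the ThreadUtils.sameThreadExecutorService inlined later in this patch. A quick illustration:

    import java.util.concurrent.Executor;

    public class DirectExecutorDemo {
      public static void main(String[] args) {
        // execute(command) dispatches to command.run() on this thread.
        Executor direct = Runnable::run;
        direct.execute(() ->
          System.out.println("ran on " + Thread.currentThread().getName()));
      }
    }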
- private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor(); + private Executor sameThreadExecutor = Runnable::run; private TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY); private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"; diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index c170f99b112c..815a56d765b6 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -23,14 +23,16 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.Objects; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -417,7 +419,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; AppId appExecId = (AppId) o; - return Objects.equal(appId, appExecId.appId); + return Objects.equals(appId, appExecId.appId); } @Override @@ -427,8 +429,8 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) - .add("appId", appId) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) .toString(); } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 5fe1c663affa..8ba173983180 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -26,7 +26,7 @@ import scala.collection.concurrent import scala.collection.mutable import scala.util.Properties -import com.google.common.collect.MapMaker +import com.google.common.cache.CacheBuilder import org.apache.hadoop.conf.Configuration import org.apache.spark.annotation.DeveloperApi @@ -76,7 +76,8 @@ class SparkEnv ( // A general, soft-reference map for metadata needed during HadoopRDD split computation // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). 
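The change that follows swaps Guava's deprecated MapMaker for the equivalent soft-value map built through CacheBuilder; note the value type tightens from Any to AnyRef because the cache can only hold references. A standalone sketch of the replacement idiom:

    import java.util.concurrent.ConcurrentMap;
    import com.google.common.cache.CacheBuilder;

    public class SoftValueMapDemo {
      public static void main(String[] args) {
        // Values are softly referenced, so entries may be dropped under
        // memory pressure -- the behavior MapMaker.softValues() provided.
        ConcurrentMap<String, Object> metadata =
          CacheBuilder.newBuilder().softValues().<String, Object>build().asMap();
        metadata.put("jobConf", new Object());
        System.out.println(metadata.containsKey("jobConf"));
      }
    }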
- private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() + private[spark] val hadoopJobMetadata = + CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap() private[spark] var driverTmpDir: Option[String] = None diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a3776b3ad756..aa34a404bdf3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -32,7 +32,6 @@ import scala.xml.Node import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.databind.annotation.JsonDeserialize -import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.hdfs.protocol.HdfsConstants @@ -195,7 +194,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) if (!Utils.isTesting) { ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor") } else { - MoreExecutors.sameThreadExecutor() + ThreadUtils.sameThreadExecutorService } } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index ff4928dae6bf..9742d12cfe01 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -405,9 +405,9 @@ private[spark] object HadoopRDD extends Logging { * The three methods below are helpers for accessing the local map, a property of the SparkEnv of * the local process. */ - def getCachedMetadata(key: String): Any = SparkEnv.get.hadoopJobMetadata.get(key) + def getCachedMetadata(key: String): AnyRef = SparkEnv.get.hadoopJobMetadata.get(key) - private def putCachedMetadata(key: String, value: Any): Unit = + private def putCachedMetadata(key: String, value: AnyRef): Unit = SparkEnv.get.hadoopJobMetadata.put(key, value) /** Add Hadoop configuration specific to a single partition and attempt. 
*/ diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala index 38cb030297c8..1b8dc9c8275a 100644 --- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -18,14 +18,12 @@ package org.apache.spark.status import java.util.Collection -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ExecutorService, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, ListBuffer} -import com.google.common.util.concurrent.MoreExecutors - import org.apache.spark.SparkConf import org.apache.spark.internal.config.Status._ import org.apache.spark.status.ElementTrackingStore._ @@ -72,10 +70,10 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten private val triggers = new HashMap[Class[_], LatchedTriggers]() private val flushTriggers = new ListBuffer[() => Unit]() - private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { + private val executor: ExecutorService = if (conf.get(ASYNC_TRACKING_ENABLED)) { ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker") } else { - MoreExecutors.sameThreadExecutor() + ThreadUtils.sameThreadExecutorService } @volatile private var stopped = false diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 9ed95f6b7bd5..de39e4b410f2 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -18,22 +18,97 @@ package org.apache.spark.util import java.util.concurrent._ +import java.util.concurrent.locks.ReentrantLock -import scala.collection.generic.CanBuildFrom -import scala.language.higherKinds - -import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future} import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.language.higherKinds import scala.util.control.NonFatal +import com.google.common.util.concurrent.ThreadFactoryBuilder + import org.apache.spark.SparkException import org.apache.spark.rpc.RpcAbortException private[spark] object ThreadUtils { private val sameThreadExecutionContext = - ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor()) + ExecutionContext.fromExecutorService(sameThreadExecutorService()) + + // Inspired by Guava MoreExecutors.sameThreadExecutor; inlined and converted + // to Scala here to avoid Guava version issues + def sameThreadExecutorService(): ExecutorService = new AbstractExecutorService { + private val lock = new ReentrantLock() + private val termination = lock.newCondition() + private var runningTasks = 0 + private var serviceIsShutdown = false + + override def shutdown(): Unit = { + lock.lock() + try { + serviceIsShutdown = true + } finally { + lock.unlock() + } + } + + override def shutdownNow(): java.util.List[Runnable] = { + shutdown() + java.util.Collections.emptyList() + } + + override def isShutdown: Boolean = { + lock.lock() + try { + serviceIsShutdown + } finally { + lock.unlock() + } + } + + override def isTerminated: Boolean = synchronized { + lock.lock() + try { + serviceIsShutdown && runningTasks == 0 + } finally { + lock.unlock() + } + } + + override def 
awaitTermination(timeout: Long, unit: TimeUnit): Boolean = { + var nanos = unit.toNanos(timeout) + lock.lock() + try { + while (nanos > 0 && !isTerminated()) { + nanos = termination.awaitNanos(nanos) + } + isTerminated() + } finally { + lock.unlock() + } + } + + override def execute(command: Runnable): Unit = { + lock.lock() + try { + if (isShutdown()) throw new RejectedExecutionException("Executor already shutdown") + runningTasks += 1 + } finally { + lock.unlock() + } + try { + command.run() + } finally { + lock.lock() + try { + runningTasks -= 1 + if (isTerminated()) termination.signalAll() + } finally { + lock.unlock() + } + } + } + } /** * An `ExecutionContextExecutor` that runs each task in the thread that invokes `execute/submit`. diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 6b42815aa25a..5af6a020d517 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -28,7 +28,7 @@ import java.nio.channels.{Channels, FileChannel, WritableByteChannel} import java.nio.charset.StandardCharsets import java.nio.file.Files import java.security.SecureRandom -import java.util.{Locale, Properties, Random, UUID} +import java.util.{Arrays, Locale, Properties, Random, UUID} import java.util.concurrent._ import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.zip.GZIPInputStream @@ -45,9 +45,9 @@ import scala.util.matching.Regex import _root_.io.netty.channel.unix.Errors.NativeIoException import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} -import com.google.common.hash.HashCodes import com.google.common.io.{ByteStreams, Files => GFiles} import com.google.common.net.InetAddresses +import org.apache.commons.codec.binary.Hex import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} @@ -2810,7 +2810,7 @@ private[spark] object Utils extends Logging { val rnd = new SecureRandom() val secretBytes = new Array[Byte](bits / JByte.SIZE) rnd.nextBytes(secretBytes) - HashCodes.fromBytes(secretBytes).toString() + Hex.encodeHexString(secretBytes) } /** diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 10e6936eb379..ee8e38c24b47 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -30,7 +30,6 @@ import scala.collection.Iterator; import com.google.common.collect.HashMultiset; -import com.google.common.collect.Iterators; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -249,7 +248,7 @@ class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> @Test public void writeEmptyIterator() throws Exception { final UnsafeShuffleWriter<Object, Object> writer = createWriter(true); - writer.write(Iterators.emptyIterator()); + writer.write(new ArrayList<Product2<Object, Object>>().iterator()); final Option<MapStatus> mapStatus = writer.stop(true); assertTrue(mapStatus.isDefined()); assertTrue(mergedOutputFile.exists()); diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index 406bd9244870..2efe6da5e986 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -25,7 +25,6 @@ import
scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.util.control.NonFatal -import com.google.common.util.concurrent.MoreExecutors import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers.{any, anyLong} import org.mockito.Mockito.{spy, times, verify} @@ -38,7 +37,7 @@ import org.apache.spark.TaskState.TaskState import org.apache.spark.TestUtils.JavaSourceFromString import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE import org.apache.spark.storage.TaskResultBlockId -import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils} +import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, ThreadUtils, Utils} /** @@ -99,7 +98,7 @@ private class MyTaskResultGetter(env: SparkEnv, scheduler: TaskSchedulerImpl) extends TaskResultGetter(env, scheduler) { // Use the current thread so we can access its results synchronously - protected override val getTaskResultExecutor = MoreExecutors.sameThreadExecutor() + protected override val getTaskResultExecutor = ThreadUtils.sameThreadExecutorService // DirectTaskResults that we receive from the executors private val _taskResults = new ArrayBuffer[DirectTaskResult[_]] diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala index 2ee3224b3c28..af84498d5e47 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala @@ -22,9 +22,12 @@ import java.nio.charset.StandardCharsets import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, KinesisProducerConfiguration, UserRecordResult} +import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, + KinesisProducerConfiguration, UserRecordResult} import com.google.common.util.concurrent.{FutureCallback, Futures} +import org.apache.spark.util.ThreadUtils + private[kinesis] class KPLBasedKinesisTestUtils(streamShardCount: Int = 2) extends KinesisTestUtils(streamShardCount) { override protected def getProducer(aggregate: Boolean): KinesisDataGenerator = { @@ -66,7 +69,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG sentSeqNumbers += ((num, seqNumber)) } } - Futures.addCallback(future, kinesisCallBack) + Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService) } producer.flushSync() shardIdToSeqNumbers.toMap diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index c5be3efc6371..91ddf0f28ad8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -47,6 +47,17 @@ object JavaTypeInference { private val keySetReturnType = classOf[JMap[_, _]].getMethod("keySet").getGenericReturnType private val valuesReturnType = classOf[JMap[_, _]].getMethod("values").getGenericReturnType + // Guava changed the name of this method; this tries to stay compatible with both + // TODO replace with isSupertypeOf when Guava 14 support no longer needed for Hadoop + private val ttIsAssignableFrom: (TypeToken[_], TypeToken[_]) 
=> Boolean = { + val ttMethods = classOf[TypeToken[_]].getMethods. + filter(_.getParameterCount == 1). + filter(_.getParameterTypes.head == classOf[TypeToken[_]]) + val isAssignableFromMethod = ttMethods.find(_.getName == "isSupertypeOf").getOrElse( + ttMethods.find(_.getName == "isAssignableFrom").get) + (a: TypeToken[_], b: TypeToken[_]) => isAssignableFromMethod.invoke(a, b).asInstanceOf[Boolean] + } + /** * Infers the corresponding SQL data type of a JavaBean class. * @param beanClass Java type @@ -111,11 +122,11 @@ object JavaTypeInference { val (dataType, nullable) = inferDataType(typeToken.getComponentType, seenTypeSet) (ArrayType(dataType, nullable), true) - case _ if iterableType.isAssignableFrom(typeToken) => + case _ if ttIsAssignableFrom(iterableType, typeToken) => val (dataType, nullable) = inferDataType(elementType(typeToken), seenTypeSet) (ArrayType(dataType, nullable), true) - case _ if mapType.isAssignableFrom(typeToken) => + case _ if ttIsAssignableFrom(mapType, typeToken) => val (keyType, valueType) = mapKeyValueType(typeToken) val (keyDataType, _) = inferDataType(keyType, seenTypeSet) val (valueDataType, nullable) = inferDataType(valueType, seenTypeSet) @@ -273,7 +284,7 @@ object JavaTypeInference { } Invoke(arrayData, methodName, ObjectType(c)) - case c if listType.isAssignableFrom(typeToken) => + case c if ttIsAssignableFrom(listType, typeToken) => val et = elementType(typeToken) val newTypePath = walkedTypePath.recordArray(et.getType.getTypeName) val (dataType, elementNullable) = inferDataType(et) @@ -289,7 +300,7 @@ object JavaTypeInference { UnresolvedMapObjects(mapFunction, path, customCollectionCls = Some(c)) - case _ if mapType.isAssignableFrom(typeToken) => + case _ if ttIsAssignableFrom(mapType, typeToken) => val (keyType, valueType) = mapKeyValueType(typeToken) val newTypePath = walkedTypePath.recordMap(keyType.getType.getTypeName, valueType.getType.getTypeName) @@ -404,10 +415,10 @@ object JavaTypeInference { case _ if typeToken.isArray => toCatalystArray(inputObject, typeToken.getComponentType) - case _ if listType.isAssignableFrom(typeToken) => + case _ if ttIsAssignableFrom(listType, typeToken) => toCatalystArray(inputObject, elementType(typeToken)) - case _ if mapType.isAssignableFrom(typeToken) => + case _ if ttIsAssignableFrom(mapType, typeToken) => val (keyType, valueType) = mapKeyValueType(typeToken) createSerializerForMap( diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java index c8b5555a135d..5603cb988b8e 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java @@ -22,6 +22,10 @@ import java.time.LocalDate; import java.util.*; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.junit.*; + import org.apache.spark.sql.*; import org.apache.spark.sql.catalyst.expressions.GenericRow; import org.apache.spark.sql.catalyst.util.DateTimeUtils; @@ -29,7 +33,6 @@ import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; -import org.junit.*; import org.apache.spark.sql.test.TestSparkSession; @@ -486,17 +489,17 @@ public int hashCode() { @Override public String toString() { - return com.google.common.base.Objects.toStringHelper(this) - .add("shortField", 
shortField) - .add("intField", intField) - .add("longField", longField) - .add("floatField", floatField) - .add("doubleField", doubleField) - .add("stringField", stringField) - .add("booleanField", booleanField) - .add("timestampField", timestampField) - .add("nullIntField", nullIntField) - .toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("shortField", shortField) + .append("intField", intField) + .append("longField", longField) + .append("floatField", floatField) + .append("doubleField", doubleField) + .append("stringField", stringField) + .append("booleanField", booleanField) + .append("timestampField", timestampField) + .append("nullIntField", nullIntField) + .toString(); } } @@ -584,11 +587,12 @@ public int hashCode() { @Override public String toString() { - return com.google.common.base.Objects.toStringHelper(this) - .add("localDateField", localDateField) - .add("instantField", instantField) - .toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("localDateField", localDateField) + .append("instantField", instantField) + .toString(); } + } private static Row createLocalDateInstantRow(Long index) {
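The JavaTypeInference shim above exists because Guava's TypeToken renamed isAssignableFrom to isSupertypeOf, and Spark must compile against whichever Guava version Hadoop pulls in. It resolves the available method reflectively once and then invokes it everywhere. A stripped-down sketch of that lookup-once pattern (the wrapper class is hypothetical):

    import java.lang.reflect.Method;
    import com.google.common.reflect.TypeToken;

    public class TypeTokenCompat {
      // Resolve whichever name this Guava version exposes, one time.
      private static final Method SUBTYPE_CHECK = resolve();

      private static Method resolve() {
        for (String name : new String[] {"isSupertypeOf", "isAssignableFrom"}) {
          try {
            return TypeToken.class.getMethod(name, TypeToken.class);
          } catch (NoSuchMethodException e) {
            // Fall through and try the other name.
          }
        }
        throw new IllegalStateException("No TypeToken subtype check found");
      }

      public static boolean isSupertypeOf(TypeToken<?> a, TypeToken<?> b)
          throws ReflectiveOperationException {
        return (Boolean) SUBTYPE_CHECK.invoke(a, b);
      }
    }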