diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
index 1a3cdff63826..2cd39bd60c2a 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
@@ -39,7 +39,7 @@ public final class LongArray {
   private final long length;
 
   public LongArray(MemoryBlock memory) {
-    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 4 billion elements";
+    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
     this.memory = memory;
     this.baseObj = memory.getBaseObject();
     this.baseOffset = memory.getBaseOffset();
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java b/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
index 20654e4eeaa0..b8c2294c7b7a 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
@@ -30,11 +30,17 @@ public interface HashMapGrowthStrategy {
   HashMapGrowthStrategy DOUBLING = new Doubling();
 
   class Doubling implements HashMapGrowthStrategy {
+
+    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+    // smaller. Be conservative and lower the cap a little.
+    private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+
     @Override
     public int nextCapacity(int currentCapacity) {
       assert (currentCapacity > 0);
+      int doubleCapacity = currentCapacity * 2;
       // Guard against overflow
-      return (currentCapacity * 2 > 0) ? (currentCapacity * 2) : Integer.MAX_VALUE;
+      return (doubleCapacity > 0 && doubleCapacity <= ARRAY_MAX) ? doubleCapacity : ARRAY_MAX;
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
index 4d43d8d5cc8d..f5d2fa14e49c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -126,22 +126,22 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
 
   /** Increase our size to newSize and grow the backing array if needed. */
   private def growToSize(newSize: Int): Unit = {
-    if (newSize < 0) {
-      throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
+    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+    // smaller. Be conservative and lower the cap a little.
+    val arrayMax = Int.MaxValue - 8
+    if (newSize < 0 || newSize - 2 > arrayMax) {
+      throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements")
     }
     val capacity = if (otherElements != null) otherElements.length + 2 else 2
     if (newSize > capacity) {
-      var newArrayLen = 8
+      var newArrayLen = 8L
       while (newSize - 2 > newArrayLen) {
         newArrayLen *= 2
-        if (newArrayLen == Int.MinValue) {
-          // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
-          // Note that we set the new array length to Int.MaxValue - 2 so that our capacity
-          // calculation above still gives a positive integer.
-          newArrayLen = Int.MaxValue - 2
-        }
       }
-      val newArray = new Array[T](newArrayLen)
+      if (newArrayLen > arrayMax) {
+        newArrayLen = arrayMax
+      }
+      val newArray = new Array[T](newArrayLen.toInt)
       if (otherElements != null) {
         System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
       }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
index f5844d5353be..b755e5da5168 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
@@ -25,7 +25,7 @@ import org.apache.spark.util.collection.WritablePartitionedPairCollection._
  * Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
  * of its estimated size in bytes.
  *
- * The buffer can support up to `1073741823 (2 ^ 30 - 1)` elements.
+ * The buffer can support up to 1073741819 elements.
  */
 private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
   extends WritablePartitionedPairCollection[K, V] with SizeTracker
@@ -59,7 +59,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
       throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
     }
     val newCapacity =
-      if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
+      if (capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
        MAXIMUM_CAPACITY
       } else {
        capacity * 2
@@ -96,5 +96,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
 }
 
 private object PartitionedPairBuffer {
-  val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+  // smaller. Be conservative and lower the cap a little.
+  val MAXIMUM_CAPACITY: Int = (Int.MaxValue - 8) / 2
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
index 0e4264fe8dfb..971d19973f06 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
@@ -35,6 +35,11 @@
  * if the fields of row are all fixed-length, as the size of result row is also fixed.
  */
 public class BufferHolder {
+
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+  // smaller. Be conservative and lower the cap a little.
+  private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+
   public byte[] buffer;
   public int cursor = Platform.BYTE_ARRAY_OFFSET;
   private final UnsafeRow row;
@@ -61,15 +66,15 @@ public BufferHolder(UnsafeRow row, int initialSize) {
    * Grows the buffer by at least neededSize and points the row to the buffer.
    */
   public void grow(int neededSize) {
-    if (neededSize > Integer.MAX_VALUE - totalSize()) {
+    if (neededSize > ARRAY_MAX - totalSize()) {
       throw new UnsupportedOperationException(
         "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
-        "exceeds size limitation " + Integer.MAX_VALUE);
+        "exceeds size limitation " + ARRAY_MAX);
     }
     final int length = totalSize() + neededSize;
     if (buffer.length < length) {
       // This will not happen frequently, because the buffer is re-used.
-      int newLength = length < Integer.MAX_VALUE / 2 ? length * 2 : Integer.MAX_VALUE;
+      int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
       final byte[] tmp = new byte[newLength];
       Platform.copyMemory(
         buffer,
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index b4f753c0bc2a..0bddc351e1be 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -559,7 +559,7 @@ public final int appendStruct(boolean isNull) {
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
-  protected int MAX_CAPACITY = Integer.MAX_VALUE;
+  protected int MAX_CAPACITY = Integer.MAX_VALUE - 8;
 
   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
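Editor's note on the recurring constant: some JVMs reserve a few header words in every array, so an allocation of exactly Integer.MAX_VALUE elements can fail with OutOfMemoryError even when the heap has room; the JDK's own ArrayList caps growth at Integer.MAX_VALUE - 8 for the same reason. The sketch below is a standalone illustration, not part of the patch (CappedDoublingSketch is a hypothetical name); it shows how the new guard in HashMapGrowthStrategy.Doubling behaves at the boundary, where doubling 2^30 wraps to Integer.MIN_VALUE and the doubleCapacity > 0 test clamps to the cap.

    public class CappedDoublingSketch {

      // Same cap as the patch: leave room for the JVM's array header words.
      private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;

      // Mirrors HashMapGrowthStrategy.Doubling.nextCapacity after the patch.
      static int nextCapacity(int currentCapacity) {
        assert currentCapacity > 0;
        int doubleCapacity = currentCapacity * 2;
        // A doubled capacity that wrapped negative, or that exceeds the cap, clamps to ARRAY_MAX.
        return (doubleCapacity > 0 && doubleCapacity <= ARRAY_MAX) ? doubleCapacity : ARRAY_MAX;
      }

      public static void main(String[] args) {
        System.out.println(nextCapacity(64));        // 128: ordinary doubling
        System.out.println(nextCapacity(1 << 30));   // 2147483639: 2^31 wraps negative, clamp
        System.out.println(nextCapacity(ARRAY_MAX)); // 2147483639: stays pinned at the cap
      }
    }

BufferHolder.grow follows the same shape, and writes its headroom check as neededSize > ARRAY_MAX - totalSize() rather than neededSize + totalSize() > ARRAY_MAX so that the comparison itself cannot overflow.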
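CompactBuffer.growToSize is the one spot where the patch restructures logic instead of only lowering a constant: the old loop doubled an Int and special-cased the wrap to Int.MinValue inside the loop, while the new loop doubles a Long, which cannot wrap at these magnitudes, and clamps once afterwards. Below is a Java transliteration of that fix, a sketch rather than Spark code (GrowToSizeSketch and newArrayLength are hypothetical names; the real method is Scala, and newSize - 2 reflects CompactBuffer keeping its first two elements in fields outside the array):

    public class GrowToSizeSketch {

      private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;

      // Smallest power-of-two length >= (newSize - 2), clamped to ARRAY_MAX.
      static int newArrayLength(int newSize) {
        if (newSize < 0 || newSize - 2 > ARRAY_MAX) {
          throw new UnsupportedOperationException(
            "Can't grow buffer past " + ARRAY_MAX + " elements");
        }
        long newArrayLen = 8L;        // long: doubling can pass 2^31 without wrapping
        while (newSize - 2 > newArrayLen) {
          newArrayLen *= 2;
        }
        if (newArrayLen > ARRAY_MAX) {
          newArrayLen = ARRAY_MAX;    // clamp once, after the loop
        }
        return (int) newArrayLen;     // guaranteed to fit in an int now
      }

      public static void main(String[] args) {
        System.out.println(newArrayLength(100));                    // 128
        System.out.println(newArrayLength((1 << 30) + 3));          // doubles to 2^31, clamps to 2147483639
        System.out.println(newArrayLength(Integer.MAX_VALUE - 8));  // 2147483639
      }
    }

Doubling in the wider type and clamping once at the end is easier to reason about than detecting wraparound mid-loop, and it leaves the cap as the single point where the limit is enforced.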
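Finally, the derived constant: PartitionedPairBuffer stores each record as two adjacent AnyRef slots in one backing array, so its element cap is half the array cap, and MAXIMUM_CAPACITY = (Int.MaxValue - 8) / 2 = 1073741819 is exactly the figure now quoted in the class comment. The halving is also why the old capacity * 2 < 0 overflow test could be dropped: with capacity at most 1073741819, capacity * 2 never exceeds 2147483638, which still fits in a positive Int.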