From d9d8e62c2de7d9d04534396ab3bbf984ab16c7f5 Mon Sep 17 00:00:00 2001
From: sychen
Date: Sat, 12 May 2018 19:14:17 +0800
Subject: [PATCH 1/6] LongToUnsafeRowMap: calculate the new correct size

---
 .../sql/execution/joins/HashedRelation.scala  |  9 ++++---
 .../execution/joins/HashedRelationSuite.scala | 24 +++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 1465346eb802..518e6bbdf518 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }
 
     // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+    val needSize = cursor + 8 + row.getSizeInBytes
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (needSize > nowSize) {
       val used = page.length
       if (used >= (1 << 30)) {
         sys.error("Can not build a HashedRelation that is larger than 8G")
       }
-      ensureAcquireMemory(used * 8L * 2)
-      val newPage = new Array[Long](used * 2)
+      val multiples = math.max(math.ceil(needSize.toDouble / (used * 8L)).toInt, 2)
+      ensureAcquireMemory(used * 8L * multiples)
+      val newPage = new Array[Long](used * multiples)
       Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
         cursor - Platform.LONG_ARRAY_OFFSET)
       page = newPage
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 51f8c3325fdf..fefdae36af35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -254,6 +254,30 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     map.free()
   }
 
+  test("LongToUnsafeRowMap with big values") {
+    val taskMemoryManager = new TaskMemoryManager(
+      new StaticMemoryManager(
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
+        Long.MaxValue,
+        Long.MaxValue,
+        1),
+      0)
+    val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, StringType, false)))
+    val keys = Seq(0L)
+    val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
+    val bigStr = UTF8String.fromString("x" * 1024 * 1024 * 2)
+    keys.foreach { k =>
+      map.append(k, unsafeProj(InternalRow(bigStr)))
+    }
+    map.optimize()
+    val row = unsafeProj(InternalRow(bigStr)).copy()
+    keys.foreach { k =>
+      assert(map.getValue(k, row) eq row)
+      assert(row.getUTF8String(0) === bigStr)
+    }
+    map.free()
+  }
+
   test("Spark-14521") {
     val ser = new KryoSerializer(
       (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
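
Note: before this patch the map always doubled its page when it ran out of room, so a
single row larger than the doubled page overflowed the subsequent copy. A rough,
self-contained sketch of the arithmetic, not Spark code: the value 16 for
Platform.LONG_ARRAY_OFFSET is an assumption for illustration, and GrowthMath is a
hypothetical name.

    object GrowthMath {
      def main(args: Array[String]): Unit = {
        val longArrayOffset = 16L                 // assumed Platform.LONG_ARRAY_OFFSET
        val pageWords = 1 << 17                   // initial page: 131072 words = 1M bytes
        val cursor = longArrayOffset              // empty page
        val rowSize = 2 * 1024 * 1024             // the 2M-byte row from the new test
        val needSize = cursor + 8 + rowSize       // +8 for the next-value pointer
        val doubledBytes = pageWords * 2 * 8L     // old behavior: double once, to 2M bytes
        // needSize (2M bytes plus a little) still exceeds the doubled page, so the
        // later Platform.copyMemory would have written past the end of the array.
        println(needSize > doubledBytes)          // true: the overflow this patch fixes
        // Patch 1 instead grows by enough multiples in a single step:
        val multiples = math.max(math.ceil(needSize.toDouble / (pageWords * 8L)).toInt, 2)
        println(multiples)                        // 3: the new page fits the row at once
      }
    }
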
From 22a2767b98185edf32be3c36bb255f5837ad7466 Mon Sep 17 00:00:00 2001
From: sychen
Date: Mon, 14 May 2018 18:39:59 +0800
Subject: [PATCH 2/6] splitting append func into two parts: grow/append;
 doubling the size when growing; UnsupportedOperationException instead of
 sys.error

---
 .../sql/execution/joins/HashedRelation.scala | 51 ++++++++++++-------
 1 file changed, 33 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 518e6bbdf518..76d0f7be43c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
 import org.apache.spark.sql.types.LongType
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.util.{KnownSizeEstimation, Utils}
 
@@ -362,6 +363,8 @@ private[joins] object UnsafeHashedRelation {
 private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, capacity: Int)
   extends MemoryConsumer(mm) with Externalizable with KryoSerializable {
 
+  private val ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
   // Whether the keys are stored in dense mode or not.
   private var isDense = false
 
@@ -557,7 +560,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   def append(key: Long, row: UnsafeRow): Unit = {
     val sizeInBytes = row.getSizeInBytes
     if (sizeInBytes >= (1 << SIZE_BITS)) {
-      sys.error("Does not support row that is larger than 256M")
+      throw new UnsupportedOperationException("Does not support row that is larger than 256M")
     }
 
     if (key < minKey) {
@@ -567,22 +570,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       maxKey = key
     }
 
-    // There is 8 bytes for the pointer to next value
-    val needSize = cursor + 8 + row.getSizeInBytes
-    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
-    if (needSize > nowSize) {
-      val used = page.length
-      if (used >= (1 << 30)) {
-        sys.error("Can not build a HashedRelation that is larger than 8G")
-      }
-      val multiples = math.max(math.ceil(needSize.toDouble / (used * 8L)).toInt, 2)
-      ensureAcquireMemory(used * 8L * multiples)
-      val newPage = new Array[Long](used * multiples)
-      Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
-        cursor - Platform.LONG_ARRAY_OFFSET)
-      page = newPage
-      freeMemory(used * 8L)
-    }
+    grow(row.getSizeInBytes)
 
     // copy the bytes of UnsafeRow
     val offset = cursor
@@ -618,7 +606,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
         growArray()
       } else if (numKeys > array.length / 2 * 0.75) {
         // The fill ratio should be less than 0.75
-        sys.error("Cannot build HashedRelation with more than 1/3 billions unique keys")
+        throw new UnsupportedOperationException(
+          "Cannot build HashedRelation with more than 1/3 billions unique keys")
       }
     }
   } else {
@@ -629,6 +618,32 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }
   }
 
+  private def grow(neededSize: Int): Unit = {
+    // There is 8 bytes for the pointer to next value
+    val totalNeededSize = cursor + 8 + neededSize
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (totalNeededSize > nowSize) {
+      val used = page.length
+      if (used >= (1 << 30)) {
+        throw new UnsupportedOperationException(
+          "Can not build a HashedRelation that is larger than 8G")
+      }
+      val multiples = math.floor(totalNeededSize.toDouble / nowSize).toInt * 2
+      val newLength = used * multiples
+      if (newLength > ARRAY_MAX) {
+        throw new UnsupportedOperationException(
+          "Cannot grow internal buffer by size " + newLength +
+            " because the size after growing " + "exceeds size limitation " + ARRAY_MAX)
+      }
+      ensureAcquireMemory(newLength * 8L)
+      val newPage = new Array[Long](newLength)
+      Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
+        cursor - Platform.LONG_ARRAY_OFFSET)
+      page = newPage
+      freeMemory(used * 8L)
+    }
+  }
+
   private def growArray(): Unit = {
     var old_array = array
     val n = array.length
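
Note: the structural change here separates capacity management (grow) from the write
path (append). A minimal sketch of that split outside Spark, with simplified
bookkeeping; GrowableLongBuffer is a hypothetical name, not a Spark class.

    // Illustrative only: the write path never resizes mid-copy; it first makes
    // room, then appends, mirroring the new grow()/append() split.
    final class GrowableLongBuffer(initialWords: Int) {
      private var page = new Array[Long](initialWords)
      private var used = 0

      def append(values: Array[Long]): Unit = {
        grow(values.length)                       // ensure capacity before writing
        System.arraycopy(values, 0, page, used, values.length)
        used += values.length
      }

      private def grow(neededWords: Int): Unit = {
        val required = used + neededWords
        if (required > page.length) {
          // Grow to at least the required size; prefer doubling to amortize copies.
          val newLength = math.max(required, page.length * 2)
          page = java.util.Arrays.copyOf(page, newLength)
        }
      }
    }

For example, new GrowableLongBuffer(4).append(Array(1L, 2L, 3L, 4L, 5L)) grows the
page once, to length 8, before copying.
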
From 6fe1dd07c4dc4ad8c42f374def86ecacf5ad08c4 Mon Sep 17 00:00:00 2001
From: sychen
Date: Tue, 22 May 2018 21:44:15 +0800
Subject: [PATCH 3/6] grow to fit the new row, otherwise OOM should be thrown.
 If possible, grow to oldSize * 2

---
 .../sql/execution/joins/HashedRelation.scala | 23 ++++++++-----------
 1 file changed, 10 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 76d0f7be43c4..a5ac9fc143c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -618,27 +618,24 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }
   }
 
-  private def grow(neededSize: Int): Unit = {
-    // There is 8 bytes for the pointer to next value
-    val totalNeededSize = cursor + 8 + neededSize
-    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
-    if (totalNeededSize > nowSize) {
-      val used = page.length
-      if (used >= (1 << 30)) {
+  private def grow(inputRowSize: Int): Unit = {
+    val neededNumWords = (cursor - Platform.LONG_ARRAY_OFFSET + 8 + inputRowSize + 7) / 8
+    if (neededNumWords > page.length) {
+      if (neededNumWords > (1 << 30)) {
         throw new UnsupportedOperationException(
           "Can not build a HashedRelation that is larger than 8G")
       }
-      val multiples = math.floor(totalNeededSize.toDouble / nowSize).toInt * 2
-      val newLength = used * multiples
-      if (newLength > ARRAY_MAX) {
+      val newNumWords = math.max(neededNumWords, math.min(page.length * 2, 1 << 30))
+      if (newNumWords > ARRAY_MAX) {
         throw new UnsupportedOperationException(
-          "Cannot grow internal buffer by size " + newLength +
+          "Cannot grow internal buffer by size " + newNumWords +
             " because the size after growing " + "exceeds size limitation " + ARRAY_MAX)
       }
-      ensureAcquireMemory(newLength * 8L)
-      val newPage = new Array[Long](newLength)
+      ensureAcquireMemory(newNumWords * 8L)
+      val newPage = new Array[Long](newNumWords.toInt)
       Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
         cursor - Platform.LONG_ARRAY_OFFSET)
+      val used = page.length
       page = newPage
       freeMemory(used * 8L)
     }
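
Note: this revision switches the arithmetic from bytes to 8-byte words: round the
needed bytes up to whole words, then allocate at least that much, preferring a
doubling capped at 2^30 words (8G). A standalone sketch of the two formulas;
WordMath is a hypothetical name, and 16 is again an assumed value for
Platform.LONG_ARRAY_OFFSET.

    object WordMath {
      // Round the needed bytes (used bytes + 8-byte pointer + row) up to whole words.
      def neededNumWords(cursor: Long, rowSize: Int, offset: Long = 16L): Long =
        (cursor - offset + 8 + rowSize + 7) / 8

      // Grow to at least what is needed; prefer doubling, capped at 2^30 words (8G).
      def newNumWords(needed: Long, pageWords: Int): Long =
        math.max(needed, math.min(pageWords.toLong * 2, 1L << 30))

      def main(args: Array[String]): Unit = {
        val needed = neededNumWords(cursor = 16L + 1024, rowSize = 4 * 1024 * 1024)
        println(needed)                          // 524417 words: a 4M-byte row, rounded up
        println(newNumWords(needed, 1 << 17))    // 524417: doubling 131072 is not enough
        println(newNumWords(140000, 1 << 17))    // 262144: a modest overshoot just doubles
      }
    }
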
From f3916e765c8efe640c0b3c9d33ea336393df5d01 Mon Sep 17 00:00:00 2001
From: sychen
Date: Tue, 22 May 2018 21:56:19 +0800
Subject: [PATCH 4/6] Remove unnecessary checks and add comments

---
 .../apache/spark/sql/execution/joins/HashedRelation.scala | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index a5ac9fc143c1..19aa9edc150d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -363,8 +363,6 @@ private[joins] object UnsafeHashedRelation {
 private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, capacity: Int)
   extends MemoryConsumer(mm) with Externalizable with KryoSerializable {
 
-  private val ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
-
   // Whether the keys are stored in dense mode or not.
   private var isDense = false
 
@@ -619,6 +617,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   }
 
   private def grow(inputRowSize: Int): Unit = {
+    // There is 8 bytes for the pointer to next value
     val neededNumWords = (cursor - Platform.LONG_ARRAY_OFFSET + 8 + inputRowSize + 7) / 8
     if (neededNumWords > page.length) {
       if (neededNumWords > (1 << 30)) {
@@ -626,11 +625,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
           "Can not build a HashedRelation that is larger than 8G")
       }
       val newNumWords = math.max(neededNumWords, math.min(page.length * 2, 1 << 30))
-      if (newNumWords > ARRAY_MAX) {
-        throw new UnsupportedOperationException(
-          "Cannot grow internal buffer by size " + newNumWords +
-            " because the size after growing " + "exceeds size limitation " + ARRAY_MAX)
-      }
       ensureAcquireMemory(newNumWords * 8L)
       val newPage = new Array[Long](newNumWords.toInt)
       Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
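
Note: the ARRAY_MAX guard removed here had become unreachable. grow() already throws
when neededNumWords exceeds 1 << 30, and newNumWords is capped at 1 << 30 by the
math.min just above, while MAX_ROUNDED_ARRAY_LENGTH in Spark's ByteArrayMethods is
Int.MaxValue - 15 (a value quoted from memory, worth re-checking against the source
tree). A one-line check of the inequality:

    object DeadGuard {
      def main(args: Array[String]): Unit = {
        val arrayMax = Int.MaxValue - 15            // assumed MAX_ROUNDED_ARRAY_LENGTH
        val newNumWordsUpperBound = 1L << 30        // cap enforced by grow() above
        println(newNumWordsUpperBound <= arrayMax)  // true: the guard could never fire
      }
    }
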
From d7da8aebf3f99cc38bba4f8bf9f148e87ed2fa23 Mon Sep 17 00:00:00 2001
From: sychen
Date: Tue, 22 May 2018 22:20:16 +0800
Subject: [PATCH 5/6] Remove unnecessary import and simplify test case

---
 .../sql/execution/joins/HashedRelation.scala  |  1 -
 .../execution/joins/HashedRelationSuite.scala | 22 +++++++++----------
 2 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 19aa9edc150d..20ce01f4ce8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
 import org.apache.spark.sql.types.LongType
 import org.apache.spark.unsafe.Platform
-import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.util.{KnownSizeEstimation, Utils}
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index fefdae36af35..07a2ac828b53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.collection.CompactBuffer
@@ -262,19 +262,19 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
         Long.MaxValue,
         1),
       0)
-    val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, StringType, false)))
-    val keys = Seq(0L)
+    val unsafeProj = UnsafeProjection.create(Array[DataType](StringType))
     val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
+
+    val key = 0L
+    // the page array is initialized with length 1 << 17,
+    // so here we need a value larger than 1 << 18
     val bigStr = UTF8String.fromString("x" * 1024 * 1024 * 2)
-    keys.foreach { k =>
-      map.append(k, unsafeProj(InternalRow(bigStr)))
-    }
+
+    map.append(key, unsafeProj(InternalRow(bigStr)))
     map.optimize()
-    val row = unsafeProj(InternalRow(bigStr)).copy()
-    keys.foreach { k =>
-      assert(map.getValue(k, row) eq row)
-      assert(row.getUTF8String(0) === bigStr)
-    }
+
+    val resultRow = new UnsafeRow(1)
+    assert(map.getValue(key, resultRow).getUTF8String(0) === bigStr)
     map.free()
   }
 

From b8b632450d824d7abf092c10f8e94f6938be1104 Mon Sep 17 00:00:00 2001
From: sychen
Date: Wed, 23 May 2018 00:36:58 +0800
Subject: [PATCH 6/6] add a comment

---
 .../spark/sql/execution/joins/HashedRelationSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 07a2ac828b53..bf779faee95b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -266,9 +266,9 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
 
     val key = 0L
-    // the page array is initialized with length 1 << 17,
-    // so here we need a value larger than 1 << 18
-    val bigStr = UTF8String.fromString("x" * 1024 * 1024 * 2)
+    // the page array is initialized with length 1 << 17 (1M bytes),
+    // so here we need a value larger than 1 << 18 (2M bytes), to trigger the bug
+    val bigStr = UTF8String.fromString("x" * (1 << 22))
 
     map.append(key, unsafeProj(InternalRow(bigStr)))
     map.optimize()
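
Note: a quick sanity check of the sizes named in the final test comment; the page
length is counted in 8-byte words, the string in bytes, and TestSizes is a
hypothetical name used only for this illustrative arithmetic.

    object TestSizes {
      def main(args: Array[String]): Unit = {
        println((1 << 17) * 8L)   // 1048576: initial page, 131072 words = 1M bytes
        println((1 << 18) * 8L)   // 2097152: one doubling yields 262144 words = 2M bytes
        println(1 << 22)          // 4194304: bigStr is 4M one-byte chars, larger than
                                  // 2M bytes, so a single doubling cannot hold it
      }
    }
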