Conversation

@JoshRosen JoshRosen commented Jul 7, 2022

What changes were proposed in this pull request?

This patch aims to reduce the memory overhead of TransportCipher$EncryptedMessage. In the current code, the EncryptedMessage constructor eagerly initializes a ByteArrayWritableChannel byteRawChannel, which consumes ~32 KB of memory per instance. If there are many EncryptedMessage instances on the heap (e.g. because there is a long queue of outgoing messages on a channel) then this overhead adds up and can cause OOMs or GC problems.

SPARK-24801 / #21811 fixed a similar issue in SaslEncryption. There, the fix was to lazily initialize the buffer: the buffer isn't actually accessed before transferTo() is called (and is only used there), so lazily initializing it there reduces memory requirements for queued outgoing messages.
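
As an illustration of that lazy-initialization pattern, here is a minimal Java sketch; the class name and the transferTo body are hypothetical stand-ins, not the actual Spark code:

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import org.apache.spark.network.util.ByteArrayWritableChannel;

// Hypothetical sketch: allocate the buffer on first use rather than in the
// constructor, so queued-but-unsent messages stay small on the heap.
class LazilyBufferedMessage {
  private ByteArrayWritableChannel byteChannel;  // null until first transferTo()

  public long transferTo(WritableByteChannel target, long position) throws IOException {
    if (byteChannel == null) {
      // The ~32 KB buffer is created only when the message is actually written.
      byteChannel = new ByteArrayWritableChannel(32 * 1024);
    }
    // ... copy/encrypt data into byteChannel, then write it to target ...
    return 0;
  }
}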

In principle we could apply a similar lazy-initialization fix here. In this PR, however, I have taken a different approach: I construct a single shared ByteArrayWritableChannel byteRawChannel in TransportCipher$EncryptionHandler and pass that shared instance to the EncryptedMessage constructor. I believe this is safe because we already do the same thing for the byteEncChannel channel buffer: that shared byteEncChannel gets reset() when EncryptedMessage.deallocate() is called. If the existing sharing is correct, then it should be equally safe to share the byteRawChannel buffer, because its scope of use and lifecycle are similar.
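
To make that intended lifecycle concrete, here is a minimal Java sketch of the shared-buffer approach; the class and method names are simplified illustrations of the idea, not the actual patch:

import org.apache.spark.network.util.ByteArrayWritableChannel;

// Hypothetical sketch: one buffer per handler, shared by every outgoing message.
class EncryptionHandlerSketch {
  private final ByteArrayWritableChannel byteRawChannel =
      new ByteArrayWritableChannel(32 * 1024);

  EncryptedMessageSketch createMessage(Object payload) {
    // Each queued message reuses the handler's buffer instead of allocating its own.
    return new EncryptedMessageSketch(payload, byteRawChannel);
  }
}

class EncryptedMessageSketch {
  private final Object payload;
  private final ByteArrayWritableChannel byteRawChannel;

  EncryptedMessageSketch(Object payload, ByteArrayWritableChannel byteRawChannel) {
    this.payload = payload;
    this.byteRawChannel = byteRawChannel;
  }

  // Messages on a channel are written and deallocated one at a time, so
  // resetting the shared buffer here makes it safe for the next message.
  void deallocate() {
    byteRawChannel.reset();
  }
}

The key assumption, as with the existing byteEncChannel sharing, is that messages on a single channel are written and deallocated serially, so no two messages ever use the buffer concurrently.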

Why are the changes needed?

Improve performance and reduce a source of OOMs when encryption is enabled.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Correctness: Existing unit tests.

Performance: I observed memory-usage and performance improvements by running an artificial workload that heavily stresses the shuffle send path. On a two-host Spark Standalone cluster where each host ran an external shuffle service (with a 1 GB heap) and a 64-core executor, I ran the following code:

val numMapTasks = 25000
val numReduceTasks = 256
val random = new java.util.Random()
// Each of the 25,000 map tasks emits ~10 KB random-byte records, producing a
// very large number of shuffle blocks to send through the shuffle service.
val inputData = spark.range(1, numMapTasks * numReduceTasks, 1, numMapTasks).map { x =>
  val bytes = new Array[Byte](10 * 1024)
  random.nextBytes(bytes)
  bytes
}
// The noop sink exercises the full shuffle write/fetch path without output I/O.
inputData.repartition(numReduceTasks).write.mode("overwrite").format("noop").save()

Prior to this patch, this job reliably failed because the Worker (where the shuffle service runs) would fill its heap and go into long GC pauses, eventually causing it to become disassociated from the Master. After this patch's changes, the job runs smoothly to completion.

@JoshRosen JoshRosen requested review from vanzin and zsxwing July 7, 2022 01:32
@github-actions github-actions bot added the CORE label Jul 7, 2022
@JoshRosen JoshRosen requested a review from felixcheung July 7, 2022 02:02
@jiangxb1987 jiangxb1987 left a comment

Thanks for fixing this issue!

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. Thank you, @JoshRosen and @jiangxb1987 .
Merged to master for Apache Spark 3.4.0.
