@wzhfy wzhfy commented Oct 16, 2017

What changes were proposed in this pull request?

ApproxCountDistinctForIntervals is currently implemented as an ImperativeAggregate. Its number of aggBufferAttributes equals the total number of words across the hllppHelper array, and with the default relativeSD each hllppHelper uses 52 words.

Since this aggregate function is used for equi-height histogram generation, and a histogram usually has hundreds of buckets, the number of aggBufferAttributes can easily reach tens of thousands or more.

This produces a huge generated method, and codegen fails with the following error:

org.codehaus.janino.JaninoRuntimeException: Code of method "apply(Lorg/apache/spark/sql/catalyst/InternalRow;)Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB. 

In addition, huge generated methods cause a performance regression.

This PR changes the implementation to TypedImperativeAggregate. After the fix, ApproxCountDistinctForIntervals can handle thousands of endpoints without throwing a codegen error, and the running time of a test case with 500 endpoints drops from 20 seconds to 2 seconds.
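
To make the buffer-size estimate concrete, here is a rough back-of-the-envelope sketch. The constants (6-bit registers, 10 registers per 64-bit word, the 1.106 factor used to pick the precision) are recalled from HyperLogLogPlusPlusHelper and should be treated as assumptions, as should the default relativeSD of 0.05 and the count of one HLL++ sketch per interval (endpoints minus one). The names `wordsPerHllpp` and `totalBufferAttributes` are hypothetical helpers, not part of Spark.

```scala
object AggBufferSizeSketch {
  // Assumed constants, recalled from HyperLogLogPlusPlusHelper.
  val RegisterSize = 6
  val RegistersPerWord = 64 / RegisterSize // 10 registers fit in one Long

  // Number of Long words one HLL++ sketch needs for a given relative standard deviation.
  def wordsPerHllpp(relativeSD: Double): Int = {
    val p = math.ceil(2.0 * math.log(1.106 / relativeSD) / math.log(2.0)).toInt
    val m = 1 << p // number of registers
    m / RegistersPerWord + 1
  }

  // Assumes one HLL++ sketch per interval, i.e. (numEndpoints - 1) sketches.
  def totalBufferAttributes(numEndpoints: Int, relativeSD: Double): Int =
    (numEndpoints - 1) * wordsPerHllpp(relativeSD)

  def main(args: Array[String]): Unit = {
    println(wordsPerHllpp(0.05))              // 52
    println(totalBufferAttributes(500, 0.05)) // 25948
  }
}
```

Under these assumptions, 500 endpoints already mean roughly 26,000 buffer attributes, each of which becomes a separate field in the generated projection when the function is an ImperativeAggregate.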

How was this patch tested?

Tested with a newly added test case and the existing tests.

wzhfy commented Oct 16, 2017

cc @cloud-fan

wzhfy commented Oct 16, 2017

test this please

SparkQA commented Oct 16, 2017

Test build #82798 has finished for PR 19506 at commit 652b301.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • class ApproxCountDistinctForIntervalsQuerySuite extends QueryTest with SharedSQLContext

SparkQA commented Oct 16, 2017

Test build #82796 has finished for PR 19506 at commit 652b301.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • class ApproxCountDistinctForIntervalsQuerySuite extends QueryTest with SharedSQLContext

override def prettyName: String = "approx_count_distinct_for_intervals"

override def serialize(obj: Array[Long]): Array[Byte] = {
  val buffer = ByteBuffer.wrap(new Array(obj.length * Longs.BYTES))
Contributor

IIRC ByteBuffer is pretty slow for writing, shall we use unsafe writing?

Contributor Author

Changed to unsafe writing, could you take another look?

- val offset = mutableAggBufferOffset + hllppIndex * numWordsPerHllpp
- hllppArray(hllppIndex).update(buffer, offset, value, child.dataType)
+ val offset = hllppIndex * numWordsPerHllpp
+ hllppArray(hllppIndex).update(LongArrayInput(buffer), offset, value, child.dataType)
Contributor

You can just pass InternalRow(buffer) here to save a lot of code changes. If performance matters here, you can create a LongArrayInternalRow to avoid boxing.

Contributor Author

InternalRow(buffer) will copy the buffer.
Creating a LongArrayInternalRow is a good idea, thanks!
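
The idea discussed here can be sketched without any Spark dependency. In the snippet below, `LongRowLike` is a hypothetical stand-in for the small subset of InternalRow accessors the HLL++ helper would need, and `LongArrayRowSketch` is an illustrative name, not the class that was merged. It only shows the two properties mentioned in this thread: the wrapper holds the array by reference, so nothing is copied, and it reads and writes primitive longs, so nothing is boxed.

```scala
// Hypothetical stand-in for the InternalRow accessors used by the aggregate.
trait LongRowLike {
  def getLong(ordinal: Int): Long
  def setLong(ordinal: Int, value: Long): Unit
}

// Wraps the array by reference: no copy on construction, no boxing on access.
final class LongArrayRowSketch(array: Array[Long]) extends LongRowLike {
  override def getLong(ordinal: Int): Long = array(ordinal)
  override def setLong(ordinal: Int, value: Long): Unit = {
    array(ordinal) = value
  }
}

object LongArrayRowDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new Array[Long](4)
    val row = new LongArrayRowSketch(buffer)
    row.setLong(2, 42L)
    // The write is visible through the original array because nothing was copied.
    println(buffer(2))
  }
}
```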

SparkQA commented Oct 20, 2017

Test build #82929 has finished for PR 19506 at commit 2d1b070.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 21, 2017

Test build #82947 has finished for PR 19506 at commit 1b75428.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 21, 2017

Test build #82948 has finished for PR 19506 at commit 49b4ac2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


override def serialize(obj: Array[Long]): Array[Byte] = {
  val byteArray = new Array[Byte](obj.length * 8)
  obj.indices.foreach { i =>
Contributor

@cloud-fan cloud-fan Oct 22, 2017

Use a while loop here for better performance in Scala, as this is a performance-sensitive code path.

Contributor Author

Fixed. Thanks for the reminder!

override def deserialize(bytes: Array[Byte]): Array[Long] = {
  val length = bytes.length / 8
  val longArray = new Array[Long](length)
  (0 until length).foreach { i =>
Contributor

ditto

}

override def deserialize(bytes: Array[Byte]): Array[Long] = {
  val length = bytes.length / 8
Contributor

add assert(bytes.length % 8 == 0)
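
Putting the review suggestions in this thread together (while loops instead of `foreach`, plus the length assertion), a dependency-free sketch might look like the following. It is not the merged code: it swaps the unsafe writes discussed above for explicit big-endian bit shifts so that it runs without Spark on the classpath, and `LongArrayCodecSketch` and `BytesPerLong` are hypothetical names.

```scala
object LongArrayCodecSketch {
  private val BytesPerLong = 8

  // Writes each long as 8 big-endian bytes; while loops avoid closure overhead.
  def serialize(obj: Array[Long]): Array[Byte] = {
    val bytes = new Array[Byte](obj.length * BytesPerLong)
    var i = 0
    while (i < obj.length) {
      val v = obj(i)
      var j = 0
      while (j < BytesPerLong) {
        bytes(i * BytesPerLong + j) = (v >>> (56 - 8 * j)).toByte
        j += 1
      }
      i += 1
    }
    bytes
  }

  // Reverses serialize; rejects input whose length is not a whole number of longs.
  def deserialize(bytes: Array[Byte]): Array[Long] = {
    assert(bytes.length % BytesPerLong == 0)
    val longs = new Array[Long](bytes.length / BytesPerLong)
    var i = 0
    while (i < longs.length) {
      var v = 0L
      var j = 0
      while (j < BytesPerLong) {
        v = (v << 8) | (bytes(i * BytesPerLong + j) & 0xFFL)
        j += 1
      }
      longs(i) = v
      i += 1
    }
    longs
  }
}
```

A round trip such as `LongArrayCodecSketch.deserialize(LongArrayCodecSketch.serialize(Array(1L, -1L)))` should return the original values. An unsafe-based version would replace the inner loops with a single 8-byte write or read per element.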

@cloud-fan
Contributor

LGTM except a few minor comments

SparkQA commented Oct 23, 2017

Test build #82966 has finished for PR 19506 at commit 1e95a2f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in f6290ae Oct 23, 2017