
Integrate with kudo #11724

Open · liurenjie1024 wants to merge 5 commits into branch-24.12
Conversation

liurenjie1024 (Collaborator):

This PR integrates the kudo serialization format into spark-rapids; for the epic issue, see #11590.

liurenjie1024 (Collaborator, Author):

Currently blocked by NVIDIA/spark-rapids-jni#2596, but it's ready for review.

Signed-off-by: liurenjie1024 <liurenjie2008@gmail.com>
.internal()
.startupOnly()
.booleanConf
.createWithDefault(true)
liurenjie1024 (Collaborator, Author):

We enable this by default so that it's easier to run integration tests for now; we will revert it to false before merging.

Member:

We need a way to continue testing this, and the JCudfSerialization approach, while they continue to coexist. Just testing during premerge isn't enough.

Note that we don't have to run the entire test suite for both shuffle approaches. IMHO taking the tests in repart_test and adding a "with kudo" dimension would suffice, and/or we could do the same tactic we use for the RAPIDS caching shuffle where we run the mortgage test queries with that shuffle type enabled.
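The "with kudo dimension" idea above can be sketched abstractly: run every test case once per serializer mode while the two shuffle approaches coexist. This is a hypothetical illustration in plain Java, not the actual spark-rapids integration-test machinery (which is pytest-based):

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: enumerate both shuffle serializer modes and run
// each test case under each one. Mode names and the harness shape are
// illustrative assumptions, not real spark-rapids APIs.
public class SerializerDimension {
    enum Mode { JCUDF, KUDO }

    // Runs every test case once per serializer mode; returns the total
    // number of (case, mode) executions performed.
    static int runUnderAllModes(List<Consumer<Mode>> cases) {
        int runs = 0;
        for (Mode mode : Mode.values()) {
            for (Consumer<Mode> testCase : cases) {
                testCase.accept(mode);
                runs++;
            }
        }
        return runs;
    }
}
```

The same shape covers the alternative tactic mentioned: a fixed query suite (e.g. the mortgage queries) is just another list of cases run under the kudo mode.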

liurenjie1024 (Collaborator, Author):

I'm planning to add tests for join_test, hash_aggregate_test, and repart_test. They helped identify many bugs in early testing; I will do this before merging.

private static void visit(DataType[] dataTypes, Schema.Builder builder, int level) {
  for (int idx = 0; idx < dataTypes.length; idx++) {
    DataType dt = dataTypes[idx];
    String name = "_col_" + level + "_" + idx;
Member:

It's annoying that we need to spend any time building unique column names when they are never used. We may want to consider allowing Schema to build a "data type only" schema (or have a separate class for that) that doesn't have any column names anywhere, which matches the case we're using it for here.
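The naming overhead being discussed is just the synthetic placeholder scheme from the visit() snippet, extracted here as a standalone sketch for clarity (the helper name is hypothetical):

```java
// Standalone sketch of the placeholder-name scheme used in visit() above:
// every column gets a synthetic "_col_<level>_<idx>" name purely to satisfy
// the Schema builder; the names are never read back afterwards.
public class PlaceholderNames {
    static String placeholderName(int level, int idx) {
        return "_col_" + level + "_" + idx;
    }
}
```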

liurenjie1024 (Collaborator, Author):

There are two approaches here:

  1. We allow Schema to accept empty field names.
  2. We make a new class that mimics the behavior of Spark's DataType system.

I lean towards option 2 since it would be cleaner. What do you think?

Member:

Personally I'd prefer 1, especially since cudf doesn't care about column names in most contexts. cc: @revans2 and @abellina who may have a strong opinion on this.
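Option 1 can be sketched as a builder that simply tolerates empty column names, so positional serialization paths need not invent unique placeholders. The class below is a hypothetical stand-in, not the actual cudf Schema API:

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of option 1: a schema builder that accepts empty column
// names, since cudf doesn't care about names in most contexts. The
// TypeOnlyBuilder class and its methods are assumptions for illustration.
public class TypeOnlyBuilder {
    private final List<String> names = new ArrayList<>();
    private final List<String> typeIds = new ArrayList<>();

    TypeOnlyBuilder column(String typeId) {
        names.add("");       // empty name is accepted; only the type matters
        typeIds.add(typeId);
        return this;
    }

    int numColumns() {
        return typeIds.size();
    }
}
```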

@sameerz added the performance (A performance related task/issue) label on Nov 16, 2024

liurenjie1024 commented Nov 18, 2024

Currently, hash_aggregate_test, join_test, and repart_test pass in my local dev environment.

The build is broken while waiting for NVIDIA/spark-rapids-jni#2601 to be merged.

@@ -194,13 +205,80 @@ class JCudfTableOperator extends SerializedTableOperator[SerializedTableColumn]
}
}

case class KudoHostMergeResultWrapper(inner: KudoHostMergeResult,
                                      dataSize: Long) extends CoalescedHostResult {
Member:

Should dataSize be a constructor argument? I'm not sure there's a case where we want getDataSize to return a value different from inner.getDataLength, so it's safer not to give the caller the chance to get this wrong.
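The suggestion can be sketched as deriving the size from the wrapped result, so the two values can never disagree. The sketch below is Java for illustration, with simplified stand-in types rather than the real KudoHostMergeResult / CoalescedHostResult interfaces:

```java
// Sketch of the review suggestion: report the size of the wrapped merge
// result directly instead of taking it as a constructor argument. The
// MergeResult interface is a simplified stand-in, not the actual API.
public class MergeResultWrapper {
    interface MergeResult {
        long getDataLength();
    }

    private final MergeResult inner;

    MergeResultWrapper(MergeResult inner) {
        this.inner = inner;
    }

    long getDataSize() {
        return inner.getDataLength();  // always consistent with inner
    }
}
```

Removing the redundant parameter shrinks the constructor's surface and eliminates a class of bugs where the cached size drifts from the wrapped result's actual length.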

Labels: performance (A performance related task/issue)
3 participants