Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix arrow error when sorting on empty batch #271

Merged
merged 2 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "111a940" }
datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "111a940", features = ["unicode_expressions"] }
datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "111a940" }
datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "111a940", default-features = false, features = ["unicode_expressions"] }
datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, should we wait until the DF PR is merged and use https://github.com/apache/arrow-datafusion.git instead of your personal fork?

Copy link
Member Author

@viirya viirya Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is pointed to my fork of DataFusion not just for this PR. We uses the fork of arrow-rs to use a workaround for the Java Arrow bug. So we must use the fork too in our DataFusion crate, otherwise rust compiler will complain duplicated structs etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I missed #239. It is OK as long as we don't stay in this state for too long 😂

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, we will change back to official arrow-rs and DataFusion once we get new Java Arrow release.

datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", features = ["unicode_expressions"] }
datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4" }
datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", default-features = false, features = ["unicode_expressions"] }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
Expand Down
6 changes: 4 additions & 2 deletions core/src/execution/datafusion/operators/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;
use arrow_array::{RecordBatch, RecordBatchOptions};
use arrow_schema::SchemaRef;
use datafusion::{
execution::TaskContext,
Expand Down Expand Up @@ -169,7 +169,9 @@ impl ExpandStream {
Ok::<(), DataFusionError>(())
})?;

RecordBatch::try_new(self.schema.clone(), columns).map_err(|e| e.into())
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
RecordBatch::try_new_with_options(self.schema.clone(), columns, &options)
.map_err(|e| e.into())
}
}

Expand Down
7 changes: 5 additions & 2 deletions core/src/execution/operators/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{

use futures::{Stream, StreamExt};

use arrow_array::{ArrayRef, RecordBatch};
use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};

use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
Expand Down Expand Up @@ -149,7 +149,10 @@ impl CopyStream {
.iter()
.map(|v| copy_or_cast_array(v))
.collect::<Result<Vec<ArrayRef>, _>>()?;
RecordBatch::try_new(self.schema.clone(), vectors).map_err(|e| arrow_datafusion_err!(e))

let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
RecordBatch::try_new_with_options(self.schema.clone(), vectors, &options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should also check is there any other invocation of RecordBatch::try_new in the codebase and replace it with RecordBatch::try_new_with_options if necessary.

I just did a code search, maybe we probably need to revise the code in Expand.rs too: https://github.com/apache/arrow-datafusion-comet/blob/main/core/src/execution/datafusion/operators/expand.rs#L172

Of course, it should be done in a follow up PR.

Copy link
Member Author

@viirya viirya Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Although I'm not sure if it is possible to have Expand with empty projections, it is good to have it here too. I will add it in this PR as it is a simple change.

.map_err(|e| arrow_datafusion_err!(e))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,27 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._

test("lead/lag should return the default value if the offset row does not exist") {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
checkSparkAnswer(sql("""
|SELECT
| lag(123, 100, 321) OVER (ORDER BY id) as lag,
| lead(123, 100, 321) OVER (ORDER BY id) as lead
|FROM (SELECT 1 as id) tmp
""".stripMargin))

checkSparkAnswer(sql("""
|SELECT
| lag(123, 100, a) OVER (ORDER BY id) as lag,
| lead(123, 100, a) OVER (ORDER BY id) as lead
|FROM (SELECT 1 as id, 2 as a) tmp
""".stripMargin))
}
}

test("multiple column distinct count") {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
Expand Down
Loading