
Support User-Defined Sorting #14828

@tobixdev

Description


Is your feature request related to a problem or challenge?

In our system, we work heavily with tagged unions: basically every column in our results is a DataType::Union. However, comparing unions is not natively supported by DF/arrow-rs. This makes sense in a general setting, since an arbitrary union has no natural order; however, we only have a single union type, and we know how to compare its values.

This issue should address the following two problems in this context:

  1. Sorting by a column
  2. Support distinct on a column

I think 2. is a consequence of 1., as distinct also needs to compare union values somewhere during query execution (both fail with the same error, shown below).
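To illustrate why the two problems are linked: once union values have a total order, DISTINCT can be computed by sorting and dropping adjacent duplicates. The following is a minimal plain-Rust sketch, assuming the two-variant union used in the test cases below. `UnionValue`, `cmp_union`, and `distinct` are hypothetical names, not DataFusion or arrow-rs API, and DataFusion itself may implement DISTINCT differently (for example via hashing).

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for one value of the `my_union` column:
/// type id 0 holds an Int32 ("A"), type id 1 holds a Float64 ("B").
#[derive(Debug, Clone, Copy)]
enum UnionValue {
    A(i32),
    B(f64),
}

impl UnionValue {
    /// The union type id of this value.
    fn tag(&self) -> i8 {
        match self {
            UnionValue::A(_) => 0,
            UnionValue::B(_) => 1,
        }
    }
}

/// One possible total order: compare type ids first, then the payloads.
/// `f64::total_cmp` gives floats a total order (NaN included).
fn cmp_union(a: &UnionValue, b: &UnionValue) -> Ordering {
    match (a, b) {
        (UnionValue::A(x), UnionValue::A(y)) => x.cmp(y),
        (UnionValue::B(x), UnionValue::B(y)) => x.total_cmp(y),
        _ => a.tag().cmp(&b.tag()),
    }
}

/// DISTINCT expressed via sorting: after sorting, equal values are adjacent,
/// so removing consecutive duplicates leaves one row per distinct value.
fn distinct(mut values: Vec<UnionValue>) -> Vec<UnionValue> {
    values.sort_by(cmp_union);
    values.dedup_by(|a, b| cmp_union(a, b) == Ordering::Equal);
    values
}

fn main() {
    let rows = vec![
        UnionValue::A(1),
        UnionValue::B(3.0),
        UnionValue::A(1),
        UnionValue::B(3.0),
    ];
    println!("{:?}", distinct(rows)); // [A(1), B(3.0)]
}
```

With the same four rows as the distinct test case, this yields two distinct values.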

We have built a workaround for 1. by projecting to a "sortable" value that DF supports natively. While we might be able to build a similar workaround with Distinct::On for 2., we hope to find a better solution to our problem.

An extension of this issue could also allow users to override the default sorting behavior for certain types.

Describe the solution you'd like

I have read this blog post on sorting in arrow-rs. I think it would be nice if we could extend this mechanism in DF/arrow-rs, maybe by letting users provide a byte encoding for a particular DataType? However, I am not really experienced in this area.
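A rough idea of what such a byte encoding could look like for our union: emit the type id byte first, then an order-preserving encoding of the payload, so that comparing the byte strings lexicographically (memcmp-style) gives the desired order. This is a standalone sketch in the spirit of the row format, not arrow-rs code; `UnionValue`, `encode_i32`, `encode_f64`, and `encode_union` are hypothetical names, and it assumes the two-variant union from the test cases, ascending order, and no nulls.

```rust
/// Hypothetical stand-in for one value of the `my_union` column
/// (type id 0 = Int32 "A", type id 1 = Float64 "B").
#[derive(Debug, Clone, Copy)]
enum UnionValue {
    A(i32),
    B(f64),
}

/// Order-preserving i32 encoding: flipping the sign bit maps
/// i32::MIN..=i32::MAX onto 0..=u32::MAX, and big-endian bytes
/// then compare in the same order as the integers.
fn encode_i32(v: i32) -> [u8; 4] {
    ((v as u32) ^ 0x8000_0000).to_be_bytes()
}

/// Order-preserving f64 encoding (same order as `f64::total_cmp`):
/// negative values get all bits inverted, non-negative values get the sign bit set.
fn encode_f64(v: f64) -> [u8; 8] {
    let bits = v.to_bits();
    let ordered = if bits >> 63 == 1 { !bits } else { bits | (1 << 63) };
    ordered.to_be_bytes()
}

/// Encode a union value as `[type_id, payload...]`. Lexicographic comparison
/// of the result orders first by type id, then by value within a type.
fn encode_union(v: &UnionValue) -> Vec<u8> {
    let mut out = Vec::with_capacity(9);
    match v {
        UnionValue::A(x) => {
            out.push(0);
            out.extend_from_slice(&encode_i32(*x));
        }
        UnionValue::B(x) => {
            out.push(1);
            out.extend_from_slice(&encode_f64(*x));
        }
    }
    out
}

fn main() {
    let mut values = vec![
        UnionValue::B(-1.5),
        UnionValue::A(7),
        UnionValue::B(2.0),
        UnionValue::A(-3),
    ];
    // Sorting by the encoded bytes is a plain byte-wise comparison.
    values.sort_by_key(encode_union);
    println!("{:?}", values); // [A(-3), A(7), B(-1.5), B(2.0)]
}
```

Supporting descending order or nulls would need extra handling (for example inverting the bytes or adding a null marker byte), which this sketch leaves out.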

Looking forward to your opinions!

Describe alternatives you've considered

I think one good way to achieve this would be to integrate this work with logical and extension types (#12622, #12644). However, we would love to be able to use these capabilities before getting full support for logical/extension types.

Another option is to keep using the workarounds (basically, we could materialize the comparison byte arrays in a UDF). However, this projection technique is cumbersome, as every internal sort (see the problems with distinct) can make the query fail, because the actual column is not sortable.
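The projection workaround can be sketched in plain Rust: materialize a key column, sort row indices by it, and gather the original column in that order. `sort_key` stands in for a hypothetical UDF that returns the comparison bytes (type id byte followed by an order-preserving payload encoding); none of these names are DataFusion API. The sketch also shows the limitation described above: the key only helps where we explicitly sort by it, while any operator that compares the original column directly still sees an unsortable union.

```rust
/// Hypothetical stand-in for one value of the union column.
#[derive(Debug, Clone, PartialEq)]
enum UnionValue {
    A(i32),
    B(f64),
}

/// What a hypothetical `sort_key` UDF would return per row: bytes whose
/// lexicographic order matches the desired order of the union values.
fn sort_key(v: &UnionValue) -> Vec<u8> {
    let mut key = Vec::with_capacity(9);
    match v {
        UnionValue::A(x) => {
            key.push(0); // type id first, so all "A" values sort before "B"
            key.extend_from_slice(&((*x as u32) ^ 0x8000_0000).to_be_bytes());
        }
        UnionValue::B(x) => {
            key.push(1);
            let bits = x.to_bits();
            let ordered = if bits >> 63 == 1 { !bits } else { bits | (1 << 63) };
            key.extend_from_slice(&ordered.to_be_bytes());
        }
    }
    key
}

/// The workaround: project a key column, sort by it, keep the original values.
fn sort_via_projection(column: &[UnionValue]) -> Vec<UnionValue> {
    // 1. Project: materialize the key column next to the original column.
    let keys: Vec<Vec<u8>> = column.iter().map(sort_key).collect();
    // 2. Sort row indices by the key column (plain byte-wise comparison).
    let mut order: Vec<usize> = (0..column.len()).collect();
    order.sort_by(|&i, &j| keys[i].cmp(&keys[j]));
    // 3. Gather the original, unsortable column in key order.
    order.into_iter().map(|i| column[i].clone()).collect()
}

fn main() {
    let column = vec![
        UnionValue::B(3.0),
        UnionValue::A(1),
        UnionValue::B(-2.0),
        UnionValue::A(-4),
    ];
    println!("{:?}", sort_via_projection(&column)); // [A(-4), A(1), B(-2.0), B(3.0)]
}
```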

Additional context

Maybe we also need a "sister issue" in arrow-rs to track this. I'd also be interested in helping to implement this feature; however, I'd need some input from more experienced contributors before tackling it.

Here are some crude tests that raise the following error.

Error: ArrowError(NotYetImplemented("Row format support not yet implemented for: [SortField { options: SortOptions { descending: false, nulls_first: true }, data_type: Union([(0, Field { name: \"A\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), (1, Field { name: \"B\", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} })], Dense) }]"), None)

Test Case (Distinct)

Here is a test case that can create this problem:

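use std::sync::Arc;

use datafusion::arrow::array::UnionBuilder;
use datafusion::arrow::datatypes::{DataType, Field, Float64Type, Int32Type, Schema, UnionMode};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::test]
async fn test_distinct_on_union() -> Result<()> {
    // Dense union with two variants: type id 0 = Int32 "A", type id 1 = Float64 "B".
    let fields = [
        (0, Arc::new(Field::new("A", DataType::Int32, false))),
        (1, Arc::new(Field::new("B", DataType::Float64, false))),
    ]
    .into_iter()
    .collect();
    let schema = Schema::new(vec![Field::new(
        "my_union",
        DataType::Union(fields, UnionMode::Dense),
        false,
    )]);

    // Four rows, but only two distinct values: A = 1 and B = 3.0.
    let mut builder = UnionBuilder::new_dense();
    builder.append::<Int32Type>("A", 1)?;
    builder.append::<Float64Type>("B", 3.0)?;
    builder.append::<Int32Type>("A", 1)?;
    builder.append::<Float64Type>("B", 3.0)?;
    let union = builder.build()?;

    let ctx = SessionContext::new();
    ctx.register_table(
        "test_table",
        Arc::new(MemTable::try_new(
            Arc::new(schema.clone()),
            vec![vec![RecordBatch::try_new(
                Arc::new(schema),
                vec![Arc::new(union)],
            )?]],
        )?),
    )?;
    // Currently fails with "Row format support not yet implemented" for the union column.
    let result = ctx.table("test_table").await?.distinct()?.count().await?;
    assert_eq!(result, 2);

    Ok(())
}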

Test Case (Sort)

This test fails with the same error. Note that I think sorting a union column should still fail by default in the future, since a general union has no natural order. However, by extending the row converter procedure with a user-defined encoding, we hope to get this test running.

use std::sync::Arc;

use datafusion::arrow::array::UnionBuilder;
use datafusion::arrow::datatypes::{DataType, Field, Float64Type, Int32Type, Schema, UnionMode};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::StreamExt; // provides `.next()` on the record batch stream

#[tokio::test]
async fn test_sort_on_union() -> Result<()> {
    // Same dense union column as in the distinct test.
    let fields = [
        (0, Arc::new(Field::new("A", DataType::Int32, false))),
        (1, Arc::new(Field::new("B", DataType::Float64, false))),
    ]
    .into_iter()
    .collect();
    let schema = Schema::new(vec![Field::new(
        "my_union",
        DataType::Union(fields, UnionMode::Dense),
        false,
    )]);

    let mut builder = UnionBuilder::new_dense();
    builder.append::<Int32Type>("A", 1)?;
    builder.append::<Float64Type>("B", 3.0)?;
    builder.append::<Int32Type>("A", 1)?;
    builder.append::<Float64Type>("B", 3.0)?;
    let union = builder.build()?;

    let ctx = SessionContext::new();
    ctx.register_table(
        "test_table",
        Arc::new(MemTable::try_new(
            Arc::new(schema.clone()),
            vec![vec![RecordBatch::try_new(
                Arc::new(schema),
                vec![Arc::new(union)],
            )?]],
        )?),
    )?;
    // Sorting by the union column currently fails with the same row-format error.
    ctx.table("test_table")
        .await?
        .sort_by(vec![col("my_union")])?
        .execute_stream()
        .await?
        .next()
        .await
        .unwrap()?;

    Ok(())
}

Metadata


Assignees: No one assigned

Labels: enhancement (New feature or request)

Type: No type

Projects: No projects

Milestone: No milestone

Relationships: None yet

Development: No branches or pull requests
