Introduce Row format backed by raw bytes #1782

Merged
merged 9 commits into from
Feb 10, 2022

Member

@yjshen yjshen commented Feb 8, 2022

Which issue does this PR close?

Closes #1708 .

What changes are included in this PR?

A row format backed by raw bytes:

Each tuple consists of up to three parts: [null bit set] [values] [var length data]

The null bit set tracks nulls. It stores one bit per field and is padded to a whole number of bytes.

In the region of the values, we store the fields in the order they are defined in the schema.

  • Fixed-length fields that are accessed sequentially are stored directly, e.g., 4 bytes for an int and 1 byte for a bool.
  • Fixed-length fields that are updated often are stored as one 8-byte word per field.
  • For fields of non-primitive or variable-length types, we append their actual content to the end of the var length region and store their offset (relative to the row base) and their length, packed into an 8-byte word.
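
A minimal sketch of this layout, using only the standard library. The `Field` enum and `encode_row` function are hypothetical names chosen for illustration and are not the API introduced by this PR. The sketch assumes little-endian encoding, a set bit meaning "non-null", and an 8-byte word laid out as `offset << 32 | length`.

```rust
/// Hypothetical field values for illustration; not the PR's actual types.
pub enum Field<'a> {
    Int32(Option<i32>),
    Bool(Option<bool>),
    Utf8(Option<&'a str>),
}

/// Encode one tuple as [null bit set][values][var length data].
pub fn encode_row(fields: &[Field]) -> Vec<u8> {
    // One bit per field, rounded up to whole bytes.
    let null_width = (fields.len() + 7) / 8;
    // Native width for fixed fields, one 8-byte word for var-length fields.
    let values_width: usize = fields
        .iter()
        .map(|f| match f {
            Field::Int32(_) => 4,
            Field::Bool(_) => 1,
            Field::Utf8(_) => 8,
        })
        .sum();

    let mut row = vec![0u8; null_width + values_width];
    let mut pos = null_width;
    for (i, f) in fields.iter().enumerate() {
        let valid = match f {
            Field::Int32(v) => {
                if let Some(v) = v {
                    row[pos..pos + 4].copy_from_slice(&v.to_le_bytes());
                }
                pos += 4;
                v.is_some()
            }
            Field::Bool(v) => {
                if let Some(v) = v {
                    row[pos] = *v as u8;
                }
                pos += 1;
                v.is_some()
            }
            Field::Utf8(v) => {
                if let Some(s) = v {
                    // The content is appended at the end of the row; the
                    // offset is relative to the row base.
                    let offset = row.len() as u64;
                    let word = (offset << 32) | s.len() as u64;
                    row[pos..pos + 8].copy_from_slice(&word.to_le_bytes());
                    row.extend_from_slice(s.as_bytes());
                }
                pos += 8;
                v.is_some()
            }
        };
        if valid {
            // Assumed convention: bit i is set when field i is non-null.
            row[i / 8] |= 1 << (i % 8);
        }
    }
    row
}
```

For a tuple `(7, NULL, "abc")` with schema (Int32, Bool, Utf8), this produces 1 + 4 + 1 + 8 + 3 = 17 bytes.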

Are there any user-facing changes?

No.

Moved the TODO items to #1861 for tracking.

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Feb 8, 2022
Member Author

yjshen commented Feb 8, 2022

Currently, the row format is not hooked up to the rest of the codebase. I'm not sure whether it's appropriate for it to have its own PR or whether it should be accompanied by a use case, such as in SortExec.

Cargo.toml Outdated

[patch.crates-io]
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }
Member Author

This relies on apache/arrow-rs@e375bba; I will remove it once arrow 9.0.1 is released.

Member Author

yjshen commented Feb 8, 2022

cc @alamb @Dandandan @houqp


//! General utilities for null bit section handling
//!
//! Note: this is a tailored version based on [arrow2 bitmap utils](https://github.com/jorgecarleitao/arrow2/tree/main/src/bitmap/utils)
Contributor

FWIW this appears to itself be a copy of https://docs.rs/arrow/latest/arrow/util/bit_util/index.html

Member Author

@yjshen yjshen Feb 9, 2022

The bitmap is rewritten on top of arrow/util/bit_util, along with a much-simplified version of fmt.

Contributor

alamb commented Feb 8, 2022

Thanks @yjshen -- I look forward to reviewing this carefully later today 👍

Contributor

@tustvold tustvold left a comment

This is extremely cool 👍

#[test]
#[should_panic(expected = "supported(schema)")]
fn test_unsupported_type_write() {
let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8]));
Contributor

FWIW it would be really cool to support this as IOx uses it and it is just an Int64Array with a different logical type, but I can always add later 😁

Member Author

I've added this as a TODO item and will implement it later.

fn type_width(dt: &DataType) -> usize {
use DataType::*;
if var_length(dt) {
return 8;
Contributor

I think this should probably be std::mem::size_of::<u64>() or ideally a varlena offset type alias

Member Author

Great idea! For now I use size_of::<u64>(). Later I can make the offset type a type parameter of RowWriter and RowReader, as is done for StringArray and LargeStringArray, to save memory.

// specific language governing permissions and limitations
// under the License.

//! Accessing row from raw bytes
Contributor

I think a pretty picture would be very helpful, showing how data is encoded

Contributor

Here is one for consideration:

                                                                                                        
 Row Layout                                                                                             
                                                                                                        
┌────────────────┬──────────────────────────┬───────────────────────┐        ┌───────────────────────┐  
│Validity Bitmask│    Fixed Width Field     │ Variable Width Field  │   ...  │     vardata area      │  
│ (byte aligned) │   (native type width)    │(len + vardata offset) │        │   (variable length)   │  
└────────────────┴──────────────────────────┴───────────────────────┘        └───────────────────────┘  
                                                                                                        
                                                                                                        
                                                                                                        
 For example, given the schema (Int8, Float32, Utf8, Utf8)                                              
                                                                                                        
 Encoding the tuple (1, NULL, "FooBar", "baz")                                                          
                                                                                                        
 Requires 31 bytes as shown
┌────────┬────────┬──────────────┬──────────────────────┬──────────────────────┬───────────────────────┐
│0b000110│  0x01  │  0x00000000  │0x00000000  0x00000006│0x00000006  0x00000003│       FooBarbaz       │
└────────┴────────┴──────────────┴──────────────────────┴──────────────────────┴───────────────────────┘
0        1         2             6                      14                     22                     31
                                                                                                        
 Validity    Int8  Float32 Field       Utf8 Field 1         Utf8 Field 2          Variable length       
   Mask     Field    (4 bytes)           Offset: 0            Offset: 6                area             
 (1 byte)  (1 byte)                       Size: 6              Size: 3               (9 bytes)          
                                         (8 bytes)            (8 bytes)                                 

Also attaching the monopic file in case anyone finds that useful:
drawing.zip

Member Author

The diagram is excellent! I added a to-do item yesterday but haven't had a chance to finish it yet. Do you mind if I put this into the doc?

Member Author

I've adapted the figure a little and put it in the module doc. Many thanks, @alamb!

Contributor

The diagram is excellent! I added a to-do item yesterday but haven't had a chance to finish it yet. Do you mind if I put this into the doc?

I don't mind at all -- that is why I made it :)

get_idx!(i32, self, idx, 4)
}

fn get_date64(&self, idx: usize) -> i64 {
Contributor

Perhaps these methods should assert that the field at idx actually has the corresponding type. Otherwise this effectively reinterprets memory as a different primitive type, which, whilst technically safe, is borderline unsafe 😀
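
One way such a check could look, as a sketch only. `FieldType`, `RowView`, and the per-field `offsets` slice are hypothetical stand-ins for the reader's schema and layout information, not types from this PR, and little-endian encoding is assumed.

```rust
/// Hypothetical logical types for illustration.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FieldType {
    Int32,
    Date64,
}

/// A borrowed view over one encoded row (assumed layout for this sketch).
pub struct RowView<'a> {
    pub types: &'a [FieldType],
    /// Byte offset of each field within `data`.
    pub offsets: &'a [usize],
    pub data: &'a [u8],
}

impl<'a> RowView<'a> {
    pub fn get_int32(&self, idx: usize) -> i32 {
        // Refuse to reinterpret bytes that belong to another type.
        assert_eq!(self.types[idx], FieldType::Int32, "field {} is not Int32", idx);
        let start = self.offsets[idx];
        let mut buf = [0u8; 4];
        buf.copy_from_slice(&self.data[start..start + 4]);
        i32::from_le_bytes(buf)
    }

    pub fn get_date64(&self, idx: usize) -> i64 {
        assert_eq!(self.types[idx], FieldType::Date64, "field {} is not Date64", idx);
        let start = self.offsets[idx];
        let mut buf = [0u8; 8];
        buf.copy_from_slice(&self.data[start..start + 8]);
        i64::from_le_bytes(buf)
    }
}
```

If the check proves too costly on a hot path, `debug_assert_eq!` would keep it in debug builds only.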

}

fn set_offset_size(&mut self, idx: usize, size: usize) {
let offset_and_size: u64 = (self.varlena_offset << 32 | size) as u64;
Contributor

@tustvold tustvold Feb 8, 2022

This should probably panic if size or varlena_offset are too large. On a related note, perhaps they should be u32?
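
A sketch of what a checked version might look like. `pack_offset_size` and `unpack_offset_size` are hypothetical free functions used for illustration, not the PR's methods.

```rust
/// Pack a var-length field's offset and size into one 8-byte word,
/// panicking if either value does not fit in 32 bits.
pub fn pack_offset_size(offset: usize, size: usize) -> u64 {
    assert!(offset <= u32::MAX as usize, "varlena offset does not fit in 32 bits");
    assert!(size <= u32::MAX as usize, "varlena size does not fit in 32 bits");
    // Widen to u64 before shifting so the shift cannot overflow a 32-bit usize.
    ((offset as u64) << 32) | size as u64
}

/// Split the word back into (offset, size).
pub fn unpack_offset_size(word: u64) -> (usize, usize) {
    ((word >> 32) as usize, (word & 0xFFFF_FFFF) as usize)
}
```

Storing both values as `u32` in the writer would turn the runtime assertions into conversions at the call site.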

Contributor

@alamb alamb left a comment

I started going through the code @yjshen and it is very neat. Thank you!

I think it would help me to see some examples of how we intend to use this, to hone the interface.

The main use cases explained in #1708 are sort and grouping, and I think they would be used slightly differently.

For example, for sorting I imagine code that may look like

// get NULLS FIRST / LAST / ASC / DESC for each column
let sort_opts: Vec<&SortOptions> = sort_exprs.iter().map(|s| &s.opts).collect();
let sort_expr_batches: RecordBatch = sort_exprs.eval(input_batch)?;

// build "sort keys" that will order in the same way as the original row values
let sort_key: Rows = RowWriter::new()
  .for_sorting(sort_expr_batches, sort_opts)
  .build();

Sorting (or merging) could then proceed by sorting the appropriate part of the [u8] using memcmp (no type dispatch needed), and take kernels can be used to form the final arrays.

There is no need to re-form the arrays from the sort key.
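
To illustrate what "order in the same way as the original values" can mean for bytes, here is a sketch for a single `i32` column. `sort_key_i32` is a hypothetical helper, not part of this PR; it uses the common trick of flipping the sign bit and writing big-endian bytes so that bytewise comparison matches numeric order.

```rust
/// Encode an i32 so that comparing the resulting bytes lexicographically
/// (as memcmp does) gives the same order as comparing the integers.
pub fn sort_key_i32(value: i32, descending: bool) -> [u8; 4] {
    // Flipping the sign bit maps the signed range onto the unsigned range
    // in order; big-endian puts the most significant byte first.
    let mut key = ((value as u32) ^ 0x8000_0000).to_be_bytes();
    if descending {
        // Inverting every byte reverses the order.
        for b in key.iter_mut() {
            *b = !*b;
        }
    }
    key
}
```

Null handling and multi-column keys are left out of this sketch.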

Grouping is a little different, as the group key is part of the final output and only equality / inequality matters (not ordering). In that case perhaps having a mut Rows would help:

let mut output_group_keys: Rows = ...;

// for each batch
// compute group key exprs
let group_expr_batch: RecordBatch = group_exprs.eval(input_batch)?;

let group_keys: Rows = RowWriter::new()
  .for_grouping(group_expr_batch)
  .build();

for key in group_keys {
  let accum_index = if !hash_table.contains(key) {
    output_group_keys.push(key); // copy bytes
    let index = output_group_keys.len() - 1;
    hash_table.add(key, index); // remember what index this key is
    index
  } else {
    hash_table.get(key)
  };

  // somehow get the accumulator state (ideally could also be in the output_group_keys)
  // and update
}
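
The grouping loop above can be sketched in runnable form with the standard library. `GroupKeys` is a hypothetical type for illustration; it stores each distinct key's bytes once and maps key bytes to the index of its accumulator.

```rust
use std::collections::HashMap;

/// Hypothetical container for distinct group keys, compared as raw bytes.
pub struct GroupKeys {
    /// Output group keys in first-seen order.
    keys: Vec<Vec<u8>>,
    /// Key bytes -> position in `keys` (and of the matching accumulator).
    index: HashMap<Vec<u8>, usize>,
}

impl GroupKeys {
    pub fn new() -> Self {
        GroupKeys { keys: Vec::new(), index: HashMap::new() }
    }

    /// Return the accumulator index for `key`, registering the key if new.
    pub fn accumulator_index(&mut self, key: &[u8]) -> usize {
        if let Some(&i) = self.index.get(key) {
            return i;
        }
        let i = self.keys.len();
        self.keys.push(key.to_vec()); // copy bytes into the output
        self.index.insert(key.to_vec(), i);
        i
    }

    pub fn len(&self) -> usize {
        self.keys.len()
    }
}
```

This sketch copies each new key twice for simplicity; a real implementation would likely keep the bytes in one buffer and hash into it.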

}

/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width
fn write_row(
Contributor

I think having a vectorized version of this would also be helpful so an entire record batch could be converted at once

Member Author

yjshen commented Feb 9, 2022

Thanks a lot for the detailed review @tustvold

I made several changes, including the bitmap rewrite, size rounding, and some docs, and recorded the rest of the great suggestions as TODOs in the PR description so they don't get lost.

Member Author

yjshen commented Feb 9, 2022

Thanks @alamb for the write-up of row use cases.

[Use directly] The current row implementation mostly targets payload use cases, where we do not update or look up fields by index after the first write and only decompose rows into a record batch at the end. This is the case for the sort payload, the hash aggregate's composed grouping key (we can compare raw bytes directly for equality), the hash join key, and the join payload.

[Minor adapt] For aggregation state, we should adapt it slightly to use word-aligned initialization and updates (to be CPU friendly), much as you suggested:

let state = RowWriter::new()
  .for_aggregate(aggregate_exprs)
  .build();

[Minor adapt] For a composite sort key with no varlena, we should remove the null-bits part, pad null attributes with all 0xFF or all 0x00 bytes (according to the nulls-first or nulls-last sort option), and compare raw bytes.

[NOT FIT] For a composite sort key where variable-length attributes (varlena) exist and are not last, direct comparison of the raw bytes of the current row format doesn't work. We would need to store the varlena in place and pad all sort keys to the longest width, so that we could compare raw bytes directly.

Contributor

alamb commented Feb 9, 2022

[Use directly] The current row implementation mostly targets payload use cases, where we do not update or look up fields by index after the first write and only decompose rows into a record batch at the end. This is the case for the sort payload, the hash aggregate's composed grouping key (we can compare raw bytes directly for equality), the hash join key, and the join payload.

I am not sure about sorting payloads. It seems to me that copying the sort payload around could be quite inefficient.

Consider a table like

CREATE TABLE sales (
  value float,
  first_name varchar(2000),
  last_name varchar(2000),
  address varchar(2000)
)

And a query like

SELECT * from sales order by value;

In this case only value needs to be compared, and the payload may be substantial

I thought the current state of the art was to do something like

  1. RecordBatches --> Rows (only for sort key)
  2. Sort the Rows using memcmp
  3. Use take kernel to form the output rows (copy payload columns)

This copies the payload columns only once
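
A small sketch of these three steps with plain vectors standing in for Arrow arrays. `sort_indices` and `take` here are hypothetical stand-ins written for illustration (the real `take` kernel lives in arrow's compute module and has a different signature).

```rust
/// Step 2: sort row indices by comparing key bytes lexicographically.
pub fn sort_indices(keys: &[Vec<u8>]) -> Vec<usize> {
    let mut indices: Vec<usize> = (0..keys.len()).collect();
    indices.sort_by(|&a, &b| keys[a].cmp(&keys[b]));
    indices
}

/// Step 3: gather one payload column in sorted order, copying it once.
pub fn take<T: Clone>(column: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| column[i].clone()).collect()
}
```

Only the key bytes move during the sort; each payload column is touched once, in `take`.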

If you instead use the Row to hold the payload, you end up

  1. RecordBatches --> Row payload
  2. Form something to compare using
  3. Sort
  4. Now convert back from Row to RecordBatch

Which results in copying the payloads twice - and for large tables this is a substantial overhead.

However, I agree a format like this would be helpful for storing hash aggregate composed grouping keys, join keys (and maybe intermediate aggregates)

I'll give this PR a good look tomorrow morning

Contributor

@alamb alamb left a comment

I think this is a great start @yjshen -- thank you.

I do have some concerns about this data structure (especially with respect to storing multiple rows using variable length fields) but I think we can iterate on it in follow on PRs as we try and use this structure in DataFusion.

In terms of next steps, how about we pick one operation (such as either Sort or GroupHash) to migrate to this Row implementation? I bet we will end up refining the interface

//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
//! one bit per field.
//!
//! In the region of the values, we store the fields in the order they are defined in the schema.
Contributor

This describes a variable length tuple -- so if you pack a bunch of Rows together in memory it will not be possible to find some arbitrary location quickly

For example, finding where Row3 starts in the following picture needs to scan all columns of Row1 and Row2

┌─────────────────────────────────┐
│              Row1               │
├──────────┬──────────────────────┤
│   Row2   │         Row3         │
├──────────┴─┬────────────────────┤
│    Row4    │        Row5        │
└────────────┴────────────────────┘

The benefit of this strategy is that tuple construction will be very fast and memory usage optimal

There are other strategies, as discussed on #1708, that use fixed-width tuples and have their own benefits (though they are likely not as memory optimal)

I think getting optimal performance will come from being able to vectorize many operations for which fixed sized tuples are compelling:

┌───────────┬──┐               ┌─────────────┐
│     Row1  │  ├──────┐        │             │
├───────────┼──┤      │ ┌──────┼▶            │
│     Row2  │  │──────┼┐├──────┼─▶           │
├───────────┼──┤      │││      │             │
│     Row3  │  │──────┼┼┤      │  variable   │
├───────────┼──┤      └┼┼─────▶│ length area │
│     Row4  │  │───────┼┘      │             │
├───────────┼──┤       └──────▶│             │
│     Row5  │  │───────────┐   │             │
└───────────┴──┘           └───┼───────▶     │
                               └─────────────┘

Maybe I can find some time this weekend to play around with some ideas

Contributor

One classic database technique to pack such data into a single block (rather than a separate variable length area) is to preallocate the page (e.g. 32K or something) and then write rows into the front of the page, but filling the variable length area from the back.
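
A minimal sketch of that page technique, assuming each row is split into a fixed-width part and a variable-length part. The `Page` type and its methods are hypothetical and written only to illustrate the idea.

```rust
/// A preallocated page: fixed-width row data grows from the front,
/// variable-length data grows from the back.
pub struct Page {
    buf: Vec<u8>,
    front: usize, // next free byte for fixed-width data
    back: usize,  // start of the variable-length area
}

impl Page {
    pub fn new(size: usize) -> Self {
        Page { buf: vec![0; size], front: 0, back: size }
    }

    /// Append one row. Returns (fixed offset, var offset) within the page,
    /// or None when the two regions would collide.
    pub fn append(&mut self, fixed: &[u8], var: &[u8]) -> Option<(usize, usize)> {
        if fixed.len() + var.len() > self.back - self.front {
            return None;
        }
        let row_start = self.front;
        self.buf[row_start..row_start + fixed.len()].copy_from_slice(fixed);
        self.front += fixed.len();
        self.back -= var.len();
        self.buf[self.back..self.back + var.len()].copy_from_slice(var);
        Some((row_start, self.back))
    }

    pub fn bytes(&self) -> &[u8] {
        &self.buf
    }
}
```

With fixed-width row parts at the front, row `i` starts at `i * fixed_width`, so no scan is needed to locate it.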

Member Author

@yjshen yjshen Feb 10, 2022

finding where Row3 starts in the following picture needs to scan all columns of Row1 and Row2

Since we are doing in-memory processing, I think we can store each row's starting offset in a separate vector, as this method shows:

pub fn write_batch_unchecked(
    output: &mut [u8],
    offset: usize,
    batch: &RecordBatch,
    row_idx: usize,
    schema: Arc<Schema>,
) -> Vec<usize> {
    let mut writer = RowWriter::new(&schema);
    let mut current_offset = offset;
    let mut offsets = vec![];
    for cur_row in row_idx..batch.num_rows() {
        offsets.push(current_offset);
        let row_width = write_row(&mut writer, cur_row, batch);
        output[current_offset..current_offset + row_width]
            .copy_from_slice(writer.get_row());
        current_offset += row_width;
        writer.reset()
    }
    offsets
}

I'm thinking of just keeping the offset vector we got while writing, and using it hereafter.
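
For instance, with the offsets vector kept around, locating row `i` becomes a constant-time slice. `row_at` is a hypothetical helper for illustration, and `end` is assumed to be the position just past the last written row.

```rust
/// Borrow the bytes of row `i` from a buffer of back-to-back rows.
/// `offsets[i]` is the start of row `i`; `end` is where the last row stops.
pub fn row_at<'a>(buffer: &'a [u8], offsets: &[usize], end: usize, i: usize) -> &'a [u8] {
    let start = offsets[i];
    // The next row's start marks this row's end; the last row ends at `end`.
    let stop = offsets.get(i + 1).copied().unwrap_or(end);
    &buffer[start..stop]
}
```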

I think getting optimal performance will come from being able to vectorize many operations for which fixed sized tuples are compelling:

I'm afraid it will be hard to vectorize operations on a row format, since the row width may easily exceed the SIMD lane width.

filling the variable length area from the back.

Yes, I'm aware of that strategy. But we use rows mainly during execution, unlike DBMSs that use it to keep tuples in long-term storage, so I think we can just store the offsets separately in a vector.

//! In the region of the values, we store the fields in the order they are defined in the schema.
//! - For fixed-length, sequential access fields, we store them directly.
//! E.g., 4 bytes for int and 1 byte for bool.
//! - For fixed-length, update often fields, we store one 8-byte word per field.
Contributor

I don't really see a notion of "update often" appearing in this code. Maybe it is future work

Member Author

Yes, it's not implemented yet. It is the [Minor adapt] case discussed for aggregation state.

Contributor

alamb commented Feb 10, 2022

@yjshen let me know if you want to make any changes to this PR otherwise I'll merge it in and we can iterate from there

Member Author

yjshen commented Feb 10, 2022

Thanks @alamb for all the thoughts!

The PR currently relies on arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" } because it needs access to an ArrayBuilder function. Do you think I should put this PR behind a non-default feature and just remove the patch.crates-io section?
I've removed the patch.crates-io section and moved this PR entirely behind the row feature. I've tested locally that the row feature works. While experimenting with row, please add these lines back until arrow 9.0.3 is released:

[patch.crates-io]
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }

Besides, I'd like to incorporate your bit_util comments as well as the row diagram into the PR. [done]

I've listed several TODOs in the PR description and will do them as follow-ups.

I think the PR is ready to be merged now.

@alamb alamb merged commit 0138b39 into apache:master Feb 10, 2022
@xudong963
Member

Thanks @yjshen. I haven't had time to look at this wonderful ticket yet; I will enjoy it over the weekend.

Successfully merging this pull request may close these issues.

Introduce a Vec<u8> based row-wise representation for DataFusion
4 participants