Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

collect columns for merge #1812

Merged
merged 3 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions columnar/src/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ impl<T: PartialOrd> Column<T> {
}
}
}

/// Returns the minimum value reported by the underlying `values` storage.
///
/// This is a plain delegation to `self.values.min_value()`.
// NOTE(review): whether the result is the exact minimum or only a lower
// bound depends on the `values` implementation, which is not visible here.
pub fn min_value(&self) -> T {
self.values.min_value()
}
/// Returns the maximum value reported by the underlying `values` storage.
///
/// This is a plain delegation to `self.values.max_value()`.
// NOTE(review): whether the result is the exact maximum or only an upper
// bound depends on the `values` implementation, which is not visible here.
pub fn max_value(&self) -> T {
self.values.max_value()
}
}

impl<T: PartialOrd + Copy + Send + Sync + 'static> Column<T> {
Expand Down
5 changes: 3 additions & 2 deletions columnar/src/column_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ impl<'a> ColumnIndex<'a> {
}
}
ColumnIndex::Multivalued(multivalued_index) => {
let start = multivalued_index.get_val(row_id);
let end = multivalued_index.get_val(row_id + 1);
let multivalued_index_ref = &**multivalued_index;
let start: u32 = multivalued_index_ref.get_val(row_id);
let end: u32 = multivalued_index_ref.get_val(row_id + 1);
start..end
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ fn test_sparse_block_set_u16_max() {
use proptest::prelude::*;

proptest! {
#![proptest_config(ProptestConfig::with_cases(1))]
#[test]
fn test_prop_test_dense(els in proptest::collection::btree_set(0..=u16::MAX, 0..=u16::MAX as usize)) {
let vals: Vec<u16> = els.into_iter().collect();
Expand Down
26 changes: 26 additions & 0 deletions columnar/src/column_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,32 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
}
}

/// Lets a shared, type-erased column (`Arc<dyn ColumnValues<T>>`) be used
/// wherever a `ColumnValues<T>` is expected.
///
/// Every method simply forwards to the trait object stored inside the `Arc`.
impl<T: Copy + PartialOrd> ColumnValues<T> for std::sync::Arc<dyn ColumnValues<T>> {
    /// Forwards to the wrapped column's `get_val`.
    fn get_val(&self, idx: u32) -> T {
        (**self).get_val(idx)
    }

    /// Forwards to the wrapped column's `min_value`.
    fn min_value(&self) -> T {
        (**self).min_value()
    }

    /// Forwards to the wrapped column's `max_value`.
    fn max_value(&self) -> T {
        (**self).max_value()
    }

    /// Forwards to the wrapped column's `num_vals`.
    fn num_vals(&self) -> u32 {
        (**self).num_vals()
    }

    /// Forwards to the wrapped column's `iter`.
    fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = T> + 'b> {
        (**self).iter()
    }

    /// Forwards to the wrapped column's `get_range`.
    fn get_range(&self, start: u64, output: &mut [T]) {
        (**self).get_range(start, output)
    }
}

impl<'a, C: ColumnValues<T> + ?Sized, T: Copy + PartialOrd> ColumnValues<T> for &'a C {
fn get_val(&self, idx: u32) -> T {
(*self).get_val(idx)
Expand Down
16 changes: 14 additions & 2 deletions columnar/src/columnar/column_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ impl From<NumericalType> for ColumnType {
}

impl ColumnType {
/// get column type category
pub(crate) fn column_type_category(self) -> ColumnTypeCategory {
match self {
ColumnType::I64 | ColumnType::U64 | ColumnType::F64 => ColumnTypeCategory::Numerical,
ColumnType::Bytes => ColumnTypeCategory::Bytes,
ColumnType::Str => ColumnTypeCategory::Str,
ColumnType::Bool => ColumnTypeCategory::Bool,
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
ColumnType::DateTime => ColumnTypeCategory::DateTime,
}
}

pub fn numerical_type(&self) -> Option<NumericalType> {
match self {
ColumnType::I64 => Some(NumericalType::I64),
Expand Down Expand Up @@ -149,9 +161,9 @@ impl HasAssociatedColumnType for Ipv6Addr {
/// at most one column exist per `ColumnTypeCategory`.
///
/// See also [README.md].
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)]
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
#[repr(u8)]
pub(crate) enum ColumnTypeCategory {
pub enum ColumnTypeCategory {
Bool,
Str,
Numerical,
Expand Down
143 changes: 143 additions & 0 deletions columnar/src/columnar/merge.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::collections::HashMap;
use std::io;

use super::column_type::ColumnTypeCategory;
use crate::columnar::ColumnarReader;
use crate::dynamic_column::DynamicColumn;

pub enum MergeDocOrder {
/// Columnar tables are simply stacked one above the other.
Expand Down Expand Up @@ -31,3 +34,143 @@ pub fn merge_columnar(
}
}
}

/// Opens every column of every reader and groups the results, first by
/// column name and then by [`ColumnTypeCategory`].
///
/// A single column name can be backed by columns of several types. Columns
/// that share a category are the ones that can be merged together, so they
/// end up in the same `Vec`. Before returning, the groups are passed through
/// `normalize_columns`.
///
/// # Errors
///
/// Returns the first `io::Error` produced while listing or opening a column.
pub fn collect_columns(
    columnar_readers: &[&ColumnarReader],
) -> io::Result<HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>> {
    let mut columns_by_name: HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>> =
        HashMap::new();

    for reader in columnar_readers.iter() {
        for (name, column_handle) in reader.list_columns()? {
            let columns_by_category = columns_by_name.entry(name.to_string()).or_default();
            let category = column_handle.column_type().column_type_category();
            // The group is created before the column is opened, as in the
            // original flow.
            columns_by_category
                .entry(category)
                .or_default()
                .push(column_handle.open()?);
        }
    }

    normalize_columns(&mut columns_by_name);

    Ok(columns_by_name)
}

/// Rewrites every `Numerical` group in `map` so that all of its columns
/// share one numerical type.
///
/// Groups of any other category are left untouched.
pub(crate) fn normalize_columns(
    map: &mut HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>,
) {
    for columns_by_category in map.values_mut() {
        // Categories are map keys, so there is at most one numerical group
        // per field name.
        if let Some(numerical_columns) =
            columns_by_category.get_mut(&ColumnTypeCategory::Numerical)
        {
            let unified = cast_to_common_numerical_column(numerical_columns.as_slice());
            *numerical_columns = unified;
        }
    }
}

/// Receives a list of columns of numerical types (u64, i64, f64).
///
/// Returns a list of `DynamicColumn` which are all of the same numerical type.
///
/// The target type is chosen by trying, in order:
/// 1. `coerce_to_i64` on every column; if all succeed, the i64 columns are returned.
/// 2. `coerce_to_u64` on every column; if all succeed, the u64 columns are returned.
/// 3. `coerce_to_f64` on every column, which is expected to succeed.
///
/// Each attempt works on a clone of the input columns.
///
/// # Panics
///
/// Panics if any input column has no numerical type, or if a column cannot
/// be coerced to f64 in the final fallback.
fn cast_to_common_numerical_column(columns: &[DynamicColumn]) -> Vec<DynamicColumn> {
// Precondition: every input column must report a numerical type.
assert!(columns
.iter()
.all(|column| column.column_type().numerical_type().is_some()));
// First attempt: coerce every column to i64.
let coerce_to_i64: Vec<_> = columns
// NOTE(review): the following lines, up to the next `.iter()`, are review-thread
// text captured from the pull-request page; they are not part of the function.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can probably do something a little bit nicer here, and be more coupled with the writer logic?

fn acceptable_numerical_type(numerical_types: &[NumericalType]) -> NumericalType {
     let mut compatible_numerical_types = CompatibleNumericalTypes::default();
     for &typ in numerical_types {
         compatible_numerical_types.accept(typ);
     }
     numerical_types.to_numeric_type()
}


struct DynamicColummn {
    fn column_type(&self) -> ColumnType { ... }
    fn coerce(&self, column_type: ColumnType) -> Option<DynamicColumn> { ... }
}

Copy link
Contributor Author

@PSeitz PSeitz Jan 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the type is not enough, we need to know if the a i64 column could be a u64 column (!contains negative values)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah good point

.iter()
.map(|column| column.clone().coerce_to_i64())
.collect();

// Only use i64 when every single column could be coerced.
if coerce_to_i64.iter().all(|column| column.is_some()) {
return coerce_to_i64
.into_iter()
.map(|column| column.unwrap())
.collect();
}

// Second attempt: coerce every column to u64.
let coerce_to_u64: Vec<_> = columns
.iter()
.map(|column| column.clone().coerce_to_u64())
.collect();

// Only use u64 when every single column could be coerced.
if coerce_to_u64.iter().all(|column| column.is_some()) {
return coerce_to_u64
.into_iter()
.map(|column| column.unwrap())
.collect();
}

// Fallback: coerce everything to f64; a failure here panics.
columns
.iter()
.map(|column| {
column
.clone()
.coerce_to_f64()
.expect("couldn't cast column to f64")
})
.collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::ColumnarWriter;

    /// Collects the columns of `readers` and checks that the result holds
    /// exactly one field ("numbers") with exactly one category (numerical),
    /// and that every column in it satisfies `has_expected_type`.
    fn assert_single_numerical_group(
        readers: &[&ColumnarReader],
        has_expected_type: impl Fn(&DynamicColumn) -> bool,
    ) {
        let columns_by_name = collect_columns(readers).unwrap();
        assert_eq!(columns_by_name.len(), 1);
        let columns_by_category = columns_by_name.get("numbers").unwrap();
        assert_eq!(columns_by_category.len(), 1);
        let numerical_columns = columns_by_category
            .get(&ColumnTypeCategory::Numerical)
            .unwrap();
        assert!(numerical_columns
            .iter()
            .all(|column| has_expected_type(column)));
    }

    /// Checks the numerical type chosen when merging columns named
    /// "numbers" that were written as i64, u64 and f64.
    #[test]
    fn test_column_coercion() {
        // Columnar holding a single i64 value.
        let i64_columnar = {
            let mut writer = ColumnarWriter::default();
            writer.record_numerical(1u32, "numbers", 1i64);
            let mut serialized: Vec<u8> = Vec::new();
            writer.serialize(2, &mut serialized).unwrap();
            ColumnarReader::open(serialized).unwrap()
        };
        // Columnar holding a single u64 value.
        let u64_columnar = {
            let mut writer = ColumnarWriter::default();
            writer.record_numerical(1u32, "numbers", u64::MAX - 100);
            let mut serialized: Vec<u8> = Vec::new();
            writer.serialize(2, &mut serialized).unwrap();
            ColumnarReader::open(serialized).unwrap()
        };
        // Columnar holding a single f64 value.
        let f64_columnar = {
            let mut writer = ColumnarWriter::default();
            writer.record_numerical(1u32, "numbers", 30.5);
            let mut serialized: Vec<u8> = Vec::new();
            writer.serialize(2, &mut serialized).unwrap();
            ColumnarReader::open(serialized).unwrap()
        };

        // Mixing all three types ends up as f64.
        assert_single_numerical_group(
            &[&i64_columnar, &u64_columnar, &f64_columnar],
            |column| column.is_f64(),
        );
        // Only i64 inputs stay i64.
        assert_single_numerical_group(&[&i64_columnar, &i64_columnar], |column| {
            column.is_i64()
        });
        // Only u64 inputs stay u64.
        assert_single_numerical_group(&[&u64_columnar, &u64_columnar], |column| {
            column.is_u64()
        });
    }
}
Loading