diff --git a/columnar/src/column/mod.rs b/columnar/src/column/mod.rs index 169791705a..d89c2a0f15 100644 --- a/columnar/src/column/mod.rs +++ b/columnar/src/column/mod.rs @@ -33,6 +33,13 @@ impl Column { } } } + + pub fn min_value(&self) -> T { + self.values.min_value() + } + pub fn max_value(&self) -> T { + self.values.max_value() + } } impl Column { diff --git a/columnar/src/column_index/mod.rs b/columnar/src/column_index/mod.rs index 748361bc17..723335809e 100644 --- a/columnar/src/column_index/mod.rs +++ b/columnar/src/column_index/mod.rs @@ -44,8 +44,9 @@ impl<'a> ColumnIndex<'a> { } } ColumnIndex::Multivalued(multivalued_index) => { - let start = multivalued_index.get_val(row_id); - let end = multivalued_index.get_val(row_id + 1); + let multivalued_index_ref = &**multivalued_index; + let start: u32 = multivalued_index_ref.get_val(row_id); + let end: u32 = multivalued_index_ref.get_val(row_id + 1); start..end } } diff --git a/columnar/src/column_index/optional_index/set_block/tests.rs b/columnar/src/column_index/optional_index/set_block/tests.rs index 6e21b1b5ba..354d5a5a02 100644 --- a/columnar/src/column_index/optional_index/set_block/tests.rs +++ b/columnar/src/column_index/optional_index/set_block/tests.rs @@ -51,6 +51,7 @@ fn test_sparse_block_set_u16_max() { use proptest::prelude::*; proptest! { + #![proptest_config(ProptestConfig::with_cases(1))] #[test] fn test_prop_test_dense(els in proptest::collection::btree_set(0..=u16::MAX, 0..=u16::MAX as usize)) { let vals: Vec = els.into_iter().collect(); diff --git a/columnar/src/column_values/column.rs b/columnar/src/column_values/column.rs index e53c5b265e..f0cbeef46a 100644 --- a/columnar/src/column_values/column.rs +++ b/columnar/src/column_values/column.rs @@ -78,6 +78,32 @@ pub trait ColumnValues: Send + Sync { } } +impl ColumnValues for std::sync::Arc> { + fn get_val(&self, idx: u32) -> T { + self.as_ref().get_val(idx) + } + + fn min_value(&self) -> T { + self.as_ref().min_value() + } + + fn max_value(&self) -> T { + self.as_ref().max_value() + } + + fn num_vals(&self) -> u32 { + self.as_ref().num_vals() + } + + fn iter<'b>(&'b self) -> Box + 'b> { + self.as_ref().iter() + } + + fn get_range(&self, start: u64, output: &mut [T]) { + self.as_ref().get_range(start, output) + } +} + impl<'a, C: ColumnValues + ?Sized, T: Copy + PartialOrd> ColumnValues for &'a C { fn get_val(&self, idx: u32) -> T { (*self).get_val(idx) diff --git a/columnar/src/columnar/column_type.rs b/columnar/src/columnar/column_type.rs index a2fe69e5db..a723cb015f 100644 --- a/columnar/src/columnar/column_type.rs +++ b/columnar/src/columnar/column_type.rs @@ -64,6 +64,18 @@ impl From for ColumnType { } impl ColumnType { + /// get column type category + pub(crate) fn column_type_category(self) -> ColumnTypeCategory { + match self { + ColumnType::I64 | ColumnType::U64 | ColumnType::F64 => ColumnTypeCategory::Numerical, + ColumnType::Bytes => ColumnTypeCategory::Bytes, + ColumnType::Str => ColumnTypeCategory::Str, + ColumnType::Bool => ColumnTypeCategory::Bool, + ColumnType::IpAddr => ColumnTypeCategory::IpAddr, + ColumnType::DateTime => ColumnTypeCategory::DateTime, + } + } + pub fn numerical_type(&self) -> Option { match self { ColumnType::I64 => Some(NumericalType::I64), @@ -149,9 +161,9 @@ impl HasAssociatedColumnType for Ipv6Addr { /// at most one column exist per `ColumnTypeCategory`. /// /// See also [README.md]. -#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)] +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] #[repr(u8)] -pub(crate) enum ColumnTypeCategory { +pub enum ColumnTypeCategory { Bool, Str, Numerical, diff --git a/columnar/src/columnar/merge.rs b/columnar/src/columnar/merge.rs index 63e242e1b4..3255dc3097 100644 --- a/columnar/src/columnar/merge.rs +++ b/columnar/src/columnar/merge.rs @@ -1,6 +1,9 @@ +use std::collections::HashMap; use std::io; +use super::column_type::ColumnTypeCategory; use crate::columnar::ColumnarReader; +use crate::dynamic_column::DynamicColumn; pub enum MergeDocOrder { /// Columnar tables are simply stacked one above the other. @@ -31,3 +34,143 @@ pub fn merge_columnar( } } } + +pub fn collect_columns( + columnar_readers: &[&ColumnarReader], +) -> io::Result>>> { + // Each column name may have multiple types of column associated. + // For merging we are interested in the same column type category since they can be merged. + let mut field_name_to_group: HashMap>> = + HashMap::new(); + + for columnar_reader in columnar_readers { + let column_name_and_handle = columnar_reader.list_columns()?; + for (column_name, handle) in column_name_and_handle { + let column_type_to_handles = field_name_to_group + .entry(column_name.to_string()) + .or_default(); + + let columns = column_type_to_handles + .entry(handle.column_type().column_type_category()) + .or_default(); + columns.push(handle.open()?); + } + } + + normalize_columns(&mut field_name_to_group); + + Ok(field_name_to_group) +} + +/// Cast numerical type columns to the same type +pub(crate) fn normalize_columns( + map: &mut HashMap>>, +) { + for (_field_name, type_category_to_columns) in map.iter_mut() { + for (type_category, columns) in type_category_to_columns { + if type_category == &ColumnTypeCategory::Numerical { + let casted_columns = cast_to_common_numerical_column(&columns); + *columns = casted_columns; + } + } + } +} + +/// Receives a list of columns of numerical types (u64, i64, f64) +/// +/// Returns a list of `DynamicColumn` which are all of the same numerical type +fn cast_to_common_numerical_column(columns: &[DynamicColumn]) -> Vec { + assert!(columns + .iter() + .all(|column| column.column_type().numerical_type().is_some())); + let coerce_to_i64: Vec<_> = columns + .iter() + .map(|column| column.clone().coerce_to_i64()) + .collect(); + + if coerce_to_i64.iter().all(|column| column.is_some()) { + return coerce_to_i64 + .into_iter() + .map(|column| column.unwrap()) + .collect(); + } + + let coerce_to_u64: Vec<_> = columns + .iter() + .map(|column| column.clone().coerce_to_u64()) + .collect(); + + if coerce_to_u64.iter().all(|column| column.is_some()) { + return coerce_to_u64 + .into_iter() + .map(|column| column.unwrap()) + .collect(); + } + + columns + .iter() + .map(|column| { + column + .clone() + .coerce_to_f64() + .expect("couldn't cast column to f64") + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ColumnarWriter; + + #[test] + fn test_column_coercion() { + // i64 type + let columnar1 = { + let mut dataframe_writer = ColumnarWriter::default(); + dataframe_writer.record_numerical(1u32, "numbers", 1i64); + let mut buffer: Vec = Vec::new(); + dataframe_writer.serialize(2, &mut buffer).unwrap(); + ColumnarReader::open(buffer).unwrap() + }; + // u64 type + let columnar2 = { + let mut dataframe_writer = ColumnarWriter::default(); + dataframe_writer.record_numerical(1u32, "numbers", u64::MAX - 100); + let mut buffer: Vec = Vec::new(); + dataframe_writer.serialize(2, &mut buffer).unwrap(); + ColumnarReader::open(buffer).unwrap() + }; + + // f64 type + let columnar3 = { + let mut dataframe_writer = ColumnarWriter::default(); + dataframe_writer.record_numerical(1u32, "numbers", 30.5); + let mut buffer: Vec = Vec::new(); + dataframe_writer.serialize(2, &mut buffer).unwrap(); + ColumnarReader::open(buffer).unwrap() + }; + + let column_map = collect_columns(&[&columnar1, &columnar2, &columnar3]).unwrap(); + assert_eq!(column_map.len(), 1); + let cat_to_columns = column_map.get("numbers").unwrap(); + assert_eq!(cat_to_columns.len(), 1); + + let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap(); + assert!(numerical.iter().all(|column| column.is_f64())); + + let column_map = collect_columns(&[&columnar1, &columnar1]).unwrap(); + assert_eq!(column_map.len(), 1); + let cat_to_columns = column_map.get("numbers").unwrap(); + assert_eq!(cat_to_columns.len(), 1); + let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap(); + assert!(numerical.iter().all(|column| column.is_i64())); + + let column_map = collect_columns(&[&columnar2, &columnar2]).unwrap(); + assert_eq!(column_map.len(), 1); + let cat_to_columns = column_map.get("numbers").unwrap(); + assert_eq!(cat_to_columns.len(), 1); + let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap(); + assert!(numerical.iter().all(|column| column.is_u64())); + } +} diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index 8669c093d3..f668d08c8d 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -1,11 +1,14 @@ use std::io; use std::net::Ipv6Addr; +use std::sync::Arc; use common::file_slice::FileSlice; use common::{HasLen, OwnedBytes}; use crate::column::{BytesColumn, Column, StrColumn}; +use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn}; use crate::columnar::ColumnType; +use crate::{DateTime, NumericalType}; #[derive(Clone)] pub enum DynamicColumn { @@ -14,11 +17,133 @@ pub enum DynamicColumn { U64(Column), F64(Column), IpAddr(Column), - DateTime(Column), + DateTime(Column), Bytes(BytesColumn), Str(StrColumn), } +impl DynamicColumn { + pub fn column_type(&self) -> ColumnType { + match self { + DynamicColumn::Bool(_) => ColumnType::Bool, + DynamicColumn::I64(_) => ColumnType::I64, + DynamicColumn::U64(_) => ColumnType::U64, + DynamicColumn::F64(_) => ColumnType::F64, + DynamicColumn::IpAddr(_) => ColumnType::IpAddr, + DynamicColumn::DateTime(_) => ColumnType::DateTime, + DynamicColumn::Bytes(_) => ColumnType::Bytes, + DynamicColumn::Str(_) => ColumnType::Str, + } + } + + pub fn is_numerical(&self) -> bool { + self.column_type().numerical_type().is_some() + } + + pub fn is_f64(&self) -> bool { + self.column_type().numerical_type() == Some(NumericalType::F64) + } + pub fn is_i64(&self) -> bool { + self.column_type().numerical_type() == Some(NumericalType::I64) + } + pub fn is_u64(&self) -> bool { + self.column_type().numerical_type() == Some(NumericalType::U64) + } + + pub fn coerce_to_f64(self) -> Option { + match self { + DynamicColumn::I64(column) => Some(DynamicColumn::F64(Column { + idx: column.idx, + values: Arc::new(monotonic_map_column(column.values, MapI64ToF64)), + })), + DynamicColumn::U64(column) => Some(DynamicColumn::F64(Column { + idx: column.idx, + values: Arc::new(monotonic_map_column(column.values, MapU64ToF64)), + })), + DynamicColumn::F64(_) => Some(self), + _ => None, + } + } + pub fn coerce_to_i64(self) -> Option { + match self { + DynamicColumn::U64(column) => { + if column.max_value() > i64::MAX as u64 { + return None; + } + Some(DynamicColumn::I64(Column { + idx: column.idx, + values: Arc::new(monotonic_map_column(column.values, MapU64ToI64)), + })) + } + DynamicColumn::I64(_) => Some(self), + _ => None, + } + } + pub fn coerce_to_u64(self) -> Option { + match self { + DynamicColumn::I64(column) => { + if column.min_value() < 0 { + return None; + } + Some(DynamicColumn::U64(Column { + idx: column.idx, + values: Arc::new(monotonic_map_column(column.values, MapI64ToU64)), + })) + } + DynamicColumn::U64(_) => Some(self), + _ => None, + } + } +} + +struct MapI64ToF64; +impl StrictlyMonotonicFn for MapI64ToF64 { + #[inline(always)] + fn mapping(&self, inp: i64) -> f64 { + inp as f64 + } + #[inline(always)] + fn inverse(&self, out: f64) -> i64 { + out as i64 + } +} + +struct MapU64ToF64; +impl StrictlyMonotonicFn for MapU64ToF64 { + #[inline(always)] + fn mapping(&self, inp: u64) -> f64 { + inp as f64 + } + #[inline(always)] + fn inverse(&self, out: f64) -> u64 { + out as u64 + } +} + +struct MapU64ToI64; +impl StrictlyMonotonicFn for MapU64ToI64 { + #[inline(always)] + fn mapping(&self, inp: u64) -> i64 { + inp as i64 + } + #[inline(always)] + fn inverse(&self, out: i64) -> u64 { + out as u64 + } +} + +struct MapI64ToU64; +impl StrictlyMonotonicFn for MapI64ToU64 { + #[inline(always)] + fn mapping(&self, inp: i64) -> u64 { + inp as u64 + } + #[inline(always)] + fn inverse(&self, out: u64) -> i64 { + out as i64 + } +} + macro_rules! static_dynamic_conversions { ($typ:ty, $enum_name:ident) => { impl Into> for DynamicColumn {