Skip to content

Commit

Permalink
Add support for multivalues (quickwit-oss#1809)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored and Jamie Hodkinson committed Jan 30, 2023
1 parent cb80172 commit cfb2ec2
Show file tree
Hide file tree
Showing 15 changed files with 131 additions and 121 deletions.
26 changes: 15 additions & 11 deletions columnar/src/column/dictionary_encoded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,35 @@ use crate::column::Column;
use crate::RowId;

/// Dictionary encoded column.
///
/// The column simply gives access to a regular u64-column that, in
/// which the values are term-ordinals.
///
/// These ordinals are ids uniquely identify the bytes that are stored in
/// the column. These ordinals are small, and sorted in the same order
/// as the term_ord_column.
#[derive(Clone)]
pub struct BytesColumn {
pub(crate) dictionary: Arc<Dictionary<VoidSSTable>>,
pub(crate) term_ord_column: Column<u64>,
}

impl BytesColumn {
/// Fills the given `output` buffer with the term associated to the ordinal `ord`.
///
/// Returns `false` if the term does not exist (e.g. `term_ord` is greater or equal to the
/// overll number of terms).
pub fn ord_to_bytes(&self, term_ord: u64, output: &mut Vec<u8>) -> io::Result<bool> {
self.dictionary.ord_to_term(term_ord, output)
pub fn ord_to_bytes(&self, ord: u64, output: &mut Vec<u8>) -> io::Result<bool> {
self.dictionary.ord_to_term(ord, output)
}

/// Returns the number of rows in the column.
pub fn num_rows(&self) -> RowId {
self.term_ord_column.num_rows()
}

pub fn term_ords(&self) -> &Column<u64> {
/// Returns the column of ordinals
pub fn ords(&self) -> &Column<u64> {
&self.term_ord_column
}
}
Expand All @@ -40,6 +51,7 @@ impl From<BytesColumn> for StrColumn {
}

impl StrColumn {
/// Fills the buffer
pub fn ord_to_str(&self, term_ord: u64, output: &mut String) -> io::Result<bool> {
unsafe {
let buf = output.as_mut_vec();
Expand All @@ -55,14 +67,6 @@ impl StrColumn {
}
Ok(true)
}

pub fn num_rows(&self) -> RowId {
self.term_ord_column.num_rows()
}

pub fn ordinal_dictionary(&self) -> &Column<u64> {
&self.0.term_ord_column
}
}

impl Deref for StrColumn {
Expand Down
32 changes: 20 additions & 12 deletions columnar/src/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::ops::Deref;
use std::sync::Arc;

use common::BinarySerializable;
pub use dictionary_encoded::{BytesColumn, StrColumn};
pub use serialize::{
open_column_bytes, open_column_u128, open_column_u64, serialize_column_u128,
open_column_bytes, open_column_u128, open_column_u64, serialize_column_mappable_to_u128,
serialize_column_u64,
};
pub use dictionary_encoded::{BytesColumn, StrColumn};

use crate::column_index::ColumnIndex;
use crate::column_values::ColumnValues;
Expand All @@ -21,23 +21,31 @@ pub struct Column<T> {
pub values: Arc<dyn ColumnValues<T>>,
}

use crate::column_index::Set;

impl<T: PartialOrd> Column<T> {
pub fn first(&self, row_id: RowId) -> Option<T> {
pub fn num_rows(&self) -> RowId {
match &self.idx {
ColumnIndex::Full => Some(self.values.get_val(row_id)),
ColumnIndex::Optional(opt_idx) => {
let value_row_idx = opt_idx.rank_if_exists(row_id)?;
Some(self.values.get_val(value_row_idx))
}
ColumnIndex::Multivalued(_multivalued_index) => {
todo!();
ColumnIndex::Full => self.values.num_vals() as u32,
ColumnIndex::Optional(optional_index) => optional_index.num_rows(),
ColumnIndex::Multivalued(col_index) => {
// The multivalued index contains all value start row_id,
// and one extra value at the end with the overall number of rows.
col_index.num_vals() - 1
}
}
}
}

impl<T: PartialOrd> Column<T> {
pub fn first(&self, row_id: RowId) -> Option<T> {
self.values(row_id).next()
}

pub fn values(&self, row_id: RowId) -> impl Iterator<Item = T> + '_ {
self.value_row_ids(row_id)
.map(|value_row_id: RowId| self.values.get_val(value_row_id))
}
}

impl<T> Deref for Column<T> {
type Target = ColumnIndex<'static>;

Expand Down
15 changes: 11 additions & 4 deletions columnar/src/column/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use crate::column::{BytesColumn, Column};
use crate::column_index::{serialize_column_index, SerializableColumnIndex};
use crate::column_values::serialize::serialize_column_values_u128;
use crate::column_values::{
serialize_column_values, ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
ALL_CODEC_TYPES,
serialize_column_values, ColumnValues, FastFieldCodecType, MonotonicallyMappableToU128,
MonotonicallyMappableToU64,
};

pub fn serialize_column_u128<
pub fn serialize_column_mappable_to_u128<
F: Fn() -> I,
I: Iterator<Item = T>,
T: MonotonicallyMappableToU128,
Expand All @@ -39,7 +39,14 @@ pub fn serialize_column_u64<T: MonotonicallyMappableToU64>(
output: &mut impl Write,
) -> io::Result<()> {
let column_index_num_bytes = serialize_column_index(column_index, output)?;
serialize_column_values(column_values, &ALL_CODEC_TYPES[..], output)?;
serialize_column_values(
column_values,
&[
FastFieldCodecType::Bitpacked,
FastFieldCodecType::BlockwiseLinear,
],
output,
)?;
output.write_all(&column_index_num_bytes.to_le_bytes())?;
Ok(())
}
Expand Down
27 changes: 20 additions & 7 deletions columnar/src/column_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod multivalued_index;
mod optional_index;
mod serialize;

use std::ops::Range;
use std::sync::Arc;

pub use optional_index::{OptionalIndex, SerializableOptionalIndex, Set};
Expand All @@ -14,8 +15,12 @@ use crate::{Cardinality, RowId};
pub enum ColumnIndex<'a> {
Full,
Optional(OptionalIndex),
// TODO remove the Arc<dyn> apart from serialization this is not
// dynamic at all.
// TODO Remove the static by fixing the codec if possible.
/// The column values enclosed contains for all row_id,
/// the value start_index.
///
/// In addition, at index num_rows, an extra value is added
/// containing the overal number of values.
Multivalued(Arc<dyn ColumnValues<RowId> + 'a>),
}

Expand All @@ -28,13 +33,21 @@ impl<'a> ColumnIndex<'a> {
}
}

pub fn num_rows(&self) -> RowId {
pub fn value_row_ids(&self, row_id: RowId) -> Range<RowId> {
match self {
ColumnIndex::Full => {
todo!()
ColumnIndex::Full => row_id..row_id + 1,
ColumnIndex::Optional(optional_index) => {
if let Some(val) = optional_index.rank_if_exists(row_id) {
val..val + 1
} else {
0..0
}
}
ColumnIndex::Multivalued(multivalued_index) => {
let start = multivalued_index.get_val(row_id);
let end = multivalued_index.get_val(row_id + 1);
start..end
}
ColumnIndex::Optional(optional_index) => optional_index.num_rows(),
ColumnIndex::Multivalued(multivalued_index) => multivalued_index.num_vals() - 1,
}
}
}
8 changes: 5 additions & 3 deletions columnar/src/column_index/multivalued_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@ use crate::RowId;
pub struct MultivaluedIndex(Arc<dyn ColumnValues<RowId>>);

pub fn serialize_multivalued_index(
multivalued_index: MultivaluedIndex,
multivalued_index: &dyn ColumnValues<RowId>,
output: &mut impl Write,
) -> io::Result<()> {
crate::column_values::serialize_column_values(
&*multivalued_index.0,
&*multivalued_index,
&[FastFieldCodecType::Bitpacked, FastFieldCodecType::Linear],
output,
)?;
Ok(())
}

pub fn open_multivalued_index(bytes: OwnedBytes) -> io::Result<Arc<dyn ColumnValues<RowId>>> {
todo!();
let start_index_column: Arc<dyn ColumnValues<RowId>> =
crate::column_values::open_u64_mapped(bytes)?;
Ok(start_index_column)
}
9 changes: 5 additions & 4 deletions columnar/src/column_index/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ use std::io::Write;

use common::{CountingWriter, OwnedBytes};

use crate::column_index::multivalued_index::{serialize_multivalued_index, MultivaluedIndex};
use crate::column_index::multivalued_index::serialize_multivalued_index;
use crate::column_index::optional_index::serialize_optional_index;
use crate::column_index::{ColumnIndex, SerializableOptionalIndex};
use crate::Cardinality;
use crate::column_values::ColumnValues;
use crate::{Cardinality, RowId};

pub enum SerializableColumnIndex<'a> {
Full,
Optional(Box<dyn SerializableOptionalIndex<'a> + 'a>),
// TODO remove the Arc<dyn> apart from serialization this is not
// dynamic at all.
Multivalued(MultivaluedIndex),
Multivalued(Box<dyn ColumnValues<RowId> + 'a>),
}

impl<'a> SerializableColumnIndex<'a> {
Expand All @@ -39,7 +40,7 @@ pub fn serialize_column_index(
serialize_optional_index(&*optional_index, &mut output)?
}
SerializableColumnIndex::Multivalued(multivalued_index) => {
serialize_multivalued_index(multivalued_index, &mut output)?
serialize_multivalued_index(&*multivalued_index, &mut output)?
}
}
let column_index_num_bytes = output.written_bytes() as u32;
Expand Down
19 changes: 3 additions & 16 deletions columnar/src/column_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ pub mod serialize;
pub use self::column::{monotonic_map_column, ColumnValues, IterColumn, VecColumn};
pub use self::monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn};
pub use self::monotonic_mapping_u128::MonotonicallyMappableToU128;
pub use self::serialize::{serialize_and_load, serialize_column_values, NormalizedHeader};
#[cfg(test)]
pub use self::serialize::serialize_and_load;
pub use self::serialize::{serialize_column_values, NormalizedHeader};
use crate::column_values::bitpacked::BitpackedCodec;
use crate::column_values::blockwise_linear::BlockwiseLinearCodec;
use crate::column_values::linear::LinearCodec;
Expand Down Expand Up @@ -121,21 +123,6 @@ impl U128FastFieldCodecType {
}
}

/// Returns the correct codec reader wrapped in the `Arc` for the data.
// pub fn open_u128<Item: MonotonicallyMappableToU128>(
// bytes: OwnedBytes,
// ) -> io::Result<Arc<dyn Column<Item>>> {
// todo!();
// // let (bytes, _format_version) = read_format_version(bytes)?;
// // let (mut bytes, _null_index_footer) = read_null_index_footer(bytes)?;
// // let header = U128Header::deserialize(&mut bytes)?;
// // assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
// // let reader = CompactSpaceDecompressor::open(bytes)?;
// // let inverted: StrictlyMonotonicMappingInverter<StrictlyMonotonicMappingToInternal<Item>> =
// // StrictlyMonotonicMappingToInternal::<Item>::new().into();
// // Ok(Arc::new(monotonic_map_column(reader, inverted)))
// }

/// Returns the correct codec reader wrapped in the `Arc` for the data.
pub fn open_u128_mapped<T: MonotonicallyMappableToU128>(
mut bytes: OwnedBytes,
Expand Down
23 changes: 0 additions & 23 deletions columnar/src/column_values/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,28 +161,6 @@ impl BinarySerializable for Header {
}
}

/// Return estimated compression for given codec in the value range [0.0..1.0], where 1.0 means no
/// compression.
pub(crate) fn estimate<T: MonotonicallyMappableToU64>(
typed_column: impl ColumnValues<T>,
codec_type: FastFieldCodecType,
) -> Option<f32> {
let column = monotonic_map_column(typed_column, StrictlyMonotonicMappingToInternal::<T>::new());
let min_value = column.min_value();
let gcd = super::gcd::find_gcd(column.iter().map(|val| val - min_value))
.filter(|gcd| gcd.get() > 1u64);
let mapping = StrictlyMonotonicMappingToInternalGCDBaseval::new(
gcd.map(|gcd| gcd.get()).unwrap_or(1u64),
min_value,
);
let normalized_column = monotonic_map_column(&column, mapping);
match codec_type {
FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&normalized_column),
FastFieldCodecType::Linear => LinearCodec::estimate(&normalized_column),
FastFieldCodecType::BlockwiseLinear => BlockwiseLinearCodec::estimate(&normalized_column),
}
}

/// Serializes u128 values with the compact space codec.
pub fn serialize_column_values_u128<F: Fn() -> I, I: Iterator<Item = u128>>(
iter_gen: F,
Expand All @@ -194,7 +172,6 @@ pub fn serialize_column_values_u128<F: Fn() -> I, I: Iterator<Item = u128>>(
codec_type: U128FastFieldCodecType::CompactSpace,
};
header.serialize(output)?;

let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals);
compressor.compress_into(iter_gen(), output)?;

Expand Down
6 changes: 3 additions & 3 deletions columnar/src/columnar/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ pub enum MergeDocOrder {
Complex(()),
}

pub fn merge(
columnar_readers: &[ColumnarReader],
pub fn merge_columnar(
_columnar_readers: &[ColumnarReader],
mapping: MergeDocOrder,
output: &mut impl io::Write,
_output: &mut impl io::Write,
) -> io::Result<()> {
match mapping {
MergeDocOrder::Stack => {
Expand Down
1 change: 1 addition & 0 deletions columnar/src/columnar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ mod reader;
mod writer;

pub use column_type::ColumnType;
pub use merge::{merge_columnar, MergeDocOrder};
pub use reader::ColumnarReader;
pub use writer::ColumnarWriter;
Loading

0 comments on commit cfb2ec2

Please sign in to comment.