Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Vortex Layouts #1805

Merged
merged 13 commits into from
Jan 3, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ members = [
"vortex-flatbuffers",
"vortex-io",
"vortex-ipc",
"vortex-layout",
"vortex-proto",
"vortex-sampling-compressor",
"vortex-scalar",
1 change: 1 addition & 0 deletions vortex-flatbuffers/Cargo.toml
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ dtype = []
scalar = ["dtype"]
array = ["dtype", "scalar"]
ipc = ["array"]
layout = ["array"]
file = ["ipc"]

[dependencies]
29 changes: 29 additions & 0 deletions vortex-flatbuffers/flatbuffers/vortex-layout/layout.fbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/// A `Layout` is a recursive data structure describing the physical layout of Vortex arrays in random access storage.
/// As a starting, concrete example, the first three Layout encodings are defined as:
///
/// 1. encoding == 1, `Flat` -> one buffer, zero child Layouts
/// 2. encoding == 2, `Chunked` -> zero buffers, one or more child Layouts (used for chunks of rows)
/// 3. encoding == 3, `Columnar` -> zero buffers, one or more child Layouts (used for columns of structs)
///
/// The `row_count` represents the number of rows represented by this Layout. This is very useful for
/// pruning the Layout tree based on row filters.
///
/// The `metadata` field is fully opaque at this layer, and allows the Layout implementation corresponding to
/// `encoding` to embed additional information that may be useful for the reader. For example, the `ChunkedLayout`
/// uses the first byte of the `metadata` array as a boolean to indicate whether the first child Layout represents
/// the statistics table for the other chunks.
table Layout {
/// The ID of the encoding used for this Layout.
encoding: uint16;
/// The number of rows of data represented by this Layout.
row_count: uint64;
/// Any additional metadata this layout needs to interpret its children.
/// This does not include data-specific metadata, which the layout should store in a segment.
metadata: [ubyte];
/// The children of this Layout.
children: [Layout];
/// Identifiers for each `Segment` of data required by this layout.
segments: [uint32];
}

root_type Layout;
267 changes: 267 additions & 0 deletions vortex-flatbuffers/src/generated/layout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
// automatically generated by the FlatBuffers compiler, do not modify


// @generated

use core::mem;
use core::cmp::Ordering;

extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};

pub enum LayoutOffset {}
#[derive(Copy, Clone, PartialEq)]

/// A `Layout` is a recursive data structure describing the physical layout of Vortex arrays in random access storage.
/// As a starting, concrete example, the first three Layout encodings are defined as:
///
/// 1. encoding == 1, `Flat` -> one buffer, zero child Layouts
/// 2. encoding == 2, `Chunked` -> zero buffers, one or more child Layouts (used for chunks of rows)
/// 3. encoding == 3, `Columnar` -> zero buffers, one or more child Layouts (used for columns of structs)
///
/// The `row_count` represents the number of rows represented by this Layout. This is very useful for
/// pruning the Layout tree based on row filters.
///
/// The `metadata` field is fully opaque at this layer, and allows the Layout implementation corresponding to
/// `encoding` to embed additional information that may be useful for the reader. For example, the `ChunkedLayout`
/// uses the first byte of the `metadata` array as a boolean to indicate whether the first child Layout represents
/// the statistics table for the other chunks.
pub struct Layout<'a> {
pub _tab: flatbuffers::Table<'a>,
}

impl<'a> flatbuffers::Follow<'a> for Layout<'a> {
type Inner = Layout<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}

impl<'a> Layout<'a> {
pub const VT_ENCODING: flatbuffers::VOffsetT = 4;
pub const VT_ROW_COUNT: flatbuffers::VOffsetT = 6;
pub const VT_METADATA: flatbuffers::VOffsetT = 8;
pub const VT_CHILDREN: flatbuffers::VOffsetT = 10;
pub const VT_SEGMENTS: flatbuffers::VOffsetT = 12;

#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
Layout { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args LayoutArgs<'args>
) -> flatbuffers::WIPOffset<Layout<'bldr>> {
let mut builder = LayoutBuilder::new(_fbb);
builder.add_row_count(args.row_count);
if let Some(x) = args.segments { builder.add_segments(x); }
if let Some(x) = args.children { builder.add_children(x); }
if let Some(x) = args.metadata { builder.add_metadata(x); }
builder.add_encoding(args.encoding);
builder.finish()
}


/// The ID of the encoding used for this Layout.
#[inline]
pub fn encoding(&self) -> u16 {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<u16>(Layout::VT_ENCODING, Some(0)).unwrap()}
}
/// The number of rows of data represented by this Layout.
#[inline]
pub fn row_count(&self) -> u64 {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<u64>(Layout::VT_ROW_COUNT, Some(0)).unwrap()}
}
/// Any additional metadata this layout needs to interpret its children.
/// This does not include data-specific metadata, which the layout should store in a segment.
#[inline]
pub fn metadata(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(Layout::VT_METADATA, None)}
}
/// The children of this Layout.
#[inline]
pub fn children(&self) -> Option<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout<'a>>>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout>>>>(Layout::VT_CHILDREN, None)}
}
/// Identifiers for each `Segment` of data required by this layout.
#[inline]
pub fn segments(&self) -> Option<flatbuffers::Vector<'a, u32>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u32>>>(Layout::VT_SEGMENTS, None)}
}
}

impl flatbuffers::Verifiable for Layout<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<u16>("encoding", Self::VT_ENCODING, false)?
.visit_field::<u64>("row_count", Self::VT_ROW_COUNT, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("metadata", Self::VT_METADATA, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, flatbuffers::ForwardsUOffset<Layout>>>>("children", Self::VT_CHILDREN, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u32>>>("segments", Self::VT_SEGMENTS, false)?
.finish();
Ok(())
}
}
pub struct LayoutArgs<'a> {
pub encoding: u16,
pub row_count: u64,
pub metadata: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
pub children: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout<'a>>>>>,
pub segments: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u32>>>,
}
impl<'a> Default for LayoutArgs<'a> {
#[inline]
fn default() -> Self {
LayoutArgs {
encoding: 0,
row_count: 0,
metadata: None,
children: None,
segments: None,
}
}
}

pub struct LayoutBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> LayoutBuilder<'a, 'b, A> {
#[inline]
pub fn add_encoding(&mut self, encoding: u16) {
self.fbb_.push_slot::<u16>(Layout::VT_ENCODING, encoding, 0);
}
#[inline]
pub fn add_row_count(&mut self, row_count: u64) {
self.fbb_.push_slot::<u64>(Layout::VT_ROW_COUNT, row_count, 0);
}
#[inline]
pub fn add_metadata(&mut self, metadata: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(Layout::VT_METADATA, metadata);
}
#[inline]
pub fn add_children(&mut self, children: flatbuffers::WIPOffset<flatbuffers::Vector<'b , flatbuffers::ForwardsUOffset<Layout<'b >>>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(Layout::VT_CHILDREN, children);
}
#[inline]
pub fn add_segments(&mut self, segments: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u32>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(Layout::VT_SEGMENTS, segments);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> LayoutBuilder<'a, 'b, A> {
let start = _fbb.start_table();
LayoutBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<Layout<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}

impl core::fmt::Debug for Layout<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("Layout");
ds.field("encoding", &self.encoding());
ds.field("row_count", &self.row_count());
ds.field("metadata", &self.metadata());
ds.field("children", &self.children());
ds.field("segments", &self.segments());
ds.finish()
}
}
#[inline]
/// Verifies that a buffer of bytes contains a `Layout`
/// and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_layout_unchecked`.
pub fn root_as_layout(buf: &[u8]) -> Result<Layout, flatbuffers::InvalidFlatbuffer> {
flatbuffers::root::<Layout>(buf)
}
#[inline]
/// Verifies that a buffer of bytes contains a size prefixed
/// `Layout` and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `size_prefixed_root_as_layout_unchecked`.
pub fn size_prefixed_root_as_layout(buf: &[u8]) -> Result<Layout, flatbuffers::InvalidFlatbuffer> {
flatbuffers::size_prefixed_root::<Layout>(buf)
}
#[inline]
/// Verifies, with the given options, that a buffer of bytes
/// contains a `Layout` and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_layout_unchecked`.
pub fn root_as_layout_with_opts<'b, 'o>(
opts: &'o flatbuffers::VerifierOptions,
buf: &'b [u8],
) -> Result<Layout<'b>, flatbuffers::InvalidFlatbuffer> {
flatbuffers::root_with_opts::<Layout<'b>>(opts, buf)
}
#[inline]
/// Verifies, with the given verifier options, that a buffer of
/// bytes contains a size prefixed `Layout` and returns
/// it. Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_layout_unchecked`.
pub fn size_prefixed_root_as_layout_with_opts<'b, 'o>(
opts: &'o flatbuffers::VerifierOptions,
buf: &'b [u8],
) -> Result<Layout<'b>, flatbuffers::InvalidFlatbuffer> {
flatbuffers::size_prefixed_root_with_opts::<Layout<'b>>(opts, buf)
}
#[inline]
/// Assumes, without verification, that a buffer of bytes contains a Layout and returns it.
/// # Safety
/// Callers must trust the given bytes do indeed contain a valid `Layout`.
pub unsafe fn root_as_layout_unchecked(buf: &[u8]) -> Layout {
flatbuffers::root_unchecked::<Layout>(buf)
}
#[inline]
/// Assumes, without verification, that a buffer of bytes contains a size prefixed Layout and returns it.
/// # Safety
/// Callers must trust the given bytes do indeed contain a valid size prefixed `Layout`.
pub unsafe fn size_prefixed_root_as_layout_unchecked(buf: &[u8]) -> Layout {
flatbuffers::size_prefixed_root_unchecked::<Layout>(buf)
}
#[inline]
pub fn finish_layout_buffer<'a, 'b, A: flatbuffers::Allocator + 'a>(
fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
root: flatbuffers::WIPOffset<Layout<'a>>) {
fbb.finish(root, None);
}

#[inline]
pub fn finish_size_prefixed_layout_buffer<'a, 'b, A: flatbuffers::Allocator + 'a>(fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, root: flatbuffers::WIPOffset<Layout<'a>>) {
fbb.finish_size_prefixed(root, None);
}
22 changes: 22 additions & 0 deletions vortex-flatbuffers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -92,6 +92,28 @@ pub mod scalar;
/// ```
pub mod footer;

#[cfg(feature = "layout")]
#[allow(clippy::all)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[allow(clippy::many_single_char_names)]
#[allow(clippy::unwrap_used)]
#[allow(dead_code)]
#[allow(non_snake_case)]
#[allow(non_camel_case_types)]
#[allow(unsafe_op_in_unsafe_fn)]
#[allow(unused_imports)]
#[allow(unused_lifetimes)]
#[allow(unused_qualifications)]
#[rustfmt::skip]
#[path = "./generated/layout.rs"]
/// A serialized sequence of arrays, each with its buffers.
///
/// `layout.fbs`:
/// ```flatbuffers
#[doc = include_str!("../flatbuffers/vortex-layout/layout.fbs")]
/// ```
pub mod layout;

#[cfg(feature = "ipc")]
#[allow(clippy::all)]
#[allow(clippy::derive_partial_eq_without_eq)]
33 changes: 33 additions & 0 deletions vortex-layout/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "vortex-layout"
description = "Vortex layouts provide a way to perform lazy push-down scans over abstract storage"
version.workspace = true
homepage.workspace = true
repository.workspace = true
authors.workspace = true
license.workspace = true
keywords.workspace = true
include.workspace = true
edition.workspace = true
rust-version.workspace = true
readme.workspace = true
categories.workspace = true

[dependencies]
arrow-buffer = { workspace = true }
bytes = { workspace = true }
flatbuffers = { workspace = true }
itertools = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true, features = ["flatbuffers"] }
vortex-error = { workspace = true }
vortex-expr = { workspace = true }
vortex-flatbuffers = { workspace = true, features = ["layout"] }
vortex-ipc = { workspace = true }

[dev-dependencies]
rstest = { workspace = true }

[lints]
workspace = true
36 changes: 36 additions & 0 deletions vortex-layout/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use vortex_array::aliases::hash_map::HashMap;

use crate::encoding::{LayoutEncodingRef, LayoutId};
use crate::layouts::chunked::ChunkedLayout;
use crate::layouts::flat::FlatLayout;
use crate::layouts::struct_::StructLayout;

#[derive(Debug, Clone)]
pub struct LayoutContext {
layout_refs: HashMap<LayoutId, LayoutEncodingRef>,
}

impl LayoutContext {
pub fn new(layout_refs: HashMap<LayoutId, LayoutEncodingRef>) -> Self {
Self { layout_refs }
}

pub fn lookup_layout(&self, id: &LayoutId) -> Option<LayoutEncodingRef> {
self.layout_refs.get(id).cloned()
}
}

impl Default for LayoutContext {
fn default() -> Self {
Self::new(
[
&ChunkedLayout as LayoutEncodingRef,
&FlatLayout,
&StructLayout,
]
.into_iter()
.map(|l| (l.id(), l))
.collect(),
)
}
}
256 changes: 256 additions & 0 deletions vortex-layout/src/data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
use std::ops::Deref;

use bytes::Bytes;
use flatbuffers::{root, FlatBufferBuilder, WIPOffset};
use vortex_array::ContextRef;
use vortex_buffer::ByteBuffer;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_flatbuffers::{layout as fb, layout, FlatBufferRoot, WriteFlatBuffer};

use crate::encoding::{LayoutEncodingRef, LayoutId};
use crate::scanner::{LayoutScan, Scan};
use crate::segments::SegmentId;

/// [`LayoutData`] is the lazy equivalent to [`vortex_array::ArrayData`], providing a hierarchical
/// structure.
#[derive(Debug, Clone)]
pub struct LayoutData(Inner);

#[derive(Debug, Clone)]
enum Inner {
Owned(OwnedLayoutData),
Viewed(ViewedLayoutData),
}

/// A layout that is fully deserialized and heap-allocated.
#[derive(Debug, Clone)]
pub struct OwnedLayoutData {
encoding: LayoutEncodingRef,
dtype: DType,
row_count: u64,
segments: Option<Vec<SegmentId>>,
children: Option<Vec<LayoutData>>,
metadata: Option<Bytes>,
}

/// A layout that is lazily deserialized from a flatbuffer message.
#[derive(Debug, Clone)]
struct ViewedLayoutData {
encoding: LayoutEncodingRef,
dtype: DType,
// FIXME(ngates): i think this is in the flatbuffer?
row_count: u64,
flatbuffer: ByteBuffer,
flatbuffer_loc: usize,
}

impl ViewedLayoutData {
/// Return the flatbuffer layout message.
fn flatbuffer(&self) -> layout::Layout<'_> {
unsafe {
let tab = flatbuffers::Table::new(self.flatbuffer.as_ref(), self.flatbuffer_loc);
layout::Layout::init_from_table(tab)
}
}
}

impl LayoutData {
/// Create a new owned layout.
pub fn new_owned(
encoding: LayoutEncodingRef,
dtype: DType,
row_count: u64,
segments: Option<Vec<SegmentId>>,
children: Option<Vec<LayoutData>>,
metadata: Option<Bytes>,
) -> Self {
Self(Inner::Owned(OwnedLayoutData {
encoding,
dtype,
row_count,
segments,
children,
metadata,
}))
}

/// Create a new viewed layout from a flatbuffer root message.
pub fn try_new_viewed(
encoding: LayoutEncodingRef,
dtype: DType,
row_count: u64,
flatbuffer: ByteBuffer,
) -> VortexResult<Self> {
// Validate the buffer contains a layout message.
let layout_fb = root::<fb::Layout>(flatbuffer.as_ref())?;
if layout_fb.encoding() != encoding.id().0 {
vortex_bail!(
"Mismatched encoding, flatbuffer contains {}",
layout_fb.encoding()
);
}
Ok(Self(Inner::Viewed(ViewedLayoutData {
encoding,
dtype,
row_count,
flatbuffer,
flatbuffer_loc: 0,
})))
}

/// Returns the [`crate::LayoutEncoding`] for this layout.
pub fn encoding(&self) -> LayoutEncodingRef {
match &self.0 {
Inner::Owned(owned) => owned.encoding,
Inner::Viewed(viewed) => viewed.encoding,
}
}

/// Returns the ID of the layout.
pub fn id(&self) -> LayoutId {
match &self.0 {
Inner::Owned(owned) => owned.encoding.id(),
Inner::Viewed(viewed) => LayoutId(viewed.flatbuffer().encoding()),
}
}

/// Return the row-count of the layout.
pub fn row_count(&self) -> u64 {
match &self.0 {
Inner::Owned(owned) => owned.row_count,
Inner::Viewed(viewed) => viewed.row_count,
}
}

/// Return the data type of the layout.
pub fn dtype(&self) -> &DType {
match &self.0 {
Inner::Owned(owned) => &owned.dtype,
Inner::Viewed(viewed) => &viewed.dtype,
}
}

/// Returns the number of children of the layout.
pub fn nchildren(&self) -> usize {
match &self.0 {
Inner::Owned(owned) => owned.children.as_ref().map_or(0, |children| children.len()),
Inner::Viewed(viewed) => viewed
.flatbuffer()
.children()
.map_or(0, |children| children.len()),
}
}

/// Fetch the i'th child layout.
pub fn child(&self, _i: usize, _dtype: DType) -> Option<LayoutData> {
todo!()
}

/// Fetch the i'th segment id of the layout.
pub fn segment_id(&self, i: usize) -> Option<SegmentId> {
match &self.0 {
Inner::Owned(owned) => owned
.segments
.as_ref()
.and_then(|msgs| msgs.get(i).copied()),
Inner::Viewed(viewed) => viewed
.flatbuffer()
.segments()
.and_then(|segments| (i < segments.len()).then(|| segments.get(i)))
.map(SegmentId::from),
}
}

/// Returns the layout metadata
pub fn metadata(&self) -> Option<Bytes> {
match &self.0 {
Inner::Owned(owned) => owned.metadata.clone(),
Inner::Viewed(viewed) => viewed.flatbuffer().metadata().map(|m| {
// Return the metadata bytes zero-copy by finding them in the flatbuffer.
let bytes = viewed.flatbuffer.clone().into_inner();
bytes.slice_ref(m.bytes())
}),
}
}

/// Create a scan of this layout.
pub fn new_scan(self, scan: Scan, ctx: ContextRef) -> VortexResult<Box<dyn LayoutScan>> {
self.encoding().scan(self, scan, ctx)
}
}

impl FlatBufferRoot for LayoutData {}

impl WriteFlatBuffer for LayoutData {
type Target<'a> = layout::Layout<'a>;

fn write_flatbuffer<'fb>(
&self,
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
match &self.0 {
Inner::Owned(layout) => {
let metadata = layout.metadata.as_ref().map(|b| fbb.create_vector(b));
let children = layout.children.as_ref().map(|children| {
children
.iter()
.map(|c| c.write_flatbuffer(fbb))
.collect::<Vec<_>>()
});
let children = children.map(|c| fbb.create_vector(&c));
let segments = layout
.segments
.as_ref()
.map(|m| m.iter().map(|s| s.deref()).copied().collect::<Vec<u32>>());
let segments = segments.map(|m| fbb.create_vector(&m));

layout::Layout::create(
fbb,
&layout::LayoutArgs {
encoding: layout.encoding.id().0,
row_count: layout.row_count,
metadata,
children,
segments,
},
)
}
Inner::Viewed(layout) => LayoutFlatBuffer(layout.flatbuffer()).write_flatbuffer(fbb),
}
}
}

struct LayoutFlatBuffer<'l>(layout::Layout<'l>);

impl WriteFlatBuffer for LayoutFlatBuffer<'_> {
type Target<'a> = layout::Layout<'a>;

fn write_flatbuffer<'fb>(
&self,
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
let metadata = self.0.metadata().map(|m| fbb.create_vector(m.bytes()));
let children = self.0.children().map(|c| {
c.iter()
.map(|child| LayoutFlatBuffer(child).write_flatbuffer(fbb))
.collect::<Vec<_>>()
});
let children = children.map(|c| fbb.create_vector(&c));
let segments = self
.0
.segments()
.map(|m| fbb.create_vector_from_iter(m.iter()));

layout::Layout::create(
fbb,
&layout::LayoutArgs {
encoding: self.0.encoding(),
row_count: self.0.row_count(),
metadata,
children,
segments,
},
)
}
}
33 changes: 33 additions & 0 deletions vortex-layout/src/encoding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::fmt::{Debug, Display, Formatter};

use vortex_array::ContextRef;
use vortex_error::VortexResult;

use crate::scanner::{LayoutScan, Scan};
use crate::LayoutData;

#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct LayoutId(pub u16);

impl Display for LayoutId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}

pub trait LayoutEncoding: Debug + Send + Sync {
/// Returns the globally unique ID for this type of layout.
fn id(&self) -> LayoutId;

/// Construct a [`LayoutScan`] for the provided [`LayoutData`].
///
/// May panic if the provided `LayoutData` is not the same encoding as this `LayoutEncoding`.
fn scan(
&self,
layout: LayoutData,
scan: Scan,
ctx: ContextRef,
) -> VortexResult<Box<dyn LayoutScan>>;
}

pub type LayoutEncodingRef = &'static dyn LayoutEncoding;
36 changes: 36 additions & 0 deletions vortex-layout/src/layouts/chunked/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
mod scan;
pub mod stats;
pub mod writer;

use vortex_array::ContextRef;
use vortex_error::VortexResult;

use crate::data::LayoutData;
use crate::encoding::{LayoutEncoding, LayoutId};
use crate::layouts::chunked::scan::ChunkedScan;
use crate::scanner::{LayoutScan, LayoutScanExt, Scan};
use crate::CHUNKED_LAYOUT_ID;

#[derive(Default, Debug)]
pub struct ChunkedLayout;

/// In-memory representation of Chunked layout.
///
/// First child in the list is the metadata table
/// Subsequent children are consecutive chunks of this layout
impl LayoutEncoding for ChunkedLayout {
fn id(&self) -> LayoutId {
CHUNKED_LAYOUT_ID
}

// TODO(ngates): we probably need some reader options that we can downcast here? But how does
// the user configure the tree of readers? e.g. batch size
fn scan(
&self,
layout: LayoutData,
scan: Scan,
ctx: ContextRef,
) -> VortexResult<Box<dyn LayoutScan>> {
Ok(ChunkedScan::try_new(layout, scan, ctx)?.boxed())
}
}
112 changes: 112 additions & 0 deletions vortex-layout/src/layouts/chunked/scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use vortex_array::{ArrayData, ContextRef};
use vortex_dtype::DType;
use vortex_error::{vortex_err, vortex_panic, VortexResult};

use crate::layouts::chunked::ChunkedLayout;
use crate::scanner::{LayoutScan, Poll, Scan, Scanner};
use crate::segments::SegmentReader;
use crate::{LayoutData, LayoutEncoding, RowMask};

#[derive(Clone)]
struct Reading {
// The index of the chunk currently being read.
chunk_idx: usize,
// The layout of the chunk currently being read.
chunk_layout: LayoutData,
// The statistics table, if required
statistics: Option<ArrayData>,
}

pub struct ChunkedScan {
layout: LayoutData,
scan: Scan,
dtype: DType,
ctx: ContextRef,
}

impl ChunkedScan {
pub(super) fn try_new(layout: LayoutData, scan: Scan, ctx: ContextRef) -> VortexResult<Self> {
if layout.encoding().id() != ChunkedLayout.id() {
vortex_panic!("Mismatched layout ID")
}
let dtype = scan.result_dtype(layout.dtype())?;
Ok(Self {
layout,
scan,
dtype,
ctx,
})
}
}

impl LayoutScan for ChunkedScan {
fn layout(&self) -> &LayoutData {
&self.layout
}

fn dtype(&self) -> &DType {
&self.dtype
}

fn create_scanner(&self, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
Ok(Box::new(ChunkedScanner {
layout: self.layout.clone(),
mask,
state: State::Initial,
}) as _)
}
}

#[derive(Clone)]
enum State {
Initial,
Reading(Reading),
}

struct ChunkedScanner {
layout: LayoutData,
mask: RowMask,
state: State,
}

impl ChunkedScanner {
/// Returns the [`LayoutData`] for the given chunk.
fn chunk_layout(&self, chunk_idx: usize) -> VortexResult<LayoutData> {
self.layout
.child(chunk_idx, self.layout.dtype().clone())
.ok_or_else(|| vortex_err!("Chunk index out of bounds"))
}
}

impl Scanner for ChunkedScanner {
fn poll(&mut self, _segments: &dyn SegmentReader) -> VortexResult<Poll> {
loop {
match self.state {
State::Initial => {
// TODO(ngates): decide whether to read the stats table. We should read it if:
// * The scan's filter expression exists and is prune-able,
// * The scan's mask spans more than a single chunk

// We always start at chunk zero. The reading state will skip if there's
// no work based on the mask.
let chunk_idx = 0;
let chunk_layout = self.chunk_layout(chunk_idx)?;
self.state = State::Reading(Reading {
chunk_idx,
chunk_layout,
statistics: None,
});
Comment on lines +94 to +98
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to support out of order reads, if one chunk is very big but you know you have to read the next chunk you don't have to block reading chunk + 1 on previous chunk

}
State::Reading(Reading { .. }) => {
// self.mask.is_disjoint(self.chunk_ranges)
// self.state = State::Reading(Reading {
// chunk_idx: chunk_idx + 1,
// chunk_layout: self.chunk_layout(chunk_idx + 1)?,
// statistics: None,
// });s
todo!()
}
}
}
}
}
77 changes: 77 additions & 0 deletions vortex-layout/src/layouts/chunked/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! Metadata accumulators track the per-chunk-of-a-column metadata, layout locations, and row counts.
use itertools::Itertools;
use vortex_array::array::StructArray;
use vortex_array::builders::{builder_with_capacity, ArrayBuilder, ArrayBuilderExt};
use vortex_array::stats::{ArrayStatistics as _, Stat};
use vortex_array::validity::{ArrayValidity, Validity};
use vortex_array::{ArrayData, IntoArrayData};
use vortex_dtype::DType;
use vortex_error::VortexResult;

pub struct StatsAccumulator {
stats: Vec<Stat>,
builders: Vec<Box<dyn ArrayBuilder>>,
length: usize,
}

impl StatsAccumulator {
pub fn new(dtype: &DType, mut stats: Vec<Stat>) -> Self {
// Sort stats by their ordinal so we can recreate their dtype from bitset
stats.sort_by_key(|s| u8::from(*s));
let builders = stats
.iter()
.map(|s| builder_with_capacity(&s.dtype(dtype).as_nullable(), 1024))
.collect();
Self {
stats,
builders,
length: 0,
}
}

pub fn push_chunk(&mut self, array: &ArrayData) -> VortexResult<()> {
for (s, builder) in self.stats.iter().zip_eq(self.builders.iter_mut()) {
if let Some(v) = array.statistics().compute(*s) {
builder.append_scalar(&v.cast(builder.dtype())?)?;
} else {
builder.append_null();
}
}
self.length += 1;
Ok(())
}

pub fn as_array(&mut self) -> VortexResult<Option<StatsArray>> {
let mut names = Vec::new();
let mut fields = Vec::new();
let mut stats = Vec::new();

for (stat, builder) in self.stats.iter().zip(self.builders.iter_mut()) {
let values = builder
.finish()
.map_err(|e| e.with_context(format!("Failed to finish stat builder for {stat}")))?;

// We drop any all-null stats columns
if values.logical_validity().null_count()? == values.len() {
continue;
}

stats.push(*stat);
names.push(stat.to_string().into());
fields.push(values);
}

if names.is_empty() {
return Ok(None);
}

Ok(Some(StatsArray(
StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)?
.into_array(),
stats,
)))
}
}

pub struct StatsArray(pub ArrayData, pub Vec<Stat>);
100 changes: 100 additions & 0 deletions vortex-layout/src/layouts/chunked/writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use bytes::Bytes;
use vortex_array::stats::{as_stat_bitset_bytes, Stat, PRUNING_STATS};
use vortex_array::{ArrayDType, ArrayData};
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::data::LayoutData;
use crate::layouts::chunked::stats::StatsAccumulator;
use crate::layouts::chunked::ChunkedLayout;
use crate::layouts::flat::FlatLayout;
use crate::segments::SegmentWriter;
use crate::strategies::{LayoutStrategy, LayoutWriter};

pub struct ChunkedLayoutOptions {
/// The statistics to collect for each chunk.
pub chunk_stats: Vec<Stat>,
/// The layout strategy for each chunk.
pub chunk_strategy: Box<dyn LayoutStrategy>,
}

impl Default for ChunkedLayoutOptions {
fn default() -> Self {
Self {
chunk_stats: PRUNING_STATS.to_vec(),
chunk_strategy: Box::new(FlatLayout),
}
}
}

/// A basic implementation of a chunked layout writer that writes each batch into its own chunk.
///
/// TODO(ngates): introduce more sophisticated layout writers with different chunking strategies.
pub struct ChunkedLayoutWriter {
options: ChunkedLayoutOptions,
chunks: Vec<Box<dyn LayoutWriter>>,
stats_accumulator: StatsAccumulator,
dtype: DType,
row_count: u64,
}

impl ChunkedLayoutWriter {
pub fn new(dtype: &DType, options: ChunkedLayoutOptions) -> Self {
let stats_accumulator = StatsAccumulator::new(dtype, options.chunk_stats.clone());
Self {
options,
chunks: Vec::new(),
stats_accumulator,
dtype: dtype.clone(),
row_count: 0,
}
}
}

impl LayoutWriter for ChunkedLayoutWriter {
fn push_chunk(
&mut self,
segments: &mut dyn SegmentWriter,
chunk: ArrayData,
) -> VortexResult<()> {
self.row_count += chunk.len() as u64;
self.stats_accumulator.push_chunk(&chunk)?;

// We write each chunk, but don't call finish quite yet to ensure that chunks have an
// opportunity to write messages at the end of the file.
let mut chunk_writer = self.options.chunk_strategy.new_writer(chunk.dtype())?;
chunk_writer.push_chunk(segments, chunk)?;
self.chunks.push(chunk_writer);

Ok(())
}

fn finish(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<LayoutData> {
// Call finish on each chunk's writer
let mut chunk_layouts = vec![];
for writer in self.chunks.iter_mut() {
chunk_layouts.push(writer.finish(segments)?);
}

// Collect together the statistics
let stats_array = self.stats_accumulator.as_array()?;
let metadata: Option<Bytes> = match stats_array {
Some(stats_array) => {
let _stats_segment_id = segments.put_chunk(stats_array.0);
// We store a bit-set of the statistics in the layout metadata so we can infer the
// statistics array schema when reading the layout.
Some(as_stat_bitset_bytes(&stats_array.1).into())
}
None => None,
};

Ok(LayoutData::new_owned(
&ChunkedLayout,
self.dtype.clone(),
self.row_count,
None,
Some(chunk_layouts),
metadata,
))
}
}
28 changes: 28 additions & 0 deletions vortex-layout/src/layouts/flat/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
mod scan;
pub mod writer;

use vortex_array::ContextRef;
use vortex_error::VortexResult;

use crate::encoding::{LayoutEncoding, LayoutId};
use crate::layouts::flat::scan::FlatScan;
use crate::scanner::{LayoutScan, LayoutScanExt, Scan};
use crate::{LayoutData, FLAT_LAYOUT_ID};

#[derive(Debug)]
pub struct FlatLayout;

impl LayoutEncoding for FlatLayout {
fn id(&self) -> LayoutId {
FLAT_LAYOUT_ID
}

fn scan(
&self,
layout: LayoutData,
scan: Scan,
ctx: ContextRef,
) -> VortexResult<Box<dyn LayoutScan>> {
Ok(FlatScan::try_new(layout, scan, ctx)?.boxed())
}
}
209 changes: 209 additions & 0 deletions vortex-layout/src/layouts/flat/scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
use vortex_array::compute::{fill_null, filter, FilterMask};
use vortex_array::ContextRef;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexResult};
use vortex_ipc::messages::{BufMessageReader, DecoderMessage};

use crate::layouts::flat::FlatLayout;
use crate::scanner::{LayoutScan, Poll, Scan, Scanner};
use crate::segments::{SegmentId, SegmentReader};
use crate::{LayoutData, LayoutEncoding, RowMask};

#[derive(Clone, Eq, PartialEq)]
enum State {
Initial,
}

pub struct FlatScan {
layout: LayoutData,
scan: Scan,
dtype: DType,
ctx: ContextRef,
state: State,
}

impl FlatScan {
pub(super) fn try_new(layout: LayoutData, scan: Scan, ctx: ContextRef) -> VortexResult<Self> {
if layout.encoding().id() != FlatLayout.id() {
vortex_panic!("Mismatched layout ID")
}
let dtype = scan.result_dtype(layout.dtype())?;
Ok(Self {
layout,
scan,
dtype,
ctx,
state: State::Initial,
})
}
}

impl LayoutScan for FlatScan {
fn layout(&self) -> &LayoutData {
&self.layout
}

fn dtype(&self) -> &DType {
&self.dtype
}

fn create_scanner(&self, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
let segment_id = self
.layout
.segment_id(0)
.ok_or_else(|| vortex_err!("FlatLayout missing SegmentID"))?;

// Convert the row-mask to a filter mask
let filter_mask = mask.into_filter_mask()?;

Ok(Box::new(FlatScanner {
segment_id,
dtype: self.layout.dtype().clone(),
scan: self.scan.clone(),
ctx: self.ctx.clone(),
mask: filter_mask,
}) as _)
}
}

struct FlatScanner {
segment_id: SegmentId,
dtype: DType,
scan: Scan,
ctx: ContextRef,
mask: FilterMask,
}

impl Scanner for FlatScanner {
fn poll(&mut self, segments: &dyn SegmentReader) -> VortexResult<Poll> {
match segments.get(self.segment_id) {
None => Ok(Poll::NeedMore(vec![self.segment_id])),
Some(bytes) => {
// Decode the ArrayParts from the message bytes.
// TODO(ngates): ArrayParts should probably live in vortex-array, and not required
// IPC message framing to read or write.
let mut msg_reader = BufMessageReader::new(bytes);
let array = if let DecoderMessage::Array(parts) = msg_reader
.next()
.ok_or_else(|| vortex_err!("Flat message body missing"))??
{
parts.into_array_data(self.ctx.clone(), self.dtype.clone())
} else {
vortex_bail!("Flat message is not ArrayParts")
}?;

// TODO(ngates): I think we can pull out a "Scanner" object that encapsulates
// clever logic for figuring out the best order to apply the filter, projection,
// and filter mask. This can then be re-used across chunks so the selectivity
// stats are preserved.

// Now we can apply the scan to the array.
// NOTE(ngates): there's not a clear answer for order to apply the filter
// expression, projection and filter mask. We should experiment.
let mut array = filter(&array, self.mask.clone())?;
if let Some(expr) = &self.scan.filter {
let mask = expr.evaluate(&array)?;
let mask = fill_null(&mask, false.into())?;
let mask = FilterMask::try_from(mask)?;
array = filter(&array, mask)?;
}
array = self.scan.projection.evaluate(&array)?;

Ok(Poll::Some(array))
}
}
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use vortex_array::array::PrimitiveArray;
use vortex_array::validity::Validity;
use vortex_array::{ArrayDType, IntoArrayVariant, ToArrayData};
use vortex_buffer::buffer;
use vortex_dtype::{DType, Nullability};
use vortex_expr::{BinaryExpr, Identity, Literal, Operator};

use crate::layouts::flat::writer::FlatLayoutWriter;
use crate::scanner::Scan;
use crate::segments::test::TestSegments;
use crate::strategies::LayoutWriterExt;

#[test]
fn flat_scan() {
let mut segments = TestSegments::default();
let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
let layout = FlatLayoutWriter::new(array.dtype().clone())
.push_one(&mut segments, array.to_array())
.unwrap();

let result = segments
.do_scan(
layout
.new_scan(Scan::all(), Default::default())
.unwrap()
.as_ref(),
)
.into_primitive()
.unwrap();

assert_eq!(array.as_slice::<i32>(), result.as_slice::<i32>());
}

#[test]
fn flat_scan_filter() {
let mut segments = TestSegments::default();
let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
let layout = FlatLayoutWriter::new(array.dtype().clone())
.push_one(&mut segments, array.to_array())
.unwrap();

let scan = Scan {
projection: Identity::new_expr(),
filter: Some(BinaryExpr::new_expr(
Arc::new(Identity),
Operator::Gt,
Literal::new_expr(3.into()),
)),
};

let result = segments
.do_scan(layout.new_scan(scan, Default::default()).unwrap().as_ref())
.into_primitive()
.unwrap();

assert_eq!(&[4, 5], result.as_slice::<i32>());
}

#[test]
fn flat_scan_filter_project() {
let mut segments = TestSegments::default();
let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
let layout = FlatLayoutWriter::new(array.dtype().clone())
.push_one(&mut segments, array.to_array())
.unwrap();

let scan = Scan {
// The projection function here changes the scan's DType to boolean
projection: BinaryExpr::new_expr(
Arc::new(Identity),
Operator::Lt,
Literal::new_expr(5.into()),
),
filter: Some(BinaryExpr::new_expr(
Arc::new(Identity),
Operator::Gt,
Literal::new_expr(3.into()),
)),
};

let scan = layout.new_scan(scan, Default::default()).unwrap();
assert_eq!(scan.dtype(), &DType::Bool(Nullability::Nullable));

let result = segments.do_scan(scan.as_ref()).into_bool().unwrap();
assert!(result.boolean_buffer().value(0));
assert!(!result.boolean_buffer().value(1));
}
}
52 changes: 52 additions & 0 deletions vortex-layout/src/layouts/flat/writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use vortex_array::ArrayData;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::layouts::flat::FlatLayout;
use crate::segments::SegmentWriter;
use crate::strategies::LayoutWriter;
use crate::LayoutData;

/// Writer for the flat layout.
pub struct FlatLayoutWriter {
dtype: DType,
layout: Option<LayoutData>,
}

impl FlatLayoutWriter {
pub fn new(dtype: DType) -> Self {
Self {
dtype,
layout: None,
}
}
}

impl LayoutWriter for FlatLayoutWriter {
fn push_chunk(
&mut self,
segments: &mut dyn SegmentWriter,
chunk: ArrayData,
) -> VortexResult<()> {
if self.layout.is_some() {
vortex_bail!("FlatLayoutStrategy::push_batch called after finish");
}
let row_count = chunk.len() as u64;
let segment_id = segments.put_chunk(chunk);
self.layout = Some(LayoutData::new_owned(
&FlatLayout,
self.dtype.clone(),
row_count,
Some(vec![segment_id]),
None,
None,
));
Ok(())
}

fn finish(&mut self, _segments: &mut dyn SegmentWriter) -> VortexResult<LayoutData> {
self.layout
.take()
.ok_or_else(|| vortex_err!("FlatLayoutStrategy::finish called without push_batch"))
}
}
4 changes: 4 additions & 0 deletions vortex-layout/src/layouts/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! A collection of built-in layouts for Vortex
pub mod chunked;
pub mod flat;
pub mod struct_;
29 changes: 29 additions & 0 deletions vortex-layout/src/layouts/struct_/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
mod scan;
pub mod writer;

use vortex_array::ContextRef;
use vortex_error::VortexResult;

use crate::data::LayoutData;
use crate::encoding::{LayoutEncoding, LayoutId};
use crate::layouts::struct_::scan::StructScan;
use crate::scanner::{LayoutScan, LayoutScanExt, Scan};
use crate::COLUMNAR_LAYOUT_ID;

#[derive(Debug)]
pub struct StructLayout;

impl LayoutEncoding for StructLayout {
fn id(&self) -> LayoutId {
COLUMNAR_LAYOUT_ID
}

fn scan(
&self,
layout: LayoutData,
scan: Scan,
ctx: ContextRef,
) -> VortexResult<Box<dyn LayoutScan>> {
Ok(StructScan::try_new(layout, scan, ctx)?.boxed())
}
}
68 changes: 68 additions & 0 deletions vortex-layout/src/layouts/struct_/scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use vortex_array::ContextRef;
use vortex_dtype::DType;
use vortex_error::{vortex_panic, VortexResult};

use crate::layouts::struct_::StructLayout;
use crate::scanner::{LayoutScan, Poll, Scan, Scanner};
use crate::segments::SegmentReader;
use crate::{LayoutData, LayoutEncoding, RowMask};

pub struct StructScan {
layout: LayoutData,
scan: Scan,
dtype: DType,
}

impl StructScan {
pub(super) fn try_new(layout: LayoutData, scan: Scan, _ctx: ContextRef) -> VortexResult<Self> {
if layout.encoding().id() != StructLayout.id() {
vortex_panic!("Mismatched layout ID")
}

let dtype = scan.result_dtype(layout.dtype())?;

// This is where we need to do some complex things with the scan in order to split it into
// different scans for different fields.
Ok(Self {
layout,
scan,
dtype,
})
}
}
impl LayoutScan for StructScan {
fn layout(&self) -> &LayoutData {
&self.layout
}

fn dtype(&self) -> &DType {
&self.dtype
}

fn create_scanner(&self, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
Ok(Box::new(StructScanner {
layout: self.layout.clone(),
scan: self.scan.clone(),
mask,
state: State::Initial,
}) as _)
}
}

#[derive(Clone)]
enum State {
Initial,
}

struct StructScanner {
layout: LayoutData,
scan: Scan,
mask: RowMask,
state: State,
}

impl Scanner for StructScanner {
fn poll(&mut self, _segments: &dyn SegmentReader) -> VortexResult<Poll> {
todo!()
}
}
92 changes: 92 additions & 0 deletions vortex-layout/src/layouts/struct_/writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use itertools::Itertools;
use vortex_array::ArrayData;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult};

use crate::data::LayoutData;
use crate::layouts::struct_::StructLayout;
use crate::segments::SegmentWriter;
use crate::strategies::{LayoutStrategy, LayoutWriter};

/// A [`LayoutWriter`] that splits a StructArray batch into child layout writers
pub struct StructLayoutWriter {
column_strategies: Vec<Box<dyn LayoutWriter>>,
dtype: DType,
row_count: u64,
}

impl StructLayoutWriter {
pub fn new(dtype: DType, column_layout_writers: Vec<Box<dyn LayoutWriter>>) -> Self {
let struct_dtype = dtype.as_struct().vortex_expect("dtype is not a struct");
if struct_dtype.dtypes().len() != column_layout_writers.len() {
vortex_panic!(
"number of fields in struct dtype does not match number of column layout writers"
);
}
Self {
column_strategies: column_layout_writers,
dtype,
row_count: 0,
}
}

pub fn try_new_with_factory<F: LayoutStrategy>(
dtype: &DType,
factory: F,
) -> VortexResult<Self> {
let struct_dtype = dtype.as_struct().vortex_expect("dtype is not a struct");
Ok(Self::new(
dtype.clone(),
struct_dtype
.dtypes()
.iter()
.map(|dtype| factory.new_writer(dtype))
.try_collect()?,
))
}
}

impl LayoutWriter for StructLayoutWriter {
fn push_chunk(
&mut self,
segments: &mut dyn SegmentWriter,
chunk: ArrayData,
) -> VortexResult<()> {
let struct_array = chunk
.as_struct_array()
.ok_or_else(|| vortex_err!("batch is not a struct array"))?;

if struct_array.nfields() != self.column_strategies.len() {
vortex_bail!(
"number of fields in struct array does not match number of column layout writers"
);
}
self.row_count += struct_array.len() as u64;

for i in 0..struct_array.nfields() {
let column = chunk
.as_struct_array()
.vortex_expect("batch is a struct array")
.field(i)
.vortex_expect("bounds already checked");
self.column_strategies[i].push_chunk(segments, column)?;
}

Ok(())
}

fn finish(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<LayoutData> {
let mut column_layouts = vec![];
for writer in self.column_strategies.iter_mut() {
column_layouts.push(writer.finish(segments)?);
}
Ok(LayoutData::new_owned(
&StructLayout,
self.dtype.clone(),
self.row_count,
None,
Some(column_layouts),
None,
))
}
}
19 changes: 19 additions & 0 deletions vortex-layout/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#![allow(dead_code)]
mod data;
pub mod scanner;
pub use data::*;
mod context;
mod encoding;
pub mod layouts;
pub use encoding::*;
mod row_mask;
pub use row_mask::*;
mod segments;
pub mod strategies;

/// The layout ID for a flat layout
pub(crate) const FLAT_LAYOUT_ID: LayoutId = LayoutId(1);
/// The layout ID for a chunked layout
pub(crate) const CHUNKED_LAYOUT_ID: LayoutId = LayoutId(2);
/// The layout ID for a column layout
pub(crate) const COLUMNAR_LAYOUT_ID: LayoutId = LayoutId(3);
472 changes: 472 additions & 0 deletions vortex-layout/src/row_mask.rs

Large diffs are not rendered by default.

55 changes: 55 additions & 0 deletions vortex-layout/src/scanner/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
mod scan;

pub use scan::*;
use vortex_array::ArrayData;
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::segments::{SegmentId, SegmentReader};
use crate::{LayoutData, RowMask};

/// A [`LayoutScan`] provides an encapsulation of an invocation of a scan operation.
pub trait LayoutScan: Send {
/// Returns the [`LayoutData`] that this scan is operating on.
fn layout(&self) -> &LayoutData;

/// The result [`DType`] of the scan.
fn dtype(&self) -> &DType;

/// Return a [`Scanner`] for the given row mask.
///
/// Note that since a [`Scanner`] returns a single ArrayData, the caller is responsible for
/// ensuring the working set and result of the scan fit into memory. The [`LayoutData`] can
/// be asked for "splits" if the caller needs a hint for how to partition the scan.
fn create_scanner(&self, mask: RowMask) -> VortexResult<Box<dyn Scanner>>;
}

pub trait LayoutScanExt: LayoutScan {
/// Box the layout scan.
fn boxed(self) -> Box<dyn LayoutScan + 'static>
where
Self: Sized + 'static,
{
Box::new(self)
}
}

impl<L: LayoutScan> LayoutScanExt for L {}

/// The response to polling a scanner.
pub enum Poll {
/// The next chunk has been read.
Some(ArrayData),
/// The scanner requires additional segments before it can make progress.
NeedMore(Vec<SegmentId>),
}

/// A trait for scanning a single row range of a layout.
pub trait Scanner: Send {
/// Attempts to return the [`ArrayData`] result of this ranged scan. If the scanner cannot
/// make progress, it can return a vec of additional data segments using [`Poll::NeedMore`].
///
/// After the poll function has returned an [`ArrayData`], the result of future calls to
/// ['poll'] are undefined.
fn poll(&mut self, segments: &dyn SegmentReader) -> VortexResult<Poll>;
}
38 changes: 38 additions & 0 deletions vortex-layout/src/scanner/scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::ops::RangeBounds;

use vortex_array::{ArrayDType, Canonical, IntoArrayData};
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_expr::{ExprRef, Identity};

/// The definition of a range scan.
#[derive(Debug, Clone)]
pub struct Scan {
pub projection: ExprRef,
pub filter: Option<ExprRef>,
}

impl Scan {
/// Scan all rows with the identity projection.
pub fn all() -> Self {
Self {
projection: Identity::new_expr(),
filter: None,
}
}

/// Slice the scan to the given row range. The mask of the returned scan is relative to the
/// start of the range.
pub fn slice(&self, _range: impl RangeBounds<u64>) -> Scan {
todo!()
}

/// Compute the result dtype of the scan given the input dtype.
pub fn result_dtype(&self, dtype: &DType) -> VortexResult<DType> {
Ok(self
.projection
.evaluate(&Canonical::empty(dtype)?.into_array())?
.dtype()
.clone())
}
}
93 changes: 93 additions & 0 deletions vortex-layout/src/segments/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::ops::Deref;

use bytes::Bytes;
use vortex_array::ArrayData;
use vortex_ipc::messages::{EncoderMessage, MessageEncoder};

/// The identifier for a single segment.
// TODO(ngates): should this be a `[u8]` instead? Allowing for arbitrary segment identifiers?
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SegmentId(u32);

impl From<u32> for SegmentId {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the curiosity of someone unfamiliar with the Vortex code base...how is a Segment defined? What is the relationship between a segment and a chunk?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can think of a segment as some abstract buffer of bytes that can be stored anywhere.

Where ArrayData holds buffers, LayoutData holds segment IDs which can later be resolved to segments.

Hmmm... maybe I should just stick with the name "buffer"

fn from(value: u32) -> Self {
Self(value)
}
}

impl Deref for SegmentId {
type Target = u32;

fn deref(&self) -> &Self::Target {
&self.0
}
}

pub trait SegmentReader {
/// Attempt to get the data associated with a given segment ID.
///
/// If the segment ID is not found, `None` is returned.
fn get(&self, id: SegmentId) -> Option<Bytes>;
}

pub trait SegmentWriter {
/// Write the given data into a segment and return its identifier.
/// The provided buffers are concatenated together to form the segment.
fn put(&mut self, data: Vec<Bytes>) -> SegmentId;

// TODO(ngates): convert this to take an `ArrayParts` so it's obvious to the caller that the
// serialized message does not include the array's length or dtype.
// TODO(ngates): do not use the IPC message encoder since it adds extra unnecessary framing.
fn put_chunk(&mut self, array: ArrayData) -> SegmentId {
self.put(MessageEncoder::default().encode(EncoderMessage::Array(&array)))
}
}

#[cfg(test)]
pub mod test {
use bytes::{Bytes, BytesMut};
use vortex_error::{vortex_panic, VortexExpect};

use super::*;
use crate::scanner::{LayoutScan, Poll};
use crate::segments::SegmentReader;
use crate::RowMask;

#[derive(Default)]
pub struct TestSegments {
segments: Vec<Bytes>,
}

impl TestSegments {
pub fn do_scan(&self, scan: &dyn LayoutScan) -> ArrayData {
let mut scanner = scan
.create_scanner(RowMask::new_valid_between(0, scan.layout().row_count()))
.vortex_expect("Failed to create scanner");
match scanner.poll(self).vortex_expect("Failed to poll scanner") {
Poll::Some(array) => array,
Poll::NeedMore(_segments) => {
vortex_panic!("Layout requested more segments from TestSegments.")
}
}
}
}

impl SegmentReader for TestSegments {
fn get(&self, id: SegmentId) -> Option<Bytes> {
self.segments.get(*id as usize).cloned()
}
}

impl SegmentWriter for TestSegments {
fn put(&mut self, data: Vec<Bytes>) -> SegmentId {
let id = u32::try_from(self.segments.len())
.vortex_expect("Cannot store more than u32::MAX segments");
let mut buffer = BytesMut::with_capacity(data.iter().map(Bytes::len).sum());
for bytes in data {
buffer.extend_from_slice(&bytes);
}
self.segments.push(buffer.freeze());
id.into()
}
}
}
78 changes: 78 additions & 0 deletions vortex-layout/src/strategies/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! This is a collection of built-in layout strategies designed to be used in conjunction with one
//! another to develop an overall strategy.
//!
//! Each [`LayoutWriter`] is passed horizontal chunks of a Vortex array one-by-one, and is
//! eventually asked to return a [`LayoutData`]. The writers can buffer, re-chunk, flush, or
//! otherwise manipulate the chunks of data enabling experimentation with different strategies
//! all while remaining independent of the read code.
mod struct_of_chunks;

pub use struct_of_chunks::*;
use vortex_array::ArrayData;
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::layouts::flat::writer::FlatLayoutWriter;
use crate::layouts::flat::FlatLayout;
use crate::segments::SegmentWriter;
use crate::LayoutData;

/// A strategy for writing chunks of an array into a layout.
/// FIXME(ngates): move this into writer.rs
pub trait LayoutWriter: Send {
fn push_chunk(
&mut self,
segments: &mut dyn SegmentWriter,
chunk: ArrayData,
) -> VortexResult<()>;

fn finish(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<LayoutData>;
}

pub trait LayoutWriterExt: LayoutWriter {
/// Box the layout writer.
fn boxed(self) -> Box<dyn LayoutWriter>
where
Self: Sized + 'static,
{
Box::new(self)
}

/// Push a single chunk into the layout writer and return the finished [`LayoutData`].
fn push_one(
&mut self,
segments: &mut dyn SegmentWriter,
chunk: ArrayData,
) -> VortexResult<LayoutData> {
self.push_chunk(segments, chunk)?;
self.finish(segments)
}

/// Push all chunks of the iterator into the layout writer and return the finished
/// [`LayoutData`].
fn push_all<I: IntoIterator<Item = VortexResult<ArrayData>>>(
&mut self,
segments: &mut dyn SegmentWriter,
iter: I,
) -> VortexResult<LayoutData> {
for chunk in iter.into_iter() {
self.push_chunk(segments, chunk?)?
}
self.finish(segments)
}
}

impl<L: LayoutWriter> LayoutWriterExt for L {}

/// A trait for creating new layout writers given a DType.
pub trait LayoutStrategy: Send {
fn new_writer(&self, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>>;
}

/// Implement the [`LayoutStrategy`] trait for the [`FlatLayout`] for easy use.
impl LayoutStrategy for FlatLayout {
fn new_writer(&self, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
Ok(FlatLayoutWriter::new(dtype.clone()).boxed())
}
}
44 changes: 44 additions & 0 deletions vortex-layout/src/strategies/struct_of_chunks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use vortex_dtype::{DType, Nullability};
use vortex_error::{vortex_bail, VortexResult};

use crate::layouts::chunked::writer::{ChunkedLayoutOptions, ChunkedLayoutWriter};
use crate::layouts::struct_::writer::StructLayoutWriter;
use crate::strategies::{LayoutStrategy, LayoutWriter};

/// Struct-of-chunks is the default Vortex layout strategy.
///
/// This layout first splits data into struct columns, before applying chunking as per the
/// provided batches.
///
/// TODO(ngates): add configuration options to this struct to re-chunk the data within each
/// column by size.
pub struct StructOfChunks;

impl LayoutStrategy for StructOfChunks {
fn new_writer(&self, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
match dtype {
DType::Struct(struct_dtype, nullability) => {
if nullability == &Nullability::Nullable {
vortex_bail!("Structs with nullable fields are not supported");
}

Ok(Box::new(StructLayoutWriter::new(
dtype.clone(),
struct_dtype
.dtypes()
.iter()
.map(|col_dtype| default_column_layout(col_dtype))
.collect(),
)))
}
_ => Ok(default_column_layout(dtype)),
}
}
}

fn default_column_layout(dtype: &DType) -> Box<dyn LayoutWriter> {
Box::new(ChunkedLayoutWriter::new(
dtype,
ChunkedLayoutOptions::default(),
)) as _
}
1 change: 1 addition & 0 deletions xtask/src/main.rs
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ fn execute_generate_fbs() -> anyhow::Result<()> {
"./flatbuffers/vortex-scalar/scalar.fbs",
"./flatbuffers/vortex-array/array.fbs",
"./flatbuffers/vortex-serde/footer.fbs",
"./flatbuffers/vortex-layout/layout.fbs",
"./flatbuffers/vortex-serde/message.fbs",
];