Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vortex Layouts File V2 #1830

Merged
merged 5 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ vortex-flatbuffers = { version = "0.21.1", path = "./vortex-flatbuffers" }
vortex-fsst = { version = "0.21.1", path = "./encodings/fsst" }
vortex-io = { version = "0.21.1", path = "./vortex-io" }
vortex-ipc = { version = "0.21.1", path = "./vortex-ipc" }
vortex-layout = { version = "0.21.1", path = "./vortex-layout" }
vortex-proto = { version = "0.21.1", path = "./vortex-proto" }
vortex-roaring = { version = "0.21.1", path = "./encodings/roaring" }
vortex-runend = { version = "0.21.1", path = "./encodings/runend" }
Expand Down
1 change: 1 addition & 0 deletions vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ vortex-expr = { workspace = true }
vortex-flatbuffers = { workspace = true, features = ["file"] }
vortex-io = { workspace = true }
vortex-ipc = { workspace = true }
vortex-layout = { workspace = true }
vortex-scalar = { workspace = true, features = ["flatbuffers"] }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ mod write;
mod byte_range;
#[cfg(test)]
mod tests;
pub mod v2;

/// The current version of the Vortex file format
pub const VERSION: u16 = 1;
Expand Down
69 changes: 69 additions & 0 deletions vortex-file/src/v2/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::io::Read;

use futures_util::stream;
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
use vortex_array::ContextRef;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_io::VortexReadAt;
use vortex_layout::scanner::{Poll, Scan};
use vortex_layout::{LayoutData, RowMask};

use crate::v2::footer::Segment;
use crate::v2::segments::SegmentCache;

/// An opened Vortex (v2) file backed by a reader of type `R`.
///
/// Bundles the reader with the root layout, the table of segment locations
/// and a segment cache, which together are what `scan` needs to produce arrays.
pub struct VortexFile<R> {
/// Underlying reader; `scan` fetches segment bytes from it by byte range.
pub(crate) read: R,
/// Context handed to the layout when a new scan is created.
pub(crate) ctx: ContextRef,
/// Root layout of the file; the source of the file's row count and dtype.
pub(crate) layout: LayoutData,
/// Segment locations, indexed by segment id when resolving scanner requests.
pub(crate) segments: Vec<Segment>,
/// Cache of segment bytes that the scanner polls against and `scan` fills in.
pub(crate) segment_cache: SegmentCache,
}

/// Async implementation of Vortex File.
impl<R: VortexReadAt> VortexFile<R> {
/// Returns the number of rows in the file.
pub fn row_count(&self) -> u64 {
self.layout.row_count()
}

/// Returns the DType of the file.
pub fn dtype(&self) -> &DType {
self.layout.dtype()
}

/// Performs a scan operation over the file.
pub fn scan(&self, scan: Scan) -> VortexResult<impl ArrayStream + '_> {
let layout_scan = self.layout.new_scan(scan, self.ctx.clone())?;
let scan_dtype = layout_scan.dtype().clone();

// TODO(ngates): we could query the layout for splits and then process them in parallel.
// For now, we just scan the entire layout with one mask.
// Note that to implement this we would use stream::try_unfold
let row_mask = RowMask::new_valid_between(0, layout_scan.layout().row_count());
let mut scanner = layout_scan.create_scanner(row_mask)?;

let stream = stream::once(async move {
loop {
match scanner.poll(&self.segment_cache)? {
Poll::Some(array) => return Ok(array),
Poll::NeedMore(segment_ids) => {
for segment_id in segment_ids {
let segment = &self.segments[*segment_id as usize];
let bytes = self
.read
.read_byte_range(segment.offset, segment.length as u64)
.await?;
self.segment_cache.set(segment_id, bytes);
}
}
}
}
});

Ok(ArrayStreamAdapter::new(scan_dtype, stream))
}
}

/// Sync implementation of Vortex File.
///
/// Currently an empty placeholder: no methods are defined for `R: Read` yet.
impl<R: Read> VortexFile<R> {}
39 changes: 39 additions & 0 deletions vortex-file/src/v2/footer/file_layout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer};
use vortex_layout::LayoutData;

use crate::v2::footer::segment::Segment;

/// Captures the layout information of a Vortex file.
///
/// Serialized to the `fb::FileLayout` flatbuffer table via `WriteFlatBuffer`.
#[derive(Clone)]
pub(crate) struct FileLayout {
/// The root layout of the file.
pub(crate) root_layout: LayoutData,
/// Locations (offset and length) of the file's segments, in the order they are serialized.
pub(crate) segments: Vec<Segment>,
}

// Opts `FileLayout` into `FlatBufferRoot`; the impl body is empty, so nothing is overridden here.
impl FlatBufferRoot for FileLayout {}

impl WriteFlatBuffer for FileLayout {
    type Target<'a> = fb::FileLayout<'a>;

    /// Serializes this file layout into `fbb` and returns the offset of the
    /// resulting `fb::FileLayout` table.
    ///
    /// The root layout is written first, followed by each segment in order,
    /// and finally the vector of segment offsets.
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut flatbuffers::FlatBufferBuilder<'fb>,
    ) -> flatbuffers::WIPOffset<Self::Target<'fb>> {
        let layout_offset = self.root_layout.write_flatbuffer(fbb);

        // Every segment table has to be written before the vector that refers to them.
        let mut segment_offsets = Vec::with_capacity(self.segments.len());
        for segment in &self.segments {
            segment_offsets.push(segment.write_flatbuffer(fbb));
        }
        let segments_vector = fbb.create_vector(&segment_offsets);

        let args = fb::FileLayoutArgs {
            root_layout: Some(layout_offset),
            segments: Some(segments_vector),
        };
        fb::FileLayout::create(fbb, &args)
    }
}
7 changes: 7 additions & 0 deletions vortex-file/src/v2/footer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Footer structures of the v2 file format, one private submodule per structure.
mod file_layout;
mod postscript;
mod segment;

// Re-export every submodule's items at crate visibility so callers can use `crate::v2::footer::*`.
pub(crate) use file_layout::*;
pub(crate) use postscript::*;
pub(crate) use segment::*;
52 changes: 52 additions & 0 deletions vortex-file/src/v2/footer/postscript.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use flatbuffers::Follow;
use vortex_error::{vortex_err, VortexError};
use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer};

use crate::v2::footer::segment::Segment;

/// The postscript of a Vortex file: records where the dtype and the file layout are stored.
///
/// Both fields are segment locations (offset and length), not the data itself.
pub(crate) struct Postscript {
/// Location of the segment holding the dtype.
pub(crate) dtype: Segment,
/// Location of the segment holding the file layout.
pub(crate) file_layout: Segment,
}

// Opts `Postscript` into `FlatBufferRoot`; the impl body is empty, so nothing is overridden here.
impl FlatBufferRoot for Postscript {}

impl WriteFlatBuffer for Postscript {
    type Target<'a> = fb::Postscript<'a>;

    /// Serializes the postscript into `fbb` and returns the offset of the
    /// resulting `fb::Postscript` table.
    ///
    /// The dtype segment is written before the file layout segment.
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut flatbuffers::FlatBufferBuilder<'fb>,
    ) -> flatbuffers::WIPOffset<Self::Target<'fb>> {
        let dtype_offset = self.dtype.write_flatbuffer(fbb);
        let layout_offset = self.file_layout.write_flatbuffer(fbb);

        let args = fb::PostscriptArgs {
            dtype: Some(dtype_offset),
            file_layout: Some(layout_offset),
        };
        fb::Postscript::create(fbb, &args)
    }
}

impl ReadFlatBuffer for Postscript {
    type Source<'a> = fb::Postscript<'a>;
    type Error = VortexError;

    /// Reads a `Postscript` from its flatbuffer representation.
    ///
    /// # Errors
    ///
    /// Fails if the `dtype` or `file_layout` segment is absent from the
    /// flatbuffer, or if either segment cannot be decoded.
    fn read_flatbuffer<'buf>(
        fb: &<Self::Source<'buf> as Follow<'buf>>::Inner,
    ) -> Result<Self, Self::Error> {
        // Both fields are optional in the flatbuffer but required here.
        let Some(dtype_fb) = fb.dtype() else {
            return Err(vortex_err!("Postscript missing dtype segment"));
        };
        let dtype = Segment::read_flatbuffer(&dtype_fb)?;

        let Some(layout_fb) = fb.file_layout() else {
            return Err(vortex_err!("Postscript missing file_layout segment"));
        };
        let file_layout = Segment::read_flatbuffer(&layout_fb)?;

        Ok(Self { dtype, file_layout })
    }
}
42 changes: 42 additions & 0 deletions vortex-file/src/v2/footer/segment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
use vortex_error::{vortex_err, VortexError};
use vortex_flatbuffers::{footer2 as fb, ReadFlatBuffer, WriteFlatBuffer};

/// The location of a segment within a Vortex file.
#[derive(Clone, Debug)]
pub(crate) struct Segment {
/// Offset of the segment; passed as the start position when reading its byte range.
pub(crate) offset: u64,
/// Length of the segment; stored as `u64` in the flatbuffer and converted to `usize` on read.
pub(crate) length: usize,
}

impl WriteFlatBuffer for Segment {
    type Target<'a> = fb::Segment<'a>;

    /// Serializes this segment location into `fbb` and returns the offset of
    /// the resulting `fb::Segment` table.
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        // The flatbuffer stores the length as u64, so widen the in-memory usize.
        let args = fb::SegmentArgs {
            offset: self.offset,
            length: self.length as u64,
        };
        fb::Segment::create(fbb, &args)
    }
}

impl ReadFlatBuffer for Segment {
    type Source<'a> = fb::Segment<'a>;
    type Error = VortexError;

    /// Reads a `Segment` from its flatbuffer representation.
    ///
    /// # Errors
    ///
    /// Fails if the stored `u64` length does not fit in this platform's `usize`.
    fn read_flatbuffer<'buf>(
        fb: &<Self::Source<'buf> as Follow<'buf>>::Inner,
    ) -> Result<Self, Self::Error> {
        let offset = fb.offset();
        let Ok(length) = usize::try_from(fb.length()) else {
            return Err(vortex_err!("segment length exceeds maximum usize"));
        };
        Ok(Self { offset, length })
    }
}
12 changes: 12 additions & 0 deletions vortex-file/src/v2/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Submodules of the v2 file implementation; all are private to this module.
mod file;
mod footer;
mod open;
mod segments;
mod strategy;
// Test module, compiled only for test builds.
#[cfg(test)]
mod tests;
mod writer;

// Public surface of `v2`: only the items of `file`, `open`, and `writer` are re-exported.
pub use file::*;
pub use open::*;
pub use writer::*;
Loading
Loading