From a2537c20e5aa9bfb5e0caa4f84bd649e018bcdd7 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 Jan 2025 16:27:22 +0000 Subject: [PATCH 1/4] File V2 --- Cargo.lock | 1 + Cargo.toml | 1 + vortex-file/Cargo.toml | 1 + vortex-file/src/lib.rs | 1 + vortex-file/src/v2/footer/file_layout.rs | 38 ++ vortex-file/src/v2/footer/mod.rs | 7 + vortex-file/src/v2/footer/postscript.rs | 30 ++ vortex-file/src/v2/footer/segment.rs | 25 + vortex-file/src/v2/mod.rs | 6 + vortex-file/src/v2/segments.rs | 42 ++ vortex-file/src/v2/strategy.rs | 17 + vortex-file/src/v2/writer.rs | 128 +++++ .../flatbuffers/vortex-file/footer2.fbs | 34 ++ vortex-flatbuffers/src/generated/footer2.rs | 439 ++++++++++++++++++ vortex-flatbuffers/src/lib.rs | 22 + vortex-layout/src/lib.rs | 2 +- vortex-layout/src/segments/mod.rs | 2 +- xtask/src/main.rs | 1 + 18 files changed, 795 insertions(+), 2 deletions(-) create mode 100644 vortex-file/src/v2/footer/file_layout.rs create mode 100644 vortex-file/src/v2/footer/mod.rs create mode 100644 vortex-file/src/v2/footer/postscript.rs create mode 100644 vortex-file/src/v2/footer/segment.rs create mode 100644 vortex-file/src/v2/mod.rs create mode 100644 vortex-file/src/v2/segments.rs create mode 100644 vortex-file/src/v2/strategy.rs create mode 100644 vortex-file/src/v2/writer.rs create mode 100644 vortex-flatbuffers/flatbuffers/vortex-file/footer2.fbs create mode 100644 vortex-flatbuffers/src/generated/footer2.rs diff --git a/Cargo.lock b/Cargo.lock index 5c090b2bb..04f2115d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5119,6 +5119,7 @@ dependencies = [ "vortex-flatbuffers", "vortex-io", "vortex-ipc", + "vortex-layout", "vortex-scalar", ] diff --git a/Cargo.toml b/Cargo.toml index 9ba004920..3ed417f85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,6 +149,7 @@ vortex-flatbuffers = { version = "0.21.1", path = "./vortex-flatbuffers" } vortex-fsst = { version = "0.21.1", path = "./encodings/fsst" } vortex-io = { version = "0.21.1", path = "./vortex-io" } vortex-ipc = { version = "0.21.1", path = "./vortex-ipc" } +vortex-layout = { version = "0.21.1", path = "./vortex-layout" } vortex-proto = { version = "0.21.1", path = "./vortex-proto" } vortex-roaring = { version = "0.21.1", path = "./encodings/roaring" } vortex-runend = { version = "0.21.1", path = "./encodings/runend" } diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index ae1c02636..76d4a81dc 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -34,6 +34,7 @@ vortex-expr = { workspace = true } vortex-flatbuffers = { workspace = true, features = ["file"] } vortex-io = { workspace = true } vortex-ipc = { workspace = true } +vortex-layout = { workspace = true } vortex-scalar = { workspace = true, features = ["flatbuffers"] } [dev-dependencies] diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index 4aef82fdd..085fa3589 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -72,6 +72,7 @@ mod write; mod byte_range; #[cfg(test)] mod tests; +pub mod v2; /// The current version of the Vortex file format pub const VERSION: u16 = 1; diff --git a/vortex-file/src/v2/footer/file_layout.rs b/vortex-file/src/v2/footer/file_layout.rs new file mode 100644 index 000000000..d9b0b9fd6 --- /dev/null +++ b/vortex-file/src/v2/footer/file_layout.rs @@ -0,0 +1,38 @@ +use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer}; +use vortex_layout::LayoutData; + +use crate::v2::footer::segment::Segment; + +/// Captures the layout information of a Vortex file. +pub(crate) struct FileLayout { + pub(crate) root_layout: LayoutData, + pub(crate) segments: Vec, +} + +impl FlatBufferRoot for FileLayout {} + +impl WriteFlatBuffer for FileLayout { + type Target<'a> = fb::FileLayout<'a>; + + fn write_flatbuffer<'fb>( + &self, + fbb: &mut flatbuffers::FlatBufferBuilder<'fb>, + ) -> flatbuffers::WIPOffset> { + let root_layout = self.root_layout.write_flatbuffer(fbb); + + let segments = self + .segments + .iter() + .map(|segment| segment.write_flatbuffer(fbb)) + .collect::>(); + let segments = fbb.create_vector(&segments); + + fb::FileLayout::create( + fbb, + &fb::FileLayoutArgs { + root_layout: Some(root_layout), + segments: Some(segments), + }, + ) + } +} diff --git a/vortex-file/src/v2/footer/mod.rs b/vortex-file/src/v2/footer/mod.rs new file mode 100644 index 000000000..a1448e0b5 --- /dev/null +++ b/vortex-file/src/v2/footer/mod.rs @@ -0,0 +1,7 @@ +mod file_layout; +mod postscript; +mod segment; + +pub(crate) use file_layout::*; +pub(crate) use postscript::*; +pub(crate) use segment::*; diff --git a/vortex-file/src/v2/footer/postscript.rs b/vortex-file/src/v2/footer/postscript.rs new file mode 100644 index 000000000..d82b89b78 --- /dev/null +++ b/vortex-file/src/v2/footer/postscript.rs @@ -0,0 +1,30 @@ +use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer}; + +use crate::v2::footer::segment::Segment; + +/// Captures the layout information of a Vortex file. +pub(crate) struct Postscript { + pub(crate) dtype: Segment, + pub(crate) file_layout: Segment, +} + +impl FlatBufferRoot for Postscript {} + +impl WriteFlatBuffer for Postscript { + type Target<'a> = fb::Postscript<'a>; + + fn write_flatbuffer<'fb>( + &self, + fbb: &mut flatbuffers::FlatBufferBuilder<'fb>, + ) -> flatbuffers::WIPOffset> { + let dtype = self.dtype.write_flatbuffer(fbb); + let file_layout = self.file_layout.write_flatbuffer(fbb); + fb::Postscript::create( + fbb, + &fb::PostscriptArgs { + dtype: Some(dtype), + file_layout: Some(file_layout), + }, + ) + } +} diff --git a/vortex-file/src/v2/footer/segment.rs b/vortex-file/src/v2/footer/segment.rs new file mode 100644 index 000000000..1ccbfee12 --- /dev/null +++ b/vortex-file/src/v2/footer/segment.rs @@ -0,0 +1,25 @@ +use flatbuffers::{FlatBufferBuilder, WIPOffset}; +use vortex_flatbuffers::{footer2 as fb, WriteFlatBuffer}; + +/// The location of a segment within a Vortex file. +pub(crate) struct Segment { + pub(crate) offset: u64, + pub(crate) length: usize, +} + +impl WriteFlatBuffer for Segment { + type Target<'a> = fb::Segment<'a>; + + fn write_flatbuffer<'fb>( + &self, + fbb: &mut FlatBufferBuilder<'fb>, + ) -> WIPOffset> { + fb::Segment::create( + fbb, + &fb::SegmentArgs { + offset: self.offset, + length: self.length as u64, + }, + ) + } +} diff --git a/vortex-file/src/v2/mod.rs b/vortex-file/src/v2/mod.rs new file mode 100644 index 000000000..5fa20b565 --- /dev/null +++ b/vortex-file/src/v2/mod.rs @@ -0,0 +1,6 @@ +mod footer; +mod segments; +mod strategy; +mod writer; + +pub use writer::*; diff --git a/vortex-file/src/v2/segments.rs b/vortex-file/src/v2/segments.rs new file mode 100644 index 000000000..e636add43 --- /dev/null +++ b/vortex-file/src/v2/segments.rs @@ -0,0 +1,42 @@ +use bytes::Bytes; +use vortex_error::{vortex_err, VortexResult}; +use vortex_io::VortexWrite; +use vortex_layout::segments::{SegmentId, SegmentWriter}; + +use crate::v2::footer::Segment; + +/// A segment writer that holds buffers in memory until they are flushed by a writer. +#[derive(Default)] +pub(crate) struct BufferedSegmentWriter { + segments: Vec>, + next_id: SegmentId, +} + +impl SegmentWriter for BufferedSegmentWriter { + fn put(&mut self, data: Vec) -> SegmentId { + self.segments.push(data); + let id = self.next_id; + self.next_id = SegmentId::from(*self.next_id + 1); + id + } +} + +impl BufferedSegmentWriter { + /// Flush the segments to the provided async writer. + pub async fn flush_async( + &mut self, + write: &mut futures_util::io::Cursor, + segments: &mut Vec, + ) -> VortexResult<()> { + for segment in self.segments.drain(..) { + let offset = write.position(); + for buffer in segment { + write.write_all(buffer).await?; + } + let length = usize::try_from(write.position() - offset) + .map_err(|_| vortex_err!("segment length exceeds maximum usize"))?; + segments.push(Segment { offset, length }); + } + Ok(()) + } +} diff --git a/vortex-file/src/v2/strategy.rs b/vortex-file/src/v2/strategy.rs new file mode 100644 index 000000000..858fd73d5 --- /dev/null +++ b/vortex-file/src/v2/strategy.rs @@ -0,0 +1,17 @@ +//! This module defines the default layout strategy for a Vortex file. + +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_layout::layouts::chunked::writer::{ChunkedLayoutOptions, ChunkedLayoutWriter}; +use vortex_layout::strategies::{LayoutStrategy, LayoutWriter, LayoutWriterExt}; + +/// The default Vortex file layout strategy. +/// +/// The current implementation is a placeholder and needs to be fleshed out. +pub struct VortexLayoutStrategy; + +impl LayoutStrategy for VortexLayoutStrategy { + fn new_writer(&self, dtype: &DType) -> VortexResult> { + Ok(ChunkedLayoutWriter::new(dtype, ChunkedLayoutOptions::default()).boxed()) + } +} diff --git a/vortex-file/src/v2/writer.rs b/vortex-file/src/v2/writer.rs new file mode 100644 index 000000000..4c675f96b --- /dev/null +++ b/vortex-file/src/v2/writer.rs @@ -0,0 +1,128 @@ +use std::io::Write; + +use futures_util::StreamExt; +use vortex_array::iter::ArrayIterator; +use vortex_array::stream::ArrayStream; +use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; +use vortex_flatbuffers::WriteFlatBufferExt; +use vortex_io::VortexWrite; +use vortex_layout::strategies::LayoutStrategy; + +use crate::v2::footer::{FileLayout, Postscript, Segment}; +use crate::v2::segments::BufferedSegmentWriter; +use crate::v2::strategy::VortexLayoutStrategy; +use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION}; + +pub struct WriteOptions { + strategy: Box, +} + +impl Default for WriteOptions { + fn default() -> Self { + Self { + strategy: Box::new(VortexLayoutStrategy), + } + } +} + +impl WriteOptions { + /// Replace the default layout strategy with the provided one. + pub fn with_strategy(mut self, strategy: Box) -> Self { + self.strategy = strategy; + self + } +} + +impl WriteOptions { + /// Perform a blocking write of the provided iterator of `ArrayData`. + pub fn write_sync(self, _write: W, _iter: I) -> VortexResult<()> { + todo!() + } + + /// Perform an async write of the provided stream of `ArrayData`. + pub async fn write_async( + self, + write: W, + mut stream: S, + ) -> VortexResult { + // Set up the root layout + let mut layout_writer = self.strategy.new_writer(stream.dtype())?; + + // First we write the magic number + let mut write = futures_util::io::Cursor::new(write); + write.write_all(MAGIC_BYTES).await?; + + // Our buffered message writer accumulates messages for each batch so we can flush them + // into the file. + let mut segment_writer = BufferedSegmentWriter::default(); + let mut segments = vec![]; + + // Then write the stream via the root layout + while let Some(chunk) = stream.next().await { + layout_writer.push_chunk(&mut segment_writer, chunk?)?; + // NOTE(ngates): we could spawn this task and continue to compress the next chunk. + segment_writer + .flush_async(&mut write, &mut segments) + .await?; + } + + // Write the final messages into the file + let root_layout = layout_writer.finish(&mut segment_writer)?; + segment_writer + .flush_async(&mut write, &mut segments) + .await?; + + // TODO(ngates): we may want to just write the FileLayout and DType via segment writer? + let dtype_offset = write.position(); + write + .write_all(root_layout.write_flatbuffer_bytes()) + .await?; + let dtype_segment = Segment { + offset: dtype_offset, + length: usize::try_from(write.position() - dtype_offset) + .map_err(|_| vortex_err!("dtype segment length exceeds maximum usize"))?, + }; + + let layout_offset = write.position(); + write + .write_all( + FileLayout { + root_layout, + segments, + } + .write_flatbuffer_bytes(), + ) + .await?; + let file_layout_segment = Segment { + offset: layout_offset, + length: usize::try_from(write.position() - layout_offset) + .map_err(|_| vortex_err!("layout segment length exceeds maximum usize"))?, + }; + + // Then the postscript, that we write manually to avoid any framing. + let postscript = Postscript { + dtype: dtype_segment, + file_layout: file_layout_segment, + }; + let postscript_buffer = postscript.write_flatbuffer_bytes(); + if postscript_buffer.len() > MAX_FOOTER_SIZE as usize { + vortex_bail!( + "Postscript is too large ({} bytes); max postscript size is {}", + postscript_buffer.len(), + MAX_FOOTER_SIZE + ); + } + let postscript_len = u16::try_from(postscript_buffer.len()) + .vortex_expect("Postscript already verified to fit into u16"); + write.write_all(postscript_buffer).await?; + + // And finally, the EOF 8-byte footer. + let mut eof = [0u8; EOF_SIZE]; + eof[0..2].copy_from_slice(&VERSION.to_le_bytes()); + eof[2..4].copy_from_slice(&postscript_len.to_le_bytes()); + eof[4..8].copy_from_slice(&MAGIC_BYTES); + write.write_all(eof).await?; + + Ok(write.into_inner()) + } +} diff --git a/vortex-flatbuffers/flatbuffers/vortex-file/footer2.fbs b/vortex-flatbuffers/flatbuffers/vortex-file/footer2.fbs new file mode 100644 index 000000000..0c2d1b553 --- /dev/null +++ b/vortex-flatbuffers/flatbuffers/vortex-file/footer2.fbs @@ -0,0 +1,34 @@ +include "vortex-layout/layout.fbs"; + +/// A `Segment` acts as the locator for a buffer within the file. +table Segment { + offset: uint64; + length: uint64; + // NOTE(ngates): add compression and encryption information +} + +/// The `FileLayout` stores the root `Layout` as well as location information for each referenced segment. +table FileLayout { + root_layout: Layout; + segments: [Segment]; +} + +/// The `Postscript` is guaranteed by the file format to never exceed 65528 bytes (i.e., u16::MAX - 8 bytes) +/// in length, and is immediately followed by an 8-byte `EndOfFile` struct. +/// +/// The `EndOfFile` struct cannot change size without breaking backwards compatibility. It is not written/read +/// using flatbuffers, but the equivalent flatbuffer definition would be: +/// +/// struct EndOfFile { +/// version: uint16; +/// footer_length: uint16; +/// magic: [uint8; 4]; // "VTXF" +/// } +/// +table Postscript { + dtype: Segment; + file_layout: Segment; +} + +root_type Layout; +root_type Postscript; diff --git a/vortex-flatbuffers/src/generated/footer2.rs b/vortex-flatbuffers/src/generated/footer2.rs new file mode 100644 index 000000000..49b374b0e --- /dev/null +++ b/vortex-flatbuffers/src/generated/footer2.rs @@ -0,0 +1,439 @@ +// automatically generated by the FlatBuffers compiler, do not modify + + +// @generated + +use crate::layout::*; +use core::mem; +use core::cmp::Ordering; + +extern crate flatbuffers; +use self::flatbuffers::{EndianScalar, Follow}; + +pub enum SegmentOffset {} +#[derive(Copy, Clone, PartialEq)] + +/// A `Segment` acts as the locator for a buffer within the file. +pub struct Segment<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for Segment<'a> { + type Inner = Segment<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { _tab: flatbuffers::Table::new(buf, loc) } + } +} + +impl<'a> Segment<'a> { + pub const VT_OFFSET: flatbuffers::VOffsetT = 4; + pub const VT_LENGTH: flatbuffers::VOffsetT = 6; + + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + Segment { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>, + args: &'args SegmentArgs + ) -> flatbuffers::WIPOffset> { + let mut builder = SegmentBuilder::new(_fbb); + builder.add_length(args.length); + builder.add_offset(args.offset); + builder.finish() + } + + + #[inline] + pub fn offset(&self) -> u64 { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::(Segment::VT_OFFSET, Some(0)).unwrap()} + } + #[inline] + pub fn length(&self) -> u64 { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::(Segment::VT_LENGTH, Some(0)).unwrap()} + } +} + +impl flatbuffers::Verifiable for Segment<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, pos: usize + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use self::flatbuffers::Verifiable; + v.visit_table(pos)? + .visit_field::("offset", Self::VT_OFFSET, false)? + .visit_field::("length", Self::VT_LENGTH, false)? + .finish(); + Ok(()) + } +} +pub struct SegmentArgs { + pub offset: u64, + pub length: u64, +} +impl<'a> Default for SegmentArgs { + #[inline] + fn default() -> Self { + SegmentArgs { + offset: 0, + length: 0, + } + } +} + +pub struct SegmentBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> SegmentBuilder<'a, 'b, A> { + #[inline] + pub fn add_offset(&mut self, offset: u64) { + self.fbb_.push_slot::(Segment::VT_OFFSET, offset, 0); + } + #[inline] + pub fn add_length(&mut self, length: u64) { + self.fbb_.push_slot::(Segment::VT_LENGTH, length, 0); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> SegmentBuilder<'a, 'b, A> { + let start = _fbb.start_table(); + SegmentBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +impl core::fmt::Debug for Segment<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("Segment"); + ds.field("offset", &self.offset()); + ds.field("length", &self.length()); + ds.finish() + } +} +pub enum FileLayoutOffset {} +#[derive(Copy, Clone, PartialEq)] + +/// The `FileLayout` stores the root `Layout` as well as location information for each referenced segment. +pub struct FileLayout<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for FileLayout<'a> { + type Inner = FileLayout<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { _tab: flatbuffers::Table::new(buf, loc) } + } +} + +impl<'a> FileLayout<'a> { + pub const VT_ROOT_LAYOUT: flatbuffers::VOffsetT = 4; + pub const VT_SEGMENTS: flatbuffers::VOffsetT = 6; + + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + FileLayout { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>, + args: &'args FileLayoutArgs<'args> + ) -> flatbuffers::WIPOffset> { + let mut builder = FileLayoutBuilder::new(_fbb); + if let Some(x) = args.segments { builder.add_segments(x); } + if let Some(x) = args.root_layout { builder.add_root_layout(x); } + builder.finish() + } + + + #[inline] + pub fn root_layout(&self) -> Option> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::>(FileLayout::VT_ROOT_LAYOUT, None)} + } + #[inline] + pub fn segments(&self) -> Option>>> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::>>>(FileLayout::VT_SEGMENTS, None)} + } +} + +impl flatbuffers::Verifiable for FileLayout<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, pos: usize + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use self::flatbuffers::Verifiable; + v.visit_table(pos)? + .visit_field::>("root_layout", Self::VT_ROOT_LAYOUT, false)? + .visit_field::>>>("segments", Self::VT_SEGMENTS, false)? + .finish(); + Ok(()) + } +} +pub struct FileLayoutArgs<'a> { + pub root_layout: Option>>, + pub segments: Option>>>>, +} +impl<'a> Default for FileLayoutArgs<'a> { + #[inline] + fn default() -> Self { + FileLayoutArgs { + root_layout: None, + segments: None, + } + } +} + +pub struct FileLayoutBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> FileLayoutBuilder<'a, 'b, A> { + #[inline] + pub fn add_root_layout(&mut self, root_layout: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(FileLayout::VT_ROOT_LAYOUT, root_layout); + } + #[inline] + pub fn add_segments(&mut self, segments: flatbuffers::WIPOffset>>>) { + self.fbb_.push_slot_always::>(FileLayout::VT_SEGMENTS, segments); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> FileLayoutBuilder<'a, 'b, A> { + let start = _fbb.start_table(); + FileLayoutBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +impl core::fmt::Debug for FileLayout<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("FileLayout"); + ds.field("root_layout", &self.root_layout()); + ds.field("segments", &self.segments()); + ds.finish() + } +} +pub enum PostscriptOffset {} +#[derive(Copy, Clone, PartialEq)] + +/// The `Postscript` is guaranteed by the file format to never exceed 65528 bytes (i.e., u16::MAX - 8 bytes) +/// in length, and is immediately followed by an 8-byte `EndOfFile` struct. +/// +/// The `EndOfFile` struct cannot change size without breaking backwards compatibility. It is not written/read +/// using flatbuffers, but the equivalent flatbuffer definition would be: +/// +/// struct EndOfFile { +/// version: uint16; +/// footer_length: uint16; +/// magic: [uint8; 4]; // "VTXF" +/// } +/// +pub struct Postscript<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for Postscript<'a> { + type Inner = Postscript<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { _tab: flatbuffers::Table::new(buf, loc) } + } +} + +impl<'a> Postscript<'a> { + pub const VT_DTYPE: flatbuffers::VOffsetT = 4; + pub const VT_FILE_LAYOUT: flatbuffers::VOffsetT = 6; + + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + Postscript { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>, + args: &'args PostscriptArgs<'args> + ) -> flatbuffers::WIPOffset> { + let mut builder = PostscriptBuilder::new(_fbb); + if let Some(x) = args.file_layout { builder.add_file_layout(x); } + if let Some(x) = args.dtype { builder.add_dtype(x); } + builder.finish() + } + + + #[inline] + pub fn dtype(&self) -> Option> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::>(Postscript::VT_DTYPE, None)} + } + #[inline] + pub fn file_layout(&self) -> Option> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::>(Postscript::VT_FILE_LAYOUT, None)} + } +} + +impl flatbuffers::Verifiable for Postscript<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, pos: usize + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use self::flatbuffers::Verifiable; + v.visit_table(pos)? + .visit_field::>("dtype", Self::VT_DTYPE, false)? + .visit_field::>("file_layout", Self::VT_FILE_LAYOUT, false)? + .finish(); + Ok(()) + } +} +pub struct PostscriptArgs<'a> { + pub dtype: Option>>, + pub file_layout: Option>>, +} +impl<'a> Default for PostscriptArgs<'a> { + #[inline] + fn default() -> Self { + PostscriptArgs { + dtype: None, + file_layout: None, + } + } +} + +pub struct PostscriptBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PostscriptBuilder<'a, 'b, A> { + #[inline] + pub fn add_dtype(&mut self, dtype: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(Postscript::VT_DTYPE, dtype); + } + #[inline] + pub fn add_file_layout(&mut self, file_layout: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(Postscript::VT_FILE_LAYOUT, file_layout); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PostscriptBuilder<'a, 'b, A> { + let start = _fbb.start_table(); + PostscriptBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +impl core::fmt::Debug for Postscript<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("Postscript"); + ds.field("dtype", &self.dtype()); + ds.field("file_layout", &self.file_layout()); + ds.finish() + } +} +#[inline] +/// Verifies that a buffer of bytes contains a `Postscript` +/// and returns it. +/// Note that verification is still experimental and may not +/// catch every error, or be maximally performant. For the +/// previous, unchecked, behavior use +/// `root_as_postscript_unchecked`. +pub fn root_as_postscript(buf: &[u8]) -> Result { + flatbuffers::root::(buf) +} +#[inline] +/// Verifies that a buffer of bytes contains a size prefixed +/// `Postscript` and returns it. +/// Note that verification is still experimental and may not +/// catch every error, or be maximally performant. For the +/// previous, unchecked, behavior use +/// `size_prefixed_root_as_postscript_unchecked`. +pub fn size_prefixed_root_as_postscript(buf: &[u8]) -> Result { + flatbuffers::size_prefixed_root::(buf) +} +#[inline] +/// Verifies, with the given options, that a buffer of bytes +/// contains a `Postscript` and returns it. +/// Note that verification is still experimental and may not +/// catch every error, or be maximally performant. For the +/// previous, unchecked, behavior use +/// `root_as_postscript_unchecked`. +pub fn root_as_postscript_with_opts<'b, 'o>( + opts: &'o flatbuffers::VerifierOptions, + buf: &'b [u8], +) -> Result, flatbuffers::InvalidFlatbuffer> { + flatbuffers::root_with_opts::>(opts, buf) +} +#[inline] +/// Verifies, with the given verifier options, that a buffer of +/// bytes contains a size prefixed `Postscript` and returns +/// it. Note that verification is still experimental and may not +/// catch every error, or be maximally performant. For the +/// previous, unchecked, behavior use +/// `root_as_postscript_unchecked`. +pub fn size_prefixed_root_as_postscript_with_opts<'b, 'o>( + opts: &'o flatbuffers::VerifierOptions, + buf: &'b [u8], +) -> Result, flatbuffers::InvalidFlatbuffer> { + flatbuffers::size_prefixed_root_with_opts::>(opts, buf) +} +#[inline] +/// Assumes, without verification, that a buffer of bytes contains a Postscript and returns it. +/// # Safety +/// Callers must trust the given bytes do indeed contain a valid `Postscript`. +pub unsafe fn root_as_postscript_unchecked(buf: &[u8]) -> Postscript { + flatbuffers::root_unchecked::(buf) +} +#[inline] +/// Assumes, without verification, that a buffer of bytes contains a size prefixed Postscript and returns it. +/// # Safety +/// Callers must trust the given bytes do indeed contain a valid size prefixed `Postscript`. +pub unsafe fn size_prefixed_root_as_postscript_unchecked(buf: &[u8]) -> Postscript { + flatbuffers::size_prefixed_root_unchecked::(buf) +} +#[inline] +pub fn finish_postscript_buffer<'a, 'b, A: flatbuffers::Allocator + 'a>( + fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, + root: flatbuffers::WIPOffset>) { + fbb.finish(root, None); +} + +#[inline] +pub fn finish_size_prefixed_postscript_buffer<'a, 'b, A: flatbuffers::Allocator + 'a>(fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, root: flatbuffers::WIPOffset>) { + fbb.finish_size_prefixed(root, None); +} diff --git a/vortex-flatbuffers/src/lib.rs b/vortex-flatbuffers/src/lib.rs index f9192dca6..7370fa9cd 100644 --- a/vortex-flatbuffers/src/lib.rs +++ b/vortex-flatbuffers/src/lib.rs @@ -92,6 +92,28 @@ pub mod scalar; /// ``` pub mod footer; +#[cfg(feature = "file")] +#[allow(clippy::all)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[allow(clippy::many_single_char_names)] +#[allow(clippy::unwrap_used)] +#[allow(dead_code)] +#[allow(non_snake_case)] +#[allow(non_camel_case_types)] +#[allow(unsafe_op_in_unsafe_fn)] +#[allow(unused_imports)] +#[allow(unused_lifetimes)] +#[allow(unused_qualifications)] +#[rustfmt::skip] +#[path = "./generated/footer2.rs"] +/// A file format footer containing a serialized `vortex-file` Layout. +/// +/// `footer.fbs`: +/// ```flatbuffers +#[doc = include_str!("../flatbuffers/vortex-file/footer2.fbs")] +/// ``` +pub mod footer2; + #[cfg(feature = "layout")] #[allow(clippy::all)] #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/vortex-layout/src/lib.rs b/vortex-layout/src/lib.rs index 481cb5a4c..d2de02e01 100644 --- a/vortex-layout/src/lib.rs +++ b/vortex-layout/src/lib.rs @@ -10,7 +10,7 @@ pub mod layouts; pub use encoding::*; mod row_mask; pub use row_mask::*; -mod segments; +pub mod segments; pub mod strategies; /// The layout ID for a flat layout diff --git a/vortex-layout/src/segments/mod.rs b/vortex-layout/src/segments/mod.rs index 814e6d9e8..006c58093 100644 --- a/vortex-layout/src/segments/mod.rs +++ b/vortex-layout/src/segments/mod.rs @@ -6,7 +6,7 @@ use vortex_ipc::messages::{EncoderMessage, MessageEncoder}; /// The identifier for a single segment. // TODO(ngates): should this be a `[u8]` instead? Allowing for arbitrary segment identifiers? -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct SegmentId(u32); impl From for SegmentId { diff --git a/xtask/src/main.rs b/xtask/src/main.rs index 514b3910d..9b3e2d0b6 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -18,6 +18,7 @@ fn execute_generate_fbs() -> anyhow::Result<()> { "./flatbuffers/vortex-dtype/dtype.fbs", "./flatbuffers/vortex-scalar/scalar.fbs", "./flatbuffers/vortex-array/array.fbs", + "./flatbuffers/vortex-file/footer2.fbs", "./flatbuffers/vortex-serde/footer.fbs", "./flatbuffers/vortex-layout/layout.fbs", "./flatbuffers/vortex-serde/message.fbs", From fb06f874955027fab9b326270ab69600c3960195 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 Jan 2025 17:45:56 +0000 Subject: [PATCH 2/4] File V2 --- Cargo.lock | 1 + vortex-file/Cargo.toml | 1 + vortex-file/src/v2/file.rs | 75 +++++++ vortex-file/src/v2/footer/file_layout.rs | 1 + vortex-file/src/v2/footer/postscript.rs | 24 ++- vortex-file/src/v2/footer/segment.rs | 21 +- vortex-file/src/v2/mod.rs | 4 + vortex-file/src/v2/open.rs | 241 +++++++++++++++++++++++ vortex-file/src/v2/segments.rs | 36 +++- vortex-file/src/v2/writer.rs | 49 +++-- vortex-layout/src/data.rs | 33 ++-- vortex-layout/src/lib.rs | 1 + vortex-layout/src/segments/mod.rs | 2 +- 13 files changed, 443 insertions(+), 46 deletions(-) create mode 100644 vortex-file/src/v2/file.rs create mode 100644 vortex-file/src/v2/open.rs diff --git a/Cargo.lock b/Cargo.lock index 04f2115d7..200f997a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5107,6 +5107,7 @@ dependencies = [ "futures-executor", "futures-util", "itertools 0.13.0", + "moka", "once_cell", "rstest", "tokio", diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 76d4a81dc..539aa02d4 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -23,6 +23,7 @@ futures = { workspace = true, features = ["std"] } futures-executor = { workspace = true } futures-util = { workspace = true } itertools = { workspace = true } +moka = { workspace = true } once_cell = { workspace = true } tokio = { workspace = true, features = ["rt"] } tracing = { workspace = true, optional = true } diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs new file mode 100644 index 000000000..695351a4e --- /dev/null +++ b/vortex-file/src/v2/file.rs @@ -0,0 +1,75 @@ +use std::io::Read; + +use futures_util::stream; +use vortex_array::stream::{ArrayStream, ArrayStreamAdapter}; +use vortex_array::ContextRef; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_expr::ExprRef; +use vortex_io::VortexReadAt; +use vortex_layout::scanner::{Poll, Scan}; +use vortex_layout::{LayoutData, RowMask}; + +use crate::v2::footer::Segment; +use crate::v2::segments::SegmentCache; + +pub struct VortexFile { + pub(crate) read: R, + pub(crate) ctx: ContextRef, + pub(crate) layout: LayoutData, + pub(crate) segments: Vec, + pub(crate) segment_cache: SegmentCache, +} + +/// Async implementation of Vortex File. +impl VortexFile { + /// Returns the number of rows in the file. + pub fn row_count(&self) -> u64 { + self.layout.row_count() + } + + /// Returns the DType of the file. + pub fn dtype(&self) -> &DType { + self.layout.dtype() + } + + /// Performs a scan operation over the file. + pub fn scan( + &self, + projection: ExprRef, + filter: Option, + ) -> VortexResult { + let layout_scan = self + .layout + .new_scan(Scan { projection, filter }, self.ctx.clone())?; + + // TODO(ngates): we could query the layout for splits and then process them in parallel. + // For now, we just scan the entire layout with one mask. + // Note that to implement this we would use stream::try_unfold + let row_mask = RowMask::new_valid_between(0, layout_scan.layout().row_count()); + let mut scanner = layout_scan.create_scanner(row_mask)?; + + let stream = stream::once(async move { + loop { + match scanner.poll(&self.segment_cache)? { + Poll::Some(array) => return Ok(array), + Poll::NeedMore(segment_ids) => { + for segment_id in segment_ids { + let segment = &self.segments[*segment_id as usize]; + let bytes = self + .read + .read_byte_range(segment.offset, segment.length as u64) + .await?; + self.segment_cache.set(segment_id, bytes); + } + } + } + } + }); + + Ok(ArrayStreamAdapter::new(layout_scan.dtype().clone(), stream)) + } +} + +/// Sync implementation of Vortex File. +impl VortexFile {} diff --git a/vortex-file/src/v2/footer/file_layout.rs b/vortex-file/src/v2/footer/file_layout.rs index d9b0b9fd6..03bb5cb98 100644 --- a/vortex-file/src/v2/footer/file_layout.rs +++ b/vortex-file/src/v2/footer/file_layout.rs @@ -4,6 +4,7 @@ use vortex_layout::LayoutData; use crate::v2::footer::segment::Segment; /// Captures the layout information of a Vortex file. +#[derive(Clone)] pub(crate) struct FileLayout { pub(crate) root_layout: LayoutData, pub(crate) segments: Vec, diff --git a/vortex-file/src/v2/footer/postscript.rs b/vortex-file/src/v2/footer/postscript.rs index d82b89b78..c989a2002 100644 --- a/vortex-file/src/v2/footer/postscript.rs +++ b/vortex-file/src/v2/footer/postscript.rs @@ -1,4 +1,6 @@ -use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer}; +use flatbuffers::Follow; +use vortex_error::{vortex_err, VortexError}; +use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer}; use crate::v2::footer::segment::Segment; @@ -28,3 +30,23 @@ impl WriteFlatBuffer for Postscript { ) } } + +impl ReadFlatBuffer for Postscript { + type Source<'a> = fb::Postscript<'a>; + type Error = VortexError; + + fn read_flatbuffer<'buf>( + fb: & as Follow<'buf>>::Inner, + ) -> Result { + Ok(Self { + dtype: Segment::read_flatbuffer( + &fb.dtype() + .ok_or_else(|| vortex_err!("Postscript missing dtype segment"))?, + )?, + file_layout: Segment::read_flatbuffer( + &fb.file_layout() + .ok_or_else(|| vortex_err!("Postscript missing file_layout segment"))?, + )?, + }) + } +} diff --git a/vortex-file/src/v2/footer/segment.rs b/vortex-file/src/v2/footer/segment.rs index 1ccbfee12..2c609e643 100644 --- a/vortex-file/src/v2/footer/segment.rs +++ b/vortex-file/src/v2/footer/segment.rs @@ -1,7 +1,9 @@ -use flatbuffers::{FlatBufferBuilder, WIPOffset}; -use vortex_flatbuffers::{footer2 as fb, WriteFlatBuffer}; +use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset}; +use vortex_error::{vortex_err, VortexError}; +use vortex_flatbuffers::{footer2 as fb, ReadFlatBuffer, WriteFlatBuffer}; /// The location of a segment within a Vortex file. +#[derive(Clone, Debug)] pub(crate) struct Segment { pub(crate) offset: u64, pub(crate) length: usize, @@ -23,3 +25,18 @@ impl WriteFlatBuffer for Segment { ) } } + +impl ReadFlatBuffer for Segment { + type Source<'a> = fb::Segment<'a>; + type Error = VortexError; + + fn read_flatbuffer<'buf>( + fb: & as Follow<'buf>>::Inner, + ) -> Result { + Ok(Self { + offset: fb.offset(), + length: usize::try_from(fb.length()) + .map_err(|_| vortex_err!("segment length exceeds maximum usize"))?, + }) + } +} diff --git a/vortex-file/src/v2/mod.rs b/vortex-file/src/v2/mod.rs index 5fa20b565..d4eb41c80 100644 --- a/vortex-file/src/v2/mod.rs +++ b/vortex-file/src/v2/mod.rs @@ -1,6 +1,10 @@ +mod file; mod footer; +mod open; mod segments; mod strategy; mod writer; +pub use file::*; +pub use open::*; pub use writer::*; diff --git a/vortex-file/src/v2/open.rs b/vortex-file/src/v2/open.rs new file mode 100644 index 000000000..71f868cae --- /dev/null +++ b/vortex-file/src/v2/open.rs @@ -0,0 +1,241 @@ +use std::io::Read; + +use flatbuffers::root; +use itertools::Itertools; +use vortex_array::ContextRef; +use vortex_buffer::{ByteBuffer, ByteBufferMut}; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; +use vortex_flatbuffers::{dtype as fbd, footer2 as fb, ReadFlatBuffer}; +use vortex_io::VortexReadAt; +use vortex_layout::segments::SegmentId; +use vortex_layout::{LayoutContextRef, LayoutData, LayoutId}; + +use crate::v2::footer::{FileLayout, Postscript, Segment}; +use crate::v2::segments::SegmentCache; +use crate::v2::VortexFile; +use crate::{EOF_SIZE, MAGIC_BYTES, VERSION}; + +const INITIAL_READ_SIZE: u64 = 1 << 20; // 1 MB + +/// Open options for a Vortex file reader. +pub struct OpenOptions { + /// The Vortex Array encoding context. + ctx: ContextRef, + /// The Vortex Layout encoding context. + layout_ctx: LayoutContextRef, + /// An optional, externally provided, file layout. + file_layout: Option, + /// An optional, externally provided, dtype. + dtype: Option, + // TODO(ngates): also support a messages_middleware that can wrap a message cache to provide + // additional caching, metrics, or other intercepts, etc. It should support synchronous + // read + write of Map or similar. + initial_read_size: u64, +} + +impl OpenOptions { + pub fn new(ctx: ContextRef) -> Self { + Self { + ctx, + layout_ctx: LayoutContextRef::default(), + file_layout: None, + dtype: None, + initial_read_size: INITIAL_READ_SIZE, + } + } + + /// Configure the initial read size for the Vortex file. + pub fn with_initial_read_size(mut self, initial_read_size: u64) -> VortexResult { + if self.initial_read_size < u16::MAX as u64 { + vortex_bail!("initial_read_size must be at least u16::MAX"); + } + self.initial_read_size = initial_read_size; + Ok(self) + } +} + +impl OpenOptions { + /// Open the Vortex file using synchronous IO. + pub fn open_sync(self, _read: R) -> VortexResult> { + todo!() + } + + /// Open the Vortex file using asynchronous IO. + pub async fn open(self, read: R) -> VortexResult> { + // Fetch the file size and perform the initial read. + let file_size = read.size().await?; + let initial_offset = file_size - self.initial_read_size; + let initial_read: ByteBuffer = read + .read_byte_range(initial_offset, self.initial_read_size) + .await? + .into(); + + // We know the initial read _must_ contain at least the Postscript. + let postscript = self.parse_postscript(&initial_read)?; + + // Check if we need to read more bytes. + // NOTE(ngates): for now, we assume the dtype and layout segments are adjacent. + let (initial_offset, initial_read) = if (self.dtype.is_none() + && postscript.dtype.offset < initial_offset) + || (self.file_layout.is_none() && postscript.file_layout.offset < initial_offset) + { + let offset = postscript.dtype.offset.min(postscript.file_layout.offset); + let mut new_initial_read = + ByteBufferMut::with_capacity(usize::try_from(file_size - offset)?); + new_initial_read.extend_from_slice( + &read + .read_byte_range(offset, initial_offset - offset) + .await?, + ); + new_initial_read.extend_from_slice(&initial_read); + (offset, new_initial_read.freeze()) + } else { + (initial_offset, initial_read) + }; + + // Now we try to read the DType and Layout segments. + let dtype = self.dtype.clone().unwrap_or_else(|| { + self.parse_dtype(initial_offset, &initial_read, postscript.dtype) + .vortex_expect("Failed to parse dtype") + }); + let file_layout = self.file_layout.clone().unwrap_or_else(|| { + self.parse_file_layout( + initial_offset, + &initial_read, + postscript.file_layout, + dtype.clone(), + ) + .vortex_expect("Failed to parse file layout") + }); + + // Set up our segment cache and for good measure, we populate any segments that were + // covered by the initial read. + let mut segment_cache = SegmentCache::default(); + self.populate_segments( + initial_offset, + &initial_read, + &file_layout, + &mut segment_cache, + )?; + + // Now we can create the VortexFile. + Ok(VortexFile { + read, + ctx: self.ctx.clone(), + layout: file_layout.root_layout, + segments: file_layout.segments, + segment_cache, + }) + } + + /// Parse the postscript from the initial read. + fn parse_postscript(&self, initial_read: &[u8]) -> VortexResult { + let eof_loc = initial_read.len() - EOF_SIZE; + let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len()); + + let magic_number = &initial_read[magic_bytes_loc..]; + if magic_number != MAGIC_BYTES { + vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}") + } + + let version = u16::from_le_bytes( + initial_read[eof_loc..eof_loc + 2] + .try_into() + .map_err(|e| vortex_err!("Version was not a u16 {e}"))?, + ); + if version != VERSION { + vortex_bail!("Malformed file, unsupported version {version}") + } + + let ps_size = u16::from_le_bytes( + initial_read[eof_loc + 2..eof_loc + 4] + .try_into() + .map_err(|e| vortex_err!("Postscript size was not a u16 {e}"))?, + ) as usize; + + Postscript::read_flatbuffer_bytes(&initial_read[eof_loc - ps_size..eof_loc]) + } + + /// Parse the DType from the initial read. + fn parse_dtype( + &self, + initial_offset: u64, + initial_read: &[u8], + dtype: Segment, + ) -> VortexResult { + let offset = usize::try_from(dtype.offset - initial_offset)?; + let dtype_bytes = &initial_read[offset..offset + dtype.length]; + DType::try_from(root::(dtype_bytes)?) + } + + /// Parse the FileLayout from the initial read. + fn parse_file_layout( + &self, + initial_offset: u64, + initial_read: &ByteBuffer, + segment: Segment, + dtype: DType, + ) -> VortexResult { + let offset = usize::try_from(segment.offset - initial_offset)?; + let bytes = initial_read.slice(offset..offset + segment.length); + + let fb = root::(&bytes)?; + let fb_root_layout = fb + .root_layout() + .ok_or_else(|| vortex_err!("FileLayout missing root layout"))?; + + let root_encoding = self + .layout_ctx + .lookup_layout(LayoutId(fb_root_layout.encoding())) + .ok_or_else(|| { + vortex_err!( + "FileLayout root layout encoding {} not found", + fb_root_layout.encoding() + ) + })?; + let root_layout = LayoutData::try_new_viewed( + root_encoding, + dtype, + initial_read.clone(), + fb_root_layout._tab.loc(), + self.layout_ctx.clone(), + )?; + + let fb_segments = fb + .segments() + .ok_or_else(|| vortex_err!("FileLayout missing segments"))?; + let segments = fb_segments + .iter() + .map(|s| Segment::read_flatbuffer(&s)) + .try_collect()?; + + Ok(FileLayout { + root_layout, + segments, + }) + } + + fn populate_segments( + &self, + initial_offset: u64, + initial_read: &ByteBuffer, + file_layout: &FileLayout, + segments: &mut SegmentCache, + ) -> VortexResult<()> { + for (idx, segment) in file_layout.segments.iter().enumerate() { + if segment.offset < initial_offset { + // Skip segments that aren't in the initial read. + continue; + } + + let segment_id = SegmentId::from(u32::try_from(idx)?); + + let offset = usize::try_from(segment.offset - initial_offset)?; + let bytes = initial_read.slice(offset..offset + segment.length); + + segments.set(segment_id, bytes.into_inner()); + } + Ok(()) + } +} diff --git a/vortex-file/src/v2/segments.rs b/vortex-file/src/v2/segments.rs index e636add43..004f0a8fe 100644 --- a/vortex-file/src/v2/segments.rs +++ b/vortex-file/src/v2/segments.rs @@ -1,7 +1,10 @@ +use std::sync::{Arc, RwLock}; + use bytes::Bytes; -use vortex_error::{vortex_err, VortexResult}; +use vortex_array::aliases::hash_map::HashMap; +use vortex_error::{vortex_err, VortexExpect, VortexResult}; use vortex_io::VortexWrite; -use vortex_layout::segments::{SegmentId, SegmentWriter}; +use vortex_layout::segments::{SegmentId, SegmentReader, SegmentWriter}; use crate::v2::footer::Segment; @@ -40,3 +43,32 @@ impl BufferedSegmentWriter { Ok(()) } } + +/// A segment cache that holds segments in memory. +/// +/// TODO(ngates): switch to a Moka LRU cache. +#[derive(Default, Clone)] +pub(crate) struct SegmentCache { + segments: Arc>>, +} + +impl SegmentCache { + pub(crate) fn set(&self, id: SegmentId, data: Bytes) { + self.segments + .write() + .map_err(|_| vortex_err!("Poisoned cache")) + .vortex_expect("poisoned") + .insert(id, data); + } +} + +impl SegmentReader for SegmentCache { + fn get(&self, id: SegmentId) -> Option { + self.segments + .read() + .map_err(|_| vortex_err!("Poisoned cache")) + .vortex_expect("poisoned") + .get(&id) + .cloned() + } +} diff --git a/vortex-file/src/v2/writer.rs b/vortex-file/src/v2/writer.rs index 4c675f96b..b1706c7eb 100644 --- a/vortex-file/src/v2/writer.rs +++ b/vortex-file/src/v2/writer.rs @@ -4,7 +4,7 @@ use futures_util::StreamExt; use vortex_array::iter::ArrayIterator; use vortex_array::stream::ArrayStream; use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; -use vortex_flatbuffers::WriteFlatBufferExt; +use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt}; use vortex_io::VortexWrite; use vortex_layout::strategies::LayoutStrategy; @@ -66,40 +66,25 @@ impl WriteOptions { .await?; } - // Write the final messages into the file + // Flush the final layout messages into the file let root_layout = layout_writer.finish(&mut segment_writer)?; segment_writer .flush_async(&mut write, &mut segments) .await?; - // TODO(ngates): we may want to just write the FileLayout and DType via segment writer? - let dtype_offset = write.position(); - write - .write_all(root_layout.write_flatbuffer_bytes()) - .await?; - let dtype_segment = Segment { - offset: dtype_offset, - length: usize::try_from(write.position() - dtype_offset) - .map_err(|_| vortex_err!("dtype segment length exceeds maximum usize"))?, - }; - - let layout_offset = write.position(); - write - .write_all( - FileLayout { + // Write the DType + FileLayout segments + let dtype_segment = self.write_flatbuffer(&mut write, stream.dtype()).await?; + let file_layout_segment = self + .write_flatbuffer( + &mut write, + &FileLayout { root_layout, segments, - } - .write_flatbuffer_bytes(), + }, ) .await?; - let file_layout_segment = Segment { - offset: layout_offset, - length: usize::try_from(write.position() - layout_offset) - .map_err(|_| vortex_err!("layout segment length exceeds maximum usize"))?, - }; - // Then the postscript, that we write manually to avoid any framing. + // Assemble the postscript, and write it manually to avoid any framing. let postscript = Postscript { dtype: dtype_segment, file_layout: file_layout_segment, @@ -125,4 +110,18 @@ impl WriteOptions { Ok(write.into_inner()) } + + async fn write_flatbuffer( + &self, + write: &mut futures_util::io::Cursor, + flatbuffer: &F, + ) -> VortexResult { + let layout_offset = write.position(); + write.write_all(flatbuffer.write_flatbuffer_bytes()).await?; + Ok(Segment { + offset: layout_offset, + length: usize::try_from(write.position() - layout_offset) + .map_err(|_| vortex_err!("segment length exceeds maximum usize"))?, + }) + } } diff --git a/vortex-layout/src/data.rs b/vortex-layout/src/data.rs index 9e6f545cd..2f5a288fa 100644 --- a/vortex-layout/src/data.rs +++ b/vortex-layout/src/data.rs @@ -1,7 +1,7 @@ use std::ops::Deref; use bytes::Bytes; -use flatbuffers::{root, FlatBufferBuilder, WIPOffset}; +use flatbuffers::{FlatBufferBuilder, Table, Verifiable, Verifier, VerifierOptions, WIPOffset}; use vortex_array::ContextRef; use vortex_buffer::ByteBuffer; use vortex_dtype::DType; @@ -40,8 +40,6 @@ pub struct OwnedLayoutData { struct ViewedLayoutData { encoding: LayoutEncodingRef, dtype: DType, - // FIXME(ngates): i think this is in the flatbuffer? - row_count: u64, flatbuffer: ByteBuffer, flatbuffer_loc: usize, ctx: LayoutContextRef, @@ -51,7 +49,7 @@ impl ViewedLayoutData { /// Return the flatbuffer layout message. fn flatbuffer(&self) -> layout::Layout<'_> { unsafe { - let tab = flatbuffers::Table::new(self.flatbuffer.as_ref(), self.flatbuffer_loc); + let tab = Table::new(self.flatbuffer.as_ref(), self.flatbuffer_loc); layout::Layout::init_from_table(tab) } } @@ -81,24 +79,30 @@ impl LayoutData { pub fn try_new_viewed( encoding: LayoutEncodingRef, dtype: DType, - row_count: u64, flatbuffer: ByteBuffer, + flatbuffer_loc: usize, ctx: LayoutContextRef, ) -> VortexResult { - // Validate the buffer contains a layout message. - let layout_fb = root::(flatbuffer.as_ref())?; - if layout_fb.encoding() != encoding.id().0 { + // Validate the buffer contains a layout message at the given location. + let opts = VerifierOptions::default(); + let mut v = Verifier::new(&opts, flatbuffer.as_ref()); + fb::Layout::run_verifier(&mut v, flatbuffer_loc)?; + + // SAFETY: we just verified the buffer contains a valid layout message. + let fb_layout = + unsafe { fb::Layout::init_from_table(Table::new(flatbuffer.as_ref(), flatbuffer_loc)) }; + if fb_layout.encoding() != encoding.id().0 { vortex_bail!( "Mismatched encoding, flatbuffer contains {}", - layout_fb.encoding() + fb_layout.encoding() ); } + Ok(Self(Inner::Viewed(ViewedLayoutData { encoding, dtype, - row_count, flatbuffer, - flatbuffer_loc: 0, + flatbuffer_loc, ctx, }))) } @@ -123,7 +127,7 @@ impl LayoutData { pub fn row_count(&self) -> u64 { match &self.0 { Inner::Owned(owned) => owned.row_count, - Inner::Viewed(viewed) => viewed.row_count, + Inner::Viewed(viewed) => viewed.flatbuffer().row_count(), } } @@ -182,7 +186,6 @@ impl LayoutData { Ok(Self(Inner::Viewed(ViewedLayoutData { encoding, dtype, - row_count: fb.row_count(), flatbuffer: v.flatbuffer.clone(), flatbuffer_loc: fb._tab.loc(), ctx: v.ctx.clone(), @@ -244,11 +247,11 @@ impl LayoutData { /// Create a scan of this layout. pub fn new_scan( - self, + &self, scan: Scan, ctx: ContextRef, ) -> VortexResult> { - self.encoding().scan(self, scan, ctx) + self.encoding().scan(self.clone(), scan, ctx) } } diff --git a/vortex-layout/src/lib.rs b/vortex-layout/src/lib.rs index d2de02e01..8f545857f 100644 --- a/vortex-layout/src/lib.rs +++ b/vortex-layout/src/lib.rs @@ -5,6 +5,7 @@ mod data; pub mod scanner; pub use data::*; mod context; +pub use context::*; mod encoding; pub mod layouts; pub use encoding::*; diff --git a/vortex-layout/src/segments/mod.rs b/vortex-layout/src/segments/mod.rs index 006c58093..a6a487b0e 100644 --- a/vortex-layout/src/segments/mod.rs +++ b/vortex-layout/src/segments/mod.rs @@ -6,7 +6,7 @@ use vortex_ipc::messages::{EncoderMessage, MessageEncoder}; /// The identifier for a single segment. // TODO(ngates): should this be a `[u8]` instead? Allowing for arbitrary segment identifiers? -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct SegmentId(u32); impl From for SegmentId { From 34a5b65dd22f697ad733e9a68d7d078bf08800ad Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 Jan 2025 21:05:11 +0000 Subject: [PATCH 3/4] File V2 --- Cargo.lock | 1 - vortex-file/Cargo.toml | 1 - vortex-file/src/v2/file.rs | 11 ++--------- vortex-file/src/v2/mod.rs | 2 ++ vortex-file/src/v2/open.rs | 10 ++++++---- vortex-file/src/v2/tests.rs | 39 +++++++++++++++++++++++++++++++++++++ vortex-layout/src/data.rs | 5 +++-- 7 files changed, 52 insertions(+), 17 deletions(-) create mode 100644 vortex-file/src/v2/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 200f997a0..04f2115d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5107,7 +5107,6 @@ dependencies = [ "futures-executor", "futures-util", "itertools 0.13.0", - "moka", "once_cell", "rstest", "tokio", diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 539aa02d4..76d4a81dc 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -23,7 +23,6 @@ futures = { workspace = true, features = ["std"] } futures-executor = { workspace = true } futures-util = { workspace = true } itertools = { workspace = true } -moka = { workspace = true } once_cell = { workspace = true } tokio = { workspace = true, features = ["rt"] } tracing = { workspace = true, optional = true } diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 695351a4e..4d79ca462 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -5,7 +5,6 @@ use vortex_array::stream::{ArrayStream, ArrayStreamAdapter}; use vortex_array::ContextRef; use vortex_dtype::DType; use vortex_error::VortexResult; -use vortex_expr::ExprRef; use vortex_io::VortexReadAt; use vortex_layout::scanner::{Poll, Scan}; use vortex_layout::{LayoutData, RowMask}; @@ -34,14 +33,8 @@ impl VortexFile { } /// Performs a scan operation over the file. - pub fn scan( - &self, - projection: ExprRef, - filter: Option, - ) -> VortexResult { - let layout_scan = self - .layout - .new_scan(Scan { projection, filter }, self.ctx.clone())?; + pub fn scan(&self, scan: Scan) -> VortexResult { + let layout_scan = self.layout.new_scan(scan, self.ctx.clone())?; // TODO(ngates): we could query the layout for splits and then process them in parallel. // For now, we just scan the entire layout with one mask. diff --git a/vortex-file/src/v2/mod.rs b/vortex-file/src/v2/mod.rs index d4eb41c80..a380f37e2 100644 --- a/vortex-file/src/v2/mod.rs +++ b/vortex-file/src/v2/mod.rs @@ -3,6 +3,8 @@ mod footer; mod open; mod segments; mod strategy; +#[cfg(test)] +mod tests; mod writer; pub use file::*; diff --git a/vortex-file/src/v2/open.rs b/vortex-file/src/v2/open.rs index 71f868cae..48906da71 100644 --- a/vortex-file/src/v2/open.rs +++ b/vortex-file/src/v2/open.rs @@ -65,9 +65,10 @@ impl OpenOptions { pub async fn open(self, read: R) -> VortexResult> { // Fetch the file size and perform the initial read. let file_size = read.size().await?; - let initial_offset = file_size - self.initial_read_size; + let initial_read_size = self.initial_read_size.min(file_size); + let initial_offset = file_size - initial_read_size; let initial_read: ByteBuffer = read - .read_byte_range(initial_offset, self.initial_read_size) + .read_byte_range(initial_offset, initial_read_size) .await? .into(); @@ -119,7 +120,7 @@ impl OpenOptions { &mut segment_cache, )?; - // Now we can create the VortexFile. + // Finally, create the VortexFile. Ok(VortexFile { read, ctx: self.ctx.clone(), @@ -194,10 +195,11 @@ impl OpenOptions { fb_root_layout.encoding() ) })?; + let _fb_encoding_id = fb_root_layout.encoding(); let root_layout = LayoutData::try_new_viewed( root_encoding, dtype, - initial_read.clone(), + bytes.clone(), fb_root_layout._tab.loc(), self.layout_ctx.clone(), )?; diff --git a/vortex-file/src/v2/tests.rs b/vortex-file/src/v2/tests.rs new file mode 100644 index 000000000..4718e3325 --- /dev/null +++ b/vortex-file/src/v2/tests.rs @@ -0,0 +1,39 @@ +use bytes::Bytes; +use vortex_array::array::ChunkedArray; +use vortex_array::stream::ArrayStreamExt; +use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant}; +use vortex_buffer::buffer; +use vortex_layout::scanner::Scan; + +use crate::v2::{OpenOptions, WriteOptions}; + +#[tokio::test] +async fn write_read() { + let arr = ChunkedArray::from_iter(vec![ + buffer![0, 1, 2].into_array(), + buffer![3, 4, 5].into_array(), + ]) + .into_array(); + + let written = WriteOptions::default() + .write_async(vec![], arr.into_array_stream()) + .await + .unwrap(); + + // TODO(ngates): no need to wrap Vec in Bytes if VortexReadAt doesn't require clone. + let vxf = OpenOptions::new(ContextRef::default()) + .open(Bytes::from(written)) + .await + .unwrap(); + + let result = vxf + .scan(Scan::all()) + .unwrap() + .into_array_data() + .await + .unwrap() + .into_primitive() + .unwrap(); + + assert_eq!(result.as_slice::(), &[0, 1, 2, 3, 4, 5]); +} diff --git a/vortex-layout/src/data.rs b/vortex-layout/src/data.rs index 2f5a288fa..f4d779fdc 100644 --- a/vortex-layout/src/data.rs +++ b/vortex-layout/src/data.rs @@ -93,8 +93,9 @@ impl LayoutData { unsafe { fb::Layout::init_from_table(Table::new(flatbuffer.as_ref(), flatbuffer_loc)) }; if fb_layout.encoding() != encoding.id().0 { vortex_bail!( - "Mismatched encoding, flatbuffer contains {}", - fb_layout.encoding() + "Mismatched encoding, flatbuffer contains {}, given {}", + fb_layout.encoding(), + encoding.id(), ); } From 14a1884c3675739b6398b6d66d8f396a41e3a3cc Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 Jan 2025 21:18:35 +0000 Subject: [PATCH 4/4] Fix dtype --- vortex-file/src/v2/file.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 4d79ca462..039b21f2d 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -35,6 +35,7 @@ impl VortexFile { /// Performs a scan operation over the file. pub fn scan(&self, scan: Scan) -> VortexResult { let layout_scan = self.layout.new_scan(scan, self.ctx.clone())?; + let scan_dtype = layout_scan.dtype().clone(); // TODO(ngates): we could query the layout for splits and then process them in parallel. // For now, we just scan the entire layout with one mask. @@ -60,7 +61,7 @@ impl VortexFile { } }); - Ok(ArrayStreamAdapter::new(layout_scan.dtype().clone(), stream)) + Ok(ArrayStreamAdapter::new(scan_dtype, stream)) } }