Porting persistence layer to Rust #705

Closed
twitu opened this issue Jul 21, 2022 · 17 comments · Fixed by #1085
Assignees
Labels
enhancement New feature or request rust Relating to the Rust core

Comments

@twitu
Collaborator

twitu commented Jul 21, 2022

This issue is to track the discussion and PRs for this large-ish port of the persistence layer into Rust.

Parquet layer

In the parquet layer, the key component is the parquet reader/writer, which reads and writes the arrow format that gets converted to/from chunks of Rust structs. The current working example in #699 introduces the following traits and structs. They are parameterized over a lifetime (explained below) and the struct they are decoding or encoding.

/// write vectors of struct A to a parquet file
pub struct ParquetWriter<A> {
    pub writer: FileWriter<File>,
    pub encodings: Vec<Vec<Encoding>>,
    pub options: WriteOptions,
    pub writer_type: PhantomData<*const A>,
}

/// read vectors of struct A from a parquet file
struct ParquetReader<'a, A> {
    file_reader: FileReader<&'a File>,
    reader_type: PhantomData<*const A>,
}

/// Any struct that wants to be decoded from a parquet file
/// must implement this trait.
trait DecodeFromChunk
where
    Self: Sized,
{
    fn decode(schema: &Schema, cols: Chunk<Arc<dyn Array>>) -> Vec<Self>;
}

/// Any struct that wants to be encoded and written to a parquet file
/// must implement this trait.
trait EncodeToChunk
where
    Self: Sized,
{
    fn encode_schema() -> Schema;
    fn encode(data: Vec<Self>) -> Chunk<Box<dyn Array>>;
}

/// parquet reader implements an iterator i.e. calling next on it reads
/// the next batch of chunks from disk converts them into vector of struct A
/// and returns the vector
impl<'a, A> Iterator for ParquetReader<'a, A>
where
    A: DecodeFromChunk,
{...}
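
To make the shape of this concrete, here's a minimal std-only sketch of how a decoding trait and an iterator-style reader can fit together. It is not the code from #699: `Columns`, `Tick` and `ToyReader` are hypothetical stand-ins (for an arrow chunk, a real data type and ParquetReader), and the schema argument is left out.

```rust
use std::marker::PhantomData;

/// Hypothetical stand-in for an arrow chunk: one Vec per column.
pub type Columns = Vec<Vec<i64>>;

/// Simplified version of the decoding trait (no schema argument here).
pub trait DecodeFromChunk: Sized {
    fn decode(cols: Columns) -> Vec<Self>;
}

#[derive(Debug, PartialEq)]
pub struct Tick {
    pub price: i64,
    pub ts_init: i64,
}

impl DecodeFromChunk for Tick {
    fn decode(cols: Columns) -> Vec<Self> {
        // Assumption of this sketch: column 0 holds prices, column 1 timestamps.
        cols[0]
            .iter()
            .zip(cols[1].iter())
            .map(|(&price, &ts_init)| Tick { price, ts_init })
            .collect()
    }
}

/// Toy reader: yields one decoded vector of structs per chunk.
pub struct ToyReader<A> {
    chunks: std::vec::IntoIter<Columns>,
    reader_type: PhantomData<*const A>,
}

impl<A> ToyReader<A> {
    pub fn new(chunks: Vec<Columns>) -> Self {
        ToyReader {
            chunks: chunks.into_iter(),
            reader_type: PhantomData,
        }
    }
}

impl<A: DecodeFromChunk> Iterator for ToyReader<A> {
    type Item = Vec<A>;

    fn next(&mut self) -> Option<Vec<A>> {
        // Fetch the next raw chunk (if any) and decode it into structs.
        self.chunks.next().map(A::decode)
    }
}
```

The `PhantomData` field only records which struct the reader decodes into; the reader never stores an `A` itself.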

Interface layer

We need another layer on top of this to interface with the Python/Cython bindings. The current persistence layer allows passing queries that project certain columns and filter certain rows. Since the parquet file reader is now provided by the Rust arrow2 library, the Python projection and filter expressions will have to be converted in this interfacing layer so that the parquet layer (above) can understand them.

    /// Creates a new [`FileReader`] by reading the metadata from `reader` and constructing
    /// Arrow's schema from it.
    ///
    /// # Error
    /// This function errors iff:
    /// * reading the metadata from the reader fails
    /// * it is not possible to derive an arrow schema from the parquet file
    /// * the projection contains columns that do not exist
    pub fn try_new(
        mut reader: R,
        projection: Option<&[usize]>,
        chunk_size: Option<usize>,
        limit: Option<usize>,
        groups_filter: Option<GroupFilter>,
    ) -> Result<Self> {...}

type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool + Send + Sync>;

This is the function that creates a parquet file reader.

  • Python style projections must be converted to an Option<&[usize]>, i.e. a slice containing the indexes of the columns that the reader should expose. This part is related to (WIP) Rust Catalog: Expression filter #700. @ghill2 you can consider adding this conversion function to your PR.

  • Filtering is more involved, as the reader needs to be passed a Rust function that returns true or false to accept or reject a row group (see GroupFilter above). The pyo3 docs offer no way of converting a Python function to a Rust function, and logically it sounds like a hard problem to interconvert functions between languages. The only way to do this is to decide on a set of required filter operations, which can be passed to Rust as enums and then be converted into actual functions. @cjdsellers we'll need your help here to figure out if we can indeed narrow down such a set of operations.

    On the other hand, a simpler (less performant) solution is to simply fetch all the rows and filter them on the python side. It's a valid workaround if there's no better solution. I think we can go with this approach for now, benchmark and see the performance number before adding a layer of complexity.
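
For illustration, the "enum of filter operations" idea could look roughly like the sketch below. The `FilterOp` variants and the helper names are hypothetical and only filter on a `ts_init` value; which operations are actually needed is exactly the open question above.

```rust
/// Hypothetical FFI-friendly filter description; the variants are examples only.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub enum FilterOp {
    /// Accept everything.
    All,
    /// Keep values with ts_init >= the given bound.
    TsInitGe(u64),
    /// Keep values with ts_init < the given bound.
    TsInitLt(u64),
}

/// Turn the plain-data description into an actual Rust predicate.
pub fn to_predicate(op: FilterOp) -> Box<dyn Fn(u64) -> bool + Send + Sync> {
    match op {
        FilterOp::All => Box::new(|_: u64| true),
        FilterOp::TsInitGe(min) => Box::new(move |ts: u64| ts >= min),
        FilterOp::TsInitLt(max) => Box::new(move |ts: u64| ts < max),
    }
}

/// Apply the predicate to a column of ts_init values.
pub fn filter_ts(ts_init: &[u64], op: FilterOp) -> Vec<u64> {
    let keep = to_predicate(op);
    ts_init.iter().copied().filter(|&ts| keep(ts)).collect()
}
```

The enum is plain data, so it can cross the ffi boundary; only the Rust side ever sees a closure.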

Consumer layer

Next we come to the problem of actually consuming the vectors. This problem is specific to how ParquetReader will be used and will dictate the control flow.

Option 1: Pass strategy to Rust

This problem arises because of the lifetime parameter 'a on ParquetReader. The underlying arrow2 parquet file reader takes a reference to a file, i.e. it can only exist in the scope where the file was created. Since ParquetReader is consumed as an iterator, the consuming logic must also exist within the scope of the file. Here's a diagram to demonstrate.

image (1)
image

In code, at a high level it would look something like this.

class Engine:

    def run(self):
        strategy = Strategy()
        file = filename
        catalog_reader.run(filename, strategy)  # catalog_reader is an ffi call declared in Rust

pub extern "C" fn catalog_reader(filename: String, strategy: PyObject) {
    let f = File::open(filename).unwrap();
    let pqr: ParquetReader<QuoteTick> = ParquetReader::new(&f, 10000);
    // each chunk is a Vec<QuoteTick>
    for chunk in pqr {
        Python::with_gil(|py| {
            // ....
            strategy.call(chunk)  // pseudocode: hand the chunk to the Python strategy
        });
    }
}

This is the simplest approach and puts the entire control flow within Rust, once the file name and strategy have been passed to it.

Option 2: Pass parquet reader to Python

Here we want the control flow to stay on the Python side. In this case we'll need to use Pin to create a self-referential structure for ParquetReader, because we need to keep the file and the arrow2 file reader (which depends on a reference to the file) alive together. It'll look roughly like this.

/// read vectors of struct A from a parquet file
struct ParquetReader<'a, A> {
    file: Pin<Box<File>>,
    file_reader: FileReader<&'a File>,
    reader_type: PhantomData<*const A>,
}

This struct can then be wrapped in a Box and then passed over ffi. In this approach we'll need to implement an ffi next method for ParquetReader that calls the internal next method of the iterator. It'll also need to convert the returned vector into something ffi safe. Here @ghill2 has already done the vector conversion work in #699.

Non-exhaustive list of helpful reads:

Passing generics across ffi

Finally we come to the issue of polymorphic structure crossing the ffi boundaries. Currently the generic functions need to be mangled, i.e. uniquely named for each possible type parameter we want to support. The below example does not work. It will have to be uniquely named.

However, lifetimes are a special kind of generic parameter where mangling cannot help, so we cannot send the ParquetReader across the ffi boundary without implementing option 2 given above.

#[repr(C)]
pub struct ParquetReader<'a, A> {
    file_reader: FileReader<&'a File>,
    reader_type: PhantomData<*const A>,
}

// ERROR: does NOT work because generic functions need to be mangled
pub extern "C" fn create<'a, A>(file: &'a File) -> Box<ParquetReader<'a, A>> {
    Box::new(ParquetReader::new(file, 100))
}

#700 already has some work done in passing vectors of structs across ffi boundaries

TL;DR

To complete the port, we need to figure out the following -

  1. projections: convert Python style column projections to a Rust slice of column indexes
  2. decide on which approach to take for row filtering
  3. decide on which approach to take for the consumer layer
@twitu twitu added the enhancement New feature or request label Jul 21, 2022
@cjdsellers
Member

Hi @twitu

As always, it's a pleasure reading your detailed write up.

Interface layer

Your statement here is also exactly what I was thinking, as we're currently handling filtering in pure Python as it is anyway:

On the other hand, a simpler (less performant) solution is to simply fetch all the rows and filter them on the python side. It's a valid workaround if there's no better solution. I think we can go with this approach for now, benchmark and see the performance number before adding a layer of complexity.

Consumer layer

Then as far as the options, I understand things are not as simple as read all structs in Rust, pass back to Cython and done. Due to the flow of the batch processing, there are lifetime issues around the file handle in Rust.

One thing I can say is we should try wherever possible at this point to avoid calling Python from Rust, as it really complicates the Rust library's interaction with the Python runtime. The hope is that as more of the system moves to Rust we just have less and less need for Cython wrappers, and more Rust calling Rust.

In this case we're looking at Option 2, keeping the flow on the Python side for now. I think boxing the reader is completely reasonable and shouldn't matter performance wise.

Moving forward

  • @twitu please let me know which of @ghill2's diffs you are happy to be merged into your work (or request changes).

  • I understand the code is still a WIP, however I'm then keen to get a prototype merged into develop which I can look over in more detail, and consider options for integrating through Cython, along with the solutions design overall.

@twitu
Collaborator Author

twitu commented Jul 22, 2022

Yeah no problem, please go ahead with merging #699. I've updated the PR with the latest changes. I'll continue using rs-schema branch for adding new changes.


I missed one additional detail in the above comment related to Interface layer projections.

The problem with projections is that the projected values need to correspond to the fields of a struct. Suppose for e.g. struct QuoteTick has 4 fields but only 3 of them are projected from parquet, then the decoder method will have to fill in the 4th field with a default value. If it's not implemented in such a way then it will fail to construct quote ticks and the parquet reader will fail.

Now more generally any combination of projected fields is possible but the decoder can only handle decoding the fetched columns to one structure which has been defined in the trait implementation. This means valid projections are directly tied to the decoder implementation which will have to convert the projected columns into structs.

This will only be a problem if users try to use the same parquet reader with different projections. Do you see something like this happening? What are the common use cases for projections currently?
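
As a rough sketch of the "fill in a default" behaviour described above: the decoder below accepts columns that may or may not have been projected and falls back to 0 for missing ones. `Quote`, `ProjectedColumns` and `decode_with_defaults` are hypothetical names for illustration, not part of the PR.

```rust
#[derive(Debug, PartialEq)]
pub struct Quote {
    pub bid: i64,
    pub ask: i64,
    pub ts_init: u64,
}

/// Hypothetical projected chunk: a column is `None` when it was not projected.
pub struct ProjectedColumns {
    pub bid: Option<Vec<i64>>,
    pub ask: Option<Vec<i64>>,
    pub ts_init: Option<Vec<u64>>,
}

/// Decode `rows` rows, filling any unprojected field with 0 as its default.
pub fn decode_with_defaults(cols: &ProjectedColumns, rows: usize) -> Vec<Quote> {
    (0..rows)
        .map(|i| Quote {
            bid: cols.bid.as_ref().map_or(0, |c| c[i]),
            ask: cols.ask.as_ref().map_or(0, |c| c[i]),
            ts_init: cols.ts_init.as_ref().map_or(0, |c| c[i]),
        })
        .collect()
}
```

Every supported projection has to be anticipated by the decoder like this, which is why projections and the decoder implementation end up tied together.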


I'll look at implementing option 2 i.e ParquetReader with Pin Box so that it can be passed around.

On the Python side it will behave as an iterator over a list of python objects (which are rust structs converted to python objects). @ghill2 do you want to take a look at implementing this wrapper as a python iterator. It might look something like this.

  1. Python side calls next on the parquet reader wrapper, which implements __iter__ and __next__
  2. Interface layer - the ffi next function calls the parquet reader's next method (since it implements Iterator), which returns a vector of structs
  3. the vector of structs is converted to a PyList of Python objects

@cjdsellers
Member

cjdsellers commented Jul 23, 2022

Thanks @twitu

I've merged #699 and will have a tinker to see how best to integrate with Cython from the Python catalog level.

Will wait to hear from @ghill2 here or on Discord re the iterator design you mentioned.

For now we don't need to handle projections, it could be a future feature if it's valuable to enough users.

@ghill2
Collaborator

ghill2 commented Jul 23, 2022

Thanks for the detailed study @twitu
This is an excellent plan for the iterator, I'll implement this and submit a PR.

@cjdsellers cjdsellers added the rust Relating to the Rust core label Jul 27, 2022
@twitu
Collaborator Author

twitu commented Jul 30, 2022

#718 completes the port and has examples, tests and patterns for the integration.

Consumer layer

The consumer layer problem was not difficult to solve and actually did not require the Pin Box complexity. I missed the fact that the underlying parquet reader takes any type that implements the Read (and Seek) traits. Both File and &File implement them, so the underlying reader can take ownership of the file. This eliminated the need for references and lifetimes.

The current solution is the same as option 2 without Pin Box.

pub struct ParquetReader<A> {
    file_reader: FileReader<File>,
    reader_type: PhantomData<*const A>,
}
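
The same point can be shown with std alone. In the sketch below, `ByteChunks` is a hypothetical toy reader generic over any `R: Read`; when it is given an owned source it carries no lifetime parameter, so it can be boxed and handed out freely.

```rust
use std::io::{Cursor, Read};

/// Toy reader generic over anything that implements `Read`.
pub struct ByteChunks<R: Read> {
    inner: R,
    chunk_size: usize,
}

impl<R: Read> ByteChunks<R> {
    pub fn new(inner: R, chunk_size: usize) -> Self {
        ByteChunks { inner, chunk_size }
    }
}

impl<R: Read> Iterator for ByteChunks<R> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        let mut buf = vec![0u8; self.chunk_size];
        // A single `read` may return fewer bytes than requested; fine for a sketch.
        match self.inner.read(&mut buf) {
            Ok(0) | Err(_) => None,
            Ok(n) => {
                buf.truncate(n);
                Some(buf)
            }
        }
    }
}

/// Owns its source, so there is no lifetime parameter on the returned type.
pub fn owned_reader(data: Vec<u8>) -> Box<ByteChunks<Cursor<Vec<u8>>>> {
    Box::new(ByteChunks::new(Cursor::new(data), 2))
}
```

Passing a borrowed source such as `&data[..]` also works, but then the reader is tied to the scope of `data`, which is the situation option 1 was working around.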

Generic FFI

The more important challenge was to do ffi without creating a huge number of functions specializing readers and vectors for each possible data type. This is solved by dropping to the C level and using raw pointers. The technique is powerful and useful in multiple places, but still offers a degree of safety.

The key idea is that a reference to a heap allocated value can be coerced into a void pointer (i.e. a non-type specific pointer), and its memory can be "leaked", as in we can tell Rust not to drop it. This makes it possible to pass the data as a pointer to the Python side. However, to ensure that memory is not truly leaked, Python needs to pass the pointer back to Rust (after the work is done) so that it can be freed/dropped.

Now comes the hairy part: when the pointer is returned to Rust, Rust does not know the type of the data the pointer is pointing to. Here Python also needs to pass a discriminant (an enum) so that Rust can match on it, coerce the void pointer to a pointer of the correct type and then drop it.

These are the signatures involved

#[repr(C)]
pub enum ParquetReaderType {
    QuoteTick,
}

pub unsafe extern "C" fn parquet_reader_new(
    file_path: PyObject,
    reader_type: ParquetReaderType,
) -> *mut c_void;
pub unsafe extern "C" fn parquet_reader_drop(reader: *mut c_void, reader_type: ParquetReaderType);
pub unsafe extern "C" fn parquet_reader_next_chunk(
    reader: *mut c_void,
    reader_type: ParquetReaderType,
) -> CVec;
pub unsafe extern "C" fn parquet_reader_drop_chunk(chunk: CVec, reader_type: ParquetReaderType);

The key idea is that Python tells Rust what type it needs, Rust returns a void pointer, and Python then casts it to a suitable type, since it knows what type is needed at that point. We can control the degree of unsafety by having Python pass Rust enums that indicate the type behind the pointer. This locally restricts the set of possible values to only a few types.

In the case of ParquetReaderType, variants should only exist for types that implement the DecodeFromChunk trait, because only they can have instances of ParquetReader.
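
Here's a small std-only sketch of the leak / discriminant / drop pattern. `ToyType`, `Reader` and the `toy_reader_*` functions are hypothetical; they are plain Rust functions rather than `extern "C"` ones so the example stays self-contained.

```rust
use std::ffi::c_void;

/// Discriminant passed alongside the untyped pointer.
#[repr(C)]
#[derive(Clone, Copy)]
pub enum ToyType {
    Quotes,
    Trades,
}

pub struct Reader<T> {
    pub items: Vec<T>,
}

/// Allocate a reader on the heap and leak it as an untyped pointer.
pub fn toy_reader_new(kind: ToyType) -> *mut c_void {
    match kind {
        ToyType::Quotes => {
            let reader = Box::new(Reader::<u64> { items: vec![1, 2, 3] });
            Box::into_raw(reader) as *mut c_void
        }
        ToyType::Trades => {
            let reader = Box::new(Reader::<String> { items: vec!["t".to_string()] });
            Box::into_raw(reader) as *mut c_void
        }
    }
}

/// # Safety
/// `ptr` must come from `toy_reader_new` called with the same `kind`.
pub unsafe fn toy_reader_len(ptr: *mut c_void, kind: ToyType) -> usize {
    unsafe {
        match kind {
            ToyType::Quotes => (&*(ptr as *const Reader<u64>)).items.len(),
            ToyType::Trades => (&*(ptr as *const Reader<String>)).items.len(),
        }
    }
}

/// # Safety
/// Same contract as above; `ptr` must not be used again afterwards.
pub unsafe fn toy_reader_drop(ptr: *mut c_void, kind: ToyType) {
    unsafe {
        match kind {
            // Rebuilding the Box with the right type lets Rust run the destructor.
            ToyType::Quotes => drop(Box::from_raw(ptr as *mut Reader<u64>)),
            ToyType::Trades => drop(Box::from_raw(ptr as *mut Reader<String>)),
        }
    }
}
```

Passing the wrong discriminant is undefined behaviour, which is why the set of variants should be kept small and each call site should be explicit about the type it expects.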

The below diagram illustrates the control flow.

sequenceDiagram
    participant P as Python
    participant R as Rust
    P->>R: parquet_reader_new(ReaderType::QuoteTick)
    R->>P: *mut c_void (pointer to leaked ParquetReader<QuoteTick>)
    P->>R: parquet_reader_next_chunk(*mut c_void, ReaderType::QuoteTick)
    R->>P: *mut c_void (pointer to leaked underlying QuoteTick vec)
    P->>R: parquet_reader_drop_chunk(*mut c_void, ReaderType::QuoteTick)
    P->>R: parquet_reader_drop(*mut c_void, ReaderType::QuoteTick)

@cjdsellers
Member

Great work as always @twitu Ishan 👏

So my understanding is that this efficient parquet reading and writing at the Rust level will be available for the standard built-in types initially.

Users implementing their own custom data types would have to make do with the pure Python implementation for now, or implement the DecodeFromChunk trait in Rust, and add an enum variant?

We'll move forward with the integration from the Python side soon.

@twitu
Collaborator Author

twitu commented Nov 27, 2022

In recent changes we've made ParquetWriter and ParquetReader generic over their underlying reader and writer. This serves two purposes:

  • Support reading and writing to both a file and memory buffer
  • Generic implementation is more in line with the objective of moving more things to Rust

Currently we specialize ParquetReader for file and memory buffer, and use a separate enum ParquetReaderType to indicate its underlying type when calling through an FFI function.

For ParquetWriter we only specialize for memory buffer. Python is returned the buffer and it uses fsspec which actually writes to one of the many backends it supports.


The functionality for reader and writer is exposed through unsafe FFI functions. The ones below are for Cython but with a few changes they can work with Python too.

/// # Safety
/// - Assumes `file_path` is borrowed from a valid Python UTF-8 `str`.
#[no_mangle]
pub unsafe extern "C" fn parquet_reader_file_new(
    file_path: *mut ffi::PyObject,
    parquet_type: ParquetType,
    chunk_size: usize,
    // group_filter_arg: GroupFilterArg,  TODO: Comment out for now
) -> *mut c_void

/// # Safety
/// - Assumes `data` is a valid CVec with an underlying byte buffer.
#[no_mangle]
pub unsafe extern "C" fn parquet_reader_buffer_new(
    data: CVec,
    parquet_type: ParquetType,
    chunk_size: usize,
    // group_filter_arg: GroupFilterArg,  TODO: Comment out for now
) -> *mut c_void

/// # Safety
/// - Assumes `reader` is a valid `*mut ParquetReader<Struct>` where the struct
/// has a corresponding [ParquetType] enum.
#[no_mangle]
pub unsafe extern "C" fn parquet_reader_free(
    reader: *mut c_void,
    parquet_type: ParquetType,
    reader_type: ParquetReaderType,
)

/// # Safety
/// - Assumes `reader` is a valid `*mut ParquetReader<Struct>` where the struct
/// has a corresponding ParquetType enum.
#[no_mangle]
pub unsafe extern "C" fn parquet_reader_next_chunk(
    reader: *mut c_void,
    parquet_type: ParquetType,
    reader_type: ParquetReaderType,
) -> CVec

/// # Safety
/// - Assumes `chunk` is a valid `ptr` pointer to a contiguous array.
#[no_mangle]
pub unsafe extern "C" fn parquet_reader_drop_chunk(chunk: CVec, parquet_type: ParquetType)

ParquetReader ffi functions

ParquetReader has functions to create specialized file or buffer readers, to get the next chunk, to free the chunk and to free the reader. The chunk is returned as a C-compatible struct CVec, which is an allocated vec that Rust no longer tracks, so it must be freed after it has been used.

pub struct CVec {
    /// Opaque pointer to the block of memory storing the elements. To access
    /// the elements, cast it to the underlying type.
    pub ptr: *mut c_void,
    /// The number of elements in the block.
    #[pyo3(get, set)]
    pub len: usize,
    /// The capacity of vector from which it was allocated.
    /// Used when deallocating the memory
    pub cap: usize,
}

CVec struct
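
To show why `len` and `cap` both travel with the pointer, here's a minimal sketch of the round trip between a `Vec<T>` and a CVec-like struct. The struct mirrors the fields above (minus the pyo3 attribute, and assuming `#[repr(C)]`); `vec_into_cvec` and `cvec_into_vec` are hypothetical helper names.

```rust
use std::ffi::c_void;

/// Same fields as the struct above; `#[repr(C)]` is an assumption of this sketch.
#[repr(C)]
pub struct CVec {
    pub ptr: *mut c_void,
    pub len: usize,
    pub cap: usize,
}

/// Leak a Vec<T> into a CVec; Rust stops tracking the allocation here.
pub fn vec_into_cvec<T>(data: Vec<T>) -> CVec {
    // ManuallyDrop prevents the Vec destructor from freeing the buffer.
    let mut data = std::mem::ManuallyDrop::new(data);
    CVec {
        ptr: data.as_mut_ptr() as *mut c_void,
        len: data.len(),
        cap: data.capacity(),
    }
}

/// # Safety
/// `cvec` must have been produced by `vec_into_cvec::<T>` with the same `T`
/// and must not be used again afterwards.
pub unsafe fn cvec_into_vec<T>(cvec: CVec) -> Vec<T> {
    // Rebuilding the Vec hands ownership back to Rust; dropping it frees the memory.
    unsafe { Vec::from_raw_parts(cvec.ptr as *mut T, cvec.len, cvec.cap) }
}
```

Dropping the vector returned by `cvec_into_vec` is what a "drop chunk" style function would do once the other side is finished with the data.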

ParquetWriter exposes the following functions to create it, take a buffer of structs and write them and finally flush the writer which consumes the writer and returns the underlying memory buffer. The memory buffer can then be written to whichever storage decided on the Python side.

/// ParquetWriter is generic for any writer however for ffi it only supports
/// byte buffer writers. This is so that the byte buffer can be returned after
/// the writer is ended.
///
/// # Safety
/// - Assumes `file_path` is borrowed from a valid Python UTF-8 `str`.
/// - Assumes `metadata` is borrowed from a valid Python `dict`.
#[no_mangle]
pub unsafe extern "C" fn parquet_writer_new(
    parquet_type: ParquetType,
    metadata: *mut ffi::PyObject,
) -> *mut c_void

/// Writer is flushed, consumed and dropped. The underlying writer is returned.
/// While this is generic for FFI it only considers and returns a vector of bytes
/// if the underlying writer is anything else it will fail.
///
/// # Safety
/// - Assumes `writer` is a valid `*mut ParquetWriter<Struct>` where the struct
/// has a corresponding `ParquetType` enum.
#[no_mangle]
pub unsafe extern "C" fn parquet_writer_flush(
    writer: *mut c_void,
    parquet_type: ParquetType,
) -> CVec

#[no_mangle]
/// # Safety
/// - Assumes `writer` is a valid `*mut ParquetWriter<Struct>` where the struct
/// has a corresponding ParquetType enum.
/// - Assumes  `data` is a non-null valid pointer to a contiguous block of
/// C-style structs with `len` number of elements
pub unsafe extern "C" fn parquet_writer_write(
    writer: *mut c_void,
    parquet_type: ParquetType,
    data: *mut c_void,
    len: usize,
)

ParquetWriter ffi functions


Because these functions deal with pointers, they are inherently unsafe and have a few invariants to maintain.

  • A reader should be freed after it is done reading
  • A writer must be flushed (and hence consumed) after it is done writing
  • Any chunk returned by the reader must be freed using drop chunk
  • The writer does not manage the memory of the buffer passed to it in write chunk. It must be deallocated or garbage collected on the side that called the write chunk function.
  • Flushing the writer returns the underlying memory buffer, which must be freed after it has been used. This buffer is a vector of bytes and can be freed using the default method to free a CVec.

Still there are currently a few cases where the interaction with Cython/Python segfaults -

  • The cython class for ParquetReader segfaults on drop chunk function - possibly because of incompatible default values
  • The ParquetWriter segfaults when trying to write a list of items of length > 8192

Both these segfaults are where Python and Rust interface with lists or buffers of items. We need a robust way to interconvert between them.

  1. The reader returns a buffer of structs. For Python to access them we need to convert them to Python objects. This can best be done by implementing a from_mem static method for the appropriate classes which can take the underlying Rust struct and construct a Python object from it. Note that since these are C structs, this conversion to Python objects can only happen in Cython.

  2. The writer can take a buffer of structs. These are allocated on the caller side, Cython in this case, which takes a list of Python objects and creates a contiguous memory buffer of their underlying C structs which is then passed to Rust. Again, since this deals with C structs it can only be done in the Cython layer.
    There are two ways to do it: either using a C++ vector or using the Python malloc/free APIs to allocate and deallocate a memory buffer.
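
On the Rust side, the write path only needs to borrow the caller's buffer. The sketch below shows that with `std::slice::from_raw_parts`; `ToyTick`, `ToyWriter` and `toy_writer_write` are hypothetical stand-ins for the real types and for parquet_writer_write.

```rust
use std::ffi::c_void;

#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ToyTick {
    pub price: i64,
    pub ts_init: u64,
}

/// Hypothetical writer that just accumulates rows in memory.
#[derive(Default)]
pub struct ToyWriter {
    pub rows: Vec<ToyTick>,
}

/// # Safety
/// `data` must be non-null and point to `len` contiguous, initialized `ToyTick`
/// values that stay alive for the duration of the call.
pub unsafe fn toy_writer_write(writer: &mut ToyWriter, data: *const c_void, len: usize) {
    // Borrow the caller's buffer; nothing here takes ownership of it or frees it.
    let rows = unsafe { std::slice::from_raw_parts(data as *const ToyTick, len) };
    writer.rows.extend_from_slice(rows);
}
```

Because the slice is only borrowed, the buffer still belongs to whoever allocated it (Cython in the description above) and must be released there.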

One good point of this approach is that it can easily be removed when the Rust <-> Python interface is no longer needed.

@twitu
Collaborator Author

twitu commented Jan 3, 2023

The next step is to reduce the amount of Cython code by implementing logic as a pyo3 module.

shapes(3)

We do this by moving the reader/writer logic and interface to Rust using Pyo3. The current version has the interfacing for reader and writer implemented in Cython which we move to Rust as a pyo3 module.

However we still need a thin layer of Cython to convert the data passed from Rust. The data is a buffer of Rust structs which need to be converted back to Cython objects, and this can be done only in Cython because it can directly interface with pointers.

There is one additional challenge. Since the control flow is in Python, we need to pass the buffer of structs through Python into Cython. However there is no way to manipulate pointers in Python directly. Here's an illustration of the problem.

image(3)

The "HOW??" is solved using PyCapsule [1][2], opaque objects meant to transfer data among Cython extensions through Python.

These changes are implemented in this PR: #938

Footnotes

  1. https://docs.python.org/3/c-api/capsule.html

  2. https://pyo3.rs/main/doc/pyo3/types/struct.pycapsule

@twitu
Collaborator Author

twitu commented Feb 3, 2023

Here's a high level picture of the current state. First, what are the source and sink of the data? The source is a set of partitioned parquet files and the sink is an engine that consumes a list of Python objects.

shapes(3)

We want to go from source to sink efficiently. The current Rust implementation of the parquet reader (using Cython) improves on the Python implementation by almost 25 times. It looks something like this.

shapes(4)

But the reader currently only works for one file. So we're implementing an abstraction on top of this that can read from multiple files in #986. This catalog unifies/splices together the stream from multiple readers, filters based on multiple conditions and feeds it into the engine.

shapes(5)

However, as the recent issue #939 showed, a major factor in performance is good filtering. The current filtering abilities are limited: all the filtering is done after the parquet has been read, decoded and converted to Python objects. This is incredibly inefficient because of the many wasted io operations and allocations.

To make this comparably efficient we need to push the filtering down as much as possible. Ideally down to the io level where unneeded pages are not read. The rust parquet reader supports this in a limited way, by allowing filters on the ts_init field.
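
As a rough illustration of what pushing a ts_init filter down to the row group level means, the sketch below skips row groups whose min/max statistics cannot overlap the requested range. `RowGroupStats`, `ToyGroupFilter` and both functions are hypothetical; the real reader works with arrow2's RowGroupMetaData instead.

```rust
use std::sync::Arc;

/// Hypothetical per-row-group statistics for the ts_init column.
pub struct RowGroupStats {
    pub min_ts_init: u64,
    pub max_ts_init: u64,
}

/// Same shape as the GroupFilter alias quoted earlier, over the toy stats type.
pub type ToyGroupFilter = Arc<dyn Fn(usize, &RowGroupStats) -> bool + Send + Sync>;

/// Keep only row groups whose [min, max] range overlaps [start, end].
pub fn ts_init_range_filter(start: u64, end: u64) -> ToyGroupFilter {
    Arc::new(move |_index: usize, stats: &RowGroupStats| {
        stats.max_ts_init >= start && stats.min_ts_init <= end
    })
}

/// Indexes of the row groups that would actually be read from disk.
pub fn groups_to_read(groups: &[RowGroupStats], keep: &ToyGroupFilter) -> Vec<usize> {
    groups
        .iter()
        .enumerate()
        .filter(|(i, g)| keep(*i, *g))
        .map(|(i, _)| i)
        .collect()
}
```

Row groups that fail the check are never decoded, which is where the io and allocation savings would come from.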

However we need to support a much richer query syntax that users are more familiar with. Here as @limx0 pointed out we should consider the additional maintenance burden as well. And that since this is a solved problem we should consider leveraging other libraries like datafusion.

shapes(6)

This will require a change in the conversion layer. Currently the parquet reader, implemented with arrow2, converts a Chunk to a vec of ticks. In this implementation, a datafusion query returns a dataframe which will need to be converted to a vec of ticks.

@cjdsellers
Member

cjdsellers commented Feb 4, 2023

So my understanding is:

Incorporating a DataFusion type solution will improve the efficiency of query/filtering and reading bytes up from disk.

We would then still stream these buffers of bytes through our parquet reader in Rust, to convert to structs efficiently, then downstream of that the Cython layer with the PyCapsule approach will remain (while we still have so much Cython in the codebase)?

@twitu
Collaborator Author

twitu commented Feb 4, 2023

Yes datafusion or similar libraries will help with fully featured query/filtering without additional maintenance burden.

Cython PyCapsule approach will have to stay for now, since we need to convert rust structs to python objects.

The exact conversion will depend on what the library returns. For e.g. the arrow2 library returns a chunk of arrays (i.e. a row group of columns) which we're decoding in this trait. This might or might not have to change depending on the library.

pub trait DecodeFromChunk
where
    Self: Sized,
{
    fn decode(schema: &Schema, cols: Chunk<Box<dyn Array>>) -> Vec<Self>;
}

@rsmb7z
Collaborator

rsmb7z commented Feb 4, 2023

However as this recent issue #939 showed a major factor in performance is good filtering. The current filtering has limited filtering abilities. All the filtering is done after the parquet has been read, decoded and converted to python objects. This is incredibly inefficient because of a lot of wasted io operations, and allocations.

Yes, it goes through all the files (to read the filter columns), although when partitioning is specified it should look inside that partition only; otherwise there is no reasonable purpose for partitions. I am not sure if we are missing a parameter during the query or if it is actually a bug in pyarrow. Apart from this issue, Parquet is the most efficient format I have found so far for large data, having tried different approaches before: DB servers (MySQL, PostgreSQL), CSV, SQLite etc.

I have just tested DataFusion in Rust, and it tries to load all files in the main directory on ctx.register_listing_table and will not move forward. All I found in ListingOptions is table_partition_cols, which simply adds the partition column to the dataset. The queries that filter rows only come after this stage. So the problem seems to be more with PyArrow itself.
So we may have to manage this internally: where there is a partition, add the partition name to the OS path before reading data. If this is the way, will it still be worth the complexity of involving Rust, Cython, Python etc. (not sure, just giving my thoughts)? Alternatively this can be achieved directly with Pandas within Python.

@twitu
Collaborator Author

twitu commented Feb 6, 2023

You're right, this is a sore point with the datafusion library. Push down predicates aren't working well. However, the good thing is that the maintainers are aware of this and are working on it. Here's the tracking issue for enabling push down predicates - apache/datafusion#3463. There are some additional performance improvements being discussed here - apache/datafusion#4028. Hopefully this will get merged soon and we'll see performance on par with more mature implementations.

@twitu twitu self-assigned this Mar 28, 2023
@twitu
Collaborator Author

twitu commented Mar 28, 2023

Based on the guiding principles in #991 the catalog v2 is focusing on performance, usability and most importantly reduced maintenance burden. To that end this version aims to port the whole persistence read layer to Rust. Here's an overview starting from the consumer - the nautilus BacktestEngine (engine).

  1. The nautilus engine imposes a few invariants on the data it consumes.
    • It consumes a list/batch of Data. Data can be a trade, a quote etc. This means a batch can have a mix of trades, quotes and others.

    • It consumes an arbitrary but configurable size of data per batch of data - either by size like 512 MB or by count like 1000 objects

    • The data in each batch is sorted in ascending order of ts_init field. And data across batches must also maintain this ordering. This is especially challenging because different types of data - trades, quotes come from different queries each.

      As an example consider these two lists of data given to the engine, where T is a trade, Q is a quote and the number is its ts_init value. Notice that this is invalid batching because T5 comes after Q8, which is not in ascending order.

      T1, T3, Q4, Q8 - batch 1
      T5, T7, Q9, Q9 - batch 2
      
  2. The data actually comes from many queries. Each query returns a specific kind of data, say query A returns ticks, query B quotes, and query C returns more ticks. It is assumed that each stream is sorted in ascending order of ts_init. However these streams need to be merged into a single stream that still maintains the order of ts_init across all the data. For this we use a heap based merge.
  3. Each stream itself contains one kind of data only and is the result of a single query. We use datafusion as our query engine for reasons discussed in Catalog v2 #991. A datafusion query returns a chunk of records in arrow style, and we use the decode layer from rust catalog v1 to convert this chunk of records into a vec of data.
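
The heap based merge from point 2 can be sketched with std's BinaryHeap. This is only an illustration over in-memory vectors of `(ts_init, label)` pairs; `k_way_merge` is a hypothetical name and the real implementation works on async streams of Data.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted streams of (ts_init, label) pairs into one sorted Vec.
pub fn k_way_merge(streams: Vec<Vec<(u64, &'static str)>>) -> Vec<(u64, &'static str)> {
    let mut iters: Vec<_> = streams.into_iter().map(|s| s.into_iter()).collect();
    // Min-heap keyed on (ts_init, stream index); `Reverse` flips std's max-heap.
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some((ts, label)) = it.next() {
            heap.push(Reverse((ts, idx, label)));
        }
    }
    let mut merged = Vec::new();
    while let Some(Reverse((ts, idx, label))) = heap.pop() {
        merged.push((ts, label));
        // Refill from the stream the popped element came from.
        if let Some((next_ts, next_label)) = iters[idx].next() {
            heap.push(Reverse((next_ts, idx, next_label)));
        }
    }
    merged
}
```

Feeding it the trades (1, 3, 5, 7) and quotes (4, 8, 9, 9) from the example in point 1 yields a single stream in ascending ts_init order, which can then be cut into batches at any position without violating the ordering invariant.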

This pipeline can be represented by this diagram.

shapes(2)

@twitu
Collaborator Author

twitu commented Mar 28, 2023

These are the structs for the query engine and query results implemented in #997. The structs also implement pyo3's pyclass and expose relevant python methods as well.

The query engine takes a file path and retrieves all its records, or it takes a file path and a custom sql query to run on it. The queries are not executed immediately. Once all the relevant queries have been registered, you can get the QueryResult.

pub struct PersistenceCatalog {
    session_ctx: SessionContext,
    batch_streams: Vec<Box<dyn Stream<Item = IntoIter<Data>> + Unpin>>,
}

impl PersistenceCatalog {
    // Query a file for all its records. The caller must specify `T` to indicate
    // what kind of data is expected from this query.
    pub async fn add_file<T>(&mut self, table_name: &str, file_path: &str) -> Result<()>
    where
        T: DecodeDataFromRecordBatch + Into<Data>;

    // Query a file for its records with a custom query.
    // The query should ensure the records are ordered by the
    // ts_init field in ascending order. The caller must specify `T` to indicate
    // what kind of data is expected from this query.
    pub async fn add_file_with_query<T>(
        &mut self,
        table_name: &str,
        file_path: &str,
        sql_query: &str,
    ) -> Result<()>
    where
        T: DecodeDataFromRecordBatch + Into<Data>;

    // Return the query result for all the queries registered so far
    pub fn get_query_result(&mut self) -> QueryResult;
}

The QueryResult is an iterator, and the queries are actually executed only when the next batch of data is pulled from it. Behind the scenes, QueryResult merges the results from all the queries into sorted batches of data using a heap-based k-way stream merge.

pub struct QueryResult {
    data: Box<dyn Stream<Item = Vec<Data>> + Unpin>,
}

impl Iterator for QueryResult {
    type Item = Vec<Data>;

    fn next(&mut self) -> Option<Vec<Data>>;
}
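
The "executed only when pulled" behaviour is the usual lazy-iterator pattern. As a rough sketch of the idea (not the QueryResult code itself, which wraps an async stream), a hypothetical `Batches` adapter can wrap any iterator and do no work until `next` is called:

```rust
/// Hypothetical lazy batcher: wraps any iterator and yields `Vec`s of at
/// most `batch_size` items. Nothing is pulled from `inner` until `next`
/// is called. A `batch_size` of 0 yields no batches at all.
struct Batches<I: Iterator> {
    inner: I,
    batch_size: usize,
}

impl<I: Iterator> Iterator for Batches<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        // Pull only as many items as one batch needs.
        let batch: Vec<I::Item> = self
            .inner
            .by_ref()
            .take(self.batch_size)
            .collect();
        if batch.is_empty() {
            None
        } else {
            Some(batch)
        }
    }
}
```

Because each call pulls a single batch from the underlying source, at most one batch of decoded data has to be held by the consumer at a time.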

@twitu
Collaborator Author

twitu commented Mar 28, 2023

Here's a test that demonstrates how this interface can be used. It can also be found in test_catalog.rs.

use nautilus_model::data::tick::{Data, QuoteTick};
use nautilus_persistence::session::{PersistenceCatalog, QueryResult};

#[tokio::test]
async fn test_quote_ticks() {
    let mut catalog = PersistenceCatalog::default();
    catalog
        .add_file::<QuoteTick>(
            "quote_tick",
            "../../tests/test_data/quote_tick_data.parquet",
        )
        .await
        .unwrap();
    catalog
        .add_file::<QuoteTick>(
            "quote_tick_2",
            "../../tests/test_data/quote_tick_data.parquet",
        )
        .await
        .unwrap();
    let query_result: QueryResult<Data> = catalog.to_query_result();

    // NOTE: is_sorted_by_key is unstable; otherwise we could use
    // ticks.is_sorted_by_key(|tick| tick.ts_init)
    // https://github.com/rust-lang/rust/issues/53485
    let is_ascending_by_init = |ticks: &Vec<Data>| {
        for i in 1..ticks.len() {
            // previous tick is more recent than current tick
            // this is not ascending order
            if ticks[i - 1].get_ts_init() > ticks[i].get_ts_init() {
                return false;
            }
        }
        true
    };

    let ticks: Vec<Data> = query_result.flatten().collect();
    match &ticks[0] {
        Data::Trade(_) => panic!("expected a quote tick"),
        Data::Quote(q) => assert_eq!("EUR/USD.SIM", q.instrument_id.to_string()),
    }
    assert_eq!(ticks.len(), 19000);
    assert!(is_ascending_by_init(&ticks));
}
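
As an aside, the hand-written ordering loop in the test can be expressed more compactly with `slice::windows`, which works on stable Rust. This is only a sketch; `is_ascending_by_key` is a hypothetical helper and not part of the test file.

```rust
/// Returns true if the keys extracted from consecutive items never decrease.
/// Slices with zero or one element are trivially ascending.
fn is_ascending_by_key<T, K: PartialOrd>(items: &[T], key: impl Fn(&T) -> K) -> bool {
    items
        .windows(2)
        .all(|pair| key(&pair[0]) <= key(&pair[1]))
}
```

In the test above, the call would look like `is_ascending_by_key(&ticks, |tick| tick.get_ts_init())`.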

@twitu twitu linked a pull request May 7, 2023 that will close this issue
1 task
@twitu
Collaborator Author

twitu commented May 21, 2023

Benchmarking results

| Test | Rust | Python |
| --- | --- | --- |
| single stream (10M) | 2.4s | 3.2s |
| multi stream (72M, 61 files) | 23s | NA |

The Python tests are expected to take slightly more time because of the overhead of initializing Python objects and acquiring the GIL.

The multi-stream Python bench requires 8+ GB of RAM and could not be run.

An appropriate measure for the catalog is its throughput which can be measured in terms of ticks per second.

  1. Rust catalog v1 - arrow2 (single stream, only rust structs, no stream merge logic) ~ 6M/s
  2. Rust catalog v2 - datafusion and kmerge ~ 3M/s
  3. Rust catalog v1 - arrow2 with Python merge logic (dropped because of the significant implementation effort) - expected to be less than 3M/s, based on previous file load times
  4. Catalog - Pure Python - 100K/s
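
For reference, throughput here is just the item count divided by wall-clock time. A small sketch, assuming the 10M and 72M figures in the benchmark results are tick counts (`ticks_per_second` is a hypothetical helper):

```rust
/// Throughput in ticks per second for a run that processed `ticks`
/// items in `seconds` of wall-clock time.
fn ticks_per_second(ticks: u64, seconds: f64) -> f64 {
    ticks as f64 / seconds
}
```

With the Rust timings above, the multi-stream run works out to 72M / 23s ≈ 3.1M ticks/s, in line with the ~3M/s quoted for catalog v2, and the single-stream run to 10M / 2.4s ≈ 4.2M ticks/s.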
