Vortex Layouts - Drivers #1914

Merged
merged 18 commits into from
Jan 13, 2025

@gatesn gatesn commented Jan 13, 2025

Introduces the idea of a driver as a pluggable way of orchestrating I/O- and CPU-based work.

An ExecDriver takes a Stream<Future<T>> and returns a Stream<T>. The abstraction means we can implement drivers that use the current runtime thread (Inline), spawn the work onto an explicit Runtime, or spawn the work onto a thread pool.
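As a rough illustration of the exec driver idea, here is a synchronous analogue built on the standard library only. It is not the code from this PR: boxed closures stand in for the futures, a `Vec` stands in for the stream, and the trait shape plus the names `Task`, `InlineDriver`, `ThreadDriver`, and `example_tasks` are assumptions made for the sketch.

```rust
use std::thread;

/// A unit of CPU work; stands in for the `Future<T>` items of the real stream.
type Task<T> = Box<dyn FnOnce() -> T + Send + 'static>;

/// Hypothetical trait shape: turn pending tasks into results, preserving input order.
trait ExecDriver {
    fn drive<T: Send + 'static>(&self, tasks: Vec<Task<T>>) -> Vec<T>;
}

/// Runs every task on the calling thread (the "inline" flavour).
struct InlineDriver;

impl ExecDriver for InlineDriver {
    fn drive<T: Send + 'static>(&self, tasks: Vec<Task<T>>) -> Vec<T> {
        tasks.into_iter().map(|task| task()).collect()
    }
}

/// Spawns one OS thread per task, then joins the threads in input order.
/// A real pool would bound the number of threads; this sketch does not.
struct ThreadDriver;

impl ExecDriver for ThreadDriver {
    fn drive<T: Send + 'static>(&self, tasks: Vec<Task<T>>) -> Vec<T> {
        let handles: Vec<thread::JoinHandle<T>> =
            tasks.into_iter().map(|task| thread::spawn(task)).collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("task panicked"))
            .collect()
    }
}

/// Builds `n` example tasks, each squaring its own index.
fn example_tasks(n: u64) -> Vec<Task<u64>> {
    (0..n).map(|i| Box::new(move || i * i) as Task<u64>).collect()
}
```

Both drivers accept the same input and yield the same ordered output, so the caller can swap one for the other without changing anything else.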

The IoDriver takes a stream of segment requests and has to resolve them. For now, we have a FileIoDriver that assumes the bytes are laid out on disk (and therefore coalescing is desirable), so it pulls requests off the queue, coalesces them, then launches concurrent I/O requests to resolve them.
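The coalescing step can be sketched roughly as follows. This is a minimal illustration over plain byte ranges, not the `coalesce` function from this PR (which works on `FileSegmentRequest` values); the function name `coalesce_ranges` and the `max_gap` threshold parameter are assumptions.

```rust
use std::ops::Range;

/// Merges byte ranges whose gap is at most `max_gap` bytes, so that nearby
/// segments are fetched with a single read. Hypothetical helper for illustration.
fn coalesce_ranges(mut requests: Vec<Range<u64>>, max_gap: u64) -> Vec<Range<u64>> {
    // Sort by start offset so that only the most recent merged range can be adjacent.
    requests.sort_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(requests.len());
    for req in requests {
        if let Some(prev) = merged.last_mut() {
            // Overlapping or close enough to the previous read: extend it.
            if req.start <= prev.end.saturating_add(max_gap) {
                prev.end = prev.end.max(req.end);
                continue;
            }
        }
        // Too far away: start a new read.
        merged.push(req);
    }
    merged
}
```

A larger `max_gap` trades extra bytes read for fewer I/O operations, which tends to pay off when each request carries a fixed cost.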

The VortexFile remains generic over the I/O driver, meaning the send-ness of the resulting ArrayStream is inferred based on the send-ness of the I/O driver (in the case of FileIoDriver, that is the send-ness of the VortexReadAt).
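To make the inference concrete, here is a small sketch of how Rust derives `Send` for a generic type from its type parameter. The types `File` and `Scan` and the helpers `require_send`, `send_scan`, and `local_scan` are hypothetical stand-ins, not the actual VortexFile or ArrayStream types.

```rust
use std::rc::Rc;
use std::sync::Arc;

/// Hypothetical stand-in for a file that is generic over its I/O driver.
struct File<I> {
    io: I,
}

/// Hypothetical stand-in for the stream returned by a scan; it owns a clone of the driver.
struct Scan<I> {
    io: I,
}

impl<I: Clone> File<I> {
    fn scan(&self) -> Scan<I> {
        Scan { io: self.io.clone() }
    }
}

/// Compiles only when `T: Send`, so it acts as a compile-time check.
fn require_send<T: Send>(value: T) -> T {
    value
}

/// `Arc<Vec<u8>>` is `Send`, so `Scan<Arc<Vec<u8>>>` is automatically `Send` too.
fn send_scan() -> Scan<Arc<Vec<u8>>> {
    let file = File { io: Arc::new(vec![1u8, 2, 3]) };
    require_send(file.scan())
}

/// `Rc` is not `Send`, so this scan can only be used on the thread that created it.
fn local_scan() -> Scan<Rc<Vec<u8>>> {
    let file = File { io: Rc::new(vec![1u8, 2, 3]) };
    // Wrapping this in `require_send(...)` would be rejected by the compiler.
    file.scan()
}
```

No `Send` bound appears on `File` or `Scan` themselves; the compiler decides per instantiation, which is what lets a single-threaded driver and a multi-threaded driver share the same API.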

Part of #1676

@@ -27,13 +27,19 @@ impl Deref for SegmentId {
pub trait AsyncSegmentReader: Send + Sync {
/// Attempt to get the data associated with a given segment ID.
///
/// If the segment ID is not found, `None` is returned.
/// TODO(ngates): should this accept multiple segment IDs?
Member

If the implementation is remote and each invocation can have non-trivial constant overhead, you want to take multiple IDs, but in most cases the answer to your question is "no".

@gatesn gatesn merged commit d658e5f into develop Jan 13, 2025
20 of 21 checks passed
@gatesn gatesn deleted the ngates/segment-stream branch January 13, 2025 15:21
Comment on lines +287 to +289
// FIXME(ngates): how should we write into the segment cache? Feels like it should be
// non-blocking and on some other thread?
// segments.put(segment_id, buffer)?;
Member

Why have this function if you don't put anything?

}

/// TODO(ngates): outsource coalescing to a trait
fn coalesce(requests: Vec<FileSegmentRequest>) -> Vec<CoalescedSegmentRequest> {
Member

Why not use the existing coalescing code? You can change the threshold at which it chooses to coalesce.

pub enum ExecutionMode {
/// Executes the tasks inline as part of polling the returned
/// [`vortex_array::stream::ArrayStream`]. In other words, uses the same runtime.
Inline,
Member

CurrentThread?

}
.boxed()
}
Err(e) => futures::future::ready(Err(e)).boxed(),
Member

import?

let reader: Arc<dyn LayoutReader> = self
.layout
.reader(self.segments.reader(), self.ctx.clone())?;
pub fn scan(&self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + 'static + use<'_, I>> {
Member

I'm confused by the lifetime signature here. How does use<...> change anything if the lifetime has to be 'static?

Comment on lines +85 to +86
// Coalesce the segment requests to minimize the number of I/O operations.
.map(coalesce)
Member

Is this just a type wrapper?
