feat(sdk): row-based access to results in sdk #2903
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ pub use datafusion::error::DataFusionError; | |
use datafusion::logical_expr::LogicalPlan; | ||
use datafusion::physical_plan::stream::RecordBatchStreamAdapter; | ||
pub use datafusion::physical_plan::SendableRecordBatchStream; | ||
pub use datafusion::scalar::ScalarValue; | ||
use derive_builder::Builder; | ||
use futures::lock::Mutex; | ||
use futures::stream::{Stream, StreamExt}; | ||
|
@@ -312,6 +313,76 @@ impl From<Result<SendableRecordBatchStream, ExecError>> for RecordStream { | |
} | ||
} | ||
|
||
/// RowMap represents a single record in an ordered map. | ||
type RowMap = indexmap::IndexMap<String, ScalarValue>; | ||
|
||
/// RowMapBatch is equivalent to a row-based view of a record batch. | ||
pub struct RowMapBatch(Vec<RowMap>); | ||
|
||
impl TryFrom<RecordBatch> for RowMapBatch { | ||
type Error = DataFusionError; | ||
|
||
fn try_from(batch: RecordBatch) -> Result<Self, Self::Error> { | ||
let schema = batch.schema(); | ||
let mut out = Vec::with_capacity(batch.num_rows()); | ||
|
||
for row in 0..batch.num_rows() { | ||
let mut record = RowMap::with_capacity(batch.num_columns()); | ||
for (idx, field) in schema.fields.into_iter().enumerate() { | ||
record.insert( | ||
field.name().to_owned(), | ||
ScalarValue::try_from_array(batch.column(idx), row)?, | ||
Considering this operation is already very expensive, I'd prefer to see some fast paths instead of using
This seems fine to me. The bounds check is pretty much a constant assert, and the downcast needs to happen somewhere, and here is as good as any. We use |
||
); | ||
} | ||
out.push(record); | ||
} | ||
Ok(RowMapBatch(out)) | ||
} | ||
} | ||
|
||
impl TryFrom<Result<RecordBatch, DataFusionError>> for RowMapBatch { | ||
type Error = DataFusionError; | ||
|
||
fn try_from(value: Result<RecordBatch, DataFusionError>) -> Result<Self, Self::Error> { | ||
Ok(RowMapBatch::try_from(value?)?) | ||
} | ||
} | ||
|
||
impl Extend<RowMap> for RowMapBatch { | ||
fn extend<T: IntoIterator<Item = RowMap>>(&mut self, iter: T) { | ||
for elem in iter { | ||
self.0.push(elem) | ||
} | ||
} | ||
} | ||
|
||
impl Default for RowMapBatch { | ||
fn default() -> Self { | ||
RowMapBatch(Vec::new()) | ||
} | ||
} | ||
|
||
impl RowMapBatch { | ||
pub fn iter(&self) -> impl Iterator<Item = RowMap> { | ||
self.0.clone().into_iter() | ||
} | ||
Comment on lines +366 to +368
should implement |
||
|
||
/// Returns the row at the specific index, or None if the index is | ||
/// out of bounds. | ||
pub fn row(&self, idx: usize) -> Option<RowMap> { | ||
self.0.get(idx).cloned() | ||
} | ||
|
||
/// The number of rows in the RowMapBatch. | ||
pub fn len(&self) -> usize { | ||
self.0.len() | ||
} | ||
|
||
pub fn is_empty(&self) -> bool { | ||
self.0.is_empty() | ||
} | ||
} | ||
|
||
impl RecordStream { | ||
// Collects all of the record batches in a stream, aborting if | ||
// there are any errors. | ||
|
@@ -320,6 +391,13 @@ impl RecordStream { | |
stream.try_collect().await | ||
} | ||
|
||
/// Collects all of the record batches and rotates the results for | ||
/// a map-based row-oriented format. | ||
pub async fn to_rows(&mut self) -> Result<Vec<RowMapBatch>, DataFusionError> { | ||
let stream = &mut self.0; | ||
stream.map(RowMapBatch::try_from).try_collect().await | ||
} | ||
|
||
// Iterates through the stream, ensuring propagating any errors, | ||
// but discarding all of the data. | ||
pub async fn check(&mut self) -> Result<(), DataFusionError> { | ||
|
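The column-to-row rotation performed by `TryFrom<RecordBatch>` in the diff above can be illustrated without DataFusion. What follows is a minimal stdlib-only sketch, not the PR's code: `Value`, `Column`, `Row`, and `rotate` are hypothetical names standing in for `ScalarValue`, an Arrow column, `RowMap`, and the conversion, and a `Vec` of pairs stands in for `indexmap::IndexMap` to keep column order.

```rust
/// Hypothetical stand-in for `ScalarValue`; two variants for brevity.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Text(String),
}

/// Hypothetical stand-in for a named column of a record batch.
struct Column {
    name: String,
    values: Vec<Value>,
}

/// An ordered row: (column name, value) pairs in schema order.
type Row = Vec<(String, Value)>;

/// Rotates column-oriented data into one ordered row per index.
/// Returns an error if the columns do not all have the same length.
fn rotate(columns: &[Column]) -> Result<Vec<Row>, String> {
    let num_rows = columns.first().map_or(0, |c| c.values.len());
    if let Some(bad) = columns.iter().find(|c| c.values.len() != num_rows) {
        return Err(format!(
            "column {} has {} values, expected {}",
            bad.name,
            bad.values.len(),
            num_rows
        ));
    }

    let mut out = Vec::with_capacity(num_rows);
    for row in 0..num_rows {
        let mut record = Row::with_capacity(columns.len());
        for col in columns {
            // Every cell is cloned, and so is the column name, once per row.
            record.push((col.name.clone(), col.values[row].clone()));
        }
        out.push(record);
    }
    Ok(out)
}
```

Calling `rotate` on two columns of two values each yields two rows of two pairs each. The sketch makes the cost discussed in the review visible: one key clone and one value clone per cell.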
I'd add a quick comment that this should be used sparingly or for tests.
Also, we could use an `Arc<str>` instead of `String` for the key if we want to keep the size down.

arc str!
My other theory was to just use the bson library, but the bson value enum has fewer types than the scalar value type, so avoiding more downcasting than necessary seems good.
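
The `Arc<str>` key suggestion above could look roughly like the following. This is a stdlib-only sketch under stated assumptions, not the change that was merged: `Value`, `SharedRow`, and `build_rows` are hypothetical names, and a `Vec` of pairs again stands in for `indexmap::IndexMap`.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `ScalarValue`.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
}

/// A row keyed by shared column names rather than owned `String`s.
type SharedRow = Vec<(Arc<str>, Value)>;

/// Builds rows whose keys share one allocation per column name.
/// If a data row is shorter or longer than `names`, `zip` stops at
/// the shorter of the two.
fn build_rows(names: &[&str], data: &[Vec<i64>]) -> Vec<SharedRow> {
    // Allocate each column name once, up front.
    let keys: Vec<Arc<str>> = names.iter().map(|n| Arc::from(*n)).collect();

    data.iter()
        .map(|row| {
            keys.iter()
                .zip(row)
                // Arc::clone bumps a reference count; the string bytes are not copied.
                .map(|(key, v)| (Arc::clone(key), Value::Int(*v)))
                .collect()
        })
        .collect()
}
```

With `String` keys, every row holds its own copy of every column name. Here every row holds a pointer to the same name, so the per-row key cost is a reference-count increment.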