Skip to content

Commit c5fb605

Browse files
authored
Move generate_series projection logic into LazyMemoryStream (#18373)
## Which issue does this PR close? - None, This is a follow-up for #18298 ## Rationale for this change This moves the projection logic from `generate_series` out of the generator into `LazyMemoryStream` as discussed in #18298 (comment) This makes the projection logic generic for all generators. ## What changes are included in this PR? The projection logic is moved from `generate_series` into the `LazyMemoryStream` and relevant tests, where `LazyMemoryStream` is used, are adapted accordingly. ## Are these changes tested? This is only a small refactoring; the changes are covered by the tests from #18298 ## Are there any user-facing changes? There is a new parameter added to LazyMemoryExec::try_new method
1 parent db5f47c commit c5fb605

File tree

3 files changed

+36
-20
lines changed

3 files changed

+36
-20
lines changed

datafusion/functions-table/src/generate_series.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,6 @@ impl GenerateSeriesTable {
237237
pub fn as_generator(
238238
&self,
239239
batch_size: usize,
240-
projection: Option<Vec<usize>>,
241240
) -> Result<Arc<RwLock<dyn LazyBatchGenerator>>> {
242241
let generator: Arc<RwLock<dyn LazyBatchGenerator>> = match &self.args {
243242
GenSeriesArgs::ContainsNull { name } => Arc::new(RwLock::new(Empty { name })),
@@ -256,7 +255,6 @@ impl GenerateSeriesTable {
256255
batch_size,
257256
include_end: *include_end,
258257
name,
259-
projection,
260258
})),
261259
GenSeriesArgs::TimestampArgs {
262260
start,
@@ -297,7 +295,6 @@ impl GenerateSeriesTable {
297295
batch_size,
298296
include_end: *include_end,
299297
name,
300-
projection,
301298
}))
302299
}
303300
GenSeriesArgs::DateArgs {
@@ -327,7 +324,6 @@ impl GenerateSeriesTable {
327324
batch_size,
328325
include_end: *include_end,
329326
name,
330-
projection,
331327
})),
332328
};
333329

@@ -345,7 +341,6 @@ pub struct GenericSeriesState<T: SeriesValue> {
345341
current: T,
346342
include_end: bool,
347343
name: &'static str,
348-
projection: Option<Vec<usize>>,
349344
}
350345

351346
impl<T: SeriesValue> GenericSeriesState<T> {
@@ -401,11 +396,7 @@ impl<T: SeriesValue> LazyBatchGenerator for GenericSeriesState<T> {
401396

402397
let array = self.current.create_array(buf)?;
403398
let batch = RecordBatch::try_new(Arc::clone(&self.schema), vec![array])?;
404-
let projected = match self.projection.as_ref() {
405-
Some(projection) => batch.project(projection)?,
406-
None => batch,
407-
};
408-
Ok(Some(projected))
399+
Ok(Some(batch))
409400
}
410401
}
411402

@@ -481,14 +472,12 @@ impl TableProvider for GenerateSeriesTable {
481472
_limit: Option<usize>,
482473
) -> Result<Arc<dyn ExecutionPlan>> {
483474
let batch_size = state.config_options().execution.batch_size;
484-
let schema = match projection {
485-
Some(projection) => Arc::new(self.schema.project(projection)?),
486-
None => self.schema(),
487-
};
488-
489-
let generator = self.as_generator(batch_size, projection.cloned())?;
475+
let generator = self.as_generator(batch_size)?;
490476

491-
Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
477+
Ok(Arc::new(
478+
LazyMemoryExec::try_new(self.schema(), vec![generator])?
479+
.with_projection(projection.cloned()),
480+
))
492481
}
493482
}
494483

datafusion/physical-plan/src/memory.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
153153
pub struct LazyMemoryExec {
154154
/// Schema representing the data
155155
schema: SchemaRef,
156+
/// Optional projection for which columns to load
157+
projection: Option<Vec<usize>>,
156158
/// Functions to generate batches for each partition
157159
batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
158160
/// Plan properties cache storing equivalence properties, partitioning, and execution mode
@@ -199,12 +201,28 @@ impl LazyMemoryExec {
199201

200202
Ok(Self {
201203
schema,
204+
projection: None,
202205
batch_generators: generators,
203206
cache,
204207
metrics: ExecutionPlanMetricsSet::new(),
205208
})
206209
}
207210

211+
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
212+
match projection.as_ref() {
213+
Some(columns) => {
214+
let projected = Arc::new(self.schema.project(columns).unwrap());
215+
self.cache = self.cache.with_eq_properties(EquivalenceProperties::new(
216+
Arc::clone(&projected),
217+
));
218+
self.schema = projected;
219+
self.projection = projection;
220+
self
221+
}
222+
_ => self,
223+
}
224+
}
225+
208226
pub fn try_set_partitioning(&mut self, partitioning: Partitioning) -> Result<()> {
209227
if partitioning.partition_count() != self.batch_generators.len() {
210228
internal_err!(
@@ -320,6 +338,7 @@ impl ExecutionPlan for LazyMemoryExec {
320338

321339
let stream = LazyMemoryStream {
322340
schema: Arc::clone(&self.schema),
341+
projection: self.projection.clone(),
323342
generator: Arc::clone(&self.batch_generators[partition]),
324343
baseline_metrics,
325344
};
@@ -338,6 +357,8 @@ impl ExecutionPlan for LazyMemoryExec {
338357
/// Stream that generates record batches on demand
339358
pub struct LazyMemoryStream {
340359
schema: SchemaRef,
360+
/// Optional projection for which columns to load
361+
projection: Option<Vec<usize>>,
341362
/// Generator to produce batches
342363
///
343364
/// Note: Idiomatically, DataFusion uses plan-time parallelism - each stream
@@ -361,7 +382,14 @@ impl Stream for LazyMemoryStream {
361382
let batch = self.generator.write().generate_next_batch();
362383

363384
let poll = match batch {
364-
Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
385+
Ok(Some(batch)) => {
386+
// return just the columns requested
387+
let batch = match self.projection.as_ref() {
388+
Some(columns) => batch.project(columns)?,
389+
None => batch,
390+
};
391+
Poll::Ready(Some(Ok(batch)))
392+
}
365393
Ok(None) => Poll::Ready(None),
366394
Err(e) => Poll::Ready(Some(Err(e))),
367395
};

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1940,8 +1940,7 @@ impl protobuf::PhysicalPlanNode {
19401940
};
19411941

19421942
let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
1943-
let generator =
1944-
table.as_generator(generate_series.target_batch_size as usize, None)?;
1943+
let generator = table.as_generator(generate_series.target_batch_size as usize)?;
19451944

19461945
Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
19471946
}

0 commit comments

Comments
 (0)