@@ -43,6 +43,8 @@ pub struct CoalescePartitionsExec {
4343 /// Execution metrics
4444 metrics : ExecutionPlanMetricsSet ,
4545 cache : PlanProperties ,
46+ /// Optional number of rows to fetch. Stops producing rows after this fetch
47+ pub ( crate ) fetch : Option < usize > ,
4648}
4749
4850impl CoalescePartitionsExec {
@@ -53,6 +55,7 @@ impl CoalescePartitionsExec {
5355 input,
5456 metrics : ExecutionPlanMetricsSet :: new ( ) ,
5557 cache,
58+ fetch : None ,
5659 }
5760 }
5861
@@ -82,9 +85,12 @@ impl DisplayAs for CoalescePartitionsExec {
8285 f : & mut std:: fmt:: Formatter ,
8386 ) -> std:: fmt:: Result {
8487 match t {
85- DisplayFormatType :: Default | DisplayFormatType :: Verbose => {
86- write ! ( f, "CoalescePartitionsExec" )
87- }
88+ DisplayFormatType :: Default | DisplayFormatType :: Verbose => match self . fetch {
89+ Some ( fetch) => {
90+ write ! ( f, "CoalescePartitionsExec: fetch={fetch}" )
91+ }
92+ None => write ! ( f, "CoalescePartitionsExec" ) ,
93+ } ,
8894 }
8995 }
9096}
@@ -115,9 +121,9 @@ impl ExecutionPlan for CoalescePartitionsExec {
115121 self : Arc < Self > ,
116122 children : Vec < Arc < dyn ExecutionPlan > > ,
117123 ) -> Result < Arc < dyn ExecutionPlan > > {
118- Ok ( Arc :: new ( CoalescePartitionsExec :: new ( Arc :: clone (
119- & children [ 0 ] ,
120- ) ) ) )
124+ let mut plan = CoalescePartitionsExec :: new ( Arc :: clone ( & children [ 0 ] ) ) ;
125+ plan . fetch = self . fetch ;
126+ Ok ( Arc :: new ( plan ) )
121127 }
122128
123129 fn execute (
@@ -163,7 +169,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
163169 }
164170
165171 let stream = builder. build ( ) ;
166- Ok ( Box :: pin ( ObservedStream :: new ( stream, baseline_metrics) ) )
172+ Ok ( Box :: pin ( ObservedStream :: new (
173+ stream,
174+ baseline_metrics,
175+ self . fetch ,
176+ ) ) )
167177 }
168178 }
169179 }
@@ -173,7 +183,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
173183 }
174184
175185 fn statistics ( & self ) -> Result < Statistics > {
176- self . input . statistics ( )
186+ Statistics :: with_fetch ( self . input . statistics ( ) ? , self . schema ( ) , self . fetch , 0 , 1 )
177187 }
178188
179189 fn supports_limit_pushdown ( & self ) -> bool {
@@ -183,6 +193,19 @@ impl ExecutionPlan for CoalescePartitionsExec {
183193 fn cardinality_effect ( & self ) -> CardinalityEffect {
184194 CardinalityEffect :: Equal
185195 }
196+
197+ fn fetch ( & self ) -> Option < usize > {
198+ self . fetch
199+ }
200+
201+ fn with_fetch ( & self , limit : Option < usize > ) -> Option < Arc < dyn ExecutionPlan > > {
202+ Some ( Arc :: new ( CoalescePartitionsExec {
203+ input : Arc :: clone ( & self . input ) ,
204+ fetch : limit,
205+ metrics : self . metrics . clone ( ) ,
206+ cache : self . cache . clone ( ) ,
207+ } ) )
208+ }
186209}
187210
188211#[ cfg( test) ]
0 commit comments