@@ -330,6 +330,67 @@ def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
330330 return None
331331 return self ._op_resource_allocator .get_budget (op )
332332
def is_op_eligible(self, op: PhysicalOperator) -> bool:
    """Whether the op is eligible for memory reservation.

    An op is eligible only when throttling is not disabled for it and it
    has not finished execution.
    """
    if op.throttling_disabled():
        return False
    # As long as the op has finished execution, even if there are still
    # non-taken outputs, we don't need to allocate resources for it.
    return not op.execution_finished()
341+
def get_eligible_ops(self) -> List[PhysicalOperator]:
    """Return the ops of the topology for which `is_op_eligible` holds,
    in the topology's iteration order.
    """
    return list(filter(self.is_op_eligible, self._topology))
344+
def get_downstream_ineligible_ops(
    self, op: PhysicalOperator
) -> Iterable[PhysicalOperator]:
    """Yield the ineligible operators downstream of the given operator.

    The walk follows `output_dependencies` depth-first and stops descending
    a branch as soon as it reaches an eligible operator.

    E.g.,
      - "cur_map->downstream_map" yields nothing.
      - "cur_map->limit1->limit2->downstream_map" yields limit1, limit2.
    """
    # Explicit stack; children are pushed in reverse so that they are popped
    # in their original order (depth-first, pre-order).
    pending = list(op.output_dependencies)[::-1]
    while pending:
        candidate = pending.pop()
        if self.is_op_eligible(candidate):
            # An eligible op terminates this branch.
            continue
        yield candidate
        pending.extend(list(candidate.output_dependencies)[::-1])
358+
def get_downstream_eligible_ops(
    self, op: PhysicalOperator
) -> Iterable[PhysicalOperator]:
    """Yield the nearest eligible operators downstream of the given operator,
    looking through any intermediate ineligible operators.

    E.g.,
      - "cur_map->downstream_map" yields downstream_map.
      - "cur_map->limit1->limit2->downstream_map" yields downstream_map.
    """
    # Explicit stack; children are pushed in reverse so that they are popped
    # in their original order (depth-first).
    pending = list(op.output_dependencies)[::-1]
    while pending:
        candidate = pending.pop()
        if self.is_op_eligible(candidate):
            # Eligible: report it and do not look past it.
            yield candidate
        else:
            # Ineligible: skip it and keep searching through its outputs.
            pending.extend(list(candidate.output_dependencies)[::-1])
374+
def get_op_outputs_object_store_usage_with_downstream(
    self, op: PhysicalOperator
) -> int:
    """Return the outputs memory usage of the given operator plus the object
    store memory usage of its downstream ineligible operators.
    """
    # Usage attributed to the operator's own outputs.
    own_outputs = self._mem_op_outputs[op]
    # Usage of the downstream ineligible operators, which is accounted to
    # this operator as well.
    downstream_total = 0
    for downstream_op in self.get_downstream_ineligible_ops(op):
        downstream_total += self.get_op_usage(downstream_op).object_store_memory
    return own_outputs + downstream_total
389+
def get_op_internal_object_store_usage(self, op: PhysicalOperator) -> int:
    """Return the internal object store memory usage recorded for the given
    operator.
    """
    internal_usage = self._mem_op_internal[op]
    return internal_usage
393+
333394
334395class OpResourceAllocator (ABC ):
335396 """An interface for dynamic operator resource allocation.
@@ -479,20 +540,6 @@ def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):
479540
480541 self ._idle_detector = self .IdleDetector ()
481542
482- def _is_op_eligible (self , op : PhysicalOperator ) -> bool :
483- """Whether the op is eligible for memory reservation."""
484- return (
485- not op .throttling_disabled ()
486- # As long as the op has finished execution, even if there are still
487- # non-taken outputs, we don't need to allocate resources for it.
488- and not op .execution_finished ()
489- )
490-
491- def _get_eligible_ops (self ) -> List [PhysicalOperator ]:
492- return [
493- op for op in self ._resource_manager ._topology if self ._is_op_eligible (op )
494- ]
495-
496543 def _get_ineligible_ops_with_usage (self ) -> List [PhysicalOperator ]:
497544 """
498545 Resource reservation is based on the number of eligible operators.
@@ -519,14 +566,14 @@ def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]:
519566 # filter out downstream ineligible operators since they are omitted from reservation calculations.
520567 for op in last_completed_ops :
521568 ops_to_exclude_from_reservation .extend (
522- list (self ._get_downstream_ineligible_ops (op ))
569+ list (self ._resource_manager . get_downstream_ineligible_ops (op ))
523570 )
524571 ops_to_exclude_from_reservation .append (op )
525572 return list (set (ops_to_exclude_from_reservation ))
526573
527574 def _update_reservation (self ):
528575 global_limits = self ._resource_manager .get_global_limits ().copy ()
529- eligible_ops = self ._get_eligible_ops ()
576+ eligible_ops = self ._resource_manager . get_eligible_ops ()
530577
531578 self ._op_reserved .clear ()
532579 self ._reserved_for_op_outputs .clear ()
@@ -610,7 +657,7 @@ def _should_unblock_streaming_output_backpressure(
610657 # launch tasks. Then we should temporarily unblock the streaming output
611658 # backpressure by allowing reading at least 1 block. So the current operator
612659 # can finish at least one task and yield resources to the downstream operators.
613- for next_op in self ._get_downstream_eligible_ops (op ):
660+ for next_op in self ._resource_manager . get_downstream_eligible_ops (op ):
614661 if not self ._reserved_min_resources [next_op ]:
615662 # Case 1: the downstream operator hasn't reserved the minimum resources
616663 # to run at least one task.
@@ -623,25 +670,14 @@ def _should_unblock_streaming_output_backpressure(
623670 return True
624671 return False
625672
626- def _get_op_outputs_usage_with_downstream (self , op : PhysicalOperator ) -> float :
627- """Get the outputs memory usage of the given operator, including the downstream
628- ineligible operators.
629- """
630- # Outputs usage of the current operator.
631- op_outputs_usage = self ._resource_manager ._mem_op_outputs [op ]
632- # Also account the downstream ineligible operators' memory usage.
633- op_outputs_usage += sum (
634- self ._resource_manager .get_op_usage (next_op ).object_store_memory
635- for next_op in self ._get_downstream_ineligible_ops (op )
636- )
637- return op_outputs_usage
638-
639673 def max_task_output_bytes_to_read (self , op : PhysicalOperator ) -> Optional [int ]:
640674 if op not in self ._op_budgets :
641675 return None
642676 res = self ._op_budgets [op ].object_store_memory
643677 # Add the remaining of `_reserved_for_op_outputs`.
644- op_outputs_usage = self ._get_op_outputs_usage_with_downstream (op )
678+ op_outputs_usage = (
679+ self ._resource_manager .get_op_outputs_object_store_usage_with_downstream (op )
680+ )
645681 res += max (self ._reserved_for_op_outputs [op ] - op_outputs_usage , 0 )
646682 if math .isinf (res ):
647683 self ._output_budgets [op ] = res
@@ -654,41 +690,11 @@ def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
654690 self ._output_budgets [op ] = res
655691 return res
656692
657- def _get_downstream_ineligible_ops (
658- self , op : PhysicalOperator
659- ) -> Iterable [PhysicalOperator ]:
660- """Get the downstream ineligible operators of the given operator.
661-
662- E.g.,
663- - "cur_map->downstream_map" will return an empty list.
664- - "cur_map->limit1->limit2->downstream_map" will return [limit1, limit2].
665- """
666- for next_op in op .output_dependencies :
667- if not self ._is_op_eligible (next_op ):
668- yield next_op
669- yield from self ._get_downstream_ineligible_ops (next_op )
670-
671- def _get_downstream_eligible_ops (
672- self , op : PhysicalOperator
673- ) -> Iterable [PhysicalOperator ]:
674- """Get the downstream eligible operators of the given operator, ignoring
675- intermediate ineligible operators.
676-
677- E.g.,
678- - "cur_map->downstream_map" will return [downstream_map].
679- - "cur_map->limit1->limit2->downstream_map" will return [downstream_map].
680- """
681- for next_op in op .output_dependencies :
682- if self ._is_op_eligible (next_op ):
683- yield next_op
684- else :
685- yield from self ._get_downstream_eligible_ops (next_op )
686-
687693 def update_usages (self ):
688694 self ._update_reservation ()
689695
690696 self ._op_budgets .clear ()
691- eligible_ops = self ._get_eligible_ops ()
697+ eligible_ops = self ._resource_manager . get_eligible_ops ()
692698 if len (eligible_ops ) == 0 :
693699 return
694700
@@ -699,10 +705,14 @@ def update_usages(self):
699705 op_mem_usage = 0
700706 # Add the memory usage of the operator itself,
701707 # excluding `_reserved_for_op_outputs`.
702- op_mem_usage += self ._resource_manager ._mem_op_internal [op ]
708+ op_mem_usage += self ._resource_manager .get_op_internal_object_store_usage (
709+ op
710+ )
703711 # Add the portion of op outputs usage that has
704712 # exceeded `_reserved_for_op_outputs`.
705- op_outputs_usage = self ._get_op_outputs_usage_with_downstream (op )
713+ op_outputs_usage = self ._resource_manager .get_op_outputs_object_store_usage_with_downstream (
714+ op
715+ )
706716 op_mem_usage += max (op_outputs_usage - self ._reserved_for_op_outputs [op ], 0 )
707717 op_usage = self ._resource_manager .get_op_usage (op ).copy (
708718 object_store_memory = op_mem_usage
0 commit comments