diff --git a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py index c7e10da61c73..d0ea76e259e9 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py @@ -42,16 +42,17 @@ class ConcurrencyCapBackpressurePolicy(BackpressurePolicy): # Smoothing factor for the asymmetric EWMA (slow fall, faster rise). EWMA_ALPHA = env_float("RAY_DATA_CONCURRENCY_CAP_EWMA_ALPHA", 0.2) + EWMA_ALPHA_UP = 1.0 - (1.0 - EWMA_ALPHA) ** 2 # fast rise # Deadband width in units of the EWMA absolute deviation estimate. K_DEV = env_float("RAY_DATA_CONCURRENCY_CAP_K_DEV", 2.0) # Factor to back off when the queue is too large. BACKOFF_FACTOR = env_float("RAY_DATA_CONCURRENCY_CAP_BACKOFF_FACTOR", 1) # Factor to ramp up when the queue is too small. RAMPUP_FACTOR = env_float("RAY_DATA_CONCURRENCY_CAP_RAMPUP_FACTOR", 1) - # Threshold for per-Op object store budget (available) vs total usage (used) - # (available / used) ratio to enable dynamic output queue size backpressure. - OBJECT_STORE_USAGE_RATIO = env_float( - "RAY_DATA_CONCURRENCY_CAP_OBJECT_STORE_USAGE_RATIO", 0.1 + # Threshold for per-Op object store budget (available) vs total + # (available / total) ratio to enable dynamic output queue size backpressure. + OBJECT_STORE_BUDGET_RATIO = env_float( + "RAY_DATA_CONCURRENCY_CAP_OBJECT_STORE_BUDGET_RATIO", 0.1 ) def __init__(self, *args, **kwargs): @@ -92,7 +93,7 @@ def __init__(self, *args, **kwargs): dynamic_output_queue_size_backpressure_configs = ( f", EWMA_ALPHA={self.EWMA_ALPHA}, K_DEV={self.K_DEV}, " f"BACKOFF_FACTOR={self.BACKOFF_FACTOR}, RAMPUP_FACTOR={self.RAMPUP_FACTOR}, " - f"OBJECT_STORE_USAGE_RATIO={self.OBJECT_STORE_USAGE_RATIO}" + f"OBJECT_STORE_BUDGET_RATIO={self.OBJECT_STORE_BUDGET_RATIO}" ) logger.debug( f"ConcurrencyCapBackpressurePolicy caps: {self._concurrency_caps}, " @@ -112,8 +113,8 @@ def _update_ewma_asymmetric(self, prev_value: float, sample: float) -> float: if prev_value <= 0: return sample - alpha_up = 1.0 - (1.0 - self.EWMA_ALPHA) ** 2 # fast rise - alpha = alpha_up if sample > prev_value else self.EWMA_ALPHA # slow fall + # fast rise if sample > prev_value, slow fall otherwise + alpha = self.EWMA_ALPHA_UP if sample > prev_value else self.EWMA_ALPHA return (1 - alpha) * prev_value + alpha * sample def _update_level_and_dev(self, op: "PhysicalOperator", q_bytes: int) -> None: @@ -147,21 +148,17 @@ def can_add_input(self, op: "PhysicalOperator") -> bool: ): return num_tasks_running < self._concurrency_caps[op] - # For this Op, if the objectstore budget (available) to total usage (used) + # For this Op, if the objectstore budget (available) to total # ratio is below threshold (10%), skip dynamic output queue size backpressure. op_usage = self._resource_manager.get_op_usage(op) op_budget = self._resource_manager.get_budget(op) - if ( - op_usage is not None - and op_budget is not None - and op_budget.object_store_memory > 0 - and op_usage.object_store_memory > 0 - ): - if ( - op_budget.object_store_memory / op_usage.object_store_memory - > self.OBJECT_STORE_USAGE_RATIO + if op_usage is not None and op_budget is not None: + total_mem = op_usage.object_store_memory + op_budget.object_store_memory + if total_mem == 0 or ( + op_budget.object_store_memory / total_mem + > self.OBJECT_STORE_BUDGET_RATIO ): - # If the objectstore budget (available) to total usage (used) + # If the objectstore budget (available) to total # ratio is above threshold (10%), skip dynamic output queue size # backpressure, but still enforce the configured cap. return num_tasks_running < self._concurrency_caps[op] @@ -175,7 +172,7 @@ def can_add_input(self, op: "PhysicalOperator") -> bool: ) # Update EWMA state (level & dev) and compute effective cap. Note that - # we don't update the EWMA state if the objectstore budget (available) vs total usage (used) + # we don't update the EWMA state if the objectstore budget (available) vs total # ratio is above threshold (10%), because the level and dev adjusts quickly. self._update_level_and_dev(op, current_queue_size_bytes) effective_cap = self._effective_cap( diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py index e1372158ad51..f71bdde27ecd 100644 --- a/python/ray/data/tests/test_backpressure_policies.py +++ b/python/ray/data/tests/test_backpressure_policies.py @@ -193,12 +193,16 @@ def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self mock_resource_manager = MagicMock() # Mock object store memory usage ratio above threshold - threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_USAGE_RATIO + # Ratio = budget / (usage + budget) > OBJECT_STORE_BUDGET_RATIO + threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_BUDGET_RATIO mock_usage = MagicMock() mock_usage.object_store_memory = 1000 # usage mock_budget = MagicMock() + # Calculate budget so ratio > threshold + # budget / (usage + budget) > threshold + # budget > threshold * usage / (1 - threshold) mock_budget.object_store_memory = int( - 1000 * (threshold + 0.1) + threshold * 1000 / (1 - threshold) + 1 ) # budget above threshold mock_resource_manager.get_op_usage.return_value = mock_usage @@ -211,11 +215,23 @@ def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self ) policy.enable_dynamic_output_queue_size_backpressure = True + # Initialize EWMA state to verify it's not updated when ratio > threshold + initial_level = 100.0 + initial_dev = 20.0 + policy._q_level_nbytes[map_op] = initial_level + policy._q_level_dev[map_op] = initial_dev + # Should skip dynamic backpressure and use basic cap check + # EWMA state should not be updated (early return) self.assertTrue(policy.can_add_input(map_op)) # 3 < 5 + self.assertEqual(policy._q_level_nbytes[map_op], initial_level) + self.assertEqual(policy._q_level_dev[map_op], initial_dev) map_op.metrics.num_tasks_running = 5 self.assertFalse(policy.can_add_input(map_op)) # 5 >= 5 + # EWMA state should still not be updated + self.assertEqual(policy._q_level_nbytes[map_op], initial_level) + self.assertEqual(policy._q_level_dev[map_op], initial_dev) def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self): """Test can_add_input when object store memory usage ratio is below threshold.""" @@ -233,12 +249,16 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self mock_resource_manager = MagicMock() # Mock object store memory usage ratio below threshold - threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_USAGE_RATIO + # Ratio = budget / (usage + budget) < OBJECT_STORE_BUDGET_RATIO + threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_BUDGET_RATIO mock_usage = MagicMock() mock_usage.object_store_memory = 1000 # usage mock_budget = MagicMock() - mock_budget.object_store_memory = int( - 1000 * (threshold - 0.05) + # Calculate budget so ratio < threshold + # budget / (usage + budget) < threshold + # budget < threshold * usage / (1 - threshold) + mock_budget.object_store_memory = max( + 0, int(threshold * 1000 / (1 - threshold) - 1) ) # below threshold mock_resource_manager.get_op_usage.return_value = mock_usage @@ -258,14 +278,23 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self policy.enable_dynamic_output_queue_size_backpressure = True # Should proceed with dynamic backpressure logic - # Initialize EWMA state for the operator - policy._q_level_nbytes[map_op] = 300.0 - policy._q_level_dev[map_op] = 50.0 + # Initialize EWMA state for the operator with a different level + # so we can verify the update happens (queue size is 300) + initial_level = 200.0 + initial_dev = 50.0 + policy._q_level_nbytes[map_op] = initial_level + policy._q_level_dev[map_op] = initial_dev result = policy.can_add_input(map_op) - # With queue size 300 in hold region (level=300, dev=50, bounds=[200, 400]), - # should hold current level, so running=3 < effective_cap=3 should be False + # With queue size 300, initial level=200, dev=50, bounds=[100, 300] + # Queue size 300 is at the upper bound, so should hold. + # running=3 < effective_cap=3 should be False self.assertFalse(result) + # EWMA state should be updated when ratio < threshold + # Level should move toward 300 (queue size) + self.assertNotEqual(policy._q_level_nbytes[map_op], initial_level) + # Dev should also be updated + self.assertNotEqual(policy._q_level_dev[map_op], initial_dev) def test_can_add_input_effective_cap_calculation(self): """Test that effective cap calculation works correctly with different queue sizes.""" @@ -281,12 +310,15 @@ def test_can_add_input_effective_cap_calculation(self): topology = {map_op: MagicMock(), input_op: MagicMock()} mock_resource_manager = MagicMock() - threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_USAGE_RATIO + threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_BUDGET_RATIO mock_usage = MagicMock() mock_usage.object_store_memory = 1000 mock_budget = MagicMock() - mock_budget.object_store_memory = int( - 1000 * (threshold - 0.05) + # Calculate budget so ratio < threshold + # budget / (usage + budget) < threshold + # budget < threshold * usage / (1 - threshold) + mock_budget.object_store_memory = max( + 0, int(threshold * 1000 / (1 - threshold) - 1) ) # below threshold mock_resource_manager.get_op_usage.return_value = mock_usage