Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,17 @@ class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):

# Smoothing factor for the asymmetric EWMA (slow fall, faster rise).
EWMA_ALPHA = env_float("RAY_DATA_CONCURRENCY_CAP_EWMA_ALPHA", 0.2)
EWMA_ALPHA_UP = 1.0 - (1.0 - EWMA_ALPHA) ** 2 # fast rise
# Deadband width in units of the EWMA absolute deviation estimate.
K_DEV = env_float("RAY_DATA_CONCURRENCY_CAP_K_DEV", 2.0)
# Factor to back off when the queue is too large.
BACKOFF_FACTOR = env_float("RAY_DATA_CONCURRENCY_CAP_BACKOFF_FACTOR", 1)
# Factor to ramp up when the queue is too small.
RAMPUP_FACTOR = env_float("RAY_DATA_CONCURRENCY_CAP_RAMPUP_FACTOR", 1)
# Threshold for per-Op object store budget (available) vs total usage (used)
# (available / used) ratio to enable dynamic output queue size backpressure.
OBJECT_STORE_USAGE_RATIO = env_float(
"RAY_DATA_CONCURRENCY_CAP_OBJECT_STORE_USAGE_RATIO", 0.1
# Threshold for per-Op object store budget (available) vs total
# (available / total) ratio to enable dynamic output queue size backpressure.
OBJECT_STORE_BUDGET_RATIO = env_float(
"RAY_DATA_CONCURRENCY_CAP_OBJECT_STORE_BUDGET_RATIO", 0.1
)

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -92,7 +93,7 @@ def __init__(self, *args, **kwargs):
dynamic_output_queue_size_backpressure_configs = (
f", EWMA_ALPHA={self.EWMA_ALPHA}, K_DEV={self.K_DEV}, "
f"BACKOFF_FACTOR={self.BACKOFF_FACTOR}, RAMPUP_FACTOR={self.RAMPUP_FACTOR}, "
f"OBJECT_STORE_USAGE_RATIO={self.OBJECT_STORE_USAGE_RATIO}"
f"OBJECT_STORE_BUDGET_RATIO={self.OBJECT_STORE_BUDGET_RATIO}"
)
logger.debug(
f"ConcurrencyCapBackpressurePolicy caps: {self._concurrency_caps}, "
Expand All @@ -112,8 +113,8 @@ def _update_ewma_asymmetric(self, prev_value: float, sample: float) -> float:
if prev_value <= 0:
return sample

alpha_up = 1.0 - (1.0 - self.EWMA_ALPHA) ** 2 # fast rise
alpha = alpha_up if sample > prev_value else self.EWMA_ALPHA # slow fall
# fast rise if sample > prev_value, slow fall otherwise
alpha = self.EWMA_ALPHA_UP if sample > prev_value else self.EWMA_ALPHA
return (1 - alpha) * prev_value + alpha * sample

def _update_level_and_dev(self, op: "PhysicalOperator", q_bytes: int) -> None:
Expand Down Expand Up @@ -147,21 +148,17 @@ def can_add_input(self, op: "PhysicalOperator") -> bool:
):
return num_tasks_running < self._concurrency_caps[op]

# For this Op, if the objectstore budget (available) to total usage (used)
# For this Op, if the objectstore budget (available) to total
# ratio is below threshold (10%), skip dynamic output queue size backpressure.
op_usage = self._resource_manager.get_op_usage(op)
op_budget = self._resource_manager.get_budget(op)
if (
op_usage is not None
and op_budget is not None
and op_budget.object_store_memory > 0
and op_usage.object_store_memory > 0
):
if (
op_budget.object_store_memory / op_usage.object_store_memory
> self.OBJECT_STORE_USAGE_RATIO
if op_usage is not None and op_budget is not None:
total_mem = op_usage.object_store_memory + op_budget.object_store_memory
if total_mem == 0 or (
op_budget.object_store_memory / total_mem
> self.OBJECT_STORE_BUDGET_RATIO
):
# If the objectstore budget (available) to total usage (used)
# If the objectstore budget (available) to total
# ratio is above threshold (10%), skip dynamic output queue size
# backpressure, but still enforce the configured cap.
return num_tasks_running < self._concurrency_caps[op]
Expand All @@ -175,7 +172,7 @@ def can_add_input(self, op: "PhysicalOperator") -> bool:
)

# Update EWMA state (level & dev) and compute effective cap. Note that
# we don't update the EWMA state if the objectstore budget (available) vs total usage (used)
# we don't update the EWMA state if the objectstore budget (available) vs total
# ratio is above threshold (10%), because the level and dev adjusts quickly.
self._update_level_and_dev(op, current_queue_size_bytes)
effective_cap = self._effective_cap(
Expand Down
58 changes: 45 additions & 13 deletions python/ray/data/tests/test_backpressure_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,16 @@ def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self
mock_resource_manager = MagicMock()

# Mock object store memory usage ratio above threshold
threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_USAGE_RATIO
# Ratio = budget / (usage + budget) > OBJECT_STORE_BUDGET_RATIO
threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_BUDGET_RATIO
mock_usage = MagicMock()
mock_usage.object_store_memory = 1000 # usage
mock_budget = MagicMock()
# Calculate budget so ratio > threshold
# budget / (usage + budget) > threshold
# budget > threshold * usage / (1 - threshold)
mock_budget.object_store_memory = int(
1000 * (threshold + 0.1)
threshold * 1000 / (1 - threshold) + 1
) # budget above threshold

mock_resource_manager.get_op_usage.return_value = mock_usage
Expand All @@ -211,11 +215,23 @@ def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self
)
policy.enable_dynamic_output_queue_size_backpressure = True

# Initialize EWMA state to verify it's not updated when ratio > threshold
initial_level = 100.0
initial_dev = 20.0
policy._q_level_nbytes[map_op] = initial_level
policy._q_level_dev[map_op] = initial_dev

# Should skip dynamic backpressure and use basic cap check
# EWMA state should not be updated (early return)
self.assertTrue(policy.can_add_input(map_op)) # 3 < 5
self.assertEqual(policy._q_level_nbytes[map_op], initial_level)
self.assertEqual(policy._q_level_dev[map_op], initial_dev)

map_op.metrics.num_tasks_running = 5
self.assertFalse(policy.can_add_input(map_op)) # 5 >= 5
# EWMA state should still not be updated
self.assertEqual(policy._q_level_nbytes[map_op], initial_level)
self.assertEqual(policy._q_level_dev[map_op], initial_dev)

def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self):
"""Test can_add_input when object store memory usage ratio is below threshold."""
Expand All @@ -233,12 +249,16 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self
mock_resource_manager = MagicMock()

# Mock object store memory usage ratio below threshold
threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_USAGE_RATIO
# Ratio = budget / (usage + budget) < OBJECT_STORE_BUDGET_RATIO
threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_BUDGET_RATIO
mock_usage = MagicMock()
mock_usage.object_store_memory = 1000 # usage
mock_budget = MagicMock()
mock_budget.object_store_memory = int(
1000 * (threshold - 0.05)
# Calculate budget so ratio < threshold
# budget / (usage + budget) < threshold
# budget < threshold * usage / (1 - threshold)
mock_budget.object_store_memory = max(
0, int(threshold * 1000 / (1 - threshold) - 1)
) # below threshold

mock_resource_manager.get_op_usage.return_value = mock_usage
Expand All @@ -258,14 +278,23 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self
policy.enable_dynamic_output_queue_size_backpressure = True

# Should proceed with dynamic backpressure logic
# Initialize EWMA state for the operator
policy._q_level_nbytes[map_op] = 300.0
policy._q_level_dev[map_op] = 50.0
# Initialize EWMA state for the operator with a different level
# so we can verify the update happens (queue size is 300)
initial_level = 200.0
initial_dev = 50.0
policy._q_level_nbytes[map_op] = initial_level
policy._q_level_dev[map_op] = initial_dev

result = policy.can_add_input(map_op)
# With queue size 300 in hold region (level=300, dev=50, bounds=[200, 400]),
# should hold current level, so running=3 < effective_cap=3 should be False
# With queue size 300, initial level=200, dev=50, bounds=[100, 300]
# Queue size 300 is at the upper bound, so should hold.
# running=3 < effective_cap=3 should be False
self.assertFalse(result)
# EWMA state should be updated when ratio < threshold
# Level should move toward 300 (queue size)
self.assertNotEqual(policy._q_level_nbytes[map_op], initial_level)
# Dev should also be updated
self.assertNotEqual(policy._q_level_dev[map_op], initial_dev)

def test_can_add_input_effective_cap_calculation(self):
"""Test that effective cap calculation works correctly with different queue sizes."""
Expand All @@ -281,12 +310,15 @@ def test_can_add_input_effective_cap_calculation(self):
topology = {map_op: MagicMock(), input_op: MagicMock()}

mock_resource_manager = MagicMock()
threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_USAGE_RATIO
threshold = ConcurrencyCapBackpressurePolicy.OBJECT_STORE_BUDGET_RATIO
mock_usage = MagicMock()
mock_usage.object_store_memory = 1000
mock_budget = MagicMock()
mock_budget.object_store_memory = int(
1000 * (threshold - 0.05)
# Calculate budget so ratio < threshold
# budget / (usage + budget) < threshold
# budget < threshold * usage / (1 - threshold)
mock_budget.object_store_memory = max(
0, int(threshold * 1000 / (1 - threshold) - 1)
) # below threshold

mock_resource_manager.get_op_usage.return_value = mock_usage
Expand Down