Skip to content

Commit 4ba962c

Browse files
authored
[Data] ConcurrencyCapBackpressurePolicy - Only increase threshold (#58023)
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. > ⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] ConcurrencyCapBackpressurePolicy - Only increase threshold When `_update_queue_threshold` to adjust the queue threshold to cap concurrency based on current queued bytes, - Only allow increasing the threshold or maintaining it. - Cannot decrease threshold because the steady state of queued bytes is not known. ## Related issues > Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
1 parent 244caa8 commit 4ba962c

File tree

2 files changed

+52
-87
lines changed

2 files changed

+52
-87
lines changed

python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py

Lines changed: 17 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,7 @@ def _update_queue_threshold(
208208
Motivation: Adaptive thresholds prevent both over-throttling (too aggressive) and
209209
under-throttling (too permissive). The logic balances responsiveness with stability:
210210
- Fast upward response to pressure spikes (immediate threshold increase)
211-
- Gradual downward response to prevent oscillation (EWMA smoothing)
212-
- Complete reset when idle (threshold = 0) to avoid stuck high thresholds
211+
- Thresholds only increase, never decrease, since we don't know if low pressure is steady state
213212
214213
Args:
215214
op: Operator whose threshold is being updated.
@@ -219,56 +218,23 @@ def _update_queue_threshold(
219218
The updated threshold in bytes.
220219
221220
Examples:
222-
# Example 1: First sample (bootstrap)
221+
# Bootstrap: first sample sets threshold
223222
# Input: current_queue_size_bytes = 1000, level_prev = 0, dev_prev = 0
224-
# EWMA: level = 1000, dev = 0 (first sample)
225-
# Base: 1000 + 4*0 = 1000
226-
# Threshold: max(1000, 1000) = 1000
227-
# prev_threshold = 0, threshold = 1000
223+
# EWMA: level = 1000, dev = 0
224+
# Base: 1000 + 4*0 = 1000, Threshold: max(1000, 1000) = 1000
228225
# Result: 1000 (bootstrap)
229226
230-
# Example 2: Upward adjustment (immediate)
227+
# Pressure increase: threshold updated immediately
231228
# Input: current_queue_size_bytes = 1500, level_prev = 1000, dev_prev = 100
232-
# EWMA: level = 1000 + 0.2*(1500-1000) = 1100, dev = 100 + 0.2*(500-100) = 180
233-
# Base: 1100 + 4*180 = 1820
234-
# Threshold: max(1820, 1500) = 1820
235-
# prev_threshold = 1000, threshold = 1820
236-
# Result: 1820 (immediate upward)
229+
# EWMA: level = 1100, dev = 180, Base: 1100 + 4*180 = 1820
230+
# Threshold: max(1820, 1500) = 1820, prev_threshold = 1000
231+
# Result: 1820 (threshold increased)
237232
238-
# Example 3: Downward adjustment (smoothed)
233+
# Pressure decrease: threshold maintained (no decrease)
239234
# Input: current_queue_size_bytes = 100, level_prev = 200, dev_prev = 50
240-
# EWMA: level = 200 + 0.2*(100-200) = 180, dev = 50 + 0.2*(100-50) = 60
241-
# Base: 180 + 4*60 = 420
242-
# Threshold: max(420, 100) = 420
243-
# prev_threshold = 500, threshold = 420
244-
# smoothed = asymmetric_ewma(500, 420) = 500 + 0.2*(420-500) = 484
245-
# Result: 484 (gradual downward adjustment using asymmetric EWMA)
246-
247-
# Example 4: System becomes idle
248-
# Input: current_queue_size_bytes = 0, level_prev = 200, dev_prev = 50
249-
# EWMA: level = 200 + 0.2*(0-200) = 160, dev = 50 + 0.2*(200-50) = 80
250-
# Base: 160 + 4*80 = 480
251-
# Threshold: max(480, 0) = 480
252-
# prev_threshold = 484, threshold = 480
253-
# smoothed = asymmetric_ewma(484, 480) = 484 + 0.2*(480-484) = 483
254-
# Result: 483 (gradual downward adjustment using asymmetric EWMA)
255-
256-
# Example 5: Continued idle (gradual decay)
257-
# Input: current_queue_size_bytes = 0, level_prev = 160, dev_prev = 80
258-
# EWMA: level = 160 + 0.2*(0-160) = 128, dev = 80 + 0.2*(160-80) = 96
259-
# Base: 128 + 4*96 = 512
260-
# Threshold: max(512, 0) = 512
261-
# prev_threshold = 483, threshold = 512
262-
# Result: 512 (gradual upward, EWMA still adjusting using asymmetric EWMA)
263-
264-
# Example 6: After many idle samples (threshold finally resets)
265-
# Input: current_queue_size_bytes = 0, level_prev = 50, dev_prev = 10
266-
# EWMA: level = 50 + 0.2*(0-50) = 40, dev = 10 + 0.2*(50-10) = 18
267-
# Base: 40 + 4*18 = 112
268-
# Threshold: max(112, 0) = 112
269-
# prev_threshold = 200, threshold = 112
270-
# smoothed = asymmetric_ewma(200, 112) = 200 + 0.2*(112-200) = 182
271-
# Result: 182 (gradual downward adjustment using asymmetric EWMA)
235+
# EWMA: level = 180, dev = 60, Base: 180 + 4*60 = 420
236+
# Threshold: max(420, 100) = 420, prev_threshold = 500
237+
# Result: 500 (threshold maintained, no decrease)
272238
273239
"""
274240
hist = self._queue_history[op]
@@ -303,26 +269,21 @@ def _update_queue_threshold(
303269
# Step 3: fast ramp-up
304270
threshold = max(1, int(max(base, q)))
305271

306-
# Step 4: cache & return with gentle downward response using EWMA_ALPHA
272+
# Step 4: cache & return
307273
prev_threshold = self._queue_thresholds[op]
308274

309275
# Bootstrap
310276
if prev_threshold == 0:
311277
self._queue_thresholds[op] = max(1, threshold)
312278
return self._queue_thresholds[op]
313279

314-
# Upward: apply immediately
315-
if threshold >= prev_threshold:
280+
# Only increase threshold when there's clear pressure
281+
if threshold > prev_threshold:
316282
self._queue_thresholds[op] = max(1, threshold)
317283
return self._queue_thresholds[op]
318284

319-
# Downward: smooth using asymmetric EWMA
320-
# Prevents oscillation by allowing gradual downward adjustments
321-
# Uses same asymmetric behavior as EWMA: slow to adjust downward
322-
# Example: prev_threshold=200, threshold=100 -> smoothed using asymmetric EWMA
323-
smoothed = int(self._update_ewma_asymmetric(prev_threshold, threshold))
324-
self._queue_thresholds[op] = max(1, smoothed)
325-
return self._queue_thresholds[op]
285+
# Keep existing threshold when pressure decreases
286+
return prev_threshold
326287

327288
def _effective_cap(self, op: "PhysicalOperator") -> int:
328289
"""Compute a reduced concurrency cap via a tiny {-1,0,+1,+2} controller.

python/ray/data/tests/test_backpressure_policies.py

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -229,46 +229,54 @@ def test_update_queue_threshold_asymmetric_ewma(self):
229229
self.assertGreater(policy._q_level_nbytes[mock_op], 150.0)
230230
self.assertLess(policy._q_level_nbytes[mock_op], 200.0)
231231

232-
def test_update_queue_threshold_downward_smoothing(self):
233-
"""Test threshold update with downward smoothing logic."""
232+
def test_update_queue_threshold_no_decrease(self):
233+
"""Test that thresholds are never decreased, only maintained or increased."""
234234
mock_op = MagicMock()
235235
policy = ConcurrencyCapBackpressurePolicy(
236236
DataContext.get_current(),
237237
{mock_op: MagicMock()},
238238
MagicMock(),
239239
)
240240

241-
# Set up initial state with high threshold and very low level/dev to force downward adjustment
241+
# Set up initial state with high threshold
242242
policy._queue_thresholds[mock_op] = 200
243243
policy._q_level_nbytes[mock_op] = 10.0 # Very low level
244244
policy._q_level_dev[mock_op] = 1.0 # Very low deviation
245245
policy._queue_history[mock_op] = deque([10, 11, 12, 13, 14, 15])
246246

247-
# Test downward adjustment (should be smoothed)
248-
# threshold = max(10 + 4*1, 150) = 150, which is < 200, so should be smoothed
247+
# Test that threshold is maintained when calculated threshold is lower
249248
threshold = policy._update_queue_threshold(mock_op, 150)
250249

251-
# Should be smoothed between 200 and 150
252-
self.assertLess(threshold, 200)
253-
self.assertGreaterEqual(threshold, 150)
250+
# Should maintain the existing threshold (no decrease)
251+
self.assertEqual(threshold, 200)
252+
self.assertEqual(policy._queue_thresholds[mock_op], 200)
254253

255-
# Test that the method works correctly - just verify it doesn't crash
256-
# and returns a reasonable threshold value
257-
mock_op2 = MagicMock()
258-
policy2 = ConcurrencyCapBackpressurePolicy(
254+
# Test with even lower queue size
255+
threshold_small = policy._update_queue_threshold(mock_op, 50)
256+
self.assertEqual(threshold_small, 200) # Still maintained
257+
self.assertEqual(policy._queue_thresholds[mock_op], 200)
258+
259+
def test_update_queue_threshold_increase(self):
260+
"""Test that thresholds are increased when calculated threshold is higher."""
261+
mock_op = MagicMock()
262+
policy = ConcurrencyCapBackpressurePolicy(
259263
DataContext.get_current(),
260-
{mock_op2: MagicMock()},
264+
{mock_op: MagicMock()},
261265
MagicMock(),
262266
)
263-
policy2._queue_thresholds[mock_op2] = 200
264-
policy2._q_level_nbytes[mock_op2] = 10.0
265-
policy2._q_level_dev[mock_op2] = 1.0
266-
policy2._queue_history[mock_op2] = deque([10, 11, 12, 13, 14, 15])
267267

268-
threshold_small = policy2._update_queue_threshold(mock_op2, 50)
268+
# Set up initial state with moderate threshold
269+
policy._queue_thresholds[mock_op] = 100
270+
policy._q_level_nbytes[mock_op] = 50.0
271+
policy._q_level_dev[mock_op] = 20.0
272+
policy._queue_history[mock_op] = deque([50, 60, 70, 80, 90, 100])
273+
274+
# Test that threshold is increased when calculated threshold is higher
275+
threshold = policy._update_queue_threshold(mock_op, 200)
269276

270-
# Just verify it returns a reasonable threshold (at least as high as input)
271-
self.assertGreaterEqual(threshold_small, 50)
277+
# Should increase the threshold
278+
self.assertGreaterEqual(threshold, 200)
279+
self.assertGreaterEqual(policy._queue_thresholds[mock_op], 200)
272280

273281
def test_effective_cap_calculation_with_trend(self):
274282
"""Test effective cap calculation with different trend scenarios."""
@@ -474,7 +482,7 @@ def test_threshold_calculation_formula(self):
474482
self.assertAlmostEqual(threshold, expected, places=5)
475483

476484
def test_threshold_update_logic_comprehensive(self):
477-
"""Test comprehensive threshold update logic including bootstrap, upward, and downward cases."""
485+
"""Test comprehensive threshold update logic including bootstrap, upward, and no-decrease cases."""
478486
mock_op = MagicMock()
479487
policy = ConcurrencyCapBackpressurePolicy(
480488
DataContext.get_current(),
@@ -489,7 +497,7 @@ def test_threshold_update_logic_comprehensive(self):
489497
# Bootstrap: threshold = max(level + K_DEV * dev, q_now) = max(100 + 4*0, 100) = 100
490498
self.assertEqual(threshold1, 100)
491499

492-
# Test 2: Upward adjustment (threshold >= prev_threshold)
500+
# Test 2: Upward adjustment (threshold > prev_threshold)
493501
policy._queue_thresholds[mock_op] = 100
494502
policy._q_level_nbytes[mock_op] = 50.0
495503
policy._q_level_dev[mock_op] = 10.0
@@ -499,17 +507,13 @@ def test_threshold_update_logic_comprehensive(self):
499507
# Just verify it's >= 200 (upward adjustment)
500508
self.assertGreaterEqual(threshold2, 200)
501509

502-
# Test 3: Downward adjustment (threshold < prev_threshold)
510+
# Test 3: No decrease (threshold < prev_threshold, should maintain existing)
503511
policy._queue_thresholds[mock_op] = 200
504-
policy._q_level_nbytes[
505-
mock_op
506-
] = 10.0 # Very low level to force downward adjustment
512+
policy._q_level_nbytes[mock_op] = 10.0 # Very low level
507513
policy._q_level_dev[mock_op] = 1.0 # Very low deviation
508514
policy._queue_history[mock_op] = deque([10, 11, 12, 13, 14, 15])
509515
threshold3 = policy._update_queue_threshold(mock_op, 150)
510-
# threshold = max(10 + 4*1, 150) = 150, which is < 200, so should be smoothed
511-
self.assertLess(threshold3, 200)
512-
self.assertGreaterEqual(threshold3, 150)
516+
self.assertEqual(threshold3, 200)
513517

514518
# Test 4: Zero threshold case
515519
fresh_mock_op = MagicMock()
@@ -521,8 +525,8 @@ def test_threshold_update_logic_comprehensive(self):
521525
fresh_policy._queue_thresholds[fresh_mock_op] = 0
522526
fresh_policy._queue_history[fresh_mock_op] = deque([0])
523527
# Fresh policy starts with clean EWMA state
524-
threshold5 = fresh_policy._update_queue_threshold(fresh_mock_op, 0)
525-
self.assertEqual(threshold5, 1) # Should round up to 1
528+
threshold4 = fresh_policy._update_queue_threshold(fresh_mock_op, 0)
529+
self.assertEqual(threshold4, 1) # Should round up to 1
526530

527531
def test_trend_and_effective_cap_formulas(self):
528532
"""Test trend calculation and effective cap formulas."""

0 commit comments

Comments
 (0)