Commit b5863ff

feat: Add client-side OS version filtering for Pub/Sub tasks (#5023)
## Description

This PR introduces a client-side filtering mechanism to make ClusterFuzz bots more resilient to misrouted Pub/Sub messages. It ensures that bots only process tasks intended for their specific operating system version, preventing errors and wasted resources when tasks are sent to legacy, unfiltered subscriptions.

### The Problem

When introducing new OS versions, such as Ubuntu 24.04, we create new, filtered Pub/Sub subscriptions (e.g., `my-queue-ubuntu-24-04`). However, existing bots subscribed to legacy, unfiltered queues (e.g., `my-queue`) could still pull messages intended for the new OS. Since Pub/Sub subscription filters are immutable, we cannot simply update old subscriptions. This can lead to bots attempting to execute incompatible tasks, causing errors and inefficiencies.

### The Solution

This change implements a robust, centralized check within the task-pulling logic.

1. **Centralized Filtering Function:** A new private function, `_filter_task_for_os_mismatch`, has been created in `src/clusterfuzz/_internal/base/tasks/__init__.py`. This function encapsulates all the logic for OS version validation.
2. **Behavior:**
   - When a bot pulls a message, this function compares the `base_os_version` attribute on the message with the bot's `BASE_OS_VERSION` environment variable.
   - If a mismatch is detected, the function logs a warning and immediately **acknowledges (`ack()`)** the message.
   - Acknowledging the message permanently removes it from that subscription, effectively skipping it for the current bot. This assumes the message was also correctly delivered to another, properly filtered subscription for processing.
   - If the OS versions match, or if either the bot or the message does not have an OS version specified, the task is processed as usual.
3. **Integration:** This check is performed within `get_task_from_message`, ensuring it is applied to all types of tasks pulled from Pub/Sub (`regular`, `preprocess`, `postprocess`, etc.) without code duplication.

### Benefits

- **Resilience:** Bots are now resilient to misrouted messages and will not fail on incompatible tasks.
- **Cleanliness:** The logic is centralized in a single, well-documented function, improving code maintainability.
- **Forward-Compatibility:** This provides a safety net for future OS migrations and ensures that legacy bots can coexist with newer ones without issue.

### Testing

- Added comprehensive unit tests in `src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py` to validate the filtering logic.
- Tests cover all scenarios:
  - OS mismatch (message is skipped and acked).
  - OS match (message is processed).
  - Bot has an OS, but the message does not (message is processed).
  - Message has an OS, but the bot does not (message is processed).
- All new and existing tests pass.
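The behavior described under The Solution can be tried outside ClusterFuzz with a small stand-in. The sketch below is not the committed code: `filter_for_os_mismatch` and `make_message` are hypothetical names, the bot's OS version is passed as an argument instead of being read from `BASE_OS_VERSION`, logging is omitted, and the message is a `unittest.mock.Mock` rather than a real Pub/Sub message.

```python
from unittest import mock


def filter_for_os_mismatch(message, bot_os_version):
  """Stand-in for the commit's filter: ack and return True on a mismatch."""
  message_os_version = message.attributes.get('base_os_version')
  if not (message_os_version and bot_os_version and
          message_os_version != bot_os_version):
    return False  # Versions match, or at least one side is unspecified.
  message.ack()  # Removes the message from this subscription only.
  return True


def make_message(attributes):
  """Builds a hypothetical Pub/Sub-like message whose ack() is a spy."""
  message = mock.Mock()
  message.attributes = attributes
  return message


# The four scenarios listed under Testing: (bot OS, message attributes).
cases = {
    'mismatch': ('ubuntu-24-04', {'base_os_version': 'ubuntu-22-04'}),
    'match': ('ubuntu-24-04', {'base_os_version': 'ubuntu-24-04'}),
    'message_unset': ('ubuntu-24-04', {}),
    'bot_unset': (None, {'base_os_version': 'ubuntu-24-04'}),
}

results = {}
for name, (bot_os, attributes) in cases.items():
  message = make_message(attributes)
  skipped = filter_for_os_mismatch(message, bot_os)
  results[name] = (skipped, message.ack.called)
  print(f'{name}: skipped={skipped} acked={message.ack.called}')
```

Only the mismatch case acks the message; in the other three the message is left untouched so that normal task initialization can proceed.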
1 parent f50c6ce commit b5863ff

2 files changed: +117 -28 lines changed

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 41 additions & 0 deletions
@@ -585,11 +585,52 @@ def lease(self, _event=None): # pylint: disable=arguments-differ
     track_task_end()


+def _filter_task_for_os_mismatch(message, queue) -> bool:
+  """Filters a Pub/Sub message if its OS version does not match the bot's OS.
+
+  This function checks the `base_os_version` attribute in the incoming message
+  against the bot's `BASE_OS_VERSION` environment variable. This handles cases
+  where a message is misrouted or received from a legacy subscription without
+  OS-specific filters.
+
+  If an OS version mismatch is detected, the function logs a warning and
+  acknowledges (`ack()`) the message. Acknowledging the message permanently
+  removes it from the current subscription, effectively skipping it for this
+  bot. This assumes the message was also correctly delivered to another,
+  properly filtered subscription for processing.
+
+  Args:
+    message: The `pubsub.Message` object to check.
+    queue: The name of the queue from which the message was pulled.
+
+  Returns:
+    True if the message had a mismatch and was acknowledged; False otherwise.
+  """
+  base_os_version = environment.get_value('BASE_OS_VERSION')
+  message_base_os_version = message.attributes.get('base_os_version')
+
+  if not (message_base_os_version and base_os_version and
+          message_base_os_version != base_os_version):
+    return False
+
+  logs.warning(
+      'Skipping task for different OS.',
+      queue=queue,
+      message_os_version=message_base_os_version,
+      base_os_version=base_os_version)
+  message.ack()
+  return True
+
+
 def get_task_from_message(message, queue=None, can_defer=True,
                           task_cls=None) -> Optional[PubSubTask]:
   """Returns a task constructed from the first of |messages| if possible."""
   if message is None:
     return None
+
+  if _filter_task_for_os_mismatch(message, queue):
+    return None
+
   try:
     task = initialize_task(message, task_cls=task_cls)
     if task is None:
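The guard `if not (a and b and a != b): return False` packs three checks into one expression. As a rough sanity check, the sketch below compares that expression against a spelled-out version over a few sample values. Both function names are hypothetical, and plain strings and `None` stand in for whatever `environment.get_value` and `message.attributes.get` return in practice.

```python
import itertools


def skip_as_written(message_os, bot_os):
  """The diff's condition, expressed as the function's eventual return value."""
  return bool(message_os and bot_os and message_os != bot_os)


def skip_spelled_out(message_os, bot_os):
  """The same decision written as separate early returns."""
  if not message_os:
    return False  # Message carries no OS attribute (None or '').
  if not bot_os:
    return False  # Bot has no OS version configured (None or '').
  return message_os != bot_os


values = [None, '', 'ubuntu-22-04', 'ubuntu-24-04']
table = {(m, b): skip_as_written(m, b)
         for m, b in itertools.product(values, repeat=2)}

# Both forms agree on every combination of sample values.
assert all(skip_spelled_out(m, b) == skipped
           for (m, b), skipped in table.items())

# Only pairs where both sides are set and differ lead to a skip.
skipping_pairs = sorted(pair for pair, skipped in table.items() if skipped)
print(skipping_pairs)
```

One consequence of relying on truthiness is that an empty string on either side behaves like an unset value, so such a message would be processed rather than skipped.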

src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py

Lines changed: 76 additions & 28 deletions
@@ -255,48 +255,96 @@ def test_get_machine_template_for_high_end_linux_queue(self):
 class GetTaskFromMessageTest(unittest.TestCase):
   """Tests for get_task_from_message."""

+  def setUp(self):
+    self.mock_message = mock.MagicMock()
+    self.mock_task = mock.Mock(defer=mock.Mock(return_value=False))
+    self.mock_task.set_queue.return_value = self.mock_task
+
+    self.initialize_task_patcher = mock.patch(
+        'clusterfuzz._internal.base.tasks.initialize_task',
+        return_value=self.mock_task)
+    self.mock_initialize_task = self.initialize_task_patcher.start()
+
+    self.env_patcher = mock.patch(
+        'clusterfuzz._internal.system.environment.get_value')
+    self.mock_env_get = self.env_patcher.start()
+    self.mock_env_get.return_value = None
+
+  def tearDown(self):
+    self.initialize_task_patcher.stop()
+    self.env_patcher.stop()
+
   def test_no_message(self):
-    self.assertEqual(tasks.get_task_from_message(None), None)
+    """Test that no task is returned when the message is None."""
+    self.assertIsNone(tasks.get_task_from_message(None))

   def test_success(self):
-    mock_task = mock.Mock(defer=mock.Mock(return_value=False))
-    mock_task.set_queue.return_value = mock_task
-    with mock.patch(
-        'clusterfuzz._internal.base.tasks.initialize_task',
-        return_value=mock_task):
-      self.assertEqual(tasks.get_task_from_message(mock.Mock()), mock_task)
+    """Test successful task creation from a message."""
+    self.assertEqual(
+        tasks.get_task_from_message(self.mock_message), self.mock_task)

   def test_key_error(self):
-    mock_message = mock.Mock()
-    with mock.patch(
-        'clusterfuzz._internal.base.tasks.initialize_task',
-        side_effect=KeyError):
-      self.assertEqual(tasks.get_task_from_message(mock_message), None)
-      mock_message.ack.assert_called_with()
+    """Test that a message is acked and skipped on a KeyError."""
+    self.mock_initialize_task.side_effect = KeyError
+    self.assertIsNone(tasks.get_task_from_message(self.mock_message))
+    self.mock_message.ack.assert_called_with()

   def test_defer(self):
-    mock_task = mock.Mock(defer=mock.Mock(return_value=True))
-    with mock.patch(
-        'clusterfuzz._internal.base.tasks.initialize_task',
-        return_value=mock_task):
-      self.assertEqual(tasks.get_task_from_message(mock.Mock()), None)
+    """Test that a task is deferred if its ETA is in the future."""
+    self.mock_task.defer.return_value = True
+    self.assertIsNone(tasks.get_task_from_message(self.mock_message))

   def test_set_queue(self):
     """Tests the set_queue method of a task."""
     mock_queue = mock.Mock()
-    mock_task = mock.Mock()
+    task = tasks.get_task_from_message(self.mock_message, queue=mock_queue)
+    task.set_queue.assert_called_with(mock_queue)

-    mock_task.configure_mock(
-        queue=mock_queue,
-        set_queue=mock.Mock(return_value=mock_task),
-        defer=mock.Mock(return_value=False))
+  @mock.patch('clusterfuzz._internal.metrics.logs.warning')
+  def test_os_mismatch(self, mock_log_warning):
+    """Test that a message is skipped and acked if OS versions mismatch."""
+    self.mock_env_get.return_value = 'ubuntu-24-04'
+    self.mock_message.attributes = {'base_os_version': 'ubuntu-22-04'}

-    with mock.patch(
-        'clusterfuzz._internal.base.tasks.initialize_task',
-        return_value=mock_task):
-      task = tasks.get_task_from_message(mock.Mock())
+    result = tasks.get_task_from_message(self.mock_message)
+
+    self.assertIsNone(result)
+    self.mock_message.ack.assert_called_once()
+    mock_log_warning.assert_called_with(
+        'Skipping task for different OS.',
+        queue=None,
+        message_os_version='ubuntu-22-04',
+        base_os_version='ubuntu-24-04')
+
+  def test_os_match(self):
+    """Test that a message is processed if OS versions match."""
+    self.mock_env_get.return_value = 'ubuntu-24-04'
+    self.mock_message.attributes = {'base_os_version': 'ubuntu-24-04'}
+
+    result = tasks.get_task_from_message(self.mock_message)
+
+    self.assertEqual(result, self.mock_task)
+    self.mock_message.ack.assert_not_called()
+
+  def test_bot_has_os_message_does_not(self):
+    """Test that a message is processed if the bot has an OS but the message does not."""
+    self.mock_env_get.return_value = 'ubuntu-24-04'
+    self.mock_message.attributes = {}
+
+    result = tasks.get_task_from_message(self.mock_message)
+
+    self.assertEqual(result, self.mock_task)
+    self.mock_message.ack.assert_not_called()
+
+  def test_bot_has_no_os_message_does(self):
+    """Test that a message is processed if the message has an OS but the bot does not."""
+    self.mock_env_get.return_value = None
+    self.mock_message.attributes = {'base_os_version': 'ubuntu-24-04'}
+
+    result = tasks.get_task_from_message(self.mock_message)

-    self.assertEqual(task.queue, mock_queue)
+    self.assertEqual(result, self.mock_task)
+    self.mock_message.ack.assert_not_called()


 @test_utils.with_cloud_emulators('datastore')
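The new `setUp`/`tearDown` pair starts and stops two patchers by hand. One possible alternative, not part of this commit, is to register each `stop` with `addCleanup` right after `start`. Cleanups run even when `setUp` itself raises, whereas `tearDown` is skipped in that case. The sketch below is a self-contained illustration with a hypothetical test class; it patches `os.getenv` purely as a stand-in for `environment.get_value`.

```python
import os
import unittest
from unittest import mock


class PatcherCleanupTest(unittest.TestCase):
  """Hypothetical test case showing patchers registered with addCleanup."""

  def setUp(self):
    patcher = mock.patch.object(os, 'getenv', return_value=None)
    self.mock_getenv = patcher.start()
    # Registered immediately, so the patch is undone even if a later line in
    # setUp raises (tearDown is not run when setUp fails).
    self.addCleanup(patcher.stop)

  def test_default_is_unset(self):
    self.assertIsNone(os.getenv('BASE_OS_VERSION'))

  def test_override_per_test(self):
    # Individual tests can still adjust the mock, as the commit's tests do.
    self.mock_getenv.return_value = 'ubuntu-24-04'
    self.assertEqual(os.getenv('BASE_OS_VERSION'), 'ubuntu-24-04')


suite = unittest.defaultTestLoader.loadTestsFromTestCase(PatcherCleanupTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

With this pattern no `tearDown` method is needed for the patchers, at the cost of losing the named `*_patcher` attributes the committed tests keep on `self`.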
