Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 25 additions & 22 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,40 +586,43 @@ def lease(self, _event=None): # pylint: disable=arguments-differ


def _filter_task_for_os_mismatch(message, queue) -> bool:
"""Filters a Pub/Sub message if its OS version does not match the bot's OS.
"""Filters a Pub/Sub message if its OS version mismatches the bot's OS.

This function checks the `base_os_version` attribute in the incoming message
against the bot's `BASE_OS_VERSION` environment variable. This handles cases
where a message is misrouted or received from a legacy subscription without
OS-specific filters.
This function compares the `base_os_version` attribute from the message
against the bot's `BASE_OS_VERSION` environment variable.

If an OS version mismatch is detected, the function logs a warning and
acknowledges (`ack()`) the message. Acknowledging the message permanently
removes it from the current subscription, effectively skipping it for this
bot. This assumes the message was also correctly delivered to another,
properly filtered subscription for processing.
A task is skipped (and the message is acknowledged) if the message specifies
an OS version, and the bot's OS is different. This includes the legacy
scenario where a bot does not have `BASE_OS_VERSION` set (evaluating to
`None`), preventing it from processing tasks meant for newer OS versions.

If the message does not specify an OS version, it can be processed by any
bot. If the versions match, it is also processed.

Args:
message: The `pubsub.Message` object to check.
queue: The name of the queue from which the message was pulled.
message (pubsub.Message): The message object to check.
queue (str): The name of the queue from which the message was pulled.

Returns:
True if the message had a mismatch and was acknowledged; False otherwise.
bool: True if the message had a mismatch and was acknowledged, otherwise
False.
"""
base_os_version = environment.get_value('BASE_OS_VERSION')
message_base_os_version = message.attributes.get('base_os_version')

if not (message_base_os_version and base_os_version and
message_base_os_version != base_os_version):
if not message_base_os_version:
return False

logs.warning(
'Skipping task for different OS.',
queue=queue,
message_os_version=message_base_os_version,
base_os_version=base_os_version)
message.ack()
return True
if message_base_os_version != base_os_version:
logs.warning(
'Skipping task for different OS.',
queue=queue,
message_os_version=message_base_os_version,
base_os_version=base_os_version)
message.ack()
return True

return False


def get_task_from_message(message, queue=None, can_defer=True,
Expand Down
18 changes: 14 additions & 4 deletions src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ def test_no_message(self):

def test_success(self):
"""Test successful task creation from a message."""
self.mock_env_get.return_value = None
self.mock_message.attributes = {}
self.assertEqual(
tasks.get_task_from_message(self.mock_message), self.mock_task)

Expand All @@ -297,6 +299,8 @@ def test_defer(self):
def test_set_queue(self):
"""Tests the set_queue method of a task."""
mock_queue = mock.Mock()
self.mock_env_get.return_value = None
self.mock_message.attributes = {}
task = tasks.get_task_from_message(self.mock_message, queue=mock_queue)
task.set_queue.assert_called_with(mock_queue)

Expand Down Expand Up @@ -336,15 +340,21 @@ def test_bot_has_os_message_does_not(self):
self.assertEqual(result, self.mock_task)
self.mock_message.ack.assert_not_called()

def test_bot_has_no_os_message_does(self):
"""Test that a message is processed if the message has an OS but the bot does not."""
@mock.patch('clusterfuzz._internal.metrics.logs.warning')
def test_bot_has_no_os_message_does(self, mock_log_warning):
"""Test that a message is skipped if it has an OS but the bot does not."""
self.mock_env_get.return_value = None
self.mock_message.attributes = {'base_os_version': 'ubuntu-24-04'}

result = tasks.get_task_from_message(self.mock_message)

self.assertEqual(result, self.mock_task)
self.mock_message.ack.assert_not_called()
self.assertIsNone(result)
self.mock_message.ack.assert_called_once()
mock_log_warning.assert_called_with(
'Skipping task for different OS.',
queue=None,
message_os_version='ubuntu-24-04',
base_os_version=None)


@test_utils.with_cloud_emulators('datastore')
Expand Down
Loading