diff --git a/gcp/workers/importer/importer.py b/gcp/workers/importer/importer.py index e2006231a5e..e8e0936db79 100755 --- a/gcp/workers/importer/importer.py +++ b/gcp/workers/importer/importer.py @@ -197,10 +197,15 @@ def _request_analysis_external(self, path, deleted=False): """Request analysis.""" + # TODO(michaelkedar): Making a distinction for oss-fuzz updates so we can + # track the logic flow for our eventual decoupling of the special logic. + task_type = 'update' + if source_repo.name == 'oss-fuzz': + task_type = 'update-oss-fuzz' self._publisher.publish( self._tasks_topic, data=b'', - type='update', + type=task_type, source=source_repo.name, path=path, original_sha256=original_sha256, diff --git a/gcp/workers/importer/importer_test.py b/gcp/workers/importer/importer_test.py index 583bb742ded..4ee2a74189e 100644 --- a/gcp/workers/importer/importer_test.py +++ b/gcp/workers/importer/importer_test.py @@ -202,7 +202,7 @@ def test_basic(self, unused_mock_time: mock.MagicMock, '19a3cd03fc15360bf16187f54df92a75'), path='2021-111.yaml', source='oss-fuzz', - type='update', + type='update-oss-fuzz', req_timestamp='12345') ]) bug = osv.Bug.get_by_id('OSV-2017-134') @@ -357,7 +357,7 @@ def test_scheduled_updates(self, unused_mock_time: mock.MagicMock, '19a3cd03fc15360bf16187f54df92a75'), path='proj/OSV-2021-1337.yaml', source='oss-fuzz', - type='update', + type='update-oss-fuzz', req_timestamp='12345'), mock.call( self.tasks_topic, diff --git a/gcp/workers/worker/worker.py b/gcp/workers/worker/worker.py index fe2fc88caaf..a91a4fb29d1 100644 --- a/gcp/workers/worker/worker.py +++ b/gcp/workers/worker/worker.py @@ -621,6 +621,20 @@ def _do_process_task(self, subscriber, subscription, ack_id, message, _state.bug_id = message.attributes.get('allocated_bug_id', None) task_type = message.attributes['type'] + + # Validating that oss-fuzz-related tasks are only sent by oss-fuzz and + # the non-oss-fuzz task is not used by oss-fuzz. + if not source_id: + logging.error('got message without source_id: %s', message) + elif source_id.startswith('oss-fuzz'): + if task_type not in ('regressed', 'fixed', 'impact', 'invalid', + 'update-oss-fuzz'): + logging.error('got unexpected \'%s\' task for oss-fuzz source %s', + task_type, source_id) + elif task_type != 'update': + logging.error('got unexpected \'%s\' task for non-oss-fuzz source %s', + task_type, source_id) + if task_type in ('regressed', 'fixed'): oss_fuzz.process_bisect_task(self._oss_fuzz_dir, task_type, source_id, message) @@ -631,6 +645,9 @@ def _do_process_task(self, subscriber, subscription, ack_id, message, logging.exception('Failed to process impact: ') elif task_type == 'invalid': mark_bug_invalid(message) + elif task_type == 'update-oss-fuzz': + # TODO(michaelkedar): create separate _source_update for oss-fuzz. + self._source_update(message) elif task_type == 'update': self._source_update(message)