Run postprocess on tworkers #4555

Closed
wants to merge 9 commits
2 changes: 2 additions & 0 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
@@ -324,6 +324,8 @@ def tworker_get_task():
# queue that is probably empty) to do a single preprocess. Investigate
# combining preprocess and postprocess queues and allowing pulling of
# multiple messages.
if random.random() < .5:
return get_postprocess_task()
return get_preprocess_task()
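
A minimal standalone sketch of this selection pattern (the queue getters are placeholders here; the real module is assumed to import `random` at the top level):

import random

def get_preprocess_task():
  # Placeholder for the real preprocess queue pull.
  return 'preprocess'

def get_postprocess_task():
  # Placeholder for the real postprocess queue pull.
  return 'postprocess'

def tworker_get_task():
  # Flip a fair coin so a tworker drains both queues over time.
  if random.random() < 0.5:
    return get_postprocess_task()
  return get_preprocess_task()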


64 changes: 6 additions & 58 deletions src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py
@@ -84,50 +84,6 @@ def __init__(self, subtask: _Subtask):
self._subtask = subtask
self._labels = None
self.utask_main_failure = None
self._utask_success_conditions = [
uworker_msg_pb2.ErrorType.NO_ERROR, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.ANALYZE_NO_CRASH, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.PROGRESSION_BAD_STATE_MIN_MAX, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.REGRESSION_NO_CRASH, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.REGRESSION_LOW_CONFIDENCE_IN_REGRESSION_RANGE, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.MINIMIZE_UNREPRODUCIBLE_CRASH, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.MINIMIZE_CRASH_TOO_FLAKY, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.LIBFUZZER_MINIMIZATION_UNREPRODUCIBLE, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.ANALYZE_CLOSE_INVALID_UPLOADED, # pylint: disable=no-member
]
self._utask_maybe_retry_conditions = [
uworker_msg_pb2.ErrorType.ANALYZE_BUILD_SETUP, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.ANALYZE_NO_REVISIONS_LIST, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.TESTCASE_SETUP, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.MINIMIZE_SETUP, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.FUZZ_DATA_BUNDLE_SETUP_FAILURE, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.FUZZ_NO_FUZZ_TARGET_SELECTED, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.PROGRESSION_NO_CRASH, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.PROGRESSION_TIMEOUT, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.PROGRESSION_BUILD_SETUP_ERROR, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.REGRESSION_BUILD_SETUP_ERROR, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.REGRESSION_TIMEOUT_ERROR, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.SYMBOLIZE_BUILD_SETUP_ERROR, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.MINIMIZE_DEADLINE_EXCEEDED, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.MINIMIZE_DEADLINE_EXCEEDED_IN_MAIN_FILE_PHASE, # pylint: disable=no-member
]
self._utask_failure_conditions = [
uworker_msg_pb2.ErrorType.ANALYZE_NO_REVISION_INDEX, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.UNHANDLED, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.VARIANT_BUILD_SETUP, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.FUZZ_BUILD_SETUP_FAILURE, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.FUZZ_NO_FUZZER, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.PROGRESSION_REVISION_LIST_ERROR, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.PROGRESSION_BUILD_NOT_FOUND, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.PROGRESSION_BAD_BUILD, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.REGRESSION_REVISION_LIST_ERROR, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.REGRESSION_BUILD_NOT_FOUND, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.REGRESSION_BAD_BUILD_ERROR, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.LIBFUZZER_MINIMIZATION_FAILED, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.CORPUS_PRUNING_FUZZER_SETUP_FAILED, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.CORPUS_PRUNING_ERROR, # pylint: disable=no-member
uworker_msg_pb2.ErrorType.FUZZ_BAD_BUILD, # pylint: disable=no-member
]

if subtask == _Subtask.PREPROCESS:
self._preprocess_start_time_ns = self.start_time_ns
@@ -169,18 +125,6 @@ def set_task_details(self,
# Ensure we always have a value after this method returns.
assert self._preprocess_start_time_ns is not None

def _infer_uworker_main_outcome(self, exc_type, uworker_error):
'''Infers, on a best effort basis, whether an uworker output implies
success or failure. If an unequivocal response is not possible,
classifies as maybe_retry.'''
if exc_type or uworker_error in self._utask_failure_conditions:
outcome = 'error'
elif uworker_error in self._utask_maybe_retry_conditions:
outcome = 'maybe_retry'
else:
outcome = 'success'
return outcome

def __exit__(self, _exc_type, _exc_value, _traceback):
# Ignore exception details, let Python continue unwinding the stack.

@@ -201,8 +145,7 @@ def __exit__(self, _exc_type, _exc_value, _traceback):
# The only case where a task might fail without throwing, is in
# utask_main, by returning an ErrorType proto which indicates
# failure.
outcome = self._infer_uworker_main_outcome(_exc_type,
self.utask_main_failure)
outcome = 'error' if _exc_type or self.utask_main_failure else 'success'
monitoring_metrics.TASK_OUTCOME_COUNT.increment({
**self._labels, 'outcome': outcome
})
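
With the condition tables removed, the outcome label becomes a simple binary. A sketch of the equivalent logic (the function name is illustrative, not part of the PR):

def classify_outcome(exc_type, utask_main_failure):
  # Any raised exception, or any ErrorType reported by utask_main,
  # now counts as an error; everything else is a success.
  return 'error' if exc_type or utask_main_failure else 'success'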
@@ -223,6 +166,11 @@ def __exit__(self, _exc_type, _exc_value, _traceback):
monitoring_metrics.TASK_OUTCOME_COUNT_BY_ERROR_TYPE.increment(
trimmed_labels)

if error_condition != 'UNHANDLED_EXCEPTION':
task = self._labels['task']
subtask = self._labels['subtask']
logs.info(f'Task {task}, at subtask {subtask}, finished successfully.')


def ensure_uworker_env_type_safety(uworker_env):
"""Converts all values in |uworker_env| to str types.
3 changes: 2 additions & 1 deletion src/clusterfuzz/_internal/datastore/data_handler.py
@@ -977,7 +977,8 @@ def add_build_metadata(job_type,
def create_data_bundle_bucket_and_iams(data_bundle_name, emails):
"""Creates a data bundle bucket and adds iams for access."""
bucket_name = get_data_bundle_bucket_name(data_bundle_name)
if not storage.create_bucket_if_needed(bucket_name):
location = local_config.ProjectConfig().get('data_bundle_bucket_location')
if not storage.create_bucket_if_needed(bucket_name, location=location):
return False

client = storage.create_discovery_storage_client()
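
A sketch of how the new config key flows into bucket creation (the key name comes from this change; the bucket name is an illustrative assumption):

from clusterfuzz._internal.config import local_config
from clusterfuzz._internal.google_cloud_utils import storage

# An absent 'data_bundle_bucket_location' yields None, which leaves
# the storage provider's default location in effect.
location = local_config.ProjectConfig().get('data_bundle_bucket_location')
storage.create_bucket_if_needed('example-bundle-bucket', location=location)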
61 changes: 40 additions & 21 deletions src/clusterfuzz/_internal/google_cloud_utils/storage.py
@@ -117,7 +117,7 @@
class StorageProvider:
"""Core storage provider interface."""

def create_bucket(self, name, object_lifecycle, cors):
def create_bucket(self, name, object_lifecycle, cors, location):
"""Create a new bucket."""
raise NotImplementedError

@@ -198,7 +198,7 @@ def _chunk_size(self):

return None

def create_bucket(self, name, object_lifecycle, cors):
def create_bucket(self, name, object_lifecycle, cors, location):
"""Create a new bucket."""
project_id = utils.get_application_id()
request_body = {'name': name}
@@ -208,6 +208,9 @@ def create_bucket(self, name, object_lifecycle, cors):
if cors:
request_body['cors'] = cors

if location:
request_body['location'] = location

client = create_discovery_storage_client()
try:
client.buckets().insert(project=project_id, body=request_body).execute()
@@ -237,7 +240,6 @@ def list_blobs(self, remote_path, recursive=True, names_only=False):

client = _storage_client()
bucket = client.bucket(bucket_name)
properties = {}

if recursive:
delimiter = None
@@ -249,23 +251,37 @@
else:
fields = None

iterator = bucket.list_blobs(
prefix=path, delimiter=delimiter, fields=fields)
for blob in iterator:
properties['bucket'] = bucket_name
properties['name'] = blob.name
properties['updated'] = blob.updated
properties['size'] = blob.size

yield properties

if not recursive:
# When doing delimiter listings, the "directories" will be in `prefixes`.
for prefix in iterator.prefixes:
properties['bucket'] = bucket_name
properties['name'] = prefix
iterations = 0
while True:
iterations += 1
iterator = bucket.list_blobs(
prefix=path, delimiter=delimiter, fields=fields)
for blob in iterator:
properties = {
'bucket': bucket_name,
'name': blob.name,
'updated': blob.updated,
'size': blob.size,
}

yield properties

if not recursive:
# When doing delimiter listings, the "directories" will be in
# `prefixes`.
for prefix in iterator.prefixes:
properties = {
'bucket': bucket_name,
'name': prefix,
}
yield properties

next_page_token = iterator.next_page_token
if next_page_token is None:
break
if iterations and iterations % 50 == 0:
logs.error('Might be infinite looping.')
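
The loop above guards against a suspected infinite listing. For comparison, a minimal sketch of explicit page-token pagination with google-cloud-storage (an illustration, not this PR's code):

from google.cloud import storage as gcs_storage

def list_blob_names(bucket_name, prefix):
  client = gcs_storage.Client()
  bucket = client.bucket(bucket_name)
  page_token = None
  while True:
    iterator = bucket.list_blobs(prefix=prefix, page_token=page_token)
    # Consume exactly one page; the iterator then records the token
    # for the page after it.
    for blob in next(iterator.pages):
      yield blob.name
    page_token = iterator.next_page_token
    if page_token is None:
      break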

def copy_file_from(self, remote_path, local_path):
"""Copy file from a remote path to a local path."""
client = _storage_client()
@@ -543,7 +559,7 @@ def convert_path_for_write(self, remote_path, directory=OBJECTS_DIR):

return fs_path

def create_bucket(self, name, object_lifecycle, cors):
def create_bucket(self, name, object_lifecycle, cors, location):
"""Create a new bucket."""
bucket_path = self._fs_bucket_path(name)
if os.path.exists(bucket_path):
@@ -905,13 +921,16 @@ def set_bucket_iam_policy(client, bucket_name, iam_policy):
return None


def create_bucket_if_needed(bucket_name, object_lifecycle=None, cors=None):
def create_bucket_if_needed(bucket_name,
object_lifecycle=None,
cors=None,
location=None):
"""Creates a GCS bucket."""
provider = _provider()
if provider.get_bucket(bucket_name):
return True

if not provider.create_bucket(bucket_name, object_lifecycle, cors):
if not provider.create_bucket(bucket_name, object_lifecycle, cors, location):
return False

time.sleep(CREATE_BUCKET_DELAY)
4 changes: 4 additions & 0 deletions src/clusterfuzz/_internal/platforms/android/adb.py
@@ -242,6 +242,10 @@ def get_fastboot_command_line(fastboot_cmd):

def get_fastboot_path():
"""Return path to fastboot binary."""
fastboot_path = environment.get_value('FASTBOOT')
if fastboot_path:
return fastboot_path

return os.path.join(environment.get_platform_resources_directory(),
'fastboot')
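
A usage sketch of the new override (the path is illustrative, and it assumes `environment.get_value` returns the raw string here):

# With FASTBOOT set, the bundled platform binary is bypassed.
os.environ['FASTBOOT'] = '/usr/local/bin/fastboot'
assert get_fastboot_path() == '/usr/local/bin/fastboot'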

6 changes: 4 additions & 2 deletions src/clusterfuzz/_internal/system/environment.py
@@ -695,9 +695,11 @@ def get_engine_for_job(job_name=None):
def is_minimization_supported():
"""Return True if the current job supports minimization.

Currently blackbox-fuzzer jobs or libfuzzer support minimization.
Currently blackbox-fuzzer jobs or libfuzzer support minimization, unless
skipped using the SKIP_MINIMIZATION environment variable.
"""
return not is_engine_fuzzer_job() or is_libfuzzer_job()
skipped = get_value('SKIP_MINIMIZATION', False)
return not skipped and (not is_engine_fuzzer_job() or is_libfuzzer_job())
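
A usage sketch of the new flag (assuming the standard environment helpers in this module):

# SKIP_MINIMIZATION turns minimization off regardless of job type.
set_value('SKIP_MINIMIZATION', True)
assert not is_minimization_supported()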


def is_posix():
@@ -57,7 +57,7 @@ def setUp(self):
self.local_gcs_buckets_path = tempfile.mkdtemp()
os.environ['LOCAL_GCS_BUCKETS_PATH'] = self.local_gcs_buckets_path
os.environ['TEST_BLOBS_BUCKET'] = 'blobs-bucket'
storage._provider().create_bucket('blobs-bucket', None, None)
storage._provider().create_bucket('blobs-bucket', None, None, None)
helpers.patch(self, [
'clusterfuzz._internal.bot.fuzzers.engine_common.unpack_seed_corpus_if_needed',
'clusterfuzz._internal.bot.tasks.task_creation.create_tasks',
@@ -23,7 +23,6 @@
import parameterized
from pyfakefs import fake_filesystem_unittest

from clusterfuzz._internal.config import local_config
from clusterfuzz._internal.datastore import data_handler
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.google_cloud_utils import blobs
@@ -73,14 +72,27 @@ class DataHandlerTest(unittest.TestCase):

def setUp(self):
helpers.patch_environ(self)
project_config_get = local_config.ProjectConfig.get
helpers.patch(self, [
'clusterfuzz._internal.base.utils.default_project_name',
'clusterfuzz._internal.config.db_config.get',
('project_config_get',
'clusterfuzz._internal.config.local_config.ProjectConfig.get'),
'clusterfuzz._internal.config.local_config.ProjectConfig',
('get_storage_provider',
'clusterfuzz._internal.google_cloud_utils.storage._provider'),
'clusterfuzz._internal.google_cloud_utils.storage.create_discovery_storage_client',
'clusterfuzz._internal.google_cloud_utils.storage.get_bucket_iam_policy',
])

self.mock.default_project_name.return_value = 'project'

self.storage_provider = mock.Mock()
self.mock.get_storage_provider.return_value = self.storage_provider

self.project_config = {}
self.mock.ProjectConfig.return_value = self.project_config

# Disable artificial delay when creating buckets.
storage.CREATE_BUCKET_DELAY = 0

self.job = data_types.Job(
name='linux_asan_chrome',
environment_string=('SUMMARY_PREFIX = project\n'
@@ -175,8 +187,6 @@ def setUp(self):

environment.set_value('FUZZ_DATA', '/tmp/inputs/fuzzer-common-data-bundles')
environment.set_value('FUZZERS_DIR', '/tmp/inputs/fuzzers')
self.mock.default_project_name.return_value = 'project'
self.mock.project_config_get.side_effect = project_config_get

def test_find_testcase(self):
"""Ensure that find_testcase behaves as expected."""
@@ -449,15 +459,34 @@ def test_get_issue_summary_bad_cast_without_crash_function(self):
summary, 'project: Bad-cast to blink::LayoutBlock from '
'blink::LayoutTableSection')

def test_create_data_bundle_bucket_and_iams(self):
self.storage_provider.get_bucket.return_value = None
self.storage_provider.create_bucket.return_value = True

self.assertTrue(data_handler.create_data_bundle_bucket_and_iams('test', []))

self.storage_provider.create_bucket.assert_called_with(
'test-corpus.test-clusterfuzz.appspot.com', None, None, None)

def test_create_data_bundle_bucket_and_iams_with_location(self):
self.storage_provider.get_bucket.return_value = None
self.storage_provider.create_bucket.return_value = True

self.project_config['data_bundle_bucket_location'] = 'NORTH-POLE'

self.assertTrue(data_handler.create_data_bundle_bucket_and_iams('test', []))

self.storage_provider.create_bucket.assert_called_with(
'test-corpus.test-clusterfuzz.appspot.com', None, None, 'NORTH-POLE')

def test_get_data_bundle_name_default(self):
"""Test getting the default data bundle bucket name."""
self.assertEqual('test-corpus.test-clusterfuzz.appspot.com',
data_handler.get_data_bundle_bucket_name('test'))

def test_get_data_bundle_name_custom_suffix(self):
"""Test getting the data bundle bucket name with custom suffix."""
self.mock.project_config_get.side_effect = None
self.mock.project_config_get.return_value = 'custom.suffix.com'
self.project_config['bucket_domain_suffix'] = 'custom.suffix.com'
self.assertEqual('test-corpus.custom.suffix.com',
data_handler.get_data_bundle_bucket_name('test'))

@@ -485,7 +514,7 @@ def test_filter_stack_trace_upload(self):
exceeds limit and an upload_url is provided."""
blob_name = blobs.generate_new_blob_name()
blobs_bucket = 'blobs_bucket'
storage._provider().create_bucket(blobs_bucket, None, None) # pylint: disable=protected-access
storage._provider().create_bucket(blobs_bucket, None, None, None) # pylint: disable=protected-access

gcs_path = storage.get_cloud_storage_file_path(blobs_bucket, blob_name)
signed_upload_url = storage.get_signed_upload_url(gcs_path)
@@ -182,7 +182,7 @@ def setUp(self):
test_utils.set_up_pyfakefs(self)
os.environ['LOCAL_GCS_BUCKETS_PATH'] = '/local'
os.environ['TEST_BLOBS_BUCKET'] = 'blobs-bucket'
self.provider.create_bucket('blobs-bucket', None, None)
self.provider.create_bucket('blobs-bucket', None, None, None)

def test_get_blob_signed_upload_url_then_delete_blob(self):
"""Tests get_blob_signed_upload_url."""
@@ -82,7 +82,7 @@ def setUp(self):

def test_create_bucket(self):
"""Test create_bucket."""
self.provider.create_bucket('test-bucket', None, None)
self.provider.create_bucket('test-bucket', None, None, None)
self.assertTrue(os.path.isdir('/local/test-bucket'))

def test_get_bucket(self):
@@ -281,7 +281,7 @@ def test_download_signed_url(self):
def test_upload_signed_url(self):
"""Tests upload_signed_url."""
contents = b'aa'
self.provider.create_bucket('test-bucket', None, None)
self.provider.create_bucket('test-bucket', None, None, None)
self.provider.upload_signed_url(contents, 'gs://test-bucket/a')
with open('/local/test-bucket/objects/a', 'rb') as fp:
return self.assertEqual(fp.read(), contents)