Skip to content

Commit

Permalink
Bug 1679162 - [RELENG-158] Split failed task log parsing into multiple queues and workers (#6891)
Browse files Browse the repository at this point in the history

* remove technical debt
* remove log_autoclassify
* update etl/jobs to split error vs raw
* update Procfile
* queues
* keep parse_logs async, but only call with one log at a time
  • Loading branch information
escapewindow authored Dec 5, 2020
1 parent 33b7e93 commit 3cc5cea
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 41 deletions.
7 changes: 4 additions & 3 deletions Procfile
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ worker_store_pulse_data: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery

# Handles the log parsing tasks scheduled by `worker_store_pulse_data` as part of job ingestion.
worker_log_parser: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery worker -A treeherder --without-gossip --without-mingle --without-heartbeat -Q log_parser --concurrency=7
worker_log_parser_fail: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery worker -A treeherder --without-gossip --without-mingle --without-heartbeat -Q log_parser_fail --concurrency=1
worker_log_parser_fail_raw_sheriffed: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery worker -A treeherder --without-gossip --without-mingle --without-heartbeat -Q log_parser_fail_raw_sheriffed --concurrency=1
worker_log_parser_fail_raw_unsheriffed: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery worker -A treeherder --without-gossip --without-mingle --without-heartbeat -Q log_parser_fail_raw_unsheriffed --concurrency=1
worker_log_parser_fail_json_sheriffed: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery worker -A treeherder --without-gossip --without-mingle --without-heartbeat -Q log_parser_fail_json_sheriffed --concurrency=7
worker_log_parser_fail_json_unsheriffed: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery worker -A treeherder --without-gossip --without-mingle --without-heartbeat -Q log_parser_fail_json_unsheriffed --concurrency=7

# Autoclassify workers
worker_autoclassify: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery worker -A treeherder --without-gossip --without-mingle --without-heartbeat -Q log_autoclassify,log_autoclassify_fail --concurrency=3
# Tasks that don't need a dedicated worker.
worker_misc: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery worker -A treeherder --without-gossip --without-mingle --without-heartbeat -Q default,generate_perf_alerts,pushlog,seta_analyze_failures --concurrency=3
14 changes: 11 additions & 3 deletions app.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,23 @@
"quantity": 1,
"size": "Standard-1X"
},
"worker_autoclassify": {
"worker_log_parser": {
"quantity": 1,
"size": "Standard-1X"
},
"worker_log_parser": {
"worker_log_parser_fail_raw_sheriffed": {
"quantity": 1,
"size": "Standard-1X"
},
"worker_log_parser_fail_raw_unsheriffed": {
"quantity": 1,
"size": "Standard-1X"
},
"worker_log_parser_fail_json_sheriffed": {
"quantity": 1,
"size": "Standard-1X"
},
"worker_log_parser_fail": {
"worker_log_parser_fail_json_unsheriffed": {
"quantity": 1,
"size": "Standard-1X"
},
Expand Down
11 changes: 8 additions & 3 deletions treeherder/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,14 @@
CELERY_TASK_QUEUES = [
Queue('default', Exchange('default'), routing_key='default'),
Queue('log_parser', Exchange('default'), routing_key='log_parser.normal'),
Queue('log_parser_fail', Exchange('default'), routing_key='log_parser.failures'),
Queue('log_autoclassify', Exchange('default'), routing_key='autoclassify.normal'),
Queue('log_autoclassify_fail', Exchange('default'), routing_key='autoclassify.failures'),
Queue('log_parser_fail_raw_sheriffed', Exchange('default'), routing_key='log_parser.failures'),
Queue(
'log_parser_fail_raw_unsheriffed', Exchange('default'), routing_key='log_parser.failures'
),
Queue('log_parser_fail_json_sheriffed', Exchange('default'), routing_key='log_parser.failures'),
Queue(
'log_parser_fail_json_unsheriffed', Exchange('default'), routing_key='log_parser.failures'
),
Queue('pushlog', Exchange('default'), routing_key='pushlog'),
Queue('generate_perf_alerts', Exchange('default'), routing_key='generate_perf_alerts'),
Queue('store_pulse_tasks', Exchange('default'), routing_key='store_pulse_tasks'),
Expand Down
41 changes: 28 additions & 13 deletions treeherder/etl/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,12 @@ def _load_job(repository, job_datum, push_id):

job_logs.append(jl)

_schedule_log_parsing(job, job_logs, result)
_schedule_log_parsing(job, job_logs, result, repository)

return job_guid


def _schedule_log_parsing(job, job_logs, result):
def _schedule_log_parsing(job, job_logs, result, repository):
"""Kick off the initial task that parses the log data.
log_data is a list of job log objects and the result for that job
Expand All @@ -315,6 +315,13 @@ def _schedule_log_parsing(job, job_logs, result):
from treeherder.log_parser.tasks import parse_logs

task_types = {"errorsummary_json", "live_backing_log"}
sheriffed_repos = {
"autoland",
"mozilla-central",
"mozilla-beta",
"mozilla-release",
"mozilla-esr78",
}

job_log_ids = []
for job_log in job_logs:
Expand All @@ -331,17 +338,25 @@ def _schedule_log_parsing(job, job_logs, result):

job_log_ids.append(job_log.id)

# TODO: Replace the use of different queues for failures vs not with the
# RabbitMQ priority feature (since the idea behind separate queues was
# only to ensure failures are dealt with first if there is a backlog).
if result != 'success':
queue = 'log_parser_fail'
priority = 'failures'
else:
queue = 'log_parser'
priority = "normal"

parse_logs.apply_async(queue=queue, args=[job.id, job_log_ids, priority])
# TODO: Replace the use of different queues for failures vs not with the
# RabbitMQ priority feature (since the idea behind separate queues was
# only to ensure failures are dealt with first if there is a backlog).
if result != 'success':
if job_log.name == "errorsummary_json":
queue = "log_parser_fail_json"
priority = "failures"
else:
queue = "log_parser_fail_raw"
priority = "failures"
if repository.name in sheriffed_repos:
queue += "_sheriffed"
else:
queue += "_unsheriffed"
else:
queue = 'log_parser'
priority = "normal"

parse_logs.apply_async(queue=queue, args=[job.id, [job_log.id], priority])


def store_job_data(repository, originalData):
Expand Down
19 changes: 0 additions & 19 deletions treeherder/log_parser/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
from celery.exceptions import SoftTimeLimitExceeded
from requests.exceptions import HTTPError

from treeherder.autoclassify.tasks import autoclassify
from treeherder.etl.artifact import serialize_artifact_json_blobs, store_job_artifacts
from treeherder.log_parser.artifactbuildercollection import (
ArtifactBuilderCollection,
LogSizeException,
)
from treeherder.log_parser.crossreference import crossreference_job
from treeherder.model.models import Job, JobLog
from treeherder.workers.task import retryable_task

Expand Down Expand Up @@ -77,23 +75,6 @@ def parse_logs(job_id, job_log_ids, priority):
if first_exception:
raise first_exception

if "errorsummary_json" in completed_names and "live_backing_log" in completed_names:

success = crossreference_job(job)

if success:
logger.debug("Scheduling autoclassify for job %i", job_id)
# TODO: Replace the use of different queues for failures vs not with the
# RabbitMQ priority feature (since the idea behind separate queues was
# only to ensure failures are dealt with first if there is a backlog).
queue = 'log_autoclassify_fail' if priority == 'failures' else 'log_autoclassify'
autoclassify.apply_async(args=[job_id], queue=queue)
else:
job.autoclassify_status = Job.SKIPPED
else:
job.autoclassify_status = Job.SKIPPED
job.save()


def store_failure_lines(job_log):
"""Store the failure lines from a log corresponding to the structured
Expand Down

0 comments on commit 3cc5cea

Please sign in to comment.