Skip to content

Commit

Permalink
Show number of queries ahead in queue when executing (#786)
Browse files Browse the repository at this point in the history
  • Loading branch information
Allen Short authored and jezdez committed Aug 19, 2019
1 parent c873e0c commit e3f4b47
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 4 deletions.
1 change: 1 addition & 0 deletions client/app/pages/queries/query.html
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ <h3>
</div>
<div class="alert alert-info m-t-15" ng-if="queryResult.getStatus() == 'waiting'">
Query in queue&hellip;
<span ng-if="queryResult && queryResult.queueStatus">(waiting on {{queryResult.queueStatus}} <ng-pluralize count=queryResult.queueStatus when="{'0': 'queries', 'one': 'query', 'other': 'queries'}"></ng-pluralize>)</span>
<rd-timer from="queryResult.getUpdatedAt()"></rd-timer>
<button type="button" class="btn btn-warning btn-xs pull-right" ng-disabled="cancelling" ng-click="cancelExecution()">Cancel
</button>
Expand Down
22 changes: 22 additions & 0 deletions client/app/services/query-result.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {

// extended status flags
this.isLoadingResult = false;
this.queueStatus = null;

if (props) {
this.update(props);
Expand Down Expand Up @@ -367,6 +368,26 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
);
}

refreshQueueStatus(dataSourceId) {
if (this.getStatus() === 'waiting') {
const actions = {
get: { method: 'GET', cache: false, isArray: false },
};
$resource('api/jobs/:id/data_source/:dataSourceId/status', { id: '@id', dataSourceId: '@dataSourceId' }, actions).get(
{
id: this.job.id,
dataSourceId,
},
(statusResponse) => {
$timeout(() => this.refreshQueueStatus(dataSourceId), 10000);
this.queueStatus = statusResponse.data.num_tasks;
},
);
} else {
this.queueStatus = null;
}
}

getLink(queryId, fileType, apiKey) {
let link = `api/queries/${queryId}/results/${this.getId()}.${fileType}`;
if (apiKey) {
Expand Down Expand Up @@ -424,6 +445,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {

if ('job' in response) {
queryResult.refreshStatus(query, parameters);
queryResult.refreshQueueStatus(dataSourceId);
}
},
(error) => {
Expand Down
3 changes: 2 additions & 1 deletion redash/handlers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
QueryTagsResource,
QueryRegenerateApiKeyResource)
from redash.handlers.query_results import (JobResource,
JobStatusResource,
QueryResultDropdownResource,
QueryDropdownsResource,
QueryResultListResource,
Expand Down Expand Up @@ -140,7 +141,7 @@ def json_representation(data, code, headers=None):
'/api/jobs/<job_id>',
'/api/queries/<query_id>/jobs/<job_id>',
endpoint='job')

api.add_org_resource(JobStatusResource, '/api/jobs/<job_id>/data_source/<data_source_id>/status', endpoint='job_queue_status')
api.add_org_resource(UserListResource, '/api/users', endpoint='users')
api.add_org_resource(UserResource, '/api/users/<user_id>', endpoint='user')
api.add_org_resource(UserInviteResource, '/api/users/<user_id>/invite', endpoint='user_invite')
Expand Down
16 changes: 16 additions & 0 deletions redash/handlers/query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from redash.models.parameterized_query import (ParameterizedQuery, InvalidParameterError,
QueryDetachedFromDataSourceError, dropdown_values)
from redash.serializers import serialize_query_result, serialize_query_result_to_csv, serialize_query_result_to_xlsx
from redash.monitor import parse_tasks, get_waiting_in_queue
from redash.worker import celery


def error_response(message, http_status=400):
Expand Down Expand Up @@ -326,3 +328,17 @@ def delete(self, job_id):
"""
job = QueryTask(job_id=job_id)
job.cancel()


class JobStatusResource(BaseResource):
def get(self, job_id, data_source_id):
job = QueryTask(job_id=job_id)
job._async_result._get_task_meta()
data_source = models.DataSource.get_by_id_and_org(data_source_id, self.current_org)
reserved_tasks = [
task
for task in parse_tasks(celery.control.inspect().reserved(), 'reserved')
if task['queue'] == data_source.queue_name
]
tasks = reserved_tasks + get_waiting_in_queue(data_source.queue_name)
return {'queue_name': data_source.queue_name, 'num_tasks': len(tasks)}
56 changes: 53 additions & 3 deletions tests/handlers/test_query_results.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import mock
from tests import BaseTestCase

from redash.models import db
from redash.utils import json_dumps
from redash.handlers.query_results import error_messages
from redash.utils import json_dumps


class TestQueryResultsCacheHeaders(BaseTestCase):
Expand Down Expand Up @@ -39,7 +40,6 @@ def test_get_existing_result(self):
self.assertEquals(query_result.id, rv.json['query_result']['id'])

def test_execute_new_query(self):
query_result = self.factory.create_query_result()
query = self.factory.create_query()

rv = self.make_request('post', '/api/query_results',
Expand All @@ -51,6 +51,56 @@ def test_execute_new_query(self):
self.assertNotIn('query_result', rv.json)
self.assertIn('job', rv.json)

def test_queue_length(self):
query = self.factory.create_query()
tasks = []

def fake_all(*a, **kw):
return tasks

def enqueue_query(query, *a, **kw):
from redash.tasks.queries import enqueue_query
job = enqueue_query(query, *a, **kw)
tasks.append(dict(
state='waiting_in_queue',
task_name='test task',
worker=None,
worker_pid=None,
start_time=None,
task_id=job.id,
queue='queries',
))
return job

patch_all = mock.patch('redash.handlers.query_results.get_waiting_in_queue', fake_all)
patch_parse_tasks = mock.patch('redash.handlers.query_results.parse_tasks', lambda *_: [])
patch_enqueue_query = mock.patch('redash.handlers.query_results.enqueue_query',
enqueue_query)
db.session.commit()
with patch_all, patch_enqueue_query, patch_parse_tasks:
job0 = self.make_request('post', '/api/query_results',
data={'data_source_id': self.factory.data_source.id,
'query': query.query_text,
'max_age': 0})
rv0 = self.make_request('get', '/api/jobs/{}/data_source/{}/status'.format(
job0.json['job']['id'], self.factory.data_source.id))
job1 = self.make_request('post', '/api/query_results',
data={'data_source_id': self.factory.data_source.id,
'query': query.query_text,
'max_age': 0})
rv1 = self.make_request('get', '/api/jobs/{}/data_source/{}/status'.format(
job1.json['job']['id'], self.factory.data_source.id))
job2 = self.make_request('post', '/api/query_results',
data={'data_source_id': self.factory.data_source.id,
'query': query.query_text,
'max_age': 0})
rv2 = self.make_request('get', '/api/jobs/{}/data_source/{}/status'.format(
job2.json['job']['id'], self.factory.data_source.id))

self.assertEquals(rv0.json['num_tasks'], 1)
self.assertEquals(rv1.json['num_tasks'], 2)
self.assertEquals(rv2.json['num_tasks'], 3)

def test_execute_query_without_access(self):
group = self.factory.create_group()
db.session.commit()
Expand Down Expand Up @@ -144,7 +194,7 @@ def test_execute_new_query(self):

self.assertEquals(rv.status_code, 200)
self.assertIn('job', rv.json)

def test_execute_but_has_no_access_to_data_source(self):
ds = self.factory.create_data_source(group=self.factory.create_group())
query = self.factory.create_query(data_source=ds)
Expand Down

0 comments on commit e3f4b47

Please sign in to comment.