Skip to content

Commit

Permalink
Show number of queries in queue while waiting for execution (#786)
Browse files Browse the repository at this point in the history
  • Loading branch information
Allen Short committed Mar 26, 2019
1 parent b0bc84a commit b7a34a4
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 7 deletions.
2 changes: 1 addition & 1 deletion client/app/pages/queries/query.html
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ <h3>
<rd-timer timestamp="queryResult.getUpdatedAt()"></rd-timer>
</div>
<div class="alert alert-info m-t-15" ng-show="queryResult.getStatus() == 'waiting'">
Query in queue&hellip;
Query in queue <span ng-if="queryResult && queryResult.queueStatus">(waiting on {{queryResult.queueStatus}} <ng-pluralize count=queryResult.queueStatus when="{'one': 'query', 'other': 'queries'}"></ng-pluralize>)</span>&hellip;
<rd-timer timestamp="queryResult.getUpdatedAt()"></rd-timer>
<button type="button" class="btn btn-warning btn-xs pull-right" ng-disabled="cancelling" ng-click="cancelExecution()">Cancel
</button>
Expand Down
16 changes: 15 additions & 1 deletion client/app/services/query-result.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ function addPointToSeries(point, seriesCollection, seriesName) {
seriesCollection[seriesName].data.push(point);
}

function QueryResultService($resource, $timeout, $q, QueryResultError) {
function QueryResultService($http, $resource, $timeout, $q, QueryResultError) {
const QueryResultResource = $resource('api/query_results/:id', { id: '@id' }, { post: { method: 'POST' } });
const Job = $resource('api/jobs/:id', { id: '@id' });
const statuses = {
Expand Down Expand Up @@ -89,6 +89,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError) {

// extended status flags
this.isLoadingResult = false;
this.queueStatus = null;

if (props) {
this.update(props);
Expand Down Expand Up @@ -519,6 +520,18 @@ function QueryResultService($resource, $timeout, $q, QueryResultError) {
);
}

refreshQueueStatus(dataSourceId) {
if (this.getStatus() === 'waiting') {
const p = $http.get(`/api/queue_status/${this.job.id}?data_source=${dataSourceId}`);
p.then((response) => {
$timeout(() => this.refreshQueueStatus(dataSourceId), 10000);
this.queueStatus = response.data.num_tasks;
});
} else {
this.queueStatus = null;
}
}

getLink(queryId, fileType, apiKey) {
let link = `api/queries/${queryId}/results/${this.getId()}.${fileType}`;
if (apiKey) {
Expand Down Expand Up @@ -576,6 +589,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError) {

if ('job' in response) {
queryResult.refreshStatus(query);
queryResult.refreshQueueStatus(dataSourceId);
}
},
(error) => {
Expand Down
5 changes: 3 additions & 2 deletions redash/handlers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
QueryResultDropdownResource,
QueryDropdownsResource,
QueryResultListResource,
QueryResultResource)
QueryResultResource,
QueueStatusResource)
from redash.handlers.query_snippets import (QuerySnippetListResource,
QuerySnippetResource)
from redash.handlers.settings import OrganizationSettings
Expand Down Expand Up @@ -133,7 +134,7 @@ def json_representation(data, code, headers=None):
'/api/queries/<query_id>/results/<query_result_id>.<filetype>',
endpoint='query_result')
api.add_org_resource(JobResource, '/api/jobs/<job_id>', endpoint='job')

api.add_org_resource(QueueStatusResource, '/api/queue_status/<job_id>', endpoint='2job_queue_status')
api.add_org_resource(UserListResource, '/api/users', endpoint='users')
api.add_org_resource(UserResource, '/api/users/<user_id>', endpoint='user')
api.add_org_resource(UserInviteResource, '/api/users/<user_id>/invite', endpoint='user_invite')
Expand Down
14 changes: 14 additions & 0 deletions redash/handlers/query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from redash.tasks.queries import enqueue_query
from redash.utils import (collect_parameters_from_request, gen_query_hash, json_dumps, utcnow, to_filename)
from redash.models.parameterized_query import ParameterizedQuery, InvalidParameterError, dropdown_values
from redash.monitor import parse_tasks, get_waiting_in_queue
from redash.worker import celery


def error_response(message):
Expand Down Expand Up @@ -341,3 +343,15 @@ def delete(self, job_id):
"""
job = QueryTask(job_id=job_id)
job.cancel()


class QueueStatusResource(BaseResource):
def get(self, job_id):
job = QueryTask(job_id=job_id)
task_info = job._async_result._get_task_meta()

data_source_id = request.args.get('data_source')
data_source = models.DataSource.get_by_id_and_org(data_source_id, self.current_org)
tasks = ([t for t in parse_tasks(celery.control.inspect().reserved(), 'reserved') if t['queue'] == data_source.queue_name] +
get_waiting_in_queue(data_source.queue_name))
return {'queue_name': data_source.queue_name, 'num_tasks': len(tasks)}
11 changes: 11 additions & 0 deletions redash/tasks/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from redash.query_runner import InterruptException
from redash.tasks.alerts import check_alerts_for_query
from redash.utils import gen_query_hash, json_dumps, json_loads, utcnow, mustache_render
from redash.monitor import get_waiting_in_queue
from redash.worker import celery

logger = get_task_logger(__name__)
Expand Down Expand Up @@ -71,12 +72,22 @@ def to_dict(self):
else:
query_result_id = None

queries_ahead = 0
if task_status == 'PENDING':
waiting = QueryTaskTracker.all(QueryTaskTracker.WAITING_LIST)
waiting_length = len(waiting)
# find our job's index, return number of queries after it
for i, waiting_task in enumerate(waiting):
if waiting_task.data['task_id'] == self._async_result.id:
queries_ahead = waiting_length - i - 1
break
return {
'id': self._async_result.id,
'updated_at': updated_at,
'status': status,
'error': error,
'query_result_id': query_result_id,
'queue_length': queries_ahead,
}

@property
Expand Down
45 changes: 42 additions & 3 deletions tests/handlers/test_query_results.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import mock
from tests import BaseTestCase

from redash.models import db
from redash.utils import json_dumps

from redash.utils import gen_query_hash, json_dumps

class TestQueryResultsCacheHeaders(BaseTestCase):
def test_uses_cache_headers_for_specific_result(self):
Expand Down Expand Up @@ -38,7 +38,6 @@ def test_get_existing_result(self):
self.assertEquals(query_result.id, rv.json['query_result']['id'])

def test_execute_new_query(self):
query_result = self.factory.create_query_result()
query = self.factory.create_query()

rv = self.make_request('post', '/api/query_results',
Expand All @@ -50,6 +49,46 @@ def test_execute_new_query(self):
self.assertNotIn('query_result', rv.json)
self.assertIn('job', rv.json)

def test_queue_length(self):
query = self.factory.create_query()
tasks = []
def fake_all(*a, **kw):
return tasks
def enqueue_query(query, *a, **kw):
from redash.tasks.queries import enqueue_query
job = enqueue_query(query, *a, **kw)
tasks.append(dict(
state='waiting_in_queue',
task_name='test task',
worker=None,
worker_pid=None,
start_time=None,
task_id=job.id,
queue='queries',
))
return job
patch_all = mock.patch('redash.tasks.queries.get_waiting_in_queue', fake_all)
patch_enqueue_query = mock.patch('redash.handlers.query_results.enqueue_query',
enqueue_query)
with patch_all, patch_enqueue_query:
rv0 = self.make_request('post', '/api/query_results',
data={'data_source_id': self.factory.data_source.id,
'query': query.query_text,
'max_age': 0})
rv1 = self.make_request('post', '/api/query_results',
data={'data_source_id': self.factory.data_source.id,
'query': query.query_text,
'max_age': 0})
rv2 = self.make_request('post', '/api/query_results',
data={'data_source_id': self.factory.data_source.id,
'query': query.query_text,
'max_age': 0})

self.assertEquals(rv0.json['job']['queue_length'], 0)
self.assertEquals(rv1.json['job']['queue_length'], 1)
self.assertEquals(rv2.json['job']['queue_length'], 2)


def test_execute_query_without_access(self):
group = self.factory.create_group()
db.session.commit()
Expand Down

0 comments on commit b7a34a4

Please sign in to comment.