diff --git a/client/app/pages/queries/query.html b/client/app/pages/queries/query.html index 2fa9db7e5a..8b4d17bc8d 100644 --- a/client/app/pages/queries/query.html +++ b/client/app/pages/queries/query.html @@ -214,6 +214,7 @@

Query in queue… + (waiting on {{queryResult.queueStatus}} ) diff --git a/client/app/services/query-result.js b/client/app/services/query-result.js index a267cee050..6ec8f68601 100644 --- a/client/app/services/query-result.js +++ b/client/app/services/query-result.js @@ -74,6 +74,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) { // extended status flags this.isLoadingResult = false; + this.queueStatus = null; if (props) { this.update(props); @@ -367,6 +368,26 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) { ); } + refreshQueueStatus(dataSourceId) { + if (this.getStatus() === 'waiting') { + const actions = { + get: { method: 'GET', cache: false, isArray: false }, + }; + $resource('api/jobs/:id/data_source/:dataSourceId/status', { id: '@id', dataSourceId: '@dataSourceId' }, actions).get( + { + id: this.job.id, + dataSourceId, + }, + (statusResponse) => { + $timeout(() => this.refreshQueueStatus(dataSourceId), 10000); + this.queueStatus = statusResponse.data.num_tasks; + }, + ); + } else { + this.queueStatus = null; + } + } + getLink(queryId, fileType, apiKey) { let link = `api/queries/${queryId}/results/${this.getId()}.${fileType}`; if (apiKey) { @@ -424,6 +445,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) { if ('job' in response) { queryResult.refreshStatus(query, parameters); + queryResult.refreshQueueStatus(dataSourceId); } }, (error) => { diff --git a/redash/handlers/api.py b/redash/handlers/api.py index 786de7622b..90ab2172e6 100644 --- a/redash/handlers/api.py +++ b/redash/handlers/api.py @@ -39,6 +39,7 @@ QueryTagsResource, QueryRegenerateApiKeyResource) from redash.handlers.query_results import (JobResource, + JobStatusResource, QueryResultDropdownResource, QueryDropdownsResource, QueryResultListResource, @@ -140,7 +141,7 @@ def json_representation(data, code, headers=None): '/api/jobs/', '/api/queries//jobs/', endpoint='job') - +api.add_org_resource(JobStatusResource, '/api/jobs//data_source//status', endpoint='job_queue_status') api.add_org_resource(UserListResource, '/api/users', endpoint='users') api.add_org_resource(UserResource, '/api/users/', endpoint='user') api.add_org_resource(UserInviteResource, '/api/users//invite', endpoint='user_invite') diff --git a/redash/handlers/query_results.py b/redash/handlers/query_results.py index 0899c0e7a5..cfb94cdaa8 100644 --- a/redash/handlers/query_results.py +++ b/redash/handlers/query_results.py @@ -13,6 +13,8 @@ from redash.utils import (collect_parameters_from_request, gen_query_hash, json_dumps, utcnow, to_filename) from redash.models.parameterized_query import ParameterizedQuery, InvalidParameterError, dropdown_values from redash.serializers import serialize_query_result_to_csv, serialize_query_result_to_xlsx +from redash.monitor import parse_tasks, get_waiting_in_queue +from redash.worker import celery def error_response(message): @@ -302,3 +304,17 @@ def delete(self, job_id): """ job = QueryTask(job_id=job_id) job.cancel() + + +class JobStatusResource(BaseResource): + def get(self, job_id, data_source_id): + job = QueryTask(job_id=job_id) + job._async_result._get_task_meta() + data_source = models.DataSource.get_by_id_and_org(data_source_id, self.current_org) + reserved_tasks = [ + task + for task in parse_tasks(celery.control.inspect().reserved(), 'reserved') + if task['queue'] == data_source.queue_name + ] + tasks = reserved_tasks + get_waiting_in_queue(data_source.queue_name) + return {'queue_name': data_source.queue_name, 'num_tasks': len(tasks)} diff --git a/tests/handlers/test_query_results.py b/tests/handlers/test_query_results.py index 8d9add7ec1..b69d752567 100644 --- a/tests/handlers/test_query_results.py +++ b/tests/handlers/test_query_results.py @@ -1,8 +1,8 @@ +import mock from tests import BaseTestCase from redash.models import db -from redash.utils import json_dumps - +from redash.utils import gen_query_hash, json_dumps class TestQueryResultsCacheHeaders(BaseTestCase): def test_uses_cache_headers_for_specific_result(self): @@ -38,7 +38,6 @@ def test_get_existing_result(self): self.assertEquals(query_result.id, rv.json['query_result']['id']) def test_execute_new_query(self): - query_result = self.factory.create_query_result() query = self.factory.create_query() rv = self.make_request('post', '/api/query_results', @@ -50,6 +49,56 @@ def test_execute_new_query(self): self.assertNotIn('query_result', rv.json) self.assertIn('job', rv.json) + def test_queue_length(self): + query = self.factory.create_query() + tasks = [] + + def fake_all(*a, **kw): + return tasks + + def enqueue_query(query, *a, **kw): + from redash.tasks.queries import enqueue_query + job = enqueue_query(query, *a, **kw) + tasks.append(dict( + state='waiting_in_queue', + task_name='test task', + worker=None, + worker_pid=None, + start_time=None, + task_id=job.id, + queue='queries', + )) + return job + + patch_all = mock.patch('redash.handlers.query_results.get_waiting_in_queue', fake_all) + patch_parse_tasks = mock.patch('redash.handlers.query_results.parse_tasks', lambda *_: []) + patch_enqueue_query = mock.patch('redash.handlers.query_results.enqueue_query', + enqueue_query) + db.session.commit() + with patch_all, patch_enqueue_query, patch_parse_tasks: + job0 = self.make_request('post', '/api/query_results', + data={'data_source_id': self.factory.data_source.id, + 'query': query.query_text, + 'max_age': 0}) + rv0 = self.make_request('get', '/api/jobs/{}/data_source/{}/status'.format( + job0.json['job']['id'], self.factory.data_source.id)) + job1 = self.make_request('post', '/api/query_results', + data={'data_source_id': self.factory.data_source.id, + 'query': query.query_text, + 'max_age': 0}) + rv1 = self.make_request('get', '/api/jobs/{}/data_source/{}/status'.format( + job1.json['job']['id'], self.factory.data_source.id)) + job2 = self.make_request('post', '/api/query_results', + data={'data_source_id': self.factory.data_source.id, + 'query': query.query_text, + 'max_age': 0}) + rv2 = self.make_request('get', '/api/jobs/{}/data_source/{}/status'.format( + job2.json['job']['id'], self.factory.data_source.id)) + + self.assertEquals(rv0.json['num_tasks'], 1) + self.assertEquals(rv1.json['num_tasks'], 2) + self.assertEquals(rv2.json['num_tasks'], 3) + def test_execute_query_without_access(self): group = self.factory.create_group() db.session.commit()