Skip to content

Commit

Permalink
Progress bar in luigi visualizer (#2108)
Browse files Browse the repository at this point in the history
  • Loading branch information
fvinas authored and Tarrasch committed Aug 25, 2017
1 parent e5a2956 commit 5b80f54
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 12 deletions.
6 changes: 4 additions & 2 deletions doc/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ Task status tracking

For long-running or remote tasks it is convenient to see extended status information not only on
the command line or in your logs but also in the GUI of the central scheduler. Luigi implements
dynamic status messages and tracking urls which may point to an external monitoring system. You
can set this information using callbacks within Task.run_:
dynamic status messages, progress bar and tracking urls which may point to an external monitoring system.
You can set this information using callbacks within Task.run_:

.. code:: python
Expand All @@ -199,6 +199,8 @@ can set this information using callbacks within Task.run_:
# do some hard work here
if i % 10 == 0:
self.set_status_message("Progress: %d / 100" % i)
# displays a progress bar in the scheduler UI
self.set_progress_percentage(i)
.. _Events:
Expand Down
23 changes: 21 additions & 2 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def __eq__(self, other):

class Task(object):
def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None,
params=None, tracking_url=None, status_message=None, retry_policy='notoptional'):
params=None, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional'):
self.id = task_id
self.stakeholders = set() # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active)
self.workers = OrderedSet() # workers ids that can perform task - task is 'BROKEN' if none of these workers are active
Expand All @@ -301,6 +301,7 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='',
self.failures = Failures(self.retry_policy.disable_window)
self.tracking_url = tracking_url
self.status_message = status_message
self.progress_percentage = progress_percentage
self.scheduler_disable_time = None
self.runnable = False
self.batchable = False
Expand Down Expand Up @@ -1224,7 +1225,8 @@ def _serialize_task(self, task_id, include_deps=True, deps=None):
'priority': task.priority,
'resources': task.resources,
'tracking_url': getattr(task, "tracking_url", None),
'status_message': getattr(task, "status_message", None)
'status_message': getattr(task, "status_message", None),
'progress_percentage': getattr(task, "progress_percentage", None)
}
if task.status == DISABLED:
ret['re_enable_able'] = task.scheduler_disable_time is not None
Expand Down Expand Up @@ -1483,6 +1485,23 @@ def get_task_status_message(self, task_id):
else:
return {"taskId": task_id, "statusMessage": ""}

@rpc_method()
def set_task_progress_percentage(self, task_id, progress_percentage):
if self._state.has_task(task_id):
task = self._state.get_task(task_id)
task.progress_percentage = progress_percentage
if task.status == RUNNING and task.batch_id is not None:
for batch_task in self._state.get_batch_running_tasks(task.batch_id):
batch_task.progress_percentage = progress_percentage

@rpc_method()
def get_task_progress_percentage(self, task_id):
if self._state.has_task(task_id):
task = self._state.get_task(task_id)
return {"taskId": task_id, "progressPercentage": task.progress_percentage}
else:
return {"taskId": task_id, "progressPercentage": None}

def _update_task_history(self, task, status, host=None):
try:
if status == DONE or status == FAILED:
Expand Down
10 changes: 9 additions & 1 deletion luigi/static/visualiser/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
{{#re_enable}}<a class="btn btn-warning btn-xs re-enable-button" title="Re-enable" data-toggle="tooltip" data-task-id="{{taskId}}">Re-enable</a>{{/re_enable}}
{{#trackingUrl}}<a target="_blank" href="{{trackingUrl}}" class="btn btn-primary btn-xs" title="Track Progress" data-toggle="tooltip"><i class="fa fa-eye"></i></a>{{/trackingUrl}}
{{#statusMessage}}<button class="btn btn-primary btn-xs statusMessage" title="Status message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name={{displayName}}><i class="fa fa-comment"></i></button>{{/statusMessage}}
{{^statusMessage}}
{{#progressPercentage}}<button class="btn btn-primary btn-xs statusMessage" title="Status message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name={{displayName}}><i class="fa fa-comment"></i></button>
{{/progressPercentage}}
{{/statusMessage}}
</div>
</script>
<script type="text/template" name="errorTemplate">
Expand Down Expand Up @@ -63,9 +67,13 @@ <h4 class="modal-title" id="myModalLabel">Status message for {{displayName}}</h4
</div>
<div class="modal-body">
<pre class="pre-scrollable">{{statusMessage}}</pre>
<div class="progress">
<div class="progress-bar" role="progressbar" aria-valuenow="{{progressPercentage}}" aria-valuemin="0" aria-valuemax="100" style="min-width: 2em;">
{{progressPercentage}}%
</div>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-info refresh"><i class="fa fa-refresh"></i> Refresh</button>
<button type="button" class="btn btn-default" data-dismiss="modal">Close</button>
</div>
</div>
Expand Down
6 changes: 6 additions & 0 deletions luigi/static/visualiser/js/luigi.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ var LuigiAPI = (function() {
});
};

LuigiAPI.prototype.getTaskProgressPercentage = function(taskId, callback) {
return jsonRPC(this.urlRoot + "/get_task_progress_percentage", {task_id: taskId}, function(response) {
callback(response.response);
});
};

LuigiAPI.prototype.getRunningTaskList = function(callback) {
return jsonRPC(this.urlRoot + "/task_list", {status: "RUNNING", upstream_status: "", search: searchTerm()}, function(response) {
callback(flatten(response.response));
Expand Down
34 changes: 28 additions & 6 deletions luigi/static/visualiser/js/visualiserApp.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ function visualiserApp(luigi) {
graph: (task.status == "PENDING" || task.status == "RUNNING" || task.status == "DONE"),
error: task.status == "FAILED",
re_enable: task.status == "DISABLED" && task.re_enable_able,
statusMessage: task.status_message
statusMessage: task.status_message,
progressPercentage: task.progress_percentage
};
}

Expand Down Expand Up @@ -295,12 +296,33 @@ function visualiserApp(luigi) {

function showStatusMessage(data) {
$("#statusMessageModal").empty().append(renderTemplate("statusMessageTemplate", data));
$("#statusMessageModal .refresh").on('click', function() {
luigi.getTaskStatusMessage(data.taskId, function(data) {
$("#statusMessageModal pre").html(data.statusMessage);
});
}).trigger('click');
$("#statusMessageModal").modal({});
var refreshInterval = setInterval(function() {
if ($("#statusMessageModal").is(":hidden"))
clearInterval(refreshInterval)
else {
luigi.getTaskStatusMessage(data.taskId, function(data) {
if (data.statusMessage === null)
$("#statusMessageModal pre").hide()
else {
$("#statusMessageModal pre").html(data.statusMessage).show();
}
});
luigi.getTaskProgressPercentage(data.taskId, function(data) {
if (data.progressPercentage === null)
$("#statusMessageModal .progress").hide()
else {
$("#statusMessageModal .progress").show()
$("#statusMessageModal .progress-bar")
.attr('aria-valuenow', data.progressPercentage)
.text(data.progressPercentage + '%')
.css({'width': data.progressPercentage + '%'});
}
});
}
},
500
);
}

function preProcessGraph(dependencyGraph) {
Expand Down
3 changes: 2 additions & 1 deletion luigi/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ def __init__(self, *args, **kwargs):

self.set_tracking_url = None
self.set_status_message = None
self.set_progress_percentage = None

def initialized(self):
"""
Expand Down Expand Up @@ -681,7 +682,7 @@ def _dump(self):
pickle.dumps(self)
"""
unpicklable_properties = ('set_tracking_url', 'set_status_message')
unpicklable_properties = ('set_tracking_url', 'set_status_message', 'set_progress_percentage')
reserved_properties = {}
for property_name in unpicklable_properties:
if hasattr(self, property_name):
Expand Down
5 changes: 5 additions & 0 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ def __init__(self, task, worker_id, result_queue, status_reporter,
def _run_get_new_deps(self):
self.task.set_tracking_url = self.status_reporter.update_tracking_url
self.task.set_status_message = self.status_reporter.update_status
self.task.set_progress_percentage = self.status_reporter.update_progress_percentage

task_gen = self.task.run()

self.task.set_tracking_url = None
self.task.set_status_message = None
self.task.set_progress_percentage = None

if not isinstance(task_gen, types.GeneratorType):
return None
Expand Down Expand Up @@ -269,6 +271,9 @@ def update_tracking_url(self, tracking_url):
def update_status(self, message):
self._scheduler.set_task_status_message(self._task_id, message)

def update_progress_percentage(self, percentage):
self._scheduler.set_task_progress_percentage(self._task_id, percentage)


class SingleProcessPool(object):
"""
Expand Down
6 changes: 6 additions & 0 deletions test/scheduler_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,12 @@ def test_batch_update_status(self):
for task_id in ('A_1', 'A_2', 'A_1_2'):
self.assertEqual('test message', self.sch.get_task_status_message(task_id)['statusMessage'])

def test_batch_update_progress(self):
self._start_simple_batch()
self.sch.set_task_progress_percentage('A_1_2', 30)
for task_id in ('A_1', 'A_2', 'A_1_2'):
self.assertEqual(30, self.sch.get_task_progress_percentage(task_id)['progressPercentage'])

def test_batch_tracking_url(self):
self._start_simple_batch()
self.sch.add_task(worker=WORKER, task_id='A_1_2', tracking_url='http://test.tracking.url/')
Expand Down
38 changes: 38 additions & 0 deletions test/task_progress_percentage_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from helpers import LuigiTestCase

import luigi
import luigi.scheduler
import luigi.worker


class TaskProgressPercentageTest(LuigiTestCase):

def test_run(self):
sch = luigi.scheduler.Scheduler()
with luigi.worker.Worker(scheduler=sch) as w:
class MyTask(luigi.Task):
def run(self):
self.set_progress_percentage(30)

task = MyTask()
w.add(task)
w.run()

self.assertEqual(sch.get_task_progress_percentage(task.task_id)["progressPercentage"], 30)

0 comments on commit 5b80f54

Please sign in to comment.