Skip to content

Commit

Permalink
Add periodic state save. (#1028)
Browse files Browse the repository at this point in the history
* Add periodic state save.

* Fixing ci pipeline.

* Rename "timeout" to "interval".

Co-authored-by: Anton <polovikhin@fas.gov.ru>
  • Loading branch information
Tohman21 and Anton authored Nov 23, 2020
1 parent e71b7ff commit 1825d04
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 7 deletions.
9 changes: 9 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,15 @@ port

Run the http server on a given port (by default, `port=5555`)

.. _state_save_interval:

state_save_interval
~~~~~~~~~~~~~~~~~~

Sets the interval for saving state. state_save_interval=0 means
that periodic saving is disabled (by default, `state_save_interval=0`
in milliseconds)

.. _xheaders:

xheaders
Expand Down
5 changes: 3 additions & 2 deletions flower/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(self, options=None, capp=None, events=None,
self.capp,
db=self.options.db,
persistent=self.options.persistent,
state_save_interval=self.options.state_save_interval,
enable_events=self.options.enable_events,
io_loop=self.io_loop,
max_workers_in_memory=self.options.max_workers,
Expand All @@ -87,9 +88,9 @@ def start(self):
def stop(self):
if self.started:
self.events.stop()
logging.debug("Stoppping executors...")
logging.debug("Stopping executors...")
self.executor.shutdown(wait=False)
logging.debug("Stoppping event loop...")
logging.debug("Stopping event loop...")
self.io_loop.stop()
self.started = False

Expand Down
27 changes: 22 additions & 5 deletions flower/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class Events(threading.Thread):
events_enable_interval = 5000

def __init__(self, capp, db=None, persistent=False,
enable_events=True, io_loop=None, **kwargs):
enable_events=True, io_loop=None, state_save_interval=0,
**kwargs):
threading.Thread.__init__(self)
self.daemon = True

Expand All @@ -80,6 +81,7 @@ def __init__(self, capp, db=None, persistent=False,
self.persistent = persistent
self.enable_events = enable_events
self.state = None
self.state_save_timer = None

if self.persistent:
logger.debug("Loading state from '%s'...", self.db)
Expand All @@ -88,6 +90,10 @@ def __init__(self, capp, db=None, persistent=False,
self.state = state['events']
state.close()

if state_save_interval:
self.state_save_timer = PeriodicCallback(self.save_state,
state_save_interval)

if not self.state:
self.state = EventsState(**kwargs)

Expand All @@ -100,16 +106,21 @@ def start(self):
logger.debug("Starting enable events timer...")
self.timer.start()

if self.state_save_timer:
logger.debug("Starting state save timer...")
self.state_save_timer.start()

def stop(self):
if self.enable_events:
logger.debug("Stopping enable events timer...")
self.timer.stop()

if self.state_save_timer:
logger.debug("Stopping state save timer...")
self.state_save_timer.stop()

if self.persistent:
logger.debug("Saving state to '%s'...", self.db)
state = shelve.open(self.db)
state['events'] = self.state
state.close()
self.save_state()

def run(self):
try_interval = 1
Expand Down Expand Up @@ -137,6 +148,12 @@ def run(self):
logger.debug(e, exc_info=True)
time.sleep(try_interval)

def save_state(self):
logger.debug("Saving state to '%s'...", self.db)
state = shelve.open(self.db)
state['events'] = self.state
state.close()

def on_enable_events(self):
# Periodically enable events for workers
# launched after flower
Expand Down
2 changes: 2 additions & 0 deletions flower/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
help="flower database file")
define("persistent", type=bool, default=False,
help="enable persistent mode")
define("state_save_interval", type=int, default=0,
help="state save interval (in milliseconds)")
define("broker_api", type=str, default=None,
help="inspect broker e.g. http://guest:guest@localhost:15672/api/")
define("ca_certs", type=str, default=None,
Expand Down
1 change: 1 addition & 0 deletions requirements/default.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ celery>=3.1.0; python_version<"3.7"
celery>=4.3.0; python_version>="3.7"
vine==1.3.0
tornado>=5.0.0,<7.0.0; python_version>="3.5.2"
vine==1.3.0
prometheus_client==0.8.0
humanize
pytz

0 comments on commit 1825d04

Please sign in to comment.