From 8027fb543f632a0472e853aba5bd690974184b88 Mon Sep 17 00:00:00 2001 From: Erick Daniszewski Date: Fri, 20 Mar 2020 11:40:47 -0400 Subject: [PATCH] feat: update plugin refresh to also update active/inactive state --- synse_server/plugin.py | 44 +++++++++++++++++++-- tests/unit/test_plugin.py | 82 +++++++++++++++++++++++++++++++++++---- 2 files changed, 115 insertions(+), 11 deletions(-) diff --git a/synse_server/plugin.py b/synse_server/plugin.py index 2e19cf15..62b01816 100644 --- a/synse_server/plugin.py +++ b/synse_server/plugin.py @@ -221,9 +221,8 @@ def refresh(self) -> None: """Refresh the manager's tracked plugin state. This refreshes plugin state by checking if any new plugins are available - to Synse Server. Note that other than the case for new plugin registration, - this does not update the active/inactive state of a plugin. That behavior - is tied to the plugin itself. + to Synse Server. Once any plugins are added or disabled, it will also + update the active/inactive state of each of the enabled plugins. Refresh does not re-load plugins from configuration. That is done on initialization. New plugins may only be added at runtime via plugin @@ -288,6 +287,10 @@ def refresh(self) -> None: finally: self.is_refreshing = False + # Now, ensure that all enabled plugins have their active/inactive state refreshed. + for p in self.plugins.values(): + p.refresh_state() + logger.debug( 'plugin manager refresh complete', plugin_count=len(self.plugins), @@ -404,7 +407,7 @@ async def _reconnect(self): bo = backoff.ExponentialBackoff() while True: - logger.debug('plugin reconnect task: attempting reconnect') + _l.debug('plugin reconnect task: attempting reconnect') try: self.client.test() except Exception as ex: @@ -424,6 +427,39 @@ async def _reconnect(self): _l.debug('plugin reconnect task: waiting until next retry', delay=delay) await asyncio.sleep(delay) + def refresh_state(self): + """Refresh the state of the plugin. + + When a plugin becomes inactive, it will start a task to periodically retry + establishing a connection to the plugin with exponential backoff. This can be + good at automated recovery, especially for quick intermittent errors. It + becomes less useful when there is plugin maintenance or some other longer + window of downtime, as the exponential backoff will take a while to re-establish + a connection. + + To alleviate this potential use case, this function performs the same state + refresh for the plugin, but it is executed manually via a call to the + `/plugin?refresh=true`. Setting refresh to true for the endpoint will both + ensure that Synse Server refreshes the state of known plugins, and that it + also refreshes the state of each existing individual plugin to ensure that + its active/inactive state is up-to-date. + """ + _l = logger.bind(plugin=self.id) + _l.info('refreshing plugin state') + + if self.disabled: + _l.info('plugin is disabled, will not refresh') + return + + try: + self.client.test() + except Exception as ex: + _l.debug('plugin refresh: failed to connect to plugin', errror=ex) + self.mark_inactive() + else: + _l.debug('plugin refresh: successfully connected to plugin') + self.mark_active() + def is_ready(self): """Check whether the plugin is ready to communicate with. diff --git a/tests/unit/test_plugin.py b/tests/unit/test_plugin.py index a1fd1dff..2d48240b 100644 --- a/tests/unit/test_plugin.py +++ b/tests/unit/test_plugin.py @@ -410,57 +410,71 @@ def test_refresh_no_addresses(self): @mock.patch('synse_server.plugin.PluginManager.load', return_value=[('localhost:5001', 'tcp')]) @mock.patch('synse_server.plugin.PluginManager.register') - def test_refresh_loaded_ok(self, mock_register, mock_load): + @mock.patch('synse_server.plugin.Plugin.refresh_state') + def test_refresh_loaded_ok(self, mock_refresh, mock_register, mock_load): m = plugin.PluginManager() m.refresh() mock_load.assert_called_once() mock_register.assert_called_once_with(address='localhost:5001', protocol='tcp') + # empty because register is mocked, so nothing gets added to manager + mock_refresh.assert_has_calls([]) @mock.patch('synse_server.plugin.PluginManager.load', return_value=[('localhost:5001', 'tcp')]) @mock.patch('synse_server.plugin.PluginManager.register', side_effect=ValueError) - def test_refresh_loaded_fail(self, mock_register, mock_load): + @mock.patch('synse_server.plugin.Plugin.refresh_state') + def test_refresh_loaded_fail(self, mock_refresh, mock_register, mock_load): m = plugin.PluginManager() m.refresh() mock_load.assert_called_once() mock_register.assert_called_once_with(address='localhost:5001', protocol='tcp') + mock_refresh.assert_has_calls([]) @mock.patch( 'synse_server.plugin.PluginManager.discover', return_value=[('localhost:5001', 'tcp')], ) @mock.patch('synse_server.plugin.PluginManager.register') - def test_refresh_discover_ok(self, mock_register, mock_discover): + @mock.patch('synse_server.plugin.Plugin.refresh_state') + def test_refresh_discover_ok(self, mock_refresh, mock_register, mock_discover): m = plugin.PluginManager() m.refresh() mock_discover.assert_called_once() mock_register.assert_called_once_with(address='localhost:5001', protocol='tcp') + # empty because register is mocked, so nothing gets added to manager + mock_refresh.assert_has_calls([]) @mock.patch( 'synse_server.plugin.PluginManager.discover', return_value=[('localhost:5001', 'tcp')], ) @mock.patch('synse_server.plugin.PluginManager.register', side_effect=ValueError) - def test_refresh_discover_fail(self, mock_register, mock_discover): + @mock.patch('synse_server.plugin.Plugin.refresh_state') + def test_refresh_discover_fail(self, mock_refresh, mock_register, mock_discover): m = plugin.PluginManager() m.refresh() mock_discover.assert_called_once() mock_register.assert_called_once_with(address='localhost:5001', protocol='tcp') + mock_refresh.assert_has_calls([]) @mock.patch('synse_server.plugin.PluginManager.load', return_value=[('localhost:5001', 'tcp')]) @mock.patch('synse_server.plugin.PluginManager.register') - def test_refresh_new_plugin(self, register_mock, load_mock): + @mock.patch('synse_server.plugin.Plugin.refresh_state') + def test_refresh_new_plugin(self, mock_refresh, register_mock, load_mock): m = plugin.PluginManager() m.refresh() load_mock.assert_called_once() register_mock.assert_called_once_with(address='localhost:5001', protocol='tcp') + # empty because register is mocked, so nothing gets added to manager + mock_refresh.assert_has_calls([]) @mock.patch('synse_server.plugin.PluginManager.load', return_value=[]) - def test_refresh_removed_plugin(self, load_mock, simple_plugin): + @mock.patch('synse_server.plugin.Plugin.refresh_state') + def test_refresh_removed_plugin(self, mock_refresh, load_mock, simple_plugin): m = plugin.PluginManager() m.plugins[simple_plugin.id] = simple_plugin simple_plugin.cancel_tasks = mock.MagicMock() @@ -471,9 +485,11 @@ def test_refresh_removed_plugin(self, load_mock, simple_plugin): assert simple_plugin.disabled is True load_mock.assert_called_once() simple_plugin.cancel_tasks.assert_called_once() + mock_refresh.assert_has_calls([]) @mock.patch('synse_server.plugin.PluginManager.load', return_value=[('localhost:5432', 'tcp')]) - def test_refresh_existing_plugin(self, load_mock, simple_plugin): + @mock.patch('synse_server.plugin.Plugin.refresh_state') + def test_refresh_existing_plugin(self, mock_refresh, load_mock, simple_plugin): m = plugin.PluginManager() m.plugins[simple_plugin.id] = simple_plugin simple_plugin.disabled = True @@ -482,6 +498,7 @@ def test_refresh_existing_plugin(self, load_mock, simple_plugin): assert simple_plugin.disabled is False load_mock.assert_called_once() + mock_refresh.assert_has_calls([]) class TestPlugin: @@ -679,3 +696,54 @@ async def test_reconnect_with_retries(self, test_mock, delay_mock): test_mock.assert_has_calls([ mock.call(), mock.call(), mock.call(), ]) + + @mock.patch('synse_grpc.client.PluginClientV3.test') + def test_refresh_state(self, test_mock): + p = plugin.Plugin( + client=client.PluginClientV3('localhost:5001', 'tcp'), + info={'tag': 'test/foo', 'id': '123'}, + version={}, + ) + + p.disabled = False + p.active = False + + p.refresh_state() + + assert p.disabled is False + assert p.active is True + test_mock.assert_called_once() + + @mock.patch('synse_grpc.client.PluginClientV3.test') + def test_refresh_state_plugin_disabled(self, test_mock): + p = plugin.Plugin( + client=client.PluginClientV3('localhost:5001', 'tcp'), + info={'tag': 'test/foo', 'id': '123'}, + version={}, + ) + + p.disabled = True + p.active = False + + p.refresh_state() + + assert p.disabled is True + assert p.active is False + test_mock.assert_not_called() + + @mock.patch('synse_grpc.client.PluginClientV3.test', side_effect=ValueError()) + def test_refresh_state_fails_refresh(self, test_mock): + p = plugin.Plugin( + client=client.PluginClientV3('localhost:5001', 'tcp'), + info={'tag': 'test/foo', 'id': '123'}, + version={}, + ) + + p.disabled = False + p.active = True + + p.refresh_state() + + assert p.disabled is False + assert p.active is False + test_mock.assert_called_once()