Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update plugin refresh to also update active/inactive state #381

Merged
merged 1 commit into from
Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions synse_server/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,8 @@ def refresh(self) -> None:
"""Refresh the manager's tracked plugin state.

This refreshes plugin state by checking if any new plugins are available
to Synse Server. Note that other than the case for new plugin registration,
this does not update the active/inactive state of a plugin. That behavior
is tied to the plugin itself.
to Synse Server. Once any plugins are added or disabled, it will also
update the active/inactive state of each of the enabled plugins.

Refresh does not re-load plugins from configuration. That is done on
initialization. New plugins may only be added at runtime via plugin
Expand Down Expand Up @@ -288,6 +287,10 @@ def refresh(self) -> None:
finally:
self.is_refreshing = False

# Now, ensure that all enabled plugins have their active/inactive state refreshed.
for p in self.plugins.values():
p.refresh_state()

logger.debug(
'plugin manager refresh complete',
plugin_count=len(self.plugins),
Expand Down Expand Up @@ -404,7 +407,7 @@ async def _reconnect(self):
bo = backoff.ExponentialBackoff()

while True:
logger.debug('plugin reconnect task: attempting reconnect')
_l.debug('plugin reconnect task: attempting reconnect')
try:
self.client.test()
except Exception as ex:
Expand All @@ -424,6 +427,39 @@ async def _reconnect(self):
_l.debug('plugin reconnect task: waiting until next retry', delay=delay)
await asyncio.sleep(delay)

def refresh_state(self):
"""Refresh the state of the plugin.

When a plugin becomes inactive, it will start a task to periodically retry
establishing a connection to the plugin with exponential backoff. This can be
good at automated recovery, especially for quick intermittent errors. It
becomes less useful when there is plugin maintenance or some other longer
window of downtime, as the exponential backoff will take a while to re-establish
a connection.

To alleviate this potential use case, this function performs the same state
refresh for the plugin, but it is executed manually via a call to the
`/plugin?refresh=true`. Setting refresh to true for the endpoint will both
ensure that Synse Server refreshes the state of known plugins, and that it
also refreshes the state of each existing individual plugin to ensure that
its active/inactive state is up-to-date.
"""
_l = logger.bind(plugin=self.id)
_l.info('refreshing plugin state')

if self.disabled:
_l.info('plugin is disabled, will not refresh')
return

try:
self.client.test()
except Exception as ex:
_l.debug('plugin refresh: failed to connect to plugin', errror=ex)
self.mark_inactive()
else:
_l.debug('plugin refresh: successfully connected to plugin')
self.mark_active()

def is_ready(self):
"""Check whether the plugin is ready to communicate with.

Expand Down
82 changes: 75 additions & 7 deletions tests/unit/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,57 +410,71 @@ def test_refresh_no_addresses(self):

@mock.patch('synse_server.plugin.PluginManager.load', return_value=[('localhost:5001', 'tcp')])
@mock.patch('synse_server.plugin.PluginManager.register')
def test_refresh_loaded_ok(self, mock_register, mock_load):
@mock.patch('synse_server.plugin.Plugin.refresh_state')
def test_refresh_loaded_ok(self, mock_refresh, mock_register, mock_load):
m = plugin.PluginManager()
m.refresh()

mock_load.assert_called_once()
mock_register.assert_called_once_with(address='localhost:5001', protocol='tcp')
# empty because register is mocked, so nothing gets added to manager
mock_refresh.assert_has_calls([])

@mock.patch('synse_server.plugin.PluginManager.load', return_value=[('localhost:5001', 'tcp')])
@mock.patch('synse_server.plugin.PluginManager.register', side_effect=ValueError)
def test_refresh_loaded_fail(self, mock_register, mock_load):
@mock.patch('synse_server.plugin.Plugin.refresh_state')
def test_refresh_loaded_fail(self, mock_refresh, mock_register, mock_load):
m = plugin.PluginManager()
m.refresh()

mock_load.assert_called_once()
mock_register.assert_called_once_with(address='localhost:5001', protocol='tcp')
mock_refresh.assert_has_calls([])

@mock.patch(
'synse_server.plugin.PluginManager.discover',
return_value=[('localhost:5001', 'tcp')],
)
@mock.patch('synse_server.plugin.PluginManager.register')
def test_refresh_discover_ok(self, mock_register, mock_discover):
@mock.patch('synse_server.plugin.Plugin.refresh_state')
def test_refresh_discover_ok(self, mock_refresh, mock_register, mock_discover):
m = plugin.PluginManager()
m.refresh()

mock_discover.assert_called_once()
mock_register.assert_called_once_with(address='localhost:5001', protocol='tcp')
# empty because register is mocked, so nothing gets added to manager
mock_refresh.assert_has_calls([])

@mock.patch(
'synse_server.plugin.PluginManager.discover',
return_value=[('localhost:5001', 'tcp')],
)
@mock.patch('synse_server.plugin.PluginManager.register', side_effect=ValueError)
def test_refresh_discover_fail(self, mock_register, mock_discover):
@mock.patch('synse_server.plugin.Plugin.refresh_state')
def test_refresh_discover_fail(self, mock_refresh, mock_register, mock_discover):
m = plugin.PluginManager()
m.refresh()

mock_discover.assert_called_once()
mock_register.assert_called_once_with(address='localhost:5001', protocol='tcp')
mock_refresh.assert_has_calls([])

@mock.patch('synse_server.plugin.PluginManager.load', return_value=[('localhost:5001', 'tcp')])
@mock.patch('synse_server.plugin.PluginManager.register')
def test_refresh_new_plugin(self, register_mock, load_mock):
@mock.patch('synse_server.plugin.Plugin.refresh_state')
def test_refresh_new_plugin(self, mock_refresh, register_mock, load_mock):
m = plugin.PluginManager()
m.refresh()

load_mock.assert_called_once()
register_mock.assert_called_once_with(address='localhost:5001', protocol='tcp')
# empty because register is mocked, so nothing gets added to manager
mock_refresh.assert_has_calls([])

@mock.patch('synse_server.plugin.PluginManager.load', return_value=[])
def test_refresh_removed_plugin(self, load_mock, simple_plugin):
@mock.patch('synse_server.plugin.Plugin.refresh_state')
def test_refresh_removed_plugin(self, mock_refresh, load_mock, simple_plugin):
m = plugin.PluginManager()
m.plugins[simple_plugin.id] = simple_plugin
simple_plugin.cancel_tasks = mock.MagicMock()
Expand All @@ -471,9 +485,11 @@ def test_refresh_removed_plugin(self, load_mock, simple_plugin):
assert simple_plugin.disabled is True
load_mock.assert_called_once()
simple_plugin.cancel_tasks.assert_called_once()
mock_refresh.assert_has_calls([])

@mock.patch('synse_server.plugin.PluginManager.load', return_value=[('localhost:5432', 'tcp')])
def test_refresh_existing_plugin(self, load_mock, simple_plugin):
@mock.patch('synse_server.plugin.Plugin.refresh_state')
def test_refresh_existing_plugin(self, mock_refresh, load_mock, simple_plugin):
m = plugin.PluginManager()
m.plugins[simple_plugin.id] = simple_plugin
simple_plugin.disabled = True
Expand All @@ -482,6 +498,7 @@ def test_refresh_existing_plugin(self, load_mock, simple_plugin):

assert simple_plugin.disabled is False
load_mock.assert_called_once()
mock_refresh.assert_has_calls([])


class TestPlugin:
Expand Down Expand Up @@ -679,3 +696,54 @@ async def test_reconnect_with_retries(self, test_mock, delay_mock):
test_mock.assert_has_calls([
mock.call(), mock.call(), mock.call(),
])

@mock.patch('synse_grpc.client.PluginClientV3.test')
def test_refresh_state(self, test_mock):
p = plugin.Plugin(
client=client.PluginClientV3('localhost:5001', 'tcp'),
info={'tag': 'test/foo', 'id': '123'},
version={},
)

p.disabled = False
p.active = False

p.refresh_state()

assert p.disabled is False
assert p.active is True
test_mock.assert_called_once()

@mock.patch('synse_grpc.client.PluginClientV3.test')
def test_refresh_state_plugin_disabled(self, test_mock):
p = plugin.Plugin(
client=client.PluginClientV3('localhost:5001', 'tcp'),
info={'tag': 'test/foo', 'id': '123'},
version={},
)

p.disabled = True
p.active = False

p.refresh_state()

assert p.disabled is True
assert p.active is False
test_mock.assert_not_called()

@mock.patch('synse_grpc.client.PluginClientV3.test', side_effect=ValueError())
def test_refresh_state_fails_refresh(self, test_mock):
p = plugin.Plugin(
client=client.PluginClientV3('localhost:5001', 'tcp'),
info={'tag': 'test/foo', 'id': '123'},
version={},
)

p.disabled = False
p.active = True

p.refresh_state()

assert p.disabled is False
assert p.active is False
test_mock.assert_called_once()