Skip to content

Commit

Permalink
Refactor plugin architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
cburgdorf committed Oct 9, 2018
1 parent 2c2c70c commit e879f92
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 124 deletions.
22 changes: 4 additions & 18 deletions trinity/extensibility/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
Type,
TYPE_CHECKING,
)
from argparse import (
Namespace,
)

from trinity.config import (
TrinityConfig,
from lahja import (
BaseEvent,
)


Expand All @@ -18,23 +15,12 @@
)


class BaseEvent:
"""
The base class for all plugin events. Plugin events can be broadcasted for all different
kind of reasons. Plugins can act based on these events and consume the events even before
the plugin is started, giving plugins the chance to start based on an event or a series of
events. The startup of Trinity itself can be an event as well as the start of a plugin itself
which, for instance, gives other plugins the chance to start based on these previous events.
"""
pass


class PluginStartedEvent(BaseEvent):
"""
Broadcasted when a plugin was started
"""
def __init__(self, plugin: 'BasePlugin') -> None:
self.plugin = plugin
def __init__(self, plugin_type: Type['BasePlugin']) -> None:
self.plugin_type = plugin_type


class ResourceAvailableEvent(BaseEvent):
Expand Down
8 changes: 8 additions & 0 deletions trinity/extensibility/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,11 @@ class UnsuitableShutdownError(BaseTrinityError):
``PluginManager`` instance that operates in the ``SharedProcessScope``.
"""
pass


class EventBusNotReady(BaseTrinityError):
"""
Raised when a plugin tried to access an EventBus before the plugin
had received its ``ready`` call.
"""
pass
55 changes: 35 additions & 20 deletions trinity/extensibility/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@
MAIN_EVENTBUS_ENDPOINT
)
from trinity.events import (
ShutdownRequest
ShutdownRequest,
)
from trinity.extensibility.events import (
BaseEvent
BaseEvent,
PluginStartedEvent,
)
from trinity.extensibility.exceptions import (
EventBusNotReady,
)
from trinity.utils.ipc import (
kill_process_gracefully
Expand Down Expand Up @@ -70,6 +74,7 @@ def shutdown_host(self) -> None:
class BasePlugin(ABC):

context: PluginContext = None
running: bool = False

@property
@abstractmethod
Expand All @@ -85,35 +90,41 @@ def name(self) -> str:
def logger(self) -> logging.Logger:
return logging.getLogger('trinity.extensibility.plugin.BasePlugin#{0}'.format(self.name))

@property
def event_bus(self) -> Endpoint:
if self.context is None:
raise EventBusNotReady("Tried accessing ``event_bus`` before ``ready`` was called")

return self.context.event_bus

def set_context(self, context: PluginContext) -> None:
"""
Set the :class:`~trinity.extensibility.plugin.PluginContext` for this plugin.
"""
self.context = context

def configure_parser(self, arg_parser: ArgumentParser, subparser: _SubParsersAction) -> None:
def ready(self) -> None:
"""
Called at startup, giving the plugin a chance to amend the Trinity CLI argument parser
Called after the plugin received its context and is ready to bootstrap itself.
"""
pass

def handle_event(self, activation_event: BaseEvent) -> None:
def configure_parser(self, arg_parser: ArgumentParser, subparser: _SubParsersAction) -> None:
"""
Notify the plugin about an event, giving it the chance to do internal accounting right
before :meth:`~trinity.extensibility.plugin.BasePlugin.should_start` is called
Called at startup, giving the plugin a chance to amend the Trinity CLI argument parser
"""

pass

def should_start(self) -> bool:
def handle_start(self) -> None:
"""
Return ``True`` if the plugin should start, otherwise return ``False``
Prepare the plugin to get started and eventually cause ``start`` to get called.
"""

return False

def _start(self) -> None:
self.running = True
self.start()
self.event_bus.broadcast(
PluginStartedEvent(type(self))
)
self.logger.info("Plugin started: %s", self.name)

def start(self) -> None:
"""
Expand Down Expand Up @@ -159,18 +170,26 @@ class BaseIsolatedPlugin(BaseSyncStopPlugin):

_process: Process = None

def _start(self) -> None:
def handle_start(self) -> None:
"""
Prepare the plugin to get started and eventually cause ``start`` to get called.
"""
self.running = True
self._process = ctx.Process(
target=self._prepare_start,
)

self._process.start()
self.logger.info("Plugin started: %s", self.name)

def _prepare_start(self) -> None:
log_queue = self.context.boot_kwargs['log_queue']
level = self.context.boot_kwargs.get('log_level', logging.INFO)
setup_queue_logging(log_queue, level)

self.event_bus.connect_no_wait()
self.event_bus.broadcast(
PluginStartedEvent(type(self))
)
self.start()

def stop(self) -> None:
Expand All @@ -193,10 +212,6 @@ def configure_parser(self, arg_parser: ArgumentParser, subparser: _SubParsersAct
def handle_event(self, activation_event: BaseEvent) -> None:
self.logger.info("Debug plugin: handle_event called: %s", activation_event)

def should_start(self) -> bool:
self.logger.info("Debug plugin: should_start called")
return True

def start(self) -> None:
self.logger.info("Debug plugin: start called")
asyncio.ensure_future(self.count_forever())
Expand Down
44 changes: 7 additions & 37 deletions trinity/extensibility/plugin_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
from trinity.config import (
TrinityConfig
)
from trinity.extensibility.events import (
BaseEvent,
PluginStartedEvent,
)
from trinity.extensibility.exceptions import (
UnsuitableShutdownError,
)
Expand Down Expand Up @@ -136,7 +132,6 @@ class PluginManager:
def __init__(self, scope: BaseManagerProcessScope) -> None:
self._scope = scope
self._plugin_store: List[BasePlugin] = []
self._started_plugins: List[BasePlugin] = []
self._logger = logging.getLogger("trinity.extensibility.plugin_manager.PluginManager")

@property
Expand Down Expand Up @@ -165,34 +160,6 @@ def amend_argparser_config(self,
for plugin in self._plugin_store:
plugin.configure_parser(arg_parser, subparser)

def broadcast(self, event: BaseEvent, exclude: BasePlugin = None) -> None:
"""
Notify every registered :class:`~trinity.extensibility.plugin.BasePlugin` about an
event and check whether the plugin wants to start based on that event.
If a plugin gets started it will cause a
:class:`~trinity.extensibility.events.PluginStartedEvent` to get
broadcasted to all other plugins, giving them the chance to start based on that.
"""
for plugin in self._plugin_store:

if plugin is exclude or not self._scope.is_responsible_for_plugin(plugin):
self._logger.debug("Skipping plugin %s (not responsible)", plugin.name)
continue

plugin.handle_event(event)

if plugin in self._started_plugins:
continue

if not plugin.should_start():
continue

plugin._start()
self._started_plugins.append(plugin)
self._logger.info("Plugin started: %s", plugin.name)
self.broadcast(PluginStartedEvent(plugin), plugin)

def prepare(self,
args: Namespace,
trinity_config: TrinityConfig,
Expand All @@ -208,6 +175,7 @@ def prepare(self,

context = self._scope.create_plugin_context(plugin, args, trinity_config, boot_kwargs)
plugin.set_context(context)
plugin.ready()

def shutdown_blocking(self) -> None:
"""
Expand All @@ -219,14 +187,15 @@ def shutdown_blocking(self) -> None:

self._logger.info("Shutting down PluginManager with scope %s", type(self._scope))

for plugin in self._started_plugins:
for plugin in self._plugin_store:

if not isinstance(plugin, BaseSyncStopPlugin):
if not isinstance(plugin, BaseSyncStopPlugin) or not plugin.running:
continue

try:
self._logger.info("Stopping plugin: %s", plugin.name)
plugin.stop()
plugin.running = False
self._logger.info("Successfully stopped plugin: %s", plugin.name)
except Exception:
self._logger.exception("Exception thrown while stopping plugin %s", plugin.name)
Expand All @@ -241,8 +210,8 @@ async def shutdown(self) -> None:
self._logger.info("Shutting down PluginManager with scope %s", type(self._scope))

async_plugins = [
plugin for plugin in self._started_plugins
if isinstance(plugin, BaseAsyncStopPlugin)
plugin for plugin in self._plugin_store
if isinstance(plugin, BaseAsyncStopPlugin) and plugin.running
]

stop_results = await asyncio.gather(
Expand All @@ -255,6 +224,7 @@ async def shutdown(self) -> None:
'Exception thrown while stopping plugin %s: %s', plugin.name, result
)
else:
plugin.running = False
self._logger.info("Successfully stopped plugin: %s", plugin.name)

def _stop_plugins(self,
Expand Down
6 changes: 3 additions & 3 deletions trinity/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
def main() -> None:
event_bus = EventBus(ctx)
main_endpoint = event_bus.create_endpoint(MAIN_EVENTBUS_ENDPOINT)
main_endpoint.connect()
main_endpoint.connect_no_wait()

plugin_manager = setup_plugins(
MainAndIsolatedProcessScope(event_bus, main_endpoint)
Expand Down Expand Up @@ -343,7 +343,7 @@ def _sigint_handler(*args: Any) -> None:
def launch_node(args: Namespace, trinity_config: TrinityConfig, endpoint: Endpoint) -> None:
with trinity_config.process_id_file('networking'):

endpoint.connect()
endpoint.connect_no_wait()

NodeClass = trinity_config.node_class
# Temporary hack: We setup a second instance of the PluginManager.
Expand All @@ -356,7 +356,7 @@ def launch_node(args: Namespace, trinity_config: TrinityConfig, endpoint: Endpoi
plugin_manager = setup_plugins(SharedProcessScope(endpoint))
plugin_manager.prepare(args, trinity_config)

node = NodeClass(plugin_manager, trinity_config)
node = NodeClass(endpoint, trinity_config)
loop = node.get_event_loop()
asyncio.ensure_future(handle_networking_exit(node, plugin_manager, endpoint), loop=loop)
asyncio.ensure_future(node.run(), loop=loop)
Expand Down
38 changes: 24 additions & 14 deletions trinity/nodes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
Type,
)

from lahja import (
Endpoint,
BroadcastConfig,
)

from eth.chains.base import BaseChain

from p2p.peer import BasePeerPool
Expand All @@ -23,9 +28,6 @@
from trinity.config import (
TrinityConfig,
)
from trinity.extensibility import (
PluginManager,
)
from trinity.extensibility.events import (
ResourceAvailableEvent
)
Expand All @@ -42,17 +44,16 @@ class Node(BaseService):
"""
chain_class: Type[BaseChain] = None

def __init__(self, plugin_manager: PluginManager, trinity_config: TrinityConfig) -> None:
def __init__(self, event_bus: Endpoint, trinity_config: TrinityConfig) -> None:
super().__init__()
self._plugin_manager = plugin_manager
self._db_manager = create_db_manager(trinity_config.database_ipc_path)
self._db_manager.connect() # type: ignore
self._headerdb = self._db_manager.get_headerdb() # type: ignore

self._jsonrpc_ipc_path: Path = trinity_config.jsonrpc_ipc_path
self._network_id = trinity_config.network_id

self.event_bus = plugin_manager.event_bus_endpoint
self.event_bus = event_bus

async def handle_network_id_requests(self) -> None:
async def f() -> None:
Expand Down Expand Up @@ -103,18 +104,27 @@ def notify_resource_available(self) -> None:
# as the `PeerPool` is available. In the long term, the peer pool may become
# a plugin itself and we can get rid of this.
peer_pool = self.get_peer_pool()
self._plugin_manager.broadcast(ResourceAvailableEvent(
resource=(peer_pool, self.cancel_token),
resource_type=type(peer_pool)
))

self.event_bus.broadcast(
ResourceAvailableEvent(
resource=(peer_pool, self.cancel_token),
resource_type=type(peer_pool)
),
BroadcastConfig(internal=True),
)

# This broadcasts the *local* chain, which is suited for tasks that aren't blocking
# for too long. There may be value in also broadcasting the proxied chain.
self._plugin_manager.broadcast(ResourceAvailableEvent(
resource=self.get_chain(),
resource_type=BaseChain
))
self.event_bus.broadcast(
ResourceAvailableEvent(
resource=self.get_chain(),
resource_type=BaseChain
),
BroadcastConfig(internal=True),
)

async def _run(self) -> None:
await self.event_bus.wait_for_connection()
self.notify_resource_available()
self.run_daemon_task(self.handle_network_id_requests())
await self.get_p2p_server().run()
1 change: 0 additions & 1 deletion trinity/nodes/full.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def __init__(self, plugin_manager: PluginManager, trinity_config: TrinityConfig)
self._node_key = trinity_config.nodekey
self._node_port = trinity_config.port
self._max_peers = trinity_config.max_peers
self.notify_resource_available()

def get_chain(self) -> BaseChain:
if self._chain is None:
Expand Down
Loading

0 comments on commit e879f92

Please sign in to comment.