Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proposal for runners #536

Merged
merged 1 commit into from
May 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/upgrading/2_to_3.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ InitNornir
Configuration
=============

1. Order of resolution is now file -> paramters to InitNornir -> env var
1. Order of resolution is parameters to InitNornit > config > env

Todo
----
Expand Down
63 changes: 23 additions & 40 deletions nornir/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging
import logging.config
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, TYPE_CHECKING, Dict, Any
from typing import List, Optional, TYPE_CHECKING

from nornir.core.configuration import Config
from nornir.core.inventory import Inventory
from nornir.core.plugins.runners import RunnerPlugin
from nornir.core.processor import Processor, Processors
from nornir.core.state import GlobalState
from nornir.core.task import AggregatedResult, Task
from nornir.core.task import Task

if TYPE_CHECKING:
from nornir.core.inventory import Host # noqa: W0611
Expand Down Expand Up @@ -39,11 +39,13 @@ def __init__(
config: Config = None,
data: GlobalState = None,
processors: Optional[Processors] = None,
runner: Optional[RunnerPlugin] = None,
) -> None:
self.data = data if data is not None else GlobalState()
self.inventory = inventory
self.config = config or Config()
self.processors = processors or Processors()
self.runner = runner

def __enter__(self):
return self
Expand All @@ -58,6 +60,13 @@ def with_processors(self, processors: List[Processor]) -> "Nornir":
"""
return Nornir(**{**self.__dict__, **{"processors": Processors(processors)}})

def with_runner(self, runner: RunnerPlugin) -> "Nornir":
"""
Given a runner return a copy of the nornir object with the runner
assigned to the copy. The orinal object is left unmodified.
"""
return Nornir(**{**self.__dict__, **{"runner": runner}})

def filter(self, *args, **kwargs):
"""
See :py:meth:`nornir.core.inventory.Inventory.filter`
Expand All @@ -69,38 +78,13 @@ def filter(self, *args, **kwargs):
b.inventory = self.inventory.filter(*args, **kwargs)
return b

def _run_serial(self, task: Task, hosts, **kwargs):
result = AggregatedResult(kwargs.get("name") or task.name)
for host in hosts:
result[host.name] = task.copy().start(host, self)
return result

def _run_parallel(
self,
task: Task,
hosts: List["Host"],
num_workers: int,
**kwargs: Dict[str, Any],
) -> AggregatedResult:
agg_result = AggregatedResult(kwargs.get("name") or task.name)
futures = []
with ThreadPoolExecutor(num_workers) as pool:
for host in hosts:
future = pool.submit(task.copy().start, host, self)
futures.append(future)

for future in futures:
worker_result = future.result()
agg_result[worker_result.host.name] = worker_result
return agg_result

def run(
self,
task,
num_workers=None,
raise_on_error=None,
on_good=True,
on_failed=False,
name: Optional[str] = None,
**kwargs,
):
"""
Expand All @@ -109,7 +93,6 @@ def run(
Arguments:
task (``callable``): function or callable that will be run against each device in
the inventory
num_workers(``int``): Override for how many hosts to run in parallel for this task
raise_on_error (``bool``): Override raise_on_error behavior
on_good(``bool``): Whether to run or not this task on hosts marked as good
on_failed(``bool``): Whether to run or not this task on hosts marked as failed
Expand All @@ -122,11 +105,15 @@ def run(
Returns:
:obj:`nornir.core.task.AggregatedResult`: results of each execution
"""
task = Task(task, **kwargs)
task = Task(
task,
global_dry_run=self.data.dry_run,
name=name,
processors=self.processors,
**kwargs,
)
self.processors.task_started(task)

num_workers = num_workers or self.config.core.num_workers

run_on = []
if on_good:
for name, host in self.inventory.hosts.items():
Expand All @@ -138,21 +125,17 @@ def run(
run_on.append(host)

num_hosts = len(self.inventory.hosts)
task_name = kwargs.get("name") or task.name
if num_hosts:
logger.info(
"Running task %r with args %s on %d hosts",
task_name,
task.name,
kwargs,
num_hosts,
)
else:
logger.warning("Task %r has not been run – 0 hosts selected", task_name)
logger.warning("Task %r has not been run – 0 hosts selected", task.name)

if num_workers == 1:
result = self._run_serial(task, run_on, **kwargs)
else:
result = self._run_parallel(task, run_on, num_workers, **kwargs)
result = self.runner.run(task, run_on)

raise_on_error = (
raise_on_error
Expand Down
47 changes: 39 additions & 8 deletions nornir/core/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,41 +200,66 @@ def configure(self) -> None:
logger_.addHandler(stderr_handler)


class CoreConfig(object):
__slots__ = ("num_workers", "raise_on_error")
class RunnerConfig(object):
__slots__ = ("plugin", "options")

class Parameters:
num_workers = Parameter(default=20, envvar="NORNIR_CORE_NUM_WORKERS")
raise_on_error = Parameter(default=False, envvar="NORNIR_CORE_RAISE_ON_ERROR")
plugin = Parameter(default="parallel", envvar="NORNIR_RUNNER_PLUGIN")
options = Parameter(default={}, envvar="NORNIR_RUNNER_OPTIONS")

def __init__(
self, num_workers: Optional[int] = None, raise_on_error: Optional[bool] = None
self, plugin: Optional[str] = None, options: Optional[Dict[str, Any]] = None
) -> None:
self.num_workers = self.Parameters.num_workers.resolve(num_workers)
self.plugin = self.Parameters.plugin.resolve(plugin)
self.options = self.Parameters.options.resolve(options)

def dict(self) -> Dict[str, Any]:
return {
"plugin": self.plugin,
"options": self.options,
}


class CoreConfig(object):
__slots__ = "raise_on_error"

class Parameters:
raise_on_error = Parameter(default=False, envvar="NORNIR_CORE_RAISE_ON_ERROR")

def __init__(self, raise_on_error: Optional[bool] = None) -> None:
self.raise_on_error = self.Parameters.raise_on_error.resolve(raise_on_error)

def dict(self) -> Dict[str, Any]:
return {
"num_workers": self.num_workers,
"raise_on_error": self.raise_on_error,
}


class Config(object):
__slots__ = ("core", "ssh", "inventory", "jinja2", "logging", "user_defined")
__slots__ = (
"core",
"runner",
"ssh",
"inventory",
"jinja2",
"logging",
"user_defined",
)

def __init__(
self,
inventory: Optional[InventoryConfig] = None,
ssh: Optional[SSHConfig] = None,
logging: Optional[LoggingConfig] = None,
core: Optional[CoreConfig] = None,
runner: Optional[RunnerConfig] = None,
user_defined: Optional[Dict[str, Any]] = None,
) -> None:
self.inventory = inventory or InventoryConfig()
self.ssh = ssh or SSHConfig()
self.logging = logging or LoggingConfig()
self.core = core or CoreConfig()
self.runner = runner or RunnerConfig()
self.user_defined = user_defined or {}

@classmethod
Expand All @@ -244,13 +269,15 @@ def from_dict(
ssh: Optional[Dict[str, Any]] = None,
logging: Optional[Dict[str, Any]] = None,
core: Optional[Dict[str, Any]] = None,
runner: Optional[Dict[str, Any]] = None,
user_defined: Optional[Dict[str, Any]] = None,
) -> "Config":
return cls(
inventory=InventoryConfig(**inventory or {}),
ssh=SSHConfig(**ssh or {}),
logging=LoggingConfig(**logging or {}),
core=CoreConfig(**core or {}),
runner=RunnerConfig(**runner or {}),
user_defined=user_defined or {},
)

Expand All @@ -262,12 +289,14 @@ def from_file(
ssh: Optional[Dict[str, Any]] = None,
logging: Optional[Dict[str, Any]] = None,
core: Optional[Dict[str, Any]] = None,
runner: Optional[Dict[str, Any]] = None,
user_defined: Optional[Dict[str, Any]] = None,
) -> "Config":
inventory = inventory or {}
ssh = ssh or {}
logging = logging or {}
core = core or {}
runner = runner or {}
user_defined = user_defined or {}
with open(config_file, "r") as f:
yml = ruamel.yaml.YAML(typ="safe")
Expand All @@ -277,6 +306,7 @@ def from_file(
ssh=SSHConfig(**{**data.get("ssh", {}), **ssh}),
logging=LoggingConfig(**{**data.get("loggin", {}), **logging}),
core=CoreConfig(**{**data.get("core", {}), **core}),
runner=RunnerConfig(**{**data.get("runner", {}), **runner}),
user_defined={**data.get("user_defined", {}), **user_defined},
)

Expand All @@ -286,5 +316,6 @@ def dict(self) -> Dict[str, Any]:
"ssh": self.ssh.dict(),
"logging": self.logging.dict(),
"core": self.core.dict(),
"runner": self.runner.dict(),
"user_defined": self.user_defined,
}
29 changes: 29 additions & 0 deletions nornir/core/plugins/runners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Any, List, Type

from nornir.core.task import AggregatedResult, Task
from nornir.core.inventory import Host
from nornir.core.plugins.register import PluginRegister

from typing_extensions import Protocol


RUNNERS_PLUGIN_PATH = "nornir.plugins.runners"


class RunnerPlugin(Protocol):
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
This method configures the plugin
"""
raise NotImplementedError("needs to be implemented by the plugin")

def run(self, task: Task, hosts: List[Host]) -> AggregatedResult:
"""
This method runs the given task over all the hosts
"""
raise NotImplementedError("needs to be implemented by the plugin")


RunnersPluginRegister: PluginRegister[Type[RunnerPlugin]] = PluginRegister(
RUNNERS_PLUGIN_PATH
)
Loading