Skip to content

Commit

Permalink
Merge pull request #80 from napalm-automation/develop
Browse files Browse the repository at this point in the history
Release 0.0.6
  • Loading branch information
dbarrosop authored Jan 22, 2018
2 parents 513dd7c + 2fc9712 commit 6c0ac18
Show file tree
Hide file tree
Showing 205 changed files with 9,913 additions and 885 deletions.
5 changes: 1 addition & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ instance/

# Sphinx documentation
docs/_build/
docs/ref/configuration/generated/*.rst

# PyBuilder
target/
Expand Down Expand Up @@ -95,8 +96,4 @@ tags
.vars
output/

demo/log

tests.log

.DS_Store
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ services:
- docker

addons:
apt:
packages:
- sshpass
apt_packages:
- pandoc

language: python
python:
Expand Down
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.PHONY: start_nsot
start_nsot:
docker run -v $(PWD)/tests/inventory_data/nsot/nsot.sqlite3:/nsot.sqlite3 -p 8990:8990 -d --name=nsot nsot/nsot start --noinput

.PHONY: stop_nsot
stop_nsot:
docker rm -f nsot
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Brigade
=======

See [docs](https://brigade.readthedocs.io) and [demo/get_facts_simple.py](demo/get_facts_simple.py), [demo/get_facts_role.py](demo/get_facts_role.py), and [demo/configure.py](demo/configure.py).
See [docs](https://brigade.readthedocs.io).
6 changes: 6 additions & 0 deletions brigade/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import pkg_resources

try:
__version__ = pkg_resources.get_distribution('brigade').version
except pkg_resources.DistributionNotFound:
__version__ = "Not installed"
160 changes: 109 additions & 51 deletions brigade/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import logging
import logging.config
import sys
import traceback
from multiprocessing.dummy import Pool

from brigade.core.task import AggregatedResult, Task
from brigade.core.configuration import Config
from brigade.core.task import AggregatedResult, Result, Task
from brigade.plugins.tasks import connections


if sys.version_info.major == 2:
Expand Down Expand Up @@ -31,7 +34,17 @@ def _unpickle_method(func_name, obj, cls):
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)


logger = logging.getLogger("brigade")
class Data(object):
"""
This class is just a placeholder to share data amongsts different
versions of Brigade after running ``filter`` multiple times.
Attributes:
failed_hosts (list): Hosts that have failed to run a task properly
"""

def __init__(self):
self.failed_hosts = set()


class Brigade(object):
Expand All @@ -41,32 +54,74 @@ class Brigade(object):
Arguments:
inventory (:obj:`brigade.core.inventory.Inventory`): Inventory to work with
data(:obj:`brigade.core.Data`): shared data amongst different iterations of brigade
dry_run(``bool``): Whether if we are testing the changes or not
num_workers(``int``): How many hosts run in parallel
raise_on_error (``bool``): If set to ``True``, :meth:`run` method of will
raise an exception if at least a host failed.
config (:obj:`brigade.core.configuration.Config`): Configuration object
config_file (``str``): Path to Yaml configuration file
available_connections (``dict``): dict of connection types that will be made available.
Defaults to :obj:`brigade.plugins.tasks.connections.available_connections`
Attributes:
inventory (:obj:`brigade.core.inventory.Inventory`): Inventory to work with
data(:obj:`brigade.core.Data`): shared data amongst different iterations of brigade
dry_run(``bool``): Whether if we are testing the changes or not
num_workers(``int``): How many hosts run in parallel
raise_on_error (``bool``): If set to ``True``, :meth:`run` method of will
raise an exception if at least a host failed.
config (:obj:`brigade.core.configuration.Config`): Configuration parameters
available_connections (``dict``): dict of connection types are available
"""

def __init__(self, inventory, dry_run, num_workers=20, raise_on_error=True):
def __init__(self, inventory, dry_run, config=None, config_file=None,
available_connections=None, logger=None, data=None):
self.logger = logger or logging.getLogger("brigade")

self.data = data or Data()
self.inventory = inventory
self.inventory.brigade = self

self.dry_run = dry_run
self.num_workers = num_workers
self.raise_on_error = raise_on_error
if config_file:
self.config = Config(config_file=config_file)
else:
self.config = config or Config()

format = "\033[31m%(asctime)s - %(name)s - %(levelname)s"
format += " - %(funcName)20s() - %(message)s\033[0m"
logging.basicConfig(
level=logging.ERROR,
format=format,
)
self.configure_logging()

if available_connections is not None:
self.available_connections = available_connections
else:
self.available_connections = connections.available_connections

def configure_logging(self):
format = "%(asctime)s - %(name)s - %(levelname)s"
format += " - %(funcName)10s() - %(message)s"
logging.config.dictConfig({
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"simple": {"format": format}
},
"handlers": {
"info_file_handler": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "simple",
"filename": "brigade.log",
"maxBytes": 10485760,
"backupCount": 20,
"encoding": "utf8"
},
},
"loggers": {
"brigade": {
"level": "INFO",
"handlers": ["info_file_handler"],
"propagate": "no"
},
},
"root": {
"level": "ERROR",
"handlers": ["info_file_handler"]
}
})

def filter(self, **kwargs):
"""
Expand All @@ -79,72 +134,75 @@ def filter(self, **kwargs):
b.inventory = self.inventory.filter(**kwargs)
return b

def _run_serial(self, task, **kwargs):
t = Task(task, **kwargs)
result = AggregatedResult()
def _run_serial(self, task, dry_run, **kwargs):
result = AggregatedResult(kwargs.get("name") or task.__name__)
for host in self.inventory.hosts.values():
try:
logger.debug("{}: running task {}".format(host.name, t))
r = t._start(host=host, brigade=self, dry_run=self.dry_run)
result[host.name] = r
except Exception as e:
logger.error("{}: {}".format(host, e))
result.failed_hosts[host.name] = e
result.tracebacks[host.name] = traceback.format_exc()
result[host.name] = run_task(host, self, dry_run, Task(task, **kwargs))
return result

def _run_parallel(self, task, num_workers, **kwargs):
result = AggregatedResult()
def _run_parallel(self, task, num_workers, dry_run, **kwargs):
result = AggregatedResult(kwargs.get("name") or task.__name__)

pool = Pool(processes=num_workers)
result_pool = [pool.apply_async(run_task, args=(h, self, Task(task, **kwargs)))
result_pool = [pool.apply_async(run_task,
args=(h, self, dry_run, Task(task, **kwargs)))
for h in self.inventory.hosts.values()]
pool.close()
pool.join()

for r in result_pool:
host, res, exc, traceback = r.get()
if exc:
result.failed_hosts[host] = exc
result.tracebacks[host] = exc
else:
result[host] = res
for rp in result_pool:
r = rp.get()
result[r.host.name] = r
return result

def run(self, task, num_workers=None, **kwargs):
def run(self, task, num_workers=None, dry_run=None, raise_on_error=None, **kwargs):
"""
Run task over all the hosts in the inventory.
Arguments:
task (``callable``): function or callable that will be run against each device in
the inventory
num_workers(``int``): Override for how many hosts to run in paralell for this task
dry_run(``bool``): Whether if we are testing the changes or not
raise_on_error (``bool``): Override raise_on_error behavior
**kwargs: additional argument to pass to ``task`` when calling it
Raises:
:obj:`brigade.core.exceptions.BrigadeExceptionError`: if at least a task fails
and self.raise_on_error is set to ``True``
:obj:`brigade.core.exceptions.BrigadeExecutionError`: if at least a task fails
and self.config.raise_on_error is set to ``True``
Returns:
:obj:`brigade.core.task.AggregatedResult`: results of each execution
"""
num_workers = num_workers or self.num_workers
num_workers = num_workers or self.config.num_workers

self.logger.info("Running task '{}' with num_workers: {}, dry_run: {}".format(
kwargs.get("name") or task.__name__, num_workers, dry_run))
self.logger.debug(kwargs)

if num_workers == 1:
result = self._run_serial(task, **kwargs)
result = self._run_serial(task, dry_run, **kwargs)
else:
result = self._run_parallel(task, num_workers, **kwargs)
result = self._run_parallel(task, num_workers, dry_run, **kwargs)

if self.raise_on_error:
raise_on_error = raise_on_error if raise_on_error is not None else \
self.config.raise_on_error
if raise_on_error:
result.raise_on_error()
else:
self.data.failed_hosts.update(result.failed_hosts.keys())
return result


def run_task(host, brigade, task):
def run_task(host, brigade, dry_run, task):
logger = logging.getLogger("brigade")
try:
logger.debug("{}: running task {}".format(host.name, task))
r = task._start(host=host, brigade=brigade, dry_run=brigade.dry_run)
return host.name, r, None, None
logger.info("{}: {}: running task".format(host.name, task.name))
r = task._start(host=host, brigade=brigade, dry_run=dry_run)
except Exception as e:
logger.error("{}: {}".format(host, e))
return host.name, None, e, traceback.format_exc()
tb = traceback.format_exc()
logger.error("{}: {}".format(host, tb))
r = Result(host, exception=e, result=tb, failed=True)
task.results.append(r)
r.name = task.name
return task.results
67 changes: 67 additions & 0 deletions brigade/core/configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import ast
import os


import yaml


CONF = {
'num_workers': {
'description': 'Number of Brigade worker processes that are run at the same time, '
'configuration can be overridden on individual tasks by using the '
'`num_workers` argument to (:obj:`brigade.core.Brigade.run`)',
'type': 'int',
'default': 20,
},
'raise_on_error': {
'description': "If set to ``True``, (:obj:`brigade.core.Brigade.run`) method of will raise "
"an exception if at least a host failed.",
'type': 'bool',
'default': True,
},
'ssh_config_file': {
'description': 'User ssh_config_file',
'type': 'str',
'default': os.path.join(os.path.expanduser("~"), ".ssh", "config"),
'default_doc': '~/.ssh/config'
},
}

types = {
'int': int,
'str': str,
}


class Config:
"""
This object handles the configuration of Brigade.
Arguments:
config_file(``str``): Yaml configuration file.
"""

def __init__(self, config_file=None, **kwargs):

if config_file:
with open(config_file, 'r') as f:
c = yaml.load(f.read())
else:
c = {}

self._assign_properties(c)

for k, v in kwargs.items():
setattr(self, k, v)

def _assign_properties(self, c):

for p in CONF:
env = CONF[p].get('env') or 'BRIGADE_' + p.upper()
v = os.environ.get(env) or c.get(p)
v = v if v is not None else CONF[p]['default']
if CONF[p]['type'] == 'bool':
v = ast.literal_eval(str(v).title())
else:
v = types[CONF[p]['type']](v)
setattr(self, p, v)
18 changes: 9 additions & 9 deletions brigade/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ class BrigadeExecutionError(Exception):
"""
def __init__(self, result):
self.result = result
self.failed_hosts = result.failed_hosts
self.tracebacks = result.tracebacks

@property
def failed_hosts(self):
return {k: v for k, v in self.result.items() if v.failed}

def __str__(self):
text = "\n"
for k, r in self.result.items():
text += "{}\n".format("#" * 40)
text += "# {} (succeeded) \n".format(k)
text += "{}\n".format("#" * 40)
text += "{}\n".format(r)
for k, r in self.tracebacks.items():
text += "{}\n".format("#" * 40)
text += "# {} (failed) \n".format(k)
if r.failed:
text += "# {} (failed)\n".format(k)
else:
text += "# {} (succeeded)\n".format(k)
text += "{}\n".format("#" * 40)
text += "{}\n".format(r)
text += "{}\n".format(r.result)
return text
5 changes: 2 additions & 3 deletions brigade/core/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ def merge_two_dicts(x, y):
try:
z = x.copy()
except AttributeError:
z = x.items()
z = dict(x)
z.update(y)
return z


def format_string(text, task, **kwargs):
merged = merge_two_dicts(task.host.items(), task.brigade.inventory.data)
return text.format(host=task.host,
**merge_two_dicts(merged, kwargs))
**merge_two_dicts(task.host.items(), kwargs))
Loading

0 comments on commit 6c0ac18

Please sign in to comment.