Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poller: handle multiple exceptions #851

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions suzieq/poller/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@
import logging
import signal
from collections import defaultdict
from copy import deepcopy
from pathlib import Path
from typing import Dict, List
from copy import deepcopy

from suzieq.poller.controller.base_controller_plugin import ControllerPlugin
from suzieq.poller.controller.inventory_async_plugin import \
InventoryAsyncPlugin
from suzieq.poller.worker.services.service_manager import ServiceManager
from suzieq.shared.exceptions import InventorySourceError, SqPollerConfError
from suzieq.shared.exceptions import (InventorySourceError, SqPollerConfError,
SqRuntimeError)
from suzieq.shared.utils import sq_get_config_file

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -278,9 +279,12 @@ async def run(self):
)

tasks = list(pending)
exceptions = []
for task in done:
if task.exception():
raise task.exception()
exceptions.append(task.exception())
if exceptions:
raise SqRuntimeError(exceptions)
# Ignore completed task if started with single-run mode
if self._single_run_mode:
continue
Expand Down Expand Up @@ -314,7 +318,7 @@ async def _inventory_sync(self):
))
except asyncio.TimeoutError:
raise InventorySourceError(
f'Timeout error: source {inv_src.name} took'
f'Timeout error: source {inv_src.name} took '
'too much time'
)

Expand Down
24 changes: 15 additions & 9 deletions suzieq/poller/sq_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from suzieq.poller.controller.controller import Controller
from suzieq.poller.worker.writers.output_worker import OutputWorker
from suzieq.shared.exceptions import InventorySourceError, PollingError, \
SqPollerConfError
SqPollerConfError, SqRuntimeError
from suzieq.shared.utils import (poller_log_params, init_logger,
load_sq_config, print_version)
from suzieq.poller.controller.utils.inventory_utils import read_inventory
Expand Down Expand Up @@ -48,15 +48,21 @@ async def start_controller(user_args: argparse.Namespace, config_data: Dict):
controller = Controller(user_args, config_data)
controller.init()
await controller.run()
except (SqPollerConfError, InventorySourceError, PollingError) as error:
if not log_stdout:
print(f"ERROR: {error}")
logger.error(error)
sys.exit(1)
except Exception as error:
if not log_stdout:
traceback.print_exc()
logger.critical(f'{error}\n{traceback.format_exc()}')
if isinstance(error, SqRuntimeError):
exceptions = error.exceptions
else:
exceptions = [error]
for exc in exceptions:
if any(isinstance(exc, e) for e in
[SqPollerConfError, InventorySourceError, PollingError]):
if not log_stdout:
print(f"ERROR: {error}")
logger.error(exc)
else:
if not log_stdout:
traceback.print_exc()
logger.critical(f'{exc}\n{traceback.format_exc()}')
sys.exit(1)


Expand Down
26 changes: 17 additions & 9 deletions suzieq/poller/worker/sq_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from typing import Dict

import uvloop

from suzieq.poller.worker.worker import Worker
from suzieq.poller.worker.writers.output_worker import OutputWorker
from suzieq.shared.exceptions import InventorySourceError, SqPollerConfError
from suzieq.shared.exceptions import (InventorySourceError, SqPollerConfError,
SqRuntimeError)
from suzieq.shared.utils import init_logger, load_sq_config, poller_log_params


Expand All @@ -34,15 +36,21 @@ async def start_worker(userargs: argparse.Namespace, cfg: Dict):
worker = Worker(userargs, cfg)
await worker.init_worker()
await worker.run()
except (SqPollerConfError, InventorySourceError) as error:
if not log_stdout:
print(error)
logger.error(error)
sys.exit(1)
except Exception as error:
if not log_stdout:
traceback.print_exc()
logger.critical(f'{error}\n{traceback.format_exc()}')
if isinstance(error, SqRuntimeError):
exceptions = error.exceptions
else:
exceptions = [error]
for exc in exceptions:
if any(isinstance(exc, e) for e in
[SqPollerConfError, InventorySourceError]):
if not log_stdout:
print(f"ERROR: {error}")
logger.error(exc)
else:
if not log_stdout:
traceback.print_exc()
logger.critical(f'{exc}\n{traceback.format_exc()}')
sys.exit(1)


Expand Down
7 changes: 5 additions & 2 deletions suzieq/poller/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from suzieq.poller.worker.services.service_manager import ServiceManager
from suzieq.poller.worker.writers.output_worker_manager \
import OutputWorkerManager
from suzieq.shared.exceptions import SqPollerConfError
from suzieq.shared.exceptions import SqPollerConfError, SqRuntimeError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -140,9 +140,12 @@ async def run(self):
try:
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
exceptions = []
for d in done:
if d.exception():
raise d.exception()
exceptions.append(d.exception())
if exceptions:
raise SqRuntimeError(exceptions)
tasks = list(pending)
running_svcs = self.service_manager.running_services
if tasks and any(i._coro in running_svcs
Expand Down
12 changes: 12 additions & 0 deletions suzieq/shared/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""List of Exceptions specific to Suzieq, across all the modules."""


from typing import List


class SqCoalescerCriticalError(Exception):
"""Raised when a critical error occuur inside the coalescer"""

Expand Down Expand Up @@ -56,3 +59,12 @@ class SensitiveLoadError(Exception):
class SqBrokenFilesError(Exception):
"""Raise when there are broken files and it is not possible to return a
coherent result."""


class SqRuntimeError(Exception):
"""Contains inside self.exceptions a list of exceptions"""

def __init__(self, exceptions: List[Exception]) -> None:
self.exceptions = exceptions
message = '\n'.join([str(e) for e in exceptions])
super().__init__(message)
34 changes: 30 additions & 4 deletions tests/unit/poller/controller/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from suzieq.poller.controller.manager.static import StaticManager
from suzieq.poller.controller.source.native import SqNativeFile
from suzieq.poller.controller.source.netbox import Netbox
from suzieq.shared.exceptions import InventorySourceError, SqPollerConfError
from suzieq.shared.exceptions import InventorySourceError, SqPollerConfError, SqRuntimeError
from suzieq.shared.utils import load_sq_config
from tests.conftest import create_dummy_config_file, get_async_task_mock

Expand Down Expand Up @@ -695,9 +695,22 @@ def mock_set_device(self):
with patch.multiple(SqNativeFile, set_device=mock_set_device, name='n'):
with patch.multiple(Netbox, set_device=mock_set_device, name='n'):
c.init()
with pytest.raises(InventorySourceError,
match='No devices to poll'):
try:
await c.run()
except SqRuntimeError as err:
assert len(err.exceptions) == 1, (
f'got multiple exceptions: {err}')
exc = err.exceptions[0]
assert isinstance(exc, InventorySourceError), (
'wrong exception type. expected InventorySourceError, got '
f'{type(exc)}')
exp_error = 'No devices to poll'
assert str(exc) == exp_error, (
f'Wrong error message: expected {exp_error}, got '
f'{str(exc)}'
)
except Exception as e:
pytest.fail(f'Unexpected exception: ({type(e)}) {e}')


@pytest.mark.poller
Expand Down Expand Up @@ -726,5 +739,18 @@ async def mock_netbox_run(self):
c = generate_controller(default_args, inv_file, config_file)
c.init()
with patch.multiple(Netbox, run=mock_netbox_run, name='n'):
with pytest.raises(InventorySourceError):
try:
await c.run()
except SqRuntimeError as err:
assert len(err.exceptions) == 1, (
f'got multiple exceptions: {err}')
exc = err.exceptions[0]
assert isinstance(exc, InventorySourceError), (
'wrong exception type. expected InventorySourceError, got '
f'{type(exc)}')
exp_error = 'Timeout error: source n took too much time'
assert str(exc) == exp_error, (
f'Wrong error message: expected {exp_error}, got {str(exc)}'
)
except Exception as e:
pytest.fail(f'Unexpected exception: ({type(e)}) {e}')