Skip to content

Commit

Permalink
Allow configuring the forwarders to run concurrently or not.
Browse files Browse the repository at this point in the history
Fixes #15

Signed-off-by: Pedro Algarvio <palgarvio@vmware.com>
  • Loading branch information
s0undt3ch committed Jan 19, 2022
1 parent 3918a8f commit ace3ec5
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 52 deletions.
41 changes: 30 additions & 11 deletions src/saf/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
"""
import asyncio
import logging
from types import ModuleType
from typing import List

from saf.models import CollectConfigBase
from saf.models import CollectedEvent
from saf.models import ForwardConfigBase
from saf.models import PipelineConfig
from saf.models import ProcessConfigBase
Expand Down Expand Up @@ -70,17 +72,34 @@ async def _run(self) -> None:
# Restore the original event
event = original_event
# Forward the event
coros = []
for forward_config in self.forward_configs:
forward_plugin = forward_config.loaded_plugin
try:
await forward_plugin.forward(
config=forward_config,
event=event.copy(),
)
except Exception as exc: # pylint: disable=broad-except
log.error(
"An exception occurred while forwarding the event through config %r: %s",
coros.append(
self._wrap_forwarder_plugin_call(
forward_plugin,
forward_config,
exc,
exc_info=True,
)
event.copy(),
),
)
print(555.0, coros)
if self.config.concurrent_forwarders:
print(555.1)
await asyncio.gather(*coros)
else:
for coro in coros:
print(555.2, coro)
await coro

async def _wrap_forwarder_plugin_call(
self, plugin: ModuleType, config: ForwardConfigBase, event: CollectedEvent
) -> None:
try:
await plugin.forward(config=config, event=event)
except Exception as exc: # pylint: disable=broad-except
log.error(
"An exception occurred while forwarding the event through config %r: %s",
config,
exc,
exc_info=True,
)
55 changes: 55 additions & 0 deletions tests/functional/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2021-2022 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
#
import asyncio
from typing import Any
from typing import Dict
from typing import TYPE_CHECKING

import pytest
import pytest_asyncio

from saf.manager import Manager
from saf.models import AnalyticsConfig


if TYPE_CHECKING:
try:
from typing import AsyncGenerator
except ImportError:
from typing_extensions import AsyncGenerator

try:
asyncio_fixture = pytest_asyncio.fixture
except AttributeError:
# On Py3.6 and older version of the pytest gets installed which does not
# have the `.fixture` function.
# Fallback to `pytest.fixture`.
asyncio_fixture = pytest.fixture


async def _run_manager(manager):
try:
await manager.run()
except asyncio.CancelledError:
pass
finally:
await manager.await_stopped()


@pytest.fixture
def analytics_config(analytics_config_dict: Dict[str, Any]):
return AnalyticsConfig.parse_obj(analytics_config_dict)


@asyncio_fixture
async def manager(analytics_config: AnalyticsConfig) -> "AsyncGenerator[Manager, None]":
_manager = Manager(analytics_config)
loop = asyncio.get_event_loop()
task = loop.create_task(_run_manager(_manager))
try:
yield _manager
finally:
if not task.done():
task.cancel()
await task
Empty file.
80 changes: 80 additions & 0 deletions tests/functional/forwarders/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2022 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
#
import asyncio
import pathlib

import pytest


@pytest.fixture
def forwarder_dump_path(tmp_path):
return tmp_path / "forwarder-dump.txt"


@pytest.fixture(params=(True, False), ids=lambda x: f"ConcurrentForwarders({x})")
def concurrent_forwarders(request):
return request.param


@pytest.fixture
def analytics_config_dict(forwarder_dump_path, concurrent_forwarders):
return {
"collectors": {
"noop-collector": {"plugin": "noop", "interval": 5},
},
"forwarders": {
"forwarder-1": {
"plugin": "test",
"sleep": 0.1,
"path": forwarder_dump_path,
"message": "1",
},
"forwarder-2": {
"plugin": "test",
"sleep": 0.3,
"path": forwarder_dump_path,
"message": "2",
},
"forwarder-3": {
"plugin": "test",
"sleep": 0.5,
"path": forwarder_dump_path,
"message": "3",
},
},
"pipelines": {
"my-pipeline": {
"enabled": True,
"concurrent_forwarders": concurrent_forwarders,
"collect": "noop-collector",
"forward": [
"forwarder-3",
"forwarder-2",
"forwarder-1",
],
}
},
"salt_config": {},
}


@pytest.mark.asyncio
@pytest.mark.usefixtures("manager")
async def test_pipeline(forwarder_dump_path: pathlib.Path, concurrent_forwarders):
synchronous_outcome = ["3", "2", "1"]
timeout = 5
while timeout:
await asyncio.sleep(1)
timeout -= 1
if not forwarder_dump_path.exists():
continue
lines = forwarder_dump_path.read_text().splitlines()
if len(lines) >= 3:
if concurrent_forwarders is False:
assert lines[:3] == synchronous_outcome
else:
assert lines[:3] != synchronous_outcome
break
else:
pytest.fail(f"Failed to find dumped events in {forwarder_dump_path}")
41 changes: 0 additions & 41 deletions tests/functional/manager/conftest.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,7 @@
# Copyright 2021-2022 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
#
import asyncio

import pytest
import pytest_asyncio

from saf.manager import Manager
from saf.models import AnalyticsConfig

try:
asyncio_fixture = pytest_asyncio.fixture
except AttributeError:
# On Py3.6 and older version of the pytest gets installed which does not
# have the `.fixture` function.
# Fallback to `pytest.fixture`.
asyncio_fixture = pytest.fixture


async def _run_manager(manager):
try:
await manager.run()
except asyncio.CancelledError:
pass
finally:
await manager.await_stopped()


@pytest.fixture
Expand Down Expand Up @@ -53,21 +30,3 @@ def analytics_config_dict():
},
"salt_config": {},
}


@pytest.fixture
def analytics_config(analytics_config_dict):
return AnalyticsConfig.parse_obj(analytics_config_dict)


@asyncio_fixture
async def manager(analytics_config):
_manager = Manager(analytics_config)
loop = asyncio.get_event_loop()
task = loop.create_task(_run_manager(_manager))
try:
yield _manager
finally:
if not task.done():
task.cancel()
await task

0 comments on commit ace3ec5

Please sign in to comment.