Processors can now return one or more events; they'll all get forwarded
Fixes saltstack#37

Signed-off-by: Pedro Algarvio <palgarvio@vmware.com>
s0undt3ch committed May 16, 2023
1 parent 2fce83e commit fb5a704
Showing 8 changed files with 57 additions and 46 deletions.
1 change: 1 addition & 0 deletions changelog/37.improvement.rst
@@ -0,0 +1 @@
+Processors can now return one or more events; they'll all get forwarded
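
To see the new contract in practice: a process plugin is now an async generator that may yield zero, one, or many events for each event it receives, and the pipeline forwards every yielded event. Below is a minimal sketch of a hypothetical 1-to-N processor; the splitting behavior and the "payload" key are illustrative and not part of this commit, but the module shape and signature follow the bundled noop processor shown further down.

from __future__ import annotations

from typing import AsyncIterator

from saf.models import CollectedEvent
from saf.models import PipelineRunContext
from saf.models import ProcessConfigBase


async def process(
    *,
    ctx: PipelineRunContext[ProcessConfigBase],  # noqa: ARG001
    event: CollectedEvent,
) -> AsyncIterator[CollectedEvent]:
    """
    Hypothetical processor: fan one batched event out into many.
    """
    # Assumes the collector stored a newline-separated batch under
    # event.data["payload"]; both the key and the splitting are illustrative.
    for line in str(event.data.get("payload", "")).splitlines():
        # parse_obj() builds a fresh CollectedEvent carrying a single line.
        yield event.parse_obj({**event.dict(), "data": {"payload": line}})
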
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -176,5 +176,8 @@ ignore-names = [
 # Preserve types, even if a file imports `from __future__ import annotations`.
 keep-runtime-typing = true
 
+[tool.ruff.mccabe]
+max-complexity = 20
+
 [tool.ruff.pylint]
-max-branches = 15
+max-branches = 20
70 changes: 40 additions & 30 deletions src/saf/pipeline.py
@@ -94,49 +94,59 @@ async def _run(self: P) -> None:
         try:
             collect_plugin = self.collect_config.loaded_plugin
             async for event in collect_plugin.collect(ctx=collect_ctx):
+                events_to_process: list[CollectedEvent] = [event]
+                processed_events: list[CollectedEvent] = []
                 # Process the event
+                stop_processing = False
                 for process_config in self.process_configs:
+                    if stop_processing:
+                        break
+                    if not events_to_process:
+                        events_to_process.extend(processed_events)
+                        processed_events.clear()
                     if process_config.name not in process_ctxs:
                         process_ctxs[process_config.name] = PipelineRunContext.construct(
                             config=process_config,
                             shared_cache=shared_cache,
                         )
-                    # We pass copies of the event so that, in case an exception occurs while
-                    # the event is being processed, and the event has already been modified,
-                    # the next processor to run will get an unmodified copy of the event, not
-                    # the partially processed event
                     process_plugin = process_config.loaded_plugin
-                    try:
-                        event = await process_plugin.process(  # noqa: PLW2901
-                            ctx=process_ctxs[process_config.name],
-                            event=event,
-                        )
-                    except Exception:
-                        log.exception(
-                            "An exception occurred while processing the event. Stopped processing this event."
-                        )
-                        break
-
-                if event is None:
-                    # The processor decided to ignore the event
+                    while events_to_process:
+                        event_to_process = events_to_process.pop(0)
+                        try:
+                            async for processed_event in process_plugin.process(
+                                ctx=process_ctxs[process_config.name],
+                                event=event_to_process,
+                            ):
+                                if processed_event is not None:
+                                    processed_events.append(processed_event)
+                        except Exception:
+                            log.exception(
+                                "An exception occurred while processing the event. Stopped processing this event."
+                            )
+                            stop_processing = True
+                            break
+
+                if not processed_events:
+                    # The processor(s) did not return any events to forward
                     continue
 
                 # Forward the event
                 coros = []
-                for forward_config in self.forward_configs:
-                    if forward_config.name not in forward_ctxs:
-                        forward_ctxs[forward_config.name] = PipelineRunContext.construct(
-                            config=forward_config,
-                            shared_cache=shared_cache,
-                        )
-                    forward_plugin = forward_config.loaded_plugin
-                    coros.append(
-                        self._wrap_forwarder_plugin_call(
-                            forward_plugin,
-                            forward_ctxs[forward_config.name],
-                            event.copy(),
-                        ),
-                    )
+                for processed_event in processed_events:
+                    for forward_config in self.forward_configs:
+                        if forward_config.name not in forward_ctxs:
+                            forward_ctxs[forward_config.name] = PipelineRunContext.construct(
+                                config=forward_config,
+                                shared_cache=shared_cache,
+                            )
+                        forward_plugin = forward_config.loaded_plugin
+                        coros.append(
+                            self._wrap_forwarder_plugin_call(
+                                forward_plugin,
+                                forward_ctxs[forward_config.name],
+                                processed_event.copy(),
+                            ),
+                        )
                 if self.config.concurrent_forwarders:
                     await asyncio.gather(*coros)
                 else:
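
The new control flow is easier to follow outside the diff: for each processor, every pending event is pulled from events_to_process and fed through process(); everything the processor yields accumulates in processed_events; once a processor's input list is drained, the accumulated outputs become the input list of the next processor; whatever survives the last processor is forwarded. A self-contained toy that mirrors the loop, with plain strings in place of CollectedEvent and two hypothetical processors (the exception handling and stop_processing bookkeeping are elided):

import asyncio
from typing import AsyncIterator, Callable, List

ProcessorFunc = Callable[[str], AsyncIterator[str]]


async def split_words(event: str) -> AsyncIterator[str]:
    # 1-to-N: fans a single event out into one event per word.
    for word in event.split():
        yield word


async def drop_short(event: str) -> AsyncIterator[str]:
    # 1-to-0-or-1: yields nothing for short events, filtering them out.
    if len(event) > 3:
        yield event


async def run_processors(event: str, processors: List[ProcessorFunc]) -> List[str]:
    events_to_process = [event]
    processed_events: List[str] = []
    for process in processors:
        if not events_to_process:
            # Hand the previous processor's outputs to the next processor.
            events_to_process.extend(processed_events)
            processed_events.clear()
        while events_to_process:
            event_to_process = events_to_process.pop(0)
            async for processed in process(event_to_process):
                processed_events.append(processed)
    return processed_events  # every event left here gets forwarded


print(asyncio.run(run_processors("salt minion event bus", [split_words, drop_short])))
# -> ['salt', 'minion', 'event']
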
5 changes: 3 additions & 2 deletions src/saf/process/noop.py
@@ -8,6 +8,7 @@
 from __future__ import annotations
 
 import logging
+from typing import AsyncIterator
 from typing import Type
 
 from saf.models import CollectedEvent
@@ -28,9 +29,9 @@ async def process(
     *,
     ctx: PipelineRunContext[ProcessConfigBase],  # noqa: ARG001
     event: CollectedEvent,
-) -> CollectedEvent:
+) -> AsyncIterator[CollectedEvent]:
     """
     Method called to process the event.
     """
     log.info("Processing: %s", event)
-    return event
+    yield event
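
Migrating an existing processor is as mechanical as this noop diff: change the return annotation to AsyncIterator[CollectedEvent] and turn return event into yield event. Because a generator may also yield nothing, a processor can now drop an event outright. A hypothetical sketch, assuming an "important" flag in event.data purely for illustration:

from __future__ import annotations

from typing import AsyncIterator

from saf.models import CollectedEvent
from saf.models import PipelineRunContext
from saf.models import ProcessConfigBase


async def process(
    *,
    ctx: PipelineRunContext[ProcessConfigBase],  # noqa: ARG001
    event: CollectedEvent,
) -> AsyncIterator[CollectedEvent]:
    """
    Hypothetical filter: only forward events flagged as important.
    """
    if event.data.get("important"):
        yield event
    # Yielding nothing drops this event; if no processor output survives,
    # the pipeline's "if not processed_events: continue" skips forwarding.
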
5 changes: 3 additions & 2 deletions src/saf/process/regex_mask.py
@@ -9,6 +9,7 @@
 import logging
 import re
 from typing import Any
+from typing import AsyncIterator
 from typing import Dict
 from typing import Match
 from typing import Optional
@@ -97,12 +98,12 @@ async def process(
     *,
     ctx: PipelineRunContext[RegexMaskProcessConfig],
     event: CollectedEvent,
-) -> CollectedEvent:
+) -> AsyncIterator[CollectedEvent]:
     """
     Method called to mask the data based on provided regex rules.
     """
     config = ctx.config
     log.info("Processing event in regex_mask: %s", event.json())
     event_dict = event.dict()
     processed_event_dict = _regex_process(event_dict, config)
-    return event.parse_obj(processed_event_dict)
+    yield event.parse_obj(processed_event_dict)
5 changes: 3 additions & 2 deletions src/saf/process/shannon_mask.py
@@ -9,6 +9,7 @@
 import math
 import string
 from typing import Any
+from typing import AsyncIterator
 from typing import Optional
 from typing import Type
 
@@ -119,12 +120,12 @@ async def process(
     *,
     ctx: PipelineRunContext[ShannonMaskProcessConfig],
     event: CollectedEvent,
-) -> CollectedEvent:
+) -> AsyncIterator[CollectedEvent]:
     """
     Method called to mask the data based on normalized Shannon index values.
     """
     config = ctx.config
     log.info("Processing event in shannon_mask: %s", event.json())
     event_dict = event.dict()
     processed_event_dict = _shannon_process(event_dict, config)
-    return event.parse_obj(processed_event_dict)
+    yield event.parse_obj(processed_event_dict)
2 changes: 1 addition & 1 deletion src/saf/utils/eventbus.py
@@ -48,7 +48,7 @@ def _construct_event(event_data: dict[str, Any]) -> SaltEvent | None:
     return salt_event
 
 
-def _process_events(  # noqa: C901
+def _process_events(
     opts: dict[str, Any],
     events_queue: Queue[SaltEvent],
     tags: set[str],
10 changes: 2 additions & 8 deletions tests/functional/conftest.py
@@ -4,21 +4,15 @@
 from __future__ import annotations
 
 import asyncio
-from typing import TYPE_CHECKING
 from typing import Any
+from typing import AsyncIterator
 
 import pytest
 import pytest_asyncio
 
 from saf.manager import Manager
 from saf.models import AnalyticsConfig
 
-if TYPE_CHECKING:
-    try:
-        from typing import AsyncGenerator
-    except ImportError:
-        from typing import AsyncGenerator
-
 try:
     asyncio_fixture = pytest_asyncio.fixture
 except AttributeError:
@@ -43,7 +37,7 @@ def analytics_config(analytics_config_dict: dict[str, Any]):
 
 
 @asyncio_fixture
-async def manager(analytics_config: AnalyticsConfig) -> AsyncGenerator[Manager, None]:
+async def manager(analytics_config: AnalyticsConfig) -> AsyncIterator[Manager]:
     _manager = Manager(analytics_config)
     loop = asyncio.get_event_loop()
     task = loop.create_task(_run_manager(_manager))
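
On the conftest change: AsyncGenerator[Manager, None] is replaced by the narrower AsyncIterator[Manager], which is all a yield-only fixture needs (it never receives values via asend()), and the conditional-import block becomes unnecessary. A standalone sketch of the same setup/teardown pattern outside pytest, with illustrative names:

from __future__ import annotations

import asyncio
from typing import AsyncIterator


async def managed_manager() -> AsyncIterator[str]:
    # An async generator typed as AsyncIterator: set up, yield once, tear down.
    print("setup")
    try:
        yield "manager"
    finally:
        print("teardown")


async def main() -> None:
    gen = managed_manager()
    manager = await gen.__anext__()  # runs setup, receives the yielded value
    print(f"using {manager}")
    await gen.aclose()  # closes the generator, running the teardown


asyncio.run(main())
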
