Skip to content

Commit

Permalink
Account for when no processors are configured
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Algarvio <palgarvio@vmware.com>
  • Loading branch information
s0undt3ch committed May 16, 2023
1 parent 05dfcd7 commit ce50a03
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 29 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,4 @@ keep-runtime-typing = true
max-complexity = 20

[tool.ruff.pylint]
max-branches = 20
max-branches = 25
62 changes: 34 additions & 28 deletions src/saf/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,35 +96,41 @@ async def _run(self: P) -> None:
async for event in collect_plugin.collect(ctx=collect_ctx):
events_to_process: list[CollectedEvent] = [event]
processed_events: list[CollectedEvent] = []
# Process the event
stop_processing = False
for process_config in self.process_configs:
if stop_processing:
break
if not events_to_process:
events_to_process.extend(processed_events)
processed_events.clear()
if process_config.name not in process_ctxs:
process_ctxs[process_config.name] = PipelineRunContext.construct(
config=process_config,
shared_cache=shared_cache,
)
process_plugin = process_config.loaded_plugin
while events_to_process:
event_to_process = events_to_process.pop(0)
try:
async for processed_event in process_plugin.process(
ctx=process_ctxs[process_config.name],
event=event_to_process,
):
if processed_event is not None:
processed_events.append(processed_event)
except Exception:
log.exception(
"An exception occurred while processing the event. Stopped processing this event."
)
stop_processing = True
if not self.process_configs:
# Consider all events to process as processed
processed_events.extend(events_to_process)
events_to_process.clear()
else:
# Process the event
stop_processing = False
for process_config in self.process_configs:
if stop_processing:
break
if not events_to_process:
events_to_process.extend(processed_events)
processed_events.clear()
if process_config.name not in process_ctxs:
process_ctxs[process_config.name] = PipelineRunContext.construct(
config=process_config,
shared_cache=shared_cache,
)
process_plugin = process_config.loaded_plugin
while events_to_process:
event_to_process = events_to_process.pop(0)
try:
async for processed_event in process_plugin.process(
ctx=process_ctxs[process_config.name],
event=event_to_process,
):
if processed_event is not None:
processed_events.append(processed_event)
except Exception:
log.exception(
"An exception occurred while processing the event. "
"Stopped processing this event."
)
stop_processing = True
break

if not processed_events:
# The processor(s) did not return any events to forward
Expand Down

0 comments on commit ce50a03

Please sign in to comment.