Skip to content

Commit

Permalink
Messages are written asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
elchupanebrej committed Jul 5, 2023
1 parent 443ac65 commit 936a37b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
43 changes: 38 additions & 5 deletions src/pytest_bdd/message_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from io import BufferedIOBase, TextIOBase
from pathlib import Path
from platform import machine, processor, system, version
from time import time_ns
from typing import Callable, Dict, Union, cast
from queue import Empty, Queue
from threading import Event, Thread
from time import sleep, time_ns
from typing import Callable, Dict, Optional, Union, cast

from attr import attrib, attrs
from ci_environment import detect_ci_environment
Expand Down Expand Up @@ -47,6 +49,37 @@ class MessagePlugin:
current_test_case_step_to_definition_mapping = attrib(default=None)
parameter_type_registry: Dict[int, CucumberExpressionParameterType] = dict()

def __attrs_post_init__(self):
self.queue = Queue()
self.process_messages_stop_event = Event()
self.process_messages_thread = Thread(
target=type(self).process_messages,
args=(self.queue, self.process_messages_stop_event, self.config.option.messages_ndjson_path),
daemon=True,
)
self.process_messages_thread.start()
sleep(0)
pass

@staticmethod
def process_messages(queue: Queue, stop_event: Event, messages_file_path: Optional[Union[str, Path]]):
if messages_file_path is None:
return

while not stop_event.is_set(): # give one more enter to take all left messages
with Path(messages_file_path).open(mode="a+") as f:
while not queue.empty():
try:
message = queue.get_nowait()
except Empty:
pass
else:
f.write(message.json(exclude_none=True, by_alias=True))
f.write("\n")
f.flush()
queue.task_done()
sleep(0)

def get_timestamp(self):
timestamp = time_ns()
test_run_started_seconds = timestamp // 10**9
Expand All @@ -70,9 +103,7 @@ def add_options(parser: Parser) -> None:
def pytest_bdd_message(self, config: Config, message: Message):
if config.option.messages_ndjson_path is None:
return
with Path(self.config.option.messages_ndjson_path).open(mode="a+") as f:
f.write(message.json(exclude_none=True, by_alias=True))
f.write("\n")
self.queue.put_nowait(message)

def pytest_runtestloop(self, session: Session):
config = session.config
Expand Down Expand Up @@ -130,6 +161,8 @@ def pytest_sessionfinish(self, session, exitstatus):
)
),
)
self.queue.join()
self.process_messages_stop_event.set()

@hookimpl(hookwrapper=True)
def pytest_runtest_setup(self, item):
Expand Down
2 changes: 1 addition & 1 deletion tests/messages/test_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def parse_and_unflold_messages(lines):
parsed_messages.append(Message.parse_obj(json.loads(line)))
except ValidationError as e: # pragma: nocover
errors.append(e)
if errors:
if errors: # pragma: nocover
raise ParseError(f"Could not parse messages: {errors}")

return list(map(unfold_message, parsed_messages))
Expand Down

0 comments on commit 936a37b

Please sign in to comment.