Skip to content

Commit

Permalink
test client data_type attr fix, issue #239
Browse files Browse the repository at this point in the history
  • Loading branch information
artas728 committed Nov 21, 2022
1 parent acc5a4b commit 54cc2c4
Showing 1 changed file with 12 additions and 19 deletions.
31 changes: 12 additions & 19 deletions panini/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .exceptions import TestClientError
from .utils.helper import start_process, start_thread
from nats.aio.client import Msg

from panini.managers.schema_manager import SchemaManager

# Annotations for `Session.request()`
Cookies = typing.Union[
Expand Down Expand Up @@ -152,14 +152,6 @@ def create_nats_client(self, suffix: str, nats_timeout) -> NATSClient:
socket_timeout=nats_timeout,
)

@staticmethod
def _dict_to_bytes(message: dict) -> bytes:
return json.dumps(message).encode("utf-8")

@staticmethod
def _bytes_to_dict(payload: bytes) -> dict:
return json.loads(payload)

@staticmethod
def wrap_run_panini(
run_panini,
Expand Down Expand Up @@ -261,8 +253,9 @@ def stop(self):
self.websocket_session.close()

def publish(self, subject: str, message: dict, reply: str = "") -> None:
deserialized_message = SchemaManager.deserialize_message(type(message), message)
self.nats_client_sender.publish(
subject=subject, payload=self._dict_to_bytes(message), reply=reply
subject=subject, payload=deserialized_message, reply=reply
)

def reconnect(self):
Expand All @@ -274,14 +267,13 @@ def reconnect(self):
except Exception:
self.nats_client_sender.reconnect()

def request(self, subject: str, message: dict) -> dict:
def request(self, subject: str, message: dict, response_data_type=dict) -> dict:
self.reconnect()

return self._bytes_to_dict(
self.nats_client_sender.request(
subject=subject, payload=self._dict_to_bytes(message)
deserialized_message = SchemaManager.deserialize_message(type(message), message)
response = self.nats_client_sender.request(
subject=subject, payload=deserialized_message
).payload
)
return SchemaManager.serialize_message(response_data_type, response)

def subscribe(
self,
Expand All @@ -307,13 +299,13 @@ def wait(self, count: int) -> None:
)
self.nats_client_listener.wait(count=count)

def listen(self, subject: str):
def listen(self, subject: str, data_type=dict):
def decorator(func):
assert isinstance(subject, str), "Subject must be only in str format"

def wrapper(incoming_message):
assert isinstance(incoming_message, NATSMessage)
incoming_message_data = self._bytes_to_dict(incoming_message.payload)
incoming_message_data = SchemaManager.serialize_message(data_type, incoming_message.payload)

msg = Msg(
_client=None,
Expand All @@ -323,9 +315,10 @@ def wrapper(incoming_message):
)
wrapper_response = func(msg)
if wrapper_response is not None and incoming_message.reply != "":
deserialized_response = SchemaManager.deserialize_message(type(wrapper_response), wrapper_response)
self.nats_client_listener.publish(
subject=incoming_message.reply,
payload=self._dict_to_bytes(wrapper_response),
payload=deserialized_response,
)

self.subscribe(subject, wrapper)
Expand Down

0 comments on commit 54cc2c4

Please sign in to comment.