Skip to content

Commit

Permalink
added new serialization/deserialization logic
Browse files Browse the repository at this point in the history
  • Loading branch information
artas728 committed Nov 21, 2022
1 parent 345a360 commit acc5a4b
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 138 deletions.
35 changes: 9 additions & 26 deletions panini/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,23 +195,17 @@ def set_logger(
def listen(
self,
subject: list or str,
data_type="json",
validator: type = None,
validation_error_cb: FunctionType = None,
data_type=dict,
):
return self._event_manager.listen(
subject=subject,
data_type=data_type,
validator=validator,
validation_error_cb=validation_error_cb
)

def js_listen(
self,
subject: str = None,
data_type="json",
validator: type = None,
validation_error_cb: FunctionType = None,
data_type=dict,
queue: str = None,
durable: Optional[str] = None,
stream: Optional[str] = None,
Expand All @@ -225,9 +219,7 @@ def js_listen(
"PUSH" JetStream listen(subscribe)
:param subject: Subject from a stream from JetStream.
:param data_type: Expected message type.
:param validator: Validator cls instance to check each message.
:param validation_error_cb: Callback if validation failed.
:param data_type: Type of message to convert to.
:param queue: Deliver group name from a set a of queue subscribers.
:param durable: Name of the durable consumer to which the the subscription should be bound.
:param stream: Name of the stream to which the subscription should be bound. If not set,
Expand All @@ -241,8 +233,6 @@ def js_listen(
return self._event_manager.js_listen(
subject=subject,
data_type=data_type,
validator=validator,
validation_error_cb=validation_error_cb,
queue=queue,
durable=durable,
stream=stream,
Expand All @@ -259,15 +249,13 @@ async def publish(
message,
reply_to: str = "",
force: bool = False,
data_type: type or str = "json",
headers: dict = None
):
return await self.nats.publish(
subject=subject,
message=message,
reply_to=reply_to,
force=force,
data_type=data_type,
headers=headers
)

Expand All @@ -277,15 +265,13 @@ def publish_sync(
message,
reply_to: str = "",
force: bool = False,
data_type: type or str = "json",
headers: dict = None
):
return self.nats.publish_sync(
subject=subject,
message=message,
reply_to=reply_to,
force=force,
data_type=data_type,
headers=headers
)

Expand All @@ -294,14 +280,14 @@ async def request(
subject: str,
message,
timeout: int = 10,
data_type: type or str = "json",
response_data_type: type = dict,
headers: dict = None
):
return await self.nats.request(
subject=subject,
message=message,
timeout=timeout,
data_type=data_type,
response_data_type=response_data_type,
headers=headers
)

Expand All @@ -310,14 +296,14 @@ def request_sync(
subject: str,
message,
timeout: int = 10,
data_type: type or str = "json",
response_data_type: type = dict,
headers: dict = None
):
return self.nats.request_sync(
subject=subject,
message=message,
timeout=timeout,
data_type=data_type,
response_data_type=response_data_type,
headers=headers
)

Expand All @@ -327,23 +313,21 @@ async def publish_js(
message,
timeout: float = None,
stream: str = None,
data_type: type or str = "json",
headers: dict = None,
):
return await self.nats.publish_js(
subject=subject,
message=message,
timeout=timeout,
stream=stream,
data_type=data_type,
headers=headers,
)

def subscribe_new_subject_sync(
self,
subject: str,
callback: Callable,
data_type="json",
data_type=dict,
queue="",
):
return self.nats.subscribe_new_subject_sync(
Expand All @@ -359,7 +343,7 @@ async def subscribe_new_subject(
self,
subject: str,
callback: Callable,
data_type="json",
data_type=dict,
queue="",
):
return await self.nats.subscribe_new_subject(
Expand All @@ -385,7 +369,6 @@ def _start_event(self):
self.nats._publish(
f"panini_events.{self.service_name}.{self.client_nats_name}.started",
b"{}",
data_type=bytes,
force=True
)
)
Expand Down
62 changes: 10 additions & 52 deletions panini/managers/event_manager.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
import asyncio
from dataclasses import dataclass
from typing import Optional, Callable

from nats.aio.msg import Msg
from nats.js import api

from panini import exceptions
from panini.exceptions import ValidationError


@dataclass
class Listen:
callback: Callable
subject: str
data_type: str or type = "json"
data_type: type = dict
queue: str = ""

@dataclass
Expand Down Expand Up @@ -47,18 +41,15 @@ def js_subscriptions(self):
def listen(
self,
subject: list or str,
data_type="json",
validator: type = None,
validation_error_cb: Callable[[Msg, ValidationError], None] = None,
data_type: type = dict,
**kwargs
):
def wrapper(function):
wrapped = self.wrap_function_by_validator(function, validator, validation_error_cb)
def wrapper(callback):
if isinstance(subject, list):
for s in subject:
self._create_subscription_if_missing(s)
listen_obj = Listen(
callback=wrapped,
callback=callback,
subject=s,
data_type=data_type,
**kwargs
Expand All @@ -67,66 +58,33 @@ def wrapper(function):
else:
self._create_subscription_if_missing(subject)
listen_obj = Listen(
callback=wrapped,
callback=callback,
subject=subject,
data_type=data_type,
**kwargs
)
self._subscriptions[subject].append(listen_obj)
return wrapped
return callback
return wrapper

def js_listen(
self,
subject: list or str,
data_type: type or str = "json",
validator: type = None,
validation_error_cb: Callable[[Msg, ValidationError], None] = None,
data_type: type = dict,
**kwargs,
):
def wrapper(function):
wrapped = self.wrap_function_by_validator(function, validator, validation_error_cb)
def wrapper(callback):
self._create_subscription_if_missing(subject, js=True)
js_listen_obj = JsListen(
callback=wrapped,
callback=callback,
subject=subject,
data_type=data_type,
**kwargs
)
self._js_subscriptions[subject].append(js_listen_obj)
return wrapped
return callback
return wrapper

def wrap_function_by_validator(self, function, validator, validation_error_cb):
def validate_message(msg):
try:
if validator is not None:
validator.validated_message(msg.data)
except exceptions.ValidationError as se:
if validation_error_cb:
return validation_error_cb(msg, se)
error = f"subject: {msg.subject} error: {str(se)}"
return {"success": False, "error": error}
except Exception as e:
raise ValidationError(e)
return True

def wrapper(msg):
validation_result = validate_message(msg)
if validation_result is not True:
return validation_result
return function(msg)

async def wrapper_async(msg):
validation_result = validate_message(msg)
if validation_result is not True:
return validation_result
return await function(msg)

if asyncio.iscoroutinefunction(function):
return wrapper_async
else:
return wrapper

def _create_subscription_if_missing(self, subscription, js=False):
if js:
Expand Down
Loading

0 comments on commit acc5a4b

Please sign in to comment.