Skip to content

Commit

Permalink
Broker message (#28)
Browse files Browse the repository at this point in the history
* 🦄 refactor: broker message proxy
  • Loading branch information
mic1on authored Dec 27, 2023
1 parent 6af4455 commit db06e93
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 150 deletions.
2 changes: 1 addition & 1 deletion src/onestep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
NeverRetry, AlwaysRetry, TimesRetry, RetryIfException, AdvancedRetry
)
from .broker import (
BaseBroker, BaseConsumer, BaseLocalBroker, BaseLocalConsumer,
BaseBroker, BaseConsumer,
MemoryBroker, RabbitMQBroker, WebHookBroker, CronBroker, RedisStreamBroker, RedisPubSubBroker
)
from .middleware import (
Expand Down
4 changes: 2 additions & 2 deletions src/onestep/broker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .base import (
BaseBroker, BaseConsumer, BaseLocalBroker, BaseLocalConsumer
BaseBroker, BaseConsumer
)
from .memory import MemoryBroker
from .memory import MemoryBroker, MemoryConsumer
from .webhook import WebHookBroker
from .rabbitmq import RabbitMQBroker
from .redis import RedisStreamBroker, RedisPubSubBroker
Expand Down
79 changes: 8 additions & 71 deletions src/onestep/broker/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
# -*- coding: utf-8 -*-
import abc
import json
import logging
from queue import Queue, Empty, Full as FullException
from queue import Queue, Empty
from typing import Any, Optional, List, Callable

from onestep.middleware import BaseMiddleware
Expand All @@ -13,10 +12,11 @@


class BaseBroker:
message_cls = Message

def __init__(self,
name: Optional[str] = None,
queue: Optional[str] = None,
queue: Optional[Queue] = None,
middlewares: Optional[List[BaseMiddleware]] = None,
once: bool = False,
cancel_consume: Optional[Callable] = None):
Expand Down Expand Up @@ -46,7 +46,7 @@ def add_middleware(self, middleware: BaseMiddleware):
def send(self, message):
"""对消息进行预处理,然后再发送"""
if not isinstance(message, Message):
message = Message(body=message)
message = self.message_cls(body=message)
# TODO: 对消息发送进行N次重试,确保消息发送成功。
return self.publish(message.to_json())

Expand Down Expand Up @@ -112,80 +112,17 @@ def __str__(self):

class BaseConsumer:

def __init__(self, queue: Queue, *args, **kwargs):
self.queue = queue
def __init__(self, broker: BaseBroker, *args, **kwargs):
self.queue = broker.queue
self.message_cls = broker.message_cls or Message
self.timeout = kwargs.pop("timeout", 1000)

@abc.abstractmethod
def _to_message(self, data: Any):
"""
转换消息内容到 Message , 则必须实现此方法
"""
raise NotImplementedError('Please implement in subclasses.')

def __next__(self):
try:
data = self.queue.get(timeout=self.timeout / 1000)
return self._to_message(data)
return self.message_cls.from_broker(broker_message=data)
except Empty:
return None

def __iter__(self):
return self


class BaseLocalBroker(BaseBroker):

def __init__(self, maxsize=0, *args, **kwargs):
super().__init__(*args, **kwargs)
self.queue = Queue(maxsize)

def publish(self, message: Any):
try:
self.queue.put_nowait(message)
except FullException:
logger.warning("CronBroker queue is full, skip this task, "
"you can increase maxsize with `maxsize` argument")

def consume(self, *args, **kwargs):
return BaseLocalConsumer(self.queue, *args, **kwargs)

def confirm(self, message: Message):
"""确认消息"""
pass

def reject(self, message: Message):
"""拒绝消息"""
pass

def requeue(self, message: Message, is_source=False):
"""重发消息:先拒绝 再 重入"""
if is_source:
self.publish(message.msg)
else:
self.send(message)

def __repr__(self):
return f"<{self.__class__.__name__} {self.name}>"

def __str__(self):
return self.name


class BaseLocalConsumer(BaseConsumer):

def _to_message(self, data: Any):
if isinstance(data, (str, bytes, bytearray)):
try:
message = json.loads(data)
except json.JSONDecodeError:
message = {"body": data}
else:
message = data
if not isinstance(message, dict):
message = {"body": message}
if "body" not in message:
# 来自 外部的消息 可能没有 body, 故直接认为都是 message.body
message = {"body": message}

return Message(body=message.get("body"), extra=message.get("extra"), msg=data)
10 changes: 5 additions & 5 deletions src/onestep/broker/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

from croniter import croniter

from .base import BaseLocalBroker, BaseLocalConsumer
from .memory import MemoryBroker, MemoryConsumer

logger = logging.getLogger(__name__)


class CronBroker(BaseLocalBroker):
class CronBroker(MemoryBroker):
_thread = None

def __init__(self, cron, name=None, middlewares=None, body: Any = None, *args, **kwargs):
Expand All @@ -31,13 +31,13 @@ def _scheduler(self):
self._thread = threading.Timer(interval=1, function=self._scheduler)
self._thread.start()

def consume(self):
def consume(self, *args, **kwargs):
self._scheduler()
return CronConsumer(self.queue)
return CronConsumer(self, *args, **kwargs)

def shutdown(self):
self._thread.cancel()


class CronConsumer(BaseLocalConsumer):
class CronConsumer(MemoryConsumer):
...
71 changes: 67 additions & 4 deletions src/onestep/broker/memory.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,72 @@
from .base import BaseLocalBroker, BaseLocalConsumer
import logging
import json
from queue import Queue, Full as FullException
from typing import Any

from .base import BaseBroker, BaseConsumer

class MemoryBroker(BaseLocalBroker):
...
from ..message import Message

logger = logging.getLogger(__name__)


class MemoryMessage(Message):

@classmethod
def from_broker(cls, broker_message: Any):
if isinstance(broker_message, (str, bytes, bytearray)):
try:
message = json.loads(broker_message)
except json.JSONDecodeError:
message = {"body": broker_message}
else:
message = broker_message
if not isinstance(message, dict):
message = {"body": message}
if "body" not in message:
# 来自 外部的消息 可能没有 body, 故直接认为都是 message.body
message = {"body": message}
return cls(body=message.get("body"), extra=message.get("extra"), message=broker_message)


class MemoryBroker(BaseBroker):
message_cls = MemoryMessage

def __init__(self, maxsize=0, *args, **kwargs):
super().__init__(*args, **kwargs)
self.queue = Queue(maxsize)

def publish(self, message: Any):
try:
self.queue.put_nowait(message)
except FullException:
logger.warning("CronBroker queue is full, skip this task, "
"you can increase maxsize with `maxsize` argument")

def consume(self, *args, **kwargs):
return MemoryConsumer(self, *args, **kwargs)

def confirm(self, message: Message):
"""确认消息"""
pass

def reject(self, message: Message):
"""拒绝消息"""
pass

def requeue(self, message: Message, is_source=False):
"""重发消息:先拒绝 再 重入"""
if is_source:
self.publish(message.message)
else:
self.send(message)

def __repr__(self):
return f"<{self.__class__.__name__} {self.name}>"

def __str__(self):
return self.name


class MemoryConsumer(BaseLocalConsumer):
class MemoryConsumer(BaseConsumer):
...
41 changes: 24 additions & 17 deletions src/onestep/broker/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,25 @@
from ..message import Message


class _RabbitMQMessage(Message):

@classmethod
def from_broker(cls, broker_message: amqpstorm.Message):
try:
message = json.loads(broker_message.body)
except json.JSONDecodeError:
message = {"body": broker_message.body}
if not isinstance(message, dict):
message = {"body": message}
if "body" not in message:
# 来自 外部的消息 可能没有 body, 故直接认为都是 message.body
message = {"body": message}

return cls(body=message.get("body"), extra=message.get("extra"), message=broker_message)


class RabbitMQBroker(BaseBroker):
message_cls = _RabbitMQMessage

def __init__(self, queue_name, params: Optional[Dict] = None, prefetch: Optional[int] = 1, auto_create=True, *args,
**kwargs):
Expand All @@ -37,18 +55,18 @@ def consume(self, *args, **kwargs):
thread.daemon = daemon
thread.start()
self.threads.append(thread)
return RabbitMQConsumer(self.queue)
return RabbitMQConsumer(self)

def publish(self, message: Any):
self.client.send(self.queue_name, message)

def confirm(self, message: Message):
"""确认消息"""
message.msg.ack()
message.message.ack()

def reject(self, message: Message):
"""拒绝消息"""
message.msg.reject(requeue=False)
message.message.reject(requeue=False)

def requeue(self, message: Message, is_source=False):
"""
Expand All @@ -58,9 +76,9 @@ def requeue(self, message: Message, is_source=False):
:param is_source: 是否是原始消息,True: 使用原始消息重入当前队列,False: 使用消息的最新数据重入当前队列
"""
if is_source:
message.msg.reject(requeue=True)
message.message.reject(requeue=True)
else:
message.msg.reject(requeue=False)
message.message.reject(requeue=False)
self.send(message)

def shutdown(self):
Expand All @@ -70,15 +88,4 @@ def shutdown(self):


class RabbitMQConsumer(BaseConsumer):
def _to_message(self, data: amqpstorm.Message):
try:
message = json.loads(data.body)
except json.JSONDecodeError:
message = {"body": data.body}
if not isinstance(message, dict):
message = {"body": message}
if "body" not in message:
# 来自 外部的消息 可能没有 body, 故直接认为都是 message.body
message = {"body": message}

return Message(body=message.get("body"), extra=message.get("extra"), msg=data)
...
37 changes: 22 additions & 15 deletions src/onestep/broker/redis/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,25 @@
from ..base import BaseBroker, BaseConsumer, Message


class _RedisPubSubMessage(Message):

@classmethod
def from_broker(cls, broker_message: Any):
if "channel" in broker_message:
try:
message = json.loads(broker_message.get("data")) # 已转换的 message
except (json.JSONDecodeError, TypeError):
message = {"body": broker_message.get("data")} # 未转换的 message
else:
# 来自 外部的消息 直接认为都是 message.body
message = {"body": broker_message.body}

yield cls(body=message.get("body"), extra=message.get("extra"), message=broker_message)


class RedisPubSubBroker(BaseBroker):
""" Redis PubSub Broker """
message_cls = _RedisPubSubMessage

def __init__(self, channel: str, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -40,14 +57,14 @@ def consume(self, *args, **kwargs):
thread.daemon = daemon
thread.start()
self.threads.append(thread)
return RedisPubSubConsumer(self.queue)
return RedisPubSubConsumer(self)

def send(self, message: Any):
"""Publish message to the Redis channel"""
if not isinstance(message, Message):
message = Message(body=message)
message = self.message_cls(body=message)

print(self.client.publish(self.channel, message.to_json()))
self.client.publish(self.channel, message.to_json())

publish = send

Expand All @@ -67,20 +84,10 @@ def requeue(self, message: Message, is_source=False):
self.reject(message)

if is_source:
self.client.publish(self.channel, message.msg['data'])
self.client.publish(self.channel, message.message['data'])
else:
self.send(message)


class RedisPubSubConsumer(BaseConsumer):
def _to_message(self, data):
if "channel" in data:
try:
message = json.loads(data.get("data")) # 已转换的 message
except (json.JSONDecodeError, TypeError):
message = {"body": data.get("data")} # 未转换的 message
else:
# 来自 外部的消息 直接认为都是 message.body
message = {"body": data.body}

yield Message(body=message.get("body"), extra=message.get("extra"), msg=data)
...
Loading

0 comments on commit db06e93

Please sign in to comment.