Skip to content

Commit

Permalink
implemented topic.publish_all method.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mehmet Tokgöz committed Sep 5, 2022
1 parent 06a48e8 commit e780eee
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 0 deletions.
3 changes: 3 additions & 0 deletions examples/topic/topic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ def on_message(event):
topic.publish("Message " + str(i))
time.sleep(0.1)

topic.publish_all(["m1", "m2", "m3", "m4", "m5"])
time.sleep(1)

client.shutdown()
17 changes: 17 additions & 0 deletions hazelcast/protocol/codec/topic_publish_all_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer
from hazelcast.protocol.builtin import StringCodec, ListMultiFrameCodec
from hazelcast.protocol.builtin import DataCodec

# hex: 0x040400
_REQUEST_MESSAGE_TYPE = 263168
# hex: 0x040401
_RESPONSE_MESSAGE_TYPE = 263169

_REQUEST_INITIAL_FRAME_SIZE = REQUEST_HEADER_SIZE


def encode_request(name, message_list):
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
StringCodec.encode(buf, name)
ListMultiFrameCodec.encode(buf, message_list, DataCodec.encode, True)
return OutboundMessage(buf, False)
20 changes: 20 additions & 0 deletions hazelcast/proxy/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from hazelcast.protocol.codec import (
topic_add_message_listener_codec,
topic_publish_codec,
topic_publish_all_codec,
topic_remove_message_listener_codec,
)
from hazelcast.proxy.base import PartitionSpecificProxy, TopicMessage
Expand Down Expand Up @@ -70,6 +71,22 @@ def publish(self, message: MessageType) -> Future[None]:
request = topic_publish_codec.encode_request(self.name, message_data)
return self._invoke(request)

def publish_all(self, messages: typing.Sequence[MessageType]) -> Future[None]:
"""Publishes a list of messages to all subscribers of this topic
Args:
messages: The message list to be published
"""
try:
data_list = []
for m in messages:
data_list.append(self._to_data(m))
except SchemaNotReplicatedError as e:
return self._send_schema_and_retry(e, self.publish, messages)

request = topic_publish_all_codec.encode_request(self.name, data_list)
return self._invoke(request)

def remove_listener(self, registration_id: str) -> Future[bool]:
"""Stops receiving messages for the given message listener.
Expand Down Expand Up @@ -107,6 +124,9 @@ def publish( # type: ignore[override]
) -> None:
return self._wrapped.publish(message).result()

def publish_all(self, messages: typing.Sequence[MessageType]) -> None: # type: ignore[override]
return self._wrapped.publish_all(messages).result()

def remove_listener( # type: ignore[override]
self,
registration_id: str,
Expand Down

0 comments on commit e780eee

Please sign in to comment.