From e780eee12d30470b5e493c0994947b6911b3dca3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Tokg=C3=B6z?= Date: Mon, 5 Sep 2022 14:04:20 +0300 Subject: [PATCH 1/8] implemented topic.publish_all method. --- examples/topic/topic_example.py | 3 +++ .../protocol/codec/topic_publish_all_codec.py | 17 ++++++++++++++++ hazelcast/proxy/topic.py | 20 +++++++++++++++++++ 3 files changed, 40 insertions(+) create mode 100644 hazelcast/protocol/codec/topic_publish_all_codec.py diff --git a/examples/topic/topic_example.py b/examples/topic/topic_example.py index ec5d0f4443..4782e5e564 100644 --- a/examples/topic/topic_example.py +++ b/examples/topic/topic_example.py @@ -16,4 +16,7 @@ def on_message(event): topic.publish("Message " + str(i)) time.sleep(0.1) +topic.publish_all(["m1", "m2", "m3", "m4", "m5"]) +time.sleep(1) + client.shutdown() diff --git a/hazelcast/protocol/codec/topic_publish_all_codec.py b/hazelcast/protocol/codec/topic_publish_all_codec.py new file mode 100644 index 0000000000..6d39046e4b --- /dev/null +++ b/hazelcast/protocol/codec/topic_publish_all_codec.py @@ -0,0 +1,17 @@ +from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer +from hazelcast.protocol.builtin import StringCodec, ListMultiFrameCodec +from hazelcast.protocol.builtin import DataCodec + +# hex: 0x040400 +_REQUEST_MESSAGE_TYPE = 263168 +# hex: 0x040401 +_RESPONSE_MESSAGE_TYPE = 263169 + +_REQUEST_INITIAL_FRAME_SIZE = REQUEST_HEADER_SIZE + + +def encode_request(name, message_list): + buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE) + StringCodec.encode(buf, name) + ListMultiFrameCodec.encode(buf, message_list, DataCodec.encode, True) + return OutboundMessage(buf, False) diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index 785fc9c6ed..b74c484407 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -4,6 +4,7 @@ from hazelcast.protocol.codec import ( topic_add_message_listener_codec, topic_publish_codec, + topic_publish_all_codec, topic_remove_message_listener_codec, ) from hazelcast.proxy.base import PartitionSpecificProxy, TopicMessage @@ -70,6 +71,22 @@ def publish(self, message: MessageType) -> Future[None]: request = topic_publish_codec.encode_request(self.name, message_data) return self._invoke(request) + def publish_all(self, messages: typing.Sequence[MessageType]) -> Future[None]: + """Publishes a list of messages to all subscribers of this topic + + Args: + messages: The message list to be published + """ + try: + data_list = [] + for m in messages: + data_list.append(self._to_data(m)) + except SchemaNotReplicatedError as e: + return self._send_schema_and_retry(e, self.publish, messages) + + request = topic_publish_all_codec.encode_request(self.name, data_list) + return self._invoke(request) + def remove_listener(self, registration_id: str) -> Future[bool]: """Stops receiving messages for the given message listener. @@ -107,6 +124,9 @@ def publish( # type: ignore[override] ) -> None: return self._wrapped.publish(message).result() + def publish_all(self, messages: typing.Sequence[MessageType]) -> None: # type: ignore[override] + return self._wrapped.publish_all(messages).result() + def remove_listener( # type: ignore[override] self, registration_id: str, From d2d7dc0387ac8e704d4f74533cc1f9d24854773e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Tokg=C3=B6z?= Date: Sat, 8 Oct 2022 14:12:05 +0300 Subject: [PATCH 2/8] add none check for args. --- .../protocol/codec/topic_publish_all_codec.py | 7 ++++--- hazelcast/proxy/topic.py | 20 +++++++++++++------ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/hazelcast/protocol/codec/topic_publish_all_codec.py b/hazelcast/protocol/codec/topic_publish_all_codec.py index 6d39046e4b..6ed1f3b7a5 100644 --- a/hazelcast/protocol/codec/topic_publish_all_codec.py +++ b/hazelcast/protocol/codec/topic_publish_all_codec.py @@ -1,5 +1,6 @@ from hazelcast.protocol.client_message import OutboundMessage, REQUEST_HEADER_SIZE, create_initial_buffer -from hazelcast.protocol.builtin import StringCodec, ListMultiFrameCodec +from hazelcast.protocol.builtin import StringCodec +from hazelcast.protocol.builtin import ListMultiFrameCodec from hazelcast.protocol.builtin import DataCodec # hex: 0x040400 @@ -10,8 +11,8 @@ _REQUEST_INITIAL_FRAME_SIZE = REQUEST_HEADER_SIZE -def encode_request(name, message_list): +def encode_request(name, messages): buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE) StringCodec.encode(buf, name) - ListMultiFrameCodec.encode(buf, message_list, DataCodec.encode, True) + ListMultiFrameCodec.encode(buf, messages, DataCodec.encode, True) return OutboundMessage(buf, False) diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index b74c484407..7e28925f85 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -58,7 +58,7 @@ def handle(item_data, publish_time, uuid): ) def publish(self, message: MessageType) -> Future[None]: - """Publishes the message to all subscribers of this topic + """Publishes the message to all subscribers of this topic. Args: message: The message to be published. @@ -72,17 +72,22 @@ def publish(self, message: MessageType) -> Future[None]: return self._invoke(request) def publish_all(self, messages: typing.Sequence[MessageType]) -> Future[None]: - """Publishes a list of messages to all subscribers of this topic + """Publishes a list of messages to all subscribers of this topic. Args: - messages: The message list to be published + messages: The messages to be published. """ try: + if messages is None: + raise TypeError("Null message is not allowed!") data_list = [] for m in messages: - data_list.append(self._to_data(m)) + data = self._to_data(m) + if data is None: + raise TypeError("Null message is not allowed!") + data_list.append(data) except SchemaNotReplicatedError as e: - return self._send_schema_and_retry(e, self.publish, messages) + return self._send_schema_and_retry(e, self.publish_all, messages) request = topic_publish_all_codec.encode_request(self.name, data_list) return self._invoke(request) @@ -124,7 +129,10 @@ def publish( # type: ignore[override] ) -> None: return self._wrapped.publish(message).result() - def publish_all(self, messages: typing.Sequence[MessageType]) -> None: # type: ignore[override] + def publish_all( # type: ignore[override] + self, + messages: typing.Sequence[MessageType] + ) -> None: return self._wrapped.publish_all(messages).result() def remove_listener( # type: ignore[override] From 871f9b1d5c123e864d34f119a74c16f994301a73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Tokg=C3=B6z?= Date: Sat, 8 Oct 2022 14:35:59 +0300 Subject: [PATCH 3/8] add tests for topic.publish_all --- hazelcast/proxy/topic.py | 10 ++++----- .../backward_compatible/proxy/topic_test.py | 22 +++++++++++++++++++ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index 7e28925f85..979e7e89b5 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -1,5 +1,6 @@ import typing +from hazelcast.errors import NullPointerError from hazelcast.future import Future from hazelcast.protocol.codec import ( topic_add_message_listener_codec, @@ -79,12 +80,12 @@ def publish_all(self, messages: typing.Sequence[MessageType]) -> Future[None]: """ try: if messages is None: - raise TypeError("Null message is not allowed!") + raise NullPointerError("Null message is not allowed!") data_list = [] for m in messages: data = self._to_data(m) if data is None: - raise TypeError("Null message is not allowed!") + raise NullPointerError("Null message is not allowed!") data_list.append(data) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.publish_all, messages) @@ -129,10 +130,7 @@ def publish( # type: ignore[override] ) -> None: return self._wrapped.publish(message).result() - def publish_all( # type: ignore[override] - self, - messages: typing.Sequence[MessageType] - ) -> None: + def publish_all(self, messages: typing.Sequence[MessageType]) -> None: # type: ignore[override] return self._wrapped.publish_all(messages).result() def remove_listener( # type: ignore[override] diff --git a/tests/integration/backward_compatible/proxy/topic_test.py b/tests/integration/backward_compatible/proxy/topic_test.py index eae452432e..a95fbd8b79 100644 --- a/tests/integration/backward_compatible/proxy/topic_test.py +++ b/tests/integration/backward_compatible/proxy/topic_test.py @@ -1,3 +1,4 @@ +from hazelcast.errors import NullPointerError from tests.base import SingleMemberTestCase from tests.util import random_string, event_collector @@ -44,3 +45,24 @@ def assert_event(): def test_str(self): self.assertTrue(str(self.topic).startswith("Topic")) + + def test_publish_all(self): + collector = event_collector() + self.topic.add_listener(on_message=collector) + + messages = ["message1", "message2", "message3"] + self.topic.publish_all(messages) + + def assert_event(): + self.assertEqual(len(collector.events), 3) + + self.assertTrueEventually(assert_event, 5) + + def test_publish_all_none_argument(self): + with self.assertRaises(NullPointerError): + self.topic.publish_all(None) + + def test_publish_all_none_message(self): + messages = ["message1", None, "message3"] + with self.assertRaises(NullPointerError): + self.topic.publish_all(messages) From 913f7db45f03bc238247cff7d3f07755d8b335ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Tokg=C3=B6z?= Date: Mon, 10 Oct 2022 08:27:23 +0300 Subject: [PATCH 4/8] fix styling. --- hazelcast/proxy/topic.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index 979e7e89b5..4c32f880c9 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -130,7 +130,10 @@ def publish( # type: ignore[override] ) -> None: return self._wrapped.publish(message).result() - def publish_all(self, messages: typing.Sequence[MessageType]) -> None: # type: ignore[override] + def publish_all( # type: ignore[override] + self, + messages: typing.Sequence[MessageType] + ) -> None: return self._wrapped.publish_all(messages).result() def remove_listener( # type: ignore[override] From d2f0077fbed5f664020af8143d34dd9f56bc78a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Tokg=C3=B6z?= Date: Mon, 10 Oct 2022 08:31:03 +0300 Subject: [PATCH 5/8] fix styling. --- hazelcast/proxy/topic.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index 4c32f880c9..bd275143d9 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -73,7 +73,7 @@ def publish(self, message: MessageType) -> Future[None]: return self._invoke(request) def publish_all(self, messages: typing.Sequence[MessageType]) -> Future[None]: - """Publishes a list of messages to all subscribers of this topic. + """Publishes the messages to all subscribers of this topic. Args: messages: The messages to be published. @@ -130,9 +130,9 @@ def publish( # type: ignore[override] ) -> None: return self._wrapped.publish(message).result() - def publish_all( # type: ignore[override] - self, - messages: typing.Sequence[MessageType] + def publish_all( # type: ignore[override] + self, + messages: typing.Sequence[MessageType] ) -> None: return self._wrapped.publish_all(messages).result() From 945b96544c5d77cf52013b86047f8bb3cf29eb3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Tokg=C3=B6z?= Date: Tue, 25 Oct 2022 15:27:30 +0300 Subject: [PATCH 6/8] add compact test and replace if statements with check_not_none. --- hazelcast/proxy/topic.py | 18 +++++++----------- .../backward_compatible/proxy/topic_test.py | 3 ++- .../compact_compatibility_test.py | 17 +++++++++++++++++ 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index bd275143d9..96d781c21c 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -11,6 +11,7 @@ from hazelcast.proxy.base import PartitionSpecificProxy, TopicMessage from hazelcast.types import MessageType from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.util import check_not_none class Topic(PartitionSpecificProxy["BlockingTopic"], typing.Generic[MessageType]): @@ -78,19 +79,17 @@ def publish_all(self, messages: typing.Sequence[MessageType]) -> Future[None]: Args: messages: The messages to be published. """ + check_not_none(messages, "Messages cannot be None") try: - if messages is None: - raise NullPointerError("Null message is not allowed!") - data_list = [] + topic_messages = [] for m in messages: + check_not_none(m, "Message cannot be None") data = self._to_data(m) - if data is None: - raise NullPointerError("Null message is not allowed!") - data_list.append(data) + topic_messages.append(data) except SchemaNotReplicatedError as e: return self._send_schema_and_retry(e, self.publish_all, messages) - request = topic_publish_all_codec.encode_request(self.name, data_list) + request = topic_publish_all_codec.encode_request(self.name, topic_messages) return self._invoke(request) def remove_listener(self, registration_id: str) -> Future[bool]: @@ -130,10 +129,7 @@ def publish( # type: ignore[override] ) -> None: return self._wrapped.publish(message).result() - def publish_all( # type: ignore[override] - self, - messages: typing.Sequence[MessageType] - ) -> None: + def publish_all(self, messages: typing.Sequence[MessageType]) -> None: # type: ignore[override] return self._wrapped.publish_all(messages).result() def remove_listener( # type: ignore[override] diff --git a/tests/integration/backward_compatible/proxy/topic_test.py b/tests/integration/backward_compatible/proxy/topic_test.py index a95fbd8b79..f9626cd8f6 100644 --- a/tests/integration/backward_compatible/proxy/topic_test.py +++ b/tests/integration/backward_compatible/proxy/topic_test.py @@ -1,6 +1,6 @@ from hazelcast.errors import NullPointerError from tests.base import SingleMemberTestCase -from tests.util import random_string, event_collector +from tests.util import random_string, event_collector, skip_if_client_version_older_than class TopicTest(SingleMemberTestCase): @@ -47,6 +47,7 @@ def test_str(self): self.assertTrue(str(self.topic).startswith("Topic")) def test_publish_all(self): + skip_if_client_version_older_than(self, "5.2") collector = event_collector() self.topic.add_listener(on_message=collector) diff --git a/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py b/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py index 178c336037..f28fe5218a 100644 --- a/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py +++ b/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py @@ -1402,6 +1402,23 @@ def assertion(): def test_publish(self): self.topic.publish(OUTER_COMPACT_INSTANCE) + def test_publish_all(self): + messages = [] + + def listener(message): + messages.append(message) + + self.topic.add_listener(listener) + + self.topic.publish_all([INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE]) + + def assertion(): + self.assertEqual(2, len(messages)) + self.assertEqual(INNER_COMPACT_INSTANCE, messages[0].message) + self.assertEqual(OUTER_COMPACT_INSTANCE, messages[1].message) + + self.assertTrueEventually(assertion) + def _publish_from_another_client(self, item): other_client = self.create_client(self.client_config) other_client_topic = other_client.get_topic(self.topic.name).blocking() From 88a35fe1c3c42e07a66194e68b9f8dfc80372d8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Tokg=C3=B6z?= Date: Tue, 25 Oct 2022 15:53:49 +0300 Subject: [PATCH 7/8] fix error type in tests. --- tests/integration/backward_compatible/proxy/topic_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/backward_compatible/proxy/topic_test.py b/tests/integration/backward_compatible/proxy/topic_test.py index f9626cd8f6..377f603e76 100644 --- a/tests/integration/backward_compatible/proxy/topic_test.py +++ b/tests/integration/backward_compatible/proxy/topic_test.py @@ -60,10 +60,10 @@ def assert_event(): self.assertTrueEventually(assert_event, 5) def test_publish_all_none_argument(self): - with self.assertRaises(NullPointerError): + with self.assertRaises(AssertionError): self.topic.publish_all(None) def test_publish_all_none_message(self): messages = ["message1", None, "message3"] - with self.assertRaises(NullPointerError): + with self.assertRaises(AssertionError): self.topic.publish_all(messages) From b91a6e5018e63f4a14938e3680d9762d4586863e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Tokg=C3=B6z?= Date: Tue, 25 Oct 2022 21:58:27 +0300 Subject: [PATCH 8/8] address review comments. --- hazelcast/proxy/topic.py | 8 +++++--- tests/integration/backward_compatible/proxy/topic_test.py | 5 +++-- .../compact_compatibility/compact_compatibility_test.py | 8 +++++++- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/hazelcast/proxy/topic.py b/hazelcast/proxy/topic.py index 96d781c21c..a17df87435 100644 --- a/hazelcast/proxy/topic.py +++ b/hazelcast/proxy/topic.py @@ -1,6 +1,5 @@ import typing -from hazelcast.errors import NullPointerError from hazelcast.future import Future from hazelcast.protocol.codec import ( topic_add_message_listener_codec, @@ -9,8 +8,8 @@ topic_remove_message_listener_codec, ) from hazelcast.proxy.base import PartitionSpecificProxy, TopicMessage -from hazelcast.types import MessageType from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.types import MessageType from hazelcast.util import check_not_none @@ -129,7 +128,10 @@ def publish( # type: ignore[override] ) -> None: return self._wrapped.publish(message).result() - def publish_all(self, messages: typing.Sequence[MessageType]) -> None: # type: ignore[override] + def publish_all( # type: ignore[override] + self, + messages: typing.Sequence[MessageType], + ) -> None: return self._wrapped.publish_all(messages).result() def remove_listener( # type: ignore[override] diff --git a/tests/integration/backward_compatible/proxy/topic_test.py b/tests/integration/backward_compatible/proxy/topic_test.py index 377f603e76..eda0a82525 100644 --- a/tests/integration/backward_compatible/proxy/topic_test.py +++ b/tests/integration/backward_compatible/proxy/topic_test.py @@ -1,4 +1,3 @@ -from hazelcast.errors import NullPointerError from tests.base import SingleMemberTestCase from tests.util import random_string, event_collector, skip_if_client_version_older_than @@ -59,11 +58,13 @@ def assert_event(): self.assertTrueEventually(assert_event, 5) - def test_publish_all_none_argument(self): + def test_publish_all_none_messages(self): + skip_if_client_version_older_than(self, "5.2") with self.assertRaises(AssertionError): self.topic.publish_all(None) def test_publish_all_none_message(self): + skip_if_client_version_older_than(self, "5.2") messages = ["message1", None, "message3"] with self.assertRaises(AssertionError): self.topic.publish_all(messages) diff --git a/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py b/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py index f28fe5218a..eb63e53020 100644 --- a/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py +++ b/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py @@ -6,7 +6,12 @@ from hazelcast.errors import NullPointerError, IllegalMonitorStateError from hazelcast.predicate import Predicate, paging from tests.base import HazelcastTestCase -from tests.util import random_string, compare_client_version, compare_server_version_with_rc +from tests.util import ( + random_string, + compare_client_version, + compare_server_version_with_rc, + skip_if_client_version_older_than, +) try: from hazelcast.serialization.api import ( @@ -1403,6 +1408,7 @@ def test_publish(self): self.topic.publish(OUTER_COMPACT_INSTANCE) def test_publish_all(self): + skip_if_client_version_older_than(self, "5.2") messages = [] def listener(message):