Skip to content

Commit

Permalink
add compact test and replace if statements with check_not_none.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mehmet Tokgöz committed Oct 25, 2022
1 parent d2f0077 commit 945b965
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
18 changes: 7 additions & 11 deletions hazelcast/proxy/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from hazelcast.proxy.base import PartitionSpecificProxy, TopicMessage
from hazelcast.types import MessageType
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none


class Topic(PartitionSpecificProxy["BlockingTopic"], typing.Generic[MessageType]):
Expand Down Expand Up @@ -78,19 +79,17 @@ def publish_all(self, messages: typing.Sequence[MessageType]) -> Future[None]:
Args:
messages: The messages to be published.
"""
check_not_none(messages, "Messages cannot be None")
try:
if messages is None:
raise NullPointerError("Null message is not allowed!")
data_list = []
topic_messages = []
for m in messages:
check_not_none(m, "Message cannot be None")
data = self._to_data(m)
if data is None:
raise NullPointerError("Null message is not allowed!")
data_list.append(data)
topic_messages.append(data)
except SchemaNotReplicatedError as e:
return self._send_schema_and_retry(e, self.publish_all, messages)

request = topic_publish_all_codec.encode_request(self.name, data_list)
request = topic_publish_all_codec.encode_request(self.name, topic_messages)
return self._invoke(request)

def remove_listener(self, registration_id: str) -> Future[bool]:
Expand Down Expand Up @@ -130,10 +129,7 @@ def publish( # type: ignore[override]
) -> None:
return self._wrapped.publish(message).result()

def publish_all( # type: ignore[override]
self,
messages: typing.Sequence[MessageType]
) -> None:
def publish_all(self, messages: typing.Sequence[MessageType]) -> None: # type: ignore[override]
return self._wrapped.publish_all(messages).result()

def remove_listener( # type: ignore[override]
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/backward_compatible/proxy/topic_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from hazelcast.errors import NullPointerError
from tests.base import SingleMemberTestCase
from tests.util import random_string, event_collector
from tests.util import random_string, event_collector, skip_if_client_version_older_than


class TopicTest(SingleMemberTestCase):
Expand Down Expand Up @@ -47,6 +47,7 @@ def test_str(self):
self.assertTrue(str(self.topic).startswith("Topic"))

def test_publish_all(self):
skip_if_client_version_older_than(self, "5.2")
collector = event_collector()
self.topic.add_listener(on_message=collector)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,23 @@ def assertion():
def test_publish(self):
self.topic.publish(OUTER_COMPACT_INSTANCE)

def test_publish_all(self):
messages = []

def listener(message):
messages.append(message)

self.topic.add_listener(listener)

self.topic.publish_all([INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE])

def assertion():
self.assertEqual(2, len(messages))
self.assertEqual(INNER_COMPACT_INSTANCE, messages[0].message)
self.assertEqual(OUTER_COMPACT_INSTANCE, messages[1].message)

self.assertTrueEventually(assertion)

def _publish_from_another_client(self, item):
other_client = self.create_client(self.client_config)
other_client_topic = other_client.get_topic(self.topic.name).blocking()
Expand Down

0 comments on commit 945b965

Please sign in to comment.