Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add value/data/seq properties to message #326

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
Release History
===============

1.6.0 (Unreleased)
+++++++++++++++++++

- Added `data`, `value`, `sequence` properties to `uamqp.Message`, which return the body if the body type corresponds.
- Added `message_annotations` property to `uamqp.Message`, which is an alias for the `annotations` instance variable.
- Added `data` property to `uamqp.BatchMessage`, which returns the iterable body of the batch.
- Added `ttl` property to `uamqp.MessageHeader`, which is an alias for the `time_to_live` instance variable.

1.5.3 (2022-03-23)
+++++++++++++++++++

Expand Down
3 changes: 3 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ def hack_open(ins):
assert type(exc) == RuntimeError
client.close()
assert not client._keep_alive_thread

# check that kwargs can be passed to client.do_work
client.do_work(fake_kwarg="ignore")
16 changes: 16 additions & 0 deletions tests/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,18 @@ def test_deepcopy_batch_message():
batch_message = list(message_batch._body_gen)[0]
batch_copy_message = list(message_batch_copy._body_gen)[0]
assert len(list(message_batch._body_gen)) == len(list(message_batch_copy._body_gen))
assert len(list(message_batch.data)) == len(list(message_batch_copy._body_gen))
assert list(list(message_batch.data)[0].data) == list(list(message_batch_copy._body_gen)[0].data)
assert message_batch.value is None
assert message_batch.sequence is None

# check message attributes are equal to deepcopied message attributes
assert list(batch_message.get_data()) == list(batch_copy_message.get_data())
assert batch_message.footer == batch_copy_message.footer
assert batch_message.state == batch_copy_message.state
assert batch_message.application_properties == batch_copy_message.application_properties
assert batch_message.annotations == batch_copy_message.annotations
assert batch_message.annotations == batch_copy_message.message_annotations
assert batch_message.delivery_annotations == batch_copy_message.delivery_annotations
assert batch_message.settled == batch_copy_message.settled
assert batch_message.properties.message_id == batch_copy_message.properties.message_id
Expand All @@ -224,6 +229,7 @@ def test_deepcopy_batch_message():
assert batch_message.properties.reply_to_group_id == batch_copy_message.properties.reply_to_group_id
assert batch_message.header.delivery_count == batch_copy_message.header.delivery_count
assert batch_message.header.time_to_live == batch_copy_message.header.time_to_live
assert batch_message.header.ttl == batch_copy_message.header.time_to_live
assert batch_message.header.first_acquirer == batch_copy_message.header.first_acquirer
assert batch_message.header.durable == batch_copy_message.header.durable
assert batch_message.header.priority == batch_copy_message.header.priority
Expand Down Expand Up @@ -285,6 +291,9 @@ def test_message_body_data_type():
assert check_list[0] == multiple_data[0]
assert check_list[1] == multiple_data[1].encode("UTF-8")
assert str(multiple_data_message)
assert list(multiple_data_message.data) == list(multiple_data_message.get_data())
assert multiple_data_message.value is None
assert multiple_data_message.sequence is None

with pytest.raises(TypeError):
Message(body={"key": "value"}, body_type=MessageBodyType.Data)
Expand Down Expand Up @@ -324,6 +333,10 @@ def test_message_body_value_type():
assert isinstance(string_value_message._body, ValueBody)
assert str(compound_list_value_message)

assert compound_list_value_message.value == compound_list_value_message.get_data()
assert compound_list_value_message.data is None
assert compound_list_value_message.sequence is None


def test_message_body_sequence_type():

Expand All @@ -343,6 +356,9 @@ def test_message_body_sequence_type():
assert check_list[0] == multiple_lists[0]
assert check_list[1] == multiple_lists[1]
assert str(multiple_lists_message)
assert list(multiple_lists_message.sequence) == list(multiple_lists_message.get_data())
assert multiple_lists_message.value is None
assert multiple_lists_message.data is None

with pytest.raises(TypeError):
Message(body={"key": "value"}, body_type=MessageBodyType.Sequence)
Expand Down
2 changes: 1 addition & 1 deletion uamqp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
pass # Async not supported.


__version__ = "1.5.3"
__version__ = "1.6.0"


_logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion uamqp/async_ops/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ async def client_ready_async(self):
return False
return True

async def do_work_async(self):
async def do_work_async(self, **kwargs): # pylint: disable=unused-argument
"""Run a single connection iteration asynchronously.
This will return `True` if the connection is still open
and ready to be used for further work, or `False` if it needs
Expand Down
2 changes: 1 addition & 1 deletion uamqp/authentication/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _encode(self, value):
return value.encode(self._encoding) if isinstance(value, six.text_type) else value

def set_io(self, hostname, port, http_proxy, transport_type):
if transport_type == TransportType.AmqpOverWebsocket or http_proxy is not None:
if transport_type and transport_type.value == TransportType.AmqpOverWebsocket.value or http_proxy is not None:
self.set_wsio(hostname, port or constants.DEFAULT_AMQP_WSS_PORT, http_proxy)
else:
self.set_tlsio(hostname, port or constants.DEFAULT_AMQPS_PORT)
Expand Down
2 changes: 1 addition & 1 deletion uamqp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def client_ready(self):
return False
return True

def do_work(self):
def do_work(self, **kwargs): # pylint: disable=unused-argument
"""Run a single connection iteration.
This will return `True` if the connection is still open
and ready to be used for further work, or `False` if it needs
Expand Down
48 changes: 48 additions & 0 deletions uamqp/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ def annotations(self, value):
raise TypeError("Message annotations must be a dictionary.")
self._annotations = value

@property
def message_annotations(self):
return self.annotations

@property
def delivery_annotations(self):
if self._need_further_parse:
Expand All @@ -235,6 +239,33 @@ def delivery_annotations(self):
def delivery_annotations(self, value):
self._delivery_annotations = value

@property
def data(self):
if not self._message or not self._body:
return None
# pylint: disable=protected-access
if self._body.type == c_uamqp.MessageBodyType.DataType:
return self._body.data
return None

@property
def sequence(self):
if not self._message or not self._body:
return None
# pylint: disable=protected-access
if self._body.type == c_uamqp.MessageBodyType.SequenceType:
return self._body.data
return None

@property
def value(self):
if not self._message or not self._body:
return None
# pylint: disable=protected-access
if self._body.type == c_uamqp.MessageBodyType.ValueType:
return self._body.data
return None

@classmethod
def decode_from_bytes(cls, data):
"""Decode an AMQP message from a bytearray.
Expand Down Expand Up @@ -730,6 +761,14 @@ def _multi_message_generator(self):
_logger.debug("Sent all batched data.")
break

@property
def data(self):
"""Returns an iterable source of data, where each value will be considered the
body of a single message in the batch.
:rtype: iterable
"""
return self._body_gen

def gather(self):
"""Return all the messages represented by this object. This will convert
the batch data into individual Message objects, which may be one
Expand Down Expand Up @@ -1351,6 +1390,15 @@ def __str__(self):
}
)

@property
def ttl(self):
"""
Alias for time_to_live.

:rtype: int
"""
return self.time_to_live

def get_header_obj(self):
"""Get the underlying C reference from this object.

Expand Down