From 3c2babf0d09572bcb9f8fa5d49784a3c714512d6 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 12 Oct 2021 10:54:35 +0300 Subject: [PATCH 1/2] Send TCP metrics for bytes sent and received to server It would be good to send `tcp.bytesReceived` and `tcp.bytesSend` metrics in the metrics blob to the server so that someone can track how much traffic is passing between the server and the client. I have manually tested the stats exported to Prometheus and verified that these two new stats are displayed there correctly. --- hazelcast/reactor.py | 14 +++++--- hazelcast/statistics.py | 30 +++++++++++++--- .../backward_compatible/client_test.py | 34 +++++++++++++++++-- 3 files changed, 66 insertions(+), 12 deletions(-) diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 96dd0905fe..e477466b70 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -344,6 +344,8 @@ def __init__(self): loop.shutdown() loop = _BasicLoop(self.map) self._loop = loop + self.bytes_sent = 0 + self.bytes_received = 0 def start(self): self._loop.start() @@ -421,9 +423,11 @@ def handle_read(self): try: while True: data = self.recv(receive_buffer_size) + bytes_received = len(data) + self._reactor.bytes_received += bytes_received reader.read(data) self.last_read_time = time.time() - if len(data) < receive_buffer_size: + if bytes_received < receive_buffer_size: break except socket.error as err: if err.args[0] not in _RETRYABLE_ERROR_CODES: @@ -461,7 +465,7 @@ def handle_write(self): buf.truncate(0) try: - sent = self.send(bytes_) + bytes_sent = self.send(bytes_) except socket.error as err: if err.args[0] in _RETRYABLE_ERROR_CODES: # Couldn't write the bytes but we should @@ -474,9 +478,9 @@ def handle_write(self): # No exception is thrown during the send self.last_write_time = time.time() self.sent_protocol_bytes = True - - if sent < len(bytes_): - write_queue.appendleft(bytes_[sent:]) + self._reactor.bytes_sent += bytes_sent + if bytes_sent < len(bytes_): + 
write_queue.appendleft(bytes_[bytes_sent:]) def handle_close(self): _logger.warning("Connection closed by server") diff --git a/hazelcast/statistics.py b/hazelcast/statistics.py index 9589306508..432797d68d 100644 --- a/hazelcast/statistics.py +++ b/hazelcast/statistics.py @@ -25,6 +25,8 @@ _NEAR_CACHE_DESCRIPTOR_PREFIX = "nearcache" _NEAR_CACHE_DESCRIPTOR_DISCRIMINATOR = "name" +_TCP_METRICS_PREFIX = "tcp" + class Statistics(object): def __init__( @@ -130,7 +132,7 @@ def _register_system_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG self._registered_system_gauges[gauge_name] = (gauge_fn, value_type) except Exception as e: _logger.debug( - "Unable to register the system related gauge %s. Error: %s" % (gauge_name, e) + "Unable to register the system related gauge %s. Error: %s", gauge_name, e ) def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG): @@ -141,7 +143,7 @@ def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LON self._registered_process_gauges[gauge_name] = (gauge_fn, value_type) except Exception as e: _logger.debug( - "Unable to register the process related gauge %s. Error: %s" % (gauge_name, e) + "Unable to register the process related gauge %s. Error: %s", gauge_name, e ) def _collect_and_send_stats(self): @@ -157,6 +159,7 @@ def _collect_and_send_stats(self): self._add_client_attributes(attributes, connection) self._add_near_cache_metrics(attributes, compressor) self._add_system_and_process_metrics(attributes, compressor) + self._add_tcp_metrics(compressor) self._send_stats( collection_timestamp, "".join(attributes), compressor.generate_blob(), connection ) @@ -180,7 +183,7 @@ def _add_system_and_process_metrics(self, attributes, compressor): attributes, compressor, gauge_name, value, value_type ) except: - _logger.exception("Error while collecting '%s'." 
% gauge_name) + _logger.exception("Error while collecting '%s'.", gauge_name) if not self._registered_process_gauges: # Do not create the process object if no process-related @@ -195,7 +198,7 @@ def _add_system_and_process_metrics(self, attributes, compressor): attributes, compressor, gauge_name, value, value_type ) except: - _logger.exception("Error while collecting '%s'." % gauge_name) + _logger.exception("Error while collecting '%s'.", gauge_name) def _add_system_or_process_metric(self, attributes, compressor, gauge_name, value, value_type): # We don't have any metrics that do not have prefix. @@ -342,9 +345,26 @@ def _add_near_cache_metric( self._add_attribute(attributes, metric, value, nc_name_with_prefix) except: _logger.exception( - "Error while collecting %s metric for near cache '%s'" % (metric, nc_name) + "Error while collecting %s metric for near cache '%s'.", metric, nc_name ) + def _add_tcp_metrics(self, compressor): + self._add_tcp_metric(compressor, "bytesSend", self._reactor.bytes_sent) + self._add_tcp_metric(compressor, "bytesReceived", self._reactor.bytes_received) + + def _add_tcp_metric( + self, compressor, metric, value, value_type=ValueType.LONG, unit=ProbeUnit.BYTES + ): + descriptor = MetricDescriptor( + metric=metric, + prefix=_TCP_METRICS_PREFIX, + unit=unit, + ) + try: + self._add_metric(compressor, descriptor, value, value_type) + except: + _logger.exception("Error while collecting '%s.%s'.", _TCP_METRICS_PREFIX, metric) + def _add_metric(self, compressor, descriptor, value, value_type): if value_type == ValueType.LONG: compressor.add_long(descriptor, value) diff --git a/tests/integration/backward_compatible/client_test.py b/tests/integration/backward_compatible/client_test.py index e1da5b74ad..a89a36a69a 100644 --- a/tests/integration/backward_compatible/client_test.py +++ b/tests/integration/backward_compatible/client_test.py @@ -1,10 +1,11 @@ import time +import unittest -from tests.base import HazelcastTestCase +from tests.base 
import HazelcastTestCase, SingleMemberTestCase from hazelcast.client import HazelcastClient from hazelcast.lifecycle import LifecycleState from tests.hzrc.ttypes import Lang -from tests.util import get_current_timestamp +from tests.util import get_current_timestamp, random_string class ClientTest(HazelcastTestCase): @@ -110,3 +111,32 @@ def get_labels_from_member(self, client_uuid): client_uuid ) return self.rc.executeOnController(self.cluster.id, script, Lang.JAVASCRIPT).result + + +class ClientTcpMetricsTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + def test_bytes_received(self): + reactor = self.client._reactor + + bytes_received = reactor.bytes_received + self.assertGreater(bytes_received, 0) + + m = self.client.get_map(random_string()).blocking() + m.get(random_string()) + + self.assertGreater(reactor.bytes_received, bytes_received) + + def test_bytes_sent(self): + reactor = self.client._reactor + + bytes_sent = reactor.bytes_sent + self.assertGreater(bytes_sent, 0) + + m = self.client.get_map(random_string()).blocking() + m.set(random_string(), random_string()) + + self.assertGreater(reactor.bytes_sent, bytes_sent) From e645e182f6c528b54a210d190abfdd9a205a4678 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 12 Oct 2021 17:17:54 +0300 Subject: [PATCH 2/2] remove unused import --- tests/integration/backward_compatible/client_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/backward_compatible/client_test.py b/tests/integration/backward_compatible/client_test.py index a89a36a69a..da8e741eb5 100644 --- a/tests/integration/backward_compatible/client_test.py +++ b/tests/integration/backward_compatible/client_test.py @@ -1,5 +1,4 @@ import time -import unittest from tests.base import HazelcastTestCase, SingleMemberTestCase from hazelcast.client import HazelcastClient