Skip to content

Commit

Permalink
Send TCP metrics for bytes sent and received to server
Browse files Browse the repository at this point in the history
It would be good to send `tcp.bytesReceived` and `tcp.bytesSend`
metrics in the metrics blob to the server, so that, someone can
track how much traffic is passing between the server and the client.

I have manually tested the stats exported to Prometheus and verified
that these two new stats are displayed there correctly.
  • Loading branch information
mdumandag committed Oct 12, 2021
1 parent 6e7b244 commit 2956596
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 12 deletions.
14 changes: 9 additions & 5 deletions hazelcast/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ def __init__(self):
loop.shutdown()
loop = _BasicLoop(self.map)
self._loop = loop
self.bytes_sent = 0
self.bytes_received = 0

def start(self):
self._loop.start()
Expand Down Expand Up @@ -421,9 +423,11 @@ def handle_read(self):
try:
while True:
data = self.recv(receive_buffer_size)
bytes_received = len(data)
self._reactor.bytes_received += bytes_received
reader.read(data)
self.last_read_time = time.time()
if len(data) < receive_buffer_size:
if bytes_received < receive_buffer_size:
break
except socket.error as err:
if err.args[0] not in _RETRYABLE_ERROR_CODES:
Expand Down Expand Up @@ -461,7 +465,7 @@ def handle_write(self):
buf.truncate(0)

try:
sent = self.send(bytes_)
bytes_sent = self.send(bytes_)
except socket.error as err:
if err.args[0] in _RETRYABLE_ERROR_CODES:
# Couldn't write the bytes but we should
Expand All @@ -474,9 +478,9 @@ def handle_write(self):
# No exception is thrown during the send
self.last_write_time = time.time()
self.sent_protocol_bytes = True

if sent < len(bytes_):
write_queue.appendleft(bytes_[sent:])
self._reactor.bytes_sent += bytes_sent
if bytes_sent < len(bytes_):
write_queue.appendleft(bytes_[bytes_sent:])

def handle_close(self):
_logger.warning("Connection closed by server")
Expand Down
30 changes: 25 additions & 5 deletions hazelcast/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
_NEAR_CACHE_DESCRIPTOR_PREFIX = "nearcache"
_NEAR_CACHE_DESCRIPTOR_DISCRIMINATOR = "name"

_TCP_METRICS_PREFIX = "tcp"


class Statistics(object):
def __init__(
Expand Down Expand Up @@ -130,7 +132,7 @@ def _register_system_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG
self._registered_system_gauges[gauge_name] = (gauge_fn, value_type)
except Exception as e:
_logger.debug(
"Unable to register the system related gauge %s. Error: %s" % (gauge_name, e)
"Unable to register the system related gauge %s. Error: %s", gauge_name, e
)

def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG):
Expand All @@ -141,7 +143,7 @@ def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LON
self._registered_process_gauges[gauge_name] = (gauge_fn, value_type)
except Exception as e:
_logger.debug(
"Unable to register the process related gauge %s. Error: %s" % (gauge_name, e)
"Unable to register the process related gauge %s. Error: %s", gauge_name, e
)

def _collect_and_send_stats(self):
Expand All @@ -157,6 +159,7 @@ def _collect_and_send_stats(self):
self._add_client_attributes(attributes, connection)
self._add_near_cache_metrics(attributes, compressor)
self._add_system_and_process_metrics(attributes, compressor)
self._add_tcp_metrics(compressor)
self._send_stats(
collection_timestamp, "".join(attributes), compressor.generate_blob(), connection
)
Expand All @@ -180,7 +183,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
attributes, compressor, gauge_name, value, value_type
)
except:
_logger.exception("Error while collecting '%s'." % gauge_name)
_logger.exception("Error while collecting '%s'.", gauge_name)

if not self._registered_process_gauges:
# Do not create the process object if no process-related
Expand All @@ -195,7 +198,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
attributes, compressor, gauge_name, value, value_type
)
except:
_logger.exception("Error while collecting '%s'." % gauge_name)
_logger.exception("Error while collecting '%s'.", gauge_name)

def _add_system_or_process_metric(self, attributes, compressor, gauge_name, value, value_type):
# We don't have any metrics that do not have prefix.
Expand Down Expand Up @@ -342,9 +345,26 @@ def _add_near_cache_metric(
self._add_attribute(attributes, metric, value, nc_name_with_prefix)
except:
_logger.exception(
"Error while collecting %s metric for near cache '%s'" % (metric, nc_name)
"Error while collecting %s metric for near cache '%s'.", metric, nc_name
)

def _add_tcp_metrics(self, compressor):
self._add_tcp_metric(compressor, "bytesSend", self._reactor.bytes_sent)
self._add_tcp_metric(compressor, "bytesReceived", self._reactor.bytes_received)

def _add_tcp_metric(
self, compressor, metric, value, value_type=ValueType.LONG, unit=ProbeUnit.BYTES
):
descriptor = MetricDescriptor(
metric=metric,
prefix=_TCP_METRICS_PREFIX,
unit=unit,
)
try:
self._add_metric(compressor, descriptor, value, value_type)
except:
_logger.exception("Error while collecting '%s.%s'.", _TCP_METRICS_PREFIX, metric)

def _add_metric(self, compressor, descriptor, value, value_type):
if value_type == ValueType.LONG:
compressor.add_long(descriptor, value)
Expand Down
37 changes: 35 additions & 2 deletions tests/integration/backward_compatible/client_test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import time
import unittest

from tests.base import HazelcastTestCase
from tests.base import HazelcastTestCase, SingleMemberTestCase
from hazelcast.client import HazelcastClient
from hazelcast.lifecycle import LifecycleState
from tests.hzrc.ttypes import Lang
from tests.util import get_current_timestamp
from tests.util import get_current_timestamp, compare_client_version, random_string


class ClientTest(HazelcastTestCase):
Expand Down Expand Up @@ -110,3 +111,35 @@ def get_labels_from_member(self, client_uuid):
client_uuid
)
return self.rc.executeOnController(self.cluster.id, script, Lang.JAVASCRIPT).result


@unittest.skipIf(
compare_client_version("4.2.2") < 0, "Tests the features added in 4.1 version of the client"
)
class ClientTcpMetricsTest(SingleMemberTestCase):
@classmethod
def configure_client(cls, config):
config["cluster_name"] = cls.cluster.id
return config

def test_bytes_received(self):
reactor = self.client._reactor

bytes_received = reactor.bytes_received
self.assertGreater(bytes_received, 0)

m = self.client.get_map(random_string()).blocking()
m.get(random_string())

self.assertGreater(reactor.bytes_received, bytes_received)

def test_bytes_sent(self):
reactor = self.client._reactor

bytes_sent = reactor.bytes_sent
self.assertGreater(bytes_sent, 0)

m = self.client.get_map(random_string()).blocking()
m.set(random_string(), random_string())

self.assertGreater(reactor.bytes_sent, bytes_sent)

0 comments on commit 2956596

Please sign in to comment.