Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT] Send TCP metrics for bytes sent and received to server #497

Merged
merged 2 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions hazelcast/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ def __init__(self):
loop.shutdown()
loop = _BasicLoop(self.map)
self._loop = loop
self.bytes_sent = 0
self.bytes_received = 0

def start(self):
self._loop.start()
Expand Down Expand Up @@ -421,9 +423,11 @@ def handle_read(self):
try:
while True:
data = self.recv(receive_buffer_size)
bytes_received = len(data)
self._reactor.bytes_received += bytes_received
reader.read(data)
self.last_read_time = time.time()
if len(data) < receive_buffer_size:
if bytes_received < receive_buffer_size:
break
except socket.error as err:
if err.args[0] not in _RETRYABLE_ERROR_CODES:
Expand Down Expand Up @@ -461,7 +465,7 @@ def handle_write(self):
buf.truncate(0)

try:
sent = self.send(bytes_)
bytes_sent = self.send(bytes_)
except socket.error as err:
if err.args[0] in _RETRYABLE_ERROR_CODES:
# Couldn't write the bytes but we should
Expand All @@ -474,9 +478,9 @@ def handle_write(self):
# No exception is thrown during the send
self.last_write_time = time.time()
self.sent_protocol_bytes = True

if sent < len(bytes_):
write_queue.appendleft(bytes_[sent:])
self._reactor.bytes_sent += bytes_sent
if bytes_sent < len(bytes_):
write_queue.appendleft(bytes_[bytes_sent:])

def handle_close(self):
_logger.warning("Connection closed by server")
Expand Down
30 changes: 25 additions & 5 deletions hazelcast/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
_NEAR_CACHE_DESCRIPTOR_PREFIX = "nearcache"
_NEAR_CACHE_DESCRIPTOR_DISCRIMINATOR = "name"

_TCP_METRICS_PREFIX = "tcp"


class Statistics(object):
def __init__(
Expand Down Expand Up @@ -130,7 +132,7 @@ def _register_system_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG
self._registered_system_gauges[gauge_name] = (gauge_fn, value_type)
except Exception as e:
_logger.debug(
"Unable to register the system related gauge %s. Error: %s" % (gauge_name, e)
"Unable to register the system related gauge %s. Error: %s", gauge_name, e
)

def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG):
Expand All @@ -141,7 +143,7 @@ def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LON
self._registered_process_gauges[gauge_name] = (gauge_fn, value_type)
except Exception as e:
_logger.debug(
"Unable to register the process related gauge %s. Error: %s" % (gauge_name, e)
"Unable to register the process related gauge %s. Error: %s", gauge_name, e
)

def _collect_and_send_stats(self):
Expand All @@ -157,6 +159,7 @@ def _collect_and_send_stats(self):
self._add_client_attributes(attributes, connection)
self._add_near_cache_metrics(attributes, compressor)
self._add_system_and_process_metrics(attributes, compressor)
self._add_tcp_metrics(compressor)
self._send_stats(
collection_timestamp, "".join(attributes), compressor.generate_blob(), connection
)
Expand All @@ -180,7 +183,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
attributes, compressor, gauge_name, value, value_type
)
except:
_logger.exception("Error while collecting '%s'." % gauge_name)
_logger.exception("Error while collecting '%s'.", gauge_name)

if not self._registered_process_gauges:
# Do not create the process object if no process-related
Expand All @@ -195,7 +198,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
attributes, compressor, gauge_name, value, value_type
)
except:
_logger.exception("Error while collecting '%s'." % gauge_name)
_logger.exception("Error while collecting '%s'.", gauge_name)

def _add_system_or_process_metric(self, attributes, compressor, gauge_name, value, value_type):
# We don't have any metrics that do not have prefix.
Expand Down Expand Up @@ -342,9 +345,26 @@ def _add_near_cache_metric(
self._add_attribute(attributes, metric, value, nc_name_with_prefix)
except:
_logger.exception(
"Error while collecting %s metric for near cache '%s'" % (metric, nc_name)
"Error while collecting %s metric for near cache '%s'.", metric, nc_name
)

def _add_tcp_metrics(self, compressor):
self._add_tcp_metric(compressor, "bytesSend", self._reactor.bytes_sent)
self._add_tcp_metric(compressor, "bytesReceived", self._reactor.bytes_received)

def _add_tcp_metric(
self, compressor, metric, value, value_type=ValueType.LONG, unit=ProbeUnit.BYTES
):
descriptor = MetricDescriptor(
metric=metric,
prefix=_TCP_METRICS_PREFIX,
unit=unit,
)
try:
self._add_metric(compressor, descriptor, value, value_type)
except:
_logger.exception("Error while collecting '%s.%s'.", _TCP_METRICS_PREFIX, metric)

def _add_metric(self, compressor, descriptor, value, value_type):
if value_type == ValueType.LONG:
compressor.add_long(descriptor, value)
Expand Down
34 changes: 32 additions & 2 deletions tests/integration/backward_compatible/client_test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import time
import unittest
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is not used anymore I think

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also don't you check unused imports?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected it, thanks for the catch.
We use black to lint the code, but unfortunately, it does not have this feature. It seems autoflake and pylint have it, but enabling them requires a bit of work to get rid of their warnings. There was a task about this in Jira


from tests.base import HazelcastTestCase
from tests.base import HazelcastTestCase, SingleMemberTestCase
from hazelcast.client import HazelcastClient
from hazelcast.lifecycle import LifecycleState
from tests.hzrc.ttypes import Lang
from tests.util import get_current_timestamp
from tests.util import get_current_timestamp, random_string


class ClientTest(HazelcastTestCase):
Expand Down Expand Up @@ -110,3 +111,32 @@ def get_labels_from_member(self, client_uuid):
client_uuid
)
return self.rc.executeOnController(self.cluster.id, script, Lang.JAVASCRIPT).result


class ClientTcpMetricsTest(SingleMemberTestCase):
@classmethod
def configure_client(cls, config):
config["cluster_name"] = cls.cluster.id
return config

def test_bytes_received(self):
reactor = self.client._reactor

bytes_received = reactor.bytes_received
self.assertGreater(bytes_received, 0)

m = self.client.get_map(random_string()).blocking()
m.get(random_string())

self.assertGreater(reactor.bytes_received, bytes_received)

def test_bytes_sent(self):
reactor = self.client._reactor

bytes_sent = reactor.bytes_sent
self.assertGreater(bytes_sent, 0)

m = self.client.get_map(random_string()).blocking()
m.set(random_string(), random_string())

self.assertGreater(reactor.bytes_sent, bytes_sent)