Skip to content

Commit

Permalink
Use requests.Session in WavefrontClient (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
ustinov authored Mar 24, 2023
1 parent 12e883f commit 4528a1a
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 96 deletions.
99 changes: 64 additions & 35 deletions example.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#! /usr/bin/env python3
"""Wavefront SDK Usage Example."""

import platform
import sys
import time
import uuid
Expand All @@ -11,68 +13,95 @@
def send_metrics(wavefront_client):
"""Send a metric."""
wavefront_client.send_metric(
'python.proxy.new york.power.usage',
42422.0, None, 'localhost', None)
'python.sdk.example.new_york.power.usage', # metric name (str)
time.time() % 3600, # metric value (float)
None, # metric timestamp (int)
platform.node(), # metric source (str)
{}) # metric tags (dict)


def send_delta_counter(wavefront_client):
"""Send a delta counter."""
wavefront_client.send_delta_counter(
'python.delta.proxy.counter',
1.0, 'localhost', None)
'python.sdk.example.delta.proxy.counter', # counter name (str)
1.0, # counter value (float)
platform.node(), # source name (str)
None) # counter tags (dict)


def send_histogram(wavefront_client):
"""Send a histogram."""
wavefront_client.send_distribution(
'python.proxy.request.latency',
[(30, 20), (5.1, 10)], {histogram_granularity.DAY,
histogram_granularity.HOUR,
histogram_granularity.MINUTE},
None, 'appServer1', {'region': 'us-west'})
'python.sdk.example.request.latency', # histogram name (str)
[(30, 20), (5.1, 10)], # histogram centroids (list)
{histogram_granularity.DAY,
histogram_granularity.HOUR,
histogram_granularity.MINUTE}, # granularities (set)
None, # histogram timestamp (int)
platform.node(), # histogram source (str)
{'region': 'us-west'}) # histogram tags (dict)


def send_tracing_span(wavefront_client):
"""Send a tracing span."""
wavefront_client.send_span(
'getAllUsersFromPythonProxy', 1533529977, 343500, 'localhost',
uuid.UUID('7b3bf470-9456-11e8-9eb6-529269fb1459'),
uuid.UUID('0313bafe-9457-11e8-9eb6-529269fb1459'),
[uuid.UUID('2f64e538-9457-11e8-9eb6-529269fb1459')], None,
[('application', 'Wavefront'),
('http.method', 'GET')], None)
'getAllUsersFromPythonProxy', # span name (str)
1533529977, # start milliseconds (int)
343500, # duration milliseconds (int)
platform.node(), # span source (str)
uuid.UUID('7b3bf470-9456-11e8-9eb6-529269fb1459'), # trace ID (UUID)
uuid.UUID('0313bafe-9457-11e8-9eb6-529269fb1459'), # span ID (UUID)
[
uuid.UUID('2f64e538-9457-11e8-9eb6-529269fb1459')
], # parents (list[UUID])
None, # follows from (list[UUID])
[
('application', 'Wavefront'),
('http.method', 'GET')
], # span tags (list[tuple])
None # span log
)


def send_event(wavefront_client):
"""Send an event."""
wavefront_client.send_event(
'event_via_proxy',
1590650592,
1590650692,
"localhost",
["env:", "test"],
{"severity": "info",
"type": "backup",
"details": "broker backup"})


if __name__ == '__main__':
# Either "proxy://our.proxy.lb.com:2878"
# Or "https://someToken@DOMAIN.wavefront.com"
'python_sdk_example_event', # event name (str)
1590650592, # event start milliseconds (int)
1590650692, # event end milliseconds (int)
platform.node(), # event source (str)
["env:", "test"], # event tags (list[str] or tuple[str])
{
"severity": "info",
"type": "backup",
"details": "broker backup"
} # event annotations (dict)
)


def main():
"""Send sample metrics in a loop."""
wavefront_proxy_url = sys.argv[1]

client_factory = WavefrontClientFactory()
client_factory.add_client(wavefront_proxy_url)
wavefront_client = client_factory.get_client()
wfront_client = client_factory.get_client()

try:
while True:
send_metrics(wavefront_client)
send_histogram(wavefront_client)
send_tracing_span(wavefront_client)
send_delta_counter(wavefront_client)
send_event(wavefront_client)
send_metrics(wfront_client)
send_histogram(wfront_client)
send_tracing_span(wfront_client)
send_delta_counter(wfront_client)
send_event(wfront_client)

time.sleep(15)
finally:
wavefront_client.close()
wfront_client.close()


if __name__ == '__main__':
# Either "proxy://our.proxy.lb.com:2878"
# Or "https://someToken@DOMAIN.wavefront.com"
# should be passed as an input in sys.argv[1]
main()
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

setuptools.setup(
name='wavefront-sdk-python',
version='1.8.15', # Please update with each pull request.
version='1.9.0', # Please update with each pull request.
author='VMware Aria Operations for Applications Team',
url='https://github.com/wavefrontHQ/wavefront-sdk-python',
license='Apache-2.0',
Expand Down Expand Up @@ -55,7 +55,7 @@
packages=setuptools.find_packages(exclude=('*.tests', '*.tests.*',
'tests.*', 'tests')),
install_requires=(
'requests>=2.18.4',
'requests>=2.27',
'tdigest>=0.5.2',
'Deprecated>=1.2.10'
)
Expand Down
62 changes: 25 additions & 37 deletions test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@

import unittest
import uuid
from unittest.mock import ANY
from unittest.mock import Mock
from unittest.mock import patch

import requests

from wavefront_sdk.client import WavefrontClient
import wavefront_sdk
from wavefront_sdk.common.metrics import registry
from wavefront_sdk.entities.tracing.span_log import SpanLog

Expand All @@ -20,26 +17,24 @@ class TestUtils(unittest.TestCase):
"""Test direct ingestion."""

def setUp(self):
self._sender = WavefrontClient(
'no_server',
'no_token',
self._sender = wavefront_sdk.client.WavefrontClient(
'no_server', 'no_token',
flush_interval_seconds=86400, # turn off auto flushing
enable_internal_metrics=False)
self._spans_log_buffer = self._sender._spans_log_buffer
self._tracing_spans_buffer = self._sender._tracing_spans_buffer
self._response = Mock()
self._response = unittest.mock.Mock()
self._response.status_code = 200

def test_send_version_with_internal_metrics(self):
no_registry = registry.WavefrontSdkMetricsRegistry(
wf_metric_sender=None)
with patch.object(
with unittest.mock.patch.object(
registry,
'WavefrontSdkMetricsRegistry',
return_value=no_registry) as mock_registry:
WavefrontClient(
'no_server',
'no_token',
wavefront_sdk.client.WavefrontClient(
'no_server', 'no_token',
flush_interval_seconds=86400,
enable_internal_metrics=True)
self.assertRegex(
Expand All @@ -62,8 +57,6 @@ def test_send_span_with_span_logs(self):
1635123789456000,
{"FooLogKey": "FooLogValue"})])

self.maxDiff = None

# Verify span logs correctly emitted
actual_line = self._spans_log_buffer.get()
expected_line = (
Expand Down Expand Up @@ -104,8 +97,6 @@ def test_send_span_without_span_logs(self):
[('application', 'Wavefront'), ('service', 'test-spans')],
[])

self.maxDiff = None

# Assert no span logs emitted
self.assertTrue(self._spans_log_buffer.empty())

Expand All @@ -122,30 +113,27 @@ def test_send_span_without_span_logs(self):

def test_report_event(self):

with patch.object(
requests, 'post', return_value=self._response) as mock_post:
self._sender._report(
'', self._sender.WAVEFRONT_EVENT_FORMAT, '', Mock())

mock_post.assert_called_once_with(
ANY,
params=None,
headers=ANY,
data=ANY,
timeout=self._sender.HTTP_TIMEOUT)
with unittest.mock.patch.object(
requests.Session, 'post',
return_value=self._response) as mock_post:
self._sender._report('',
self._sender.WAVEFRONT_EVENT_FORMAT,
'',
unittest.mock.Mock())
mock_post.assert_called_once_with(unittest.mock.ANY,
headers=unittest.mock.ANY,
data=unittest.mock.ANY)

def test_report_non_event(self):

with patch.object(
requests, 'post', return_value=self._response) as mock_post:
self._sender._report('', 'metric', '', Mock())

mock_post.assert_called_once_with(
ANY,
params=ANY,
headers=ANY,
data=ANY,
timeout=self._sender.HTTP_TIMEOUT)
with unittest.mock.patch.object(
requests.Session, 'post',
return_value=self._response) as mock_post:
self._sender._report('', 'metric', '', unittest.mock.Mock())
mock_post.assert_called_once_with(unittest.mock.ANY,
params=unittest.mock.ANY,
headers=unittest.mock.ANY,
data=unittest.mock.ANY)


if __name__ == '__main__':
Expand Down
9 changes: 2 additions & 7 deletions wavefront_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,15 @@
import pkg_resources

from .client_factory import WavefrontClientFactory
from .direct import WavefrontDirectClient
from .proxy import WavefrontProxyClient


__all__ = ['WavefrontDirectClient',
'WavefrontProxyClient',
'WavefrontClientFactory']
__all__ = ['WavefrontClientFactory']

__version__ = None

try:
__version__ = pkg_resources.get_distribution(
'wavefront-sdk-python'
).version
'wavefront-sdk-python').version
except pkg_resources.DistributionNotFound:
# __version__ is only available when distribution is installed.
pass
30 changes: 15 additions & 15 deletions wavefront_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,18 @@ def __init__(self, server, token, max_queue_size=50000, batch_size=10000,
self._tracing_spans_buffer = queue_impl(max_queue_size)
self._spans_log_buffer = queue_impl(max_queue_size)
self._events_buffer = queue_impl(max_queue_size)
self._headers = {'Content-Type': 'application/octet-stream',
'Content-Encoding': 'gzip'}
self._event_headers = {'Content-Type': 'application/json',
'Content-Encoding': 'gzip'}
self._headers = {'Content-Type': 'application/octet-stream'}
self._event_headers = {'Content-Type': 'application/json'}
self._closed = False
self._schedule_lock = threading.RLock()
self._timer = None
self._schedule_timer()
self._session = requests.Session()
self._session.headers.update({'Content-Encoding': 'gzip'})
self._session.timeout = self.HTTP_TIMEOUT

if token:
self._headers['Authorization'] = 'Bearer ' + token
self._event_headers['Authorization'] = 'Bearer ' + token
self._session.headers.update({'Authorization': 'Bearer ' + token})
ingestion_type = 'direct'
else:
ingestion_type = 'proxy'
Expand Down Expand Up @@ -192,18 +192,18 @@ def _report(self, points, data_format, entity_prefix, report_errors):
"""
try:
if data_format == self.WAVEFRONT_EVENT_FORMAT and self._token:
response = requests.post(self.server + self.EVENT_END_POINT,
params=None,
headers=self._event_headers,
data=points,
timeout=self.HTTP_TIMEOUT)
response = self._session.post(
self.server + self.EVENT_END_POINT,
headers=self._event_headers,
data=points)
else:
params = {'f': data_format}
compressed_data = utils.gzip_compress(points.encode('utf-8'))
response = requests.post(self.server + self.REPORT_END_POINT,
params=params, headers=self._headers,
data=compressed_data,
timeout=self.HTTP_TIMEOUT)
response = self._session.post(
self.server + self.REPORT_END_POINT,
headers=self._headers,
data=compressed_data,
params=params)

self._sdk_metrics_registry.new_delta_counter(
f'{entity_prefix}.report.{response.status_code}').inc()
Expand Down

0 comments on commit 4528a1a

Please sign in to comment.